DDS与FastRTPS

Posted on May 15, 2020


DDS是一套通信协议和API标准,它提供了以数据为中心的连接服务。Fast-RTPS是DDS的开源实现,借助它可以方便的开发出高效,可靠的分布式系统。本文是对DDS以及Fast RTPS的介绍文章。

从《变形金刚》电影说起

这里要提到的是2011年的真人版电影,变形金刚第三部《Transformers: Dark of the Moon》。

这是一篇技术文章,为什么要扯到《变形金刚》电影呢?这是因为这部电影的主要内容与本文所提到的技术有一定的相关性。

在这部电影中,御天敌背叛了擎天柱,与霸天虎合作。在地球的各地布置了许多的能量柱,他试图借助这些能量柱将赛博坦星球传送到地球上,以此来重建自己的家园。

这些能量柱必须组合起来才能完成传输工作,并且在这其中有一个红色的能量柱比较特殊,因为它负责控制其他的传送柱。

由此可见,这是一个大型的分布式系统。在这个系统中,这个红色的能量柱被称之为“中心节点”,中心节点正如其名称那样,它是整个系统的中心。对于带有中心节点的分布式系统来说,一旦中心节点被摧毁,整个系统都将无法工作。

因此电影的后来,自然是擎天柱摧毁了这个中心节点,使得御天敌的传送计划彻底失败。

从设计上来说,对于一个如此大型的系统,却存在一个非常薄弱和重要的中心节点,这并不是一个好的方案。

而本文介绍的DDS就是一个去中心化的分布式技术。因此在这类系统中,不存在负责总控制的中心节点,所有节点都完全对等。任何一个节点的异常都不会影响整个系统的运行。

DDS介绍

DDS全称是Data Distribution Service,这是一套通信协议和API标准,它提供了以数据为中心的连接服务,基于发布者-订阅者模型。这是一套中间件,它提供介于操作系统和应用程序之间的功能,使得组件之间可以互相通信。并且提供了低延迟,高可靠的通信以及可扩展的架构。

或许,你已经知道很多种网络通信协议,对于发布-订阅这些概念也很熟悉。那DDS到底有什么特别之处呢?

下图展示了4个时代的数据通信方式:

  • (第一代)点对点的CS(Client-Server)结构,这是大家最为熟悉的:一个服务器角色被许多的客户端使用,每次通信时,通信双方必须建立一条连接。当通信节点增多时,通信的连接数也会增多。并且,每个客户端都必须知道服务器的具体地址和所提供的服务。一旦服务器地址发生变化,所有客户端都会受到影响。
  • (第二代)Broker模型:存在一个中间人,它负责初步处理大家的请求,并进一步找到真正能响应服务的角色,这就好像存在一个经纪人。这为客户端提供了一层抽象,使得服务器的具体地址变得不重要了。服务端地址如果发生变化,只需要告诉Broker就可以了。但这个模型的问题在于,Broker变成了模型的中心,它的处理速度会影响所有人的效率,这就好像城市中心的路口,当系统规则增长到一定程度,Broker终究会成为瓶颈。更糟糕的是,如果Broker瘫痪了,可能整个系统都将无法运转。
  • (第三代)广播模型:所有人都可以在通道上广播消息,并且所有人都可以收到消息。这个模型解决了服务器地址的问题,且通信双方不用单独建立连接,但它存在的问题是:广播通道上的消息太多,太嘈杂,所有人都必须关心每条消息是否与自己有关。这就好像全公司一千号人坐在同一个房间里面办公一样。
  • (第四代)DDS模型:这种模型与广播模型有些类似,所有人都可以在DataBus上发布和读取消息。但它更进一步的是,通信中包含了很多并行的通路,每个人可以只关心自己感兴趣的消息,自动忽略自己不需要的消息。

下图展示了DDS在网络栈中的位置,它位于传输层的上面,并且以TCP,UDP为基础。

这个图之所以是沙漏形状是因为:两头的技术变化都发展很快,但是中间的却鲜有变化。

对比大家常见的Socker API,DDS有如下特点:

特性 Socket API DDS
架构 TCP:点对点
UDP:点对点,广播,多播
Publish-subscribe模型
平台独立 需要为不同硬件,操作系统和编程语言编写不同的代码 所有硬件,操作系统和编程语言使用相同的API
发现 需要硬编码IP地址和端口号 动态发现,无需关注端点所在位置
类型安全 没有类型安全,应用需要将字节流转换成正确类型 强类型安全,write()read()针对特定数据类型
通信行为定制 需要通过自定义的代码来实现 通过QoS策略来完成
互操作性 不支持 具有公认的互操作性的开放标准

关于DDS的更多特性,可以点击这个链接:《What is DDS?》

DDS可以降低系统复杂度

对于分布式系统来说,有很多复杂的逻辑需要处理,例如:如何发现其他节点,如何为每个节点分配地址,如何配置消息的可靠性等。这使得应用程序变得臃肿。

而如果说通信的中间件能够完全处理好这些逻辑,则应用程序将可以集中处理自己的业务,变得更加敏捷。

下图是两种情况的对比:

如果考虑系统的演化,问题会更突出。

由于分布式系统中包含了许多的角色需要互相通信,随着角色数量的不断增长,其通信的通道数量会以爆炸式增长。

而如果有统一的DataBus,则即便新增了通信角色其通信模型也不会变得更加复杂。

DDS应用范围

尽管可能你原先没有听过DDS这个术语,但其实它的应用非常广泛,广泛到它涉及到了我们每天都要依赖的许多重要行业,例如:航空,国防,交通,医疗,能源等等。

下图是一些示例:

DDS 提供商

DDS本身是一套标准。由Object Management Group(简称OMG)维护。

OMG是一个开放性的非营利技术标准联盟,由许多大型IT公司组成:包括IBM,Apple Computer,Sun Microsystems等。

但OMG仅仅负责制定标准,而标准的实现则由其他服务提供商完成。

目前DDS的提供商包括下面这些:

DDS与RTPS

在DDS规范中,有两个描述标准的基本文档:

  • DDS Specification:描述了以数据为中心的发布-订阅模型。该规范定义了API和通信语义(行为和服务质量),使消息从消息生产者有效地传递到匹配的消费者。DDS规范的目的可以概括为:“能够在正确的时间将正确的信息高效,可靠地传递到正确的位置”。
  • DDSI-RTPS:描述了RTPS(Real Time Publish Subscribe Protocol)协议。该协议通过UDP等不可靠的传输,实现最大努力(Best-Effort)和可靠的发布-订阅通信。RTPS是DDS实现的标准协议,它的目的和范围是确保基于不同DDS供应商的应用程序可以实现互操作。

DDS与汽车行业

对于汽车行业来说,汽车开放系统架构(AUTomotive Open System ARchitecture)已经在AUTOSAR Adaptive Platform 18.03中包含了DDS协议

ROS2架构也以DDS为基础

另外,DDS的实时特性可能特别适合自动驾驶系统。在这类系统中,通常会存在感知,预测,决策和定位模块,这些需要非常高速和频繁的交换数据。借助DDS,可以很好的满足它们的通信需求。

Fast-RTPS 介绍

Fast-RTPS是eprosima对于RTPS的C++实现,这是一个免费开源软件,遵循Apache License 2.0。

eProsima Fast RTPS在性能,功能和对最新版本RTPS标准(RTPS 2.2)的遵守方面均处于领先地位。关于Fast RTPS的性能可以查看这个链接:eProsima Fast RTPS Performance

它最为被大家知道的可能是因为被ROS2设定为默认的消息中间件。

Fast-RTPS支持平台包括:Windows, Linux, Mac OS, QNX, VxWorks, iOS, Android, Raspbian。

Fast-RTPS具有以下优点:

  • 对于实时应用程序来说,可以在Best-Effort和可靠通信两种策略上进行配置。
  • 即插即用的连接性,使得网络的所有成员自动发现其他新的成员。
  • 模块化和可扩展性允许网络中设备不断增长。
  • 可配置的网络行为和可互换的传输层:为每个部署选择最佳协议和系统输入/输出通道组合。
  • 两个API层:一个简单易用的发布者-订阅者层和一个提供对RTPS协议内部更好控制的Writer-Reader层。

源码与编译

Fast-RTPS的源码位于Github上:eProsima/Fast-RTPS

可以通过下面这条命令获取其源码:

git clone https://github.com/eProsima/Fast-RTPS.git

关于如何编译Fast-RTPS可以参见这个链接:Fast RTPS Installation from Sources

Fast-RTPS概述

Fast-RTPS提供了两个层次的API:

  • Publisher-Subscriber层:RTPS上的简化抽象。
  • Writer-Reader层:对于RTPS端点的直接控制。

相较而言,后者更底层。两个层次的核心角色如下图所示:

Publisher-Subscriber层为大多数开发者提供了一个方便的抽象。它允许定义与Topic关联的发布者和订阅者,以及传输Topic数据的简单方法。

Writer-Reader层更接近于RTPS标准中定义的概念,并且可以进行更精细的控制,但是要求开发者直接与每个端点的历史记录缓存进行交互。

Fast RTPS是并发且基于事件的。每个参与者都会生成一组线程来处理后台任务,例如日志记录,消息接收和异步通信。

事件系统使得Fast RTPS能够响应某些条件并安排定期活动。用户中几乎不用感知它们,因为这些事件大多数仅仅与RTPS元数据有关。

对象与数据结构

下面是Fast-RTPS实现中的核心结构。

Publish-Subscriber模块

RTPS标准的高层类型。

  • Domain:用来创建,管理和销毁Participants。
  • Participant:包括Publisher和Subscriber,并管理它们的配置。
    • ParticipantAttributes:创建Participant的配置参数。
    • ParticipantListener:可以让开发者实现Participant的回调函数。
  • Publisher:在Topic上发布数据的对象。
    • PublisherAttributes:创建Publisher的配置参数。
    • PublisherListener:可以让开发者实现Publisher的回调函数。
  • Subscriber:在Topic上接受数据的对象。
    • SubscriberAttributes:创建Subscriber的配置参数。
    • SubscriberListener:可以让开发者实现Subscriber的回调函数。

RTPS模块

RTPS的底层模型。包含下面几个子模块:

  • RTPS Common
    • CacheChange_t:描述Topic上的变更,存储在历史Cache中。
    • Data:Cache变化的负载。
    • Message:RTPS消息。
    • Header:RTPS协议的头信息。
    • Sub-Message Header:标识RTPS的订阅消息。
    • MessageReceiver:反序列化和处理接受到的RTPS消息。
    • RTPSMessageCreator:构建RTPS消息。
  • RTPS Domain
    • RTPSDomain:用来创建,管理和销毁底层的RTPSParticipants。
    • RTPSParticipant:包括Writer和Reader。
  • RTPS Reader
    • RTPSReader:读者的基类。
    • ReaderAttributes:包含RTPS读者的配置参数。
    • ReaderHistory:存储Topic变化的历史数据。
    • ReaderListener:读者的回调类型。
  • RTPS Writer
    • RTPSWriter:写者的基类。
    • WriterAttributes:包含RTPS写者的配置参数。
    • WriterHistory:存储写者的历史数据。

配置Attributes

上面的数据结构中看到了许多Attributes后缀的类名。这些类包含了对协议或者对象的配置参数,很多特性都需要设置这些属性来完成。

这些类的定义基本都位于下面三个文件夹中:

Fast RTPS支持非常多的配置参数,并且参数的结构常常是嵌套的。

通过代码去配置这些参数会产生很多啰嗦的代码,而且最大的问题在于:每次更改配置参数都需要重新编译。这个问题并非Fast RTPS才有,只要包含大量配置参数的软件都会这样的问题。通常的解决方法就是:提供文本格式的配置文件的方式来配置参数。因此对于Fast-RTPS来说,除了支持通过代码配置参数,它也支持通过XML文件的方式来进行配置。

有了配置文件之后,在代码中直接读取就好了,例如:

Participant *participant = Domain::createParticipant("participant_xml_profile");

在这之后,如果需要调整配置,只需要修改配置文件,不用在改动代码,自然也不用重新编译。这对于项目部署是很重要的。

Fast-RTPS支持的配置项,以及这些配置项说明和默认值都可以到这个链接中查看:XML profiles

Domain

RTPS中的通信参数者之间,通过Domain进行隔离。

同一时刻可能有多个Domain同时存在,一个Domain中可以包含任意数目的消息发送者和接受者。

其结构如下图所示:

开发者可以通过domainId来指定参与者所属Domain。

如果没有指定,默认的domainId = 80

发现

作为DDS的实现,Fast-RTPS提供了Publisher和Subscriber自动发现和匹配的功能。在实现上,这分为两个步骤来完成:

  • Participant Discovery Phase (PDP):在这个阶段,参与者互相通知彼此的存在。为了达到这个目的,每个参与者需要定时发送公告消息。公告消息通过周知的多播地址和端口发送(根据domain计算得到)。
  • Endpoint Discovery Phase (EDP):在这个阶段,Publisher和Subscriber互相确认。为此,参与者使用在PDP期间建立的通信通道,彼此共享有关其发布者和订阅者的信息。 该信息包含了Topic和数据类型。为了使两个端点匹配,它们的Topic和数据类型必须一致。 一旦发布者和订阅者匹配,他们就发送/接收数据了。

这两个阶段对应了两个独立的协议:

  • Simple Participant Discovery Protocol:指定参与者如何在网络中发现彼此。
  • Simple Endpoint Discovery Protocol:定义了已经互相发现的参与者交换信息的协议。

Fast-RTPS提供了四种发现机制:

  • Simple:这是默认机制。它在PDP和EDP阶段均使用RTPS标准,因此可与任何其他DDS和RTPS实现兼容。
  • Static:此机制在PDP阶段使用Simple Participant Discovery Protocol。如果所有发布者和订阅者的地址以及端口和主题信息是事先知道的,则允许跳过EDP阶段。
  • Server-Client:这种发现机制使用集中式发现结构,由服务器充当发现机制的Hub。
  • Manual:此机制仅与RTPSDomain层兼容。它禁用了PDP阶段,使用户可以使用其选择的任何外部元信息通道手动匹配和取消匹配RTPS参与者,读者和写者。

不同的发现机制具有一些共同的配置:

名称 描述 默认值
Ignore Participant flags 在必要的时候,可以选择忽略一些参与者。
例如:另一台主机上的,另一个进程的或者同一个进程的。
NO_FILTER
Lease Duration 指定远程参与者在多少时间内认为本地参与者还活着。 20 s
Announcement Period 指定参与者的PDP公告消息的周期。 3s

关于发现机制的更多信息可以浏览这个链接:Discovery

传输控制

Fast-RTPS实现了可插拔的传输架构,这意味着每一个参与者可以随时加入和退出。

在传输上,Fast-RTPS支持以下五种传输方式:

  • UDPv4
  • UDPv6
  • TCPv4
  • TCPv6
  • SHM(Shared Memory)

默认的,当Participant创建时,会自动的配置两个传输通道:

  • SHM:用来与同一个机器上的参与者通信。
  • UDPv4:同来与跨机器的参与者通信。

当然,开发者可以改变这个默认行为,通过C++接口或者XML配置文件都可以。

SHM要求所有参与者位于同一个系统上,它是借助了操作系统提供的共享内存机制实现。共享内存的好处是:支持大数据传输,减少了数据拷贝,并且也减少系统负载。因此通常情况下,使用SHM会获得更好的性能。使用SHM时,可以配置共享内存的大小。

网络通信包含了非常多的参数需要配置,例如:Buffer大小,端口号,超时时间等等。框架本身为参数设置了默认值,大部分情况下开发者不用调整它们。但是知道这些默认值是什么,在一些情况下可能会对分析问题有所帮助。关于这些配置的默认值,以及如果配置可以查看这个链接:Transport descriptors

与UDP不同,TCP传输是面向连接的,因此,Fast-RTPS必须在发送RTPS消息之前建立TCP连接。TCP传输可以具有两种行为:充当TCP服务器或充当TCP客户端。服务器打开一个TCP端口以侦听传入的连接,然后客户端尝试连接到服务器。服务器和客户端的概念独立于RTPS概念,例如:Publisher,Subscriber,Reader或Writer。它们中的任何一个都可以用作TCP服务器或TCP客户端,因为这些实体仅用于建立TCP连接,而RTPS协议可以在该TCP连接上工作。

如果要使用TCP传输,开发者需要做更多的配置,关于这部分内容可以继续阅读官方文档,这里不再赘述。

FastRTPS代码示例

FastRTPS不仅有框架文档API Reference,还有丰富的代码示例。

对于开发者来说,浏览这些代码可能是上手最快捷的方法。

你可以在这里浏览这些示例:Fast-RTPS/examples/C++/

FASTRTPSGEN

FASTRTPSGEN是一个Java程序。用来为在Topic上传输的数据类型生成源码。

开发者通过接口描述语言(Interface Definition Language)定义需要传输的数据类型。然后通过FASTRTPSGEN生成C++编译需要的源文件。

可以通过下面的方法获取和编译FASTRTPSGEN。

git clone --recursive https://github.com/eProsima/Fast-RTPS-Gen.git
cd Fast-RTPS-Gen
gradle assemble

编译完成之后可执行文件位于./scripts/目录。如果需要,可以将该路径添加到$PATH变量中。

关于如何通过IDL定义数据类型请参见这里:Defining a data type via IDL

以下面这个示例文件为例:

struct TestData 
{
char char_type;
octet octet_type;
long long_type;
string string_type;

float float_array[4];

sequence<double> double_list;
};

我们将其保存到文件名为data_type.idl的文件中。然后通过下面这条命令生成C++文件:

~/Fast-RTPS-Gen/scripts/fastrtpsgen data_type.idl 

最后会得到下面四个文件:

data_type.cxx
data_type.h
data_typePubSubTypes.cxx
data_typePubSubTypes.h

前两个文件定义的是实际存储数据的结构,后两个文件定义的类是eprosima::fastrtps::TopicDataType的子类。用来在参与者上注册类型:

/**
 * Register a type in a participant.
 * @param part Pointer to the Participant.
 * @param type Pointer to the Type.
 * @return True if correctly registered.
 */
RTPS_DllAPI static bool registerType(
        Participant* part,
        fastdds::dds::TopicDataType * type);

每一套通信系统中通常都会包含一个或多个自定义的数据类型。

发布者-订阅者层

可以通过 HelloWorldExample 来熟悉发布者-订阅者层接口。

该目录下文件列表如下:

-rw-r--r--  1 paul  staff   1.8K  3 16 13:36 CMakeLists.txt
-rw-r--r--  1 paul  staff   2.8K  3 16 13:36 HelloWorld.cxx
-rw-r--r--  1 paul  staff   6.1K  3 16 13:36 HelloWorld.h
-rw-r--r--  1 paul  staff    62B  3 16 13:36 HelloWorld.idl
-rw-r--r--  1 paul  staff   4.4K  3 16 13:36 HelloWorldPubSubTypes.cxx
-rw-r--r--  1 paul  staff   1.7K  3 16 13:36 HelloWorldPubSubTypes.h
-rw-r--r--  1 paul  staff   4.6K  3 16 13:36 HelloWorldPublisher.cpp
-rw-r--r--  1 paul  staff   1.7K  3 16 13:36 HelloWorldPublisher.h
-rw-r--r--  1 paul  staff   3.8K  3 16 13:36 HelloWorldSubscriber.cpp
-rw-r--r--  1 paul  staff   1.8K  3 16 13:36 HelloWorldSubscriber.h
-rw-r--r--  1 paul  staff   2.0K  3 16 13:36 HelloWorld_main.cpp
-rw-r--r--  1 paul  staff   3.1K  3 16 13:36 Makefile
-rw-r--r--  1 paul  staff   203B  3 16 13:36 README.txt

这其中:

  • README.txt是工程说明
  • CMakeLists.txt与Makefile是编译文件
  • HelloWorld_main.cpp包含了生成可执行文件的main函数
  • HelloWorld.idl是待传输的数据结构定义
  • HelloWorld.h,HelloWorld.cxx,HelloWorldPubSubTypes.h和HelloWorldPubSubTypes.cxx是由HelloWorld.idl文件生成
  • HelloWorldPublisher.h和HelloWorldPublisher.cpp是发布者的实现
  • HelloWorldSubscriber.h和HelloWorldSubscriber.cpp是订阅者的实现

熟悉一个C++工程可以先从main入手,HelloWorld_main.cpp中的主要逻辑就是根据用户输入的参数是"publisher"还是"subscriber"来确定启动哪个模块。

switch(type)
{
    case 1:
        {
            HelloWorldPublisher mypub;
            if(mypub.init())
            {
                mypub.run(count, sleep);
            }
            break;
        }
    case 2:
        {
            HelloWorldSubscriber mysub;
            if(mysub.init())
            {
                mysub.run();
            }
            break;
        }
}

接下来我们直接看HelloWorldPublisher和HelloWorldSubscriber就好。

HelloWorldPublisher::init中主要是为Publisher的对象设置参数:

bool HelloWorldPublisher::init()
{
    m_Hello.index(0);
    m_Hello.message("HelloWorld");
    ParticipantAttributes PParam;
    PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
    PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
    PParam.rtps.builtin.domainId = 0;
    PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
    PParam.rtps.setName("Participant_pub");
    mp_participant = Domain::createParticipant(PParam);

    if(mp_participant==nullptr)
        return false;
    //REGISTER THE TYPE

    Domain::registerType(mp_participant,&m_type);

    //CREATE THE PUBLISHER
    PublisherAttributes Wparam;
    Wparam.topic.topicKind = NO_KEY;
    Wparam.topic.topicDataType = "HelloWorld";
    Wparam.topic.topicName = "HelloWorldTopic";
    Wparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
    Wparam.topic.historyQos.depth = 30;
    Wparam.topic.resourceLimitsQos.max_samples = 50;
    Wparam.topic.resourceLimitsQos.allocated_samples = 20;
    Wparam.times.heartbeatPeriod.seconds = 2;
    Wparam.times.heartbeatPeriod.nanosec = 200*1000*1000;
    Wparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
    mp_publisher = Domain::createPublisher(mp_participant,Wparam,(PublisherListener*)&m_listener);
    if(mp_publisher == nullptr)
        return false;

    return true;

}

这里的参数配置请参阅API说明:

Publisher发送消息的逻辑很简单:

bool HelloWorldPublisher::publish(bool waitForListener)
{
    if(m_listener.firstConnected || !waitForListener || m_listener.n_matched>0)
    {
        m_Hello.index(m_Hello.index()+1);
        mp_publisher->write((void*)&m_Hello);
        return true;
    }
    return false;
}

注意,这里write的对象是通过idl文件生成的类型。

Subscriber的初始化和Publisher是类似的:

bool HelloWorldSubscriber::init()
{
    ParticipantAttributes PParam;
    PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
    PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
    PParam.rtps.builtin.domainId = 0;
    PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
    PParam.rtps.setName("Participant_sub");
    mp_participant = Domain::createParticipant(PParam);
    if(mp_participant==nullptr)
        return false;

    //REGISTER THE TYPE

    Domain::registerType(mp_participant,&m_type);
    //CREATE THE SUBSCRIBER
    SubscriberAttributes Rparam;
    Rparam.topic.topicKind = NO_KEY;
    Rparam.topic.topicDataType = "HelloWorld";
    Rparam.topic.topicName = "HelloWorldTopic";
    Rparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
    Rparam.topic.historyQos.depth = 30;
    Rparam.topic.resourceLimitsQos.max_samples = 50;
    Rparam.topic.resourceLimitsQos.allocated_samples = 20;
    Rparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
    Rparam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    mp_subscriber = Domain::createSubscriber(mp_participant,Rparam,(SubscriberListener*)&m_listener);

    if(mp_subscriber == nullptr)
        return false;


    return true;
}

当然,Subscriber有自己的配置参数类型:

需要注意的是,Subscriber与Publisher的通信是建立在Topic上的,因此对于Topic标识的配置要保持一致:

Wparam.topic.topicDataType = "HelloWorld";
Wparam.topic.topicName = "HelloWorldTopic";

有了Topic的这个抽象概念,使得Subscriber与Publisher不用物理地址上有任何关联,也屏蔽了硬件和操作系统的差异:同样的代码,其编译产物可以一个跑在x86的Mac系统上,一个跑在ARM架构的Android设备上。

Subscriber通过void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)方法来处理接受到的数据。在示例的实现中,就是将消息体打印出来:

void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
{
    if(sub->takeNextData((void*)&m_Hello, &m_info))
    {
        if(m_info.sampleKind == ALIVE)
        {
            this->n_samples++;
            // Print your structure data here.
            std::cout << "Message "<<m_Hello.message()<< " "<< m_Hello.index()<< " RECEIVED"<<std::endl;
        }
    }

}

Publisher与Subscriber各自有一些回调,开发者可以利用它们来进行需要的处理:

回调 Publisher Subscriber
onNewDataMessage -
onSubscriptionMatched -
on_requested_deadline_missed -
on_liveliness_changed -
onPublicationMatched -
on_offered_deadline_missed -
on_liveliness_lost -

读者-写者层

读者-写者层是相对于发布者-订阅者层更底层的API。

它提供了更多的控制,但也意味着使用起来会稍微麻烦一些。

两个层次在几个核心概念上存在一一对应的关系,如下表所示:

Publisher-Subscriber Layer Writer-Reader Layer
Domain RTPSDomain
Participant RTPSParticipant
Publisher RTPSWriter
Subscriber RTPSReader

如果你浏览Fast-RTPS的源码,你会发现其实发布者-订阅者层的实现就是依赖读者-写者层的。

想要很快的熟悉读者-写者层的使用可以浏览下面三个代码示例:

RTPSParticipant,RTPSWriter和RTPSReader都通过RTPSDomain创建。

相对于发布者-订阅层不一样的是,这一层不支持通过XML的形式配置参数。开发者必须通过代码的形式配置所有的参数,例如:

//CREATE PARTICIPANT
RTPSParticipantAttributes PParam;
PParam.builtin.discovery_config.discoveryProtocol = eprosima::fastrtps::rtps::DiscoveryProtocol::SIMPLE;
PParam.builtin.use_WriterLivelinessProtocol = true;
mp_participant = RTPSDomain::createParticipant(PParam);
if(mp_participant==nullptr)
	return false;

//CREATE WRITERHISTORY
HistoryAttributes hatt;
hatt.payloadMaxSize = 255;
hatt.maximumReservedCaches = 50;
mp_history = new WriterHistory(hatt);

//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
mp_writer = RTPSDomain::createRTPSWriter(mp_participant,watt,mp_history,&m_listener);

这里的逻辑主要就是设置参数和创建RTPSParticipant,RTPSWriter对象。并且,RTPSParticipant将被用来注册RTPSWriter:

TopicAttributes Tatt;
Tatt.topicKind = NO_KEY;
Tatt.topicDataType = "string";
Tatt.topicName = "exampleTopic";
ReaderQos Rqos;
return mp_participant->registerReader(mp_reader, Tatt, Rqos);

在RTPS协议中,Reader和Writer将有关Topic的数据保存在其关联的历史记录中。每个数据段都由一个变更表示,对应的实现是CacheChange_t

更改通过历史记录管理。读者和写者的历史是两种类型:

  • eprosima::fastrtps::rtps::WriterHistory;
  • eprosima::fastrtps::rtps::ReaderHistory;

对于Writer来说,发送消息是往历史中添加变更:

//Request a change from the history
CacheChange_t* change = writer->new_change([]() -> uint32_t { return 255;}, ALIVE);
//Write serialized data into the change
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2)+1;
//Insert change back into the history. The Writer takes care of the rest.
history->add_change(change);

而对于Reader来说,新消息会被放入到历史中,读取完了可以将其删除:

void TestReaderRegistered::MyListener::onNewCacheChangeAdded(
        RTPSReader* reader,
        const CacheChange_t* const change)
{
    printf("Received: %s\n", change->serializedPayload.data);
    reader->getHistory()->remove_change((CacheChange_t*)change);
    n_received++;
}

框架会根据消息触发Reader的回调。

持久化

默认情况下,Writer的历史在其生命周期以内可以被Reader访问。这意味着,一旦Writer退出,则其历史就没有了。但如果需要,你可以配置持久化,这使得即便Writer重启了,仍然可以维护早先的历史。

使用持久化功能可以保护端点的状态免受意外故障的影响,因为端点在重新启动后仍会继续通信,就像它们刚从网络断开连接一样。

你可以通过RTPSTest_persistent这个示例来了解如何使用这个功能。

要使用持久化功能,Writer和Reader需要进行以下设置:

  • durabilityKind设置为TRANSIENT
  • persistence_guid不能是全0
  • 为Writer,Reader或者RTPSParticipant设置持久化插件。目前内置的插件是SQLITE3。

下面是一段代码示例:

PropertyPolicy property_policy;
property_policy.properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename", "test.db");

//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.durabilityKind = TRANSIENT;
watt.endpoint.persistence_guid.guidPrefix.value[11] = 1;
watt.endpoint.persistence_guid.entityId.value[3] = 1;
watt.endpoint.properties = property_policy;

mp_writer = RTPSDomain::createRTPSWriter(mp_participant, watt, mp_history, &m_listener);

durabilityKind参数定义了Writer与新Reader匹配时对于已发送的数据的行为,该参数有三个选项:

  • VOLATILE(默认值):丢掉所有已经发送的数据。
  • TRANSIENT_LOCAL:保存最近发送的k条数据。
  • TRANSIENT:与TRANSIENT_LOCAL类似,但是还将消息保存到持久化存储中。这就使得即便它的进程异常退出了,其数据不会丢失。

对于读者来说,其配置方法是类似的:

PropertyPolicy property_policy;
property_policy.properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename", "test.db");

//CREATE READER
ReaderAttributes ratt;
Locator_t loc(22222);
ratt.endpoint.unicastLocatorList.push_back(loc);
ratt.endpoint.durabilityKind = TRANSIENT;
ratt.endpoint.persistence_guid.guidPrefix.value[11] = 2;
ratt.endpoint.persistence_guid.entityId.value[3] = 1;
ratt.endpoint.properties = property_policy;
mp_reader = RTPSDomain::createRTPSReader(mp_participant, ratt, mp_history, &m_listener);

QoS

使用Fast-RTPS,你有非常多的QoS策略可以配置。

它们主要可以分为下面几类:

  • durability:定义了Writer与新Reader匹配时对于已发送的数据的行为,“持久化”一节已经提到过。
  • liveliness:定义Publisher的活跃程度。例如:多长时间发布一次公告消息。
  • reliability:定义消息的可靠性。它有两个选项:1、BEST_EFFORT,发送消息时,接收者(订阅者)没有到达确认。速度快,但是消息可能会丢失。2、RELIABLE,发送方(发布者)期望接收方(订阅者)进行到达确认。速度较慢,但可以防止数据丢失。
  • partition:可以在domain的物理分区上建立逻辑分区。
  • deadline:指定消息的更新频率,当新消息的频率降至某个阈值以下时,会发出警报。这对于需要定期更新数据的场景很有用。
  • lifespan:指定Publisher发布数据的最大有效期限。当使用寿命到期时,数据将从历史记录中删除。
  • disablePositiveAcks:指定是否需要取消确认消息。在不需要严格可靠的通信且带宽受限时,这么做可以减少网络流量。

在实现中,QoS包含了一系列的类,它们继承自QoSPolicy父类:

之所以提供如此多的选择,是因为不同的系统对于消息的质量有不同的要求。

在实际系统中,并非每个端点都需要本地存储所有数据。DDS在发送信息方面很聪明,如果消息不一定总是到达预期的目的地,则中间件将保证需要的可靠性。当系统发生更改时,Fast-RTPS会动态地找出将哪些数据发送到何处,并智能地将更改通知参与者。如果总数据量巨大,则DDS会智能过滤并仅发送每个端点真正需要的数据。当需要快速更新时,DDS发送多播消息以一次更新许多远程应用程序。当数据格式变更时,DDS会跟踪系统各个部分使用的版本并自动进行转换。对于安全性至关重要的应用程序,DDS控制访问,强制执行数据流路径并实时加密数据。

当系统要满足:以极高的速度,在动态,苛刻且不可预测的环境工作时,DDS的真正威力就会显现。

实操测试

文章的最后,我们通过实际运行程序来进行一些实验。虽然Fast-RTPS支持非常多的操作系统,但在Ubuntu系统上验证可能是最方便的。

Fast-RTPS是面向分布式系统的,这意味着在一个系统上验证它的功能意义不大。但另一方面我们大部分人并没有同时拥有多个设备。

在这种情况下,我们可以借助docker,它可以在同一个系统上运行多个独立的虚拟系统。然后我们就可以在这些独立的系统上进行测试了,这样就模拟了分布式的环境。

Fast-RTPS提供了包含依赖环境的Docker容器,我们只要下载和运行这些容器,就可以拥有多个独立的系统了。

不过,在这运行下面这些示例之前,你需要配置好docker环境。关于docker的基本使用已经超过了本文的范畴,你可以浏览这个链接:Install Docker Engine on Ubuntu

Fast-RTPS需要的文件可以到官网下载:https://eprosima.com/index.php/downloads-all

点击上面这个链接,然后输入个人信息就可以进入下载页面了。你可以选择最新版本的Docker和Fast-RTPS包进行下载:

考虑到国内的网络状况,下载的速度可能非常慢。我下载需要的文件耗费的好几个小时,为了节省你的时间,我已经将下载好的文件放在了这里:

在Ubuntu系统中,先将eProsima_FastRTPS-1.9.3-Linux.tgz解压缩,为了编译它,还需要安装一些依赖,相关命令如下:

sudo apt install cmake g++
sudo apt install libasio-dev libtinyxml2-dev
mkdir fast-rtps
tar -xvf eProsima_FastRTPS-1.9.3-Linux.tgz -C fast-rtps/
cd fast-rtps/
chmod a+x install.sh
sudo ./install.sh

在这之后你就可以转到/fast-rtps/src/fastrtps/examples/目录下编译示例了。不过这个目录下的CMakeList.txt似乎存在问题,我在这个文件的开头增加了下面一行才完成编译:

SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")

编译完成之后,我们并非是在Ubuntu系统上运行程序,而是将这些可执行文件放到docker容器中,以分布式的环境来运行它们。

所以需要启动docker容器:

$ docker load -i ubuntu-fast-rtps.tar
$ docker run -it ubuntu-fast-rtps:v1.9.3

你可以通过docker run -it ...同时启动多个docker容器以进行测试(每个容器对应一个通信的参与者。当然,你需要同时打开多个shell窗口)。

例如我启动了两个docker容器:

CONTAINER ID        IMAGE                     COMMAND             CREATED             STATUS              PORTS               NAMES
2125504ee62f        ubuntu-fast-rtps:v1.9.3   "/bin/bash"         5 minutes ago       Up 5 minutes                            mystifying_jennings
b17517fefecd        ubuntu-fast-rtps:v1.9.3   "/bin/bash"         23 minutes ago      Up 23 minutes                           stoic_leavitt

运行docker run -it ...之后会直接进入docker的shell中,你可以在根目录创建fastrtps目录用来存放测试程序。

然后在Ubuntu系统上将编译出的示例程序拷贝到docker中:

sudo docker cp ./ b17517fefecd:/fastrtps
sudo docker cp ./ 2125504ee62f:/fastrtps

在这之后就可以转到docker容器的shell中运行测试程序了。

例如,在两个docker上运行HelloWorld的示例:

  • 下面是Publisher程序:
root@b17517fefecd:/fastrtps/HelloWorldExample# ./HelloWorldExample publisher
Publisher running 10 samples.
Publisher matched
Message: HelloWorld with index: 1 SENT
Message: HelloWorld with index: 2 SENT
Message: HelloWorld with index: 3 SENT
Message: HelloWorld with index: 4 SENT
Message: HelloWorld with index: 5 SENT
Message: HelloWorld with index: 6 SENT
Message: HelloWorld with index: 7 SENT
Message: HelloWorld with index: 8 SENT
Message: HelloWorld with index: 9 SENT
Message: HelloWorld with index: 10 SENT
  • 下面是Subscriber程序:
root@2125504ee62f:/fastrtps/HelloWorldExample# ./HelloWorldExample subscriber
Starting 
Subscriber running. Please press enter to stop the Subscriber
Subscriber matched
Message HelloWorld 1 RECEIVED
Message HelloWorld 2 RECEIVED
Message HelloWorld 3 RECEIVED
Message HelloWorld 4 RECEIVED
Message HelloWorld 5 RECEIVED
Message HelloWorld 6 RECEIVED
Message HelloWorld 7 RECEIVED
Message HelloWorld 8 RECEIVED
Message HelloWorld 9 RECEIVED
Message HelloWorld 10 RECEIVED
Subscriber unmatched

接下来是Benchmark程序:

  • Benchmark subscriber端:
root@b17517fefecd:/fastrtps/Benchmark# ./Benchmark subscriber
Subscriber running...
Subscriber matched
Publisher matched
Subscriber unmatched
Publisher unmatched
  • Benchmark publisher端:
root@2125504ee62f:/fastrtps/Benchmark# ./Benchmark publisher
Publisher running...
Subscriber matched
Publisher matched. Test starts...
RESULTS after 10000 milliseconds:
COUNT: 53951
SAMPLES: 0,771,668,548,582,716,700,706,408,440,592,636,738,698,648,574,706,776,690,584,638,556,750,740,640,584,572,542,526,560,552,528,608,504,630,478,598,708,620,528,660,718,578,646,702,528,652,528,450,508,566,544,516,616,652,584,532,434,542,678,752,696,412,544,654,766,736,612,496,470,662,580,566,634,674,568,532,546,528,552,552,528,490,508,598,620,672,506,468,654,

在运行的过程中,你可以感受到借助Fast-RTPS,不同系统上的参与者是多么快速的发现了对方并完成了通信的。 当然,你可以运行更多的用例,或者修改代码进行你想要的测试。

参考资料与推荐读物


原文地址:《DDS与FastRTPS》 by 保罗的酒吧
 Contents