DDS是一套通信协议和API标准,它提供了以数据为中心的连接服务。Fast-RTPS是DDS的开源实现,借助它可以方便的开发出高效,可靠的分布式系统。本文是对DDS以及Fast RTPS的介绍文章。
- 从《变形金刚》电影说起
- DDS介绍
- DDS与汽车行业
- Fast-RTPS 介绍
- 源码与编译
- Fast-RTPS概述
- 对象与数据结构
- Domain
- 发现
- 传输控制
- FastRTPS代码示例
- FASTRTPSGEN
- 发布者-订阅者层
- 读者-写者层
- 持久化
- QoS
- 实操测试
- 参考资料与推荐读物
从《变形金刚》电影说起
这里要提到的是2011年的真人版电影,变形金刚第三部《Transformers: Dark of the Moon》。
这是一篇技术文章,为什么要扯到《变形金刚》电影呢?这是因为这部电影的主要内容与本文所提到的技术有一定的相关性。
在这部电影中,御天敌背叛了擎天柱,与霸天虎合作。在地球的各地布置了许多的能量柱,他试图借助这些能量柱将赛博坦星球传送到地球上,以此来重建自己的家园。
这些能量柱必须组合起来才能完成传输工作,并且在这其中有一个红色的能量柱比较特殊,因为它负责控制其他的传送柱。
由此可见,这是一个大型的分布式系统。在这个系统中,这个红色的能量柱被称之为“中心节点”,中心节点正如其名称那样,它是整个系统的中心。对于带有中心节点的分布式系统来说,一旦中心节点被摧毁,整个系统都将无法工作。
因此电影的后来,自然是擎天柱摧毁了这个中心节点,使得御天敌的传送计划彻底失败。
从设计上来说,对于一个如此大型的系统,却存在一个非常薄弱和重要的中心节点,这并不是一个好的方案。
而本文介绍的DDS就是一个去中心化的分布式技术。因此在这类系统中,不存在负责总控制的中心节点,所有节点都完全对等。任何一个节点的异常都不会影响整个系统的运行。
DDS介绍
DDS全称是Data Distribution Service,这是一套通信协议和API标准,它提供了以数据为中心的连接服务,基于发布者-订阅者模型。这是一套中间件,它提供介于操作系统和应用程序之间的功能,使得组件之间可以互相通信。并且提供了低延迟,高可靠的通信以及可扩展的架构。
或许,你已经知道很多种网络通信协议,对于发布-订阅这些概念也很熟悉。那DDS到底有什么特别之处呢?
下图展示了4个时代的数据通信方式:
- (第一代)点对点的CS(Client-Server)结构,这是大家最为熟悉的:一个服务器角色被许多的客户端使用,每次通信时,通信双方必须建立一条连接。当通信节点增多时,通信的连接数也会增多。并且,每个客户端都必须知道服务器的具体地址和所提供的服务。一旦服务器地址发生变化,所有客户端都会受到影响。
- (第二代)Broker模型:存在一个中间人,它负责初步处理大家的请求,并进一步找到真正能响应服务的角色,这就好像存在一个经纪人。这为客户端提供了一层抽象,使得服务器的具体地址变得不重要了。服务端地址如果发生变化,只需要告诉Broker就可以了。但这个模型的问题在于,Broker变成了模型的中心,它的处理速度会影响所有人的效率,这就好像城市中心的路口,当系统规则增长到一定程度,Broker终究会成为瓶颈。更糟糕的是,如果Broker瘫痪了,可能整个系统都将无法运转。
- (第三代)广播模型:所有人都可以在通道上广播消息,并且所有人都可以收到消息。这个模型解决了服务器地址的问题,且通信双方不用单独建立连接,但它存在的问题是:广播通道上的消息太多,太嘈杂,所有人都必须关心每条消息是否与自己有关。这就好像全公司一千号人坐在同一个房间里面办公一样。
- (第四代)DDS模型:这种模型与广播模型有些类似,所有人都可以在DataBus上发布和读取消息。但它更进一步的是,通信中包含了很多并行的通路,每个人可以只关心自己感兴趣的消息,自动忽略自己不需要的消息。
下图展示了DDS在网络栈中的位置,它位于传输层的上面,并且以TCP,UDP为基础。
这个图之所以是沙漏形状是因为:两头的技术变化都发展很快,但是中间的却鲜有变化。
对比大家常见的Socker API,DDS有如下特点:
特性 | Socket API | DDS |
---|---|---|
架构 | TCP:点对点 UDP:点对点,广播,多播 |
Publish-subscribe模型 |
平台独立 | 需要为不同硬件,操作系统和编程语言编写不同的代码 | 所有硬件,操作系统和编程语言使用相同的API |
发现 | 需要硬编码IP地址和端口号 | 动态发现,无需关注端点所在位置 |
类型安全 | 没有类型安全,应用需要将字节流转换成正确类型 | 强类型安全,write() 和read() 针对特定数据类型 |
通信行为定制 | 需要通过自定义的代码来实现 | 通过QoS策略来完成 |
互操作性 | 不支持 | 具有公认的互操作性的开放标准 |
关于DDS的更多特性,可以点击这个链接:《What is DDS?》。
DDS可以降低系统复杂度
对于分布式系统来说,有很多复杂的逻辑需要处理,例如:如何发现其他节点,如何为每个节点分配地址,如何配置消息的可靠性等。这使得应用程序变得臃肿。
而如果说通信的中间件能够完全处理好这些逻辑,则应用程序将可以集中处理自己的业务,变得更加敏捷。
下图是两种情况的对比:
如果考虑系统的演化,问题会更突出。
由于分布式系统中包含了许多的角色需要互相通信,随着角色数量的不断增长,其通信的通道数量会以爆炸式增长。
而如果有统一的DataBus,则即便新增了通信角色其通信模型也不会变得更加复杂。
DDS应用范围
尽管可能你原先没有听过DDS这个术语,但其实它的应用非常广泛,广泛到它涉及到了我们每天都要依赖的许多重要行业,例如:航空,国防,交通,医疗,能源等等。
下图是一些示例:
DDS 提供商
DDS本身是一套标准。由Object Management Group(简称OMG)维护。
OMG是一个开放性的非营利技术标准联盟,由许多大型IT公司组成:包括IBM,Apple Computer,Sun Microsystems等。
但OMG仅仅负责制定标准,而标准的实现则由其他服务提供商完成。
目前DDS的提供商包括下面这些:
- Vortex OpenSplice
- eProsima Fast RTPS
- Hamersham
- Company Summary Kongsberg Gallium
- MilSOFT
- Object Computing OpenDDS
- Remedy IT
- RTI
- Twin Oaks Computing, Inc.
DDS与RTPS
在DDS规范中,有两个描述标准的基本文档:
- DDS Specification:描述了以数据为中心的发布-订阅模型。该规范定义了API和通信语义(行为和服务质量),使消息从消息生产者有效地传递到匹配的消费者。DDS规范的目的可以概括为:“能够在正确的时间将正确的信息高效,可靠地传递到正确的位置”。
- DDSI-RTPS:描述了RTPS(Real Time Publish Subscribe Protocol)协议。该协议通过UDP等不可靠的传输,实现最大努力(Best-Effort)和可靠的发布-订阅通信。RTPS是DDS实现的标准协议,它的目的和范围是确保基于不同DDS供应商的应用程序可以实现互操作。
DDS与汽车行业
对于汽车行业来说,汽车开放系统架构(AUTomotive Open System ARchitecture)已经在AUTOSAR Adaptive Platform 18.03中包含了DDS协议。
另外,DDS的实时特性可能特别适合自动驾驶系统。在这类系统中,通常会存在感知,预测,决策和定位模块,这些需要非常高速和频繁的交换数据。借助DDS,可以很好的满足它们的通信需求。
Fast-RTPS 介绍
Fast-RTPS是eprosima对于RTPS的C++实现,这是一个免费开源软件,遵循Apache License 2.0。
eProsima Fast RTPS在性能,功能和对最新版本RTPS标准(RTPS 2.2)的遵守方面均处于领先地位。关于Fast RTPS的性能可以查看这个链接:eProsima Fast RTPS Performance。
它最为被大家知道的可能是因为被ROS2设定为默认的消息中间件。
Fast-RTPS支持平台包括:Windows, Linux, Mac OS, QNX, VxWorks, iOS, Android, Raspbian。
Fast-RTPS具有以下优点:
- 对于实时应用程序来说,可以在Best-Effort和可靠通信两种策略上进行配置。
- 即插即用的连接性,使得网络的所有成员自动发现其他新的成员。
- 模块化和可扩展性允许网络中设备不断增长。
- 可配置的网络行为和可互换的传输层:为每个部署选择最佳协议和系统输入/输出通道组合。
- 两个API层:一个简单易用的发布者-订阅者层和一个提供对RTPS协议内部更好控制的Writer-Reader层。
源码与编译
Fast-RTPS的源码位于Github上:eProsima/Fast-RTPS。
可以通过下面这条命令获取其源码:
git clone https://github.com/eProsima/Fast-RTPS.git
关于如何编译Fast-RTPS可以参见这个链接:Fast RTPS Installation from Sources。
Fast-RTPS概述
Fast-RTPS提供了两个层次的API:
- Publisher-Subscriber层:RTPS上的简化抽象。
- Writer-Reader层:对于RTPS端点的直接控制。
相较而言,后者更底层。两个层次的核心角色如下图所示:
Publisher-Subscriber层为大多数开发者提供了一个方便的抽象。它允许定义与Topic关联的发布者和订阅者,以及传输Topic数据的简单方法。
Writer-Reader层更接近于RTPS标准中定义的概念,并且可以进行更精细的控制,但是要求开发者直接与每个端点的历史记录缓存进行交互。
Fast RTPS是并发且基于事件的。每个参与者都会生成一组线程来处理后台任务,例如日志记录,消息接收和异步通信。
事件系统使得Fast RTPS能够响应某些条件并安排定期活动。用户中几乎不用感知它们,因为这些事件大多数仅仅与RTPS元数据有关。
对象与数据结构
下面是Fast-RTPS实现中的核心结构。
Publish-Subscriber模块
RTPS标准的高层类型。
- Domain:用来创建,管理和销毁Participants。
- Participant:包括Publisher和Subscriber,并管理它们的配置。
- ParticipantAttributes:创建Participant的配置参数。
- ParticipantListener:可以让开发者实现Participant的回调函数。
- Publisher:在Topic上发布数据的对象。
- PublisherAttributes:创建Publisher的配置参数。
- PublisherListener:可以让开发者实现Publisher的回调函数。
- Subscriber:在Topic上接受数据的对象。
- SubscriberAttributes:创建Subscriber的配置参数。
- SubscriberListener:可以让开发者实现Subscriber的回调函数。
RTPS模块
RTPS的底层模型。包含下面几个子模块:
- RTPS Common
- CacheChange_t:描述Topic上的变更,存储在历史Cache中。
- Data:Cache变化的负载。
- Message:RTPS消息。
- Header:RTPS协议的头信息。
- Sub-Message Header:标识RTPS的订阅消息。
- MessageReceiver:反序列化和处理接受到的RTPS消息。
- RTPSMessageCreator:构建RTPS消息。
- RTPS Domain
- RTPSDomain:用来创建,管理和销毁底层的RTPSParticipants。
- RTPSParticipant:包括Writer和Reader。
- RTPS Reader
- RTPSReader:读者的基类。
- ReaderAttributes:包含RTPS读者的配置参数。
- ReaderHistory:存储Topic变化的历史数据。
- ReaderListener:读者的回调类型。
- RTPS Writer
- RTPSWriter:写者的基类。
- WriterAttributes:包含RTPS写者的配置参数。
- WriterHistory:存储写者的历史数据。
配置Attributes
上面的数据结构中看到了许多Attributes
后缀的类名。这些类包含了对协议或者对象的配置参数,很多特性都需要设置这些属性来完成。
这些类的定义基本都位于下面三个文件夹中:
Fast RTPS支持非常多的配置参数,并且参数的结构常常是嵌套的。
通过代码去配置这些参数会产生很多啰嗦的代码,而且最大的问题在于:每次更改配置参数都需要重新编译。这个问题并非Fast RTPS才有,只要包含大量配置参数的软件都会这样的问题。通常的解决方法就是:提供文本格式的配置文件的方式来配置参数。因此对于Fast-RTPS来说,除了支持通过代码配置参数,它也支持通过XML文件的方式来进行配置。
有了配置文件之后,在代码中直接读取就好了,例如:
Participant *participant = Domain::createParticipant("participant_xml_profile");
在这之后,如果需要调整配置,只需要修改配置文件,不用在改动代码,自然也不用重新编译。这对于项目部署是很重要的。
Fast-RTPS支持的配置项,以及这些配置项说明和默认值都可以到这个链接中查看:XML profiles。
Domain
RTPS中的通信参数者之间,通过Domain进行隔离。
同一时刻可能有多个Domain同时存在,一个Domain中可以包含任意数目的消息发送者和接受者。
其结构如下图所示:
开发者可以通过domainId来指定参与者所属Domain。
如果没有指定,默认的domainId = 80
。
发现
作为DDS的实现,Fast-RTPS提供了Publisher和Subscriber自动发现和匹配的功能。在实现上,这分为两个步骤来完成:
- Participant Discovery Phase (PDP):在这个阶段,参与者互相通知彼此的存在。为了达到这个目的,每个参与者需要定时发送公告消息。公告消息通过周知的多播地址和端口发送(根据domain计算得到)。
- Endpoint Discovery Phase (EDP):在这个阶段,Publisher和Subscriber互相确认。为此,参与者使用在PDP期间建立的通信通道,彼此共享有关其发布者和订阅者的信息。 该信息包含了Topic和数据类型。为了使两个端点匹配,它们的Topic和数据类型必须一致。 一旦发布者和订阅者匹配,他们就发送/接收数据了。
这两个阶段对应了两个独立的协议:
- Simple Participant Discovery Protocol:指定参与者如何在网络中发现彼此。
- Simple Endpoint Discovery Protocol:定义了已经互相发现的参与者交换信息的协议。
Fast-RTPS提供了四种发现机制:
- Simple:这是默认机制。它在PDP和EDP阶段均使用RTPS标准,因此可与任何其他DDS和RTPS实现兼容。
- Static:此机制在PDP阶段使用Simple Participant Discovery Protocol。如果所有发布者和订阅者的地址以及端口和主题信息是事先知道的,则允许跳过EDP阶段。
- Server-Client:这种发现机制使用集中式发现结构,由服务器充当发现机制的Hub。
- Manual:此机制仅与RTPSDomain层兼容。它禁用了PDP阶段,使用户可以使用其选择的任何外部元信息通道手动匹配和取消匹配RTPS参与者,读者和写者。
不同的发现机制具有一些共同的配置:
名称 | 描述 | 默认值 |
---|---|---|
Ignore Participant flags | 在必要的时候,可以选择忽略一些参与者。 例如:另一台主机上的,另一个进程的或者同一个进程的。 |
NO_FILTER |
Lease Duration | 指定远程参与者在多少时间内认为本地参与者还活着。 | 20 s |
Announcement Period | 指定参与者的PDP公告消息的周期。 | 3s |
关于发现机制的更多信息可以浏览这个链接:Discovery。
传输控制
Fast-RTPS实现了可插拔的传输架构,这意味着每一个参与者可以随时加入和退出。
在传输上,Fast-RTPS支持以下五种传输方式:
- UDPv4
- UDPv6
- TCPv4
- TCPv6
- SHM(Shared Memory)
默认的,当Participant
创建时,会自动的配置两个传输通道:
- SHM:用来与同一个机器上的参与者通信。
- UDPv4:同来与跨机器的参与者通信。
当然,开发者可以改变这个默认行为,通过C++接口或者XML配置文件都可以。
SHM要求所有参与者位于同一个系统上,它是借助了操作系统提供的共享内存机制实现。共享内存的好处是:支持大数据传输,减少了数据拷贝,并且也减少系统负载。因此通常情况下,使用SHM会获得更好的性能。使用SHM时,可以配置共享内存的大小。
网络通信包含了非常多的参数需要配置,例如:Buffer大小,端口号,超时时间等等。框架本身为参数设置了默认值,大部分情况下开发者不用调整它们。但是知道这些默认值是什么,在一些情况下可能会对分析问题有所帮助。关于这些配置的默认值,以及如果配置可以查看这个链接:Transport descriptors。
与UDP不同,TCP传输是面向连接的,因此,Fast-RTPS必须在发送RTPS消息之前建立TCP连接。TCP传输可以具有两种行为:充当TCP服务器或充当TCP客户端。服务器打开一个TCP端口以侦听传入的连接,然后客户端尝试连接到服务器。服务器和客户端的概念独立于RTPS概念,例如:Publisher,Subscriber,Reader或Writer。它们中的任何一个都可以用作TCP服务器或TCP客户端,因为这些实体仅用于建立TCP连接,而RTPS协议可以在该TCP连接上工作。
如果要使用TCP传输,开发者需要做更多的配置,关于这部分内容可以继续阅读官方文档,这里不再赘述。
FastRTPS代码示例
FastRTPS不仅有框架文档,API Reference,还有丰富的代码示例。
对于开发者来说,浏览这些代码可能是上手最快捷的方法。
你可以在这里浏览这些示例:Fast-RTPS/examples/C++/。
FASTRTPSGEN
FASTRTPSGEN是一个Java程序。用来为在Topic上传输的数据类型生成源码。
开发者通过接口描述语言(Interface Definition Language)定义需要传输的数据类型。然后通过FASTRTPSGEN生成C++编译需要的源文件。
可以通过下面的方法获取和编译FASTRTPSGEN。
git clone --recursive https://github.com/eProsima/Fast-RTPS-Gen.git
cd Fast-RTPS-Gen
gradle assemble
编译完成之后可执行文件位于./scripts/
目录。如果需要,可以将该路径添加到$PATH
变量中。
关于如何通过IDL定义数据类型请参见这里:Defining a data type via IDL。
以下面这个示例文件为例:
struct TestData
{
char char_type;
octet octet_type;
long long_type;
string string_type;
float float_array[4];
sequence<double> double_list;
};
我们将其保存到文件名为data_type.idl
的文件中。然后通过下面这条命令生成C++文件:
~/Fast-RTPS-Gen/scripts/fastrtpsgen data_type.idl
最后会得到下面四个文件:
data_type.cxx
data_type.h
data_typePubSubTypes.cxx
data_typePubSubTypes.h
前两个文件定义的是实际存储数据的结构,后两个文件定义的类是eprosima::fastrtps::TopicDataType
的子类。用来在参与者上注册类型:
/**
* Register a type in a participant.
* @param part Pointer to the Participant.
* @param type Pointer to the Type.
* @return True if correctly registered.
*/
RTPS_DllAPI static bool registerType(
Participant* part,
fastdds::dds::TopicDataType * type);
每一套通信系统中通常都会包含一个或多个自定义的数据类型。
发布者-订阅者层
可以通过 HelloWorldExample 来熟悉发布者-订阅者层接口。
该目录下文件列表如下:
-rw-r--r-- 1 paul staff 1.8K 3 16 13:36 CMakeLists.txt
-rw-r--r-- 1 paul staff 2.8K 3 16 13:36 HelloWorld.cxx
-rw-r--r-- 1 paul staff 6.1K 3 16 13:36 HelloWorld.h
-rw-r--r-- 1 paul staff 62B 3 16 13:36 HelloWorld.idl
-rw-r--r-- 1 paul staff 4.4K 3 16 13:36 HelloWorldPubSubTypes.cxx
-rw-r--r-- 1 paul staff 1.7K 3 16 13:36 HelloWorldPubSubTypes.h
-rw-r--r-- 1 paul staff 4.6K 3 16 13:36 HelloWorldPublisher.cpp
-rw-r--r-- 1 paul staff 1.7K 3 16 13:36 HelloWorldPublisher.h
-rw-r--r-- 1 paul staff 3.8K 3 16 13:36 HelloWorldSubscriber.cpp
-rw-r--r-- 1 paul staff 1.8K 3 16 13:36 HelloWorldSubscriber.h
-rw-r--r-- 1 paul staff 2.0K 3 16 13:36 HelloWorld_main.cpp
-rw-r--r-- 1 paul staff 3.1K 3 16 13:36 Makefile
-rw-r--r-- 1 paul staff 203B 3 16 13:36 README.txt
这其中:
- README.txt是工程说明
- CMakeLists.txt与Makefile是编译文件
- HelloWorld_main.cpp包含了生成可执行文件的
main
函数 - HelloWorld.idl是待传输的数据结构定义
- HelloWorld.h,HelloWorld.cxx,HelloWorldPubSubTypes.h和HelloWorldPubSubTypes.cxx是由HelloWorld.idl文件生成
- HelloWorldPublisher.h和HelloWorldPublisher.cpp是发布者的实现
- HelloWorldSubscriber.h和HelloWorldSubscriber.cpp是订阅者的实现
熟悉一个C++工程可以先从main
入手,HelloWorld_main.cpp中的主要逻辑就是根据用户输入的参数是"publisher"
还是"subscriber"
来确定启动哪个模块。
switch(type)
{
case 1:
{
HelloWorldPublisher mypub;
if(mypub.init())
{
mypub.run(count, sleep);
}
break;
}
case 2:
{
HelloWorldSubscriber mysub;
if(mysub.init())
{
mysub.run();
}
break;
}
}
接下来我们直接看HelloWorldPublisher和HelloWorldSubscriber就好。
HelloWorldPublisher::init
中主要是为Publisher的对象设置参数:
bool HelloWorldPublisher::init()
{
m_Hello.index(0);
m_Hello.message("HelloWorld");
ParticipantAttributes PParam;
PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
PParam.rtps.builtin.domainId = 0;
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
PParam.rtps.setName("Participant_pub");
mp_participant = Domain::createParticipant(PParam);
if(mp_participant==nullptr)
return false;
//REGISTER THE TYPE
Domain::registerType(mp_participant,&m_type);
//CREATE THE PUBLISHER
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = "HelloWorld";
Wparam.topic.topicName = "HelloWorldTopic";
Wparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
Wparam.topic.historyQos.depth = 30;
Wparam.topic.resourceLimitsQos.max_samples = 50;
Wparam.topic.resourceLimitsQos.allocated_samples = 20;
Wparam.times.heartbeatPeriod.seconds = 2;
Wparam.times.heartbeatPeriod.nanosec = 200*1000*1000;
Wparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
mp_publisher = Domain::createPublisher(mp_participant,Wparam,(PublisherListener*)&m_listener);
if(mp_publisher == nullptr)
return false;
return true;
}
这里的参数配置请参阅API说明:
- RTPSParticipantAttributes
- BuiltinAttributes
- PublisherAttributes
- TopicAttributes
- WriterQos
- WriterTimes
- WriterQos
Publisher发送消息的逻辑很简单:
bool HelloWorldPublisher::publish(bool waitForListener)
{
if(m_listener.firstConnected || !waitForListener || m_listener.n_matched>0)
{
m_Hello.index(m_Hello.index()+1);
mp_publisher->write((void*)&m_Hello);
return true;
}
return false;
}
注意,这里write
的对象是通过idl文件生成的类型。
Subscriber的初始化和Publisher是类似的:
bool HelloWorldSubscriber::init()
{
ParticipantAttributes PParam;
PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
PParam.rtps.builtin.domainId = 0;
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
PParam.rtps.setName("Participant_sub");
mp_participant = Domain::createParticipant(PParam);
if(mp_participant==nullptr)
return false;
//REGISTER THE TYPE
Domain::registerType(mp_participant,&m_type);
//CREATE THE SUBSCRIBER
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = "HelloWorld";
Rparam.topic.topicName = "HelloWorldTopic";
Rparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
Rparam.topic.historyQos.depth = 30;
Rparam.topic.resourceLimitsQos.max_samples = 50;
Rparam.topic.resourceLimitsQos.allocated_samples = 20;
Rparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
Rparam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
mp_subscriber = Domain::createSubscriber(mp_participant,Rparam,(SubscriberListener*)&m_listener);
if(mp_subscriber == nullptr)
return false;
return true;
}
当然,Subscriber有自己的配置参数类型:
需要注意的是,Subscriber与Publisher的通信是建立在Topic上的,因此对于Topic标识的配置要保持一致:
Wparam.topic.topicDataType = "HelloWorld";
Wparam.topic.topicName = "HelloWorldTopic";
有了Topic的这个抽象概念,使得Subscriber与Publisher不用物理地址上有任何关联,也屏蔽了硬件和操作系统的差异:同样的代码,其编译产物可以一个跑在x86的Mac系统上,一个跑在ARM架构的Android设备上。
Subscriber通过void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
方法来处理接受到的数据。在示例的实现中,就是将消息体打印出来:
void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
{
if(sub->takeNextData((void*)&m_Hello, &m_info))
{
if(m_info.sampleKind == ALIVE)
{
this->n_samples++;
// Print your structure data here.
std::cout << "Message "<<m_Hello.message()<< " "<< m_Hello.index()<< " RECEIVED"<<std::endl;
}
}
}
Publisher与Subscriber各自有一些回调,开发者可以利用它们来进行需要的处理:
回调 | Publisher | Subscriber |
---|---|---|
onNewDataMessage | - | √ |
onSubscriptionMatched | - | √ |
on_requested_deadline_missed | - | √ |
on_liveliness_changed | - | √ |
onPublicationMatched | √ | - |
on_offered_deadline_missed | √ | - |
on_liveliness_lost | √ | - |
读者-写者层
读者-写者层是相对于发布者-订阅者层更底层的API。
它提供了更多的控制,但也意味着使用起来会稍微麻烦一些。
两个层次在几个核心概念上存在一一对应的关系,如下表所示:
Publisher-Subscriber Layer | Writer-Reader Layer |
---|---|
Domain | RTPSDomain |
Participant | RTPSParticipant |
Publisher | RTPSWriter |
Subscriber | RTPSReader |
如果你浏览Fast-RTPS的源码,你会发现其实发布者-订阅者层的实现就是依赖读者-写者层的。
想要很快的熟悉读者-写者层的使用可以浏览下面三个代码示例:
RTPSParticipant,RTPSWriter和RTPSReader都通过RTPSDomain创建。
相对于发布者-订阅层不一样的是,这一层不支持通过XML的形式配置参数。开发者必须通过代码的形式配置所有的参数,例如:
//CREATE PARTICIPANT
RTPSParticipantAttributes PParam;
PParam.builtin.discovery_config.discoveryProtocol = eprosima::fastrtps::rtps::DiscoveryProtocol::SIMPLE;
PParam.builtin.use_WriterLivelinessProtocol = true;
mp_participant = RTPSDomain::createParticipant(PParam);
if(mp_participant==nullptr)
return false;
//CREATE WRITERHISTORY
HistoryAttributes hatt;
hatt.payloadMaxSize = 255;
hatt.maximumReservedCaches = 50;
mp_history = new WriterHistory(hatt);
//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
mp_writer = RTPSDomain::createRTPSWriter(mp_participant,watt,mp_history,&m_listener);
这里的逻辑主要就是设置参数和创建RTPSParticipant,RTPSWriter对象。并且,RTPSParticipant将被用来注册RTPSWriter:
TopicAttributes Tatt;
Tatt.topicKind = NO_KEY;
Tatt.topicDataType = "string";
Tatt.topicName = "exampleTopic";
ReaderQos Rqos;
return mp_participant->registerReader(mp_reader, Tatt, Rqos);
在RTPS协议中,Reader和Writer将有关Topic的数据保存在其关联的历史记录中。每个数据段都由一个变更表示,对应的实现是CacheChange_t
。
更改通过历史记录管理。读者和写者的历史是两种类型:
eprosima::fastrtps::rtps::WriterHistory
;eprosima::fastrtps::rtps::ReaderHistory
;
对于Writer来说,发送消息是往历史中添加变更:
//Request a change from the history
CacheChange_t* change = writer->new_change([]() -> uint32_t { return 255;}, ALIVE);
//Write serialized data into the change
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2)+1;
//Insert change back into the history. The Writer takes care of the rest.
history->add_change(change);
而对于Reader来说,新消息会被放入到历史中,读取完了可以将其删除:
void TestReaderRegistered::MyListener::onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const change)
{
printf("Received: %s\n", change->serializedPayload.data);
reader->getHistory()->remove_change((CacheChange_t*)change);
n_received++;
}
框架会根据消息触发Reader的回调。
持久化
默认情况下,Writer的历史在其生命周期以内可以被Reader访问。这意味着,一旦Writer退出,则其历史就没有了。但如果需要,你可以配置持久化,这使得即便Writer重启了,仍然可以维护早先的历史。
使用持久化功能可以保护端点的状态免受意外故障的影响,因为端点在重新启动后仍会继续通信,就像它们刚从网络断开连接一样。
你可以通过RTPSTest_persistent这个示例来了解如何使用这个功能。
要使用持久化功能,Writer和Reader需要进行以下设置:
durabilityKind
设置为TRANSIENT
persistence_guid
不能是全0- 为Writer,Reader或者RTPSParticipant设置持久化插件。目前内置的插件是SQLITE3。
下面是一段代码示例:
PropertyPolicy property_policy;
property_policy.properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename", "test.db");
//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.durabilityKind = TRANSIENT;
watt.endpoint.persistence_guid.guidPrefix.value[11] = 1;
watt.endpoint.persistence_guid.entityId.value[3] = 1;
watt.endpoint.properties = property_policy;
mp_writer = RTPSDomain::createRTPSWriter(mp_participant, watt, mp_history, &m_listener);
durabilityKind
参数定义了Writer与新Reader匹配时对于已发送的数据的行为,该参数有三个选项:
- VOLATILE(默认值):丢掉所有已经发送的数据。
- TRANSIENT_LOCAL:保存最近发送的k条数据。
- TRANSIENT:与TRANSIENT_LOCAL类似,但是还将消息保存到持久化存储中。这就使得即便它的进程异常退出了,其数据不会丢失。
对于读者来说,其配置方法是类似的:
PropertyPolicy property_policy;
property_policy.properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename", "test.db");
//CREATE READER
ReaderAttributes ratt;
Locator_t loc(22222);
ratt.endpoint.unicastLocatorList.push_back(loc);
ratt.endpoint.durabilityKind = TRANSIENT;
ratt.endpoint.persistence_guid.guidPrefix.value[11] = 2;
ratt.endpoint.persistence_guid.entityId.value[3] = 1;
ratt.endpoint.properties = property_policy;
mp_reader = RTPSDomain::createRTPSReader(mp_participant, ratt, mp_history, &m_listener);
QoS
使用Fast-RTPS,你有非常多的QoS策略可以配置。
它们主要可以分为下面几类:
durability
:定义了Writer与新Reader匹配时对于已发送的数据的行为,“持久化”一节已经提到过。liveliness
:定义Publisher的活跃程度。例如:多长时间发布一次公告消息。reliability
:定义消息的可靠性。它有两个选项:1、BEST_EFFORT
,发送消息时,接收者(订阅者)没有到达确认。速度快,但是消息可能会丢失。2、RELIABLE
,发送方(发布者)期望接收方(订阅者)进行到达确认。速度较慢,但可以防止数据丢失。partition
:可以在domain的物理分区上建立逻辑分区。deadline
:指定消息的更新频率,当新消息的频率降至某个阈值以下时,会发出警报。这对于需要定期更新数据的场景很有用。lifespan
:指定Publisher发布数据的最大有效期限。当使用寿命到期时,数据将从历史记录中删除。disablePositiveAcks
:指定是否需要取消确认消息。在不需要严格可靠的通信且带宽受限时,这么做可以减少网络流量。
在实现中,QoS包含了一系列的类,它们继承自QoSPolicy父类:
之所以提供如此多的选择,是因为不同的系统对于消息的质量有不同的要求。
在实际系统中,并非每个端点都需要本地存储所有数据。DDS在发送信息方面很聪明,如果消息不一定总是到达预期的目的地,则中间件将保证需要的可靠性。当系统发生更改时,Fast-RTPS会动态地找出将哪些数据发送到何处,并智能地将更改通知参与者。如果总数据量巨大,则DDS会智能过滤并仅发送每个端点真正需要的数据。当需要快速更新时,DDS发送多播消息以一次更新许多远程应用程序。当数据格式变更时,DDS会跟踪系统各个部分使用的版本并自动进行转换。对于安全性至关重要的应用程序,DDS控制访问,强制执行数据流路径并实时加密数据。
当系统要满足:以极高的速度,在动态,苛刻且不可预测的环境工作时,DDS的真正威力就会显现。
实操测试
文章的最后,我们通过实际运行程序来进行一些实验。虽然Fast-RTPS支持非常多的操作系统,但在Ubuntu系统上验证可能是最方便的。
Fast-RTPS是面向分布式系统的,这意味着在一个系统上验证它的功能意义不大。但另一方面我们大部分人并没有同时拥有多个设备。
在这种情况下,我们可以借助docker,它可以在同一个系统上运行多个独立的虚拟系统。然后我们就可以在这些独立的系统上进行测试了,这样就模拟了分布式的环境。
Fast-RTPS提供了包含依赖环境的Docker容器,我们只要下载和运行这些容器,就可以拥有多个独立的系统了。
不过,在这运行下面这些示例之前,你需要配置好docker环境。关于docker的基本使用已经超过了本文的范畴,你可以浏览这个链接:Install Docker Engine on Ubuntu。
Fast-RTPS需要的文件可以到官网下载:https://eprosima.com/index.php/downloads-all。
点击上面这个链接,然后输入个人信息就可以进入下载页面了。你可以选择最新版本的Docker和Fast-RTPS包进行下载:
考虑到国内的网络状况,下载的速度可能非常慢。我下载需要的文件耗费的好几个小时,为了节省你的时间,我已经将下载好的文件放在了这里:
- eProsima_FastRTPS-1.9.3-Linux.tgz:包含了FastRTPS的源码和编译命令,用来在Ubuntu系统上安装环境。
- ubuntu-fast-rtps v1.9.3.tar:已经预装了Fast-RTPS环境,可以在上面运行Fast-RTPS的程序,用来进行测试。
在Ubuntu系统中,先将eProsima_FastRTPS-1.9.3-Linux.tgz解压缩,为了编译它,还需要安装一些依赖,相关命令如下:
sudo apt install cmake g++
sudo apt install libasio-dev libtinyxml2-dev
mkdir fast-rtps
tar -xvf eProsima_FastRTPS-1.9.3-Linux.tgz -C fast-rtps/
cd fast-rtps/
chmod a+x install.sh
sudo ./install.sh
在这之后你就可以转到/fast-rtps/src/fastrtps/examples/目录下编译示例了。不过这个目录下的CMakeList.txt似乎存在问题,我在这个文件的开头增加了下面一行才完成编译:
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
编译完成之后,我们并非是在Ubuntu系统上运行程序,而是将这些可执行文件放到docker容器中,以分布式的环境来运行它们。
所以需要启动docker容器:
$ docker load -i ubuntu-fast-rtps.tar
$ docker run -it ubuntu-fast-rtps:v1.9.3
你可以通过docker run -it ...
同时启动多个docker容器以进行测试(每个容器对应一个通信的参与者。当然,你需要同时打开多个shell窗口)。
例如我启动了两个docker容器:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2125504ee62f ubuntu-fast-rtps:v1.9.3 "/bin/bash" 5 minutes ago Up 5 minutes mystifying_jennings
b17517fefecd ubuntu-fast-rtps:v1.9.3 "/bin/bash" 23 minutes ago Up 23 minutes stoic_leavitt
运行docker run -it ...
之后会直接进入docker的shell中,你可以在根目录创建fastrtps
目录用来存放测试程序。
然后在Ubuntu系统上将编译出的示例程序拷贝到docker中:
sudo docker cp ./ b17517fefecd:/fastrtps
sudo docker cp ./ 2125504ee62f:/fastrtps
在这之后就可以转到docker容器的shell中运行测试程序了。
例如,在两个docker上运行HelloWorld的示例:
- 下面是Publisher程序:
root@b17517fefecd:/fastrtps/HelloWorldExample# ./HelloWorldExample publisher
Publisher running 10 samples.
Publisher matched
Message: HelloWorld with index: 1 SENT
Message: HelloWorld with index: 2 SENT
Message: HelloWorld with index: 3 SENT
Message: HelloWorld with index: 4 SENT
Message: HelloWorld with index: 5 SENT
Message: HelloWorld with index: 6 SENT
Message: HelloWorld with index: 7 SENT
Message: HelloWorld with index: 8 SENT
Message: HelloWorld with index: 9 SENT
Message: HelloWorld with index: 10 SENT
- 下面是Subscriber程序:
root@2125504ee62f:/fastrtps/HelloWorldExample# ./HelloWorldExample subscriber
Starting
Subscriber running. Please press enter to stop the Subscriber
Subscriber matched
Message HelloWorld 1 RECEIVED
Message HelloWorld 2 RECEIVED
Message HelloWorld 3 RECEIVED
Message HelloWorld 4 RECEIVED
Message HelloWorld 5 RECEIVED
Message HelloWorld 6 RECEIVED
Message HelloWorld 7 RECEIVED
Message HelloWorld 8 RECEIVED
Message HelloWorld 9 RECEIVED
Message HelloWorld 10 RECEIVED
Subscriber unmatched
接下来是Benchmark程序:
- Benchmark subscriber端:
root@b17517fefecd:/fastrtps/Benchmark# ./Benchmark subscriber
Subscriber running...
Subscriber matched
Publisher matched
Subscriber unmatched
Publisher unmatched
- Benchmark publisher端:
root@2125504ee62f:/fastrtps/Benchmark# ./Benchmark publisher
Publisher running...
Subscriber matched
Publisher matched. Test starts...
RESULTS after 10000 milliseconds:
COUNT: 53951
SAMPLES: 0,771,668,548,582,716,700,706,408,440,592,636,738,698,648,574,706,776,690,584,638,556,750,740,640,584,572,542,526,560,552,528,608,504,630,478,598,708,620,528,660,718,578,646,702,528,652,528,450,508,566,544,516,616,652,584,532,434,542,678,752,696,412,544,654,766,736,612,496,470,662,580,566,634,674,568,532,546,528,552,552,528,490,508,598,620,672,506,468,654,
在运行的过程中,你可以感受到借助Fast-RTPS,不同系统上的参与者是多么快速的发现了对方并完成了通信的。 当然,你可以运行更多的用例,或者修改代码进行你想要的测试。
参考资料与推荐读物
- What is DDS?
- Where Can I Get DDS?
- PDF: What can DDS do for You?
- Data Distribution Services Performance Evaluation Framework
- Using DDS with TSN and Adaptive AUTOSAR
- Object Management Group: Data Distribution Service™
- DDS Interoperability Wire Protocol
- eProsima Fast RTPS Documentation
- RTPS Introduction
- eProsima Fast RTPS: PubSub Hello World
- Github: Fast-RTPS
- DDS in a Nutshell
- Data Distribution Service
- What’s the difference between DDS and SOME/IP?