FastDDS Transport功能模块初步整理

一. 总体结构

在这里插入图片描述

二. 主要类的功能

2.1 TransportDescriptor和TransportInterface

​ FastDDS中整个Transport类的设计遵循的是设计模式中的建造者模式,其中,TransportDescriptor就是建造者,而TransportInterface则是建造出来的产品。

​ TransportDescriptor是抽象类,申明了后续实现类必须实现的最终要的一个函数,也是建造者需要完成的主要工作,就是建造出实际的Transport对象:

struct RTPS_DllAPI TransportDescriptorInterface
{  .../*** Factory method pattern. It will create and return a TransportInterface* corresponding to this descriptor. This provides an interface to the NetworkFactory* to create the transports without the need to know about their type*/virtual TransportInterface* create_transport() const = 0;...
};

​ TransportInterface定义了Transort的接口,所有的Transport都必须实现该接口,根据传输方式的不同有TCPTransportInterface, UDPTransportInterface和SharedMemTransport,而根据TCP/UDP版本的不同,前两者分别有V4和V6两个派生类。
在这里插入图片描述
TransportInterface接口类的初衷是为了在FastRTPS中隔离传输层的实现,用户需要实现基于特定传输层协议(TCP, UDP, SharedMem)的通道和对应地址之间的代码逻辑。TransportInterface接口类中申明了实现类必须实现的一些接口(init,IsInputChannelOpen,IsLocatorSupported, is_locator_allowed, RemoteToMainLocal,transform_remote_locator等),其中最重要的两个接口是OpenOutputChannelOpenInputChannel这两个接口函数,前者会创建SenderResource而后者创建ReceiverResource。

2.2 TCPTransportInterface和UDPTransportInterface

2.2.1 TCPTransportInterface:

​ TCPTransportInterface定义并且实现了基于TCP传输层的通信操作,主要接口都在TCPTransportInterface中,另外还有一个辅助类TCPAcceptor用于实现TCP独有的连接操作,接收客户端的连接。

​ TCPTransoprtInterface中的perform_listen_operation函数实现基于TCP传输层的数据接收操作,send函数实现基于TCP传输层的数据发送操作,在初始化TCPTransportInterface的时候,如果发现TCPTransportDescriptor中定义了listen_port,那么会额外创建TCPAcceptBasic对象:

TCPv4Transport::TCPv4Transport(const TCPv4TransportDescriptor& descriptor): TCPTransportInterface(LOCATOR_KIND_TCPv4), configuration_(descriptor)
{...// 如果TCPv4TransportDescriptor配置了listening_ports监听端口,则创建TCPAcceptorBasicfor (uint16_t port : configuration_.listening_ports){Locator locator(LOCATOR_KIND_TCPv4, port);create_acceptor_socket(locator);}...
}

​ TCPAcceptBasic可以理解为对于listen socket的封装,主要执行listen操作,返回建立连接了的client socket给到TCPTransoprtInterface:

void TCPAcceptorBasic::accept(TCPTransportInterface* parent)
{...acceptor_.async_accept(socket_,[parent, locator, this](const std::error_code& error){if (!error){auto socket = std::make_shared<tcp::socket>(std::move(socket_));  // 返回建立连接的客户端socketparent->SocketAccepted(socket, locator, error); //TCPTransoprtInterface根据该socket创建ChannelResource}});
}
2.2.2 UDPTransportInterface:

​ UDPTRansportInterface的接口结构比TCPTransportInterface要简单很多,并且接口函数也会少许多,因为没有TCP的连接操作,此外,UDPTransportInterface中没有实现perform_listen_operation接口(UDP的perform_listen_operation接口在UDPChannelResource中实现),send函数实现基于UDP传输层的数据发送操作。

2.2.3 OpenInputChannel和OpenOutputChannel

​ 在2.1 TransportInterface中说过,其最重要的接口就是OpenInputChannel和OpenOutputChannel,下面以UDPTransportInterface来看下这两个接口实现了什么功能。

​ 首先,这两个接口的名字中都带有Channel,但是不要被名字误导,认为这两个接口都会和下面的ChannelResource打交道,从代码看,只有InputChannel才会创建ChannelResource,而OutputChannel创建的是SenderResource。

OpenInputChannel:

bool UDPv4Transport::OpenInputChannel(const Locator& locator,TransportReceiverInterface* receiver,uint32_t maxMsgSize) {...if (!IsInputChannelOpen(locator)){success = OpenAndBindInputSockets(locator, receiver, IPLocator::isMulticast(locator), maxMsgSize);}... // 如果是组播地址,下面还会创建额外的ChannelResource,同时将网卡加入组播地址
}bool UDPTransportInterface::OpenAndBindInputSockets(...) {...std::vector<std::string> vInterfaces = get_binding_interfaces_list(); // 获取白名单网卡列表for (std::string sInterface : vInterfaces){   // 在每张网卡上创建InputChannelResource(一般来说用户会通过白名单限制用于DDS通信的网卡,不太会出现在多个网卡上创建的情况)UDPChannelResource* p_channel_resource;p_channel_resource = CreateInputChannelResource(sInterface, locator, is_multicast, maxMsgSize, receiver);mInputSockets[IPLocator::getPhysicalPort(locator)].push_back(p_channel_resource);}...
}UDPChannelResource* UDPTransportInterface::CreateInputChannelResource(...) {// 创建接收数据用的socketeProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface,IPLocator::getPhysicalPort(locator), is_multicast);// 创建ChannelResource,ChannelResource包装传入的socket,并且对外提供接收数据的统一接口UDPChannelResource* p_channel_resource = new UDPChannelResource(this, unicastSocket, maxMsgSize, locator,sInterface, receiver);return p_channel_resource;
}

OpenOutputChannel:

bool UDPTransportInterface::OpenOutputChannel(SendResourceList& sender_resource_list,   // 函数中创建的SenderResource放在这个list中返回给上层const Locator& locator)
{...if (is_interface_whitelist_empty()) { ... } // 没有配置网卡白名单,会在0.0.0.0地址上创建socket和SenderResourceelse {// 获取网卡列表(需要剔除已经在locator参数上创建SenderResource的网卡)get_unknown_network_interfaces(sender_resource_list, locNames, true);for (const auto& infoIP : locNames){   // 创建用于发送数据的socketeProsimaUDPSocket unicastSocket = OpenAndBindUnicastOutputSocket(generate_endpoint(infoIP.name, port), port);SetSocketOutboundInterface(unicastSocket, infoIP.name);  // 设置多播数据的发送接口...sender_resource_list.emplace_back(  // 创建SenderResource,用于包装上面创建的发送socket的操作接口static_cast<SenderResource*>(new UDPSenderResource(*this, unicastSocket, false, true)));}}...
}

2.3 ChannelResource, TCPChannelResource, UDPChannelResource

​ ChannelResource包装了用于接口数据的socket,并且对外提供相对统一的操作接口(针对不同的传输类型还是有所区别的)
在这里插入图片描述
​ ChannelResource是接口类,主要提供了用于接收数据使用的线程成员thread_以及保存接收到的消息的message_buffer_成员,后续可以看到transportinterface的perform_listen_operation函数就是在这个thread_成员上运行的。

class ChannelResource {...inline void thread(std::thread&& pThread){// 初始化接收数据的线程thread_...}...
};UDPChannelResource::UDPChannelResource(...): ChannelResource(maxMsgSize)...
{	//指定thread_线程运行perform_listen_operation函数thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator));
}

​ TCPChannelResource和UDPChannelResource两者都包含了用于接收数据使用的socket对象,除此以外,两者的接口差距还是挺大的(TCP和UDP的通信流程本来就有区别),TCPChannelResource的接口比较多,实现也很复杂,并且不属于DDS协议中约定的默认通信方式,因此不需要太关注TCP的视线。

​ 这里主要来看UDPChannelResource的接口,UDPChannelResource的接口很简单,就是perform_listen_operation:

void UDPChannelResource::perform_listen_operation(Locator input_locator) {// set thread name for debugpthread_setname_np(pthread_self(), "UDPChannel");   // thread_的线程名称为UDPChannelLocator remote_locator;while (alive()){// Blocking receive.auto& msg = message_buffer();if (!Receive(msg.buffer, msg.max_size, msg.length, remote_locator)) // Receive函数调用socket的recv_from接收数据(阻塞式){continue;}// 将收到的消息交个MessageReceiver去处理(通过message_receiver接口主导到UDPChannelResource中)...}
}

2.4 SenderResource, UDPSenderResource

​ SenderResource从名字看就是用于发送数据的,SenderResource是接口类,其中最重要的一个接口就是send,send内部调用了send_lambda_这个std::function类型的成员,send_lambda_成员有SenderResource的派生类来实现:
在这里插入图片描述
​ 对于UDPSenderResource来说,send_lambda_函数对象依赖UDPTransportInterface的接口来完成最终的数据发送:

send_lambda_ = [this, &transport](...) -> bool{	// 调用UDPTransportInterface::send接口, 并且将socket对象传入return transport.send(data, dataSize, socket_, destination_locators_begin,destination_locators_end, only_multicast_purpose_, whitelisted_,max_blocking_time_point);};

[!TIP]

读到这段代码的时候,不太理解为什么不在UDPSenderResource的send_lambda_中直接实现send的逻辑代码,反而要依赖UDPTransportInterface来实现。个人理解是不是因为SenderResource这个类主要功能还是体现在Resource上,说明其只是一个用来保管发送socket的资源类而已,个人猜想。

​ RTPSParticipantImpl是SenderResource的容器,保存了其创建的所有SenderResource,在发送数据的时候会调用这些SenderResource的接口:

bool sendSync(...) {...for (auto& send_resource : send_resource_list_)  // send_resource_list_中保存了该Participant创建的所有SenderResource{...send_resource->send(msg->buffer, msg->length, &locators_begin, &locators_end, max_blocking_time_point);}...
}

​ 第三章节中的3.2发送数据中会说明SenderResource的send接口是怎么被调用到的,调用栈是如何的。

2.5 ReceiverResource, MessageReciever

​ ReceiverResource实现TransportReceiverInterface接口,TransportReceiverInterface这个接口的用途从名字看就是用于接收Transport传输层收到数据的接口,其实现了TransoprtReceiverInterface接口中的OnDataReceived函数接口,OnDataReceived函数接口在UDPChannelResource的数据接收接口perform_listen_operation函数中被调用:

void UDPChannelResource::perform_listen_operation(Locator input_locator) {...// Processes the data through the CDR Message interface.if (message_receiver() != nullptr){message_receiver()->OnDataReceived(msg.buffer, msg.length, input_locator, remote_locator);}...
}

​ ReceiverResource的OnDataReceived接口实现非常简单,就是将Transoprt收到的CDRMessage交给MessageReceiver,MessageReceiver会对该CDRMessage进行进一步细化的处理,看过DDS协议文档的应该知道一个完整的DDS Message包括Header和Submessages两部分,其中Submessages中包含不止一种子消息(HeartBeat,GAP,Timestamp,Data, DataFrag等等)。
在这里插入图片描述
而MessageReceiver的作用就是解析ReceiverResource传递过来的CDRMessage,解析其中的Header以及各个子消息,可以看到MessageReceiver中定义了各种proess函数用来处理不同的子消息:
在这里插入图片描述
​ 这些proc函数中最重要的一个函数是process_data_message_without_security(假设没有开启TLS功能,开启的话就是with_security),该函数将Data子消息中的payload交给RTPSReader进行处理:

void MessageReceiver::process_data_message_without_security(const EntityId_t& reader_id,CacheChange_t& change)
{auto process_message = [&change](RTPSReader* reader){reader->processDataMsg(&change);};findAllReaders(reader_id, process_message); // 找到对应EntityID的RTPSReader,调用其processDataMsg处理Data子消息
}

2.6 LocatorSelector, LocatorSelectorSender和LocatorSelectorEntry

LocatorSelectorSender: Class used by writers to inform a RTPSMessageGroup object which remote participants will be addressees of next RTPS submessages. (Writer使用LocatorSelectSender告知RTPSMessageGroup下一包要发送的RTPS消息要发给哪些远端Particiant)

LocatorSelector: A class used for the efficient selection of locators when sending data to multiple entities. (当发送数据给不同的远端Reader时,该类可以协助选择合适的Locator地址进行发送)

LocatorSelectorEntry: This class holds the locators of a remote endpoint along with data required for the locator selection algorithm. (该类报错了某个远端Endpiont的地址信息,地址信息回被用在locator选择上)

​ LocatorSelector内部对于每一个匹配的远端RTPSReader,都维护了其Locator信息在LocatorSelectorEntry中。然后LocatorSelector是LocatorSelectorSender的内部成员,而LocatorSelectorSender是每个StatelessWriter/StatefulWriter的内部成员,当Writer匹配到远端Reader的时候,会将远端Reader的Locator信息(以LocatorSelectorEntry的方式)更新到LocatorSelectorSender的LocatorSelector成员中:

bool StatelessWriter::matched_reader_add(...) {...filter_remote_locators(*new_reader->general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);locator_selector_.locator_selector.add_entry(new_reader->general_locator_selector_entry());...
}

​ 而在RTPSWriter发送RTPSMessage的时候,则可以根据locator_selector_.locator_selector找到目前还存活的匹配的远端RTPSRerader的LocatorSelectorEntry,取出其中的Locator作为消息的发送目的地:

DeliveryRetCode StatelessWriter::deliver_sample_nts(...) {...locator_selector.locator_selector.reset(true);  //重新选择目前还存活的RTPSReader的LocatorSelectorEntry作为目标地址集合size_t num_locators = locator_selector.locator_selector.selected_size() + fixed_locators_.size();if (0 < num_locators) {...if (group.add_data(*cache_change, is_inline_qos_expected_)) {  // 向RTPSMessageGroup添加Data_Submessage...}}
}bool LocatorSelectorSender::send(CDRMessage_t* message,std::chrono::steady_clock::time_point max_blocking_time_point) const
{return writer_.send_nts(message, *this, max_blocking_time_point);
}bool RTPSWriter::send_nts(...) {RTPSParticipantImpl* participant = getRTPSParticipant();// 向LocatorSelectorSender.LocatorSelector中每一个有效的RTPSReader的Locator发送消息return locator_selector.locator_selector.selected_size() == 0 ||participant->sendSync(message, m_guid, locator_selector.locator_selector.begin(),locator_selector.locator_selector.end(), max_blocking_time_point);
}

三. 主要流程

3.1 初始化

​ 初始化流程中,RTPSParticipantImpl和各种Endpoint会创建各个SenderResource/ReceiverResource,这里主要梳理一下会创建哪些Resource,在什么时候创建的:
在这里插入图片描述
在这里插入图片描述
​ 对于RecevierResource创建的socket,其端口一般是有固定的计算规则的,根据domainid,participantid以及是地址是否为组播地址可以算出固定的端口。

​ 对于SenderResource创建的socket,则是随机绑定未使用的端口,而且因为每个RTPSWriter知道匹配的Reader信息(保存在ReaderProxy/ReaderLocator中),因此,socket绑定的IP都是本地网卡的IP,然后发送的时候使用sendto发送到Reader的地址和端口上去就行了。

3.2 发送数据

从RTPS层看,发送数据时,我们一般向RTPSWriter索取一个CacheChange,然后将要发送的数据填充到CacheChange的payload中,最后将这个CacheChange加入到WriterHistory中。

​ 从我们将数据填充到CacheChange到通过SenderResource关联的socket发送到对端的大致流程如下:
在这里插入图片描述

3.3 接收数据

​ 接收数据的工作从UDPChannelResource的thread_线程开始的,前面说到过,UDPChannelResource的thread_线程被用来运行perform_listen_operation函数,该函数中调用关联的socket执行receive_from操作,从绑定的Locator上读取数据,读取到的数据通过MessageReceiver,RTPSParticipantImpl最终到达RTPSReader手上:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/905339.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zabbix最新版本7.2超级详细安装部署(一)

如果文章对你有用&#xff0c;请留下痕迹在配置过程中有问题请及时留言&#xff0c;本作者可以及时更新文章 目录 1、提前准备环境 2、zabbix7.2安装部署 3、安装并配置数据库 4、为Zabbix server配置数据库 5、为Zabbix前端配置PHP 6、启动Zabbix server和agent进程 7、关闭防…

CodeBlocks调试报错

尝试打断点&#xff0c;并且点击红色箭头启动debugger时&#xff0c;控制台报错 Active debugger config: GDB/CDB debugger:Default Building to ensure sources are up-to-date Selecting target: Debug Adding source dir: C:\Users\Lenovo\Desktop\exercise\ Adding source…

Manus 开放注册:AI 智能体领域的新起点

2025 年 5 月 13 日成为了一个具有特殊意义的日子 —— 备受瞩目的 AI 智能体平台 Manus&#xff08;Manus&#xff09;正式宣布开放注册。这一消息犹如一颗重磅炸弹&#xff0c;瞬间在全球科技圈引起了广泛关注和热烈讨论。在此之前&#xff0c;Manus 一直以其独特的魅力和极高…

车载网关作为车辆网络系统的核心枢纽

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 钝感力的“钝”&#xff0c;不是木讷、迟钝&#xff0c;而是直面困境的韧劲和耐力&#xff0c;是面对外界…

俄罗斯方块算法2025.5.10

问题描述 俄罗斯方块&#xff08;Tetris&#xff09;作为风靡全球38年的现象级益智游戏&#xff0c;其简单易学但难于精通的特性使其成为游戏史上的不朽经典。以下是其核心游戏规则解析及我们的要求&#xff1a; 游戏界面由20行10列的可视区域组成&#xff0c;7种不同形状的四…

Femap许可网络配置

电磁仿真领域&#xff0c;Femap以其卓越的性能和广泛的应用场景&#xff0c;成为众多工程师和科研人员的首选工具。为了满足多用户协作的需求&#xff0c;Femap提供了灵活的网络配置方案。本文将详细介绍Femap许可网络配置的方法和优势&#xff0c;帮助您轻松实现多用户高效协作…

计算机视觉----时域频域在图像中的意义、傅里叶变换在图像中的应用、卷积核的频域解释

1、时域&#xff08;时间域&#xff09;——自变量是时间,即横轴是时间,纵轴是信号的变化。其动态信号x&#xff08;t&#xff09;是描述信号在不同时刻取值的函数。 2、频域&#xff08;频率域&#xff09;——自变量是频率,即横轴是频率,纵轴是该频率信号的幅度,也就是通常说…

主流高防服务器技术对比与AI防御方案实战

1. 高防服务器核心能力对比 当前市场主流高防服务商&#xff08;如阿里云、腾讯云、华为云&#xff09;的核心防御能力集中在流量清洗与静态规则防护&#xff0c;但面临以下挑战&#xff1a; 静态防御瓶颈&#xff1a;传统方案依赖预定义规则&#xff0c;对新型攻击&#xff…

常时间运行的程序 导致系统卡顿 自动监控系统CPU和内存利用率 自动选择 内存回收 软件重启 电脑重启

长时间运行安防系统&#xff0c;导致CPU或内存利用率超80%&#xff0c;使得电脑变的缓慢、卡顿的问题。定时获取CPU和内存利用率的数据&#xff0c;在不同时间段&#xff08;如凌晨与平时&#xff09;&#xff0c;根据利用率的不同的阈值&#xff0c;进行&#xff1a;内存回收(…

OpenCV播放摄像头视频

OpenCV计算机视觉开发实践&#xff1a;基于Qt C - 商品搜索 - 京东 播放摄像头视频和播放视频文件类似&#xff0c;也是通过类VideoCapture来实现&#xff0c;只不过调用open的时候传入的是摄像头的索引号。如果计算机安装了一个摄像头&#xff0c;则open的第一个参数通常是0&…

操作系统:内存管理

目录 1、主要目标 2、核心概念和技术 2.1 物理内存与虚拟内存 2.2 内存分页机制 2.3 页面置换算法 3、监控与性能优化 3.1 查看物理内存 3.2 查看虚拟内存 3.3 性能问题 1> 内存不足&#xff08;OOM&#xff09; 2> 内存泄漏 3> 内存碎片 3.4 性能优化策…

专题四:综合练习( 找出所有子集的异或总和再求和)

以leetcode1863题为例 题目分析&#xff1a; 找到每个子集&#xff0c;然后子集中的元素异或之后全部相加 算法原理分析&#xff1a; 画决策树&#xff1a;第一层为这个子集有一个元素 第二层这个子集有两个元素 从上往下罗列&#xff0c;把所有子集都罗列出来&#xf…

【python】—conda新建python3.11的环境报错

1.报错 conda create -n py3.11 python3.11 --channel https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ Collecting package metadata: done Solving environment: failed PackagesNotFoundError: The following packages are not available from current channel…

RabbitMQ事务机制

在RabbitMQ中&#xff0c;生产者为了确保消息发送成功&#xff0c;一种是使用 confirm 确认机制&#xff0c;另一种就是使用事务机制&#xff0c;事务机制就是允许生产者在发送消息时&#xff0c;将多个消息操作作为一个原子单元进行处理&#xff0c;要么所有操作都成功执行&am…

两台笔记本电脑直接通过HDMI线连接?

两台笔记本电脑直接通过HDMI线连接通常无法实现屏幕共享或数据传输&#xff0c;因为HDMI接口设计主要用于单向音视频输出(如连接显示器或电视)。以下是详细分析和替代方案&#xff1a; 为什么HDMI直连两台电脑不适用&#xff1f; 接口功能限制&#xff1a;• 大多数笔记本电脑的…

CentOS 和 RHEL

CentOS 和 RHEL&#xff08;Red Hat Enterprise Linux&#xff09;关系非常紧密&#xff0c;简而言之&#xff1a; CentOS 最初是 RHEL 的免费、开源克隆版&#xff0c;几乎与 RHEL 二进制兼容。 CentOS 原是 RHEL 的“免费双胞胎”&#xff0c;但已被放弃&#xff0c;现在推荐…

使用光标测量,使用 TDR 测量 pH 和 fF

时域反射计 &#xff08;TDR&#xff09; 是一种通常用于测量印刷电路板 &#xff08;PCB&#xff09; 测试试样和电缆阻抗的仪器。TDR 对于测量过孔和元件焊盘的电感和电容、探针尖端电容和电感&#xff0c;甚至寄生电感收发器耦合电容器也非常有用。这也是验证仿真或提取您自…

9.9 Ollama私有化部署Mistral 7B全指南:命令行交互到API集成全流程解析

Ollama私有化部署Mistral 7B全指南:命令行交互到API集成全流程解析 关键词:Ollama 私有化部署, Mistral 7B 运行, 本地大模型管理, 命令行交互, REST API 集成 一、Mistral 7B 模型特性解析 Mistral 7B 是由 Mistral AI 团队开发的高性能开源大语言模型,在同等参数量级模型…

vscode与keil的乱码不兼容问题

都用英文注释 中文注释的话&#xff0c;打开vscode的自动识别格式,如下 解决VSCode中文乱码 自动识别也可以设置识别优先级&#xff0c;把GB2312和UTF8排在自动识别序列前面(因为keil默认就是GB2312) 4.!!!在暂存更改的时候&#xff0c;不要把vscode的设置给暂存了&#xff…

大模型中的KV Cache

1. KV Cache的定义与核心原理 KV Cache&#xff08;Key-Value Cache&#xff09;是一种在Transformer架构的大模型推理阶段使用的优化技术&#xff0c;通过缓存自注意力机制中的键&#xff08;Key&#xff09;和值&#xff08;Value&#xff09;矩阵&#xff0c;避免重复计算&…