一. 總體結構
二. 主要類的功能
2.1 TransportDescriptor和TransportInterface
? FastDDS中整個Transport類的設計遵循的是設計模式中的建造者模式,其中,TransportDescriptor就是建造者,而TransportInterface則是建造出來的產品。
? TransportDescriptor是抽象類,申明了后續實現類必須實現的最終要的一個函數,也是建造者需要完成的主要工作,就是建造出實際的Transport對象:
struct RTPS_DllAPI TransportDescriptorInterface
{ .../*** Factory method pattern. It will create and return a TransportInterface* corresponding to this descriptor. This provides an interface to the NetworkFactory* to create the transports without the need to know about their type*/virtual TransportInterface* create_transport() const = 0;...
};
? TransportInterface定義了Transort的接口,所有的Transport都必須實現該接口,根據傳輸方式的不同有TCPTransportInterface, UDPTransportInterface和SharedMemTransport,而根據TCP/UDP版本的不同,前兩者分別有V4和V6兩個派生類。
TransportInterface接口類的初衷是為了在FastRTPS中隔離傳輸層的實現,用戶需要實現基于特定傳輸層協議(TCP, UDP, SharedMem)的通道和對應地址之間的代碼邏輯。TransportInterface接口類中申明了實現類必須實現的一些接口(init,IsInputChannelOpen,IsLocatorSupported, is_locator_allowed, RemoteToMainLocal,transform_remote_locator等),其中最重要的兩個接口是OpenOutputChannel和OpenInputChannel這兩個接口函數,前者會創建SenderResource而后者創建ReceiverResource。
2.2 TCPTransportInterface和UDPTransportInterface
2.2.1 TCPTransportInterface:
? TCPTransportInterface定義并且實現了基于TCP傳輸層的通信操作,主要接口都在TCPTransportInterface中,另外還有一個輔助類TCPAcceptor用于實現TCP獨有的連接操作,接收客戶端的連接。
? TCPTransoprtInterface中的perform_listen_operation函數實現基于TCP傳輸層的數據接收操作,send函數實現基于TCP傳輸層的數據發送操作,在初始化TCPTransportInterface的時候,如果發現TCPTransportDescriptor中定義了listen_port,那么會額外創建TCPAcceptBasic對象:
TCPv4Transport::TCPv4Transport(const TCPv4TransportDescriptor& descriptor): TCPTransportInterface(LOCATOR_KIND_TCPv4), configuration_(descriptor)
{...// 如果TCPv4TransportDescriptor配置了listening_ports監聽端口,則創建TCPAcceptorBasicfor (uint16_t port : configuration_.listening_ports){Locator locator(LOCATOR_KIND_TCPv4, port);create_acceptor_socket(locator);}...
}
? TCPAcceptBasic可以理解為對于listen socket的封裝,主要執行listen操作,返回建立連接了的client socket給到TCPTransoprtInterface:
void TCPAcceptorBasic::accept(TCPTransportInterface* parent)
{...acceptor_.async_accept(socket_,[parent, locator, this](const std::error_code& error){if (!error){auto socket = std::make_shared<tcp::socket>(std::move(socket_)); // 返回建立連接的客戶端socketparent->SocketAccepted(socket, locator, error); //TCPTransoprtInterface根據該socket創建ChannelResource}});
}
2.2.2 UDPTransportInterface:
? UDPTRansportInterface的接口結構比TCPTransportInterface要簡單很多,并且接口函數也會少許多,因為沒有TCP的連接操作,此外,UDPTransportInterface中沒有實現perform_listen_operation接口(UDP的perform_listen_operation接口在UDPChannelResource中實現),send函數實現基于UDP傳輸層的數據發送操作。
2.2.3 OpenInputChannel和OpenOutputChannel
? 在2.1 TransportInterface中說過,其最重要的接口就是OpenInputChannel和OpenOutputChannel,下面以UDPTransportInterface來看下這兩個接口實現了什么功能。
? 首先,這兩個接口的名字中都帶有Channel,但是不要被名字誤導,認為這兩個接口都會和下面的ChannelResource打交道,從代碼看,只有InputChannel才會創建ChannelResource,而OutputChannel創建的是SenderResource。
OpenInputChannel:
bool UDPv4Transport::OpenInputChannel(const Locator& locator,TransportReceiverInterface* receiver,uint32_t maxMsgSize) {...if (!IsInputChannelOpen(locator)){success = OpenAndBindInputSockets(locator, receiver, IPLocator::isMulticast(locator), maxMsgSize);}... // 如果是組播地址,下面還會創建額外的ChannelResource,同時將網卡加入組播地址
}bool UDPTransportInterface::OpenAndBindInputSockets(...) {...std::vector<std::string> vInterfaces = get_binding_interfaces_list(); // 獲取白名單網卡列表for (std::string sInterface : vInterfaces){ // 在每張網卡上創建InputChannelResource(一般來說用戶會通過白名單限制用于DDS通信的網卡,不太會出現在多個網卡上創建的情況)UDPChannelResource* p_channel_resource;p_channel_resource = CreateInputChannelResource(sInterface, locator, is_multicast, maxMsgSize, receiver);mInputSockets[IPLocator::getPhysicalPort(locator)].push_back(p_channel_resource);}...
}UDPChannelResource* UDPTransportInterface::CreateInputChannelResource(...) {// 創建接收數據用的socketeProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface,IPLocator::getPhysicalPort(locator), is_multicast);// 創建ChannelResource,ChannelResource包裝傳入的socket,并且對外提供接收數據的統一接口UDPChannelResource* p_channel_resource = new UDPChannelResource(this, unicastSocket, maxMsgSize, locator,sInterface, receiver);return p_channel_resource;
}
OpenOutputChannel:
bool UDPTransportInterface::OpenOutputChannel(SendResourceList& sender_resource_list, // 函數中創建的SenderResource放在這個list中返回給上層const Locator& locator)
{...if (is_interface_whitelist_empty()) { ... } // 沒有配置網卡白名單,會在0.0.0.0地址上創建socket和SenderResourceelse {// 獲取網卡列表(需要剔除已經在locator參數上創建SenderResource的網卡)get_unknown_network_interfaces(sender_resource_list, locNames, true);for (const auto& infoIP : locNames){ // 創建用于發送數據的socketeProsimaUDPSocket unicastSocket = OpenAndBindUnicastOutputSocket(generate_endpoint(infoIP.name, port), port);SetSocketOutboundInterface(unicastSocket, infoIP.name); // 設置多播數據的發送接口...sender_resource_list.emplace_back( // 創建SenderResource,用于包裝上面創建的發送socket的操作接口static_cast<SenderResource*>(new UDPSenderResource(*this, unicastSocket, false, true)));}}...
}
2.3 ChannelResource, TCPChannelResource, UDPChannelResource
? ChannelResource包裝了用于接口數據的socket,并且對外提供相對統一的操作接口(針對不同的傳輸類型還是有所區別的)
? ChannelResource是接口類,主要提供了用于接收數據使用的線程成員thread_以及保存接收到的消息的message_buffer_成員,后續可以看到transportinterface的perform_listen_operation函數就是在這個thread_成員上運行的。
class ChannelResource {...inline void thread(std::thread&& pThread){// 初始化接收數據的線程thread_...}...
};UDPChannelResource::UDPChannelResource(...): ChannelResource(maxMsgSize)...
{ //指定thread_線程運行perform_listen_operation函數thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator));
}
? TCPChannelResource和UDPChannelResource兩者都包含了用于接收數據使用的socket對象,除此以外,兩者的接口差距還是挺大的(TCP和UDP的通信流程本來就有區別),TCPChannelResource的接口比較多,實現也很復雜,并且不屬于DDS協議中約定的默認通信方式,因此不需要太關注TCP的視線。
? 這里主要來看UDPChannelResource的接口,UDPChannelResource的接口很簡單,就是perform_listen_operation:
void UDPChannelResource::perform_listen_operation(Locator input_locator) {// set thread name for debugpthread_setname_np(pthread_self(), "UDPChannel"); // thread_的線程名稱為UDPChannelLocator remote_locator;while (alive()){// Blocking receive.auto& msg = message_buffer();if (!Receive(msg.buffer, msg.max_size, msg.length, remote_locator)) // Receive函數調用socket的recv_from接收數據(阻塞式){continue;}// 將收到的消息交個MessageReceiver去處理(通過message_receiver接口主導到UDPChannelResource中)...}
}
2.4 SenderResource, UDPSenderResource
? SenderResource從名字看就是用于發送數據的,SenderResource是接口類,其中最重要的一個接口就是send,send內部調用了send_lambda_這個std::function類型的成員,send_lambda_成員有SenderResource的派生類來實現:
? 對于UDPSenderResource來說,send_lambda_函數對象依賴UDPTransportInterface的接口來完成最終的數據發送:
send_lambda_ = [this, &transport](...) -> bool{ // 調用UDPTransportInterface::send接口, 并且將socket對象傳入return transport.send(data, dataSize, socket_, destination_locators_begin,destination_locators_end, only_multicast_purpose_, whitelisted_,max_blocking_time_point);};
[!TIP]
讀到這段代碼的時候,不太理解為什么不在UDPSenderResource的send_lambda_中直接實現send的邏輯代碼,反而要依賴UDPTransportInterface來實現。個人理解是不是因為SenderResource這個類主要功能還是體現在Resource上,說明其只是一個用來保管發送socket的資源類而已,個人猜想。
? RTPSParticipantImpl是SenderResource的容器,保存了其創建的所有SenderResource,在發送數據的時候會調用這些SenderResource的接口:
bool sendSync(...) {...for (auto& send_resource : send_resource_list_) // send_resource_list_中保存了該Participant創建的所有SenderResource{...send_resource->send(msg->buffer, msg->length, &locators_begin, &locators_end, max_blocking_time_point);}...
}
? 第三章節中的3.2發送數據中會說明SenderResource的send接口是怎么被調用到的,調用棧是如何的。
2.5 ReceiverResource, MessageReciever
? ReceiverResource實現TransportReceiverInterface接口,TransportReceiverInterface這個接口的用途從名字看就是用于接收Transport傳輸層收到數據的接口,其實現了TransoprtReceiverInterface接口中的OnDataReceived函數接口,OnDataReceived函數接口在UDPChannelResource的數據接收接口perform_listen_operation函數中被調用:
void UDPChannelResource::perform_listen_operation(Locator input_locator) {...// Processes the data through the CDR Message interface.if (message_receiver() != nullptr){message_receiver()->OnDataReceived(msg.buffer, msg.length, input_locator, remote_locator);}...
}
? ReceiverResource的OnDataReceived接口實現非常簡單,就是將Transoprt收到的CDRMessage交給MessageReceiver,MessageReceiver會對該CDRMessage進行進一步細化的處理,看過DDS協議文檔的應該知道一個完整的DDS Message包括Header和Submessages兩部分,其中Submessages中包含不止一種子消息(HeartBeat,GAP,Timestamp,Data, DataFrag等等)。
而MessageReceiver的作用就是解析ReceiverResource傳遞過來的CDRMessage,解析其中的Header以及各個子消息,可以看到MessageReceiver中定義了各種proess函數用來處理不同的子消息:
? 這些proc函數中最重要的一個函數是process_data_message_without_security(假設沒有開啟TLS功能,開啟的話就是with_security),該函數將Data子消息中的payload交給RTPSReader進行處理:
void MessageReceiver::process_data_message_without_security(const EntityId_t& reader_id,CacheChange_t& change)
{auto process_message = [&change](RTPSReader* reader){reader->processDataMsg(&change);};findAllReaders(reader_id, process_message); // 找到對應EntityID的RTPSReader,調用其processDataMsg處理Data子消息
}
2.6 LocatorSelector, LocatorSelectorSender和LocatorSelectorEntry
? LocatorSelectorSender: Class used by writers to inform a RTPSMessageGroup object which remote participants will be addressees of next RTPS submessages. (Writer使用LocatorSelectSender告知RTPSMessageGroup下一包要發送的RTPS消息要發給哪些遠端Particiant)
? LocatorSelector: A class used for the efficient selection of locators when sending data to multiple entities. (當發送數據給不同的遠端Reader時,該類可以協助選擇合適的Locator地址進行發送)
? LocatorSelectorEntry: This class holds the locators of a remote endpoint along with data required for the locator selection algorithm. (該類報錯了某個遠端Endpiont的地址信息,地址信息回被用在locator選擇上)
? LocatorSelector內部對于每一個匹配的遠端RTPSReader,都維護了其Locator信息在LocatorSelectorEntry中。然后LocatorSelector是LocatorSelectorSender的內部成員,而LocatorSelectorSender是每個StatelessWriter/StatefulWriter的內部成員,當Writer匹配到遠端Reader的時候,會將遠端Reader的Locator信息(以LocatorSelectorEntry的方式)更新到LocatorSelectorSender的LocatorSelector成員中:
bool StatelessWriter::matched_reader_add(...) {...filter_remote_locators(*new_reader->general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);locator_selector_.locator_selector.add_entry(new_reader->general_locator_selector_entry());...
}
? 而在RTPSWriter發送RTPSMessage的時候,則可以根據locator_selector_.locator_selector找到目前還存活的匹配的遠端RTPSRerader的LocatorSelectorEntry,取出其中的Locator作為消息的發送目的地:
DeliveryRetCode StatelessWriter::deliver_sample_nts(...) {...locator_selector.locator_selector.reset(true); //重新選擇目前還存活的RTPSReader的LocatorSelectorEntry作為目標地址集合size_t num_locators = locator_selector.locator_selector.selected_size() + fixed_locators_.size();if (0 < num_locators) {...if (group.add_data(*cache_change, is_inline_qos_expected_)) { // 向RTPSMessageGroup添加Data_Submessage...}}
}bool LocatorSelectorSender::send(CDRMessage_t* message,std::chrono::steady_clock::time_point max_blocking_time_point) const
{return writer_.send_nts(message, *this, max_blocking_time_point);
}bool RTPSWriter::send_nts(...) {RTPSParticipantImpl* participant = getRTPSParticipant();// 向LocatorSelectorSender.LocatorSelector中每一個有效的RTPSReader的Locator發送消息return locator_selector.locator_selector.selected_size() == 0 ||participant->sendSync(message, m_guid, locator_selector.locator_selector.begin(),locator_selector.locator_selector.end(), max_blocking_time_point);
}
三. 主要流程
3.1 初始化
? 初始化流程中,RTPSParticipantImpl和各種Endpoint會創建各個SenderResource/ReceiverResource,這里主要梳理一下會創建哪些Resource,在什么時候創建的:
? 對于RecevierResource創建的socket,其端口一般是有固定的計算規則的,根據domainid,participantid以及是地址是否為組播地址可以算出固定的端口。
? 對于SenderResource創建的socket,則是隨機綁定未使用的端口,而且因為每個RTPSWriter知道匹配的Reader信息(保存在ReaderProxy/ReaderLocator中),因此,socket綁定的IP都是本地網卡的IP,然后發送的時候使用sendto發送到Reader的地址和端口上去就行了。
3.2 發送數據
從RTPS層看,發送數據時,我們一般向RTPSWriter索取一個CacheChange,然后將要發送的數據填充到CacheChange的payload中,最后將這個CacheChange加入到WriterHistory中。
? 從我們將數據填充到CacheChange到通過SenderResource關聯的socket發送到對端的大致流程如下:
3.3 接收數據
? 接收數據的工作從UDPChannelResource的thread_線程開始的,前面說到過,UDPChannelResource的thread_線程被用來運行perform_listen_operation函數,該函數中調用關聯的socket執行receive_from操作,從綁定的Locator上讀取數據,讀取到的數據通過MessageReceiver,RTPSParticipantImpl最終到達RTPSReader手上: