前言
計算框架是自動駕駛系統中的重中之重,也是整個系統得以高效穩定運行的基礎。爲了實時地完成感知、決策和執行,系統須要一系列的模塊相互緊密配合,高效地執行任務流。因爲各類緣由,這些模塊可能位於不一樣進程,也可能位於不一樣機器。這就要求計算框架中具備靈活的、高性能的通訊機制。咱們知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS。以前寫過兩篇相關的文章介紹了其中的調度部分:《自動駕駛平臺Apollo 3.5閱讀手記:Cyber RT中的協程(Coroutine)》和《自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的任務調度》。今天就來聊一下其中的另外一重要部分-通訊系統。node
和ROS & ROS2中相似,Cyber RT中支持兩種數據交換模式:一種是Publish-Subscribe模式,經常使用於數據流處理中節點間通訊。即發佈者(Publisher)在channel(ROS中對應地稱爲topic)上發佈消息,訂閱該channel的訂閱者(Subscriber)便會收到消息數據;另外一種就是常見的Service-Client模式,經常使用於客戶端與服務端的請求與響應。本質上它是能夠基於前者實現的。Node
是整個數據拓撲網絡中的基本單元。一個Node
中能夠建立多個讀者/寫者,服務端/客戶端。讀者和寫者分別對應Reader
和Writer
,用於Publish-Subscribe模式。服務端和客戶端分別對應Service
和Client
,用於Service-Client模式。後端
實現解析
自動駕駛系統中的各個處理模塊基本都是實現爲Component
。一個Component
中包含一個Node
,另外會根據須要建立和管理Writer
,Reader
,Service
和Client
。這些用於通訊的類下面基於Trasmitter
和Receiver
類。前者用於數據發送,後者用於數據接收。它們是數據傳輸層的抽象,之下可有多個傳輸層實現用於不一樣場景下的傳輸。如對於Trasmitter
有IntraTransmitter
,ShmTransmitter
,RtpsTransmitter
和HybridTransmitter
。對於Receiver
也是相似的。其中RTPS後端基於Fast RTPS。Fast RTPS是DDS(Data Distribution Service)標準的一個很是流行的開源實現。DDS標準提供了一個平臺無關的數據模型,主要用於實時分佈式系統。不一樣的實現能夠相互通訊。整個通訊系統的架構層次圖以下。
數組
下面咱們就從幾個方面深刻地看下它們的實現機制。網絡
服務發現與拓撲管理
首先來看下比較基礎與核心的服務發現與拓撲管理。其實現主要在目錄cyber/service_discovery/
下。節點間經過讀和寫端創建數據通路。以channel爲邊,這樣能夠獲得一個數據流圖絡。因爲節點可能退出,訂閱狀況也可能發生改變,因此這個網絡是動態的。所以須要對網絡拓撲進行監控。數據結構
主要負責這件事的數據結構是TopologyManager
,它是個單例,由於每一個進程只要有一個來負責監控網絡拓撲就能夠了。TopologyManager
有三個子管理器,並有共同的基類Manager
。它們分別爲:
- NodeManager
用於管理網絡拓撲中的節點。
- ChannelManager
用於管理channel,即網絡拓撲中的邊。
- ServiceManager
用於管理Service
和Client
。
架構
Cyber RT中有兩個層面的拓撲變化的監控:app
- 基於Fast RTPS的發現機制
它主要監視網絡中是否有參與者加入或退出。TopologyManager::CreateParticipant()
函數建立transport::Participant
對象時會輸入包含host name與process id的名稱。ParticipantListener
用於監聽網絡的變化。網絡拓撲發生變化時,Fast RTPS傳上來ParticipantDiscoveryInfo
,在TopologyManager::Convert()
函數中對該信息轉換成Cyber RT中的數據結構ChangeMsg
。而後調用回調函數TopologyManager::OnParticipantChange()
,它會調用其它幾個子管理器的OnTopoModuleLeave()
函數。而後子管理器中即可以將相應維護的信息進行更新(如NodeManager
中將相應的節點刪除)。框架
這層拓撲監控主要是經過Fast RTPS提供的自動發現機制。如進程意外退出,則要將各管理中相應信息進行更新。它的優勢是若是進程出錯或設備斷開也能夠工做,但粒度比較粗,且不是很是及時(好比斷開時)。socket
- 基於主動式的拓撲變動廣播
這一部分主要在TopologyManager::Init()
函數中建立和初始化。在初始化時,會調用它們的StartDiscovery()
函數開始啓動自動發現機制。基於TopologyManager
中的RtpsParticipant
對象,這幾個子管理會經過CreateSubscriber()
和CreatePublisher()
函數建立相應的subscriber和publisher。子管理器中channel名稱分別爲node_change_broadcast
,channel_change_broadcast
和service_change_broadcast
。Subscriber的回調函數爲Manager::OnRemoteChange()
。該回調函數中會解析拓撲變動消息並調用Dispose()
函數進行處理。分佈式
這層拓撲監控是主動式的,即須要相應的地方主動調用Join()
或Leave()
來觸發,而後各子管理器中回調函數進行信息的更新。如NodeChannelImpl
建立時會調用NodeManager::Join()
。Reader
和Writer
初始化時會調用JoinTheTopolicy()
函數,繼而調用ChannelManager::Join()
函數。相應地,有LeaveTheTopology()
函數表示退出拓撲網絡。在這兩個函數中,會調用Dispose()
函數,而這個函數是虛函數,在各子管理器中有各自的實現。另外Manager
提供AddChangeListener()
函數註冊當拓撲發生變化時的回調函數。舉例來講,Reader::JoinTheTopology()
函數中會經過該函數註冊回調Reader::OnChannelChange()
。
數據傳輸
在一個分佈式計算系統中,根據兩個節點間的位置關係須要使用不一樣的傳輸方式(定義在CommunicationMode
中):
- INTRA:若是是同進程的,由於在同一地址空間,直接傳指針就完了。
- SHM(Shared memory):若是是同一機器上,但跨進程的,爲了高效可使用共享內存。
- RTPS:若是是跨設備的,那就老老實實經過網絡傳吧。
示意圖以下:
不少時候一個計算圖中各類狀況都有,因此爲了達到最好的性能,須要混合使用。這種混合模式稱爲HYBRID模式。框架須要根據節點間關係選擇合適的傳輸後端。
每一個Writer
有Transmitter
,每一個Reader
有Receiver
。它們是負責消息發送與收取的類。Transmitter
與Receiver
的基類爲Endpoint
,表明一個通訊的端點,它主要的信息是身份標識與屬性。其類型爲RoleAttributes
(定義在role_attributes.proto
)的成員attr_
包含了host name,process id和一個根據uuid產生的hash值做爲id。經過它們就能夠判斷節點之間的相對位置關係了。
Reader
和Writer
會調用Transport
的方法CreateTransmitter()
和CreateReceiver()
用於建立發送端的transmitter和接收端的receiver。建立時有四種模式可選,分別是INTRA,SHM和RTPS,和HYBRID。最後一種是前三種的混合模式,也是默認的模式。如Transmitter
對應的繼承類爲IntraTransmitter
,ShmTransmitter
,RtpsTransmitter
和HybridTransmitter
。這幾個繼承類最主要實現了Transmit()
函數用於傳輸數據。對於Receiver
來講是相似的,它有4個繼承類對應四種傳輸方式,即IntraReceiver
,ShmReceiver
,RtpsReceiver
和HybridReceiver
。
結合前面提到的幾種模式對應的場景,transmitter與receiver的對應關係以下:
前面提到,傳輸層實現主要有四個實現後端,對應四種模式:
-
RTPS:RTPS部分基於eProsimar的Fast RTPS。
RtpsTransmitter
類中建立和封裝publisher。Transmit()
函數將消息序列化成Fast RTP中的格式UnderlayMessage
,而後經過publisher發出去。RtpsReceiver
中的dispatcher_
成員指向單例RtpsDispatcher
。它用於派發RTPS發來的數據,維護了channel id到subscriber的查找表。RtpsDispatcher::AddSubscriber()
函數使用eprosima::fastrtps::Domain::createSubscriber()
函數建立subscriber,其回調統一爲RtpsDispatcher::OnMessage()
函數。該函數會將從RTPS通路來的消息進行派發。 -
SHM:
Segment
類表示一塊對應一個channel的共享內存,由SegmentFactory::CreateSegment
函數建立。它有兩個繼承類PosixSegment
和XsiSegment
,是平臺相關的實現。在寫端,ShmTransmitter::Transmit()
函數用於發送消息,該函數先經過AcquireBlockToWrite()
函數拿一個可寫的block。若是發現該Segment
還沒有初始化,會調用OpenOrCreate()
經過OS的接口建立共享內存而且map出虛擬地址。這塊共享內存區域大致分兩部分。一部分爲元信息,另外一部分爲消息數據。後者會被切分爲相同大小的block。block的buffer大小默認16K,但趕上消息超出大小的時候會調整。拿到該block後,將消息序列化後寫入,並通知讀者來取消息。通知機制是經過NotifierBase
實現的。它有兩個實現類,分別爲ConditionNotifier
和MulticastNotifier
。前者爲默認設置。它會單獨開一塊共享共享專門用於通知,其中包含了ReadableInfo
等信息。MulticastNotifier
的主要區別是它是經過指定的socket廣播。在讀端,ShmDispatcher::Init()
初始化時會建立專門的線程,線程的執行體爲ShmDispatcher::Threadfunc()
函數。它在循環體內會經過Listen()
函數等待新消息。若是有新消息寫入後發出通知,這兒就會往下走。基於通知中的ReadableInfo
信息,獲得channel id,block index等信息,而後調用ReadMessage()
函數讀消息並反序列化。以後調用ShmDispatcher::OnMessage()
函數進行消息派發。 -
INTRA :用於進程內通訊。因爲讀者和寫者是在同一進程內,所以能夠直接調用。在
IntraTransmitter::Transmit()
函數中,會直接調用讀端的IntraDispatcher::OnMessage()
。該函數進行下一步消息的派發。 -
HYBRID:即默認模式,是前三種的結合體。具體功能其實仍是交給前面幾個後端完成的,只是它會根據讀者與寫者的關係使用相應的後端。
消息寫端
寫端的實現相對簡單一些。在模塊組件中,能夠經過CreateWriter()
函數建立Writer
對象,而後就能夠經過該對象向指定channel發送消息了。以CameraComponent
爲例:
writer_ = node_->CreateWriter<Image>(camera_config_->channel_name()); ... auto pb_image = std::make_shared<Image>(); pb_image->mutable_header()->set_frame_id(camera_config_->frame_id()); pb_image->set_width(raw_image_->width); pb_image->set_height(raw_image_->height); pb_image->mutable_data()->reserve(raw_image_->image_size); ... writer_->Write(pb_image);
這裏先建立了Writer
對象,而後填好了消息裏的數據(這裏發送的消息類型爲Image
,定義在modules/drivers/proto/sensor_image.proto
文件),最後調用Writer::Write()
函數將該消息發出。
CreateWriter()
函數中先建立Writer
對象,再調用Writer::Init()
函數進行初始化。初始化中主要經過CreateTransmitter()
函數建立Transmitter
對象。由於默認是HYBRID模式,因此這裏實際建立的是HybridTransmitter
對象。Transmitter
繼承自Endpoint
類,它其中的屬性信息以用來判斷讀者與寫者的相對關係。不一樣的相對關係決定使用何種Transmitter
對象。其配置在InitMode()
函數中設置:
template <typename M> void HybridTransmitter<M>::InitMode() { mode_ = std::make_shared<proto::CommunicationMode>(); mapping_table_[SAME_PROC] = mode_->same_proc(); mapping_table_[DIFF_PROC] = mode_->diff_proc(); mapping_table_[DIFF_HOST] = mode_->diff_host(); }
Writer
對象的初始化中還會將調用JointTheTopology()
函數將之加入到ChannelManager
維護的拓撲信息中。
template <typename MessageT> void Writer<MessageT>::JoinTheTopology() { // add listener change_conn_ = channel_manager_->AddChangeListener(std::bind( &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1)); // get peer readers const std::string& channel_name = this->role_attr_.channel_name(); std::vector<proto::RoleAttributes> readers; channel_manager_->GetReadersOfChannel(channel_name, &readers); for (auto& reader : readers) { transmitter_->Enable(reader); } channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER, message::HasSerializer<MessageT>::value); }
這裏還會作一件比較重要的事是enable相應的Transmitter
。先經過ChannelManager
獲得該channel相應讀者的信息。而後對於每一個讀者,調用HybridTransmitter::Enable()
函數。HybridTransmitter
是混合模式的Transmitter
,它其實包含了RTPS,SHM和INTRA三種Transmitter
實例。但這三種Transmitter
並不必定都須要用到。好比,若是該消息對應的讀者全是同進程的,那就不必整上SHM和RTPS了。HybridTransmitter::Enable()
函數會根據參數來enable合適的Transmitter
。
template <typename M> void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) { auto relation = GetRelation(opposite_attr); if (relation == NO_RELATION) { return; } uint64_t id = opposite_attr.id(); std::lock_guard<std::mutex> lock(mutex_); receivers_[mapping_table_[relation]].insert(id); transmitters_[mapping_table_[relation]]->Enable(); TransmitHistoryMsg(opposite_attr); }
相應地,在Disable()
函數中決定是否要disable相應的Transmitter
。這樣在以後的Transmit()
函數中只要把transmitters_
中的全部Transmitter
拿出來調用Transmit()
函數便可。
發送數據是經過Writer::Write()
函數繼而調用Transmitter::Transmit()
函數來實現的。由於這裏是用的HybridTransmitter
,所以實際調用的是HybridTransmitter::Transmit()
函數:
template <typename M> bool HybridTransmitter<M>::Transmit(const MessagePtr& msg, const MessageInfo& msg_info) { std::lock_guard<std::mutex> lock(mutex_); history_->Add(msg, msg_info); for (auto& item : transmitters_) { item.second->Transmit(msg, msg_info); } return true; }
能夠看到這裏分別調用三大Transmitter
的Transmit()
函數發送消息。
消息讀端
讀端的處理鏈路相比下複雜一些。先回顧一個Component
中對消息的處理。對於一個Component
來講,它可能會從多個channel收取消息,而後基於全部channel的消息才能處理。第一個channel暫且稱之爲主channel。這些channel消息的組合咱們暫且稱爲組合消息。咱們就來看下典型的兩個channel狀況,其初始化的主要代碼爲:
template <typename M0, typename M1> bool Component<M0, M1, NullType, NullType>::Initialize( ... ReaderConfig reader_cfg; reader_cfg.channel_name = config.readers(1).channel(); reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile()); reader_cfg.pending_queue_size = config.readers(1).pending_queue_size(); auto reader1 = node_->template CreateReader<M1>(reader_cfg); reader_cfg.channel_name = config.readers(0).channel(); reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile()); reader_cfg.pending_queue_size = config.readers(0).pending_queue_size(); reader0 = node_->template CreateReader<M0>(reader_cfg); ... readers_.push_back(std::move(reader0)); readers_.push_back(std::move(reader1)); ... std::vector<data::VisitorConfig> config_list; for (auto& reader : readers_) { config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize()); } auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list); croutine::RoutineFactory factory = croutine::CreateRoutineFactory<M0, M1>(func, dv); return sched->CreateTask(factory, node_->Name()); }
其中對兩個channel分別建立Reader
對象。該Reader
對象是針對單個channel的。而後針對全部channel建立DataVisitor
對象,這時就是針對全部channel的組合消息了。最後建立協程來進行組合數據的處理。後面會看到每一個Reader
都會有單獨的協程來作數據讀取。所以,對於一個有n個channel的component,框架會爲此建立至少n+1個協程。
其中比較重要的結構就是用於讀取消息的Reader
類了。咱們先看Reader
對象的建立。其初始化函數Init()
以下:
template <typename MessageT> bool Reader<MessageT>::Init() { if (init_.exchange(true)) { return true; } std::function<void(const std::shared_ptr<MessageT>&)> func; if (reader_func_ != nullptr) { func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); this->reader_func_(msg); }; } else { func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); }; } auto sched = scheduler::Instance(); croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name(); auto dv = std::make_shared<data::DataVisitor<MessageT>>( role_attr_.channel_id(), pending_queue_size_); // Using factory to wrap templates. croutine::RoutineFactory factory = croutine::CreateRoutineFactory<MessageT>(std::move(func), dv); if (!sched->CreateTask(factory, croutine_name_)) { AERROR << "Create Task Failed!"; init_.store(false); return false; } receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_); this->role_attr_.set_id(receiver_->id().HashValue()); channel_manager_ = service_discovery::TopologyManager::Instance()->channel_manager(); JoinTheTopology(); return true; }
這裏主要建立了相應的DataVisitor
類,協程和Receiver
類等。其中DataVisitor
主要用於消息數據的訪問。它存放到來的消息數據,並提供接口供消息讀取。仍是以兩個channel的狀況爲例:
template <typename M0, typename M1> class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase { public: explicit DataVisitor(const std::vector<VisitorConfig>& configs) : buffer_m0_(configs[0].channel_id, new BufferType<M0>(configs[0].queue_size)), buffer_m1_(configs[1].channel_id, new BufferType<M1>(configs[1].queue_size)) { DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_); DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_); data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_); data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_); } ... bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) { // NOLINT if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) { next_msg_index_++; return true; } return false; } private: fusion::DataFusion<M0, M1>* data_fusion_ = nullptr; ChannelBuffer<M0> buffer_m0_; ChannelBuffer<M1> buffer_m1_; };
它的成員變量中對每個channel都有一個對應的ChannelBuffer
對象。DataDispatcher::AddBuffer()
函數在DataVisitor
初始化時用來將這些個ChannelBuffer
加入到DataDispatcher
的管理中。同時,DataNotifier::AddNotifier()
函數用來以主channel的id爲鍵值加入到DataNotifier
的管理中。DataDispatcher
與DataNotifier
均爲單例。前者爲模板類,意味着每個消息類型會有對應的DataDispatcher
對象,且相同消息類型會共享該對象。顧名思義,它主要用於數據傳輸層有數據來時的分發,即當新消息到來時經過DataDispatcher::Dispatch()
函數把它放到相應的消息緩衝區中。後者用於管理全部的Notifier
。它用於在消息派發完後喚醒相應的協程進行處理。這些對象的大致結構圖以下:
當channel多於一個時(組合消息),DataVisitor
中還有一個DataFusion
對象用於將多路channel的數據合併。DataFusion
的實現類爲AllLatest
,聽名字就知道它會取全部channel中的最新值。除了per-channel的ChannelBuffer
對象外,它還有一個特殊的ChannelBuffer
對象用於存放多channel消息的組合消息(即各個channel的消息類型的tuple)。當填入主channel的消息時,會調用由SetFusionCallback()
函數註冊的回調。該回調判斷是否全部channel都有消息,若是都有消息的話就將這些消息做爲組合消息填入該組合消息的ChannelBuffer
中。 在協程處理函數中會調用DataVisitor::TryFetch()
函數從該ChannelBuffer
中拿組合消息。值得注意的是這件事只在主channel有消息來時纔會被觸發,所以主channel的選取是有講究的。
Reader
初始化時建立的另外一個關鍵對象爲Receiver
。它有4個繼承類,默認爲混合模式的HybridReceiver
。HybridReceiver::InitReceivers
中分別建立相應的IntraReceiver
、ShmReceiver
和RtpsReceiver
,放在成員receivers_
數組中。它會來根據寫端的狀況來enable和disable相應的Receiver
。ReceiverManager
用於管理這些Receiver
對象。它以channel爲key進行管理,所以同一進程內訂閱同一個channel的會共用同一個Receiver
對象。ReceiverManager::GetReceiver()
函數用於按鍵值取出Receiver
,如沒有,則經過Transport::CreateReceiver()
函數新建一個Receiver
。 這些個Receiver
在Enable()
函數中會經過AddListener()
函數向對應的Dispatcher
註冊其回調函數XXXReceiver::OnNewMessage()
。Dispatcher
類中的成員msg_listeners_
是channel id到ListenerHandler
對象的查找表。ListenerHandler
經過signal/slot機制保存了全部這些回調。注意不一樣傳輸後端的AddListener()
實現略有不一樣。好比RtpsDispatcher::AddListener()
函數中會將輸入的消息先經過ParseFromString()
函數進行解析,而後調用傳入的回調。ShmDispatcher::AddListener()
函數也是相似,它會先經過ParseFromArray()
函數解析消息。而對於IntraDispatcher::AddListener()
,因爲是同個進程內,是以消息自己的類型傳的,就不必解析了。
這些相關結構關係示意圖以下:
看了一些關鍵相關數據結構,接下來看下讀端的處理流程。首先,如以前介紹的,各Dispatcher
的繼承類各顯神通使本身的OnMessage()
回調函數被調用。以RtpsDispatcher
爲例:
void RtpsDispatcher::OnMessage(uint64_t channel_id, const std::shared_ptr<std::string>& msg_str, const MessageInfo& msg_info) { if (is_shutdown_.load()) { return; } ListenerHandlerBasePtr* handler_base = nullptr; if (msg_listeners_.Get(channel_id, &handler_base)) { auto handler = std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base); handler->Run(msg_str, msg_info); } }
這裏ListenerHandler::Run()
會根據消息的發送者信息找到對應的回調,即Receiver::OnNewMessage()
。
template <typename M> void Receiver<M>::OnNewMessage(const MessagePtr& msg, const MessageInfo& msg_info) { if (msg_listener_ != nullptr) { msg_listener_(msg, msg_info, attr_); } }
這裏的回調函數msg_listener_
是在Receiver
建立的時候傳入的。其實主要是調用了DataDispatcher::Dispatch()
函數來消息的派發:
transport::Transport::Instance()->CreateReceiver<MessageT>( role_attr, [](const std::shared_ptr<MessageT>& msg, const transport::MessageInfo& msg_info, const proto::RoleAttributes& reader_attr) { (void)msg_info; (void)reader_attr; PerfEventCache::Instance()->AddTransportEvent( TransPerf::DISPATCH, reader_attr.channel_id(), msg_info.seq_num()); data::DataDispatcher<MessageT>::Instance()->Dispatch( reader_attr.channel_id(), msg); PerfEventCache::Instance()->AddTransportEvent( TransPerf::NOTIFY, reader_attr.channel_id(), msg_info.seq_num()); });
DataDisaptcher
是模板類單例,即對於一種特定類型的消息能夠共用一個DataDispatcher
。以前在DataVisitor
初始化時會經過AddBuffer()
函數將ChannelBuffer
加入到DataDispatcher
的成員buffers_map_
中。它是一個以channel id爲key的map,其value爲全部等待該channel上消息的CacheBuffer
的數組。也就是說,消息分發時,只須要根據channel id找到這些buffer,而後將新來的消息填入其中便可。這就是Dispatcher::Dispatch()
函數主要作的事:
template <typename T> bool DataDispatcher<T>::Dispatch(const uint64_t channel_id, const std::shared_ptr<T>& msg) { BufferVector* buffers = nullptr; if (apollo::cyber::IsShutdown()) { return false; } if (buffers_map_.Get(channel_id, &buffers)) { for (auto& buffer_wptr : *buffers) { if (auto buffer = buffer_wptr.lock()) { std::lock_guard<std::mutex> lock(buffer->Mutex()); buffer->Fill(msg); } } } else { return false; } return notifier_->Notify(channel_id); }
最後調用DataNotifier::Notify()
函數來通知新消息的到來。它會觸發該channel上全部對應Notifier
中的回調。
inline bool DataNotifier::Notify(const uint64_t channel_id) { NotifyVector* notifies = nullptr; if (notifies_map_.Get(channel_id, ¬ifies)) { for (auto& notifier : *notifies) { if (notifier && notifier->callback) { notifier->callback(); } } return true; } return false; }
這個Notifier
中的回調是在建立協程時經過RegisterNotifyCallback()
函數註冊進去的,目的是爲了喚醒相應的協程來處理該新消息。
visitor->RegisterNotifyCallback([this, task_id]() { if (cyber_unlikely(stop_.load())) { return; } this->NotifyProcessor(task_id); });
NotifyProcessor()
函數會修改對應協程的狀態使之能被調度執行。前面提到,對於n個channel輸入的component,會有n+1個協程。它們都是以DataVisitor
和消息回調函數一塊兒做爲參數建立的。這個協程主體中會調用DataVisitor::TryFetch()
函數拿消息,而後調用註冊的消息處理函數:
factory.create_routine = [=]() { return [=]() { std::shared_ptr<M0> msg; for (;;) { CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT); if (dv->TryFetch(msg)) { f(msg); CRoutine::Yield(RoutineState::READY); } else { CRoutine::Yield(); } } }; };
對於那n個消息讀取協程來講,其消息處理函數爲:
func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); this->reader_func_(msg); };
這個回調函數中會調用Reader::Enqueue()
函數。在該函數中,主要調用Blocker::Publish()
函數,它繼而調用Blocker::Enqueue()
和Blocker::Notify()
函數。Blocker
類是一個存儲消息的結構。BlockerManager
類用於管理Blocker
,其中維護了以channel爲鍵值的Blocker
的map。Reader::Enqueue()
函數將消息放到Blocker
的成員published_msg_queue_
隊列中。以後,能夠經過Blocker::Observe()
函數將成員published_msg_queue_
隊列的消息放到成員observed_msg_queue_
隊列,而後經過Blocker::GetLatestObserved()
函數獲得最新的消息。好比ControlComponent
中的:
chassis_reader_->Observe(); const auto &chassis_msg = chassis_reader_->GetLatestObserved();
而對於剩下那一個協程,它是由主channel來觸發的。因它處理的是多channel的組合消息,在協程主體中的TryFetch()
函數會調用AllLatest::Fusion()
函數同時拿多個channel上的最新消息。至於這個組合消息是怎麼填入的前面有提。簡單來講,對於它來講,主channel來消息時,同時也會將其它channel的消息寫入組合消息。而後調度協程,拿出組合消息進行處理。其消息處理函數爲:
auto func = [self](const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1) { auto ptr = self.lock(); if (ptr) { ptr->Process(msg0, msg1); } else { AERROR << "Component object has been destroyed."; } };
該實現中主要以收到的消息爲參數調用Component
中的處理函數Process()
,從而執行組件的自定義處理邏輯。
小結
文中提了很多細枝末節,最後很是high-level地歸納下從寫者到讀者的流程。寫者Writer
寫消息時,會經過HybridTransmitter
繼而使用合適後端的Transmitter
發送消息。根據讀與寫者間的位置關係,通過網絡、共享內存或直接調用的方式,對應後端的Dispatcher
收到消息。收到後轉成指定消息類型,交給Receiver
。而後經過DataDispatcher
派發消息。派發消息就是將消息放到對應的buffer中,而後通知相應的協程來做進一步處理。上層模塊要取用這些消息,主要兩種方式:一種是經過Component
的Proc()
接口,它被調用時參數就是最新的消息。另外一種是經過Reader
的Observe()
函數直接拿。
咱們知道,Apollo在版本3.5前是基於ROS的,同時也對ROS作了幾個重要改進。這些改進很多是關於通訊系統的,如共享內存、去中心化和數據兼容性。到Cyber RT的演進也天然延續了這幾個優勢。總得來講,Cyber RT基於自動發現機制與Publish-Subscribe模式實現了通訊網絡的拓撲管理。同時它對數據傳輸層作了抽象,下面實現多個後端分別適用於不一樣場景,並提供了HYBRID模式能夠根據讀者和寫者間的關係自動使用合適的傳輸層後端。這樣,通訊系統的複雜性就被很好地屏蔽,框架就能提供給應用層便利的開發接口。