自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的通訊傳輸

前言

計算框架是自動駕駛系統中的重中之重,也是整個系統得以高效穩定運行的基礎。爲了實時地完成感知、決策和執行,系統須要一系列的模塊相互緊密配合,高效地執行任務流。因爲各類緣由,這些模塊可能位於不一樣進程,也可能位於不一樣機器。這就要求計算框架中具備靈活的、高性能的通訊機制。咱們知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS。以前寫過兩篇相關的文章介紹了其中的調度部分:《自動駕駛平臺Apollo 3.5閱讀手記:Cyber RT中的協程(Coroutine)》《自動駕駛平臺Apollo 5.5閱讀手記:Cyber RT中的任務調度》。今天就來聊一下其中的另外一重要部分-通訊系統。node

和ROS & ROS2中相似,Cyber RT中支持兩種數據交換模式:一種是Publish-Subscribe模式,經常使用於數據流處理中節點間通訊。即發佈者(Publisher)在channel(ROS中對應地稱爲topic)上發佈消息,訂閱該channel的訂閱者(Subscriber)便會收到消息數據;另外一種就是常見的Service-Client模式,經常使用於客戶端與服務端的請求與響應。本質上它是能夠基於前者實現的。Node是整個數據拓撲網絡中的基本單元。一個Node中能夠建立多個讀者/寫者,服務端/客戶端。讀者和寫者分別對應ReaderWriter,用於Publish-Subscribe模式。服務端和客戶端分別對應ServiceClient,用於Service-Client模式。後端

實現解析

自動駕駛系統中的各個處理模塊基本都是實現爲Component。一個Component中包含一個Node,另外會根據須要建立和管理WriterReaderServiceClient。這些用於通訊的類下面基於TrasmitterReceiver類。前者用於數據發送,後者用於數據接收。它們是數據傳輸層的抽象,之下可有多個傳輸層實現用於不一樣場景下的傳輸。如對於TrasmitterIntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter。對於Receiver也是相似的。其中RTPS後端基於Fast RTPS。Fast RTPS是DDS(Data Distribution Service)標準的一個很是流行的開源實現。DDS標準提供了一個平臺無關的數據模型,主要用於實時分佈式系統。不一樣的實現能夠相互通訊。整個通訊系統的架構層次圖以下。
在這裏插入圖片描述
數組

下面咱們就從幾個方面深刻地看下它們的實現機制。網絡

服務發現與拓撲管理

首先來看下比較基礎與核心的服務發現與拓撲管理。其實現主要在目錄cyber/service_discovery/下。節點間經過讀和寫端創建數據通路。以channel爲邊,這樣能夠獲得一個數據流圖絡。因爲節點可能退出,訂閱狀況也可能發生改變,因此這個網絡是動態的。所以須要對網絡拓撲進行監控。數據結構

主要負責這件事的數據結構是TopologyManager,它是個單例,由於每一個進程只要有一個來負責監控網絡拓撲就能夠了。TopologyManager有三個子管理器,並有共同的基類Manager。它們分別爲:
  - NodeManager用於管理網絡拓撲中的節點。
  - ChannelManager用於管理channel,即網絡拓撲中的邊。
  - ServiceManager用於管理ServiceClient


架構

Cyber RT中有兩個層面的拓撲變化的監控:app

  • 基於Fast RTPS的發現機制

它主要監視網絡中是否有參與者加入或退出。TopologyManager::CreateParticipant()函數建立transport::Participant對象時會輸入包含host name與process id的名稱。ParticipantListener用於監聽網絡的變化。網絡拓撲發生變化時,Fast RTPS傳上來ParticipantDiscoveryInfo,在TopologyManager::Convert()函數中對該信息轉換成Cyber RT中的數據結構ChangeMsg。而後調用回調函數TopologyManager::OnParticipantChange(),它會調用其它幾個子管理器的OnTopoModuleLeave()函數。而後子管理器中即可以將相應維護的信息進行更新(如NodeManager中將相應的節點刪除)。框架

這層拓撲監控主要是經過Fast RTPS提供的自動發現機制。如進程意外退出,則要將各管理中相應信息進行更新。它的優勢是若是進程出錯或設備斷開也能夠工做,但粒度比較粗,且不是很是及時(好比斷開時)。socket

  • 基於主動式的拓撲變動廣播

這一部分主要在TopologyManager::Init()函數中建立和初始化。在初始化時,會調用它們的StartDiscovery()函數開始啓動自動發現機制。基於TopologyManager中的RtpsParticipant對象,這幾個子管理會經過CreateSubscriber()CreatePublisher()函數建立相應的subscriber和publisher。子管理器中channel名稱分別爲node_change_broadcastchannel_change_broadcastservice_change_broadcast。Subscriber的回調函數爲Manager::OnRemoteChange()。該回調函數中會解析拓撲變動消息並調用Dispose()函數進行處理。分佈式

這層拓撲監控是主動式的,即須要相應的地方主動調用Join()Leave()來觸發,而後各子管理器中回調函數進行信息的更新。如NodeChannelImpl建立時會調用NodeManager::Join()ReaderWriter初始化時會調用JoinTheTopolicy()函數,繼而調用ChannelManager::Join()函數。相應地,有LeaveTheTopology()函數表示退出拓撲網絡。在這兩個函數中,會調用Dispose()函數,而這個函數是虛函數,在各子管理器中有各自的實現。另外Manager提供AddChangeListener()函數註冊當拓撲發生變化時的回調函數。舉例來講,Reader::JoinTheTopology()函數中會經過該函數註冊回調Reader::OnChannelChange()

數據傳輸

在一個分佈式計算系統中,根據兩個節點間的位置關係須要使用不一樣的傳輸方式(定義在CommunicationMode中):
  - INTRA:若是是同進程的,由於在同一地址空間,直接傳指針就完了。
  - SHM(Shared memory):若是是同一機器上,但跨進程的,爲了高效可使用共享內存。
  - RTPS:若是是跨設備的,那就老老實實經過網絡傳吧。


示意圖以下:
在這裏插入圖片描述

不少時候一個計算圖中各類狀況都有,因此爲了達到最好的性能,須要混合使用。這種混合模式稱爲HYBRID模式。框架須要根據節點間關係選擇合適的傳輸後端。

每一個WriterTransmitter,每一個ReaderReceiver。它們是負責消息發送與收取的類。TransmitterReceiver的基類爲Endpoint,表明一個通訊的端點,它主要的信息是身份標識與屬性。其類型爲RoleAttributes(定義在role_attributes.proto)的成員attr_包含了host name,process id和一個根據uuid產生的hash值做爲id。經過它們就能夠判斷節點之間的相對位置關係了。

ReaderWriter會調用Transport的方法CreateTransmitter()CreateReceiver()用於建立發送端的transmitter和接收端的receiver。建立時有四種模式可選,分別是INTRA,SHM和RTPS,和HYBRID。最後一種是前三種的混合模式,也是默認的模式。如Transmitter對應的繼承類爲IntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter。這幾個繼承類最主要實現了Transmit()函數用於傳輸數據。對於Receiver來講是相似的,它有4個繼承類對應四種傳輸方式,即IntraReceiverShmReceiverRtpsReceiverHybridReceiver

結合前面提到的幾種模式對應的場景,transmitter與receiver的對應關係以下:
在這裏插入圖片描述

前面提到,傳輸層實現主要有四個實現後端,對應四種模式:

  • RTPS:RTPS部分基於eProsimar的Fast RTPS。RtpsTransmitter類中建立和封裝publisher。Transmit()函數將消息序列化成Fast RTP中的格式UnderlayMessage,而後經過publisher發出去。RtpsReceiver中的dispatcher_成員指向單例RtpsDispatcher。它用於派發RTPS發來的數據,維護了channel id到subscriber的查找表。RtpsDispatcher::AddSubscriber()函數使用eprosima::fastrtps::Domain::createSubscriber()函數建立subscriber,其回調統一爲RtpsDispatcher::OnMessage()函數。該函數會將從RTPS通路來的消息進行派發。

  • SHMSegment類表示一塊對應一個channel的共享內存,由SegmentFactory::CreateSegment函數建立。它有兩個繼承類PosixSegmentXsiSegment,是平臺相關的實現。在寫端,ShmTransmitter::Transmit()函數用於發送消息,該函數先經過AcquireBlockToWrite()函數拿一個可寫的block。若是發現該Segment還沒有初始化,會調用OpenOrCreate()經過OS的接口建立共享內存而且map出虛擬地址。這塊共享內存區域大致分兩部分。一部分爲元信息,另外一部分爲消息數據。後者會被切分爲相同大小的block。block的buffer大小默認16K,但趕上消息超出大小的時候會調整。拿到該block後,將消息序列化後寫入,並通知讀者來取消息。通知機制是經過NotifierBase實現的。它有兩個實現類,分別爲ConditionNotifierMulticastNotifier。前者爲默認設置。它會單獨開一塊共享共享專門用於通知,其中包含了ReadableInfo等信息。MulticastNotifier的主要區別是它是經過指定的socket廣播。在讀端,ShmDispatcher::Init()初始化時會建立專門的線程,線程的執行體爲ShmDispatcher::Threadfunc()函數。它在循環體內會經過Listen()函數等待新消息。若是有新消息寫入後發出通知,這兒就會往下走。基於通知中的ReadableInfo信息,獲得channel id,block index等信息,而後調用ReadMessage()函數讀消息並反序列化。以後調用ShmDispatcher::OnMessage()函數進行消息派發。

  • INTRA :用於進程內通訊。因爲讀者和寫者是在同一進程內,所以能夠直接調用。在IntraTransmitter::Transmit()函數中,會直接調用讀端的IntraDispatcher::OnMessage()。該函數進行下一步消息的派發。

  • HYBRID:即默認模式,是前三種的結合體。具體功能其實仍是交給前面幾個後端完成的,只是它會根據讀者與寫者的關係使用相應的後端。

消息寫端

寫端的實現相對簡單一些。在模塊組件中,能夠經過CreateWriter()函數建立Writer對象,而後就能夠經過該對象向指定channel發送消息了。以CameraComponent爲例:

writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());
...
auto pb_image = std::make_shared<Image>();                                 
pb_image->mutable_header()->set_frame_id(camera_config_->frame_id());      
pb_image->set_width(raw_image_->width);                                    
pb_image->set_height(raw_image_->height);                                  
pb_image->mutable_data()->reserve(raw_image_->image_size);       
...
writer_->Write(pb_image);

這裏先建立了Writer對象,而後填好了消息裏的數據(這裏發送的消息類型爲Image,定義在modules/drivers/proto/sensor_image.proto文件),最後調用Writer::Write()函數將該消息發出。

CreateWriter()函數中先建立Writer對象,再調用Writer::Init()函數進行初始化。初始化中主要經過CreateTransmitter()函數建立Transmitter對象。由於默認是HYBRID模式,因此這裏實際建立的是HybridTransmitter對象。Transmitter繼承自Endpoint類,它其中的屬性信息以用來判斷讀者與寫者的相對關係。不一樣的相對關係決定使用何種Transmitter對象。其配置在InitMode()函數中設置:

template <typename M>                                   
void HybridTransmitter<M>::InitMode() {                  
  mode_ = std::make_shared<proto::CommunicationMode>(); 
  mapping_table_[SAME_PROC] = mode_->same_proc();       
  mapping_table_[DIFF_PROC] = mode_->diff_proc();       
  mapping_table_[DIFF_HOST] = mode_->diff_host();       
}

Writer對象的初始化中還會將調用JointTheTopology()函數將之加入到ChannelManager維護的拓撲信息中。

template <typename MessageT>                                              
void Writer<MessageT>::JoinTheTopology() {                                 
  // add listener 
  change_conn_ = channel_manager_->AddChangeListener(std::bind(           
      &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));  
                                                                          
  // get peer readers 
  const std::string& channel_name = this->role_attr_.channel_name();      
  std::vector<proto::RoleAttributes> readers;                             
  channel_manager_->GetReadersOfChannel(channel_name, &readers);          
  for (auto& reader : readers) {                                           
    transmitter_->Enable(reader);                                         
  }                                                                       
                                                                          
  channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,  
                         message::HasSerializer<MessageT>::value);        
}

這裏還會作一件比較重要的事是enable相應的Transmitter。先經過ChannelManager獲得該channel相應讀者的信息。而後對於每一個讀者,調用HybridTransmitter::Enable()函數。HybridTransmitter是混合模式的Transmitter,它其實包含了RTPS,SHM和INTRA三種Transmitter實例。但這三種Transmitter並不必定都須要用到。好比,若是該消息對應的讀者全是同進程的,那就不必整上SHM和RTPS了。HybridTransmitter::Enable()函數會根據參數來enable合適的Transmitter

template <typename M>
void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {     
  auto relation = GetRelation(opposite_attr);
  if (relation == NO_RELATION) { 
    return;
  }
  
  uint64_t id = opposite_attr.id();
  std::lock_guard<std::mutex> lock(mutex_);                                 
  receivers_[mapping_table_[relation]].insert(id);                          
  transmitters_[mapping_table_[relation]]->Enable();
  TransmitHistoryMsg(opposite_attr);                                        
}

相應地,在Disable()函數中決定是否要disable相應的Transmitter。這樣在以後的Transmit()函數中只要把transmitters_中的全部Transmitter拿出來調用Transmit()函數便可。

發送數據是經過Writer::Write()函數繼而調用Transmitter::Transmit()函數來實現的。由於這裏是用的HybridTransmitter,所以實際調用的是HybridTransmitter::Transmit()函數:

template <typename M>
bool HybridTransmitter<M>::Transmit(const MessagePtr& msg,
                                    const MessageInfo& msg_info) { 
  std::lock_guard<std::mutex> lock(mutex_);
  history_->Add(msg, msg_info);
  for (auto& item : transmitters_) { 
    item.second->Transmit(msg, msg_info);
  }
  return true;
}

能夠看到這裏分別調用三大TransmitterTransmit()函數發送消息。

消息讀端

讀端的處理鏈路相比下複雜一些。先回顧一個Component中對消息的處理。對於一個Component來講,它可能會從多個channel收取消息,而後基於全部channel的消息才能處理。第一個channel暫且稱之爲主channel。這些channel消息的組合咱們暫且稱爲組合消息。咱們就來看下典型的兩個channel狀況,其初始化的主要代碼爲:

template <typename M0, typename M1>                      
bool Component<M0, M1, NullType, NullType>::Initialize(  
  ...
  ReaderConfig reader_cfg;                                                       
  reader_cfg.channel_name = config.readers(1).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();        
                                                                                 
  auto reader1 = node_->template CreateReader<M1>(reader_cfg);                   
                                                                                 
  reader_cfg.channel_name = config.readers(0).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();        
                                                                                 
  reader0 = node_->template CreateReader<M0>(reader_cfg);         
  ...
  readers_.push_back(std::move(reader0)); 
  readers_.push_back(std::move(reader1)); 
  ...  
  std::vector<data::VisitorConfig> config_list;                                   
  for (auto& reader : readers_) {                                                  
    config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());    
  }                                                                               
  auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);             
  croutine::RoutineFactory factory =                                              
      croutine::CreateRoutineFactory<M0, M1>(func, dv);                           
  return sched->CreateTask(factory, node_->Name());                               
}

其中對兩個channel分別建立Reader對象。該Reader對象是針對單個channel的。而後針對全部channel建立DataVisitor對象,這時就是針對全部channel的組合消息了。最後建立協程來進行組合數據的處理。後面會看到每一個Reader都會有單獨的協程來作數據讀取。所以,對於一個有n個channel的component,框架會爲此建立至少n+1個協程。

其中比較重要的結構就是用於讀取消息的Reader類了。咱們先看Reader對象的建立。其初始化函數Init()以下:

template <typename MessageT>                                                         
bool Reader<MessageT>::Init() {                                                       
  if (init_.exchange(true)) {                                                         
    return true;                                                                     
  }                                                                                  
  std::function<void(const std::shared_ptr<MessageT>&)> func;                        
  if (reader_func_ != nullptr) {                                                      
    func = [this](const std::shared_ptr<MessageT>& msg) {                             
      this->Enqueue(msg);                                                            
      this->reader_func_(msg);                                                       
    };                                                                               
  } else {                                                                            
    func = [this](const std::shared_ptr<MessageT>& msg) {  this->Enqueue(msg); };     
  }                                                                                  
  auto sched = scheduler::Instance();                                                
  croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();         
  auto dv = std::make_shared<data::DataVisitor<MessageT>>(                           
      role_attr_.channel_id(), pending_queue_size_);                                 
  // Using factory to wrap templates. 
  croutine::RoutineFactory factory =                                                 
      croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);                 
  if (!sched->CreateTask(factory, croutine_name_)) {                                  
    AERROR << "Create Task Failed!";                                                 
    init_.store(false);                                                              
    return false;                                                                    
  }                                                                                  
                                                                                     
  receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);        
  this->role_attr_.set_id(receiver_->id().HashValue());                              
  channel_manager_ =                                                                 
      service_discovery::TopologyManager::Instance()->channel_manager();             
  JoinTheTopology();                                                                 
                                                                                     
  return true;                                                                       
}

這裏主要建立了相應的DataVisitor類,協程和Receiver類等。其中DataVisitor主要用於消息數據的訪問。它存放到來的消息數據,並提供接口供消息讀取。仍是以兩個channel的狀況爲例:

template <typename M0, typename M1>                                                  
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {              
 public:                                                                             
  explicit DataVisitor(const std::vector<VisitorConfig>& configs)                    
      : buffer_m0_(configs[0].channel_id,                                            
                   new BufferType<M0>(configs[0].queue_size)),                       
        buffer_m1_(configs[1].channel_id,                                            
                   new BufferType<M1>(configs[1].queue_size)) {                       
    DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);                           
    DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);                           
    data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);                 
    data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);            
  }   
  ... 
  bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {   // NOLINT 
    if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {                                  
      next_msg_index_++;                                                                  
      return true;                                                                        
    }                                                                                     
    return false;                                                                         
  }                                                                                       
                                                                                          
 private:                                                                                 
  fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;                                     
  ChannelBuffer<M0> buffer_m0_;                                                           
  ChannelBuffer<M1> buffer_m1_;                                                           
};

它的成員變量中對每個channel都有一個對應的ChannelBuffer對象。DataDispatcher::AddBuffer()函數在DataVisitor初始化時用來將這些個ChannelBuffer加入到DataDispatcher的管理中。同時,DataNotifier::AddNotifier()函數用來以主channel的id爲鍵值加入到DataNotifier的管理中。DataDispatcherDataNotifier均爲單例。前者爲模板類,意味着每個消息類型會有對應的DataDispatcher對象,且相同消息類型會共享該對象。顧名思義,它主要用於數據傳輸層有數據來時的分發,即當新消息到來時經過DataDispatcher::Dispatch()函數把它放到相應的消息緩衝區中。後者用於管理全部的Notifier。它用於在消息派發完後喚醒相應的協程進行處理。這些對象的大致結構圖以下:
在這裏插入圖片描述

當channel多於一個時(組合消息),DataVisitor中還有一個DataFusion對象用於將多路channel的數據合併。DataFusion的實現類爲AllLatest,聽名字就知道它會取全部channel中的最新值。除了per-channel的ChannelBuffer對象外,它還有一個特殊的ChannelBuffer對象用於存放多channel消息的組合消息(即各個channel的消息類型的tuple)。當填入主channel的消息時,會調用由SetFusionCallback()函數註冊的回調。該回調判斷是否全部channel都有消息,若是都有消息的話就將這些消息做爲組合消息填入該組合消息的ChannelBuffer中。 在協程處理函數中會調用DataVisitor::TryFetch()函數從該ChannelBuffer中拿組合消息。值得注意的是這件事只在主channel有消息來時纔會被觸發,所以主channel的選取是有講究的。

Reader初始化時建立的另外一個關鍵對象爲Receiver。它有4個繼承類,默認爲混合模式的HybridReceiverHybridReceiver::InitReceivers中分別建立相應的IntraReceiverShmReceiverRtpsReceiver,放在成員receivers_數組中。它會來根據寫端的狀況來enable和disable相應的ReceiverReceiverManager用於管理這些Receiver對象。它以channel爲key進行管理,所以同一進程內訂閱同一個channel的會共用同一個Receiver對象。ReceiverManager::GetReceiver()函數用於按鍵值取出Receiver,如沒有,則經過Transport::CreateReceiver()函數新建一個Receiver。 這些個ReceiverEnable()函數中會經過AddListener()函數向對應的Dispatcher註冊其回調函數XXXReceiver::OnNewMessage()Dispatcher類中的成員msg_listeners_是channel id到ListenerHandler對象的查找表。ListenerHandler經過signal/slot機制保存了全部這些回調。注意不一樣傳輸後端的AddListener()實現略有不一樣。好比RtpsDispatcher::AddListener()函數中會將輸入的消息先經過ParseFromString()函數進行解析,而後調用傳入的回調。ShmDispatcher::AddListener()函數也是相似,它會先經過ParseFromArray()函數解析消息。而對於IntraDispatcher::AddListener(),因爲是同個進程內,是以消息自己的類型傳的,就不必解析了。

這些相關結構關係示意圖以下:
在這裏插入圖片描述
看了一些關鍵相關數據結構,接下來看下讀端的處理流程。首先,如以前介紹的,各Dispatcher的繼承類各顯神通使本身的OnMessage()回調函數被調用。以RtpsDispatcher爲例:

void RtpsDispatcher::OnMessage(uint64_t channel_id,
                               const std::shared_ptr<std::string>& msg_str,
                               const MessageInfo& msg_info) { 
  if (is_shutdown_.load()) { 
    return;
  }
    
  ListenerHandlerBasePtr* handler_base = nullptr;
  if (msg_listeners_.Get(channel_id, &handler_base)) { 
    auto handler =
        std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
    handler->Run(msg_str, msg_info);
  }
}

這裏ListenerHandler::Run()會根據消息的發送者信息找到對應的回調,即Receiver::OnNewMessage()

template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
                               const MessageInfo& msg_info) { 
  if (msg_listener_ != nullptr) { 
    msg_listener_(msg, msg_info, attr_);
  }
}

這裏的回調函數msg_listener_是在Receiver建立的時候傳入的。其實主要是調用了DataDispatcher::Dispatch()函數來消息的派發:

transport::Transport::Instance()->CreateReceiver<MessageT>(      
    role_attr, [](const std::shared_ptr<MessageT>& msg,          
                  const transport::MessageInfo& msg_info,        
                  const proto::RoleAttributes& reader_attr) {     
      (void)msg_info;                                            
      (void)reader_attr;                                         
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::DISPATCH, reader_attr.channel_id(),         
          msg_info.seq_num());                                   
      data::DataDispatcher<MessageT>::Instance()->Dispatch(      
          reader_attr.channel_id(), msg);                        
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::NOTIFY, reader_attr.channel_id(),           
          msg_info.seq_num());                                   
    });

DataDisaptcher是模板類單例,即對於一種特定類型的消息能夠共用一個DataDispatcher。以前在DataVisitor初始化時會經過AddBuffer()函數將ChannelBuffer加入到DataDispatcher的成員buffers_map_中。它是一個以channel id爲key的map,其value爲全部等待該channel上消息的CacheBuffer的數組。也就是說,消息分發時,只須要根據channel id找到這些buffer,而後將新來的消息填入其中便可。這就是Dispatcher::Dispatch()函數主要作的事:

template <typename T>                                                 
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,           
                                 const std::shared_ptr<T>& msg) {      
  BufferVector* buffers = nullptr;                                    
  if (apollo::cyber::IsShutdown()) {                                   
    return false;                                                     
  }                                                                   
  if (buffers_map_.Get(channel_id, &buffers)) {                        
    for (auto& buffer_wptr : *buffers) {                               
      if (auto buffer = buffer_wptr.lock()) {                          
        std::lock_guard<std::mutex> lock(buffer->Mutex());            
        buffer->Fill(msg);                                            
      }                                                               
    }                                                                 
  } else {                                                             
    return false;                                                     
  }                                                                   
  return notifier_->Notify(channel_id);                               
}

最後調用DataNotifier::Notify()函數來通知新消息的到來。它會觸發該channel上全部對應Notifier中的回調。

inline bool DataNotifier::Notify(const uint64_t channel_id) {   
  NotifyVector* notifies = nullptr;                            
  if (notifies_map_.Get(channel_id, &notifies)) {               
    for (auto& notifier : *notifies) {                          
      if (notifier && notifier->callback) {                     
        notifier->callback();                                  
      }                                                        
    }                                                          
    return true;                                               
  }                                                            
  return false;                                                
}

這個Notifier中的回調是在建立協程時經過RegisterNotifyCallback()函數註冊進去的,目的是爲了喚醒相應的協程來處理該新消息。

visitor->RegisterNotifyCallback([this, task_id]() {  
  if (cyber_unlikely(stop_.load())) {                
    return;                                         
  }                                                 
  this->NotifyProcessor(task_id);                   
});

NotifyProcessor()函數會修改對應協程的狀態使之能被調度執行。前面提到,對於n個channel輸入的component,會有n+1個協程。它們都是以DataVisitor和消息回調函數一塊兒做爲參數建立的。這個協程主體中會調用DataVisitor::TryFetch()函數拿消息,而後調用註冊的消息處理函數:

factory.create_routine = [=]() {                                            
    return [=]() {                                                            
      std::shared_ptr<M0> msg;                                               
      for (;;) {                                                              
        CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);   
        if (dv->TryFetch(msg)) {                                              
          f(msg);                                                            
          CRoutine::Yield(RoutineState::READY);                              
        } else {                                                              
          CRoutine::Yield();                                                 
        }                                                                    
      }                                                                      
    };                                                                       
  };

對於那n個消息讀取協程來講,其消息處理函數爲:

func = [this](const std::shared_ptr<MessageT>& msg) {     
  this->Enqueue(msg);                                    
  this->reader_func_(msg);                               
};

這個回調函數中會調用Reader::Enqueue()函數。在該函數中,主要調用Blocker::Publish()函數,它繼而調用Blocker::Enqueue()Blocker::Notify()函數。Blocker類是一個存儲消息的結構。BlockerManager類用於管理Blocker,其中維護了以channel爲鍵值的Blocker的map。Reader::Enqueue()函數將消息放到Blocker的成員published_msg_queue_隊列中。以後,能夠經過Blocker::Observe()函數將成員published_msg_queue_隊列的消息放到成員observed_msg_queue_隊列,而後經過Blocker::GetLatestObserved()函數獲得最新的消息。好比ControlComponent中的:

chassis_reader_->Observe();
const auto &chassis_msg = chassis_reader_->GetLatestObserved();

而對於剩下那一個協程,它是由主channel來觸發的。因它處理的是多channel的組合消息,在協程主體中的TryFetch()函數會調用AllLatest::Fusion()函數同時拿多個channel上的最新消息。至於這個組合消息是怎麼填入的前面有提。簡單來講,對於它來講,主channel來消息時,同時也會將其它channel的消息寫入組合消息。而後調度協程,拿出組合消息進行處理。其消息處理函數爲:

auto func = [self](const std::shared_ptr<M0>& msg0,         
                    const std::shared_ptr<M1>& msg1) {        
   auto ptr = self.lock();                                   
   if (ptr) {                                                 
     ptr->Process(msg0, msg1);                               
   } else {                                                   
     AERROR << "Component object has been destroyed.";       
   }                                                         
 };

該實現中主要以收到的消息爲參數調用Component中的處理函數Process(),從而執行組件的自定義處理邏輯。

小結

文中提了很多細枝末節,最後很是high-level地歸納下從寫者到讀者的流程。寫者Writer寫消息時,會經過HybridTransmitter繼而使用合適後端的Transmitter發送消息。根據讀與寫者間的位置關係,通過網絡、共享內存或直接調用的方式,對應後端的Dispatcher收到消息。收到後轉成指定消息類型,交給Receiver。而後經過DataDispatcher派發消息。派發消息就是將消息放到對應的buffer中,而後通知相應的協程來做進一步處理。上層模塊要取用這些消息,主要兩種方式:一種是經過ComponentProc()接口,它被調用時參數就是最新的消息。另外一種是經過ReaderObserve()函數直接拿。
在這裏插入圖片描述

咱們知道,Apollo在版本3.5前是基於ROS的,同時也對ROS作了幾個重要改進。這些改進很多是關於通訊系統的,如共享內存、去中心化和數據兼容性。到Cyber RT的演進也天然延續了這幾個優勢。總得來講,Cyber RT基於自動發現機制與Publish-Subscribe模式實現了通訊網絡的拓撲管理。同時它對數據傳輸層作了抽象,下面實現多個後端分別適用於不一樣場景,並提供了HYBRID模式能夠根據讀者和寫者間的關係自動使用合適的傳輸層後端。這樣,通訊系統的複雜性就被很好地屏蔽,框架就能提供給應用層便利的開發接口。

相關文章
相關標籤/搜索