[源碼解析] 機器學習參數服務器ps-lite(2) ----- 通訊模塊Van

[源碼解析] 機器學習參數服務器ps-lite(2) ----- 通訊模塊Van

0x00 摘要

本文是參數服務器系列第二篇,介紹ps-lite的通訊模塊 Van。html

本系列其餘文章是:node

[源碼解析] 機器學習參數服務器ps-lite 之(1) ----- PostOfficepython

0x01 功能概述

郵局裏有了地址簿,就須要有貨車來負責拉送物件,Van 就是整個Parameter Server的通訊模塊,其特色以下。c++

  • PostOffice 類在實例化的時候,會建立一個 Van 類的實例 做爲成員變量。該 Van 實例與所屬 PostOffice 實例生命週期相同(每一個節點只有一個該對象);
  • Van 負責具體的節點間通訊。具體來講就是負責創建起節點之間的互相鏈接(例如Worker與Scheduler之間的鏈接),而且開啓本地的receiving thread用來監聽收到的message。

VAN 目前有兩個實現:git

  • ZMQVan是基於zeromq的Van的實現,即用zmq庫實現了鏈接的底層細節(zmq庫是一個開源庫,對socket進行了優良的封裝,他使得Socket編程更加簡單、簡潔和性能更高)。
  • IBVerbsVan 是字節跳動的實現,具體沒有深刻研究。

0x02 定義

2.1 UML圖

首先給出 UML 圖。github

2.2 主要說明

下面咱們只給出Van對象關鍵變量和成員函數說明。編程

其主要變量以下:緩存

  • Node scheduler_ :Scheduler 節點參數,每個node都會記錄Scheduler 節點的信息服務器

  • Node my_node_ : 本節點參數。若是本節點是Scheduler,則 my_node_ 會指向上面的 scheduler_ ;微信

  • bool is_scheduler_ : 本節點是不是 scheduler;

  • std::unique_ptr< std::thread> receiver_thread_ :接收消息線程指針;

  • std::unique_ptr< std::thread> heartbeat_thread_ :發送心跳線程指針;

  • std::vector barrier_count_ :barrier 計數,用來記錄登記節點數目,只有全部節點都登記以後,系統纔到了 ready 狀態,scheduler 纔會給全部節點發送 ready 消息,系統才正式啓動。

  • Resender *resender_ = nullptr :從新發送消息指針;

  • std::atomic timestamp_{0} :message 自增 id,原子變量;

  • std::unordered_map<std::string, int> connected_nodes_ : 記錄了目前鏈接到哪些 nodes;

其主要函數功能以下:

  • start :創建通訊初始化;

  • Receiving :接收消息線程的處理函數;

  • Heartbeat :發送心跳線程的處理函數;

  • ProcessAddNodeCommandAtScheduler :scheduler 的 AddNode 消息處理函數;

    • ProcessHearbeat:心跳包處理函數;

    • ProcessDataMsg :數據消息(push & pull)處理函數;

    • ProcessAddNodeCommand :worker 和 server 的 AddNode 消息處理函數;

    • ProcessBarrierCommand :Barrier 消息處理函數;

2.3 線程管理

PS Lite 定義的三種角色採用多線程機制工做,每一個線程承擔特定的職責,在所屬的 Van 實例啓動時被建立。

具體描述以下:

  • Scheduler,Worker 和 Server 的 Van 實例裏均持有一個接受數據的線程。
  • Worker 和 Server 的 Van 實例裏還持有一個間歇地向 Scheduler 發送心跳的線程。
  • 若是定義了值不爲 0 環境變量 PS_RESEND,那麼 Scheduler、Worker 和 Server 還會啓動一個監控線程。

2.4 類定義

詳細代碼(摘要)以下:

class Van {
 public:
  static Van *Create(const std::string &type);
  virtual void Start(int customer_id);
  int Send(const Message &msg);
  virtual void Stop();
  inline int GetTimestamp() { return timestamp_++; }
  inline bool IsReady() { return ready_; }

 protected:
  //連結節點
  virtual void Connect(const Node &node) = 0;
  //綁定到本身節點之上  
  virtual int Bind(const Node &node, int max_retry) = 0;
  //接收消息,用阻塞方式  
  virtual int RecvMsg(Message *msg) = 0;
 //發送消息
  virtual int SendMsg(const Message &msg) = 0;

  /**
   * \brief pack meta into a string
   */
  void PackMeta(const Meta &meta, char **meta_buf, int *buf_size);
  /**
   * \brief pack meta into protobuf
   */
  void PackMetaPB(const Meta &meta, PBMeta *pb);
  /**
   * \brief unpack meta from a string
   */
  void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta);

  Node scheduler_;
  Node my_node_;
  bool is_scheduler_;
  std::mutex start_mu_;

 private:
  /** thread function for receving */
  void Receiving();

  /** thread function for heartbeat */
  void Heartbeat();

  // node's address string (i.e. ip:port) -> node id
  // this map is updated when ip:port is received for the first time
  std::unordered_map<std::string, int> connected_nodes_;
  // maps the id of node which is added later to the id of node
  // which is with the same ip:port and added first
  std::unordered_map<int, int> shared_node_mapping_;

  /** whether it is ready for sending */
  std::atomic<bool> ready_{false};
  std::atomic<size_t> send_bytes_{0};
  size_t recv_bytes_ = 0;
  int num_servers_ = 0;
  int num_workers_ = 0;
  /** the thread for receiving messages */
  std::unique_ptr<std::thread> receiver_thread_;
  /** the thread for sending heartbeat */
  std::unique_ptr<std::thread> heartbeat_thread_;
  std::vector<int> barrier_count_;
  /** msg resender */
  Resender *resender_ = nullptr;
  int drop_rate_ = 0;
  std::atomic<int> timestamp_{0};
  int init_stage = 0;

 //如下是處理各類類型消息
  void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes,
                                        Meta *recovery_nodes);
  void ProcessTerminateCommand();
  void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes);
  void ProcessBarrierCommand(Message *msg);
  void ProcessHearbeat(Message *msg);
  void ProcessDataMsg(Message *msg);

  //更新本地NodeID
  void UpdateLocalID(Message *msg, std::unordered_set<int> *deadnodes_set,
                     Meta *nodes, Meta *recovery_nodes);

  const char *heartbeat_timeout_val =
      Environment::Get()->find("PS_HEARTBEAT_TIMEOUT");
  int heartbeat_timeout_ =
      heartbeat_timeout_val ? atoi(heartbeat_timeout_val) : 0;

  DISALLOW_COPY_AND_ASSIGN(Van);
};

0x03 初始化

Van對象的初始化函數做用就是依據本地節點類型的不一樣,作不一樣設置,從而啓動端口,創建到scheduler的連結,啓動接收消息線程,心跳線程等,這樣就能夠進行通訊了。具體以下:

  1. 首先從環境變量中獲得相關信息,好比scheduler 的 "ip,port"(這兩個是預先設置的),本節點的角色(Worker/Server/Scheduler)等等,而後 初始化scheduler_這個成員變量;
  2. 若是本節點是 scheduler,則把 scheduler_ 賦值給 my_node_;
  3. 若是本節點不是 scheduler,則:
    1. 從系統中獲取本節點的ip信息;
    2. 使用 GetAvailablePort 獲取一個port;
  4. 使用 Bind 綁定一個端口;
  5. 調用 Connect 創建到 Scheduler 的鏈接(scheduler也鏈接到本身的那個預先設置的固定端口);
  6. 啓動本地Node的接收消息線程receiver_thread_,執行Van::Receiving
  7. 若是本節點不是 scheduler,給 Scheduler 發送一個 ADD_NODE 消息,這樣能夠將本地Node的信息告知Scheduler,即註冊到 scheduler;
  8. 而後進入等待狀態,等待Scheduler通知 Ready(scheduler 會等待全部節點都完成註冊後,統一發送 ready); 注意,這裏 scheduler 節點也會等,可是不影響 scheduler 節點 的 recevie 線程接受處理消息;
  9. Ready後啓動心跳線程,創建到Scheduler的Heartbeat 鏈接;

關於7,8兩點的進一步說明就是:

  • 當worker和server節點綁定ip和port後,便向scheduler節點發送ADD_NODE message。
  • 當 scheduler收到全部worker和server的ADD_NODE message後,則依次應答ADD_NODE message,
  • 各個節點在此過程當中經過原子變量ready_等待上述過程完成。

具體代碼以下:

void Van::Start(int customer_id) {
  // get scheduler info
  start_mu_.lock();

  if (init_stage == 0) {
    // 初始化scheduler_這個成員變量
    scheduler_.hostname = std::string(
        CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
    scheduler_.port =
        atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
    scheduler_.role = Node::SCHEDULER;
    scheduler_.id = kScheduler;
    // 確認本節點是scheduler節點
    is_scheduler_ = Postoffice::Get()->is_scheduler();

    // get my node info
    if (is_scheduler_) {
      // 初始化本節點,由於是scheduler,因此直接賦值
      my_node_ = scheduler_;
    } else {
      auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
      const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
      std::string ip;
      if (nhost) ip = std::string(nhost);
      if (ip.empty()) {
        const char* itf = Environment::Get()->find("DMLC_INTERFACE");
        std::string interface;
        if (itf) interface = std::string(itf);
        if (interface.size()) {
          GetIP(interface, &ip);
        } else {
          GetAvailableInterfaceAndIP(&interface, &ip);
        }
      }
      int port = GetAvailablePort();
      const char* pstr = Environment::Get()->find("PORT");
      if (pstr) port = atoi(pstr);
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      // cannot determine my id now, the scheduler will assign it later
      // set it explicitly to make re-register within a same process possible
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;
    }

    // bind.
    //綁定接口,把本節點綁定到ip:port這個socket上,理論來講這個函數就是初始化了receiver_
    my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);

    // connect to the scheduler
    // 鏈接上scheduler_,因爲本節點就是scheduler_,其實就是初始化senders_,因爲發送的節點不少,因此這裏是一個map<int,void*>
	  // 在這裏就是senders_[1] = socket_1, socket_1中的body設置一點字符「ps1***」, 注意連接不是sendMsg
    Connect(scheduler_);

    // for debug use
    if (Environment::Get()->find("PS_DROP_MSG")) {
      drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
    }
    // start receiver
    // 開啓一個接收消息的線程,這裏就是處理消息
    receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
    init_stage++;
  }
  start_mu_.unlock();
  
  if (!is_scheduler_) {
    // let the scheduler know myself
    // worker和server節點會經過 ADD_NODE 消息把本地節點的信息告訴scheduler,好比角色,ip,port...
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }

  // wait until ready
  // 等待 ready_ 從false變成true,當是scheduler的時候,必需要有等worker和server節點過來,否則一直都是阻塞在這,若是是 worker/server,則是等待 scheduler 發送系統allready消息。
  while (!ready_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }

  start_mu_.lock();
  if (init_stage == 1) {
    // resender
    if (Environment::Get()->find("PS_RESEND") &&
        atoi(Environment::Get()->find("PS_RESEND")) != 0) {
      int timeout = 1000;
      if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
        timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
      }
      // 若是設置了超時重傳,就初始化resender_這個變量
      resender_ = new Resender(timeout, 10, this);
    }

    if (!is_scheduler_) {
      // start heartbeat thread
      // 初始化心跳線程
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }
    init_stage++;
  }
  start_mu_.unlock();
}

0x04 接受消息

咱們首先介紹後臺線程是如何運行,而後會具體分析如何處理各類消息。

4.1 後臺處理消息線程

ps-lite 啓動了一個後臺線程 receiver_thread_ 進行接受/處理消息。

// start receiver
receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));

4.2 處理函數

receiver_thread_ 使用 Receiving 函數進行消息處理。

4.2.1 控制信息

除了傳遞參數的數據消息外,各個節點之間控制信息有:

  • ADD_NODE:worker和server向shceduler進行節點註冊;
  • BARRIER:節點間的同步阻塞消息;
  • HEARTBEAT:節點間的心跳信號;
  • TERMINATE:節點退出信號;
  • ACK:確認消息,ACK 類型只有啓用了 Resender 類纔會出現
  • EMPTY:push or pull;

所以在 Receiving 之中會調用 不一樣處理函數處理不一樣類型的消息:

  • ProcessTerminateCommand :處理 TERMINATE;
  • ProcessAddNodeCommand :處理 ADD_NODE;
  • ProcessBarrierCommand :處理 BARRIER(在上文已經分析);
  • ProcessHearbeat :處理 HEARTBEAT;

4.2.2 線程內全局變量

線程內有兩個變量,由於其是在 while (true) 循環以外,因此屬於線程內的全局變量,這點在閱讀代碼時候須要注意。

  • nodes :只有 scheduler 在處理 ADD_NODE 時候會用到,存儲目前 scheduler 內部擁有的全部 nodes;
  • recovery_nodes :只有 scheduler 在處理 ADD_NODE 時候會用到,存儲目前 scheduler 內部擁有的全部 recovery nodes(康復重啓的節點);

4.2.3 具體實現

Receiving 邏輯以下:

  • 調用 RecvMsg(派生類會實現)獲取最新消息;
  • 若是設定了採樣,則進行 drop;
  • 若設置了重傳機制,則會檢測此消息是否重複,利用 resender_->AddIncomming(msg) 來處理重複消息;
  • 處理控制消息或者數據消息;

具體代碼以下

void Van::Receiving() {
  Meta nodes;
  // 如下兩個能夠認爲是全局變量
  Meta recovery_nodes;  // store recovery nodes 儲存康復重啓的節點
  recovery_nodes.control.cmd = Control::ADD_NODE; // 康復重啓節點的control.cmd 都設置爲 ADD_NODE

  while (true) {
    Message msg;
    int recv_bytes = RecvMsg(&msg); //利用receiver_ 變量拿到消息
    // For debug, drop received message
    if (ready_.load() && drop_rate_ > 0) {
      unsigned seed = time(NULL) + my_node_.id;
      if (rand_r(&seed) % 100 < drop_rate_) {
        LOG(WARNING) << "Drop message " << msg.DebugString();
        continue;
      }
    }

    CHECK_NE(recv_bytes, -1);
    recv_bytes_ += recv_bytes; //收到的字節數累加
    if (Postoffice::Get()->verbose() >= 2) {
      PS_VLOG(2) << msg.DebugString();
    }
    // duplicated message
    if (resender_ && resender_->AddIncomming(msg)) continue; //重傳確認機制

    if (!msg.meta.control.empty()) { //若是是控制類型的消息
      // control msg
      auto& ctrl = msg.meta.control;
      if (ctrl.cmd == Control::TERMINATE) {
        ProcessTerminateCommand();
        break;
      } else if (ctrl.cmd == Control::ADD_NODE) {
        ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes); //當執行到這個位置的時候繼續跳轉
      } else if (ctrl.cmd == Control::BARRIER) {
        ProcessBarrierCommand(&msg);
      } else if (ctrl.cmd == Control::HEARTBEAT) {
        ProcessHearbeat(&msg); // 發回Heartbeat的ACK
      } else {
        LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
      }
    } else { //非控制類型的消息處理方式
      ProcessDataMsg(&msg);
    }
  }
}

4.3 處理 ADD_NODE 消息

ADD_NODE 是 worker / server 用來向 scheduler 註冊自身的控制消息。

4.3.1 註冊邏輯

先回憶下注冊基本思路。

  • 當worker和server節點綁定ip和port後,便向scheduler節點發送ADD_NODE message。
  • 當 scheduler收到全部worker和server的ADD_NODE message後則依次應答ADD_NODE message,注意,應答的也是 同類型ADD_NODE 消息。
  • 各個節點(scheduler, worker, server)在此過程當中經過原子變量ready_等待上述過程完成。

4.3.2 ProcessAddNodeCommand

ProcessAddNodeCommand 邏輯以下。

  • 查出心跳包超時的id,轉存到dead_set之中。
  • 拿到收到消息裏面的control信息。
  • 調用 UpdateLocalID,在其中:
    • 若是是新node,Scheduler記錄這個新的node。
    • 若是這個node是重啓產生的,則將舊node的信息更新。
  • 若是是 scheduler,則:
    • 調用 ProcessAddNodeCommandAtScheduler 收到全部worker和server的ADD_NODE 的消息後進行節點id分配並應答,即 設定最新的全部node的rank併發送給全部Worker和Server。
  • 若是不是 scheduler,說明 work & server 收到了 scheduler 回答的 ADD_NODE 消息,則:
    • 若是自身是現有節點,則在 connected_nodes_ 之中不會找到這個新節點,則先有節點會調用 Connect 與新節點創建鏈接。
    • 若是自身是新節點,則會鏈接全部現有節點(非同類型)。
    • 在 connected_nodes_ 之中更新 全局節點信息,包括 global rank(本地Node的全局rank等信息是由receiver_thread_在這裏獲取);
    • 最後設置 ready_ = true,即本節點也能夠運行了,由於主線程會阻塞在其上。

具體代碼以下:

void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
                                Meta* recovery_nodes) {
  auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);//查出心跳包超時的id
  std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());//轉存到dead_set之中
  auto& ctrl = msg->meta.control; //拿到收到消息裏面的control信息

  UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);

  if (is_scheduler_) { // Scheduler 節點
    ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
  } else { // Worker & Server 節點
    for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
        // 現有節點會在本身鏈接之中查找這個新節點,發現現有鏈接中沒有這個新節點
        // 若是是新節點,則會鏈接現有節點(非同類型)
        Connect(node); // 與新節點進行鏈接        
        connected_nodes_[addr_str] = node.id; // 加入已經鏈接的節點
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
    }
    ready_ = true;
  }
}

4.3.3 UpdateLocalID

此函數做用是更新節點內部的node id 信息,也是分爲兩種狀況,函數邏輯以下:

  • 若是msg->meta.sender是Meta::kEmpty,即未設定,則處理此message的必定是Scheduler,會進入 if 分支。
    • 若是目前 nodes 的control.node數目小於 "配置的server數目 + 配置的worker數目",則說明是系統啓動階段,將當前消息的node信息加入到 control.node 之中。
    • 不然說明是系統運行階段,應該是有些節點死掉重啓後再次鏈接。那麼,就從 nodes 的control.node 之中找到一個已經死掉的且節點role 與當前消息一致(同類型)的 node id,把這個 node id 賦給這個重啓的節點。而且更新 nodes->control.node 和 recovery_nodes。
  • 下面就是普通節點處理的邏輯:
    • 即在 scheduler 傳回來的全部節點信息中查找,目的是找到與本身的ip,port一致的節點。
    • 若是找到,就更新本地節點信息(由於在本節點啓動時候,並無設置 node_id 這個信息,這個須要scheduler統一設置,從註釋看,目的是爲了使從新註冊成爲可能)。包括全局 rank 信息。

具體代碼以下:

void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
                        Meta* nodes, Meta* recovery_nodes) {
  auto& ctrl = msg->meta.control;
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // assign an id
  if (msg->meta.sender == Meta::kEmpty) { //若是sender未設定,則處理此message的必定是Scheduler
    CHECK(is_scheduler_);
    CHECK_EQ(ctrl.node.size(), 1); //msg中的control命令中的節點集合就是worker本身,因此就是1個節點
    if (nodes->control.node.size() < num_nodes) { //沒有到齊
      nodes->control.node.push_back(ctrl.node[0]);
    } else { //若是全部work和server到齊了,就進入else
      // some node dies and restarts
      CHECK(ready_.load());
      for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
        const auto& node = nodes->control.node[i];
        if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
            node.role == ctrl.node[0].role) {
          auto& recovery_node = ctrl.node[0];
          // assign previous node id
          recovery_node.id = node.id;
          recovery_node.is_recovery = true;
          nodes->control.node[i] = recovery_node;
          recovery_nodes->control.node.push_back(recovery_node);
          break;
        }
      }
    }
  }

  // update my id / 對普通的node,更新其rank,scheduler 節點不會起做用(由於找不到)。
  // schedule發給此work節點的消息,若是發現本地的ip和port和消息中的某個一點重合,那麼就把本地節點的ID(初始化時候沒有ID,只是等於Empty)改成schedule發過來的 node id。
  for (size_t i = 0; i < ctrl.node.size(); ++i) {
    const auto& node = ctrl.node[i];
    if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
      if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
        my_node_ = node;
        std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
        _putenv_s("DMLC_RANK", rank.c_str());
#else
        setenv("DMLC_RANK", rank.c_str(), true);
#endif
      }
    }
  }
}

4.3.4 ProcessAddNodeCommandAtScheduler

ProcessAddNodeCommandAtScheduler 是在 Scheduler 以內運行,是對控制類型消息的處理。

對於Scheduler節點來講,scheduler收到全部worker和server的ADD_NODE的消息後進行節點id分配並應答,即,須要設定 最新的全部node的 全局rank 併發送給全部Worker和Server。

  • 當接受到全部 worker & server 的註冊消息以後(nodes->control.node.size() == num_nodes):
    • 將節點按照 ip + port 組合排序。
    • Scheduler 與全部註冊的節點創建鏈接、更新心跳時間戳,給 scheduler全部鏈接的節點分配全局 rank。
    • 向全部的worker和server發送ADD_NODE消息(攜帶scheduler之中的全部node信息)。
    • 會把 ready_ = true; 即 scheduler 是一個 ready 狀態了,無論 worker 和 server 是否確認收到ADD_NODE消息。
    • 而在接收端(worker & server)的,每個本地Node的全局rank等信息是由接收端 receiver_thread_(其餘函數)獲取,就是獲得了 scheduler 返回的這些 nodes 信息。
  • 若是 !recovery_nodes->control.node.empty(),這就代表是處理某些重啓節點的註冊行爲:
    • 查出心跳包超時的id,轉存到dead_set之中。
    • 與重啓節點創建鏈接(由於接收到了一個ADD_NODE),因此只與這個新重啓節點創建鏈接便可(在代碼中有 CHECK_EQ(recovery_nodes->control.node.size(), 1) 來確認重啓節點爲 1 個)。
    • 更新重啓節點的心跳。
    • 由於新加入了重啓節點,因此用一個發送達到兩個目的:
      • 向全部 recovery 的worker和server發送ADD_NODE消息(攜帶scheduler之中的目前全部node信息)。
      • 向 alive 節點發送 recovery 節點信息。
      • 這樣,收到消息的節點會則分別與新節點相互創建鏈接;

具體代碼以下:

void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
                                           Meta* recovery_nodes) {
  recovery_nodes->control.cmd = Control::ADD_NODE;
  time_t t = time(NULL);
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // scheduler收到全部worker和server的ADD_NODE的消息後進行節點id分配並應答
  if (nodes->control.node.size() == num_nodes) { // 節點收集徹底
    // sort the nodes according their ip and port, 根據IP和port給worker,server排個序
    std::sort(nodes->control.node.begin(), nodes->control.node.end(),
              [](const Node& a, const Node& b) {
                return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
              });
    // assign node rank
    for (auto& node : nodes->control.node) {
      // 創建鏈接、更新心跳時間戳,給 scheduler全部鏈接的節點分配全局 rank。
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) { //若是ip:port不存在van_中的話
        CHECK_EQ(node.id, Node::kEmpty); //判斷是否是初始化節點
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_); //若是是sever的話,就id產生一個id號,num_servers_初始化爲0
        node.id = id; //將這個新節點的id賦值爲id
        Connect(node); //鏈接這個新節點, 即創建一個socket, 而後senders_[id] = sender; 就是將目標id的socket存放起來後面使用
        Postoffice::Get()->UpdateHeartbeat(node.id, t);//更新心跳包
        connected_nodes_[node_host_ip] = id; //既然 worker, server 已經發message來了,scheduler要把這個節點做爲已經連接的節點
      } else {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      if (node.role == Node::SERVER) num_servers_++;//更新rank
      if (node.role == Node::WORKER) num_workers_++;
    }
    nodes->control.node.push_back(my_node_); //把本節點放到裏面
    nodes->control.cmd = Control::ADD_NODE;
    Message back;
    back.meta = *nodes;
    // 向全部的worker和server發送ADD_NODE消息
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }

    ready_ = true; //scheduler已經準備好了
  } else if (!recovery_nodes->control.node.empty()) { // 節點沒有收集徹底
    auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);//查出心跳包超時的id
    std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());//轉存到dead_set
    // send back the recovery node
    CHECK_EQ(recovery_nodes->control.node.size(), 1);
    Connect(recovery_nodes->control.node[0]);
    Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
    Message back;
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      if (r != recovery_nodes->control.node[0].id &&
          dead_set.find(r) != dead_set.end()) {
        // do not try to send anything to dead node
        continue;
      }
      // only send recovery_node to nodes already exist
      // but send all nodes to the recovery_node
      back.meta =
          (r == recovery_nodes->control.node[0].id) ? *nodes : *recovery_nodes;
      back.meta.recver = r;
      back.meta.timestamp = timestamp_++;
      Send(back);
    }
  }
}

此部分流程邏輯以下:

+
    Scheduler                                                 |      Worker
                                                              |
        +                                                     |        +
        |                                                     |        |
        |                                                     |        |
        v                                                     |        |
Postoffice::Start +---->  Van::Start                          |        |
                             +                                |        |
                             |                                |        |
                             |                                |        |
                             v                                |        |
                          Connect--do nothing                 |        |
                             +                                |        v
                             |                                |
                             |                                |  Postoffice::Start +----->  Van::Start
                             |                                |                                +
                             v                                |                                |
                         receiver_thread_ +---+               |                                |
                             +                |               |                                v
                             |                |               |                             Connect--to scheduler
                             |                |               |                                +
                             |                |               |                                |
                             |                |               |                                |
                             |                |               |                                |
                             |                |               |                                v
                             |                |               |                          receiver_thread_  +----->+
                             |                |               |                                +                  |
                             |                |               |                                |                  |
                             |                |               |                                |                  |
                             |                |               |                                v                  |
                             |                |   <---------------------------------------+   Send                |
                             |                |               |   ADD_NODE                     +                  |
                             |                v               |                                |                  |
                             |                                |                                |                  |
                             |       ProcessAddNodeCommand    |                                |                  |
                             |                +               |                                |                  |
                             |                |               |                                |                  |
                             |                | All nodes OK  |                                |                  |
                             |                |               |                                |                  |
                             v                |               |                                |                  |
                                              | set rank      |                                |                  |
                      wait until ready        |               |                                |                  |
                             +                |               |                                |                  |
                             |                +---------------------------------------------------------------->  |
                             |                |               |  ADD_NODE response(nodes info) |                  |
                             |                |               |                                |         ProcessAddNodeCommand
                             |                |               |                                v                  |
                             |                |               |                                                   |
                             | <--------------+               |                         wait until ready          |
                             |    ready_ = true               |                                +                  |
                             |                                |                                |  <---------------+
       +-------------------+ v                                |                                |
       |                                                      |         +--------------------+ v
       |                                                      |         |
       v                                                      |         |
                                                              |         v
  Postoffice::Barrier                                         |
                                                              |   Postoffice::Barrier
                                                              +

手機以下,左側是 Scheduler,右側是 worker:

4.3.5 一個新加節點的序列

其互聯過程能夠分爲3步:

第一步:worker/server節點初始化的時候,向schedular節點發送一個鏈接信息,假定自身是節點 2;

if (!is_scheduler_) {
    // let the scheduler know myself
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg); //發送給schedular, 創建連接信息。
  }

第二步:Scheduler 節點收到信息後,在 ProcessAddNodeCommandAtScheduler 之中,首先會和 節點 2 創建一個鏈接。會向全部已經和schedular創建鏈接的worker節點/server節點 廣播此 "節點的加入信息「,並把 節點 2 請求鏈接的信息放入meta信息中。

// assign node rank
    for (auto& node : nodes->control.node) {
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        node.id = id;
        Connect(node); // 鏈接這個新節點, 即創建一個socket, 而後senders_[id] = sender; 就是將目標id的socket存放起來後面使用
        Postoffice::Get()->UpdateHeartbeat(node.id, t);
        connected_nodes_[node_host_ip] = id;
      } else {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      if (node.role == Node::SERVER) num_servers_++;
      if (node.role == Node::WORKER) num_workers_++;
    }
    nodes->control.node.push_back(my_node_);
    nodes->control.cmd = Control::ADD_NODE;

    Message back;
    back.meta = *nodes;
    // 向全部已經和schedular創建鏈接的worker節點/server節點 廣播此 "節點的加入信息「,並把 節點 2 請求鏈接的信息放入meta信息中。
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }

第三步:現有worker/server節點收到這個命令後,在 ProcessAddNodeCommand 之中 會和 節點 2 造成鏈接。

for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) { // 現有鏈接中沒有這個新節點
        Connect(node); // 與新節點進行鏈接
        connected_nodes_[addr_str] = node.id;
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;

至此,整個過程就描述完了。每一個新節點加入後,已經加入的節點都會經過schedular節點和這個新節點創建鏈接。

4.4 處理 HEARTBEAT 消息

咱們接下來分析心跳機制。

4.4.1 心跳機制

爲了記錄網絡的可達性,PS Lite 設計了心跳機制。具體而言:

  • 每個節點的 PostOffice 單例中維護了一個 MAP 結構,存儲了心跳關聯的節點的活躍信息。鍵爲節點編號,值爲上次收到其 HEARTBEAT 消息的時間戳。
  • Worker/Server 只記錄 Scheduler 的心跳,Scheduler 則記錄全部節點的心跳。基於時間戳和心跳超時,能夠輸出全部的死亡節點。
  • 每個 Worker/Server 節點,會新創建一個心跳線程,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 發送一條 HEARTBEAT 消息;
  • Scheduler 節點收到後,響應一個 HEARTBEAT 消息。
  • scheduler進行應答,經過當前時間與心跳包接收時間之差判斷是否alive。
  • Scheduler 會依據心跳節點的時間戳來判斷死亡節點。若是新增的節點id在dead_node容器裏,表示這個節點是從新恢復的;而新增節點經過schedular的中轉與現有節點造成互聯。

具體以下:

4.4.2 數據結構

std::unordered_map<int, time_t> heartbeats_ 就是存儲了心跳關聯的節點的活躍信息。鍵爲節點編號,值爲上次收到其 HEARTBEAT 消息的時間戳。

UpdateHeartbeat 會按期更新心跳。

void UpdateHeartbeat(int node_id, time_t t) {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    heartbeats_[node_id] = t;
  }
  
  std::unordered_map<int, time_t> heartbeats_;

4.4.3 Worker / Server 發送心跳

在這兩種節點中,啓動了一個線程,每個 Worker/Server 節點,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 發送一條 HEARTBEAT 消息:

if (!is_scheduler_) {
      // start heartbeat thread
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }

具體心跳函數是:

void Van::Heartbeat() {
  const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
  const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
  while (interval > 0 && ready_.load()) {
    std::this_thread::sleep_for(std::chrono::seconds(interval));
    Message msg;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::HEARTBEAT;
    msg.meta.control.node.push_back(my_node_);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
}

4.4.4 Scheduler 節點處理心跳

Scheduler 節點收到後 HEARTBEAT 消息後,響應一個 HEARTBEAT 消息。UpdateHeartbeat 會按期更新心跳。

void Van::ProcessHearbeat(Message* msg) {
  auto& ctrl = msg->meta.control;
  time_t t = time(NULL);
  for (auto& node : ctrl.node) {
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    if (is_scheduler_) {
      Message heartbeat_ack;
      heartbeat_ack.meta.recver = node.id;
      heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
      heartbeat_ack.meta.control.node.push_back(my_node_);
      heartbeat_ack.meta.timestamp = timestamp_++;
      // send back heartbeat
      Send(heartbeat_ack);
    }
  }
}

4.4.5 死亡節點

Scheduler 在處理 ADD_NODE 消息時候,會看看是否已經有死亡節點,具體判經過當前時間戳與心跳包接收時間戳之差判斷是否alive。

std::vector<int> Postoffice::GetDeadNodes(int t) {
  std::vector<int> dead_nodes;
  if (!van_->IsReady() || t == 0) return dead_nodes;

  time_t curr_time = time(NULL);
  const auto& nodes = is_scheduler_
    ? GetNodeIDs(kWorkerGroup + kServerGroup)
    : GetNodeIDs(kScheduler);
  {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    for (int r : nodes) {
      auto it = heartbeats_.find(r);
      if ((it == heartbeats_.end() || it->second + t < curr_time)
            && start_time_ + t < curr_time) {
        dead_nodes.push_back(r);
      }
    }
  }
  return dead_nodes;
}

邏輯以下:

+----------------------------------------------------+
| Scheduler                                          |
|                                                    |
|                                                    |
|                                                    |
| heartbeats_                                        |
|                                                    |
| receiver_thread_+-------->  ProcessHearbeat        |
|                            ^      +    ^  +        |
|                            |      |    |  |        |
|                            |      |    |  |        |
|                            |      |    |  |        |
+----------------------------------------------------+
                             |      |    |  |
                             |      |    |  |                        RESPONSE
                             |      |    |  +-------------------------------------+
                             |      |    |                                        |
                             |      |    +-------------------------------+        |
                             |      |                                    |        |
                  HEARTBEAT  |      | RESPONSE                HEARTBEAT  |        |
                             |      |                                    |        |
+-----------------------------------------+  +-----------------------------------------+
| Worker                     |      |     |  | Server                    |        |    |
|                            |      |     |  |                           |        |    |
|                            |      |     |  |                           |        |    |
|                            |      |     |  |                           |        |    |
| heartbeats_                |      |     |  |  heartbeats_              |        |    |
|                            +      |     |  |                           +        |    |
| heartbeat_thread_+----> Heartbeat |     |  |  heartbeat_thread_+-->  Heartbeat  |    |
|                                   |     |  |                                    |    |
|                                   v     |  |                                    v    |
| receiver_thread_ +--->  ProcessHearbeat |  | receiver_thread_  +-->  ProcessHearbeat |
|                                         |  |                                         |
|                                         |  |                                         |
|                                         |  |                                         |
+-----------------------------------------+  +-----------------------------------------+

4.5 處理 TERMINATE 消息

ProcessTerminateCommand 會處理結束消息,具體就是設定 ready_ 爲 false。

這樣就預示着 Van 狀態不對,不能夠繼續處理。

void Van::ProcessTerminateCommand() {
  PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
  ready_ = false;
}

inline bool IsReady() { return ready_; }

4.6 處理 ACK 消息

4.6.1 Ack機制

在分佈式系統中,通訊也是不可靠的,丟包、延時都是必須考慮的場景。PS Lite 設計了 Resender類來提升通訊的可靠性,它引入了 ACK 機制。即:

  • 每個節點,對於收到的非 ACK/TERMINATE 消息,必須響應一個 ACK 消息。
  • 每個節點,對於發送的每個非 ACK/TERMINATE 消息,必須在本地緩存下來。存儲的數據結構是一個 MAP,根據消息的內容生產惟一的鍵。
  • 每個節點,對於收到的 ACK 消息,必須根據反饋的鍵從本地緩存中移除對應的消息。
  • 每個節點運行一個監控線程,每隔 PS_RESEND_TIMEOUT 毫秒檢查一下本地緩存。根據每一個消息的發送時間戳和當前時間,找出超時的消息進行重發,並累加其重試次數。

4.6.2 Resender類

定義以下,其中 send_buff_ 就是發送緩存,用來存儲發送了的消息列表。acked_ 就是已經確認的消息。

class Resender {
  std::thread* monitor_;
  std::unordered_set<uint64_t> acked_;
  std::atomic<bool> exit_{false};
  std::mutex mu_;
  int timeout_;
  int max_num_retry_;
  Van* van_;
  using Time = std::chrono::milliseconds;
  // the buffer entry
  struct Entry {
    Message msg;
    Time send;
    int num_retry = 0;
  };
  std::unordered_map<uint64_t, Entry> send_buff_;  
};

4.6.3 監控線程

監控線程以及函數以下以下,就是被喚醒時候,從send_buff_(本地緩存)找到每一個消息的發送時間戳和當前時間,找出超時的消息進行重發,並累加其重試次數。 :

monitor_ = new std::thread(&Resender::Monitoring, this);

  void Monitoring() {
    while (!exit_) {
      std::this_thread::sleep_for(Time(timeout_));
      std::vector<Message> resend;
      Time now = Now();
      mu_.lock();
      for (auto& it : send_buff_) {
        if (it.second.send + Time(timeout_) * (1+it.second.num_retry) < now) {
          resend.push_back(it.second.msg);
          ++it.second.num_retry;
          CHECK_LT(it.second.num_retry, max_num_retry_);
        }
      }
      mu_.unlock();

      for (const auto& msg : resend) van_->Send(msg);
    }
  }

4.6.4 發送時緩存

當 Van 發送消息時候,若是配置了重傳,就調用AddOutgoing函數把消息加入到發送緩存。

int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}

下面函數就是加入到發送緩存。

/**
   * \brief add an outgoining message
   *
   */
  void AddOutgoing(const Message& msg) {
    if (msg.meta.control.cmd == Control::ACK) return;
    CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
    auto key = GetKey(msg);
    std::lock_guard<std::mutex> lk(mu_);
    // already buffered, which often due to call Send by the monitor thread
    if (send_buff_.find(key) != send_buff_.end()) return;

    auto& ent = send_buff_[key];
    ent.msg = msg;
    ent.send = Now();
    ent.num_retry = 0;
  }

4.6.5 清除緩存

下面函數有兩個做用:

  • 檢查是不是重複消息,則已經收到的確認消息;
  • 若是是確認消息,則從發送緩存中清除。
/**
   * \brief add an incomming message
   * \brief return true if msg has been added before or a ACK message
   */
  bool AddIncomming(const Message& msg) {
    // a message can be received by multiple times
    if (msg.meta.control.cmd == Control::TERMINATE) {
      return false;
    } else if (msg.meta.control.cmd == Control::ACK) {
      mu_.lock();
      auto key = msg.meta.control.msg_sig;
      auto it = send_buff_.find(key);
      if (it != send_buff_.end()) send_buff_.erase(it);
      mu_.unlock();
      return true;
    } else {
      mu_.lock();
      auto key = GetKey(msg);
      auto it = acked_.find(key);
      bool duplicated = it != acked_.end();
      if (!duplicated) acked_.insert(key);
      mu_.unlock();
      // send back ack message (even if it is duplicated)
      Message ack;
      ack.meta.recver = msg.meta.sender;
      ack.meta.sender = msg.meta.recver;
      ack.meta.control.cmd = Control::ACK;
      ack.meta.control.msg_sig = key;
      van_->Send(ack);
      // warning
      if (duplicated) LOG(WARNING) << "Duplicated message: " << msg.DebugString();
      return duplicated;
    }
  }

4.7 處理數據消息

ProcessDataMsg 用來處理 worker 發過來的數據消息(就是worker向server更新梯度),具體是取得對應的Customer後,調用 Customer 的方法進行處理,直接將msg放入處理隊列中。

咱們會放在 Customer 之中進行介紹。

void Van::ProcessDataMsg(Message* msg) {
  // data msg
  int app_id = msg->meta.app_id;
  int customer_id =
      Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
  auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
  obj->Accept(*msg); // 這裏給 Customer 添加消息
}

0x05 ZMQVan

ZMQVan是基於zeromq的Van的實現,即爲用zmq庫實現了鏈接的底層細節(zmq庫是一個開源庫,對socket進行了優良的封裝,他使得Socket編程更加簡單、簡潔和性能更高)。

5.1 定義

ZMQVan定義以下:

ZMQVan 繼承於Van ,在這個類的基礎上加了兩個成員變量,分別是:

  • unordered_map<int, void*> senders_ :senders_是一個集合,就是本節點發送 socket 的集合,即node id 與 socket 的映射。好比 8號節點要給9號節點發消息,那麼只要找到(9,socket_9)這個組合就好了,而後調用 socket_9.send(message),
  • void *receiver_ = nullptr :是 Bind 函數獲得的 socket 鏈接,由於是接受端,因此只有一個 socket 就行。

具體以下:

class ZMQVan : public Van {
  void *context_ = nullptr;
  /**
   * \brief node_id to the socket for sending data to this node
   */
  std::unordered_map<int, void*> senders_;
  std::mutex mu_;
  void *receiver_ = nullptr;
};

5.2 Van 函數

Van類 有以下函數會調用到 ZMQVan 或者被 ZMQVan 調用。

5.2.1 發送消息

Send 函數就是調用 ZMQVan 的 SendMsg 函數進行發送消息,發送以後若是設定了ACK機制,則會調用 resender_->AddOutgoing。

int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}

5.2.2 Meta 類

Meta封裝了元數據,發送者,接受者,時間戳,請求仍是響應等。

/**
 * \brief meta info of a message
 */
struct Meta {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief an int head */
  int head;
  /** \brief the unique id of the application of messsage is for*/
  int app_id;
  /** \brief customer id*/
  int customer_id;
  /** \brief the timestamp of this message */
  int timestamp;
  /** \brief the node id of the sender of this message */
  int sender;
  /** \brief the node id of the receiver of this message */
  int recver;
  /** \brief whether or not this is a request message*/
  bool request;
  /** \brief whether or not a push message */
  bool push;
  /** \brief whether or not a pull message */
  bool pull;
  /** \brief whether or not it's for SimpleApp */
  bool simple_app;
  /** \brief an string body */
  std::string body;
  /** \brief data type of message.data[i] */
  std::vector<DataType> data_type;
  /** \brief system control message */
  Control control;
  /** \brief the byte size */
  int data_size = 0;
  /** \brief message priority */
  int priority = 0;
};

爲了緩解通訊壓力,ps-lite 使用了Protobuf對 Meta 進行數據壓縮。

5.2.3 壓縮 Meta

就是按照 protobuf 來進行數據壓縮。

void Van::PackMeta(const Meta& meta, char** meta_buf, int* buf_size) {
  // convert into protobuf
  PBMeta pb;
  pb.set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb.set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb.set_timestamp(meta.timestamp);
  if (meta.body.size()) pb.set_body(meta.body);
  pb.set_push(meta.push);
  pb.set_pull(meta.pull);
  pb.set_request(meta.request);
  pb.set_simple_app(meta.simple_app);
  pb.set_priority(meta.priority);
  pb.set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb.add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb.mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }

  // to string
  *buf_size = pb.ByteSize();
  *meta_buf = new char[*buf_size + 1];
  CHECK(pb.SerializeToArray(*meta_buf, *buf_size))
      << "failed to serialize protobuf";
}

5.2.3 解壓 UnpackMeta

按照protobuf 預先生成的 PBMeta 格式進行解壓。

void Van::UnpackMeta(const char* meta_buf, int buf_size, Meta* meta) {
  // to protobuf
  PBMeta pb;
  CHECK(pb.ParseFromArray(meta_buf, buf_size))
      << "failed to parse string into protobuf";

  // to meta
  meta->head = pb.head();
  meta->app_id = pb.has_app_id() ? pb.app_id() : Meta::kEmpty;
  meta->timestamp = pb.has_timestamp() ? pb.timestamp() : Meta::kEmpty;
  meta->request = pb.request();
  meta->push = pb.push();
  meta->pull = pb.pull();
  meta->simple_app = pb.simple_app();
  meta->priority = pb.priority();
  meta->body = pb.body();
  meta->customer_id = pb.customer_id();
  meta->data_type.resize(pb.data_type_size());
  for (int i = 0; i < pb.data_type_size(); ++i) {
    meta->data_type[i] = static_cast<DataType>(pb.data_type(i));
  }
  if (pb.has_control()) {
    const auto& ctrl = pb.control();
    meta->control.cmd = static_cast<Control::Command>(ctrl.cmd());
    meta->control.barrier_group = ctrl.barrier_group();
    meta->control.msg_sig = ctrl.msg_sig();
    for (int i = 0; i < ctrl.node_size(); ++i) {
      const auto& p = ctrl.node(i);
      Node n;
      n.role = static_cast<Node::Role>(p.role());
      n.port = p.port();
      n.hostname = p.hostname();
      n.id = p.has_id() ? p.id() : Node::kEmpty;
      n.is_recovery = p.is_recovery();
      n.customer_id = p.customer_id();
      meta->control.node.push_back(n);
    }
  } else {
    meta->control.cmd = Control::EMPTY;
  }
}

5.2.4 PackMetaPB

PackMetaPB 從註釋看,是字節跳動提交的,主要用於 ibverbs_van.h,因此咱們不作深刻研究。

void Van::PackMetaPB(const Meta& meta, PBMeta* pb) {
  pb->set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb->set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb->set_timestamp(meta.timestamp);
  if (meta.body.size()) pb->set_body(meta.body);
  pb->set_push(meta.push);
  pb->set_request(meta.request);
  pb->set_simple_app(meta.simple_app);
  pb->set_priority(meta.priority);
  pb->set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb->add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb->mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }
  pb->set_data_size(meta.data_size);
}

5.3 ZMQVan 派生函數

ZMQVan 有以下重要的派生函數。

5.3.1 Bind

Bind 邏輯以下:

  • 使用 zmq_bind() 來把一個socket綁定在一個本地的網絡節點(endpoint)上,而後開始接收發送到本節點上的消息。
  • 節點地址信息是一個字符串,它包括一個協議 😕/ 而後跟着一個address。
  • Bind 函數會依據配置的變量 "DMLC_LOCAL" 來決定是啓用 ipc 方式仍是 tcp 方式,從而配置節點地址信息。
  • 若是是 schedule節點調用,則不須要指定port,可是對於work和server須要本身查找一個本地可用端口。
  • 在查找端口時候,會設置最大重試次數。
int Bind(const Node& node, int max_retry) override {
    receiver_ = zmq_socket(context_, ZMQ_ROUTER);
    int local = GetEnv("DMLC_LOCAL", 0);
    std::string hostname = node.hostname.empty() ? "*" : node.hostname;
    int use_kubernetes = GetEnv("DMLC_USE_KUBERNETES", 0);
    if (use_kubernetes > 0 && node.role == Node::SCHEDULER) {
      hostname = "0.0.0.0";
    }
    std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":";
    int port = node.port;
    unsigned seed = static_cast<unsigned>(time(NULL) + port);
    for (int i = 0; i < max_retry + 1; ++i) {
      auto address = addr + std::to_string(port);
      if (zmq_bind(receiver_, address.c_str()) == 0) break;
      if (i == max_retry) {
        port = -1;
      } else {
        port = 10000 + rand_r(&seed) % 40000;
      }
    }
    return port;
  }

5.3.2 Connect

主要就是初始化 Sender_,邏輯以下:

  • 若是找到了對應socket就關閉socket。
  • 若是發現是 worker 發給同類,或者 server 發給同類,而且不是本身發給本身(Scheduler 能夠本身發給本身),則返回。
  • 創建一個ZMQ套接字(socket),而且以一個不透明指針的形式把這新建立的socket賦值給 sender。
  • 若是自己是scheduler,則配置socket,把本身的 id 綁定到 socket上。
  • 將sender這個socket和目標地址鏈接。
  • 將目標id的socket存放起來,即把 socket 加入到Sender_。

具體以下:

void Connect(const Node& node) override {
    int id = node.id;
    auto it = senders_.find(id);
    if (it != senders_.end()) {
      zmq_close(it->second); // 若是找到了對應socket就關閉socket
    }
    // worker doesn't need to connect to the other workers. same for server
    if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
      return;
    }
    void *sender = zmq_socket(context_, ZMQ_DEALER); //創建一個socket

  	//若是自己是scheduler,則一開始就是知道本身的id = 1,因此這個if條件就是說把本身的id綁定到socket上
    if (my_node_.id != Node::kEmpty) {
      std::string my_id = "ps" + std::to_string(my_node_.id);
      zmq_setsockopt(sender, ZMQ_IDENTITY, my_id.data(), my_id.size());
      const char* watermark = Environment::Get()->find("DMLC_PS_WATER_MARK");
      if (watermark) {
        const int hwm = atoi(watermark);
        zmq_setsockopt(sender, ZMQ_SNDHWM, &hwm, sizeof(hwm));
      }
    }
    // connect
    std::string addr = "tcp://" + node.hostname + ":" + std::to_string(node.port);
    if (GetEnv("DMLC_LOCAL", 0)) {
      addr = "ipc:///tmp/" + std::to_string(node.port);
    }
    if (zmq_connect(sender, addr.c_str()) != 0) { //將sender這個socket和目標地址鏈接
      LOG(FATAL) <<  "connect to " + addr + " failed: " + zmq_strerror(errno);
    }
    senders_[id] = sender; //將目標id的socket存放起來後面使用
}

5.3.3 SendMsg

邏輯以下:

  • 從保存的 sender_ 之中找到以前保留的socket;
  • 壓縮 meta;
  • 發送 meta;
  • 循環分段發送data;
int SendMsg(const Message& msg) override {
    std::lock_guard<std::mutex> lk(mu_);
    // find the socket
    int id = msg.meta.recver;
    CHECK_NE(id, Meta::kEmpty);
    auto it = senders_.find(id);
    if (it == senders_.end()) {
      LOG(WARNING) << "there is no socket to node " << id;
      return -1;
    }
    void *socket = it->second;

    // send meta
    int meta_size; char* meta_buf;
    PackMeta(msg.meta, &meta_buf, &meta_size);
    int tag = ZMQ_SNDMORE;
    int n = msg.data.size();
    if (n == 0) tag = 0;
    zmq_msg_t meta_msg;
    zmq_msg_init_data(&meta_msg, meta_buf, meta_size, FreeData, NULL);
    while (true) {
      if (zmq_msg_send(&meta_msg, socket, tag) == meta_size) break;
      if (errno == EINTR) continue;
      return -1;
    }
    // zmq_msg_close(&meta_msg);
    int send_bytes = meta_size;
    // send data
    for (int i = 0; i < n; ++i) {
      zmq_msg_t data_msg;
      SArray<char>* data = new SArray<char>(msg.data[i]);
      int data_size = data->size();
      zmq_msg_init_data(&data_msg, data->data(), data->size(), FreeData, data);
      if (i == n - 1) tag = 0;
      while (true) {
        if (zmq_msg_send(&data_msg, socket, tag) == data_size) break;
        if (errno == EINTR) continue;
        return -1;
      }
      // zmq_msg_close(&data_msg);
      send_bytes += data_size;
    }
    return send_bytes;
  }

5.3.4 RecvMsg

RecvMsg 就是在綁定的端口上接受消息。

接受消息時候,會判斷是第幾個消息,而後作不一樣的處理。

int RecvMsg(Message* msg) override {
    msg->data.clear();
    size_t recv_bytes = 0;
    for (int i = 0; ; ++i) {
      zmq_msg_t* zmsg = new zmq_msg_t;
      CHECK(zmq_msg_init(zmsg) == 0) << zmq_strerror(errno);
      while (true) {
        if (zmq_msg_recv(zmsg, receiver_, 0) != -1) break;
        if (errno == EINTR) {
          std::cout << "interrupted";
          continue;
        }
        return -1;
      }
      char* buf = CHECK_NOTNULL((char *)zmq_msg_data(zmsg));
      size_t size = zmq_msg_size(zmsg);
      recv_bytes += size;

      if (i == 0) {
        // identify
        msg->meta.sender = GetNodeID(buf, size);
        msg->meta.recver = my_node_.id;
        CHECK(zmq_msg_more(zmsg));
        zmq_msg_close(zmsg);
        delete zmsg;
      } else if (i == 1) {
        // task
        UnpackMeta(buf, size, &(msg->meta));
        zmq_msg_close(zmsg);
        bool more = zmq_msg_more(zmsg);
        delete zmsg;
        if (!more) break;
      } else {
        // zero-copy
        SArray<char> data;
        data.reset(buf, size, [zmsg, size](char* buf) {
          zmq_msg_close(zmsg);
          delete zmsg;
        });
        msg->data.push_back(data);
        if (!zmq_msg_more(zmsg)) {
          break;
        }
      }
    }
    return recv_bytes;
  }

GetNodeID 函數是

/**
   * return the node id given the received identity
   * \return -1 if not find
   */
  int GetNodeID(const char* buf, size_t size) {
    if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
      int id = 0;
      size_t i = 2;
      for (; i < size; ++i) {
        if (buf[i] >= '0' && buf[i] <= '9') {
          id = id * 10 + buf[i] - '0';
        } else {
          break;
        }
      }
      if (i == size) return id;
    }
    return Meta::kEmpty;
  }

0x06 總結

咱們最後進行一下總結:

郵局裏有了地址簿,就須要有貨車來負責拉送物件,Van 就是整個Parameter Server的通訊模塊,其特色以下。

  • PostOffice 類在實例化的時候,會建立一個 Van 類的實例 做爲成員變量。該 Van 實例與所屬 PostOffice 實例生命週期相同(每一個節點只有一個該對象);
  • Van 負責具體的節點間通訊。具體來講就是負責創建起節點之間的互相鏈接(例如Worker與Scheduler之間的鏈接),而且開啓本地的receiving thread用來監聽收到的message。
  • Van對象的初始化函數做用就是依據本地節點類型的不一樣,作不一樣設置,從而啓動端口,創建本地節點到scheduler的連結,啓動接收消息線程,心跳線程等,這樣就能夠進行通訊了。
  • Parameter Server在後臺線程 receiver_thread_ 進行接受/處理消息。除了傳遞參數的數據消息外,各個節點之間控制信息有:
    • ADD_NODE:worker和server向shceduler進行節點註冊;
    • BARRIER:節點間的同步阻塞消息;
    • HEARTBEAT:節點間的心跳信號;
    • TERMINATE:節點退出信號;
    • ACK:確認消息,ACK 類型只有啓用了 Resender 類纔會出現
    • EMPTY:push or pull;

0xEE 我的信息

★★★★★★關於生活和技術的思考★★★★★★

微信公衆帳號:羅西的思考

若是您想及時獲得我的撰寫文章的消息推送,或者想看看我的推薦的技術資料,敬請關注。

在這裏插入圖片描述

0xFF 參考

入門分佈式機器學習---基於參數服務器的邏輯迴歸實現原理

【分佈式】基於ps-lite的分佈式計算實例解析

ps-lite 源代碼分析

官方簡要使用說明

PS-Lite源碼分析-KangRoger

ps-lite源碼剖析-zybuluo

ps-lite代碼筆記-willzhang

相關文章
相關標籤/搜索