This is the second article in the Parameter Server series; it introduces Van, the communication module of ps-lite.
The other articles in this series are:
[Source code analysis] Machine learning parameter server ps-lite (1) ----- PostOffice
Once the post office has an address book, it needs vans to deliver the goods: Van is the communication module of the entire Parameter Server.
Van currently has two implementations: ZMQVan, based on ZeroMQ, and the ibverbs/RDMA implementation in ibverbs_van.h; this article focuses on ZMQVan.
First, the UML diagram is given.
Below we only describe the key member variables and member functions of the Van object.
Its main member variables are as follows:
Node scheduler_ : information about the Scheduler node; every node keeps a record of the Scheduler node;
Node my_node_ : information about the local node. If the local node is the Scheduler, my_node_ equals scheduler_ above;
bool is_scheduler_ : whether the local node is the scheduler;
std::unique_ptr<std::thread> receiver_thread_ : pointer to the message-receiving thread;
std::unique_ptr<std::thread> heartbeat_thread_ : pointer to the heartbeat-sending thread;
std::vector<int> barrier_count_ : barrier counters;
Resender *resender_ = nullptr : pointer to the message resender;
std::atomic<int> timestamp_{0} : atomically incremented message id;
std::unordered_map<std::string, int> connected_nodes_ : records which nodes are currently connected;
Its main functions are as follows:
Start : initializes and sets up communication;
Receiving : the processing function of the message-receiving thread;
Heartbeat : the processing function of the heartbeat-sending thread;
ProcessAddNodeCommandAtScheduler : the scheduler's handler for ADD_NODE messages;
ProcessHearbeat : handler for heartbeat messages;
ProcessDataMsg : handler for data messages (push & pull);
ProcessAddNodeCommand : the handler for ADD_NODE messages on workers and servers;
ProcessBarrierCommand : handler for BARRIER messages;
The three roles defined by PS Lite work with a multi-threading mechanism: each thread takes on a specific responsibility and is created when its owning Van instance starts. The detailed code (excerpt) of the Van class is as follows:
class Van {
 public:
  static Van *Create(const std::string &type);
  virtual void Start(int customer_id);
  int Send(const Message &msg);
  virtual void Stop();
  inline int GetTimestamp() { return timestamp_++; }
  inline bool IsReady() { return ready_; }

 protected:
  // connect to a node
  virtual void Connect(const Node &node) = 0;
  // bind to the local node
  virtual int Bind(const Node &node, int max_retry) = 0;
  // receive a message, blocking
  virtual int RecvMsg(Message *msg) = 0;
  // send a message
  virtual int SendMsg(const Message &msg) = 0;
  /**
   * \brief pack meta into a string
   */
  void PackMeta(const Meta &meta, char **meta_buf, int *buf_size);
  /**
   * \brief pack meta into protobuf
   */
  void PackMetaPB(const Meta &meta, PBMeta *pb);
  /**
   * \brief unpack meta from a string
   */
  void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta);

  Node scheduler_;
  Node my_node_;
  bool is_scheduler_;
  std::mutex start_mu_;

 private:
  /** thread function for receiving */
  void Receiving();
  /** thread function for heartbeat */
  void Heartbeat();

  // node's address string (i.e. ip:port) -> node id
  // this map is updated when ip:port is received for the first time
  std::unordered_map<std::string, int> connected_nodes_;
  // maps the id of node which is added later to the id of node
  // which is with the same ip:port and added first
  std::unordered_map<int, int> shared_node_mapping_;

  /** whether it is ready for sending */
  std::atomic<bool> ready_{false};
  std::atomic<size_t> send_bytes_{0};
  size_t recv_bytes_ = 0;
  int num_servers_ = 0;
  int num_workers_ = 0;
  /** the thread for receiving messages */
  std::unique_ptr<std::thread> receiver_thread_;
  /** the thread for sending heartbeat */
  std::unique_ptr<std::thread> heartbeat_thread_;
  std::vector<int> barrier_count_;
  /** msg resender */
  Resender *resender_ = nullptr;
  int drop_rate_ = 0;
  std::atomic<int> timestamp_{0};
  int init_stage = 0;

  // handlers for the various message types
  void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes,
                                        Meta *recovery_nodes);
  void ProcessTerminateCommand();
  void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes);
  void ProcessBarrierCommand(Message *msg);
  void ProcessHearbeat(Message *msg);
  void ProcessDataMsg(Message *msg);

  // update the local node id
  void UpdateLocalID(Message *msg, std::unordered_set<int> *deadnodes_set,
                     Meta *nodes, Meta *recovery_nodes);

  const char *heartbeat_timeout_val =
      Environment::Get()->find("PS_HEARTBEAT_TIMEOUT");
  int heartbeat_timeout_ =
      heartbeat_timeout_val ? atoi(heartbeat_timeout_val) : 0;

  DISALLOW_COPY_AND_ASSIGN(Van);
};
The initialization function of the Van object performs different setup depending on the type of the local node: it binds a port, establishes the connection to the scheduler, starts the message-receiving thread and the heartbeat thread, and so on, after which communication can proceed. Specifically:
Among these steps, Start creates receiver_thread_, which runs Van::Receiving; it then waits until the system becomes ready, and finally initializes the resender (if configured) and, on workers and servers, the heartbeat thread. The code is as follows:
void Van::Start(int customer_id) {
  // get scheduler info
  start_mu_.lock();
  if (init_stage == 0) {
    // Initialize the scheduler_ member
    scheduler_.hostname = std::string(
        CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
    scheduler_.port =
        atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
    scheduler_.role = Node::SCHEDULER;
    scheduler_.id = kScheduler;
    // Determine whether the local node is the scheduler
    is_scheduler_ = Postoffice::Get()->is_scheduler();

    // get my node info
    if (is_scheduler_) {
      // The local node is the scheduler, so just copy scheduler_
      my_node_ = scheduler_;
    } else {
      auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
      const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
      std::string ip;
      if (nhost) ip = std::string(nhost);
      if (ip.empty()) {
        const char* itf = Environment::Get()->find("DMLC_INTERFACE");
        std::string interface;
        if (itf) interface = std::string(itf);
        if (interface.size()) {
          GetIP(interface, &ip);
        } else {
          GetAvailableInterfaceAndIP(&interface, &ip);
        }
      }
      int port = GetAvailablePort();
      const char* pstr = Environment::Get()->find("PORT");
      if (pstr) port = atoi(pstr);
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      // cannot determine my id now, the scheduler will assign it later
      // set it explicitly to make re-register within a same process possible
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;
    }

    // bind.
    // Bind the local node to the ip:port socket; in effect this initializes receiver_
    my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);

    // connect to the scheduler
    // This initializes senders_; since there may be many destination nodes it is a
    // map<int, void*>. Here senders_[1] = socket_1, whose identity is set to a string
    // like "ps1***". Note that Connect only connects, it does not send a message.
    Connect(scheduler_);

    // for debug use
    if (Environment::Get()->find("PS_DROP_MSG")) {
      drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
    }

    // start receiver
    // Start the thread that receives and processes messages
    receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
    init_stage++;
  }
  start_mu_.unlock();

  if (!is_scheduler_) {
    // let the scheduler know myself
    // Workers and servers tell the scheduler about themselves (role, ip, port, ...)
    // via an ADD_NODE message
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }

  // wait until ready
  // Wait for ready_ to turn from false to true. The scheduler blocks here until all
  // workers and servers have registered; a worker/server blocks until the scheduler
  // announces that the whole system is ready.
  while (!ready_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }

  start_mu_.lock();
  if (init_stage == 1) {
    // resender
    if (Environment::Get()->find("PS_RESEND") &&
        atoi(Environment::Get()->find("PS_RESEND")) != 0) {
      int timeout = 1000;
      if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
        timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
      }
      // If resending on timeout is enabled, initialize resender_
      resender_ = new Resender(timeout, 10, this);
    }

    if (!is_scheduler_) {
      // start heartbeat thread
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }
    init_stage++;
  }
  start_mu_.unlock();
}
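For context, the environment variables read above are provided by the launcher rather than by Van itself. Below is a minimal single-process sketch of what such a launcher sets; it assumes the usual ps-lite variables DMLC_ROLE, DMLC_NUM_WORKER and DMLC_NUM_SERVER (consumed by Postoffice, not by Van) in addition to the scheduler address variables that appear in the code above. This is an illustration, not part of Van.

// Hedged sketch: configure the environment that Postoffice/Van read at startup, then start.
#include <cstdlib>
#include "ps/ps.h"

int main() {
  setenv("DMLC_PS_ROOT_URI", "127.0.0.1", 1);  // scheduler ip (read in Van::Start)
  setenv("DMLC_PS_ROOT_PORT", "8000", 1);      // scheduler port (read in Van::Start)
  setenv("DMLC_ROLE", "scheduler", 1);         // or "worker" / "server" (assumed Postoffice variable)
  setenv("DMLC_NUM_WORKER", "1", 1);
  setenv("DMLC_NUM_SERVER", "1", 1);

  ps::Start(0);     // eventually calls Postoffice::Start -> Van::Start(customer_id = 0)
  ps::Finalize(0);  // sends TERMINATE and stops the Van
  return 0;
}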
We first describe how the background thread runs, and then analyze in detail how each type of message is handled.
ps-lite starts a background thread, receiver_thread_, to receive and process messages.
// start receiver
receiver_thread_ =
    std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
receiver_thread_ uses the Receiving function to process messages.
Besides the data messages that carry parameters, the control messages exchanged between nodes are: ADD_NODE (node registration), BARRIER (synchronization between nodes), HEARTBEAT (liveness reporting), TERMINATE (shutdown), and ACK (acknowledgement, used when the Resender is enabled).
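These commands live in msg.meta.control. For reference, here is a simplified sketch of what that control structure roughly looks like, reconstructed from the fields this article uses (cmd, node, barrier_group, msg_sig); treat the exact layout as an assumption and check ps-lite's message.h for the authoritative definition.

#include <cstdint>
#include <string>
#include <vector>

// Minimal stand-in for ps-lite's Node descriptor (role, hostname, port, id, ...).
struct Node {
  int role;
  std::string hostname;
  int port;
  int id;
};

// Simplified sketch of the control part of a message.
struct Control {
  enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };
  Command cmd = EMPTY;     // which command this is; EMPTY means "this is a data message"
  std::vector<Node> node;  // the node(s) this command talks about (e.g. the registering node)
  int barrier_group = 0;   // which node group a BARRIER applies to
  uint64_t msg_sig = 0;    // message signature, used by the ACK / Resender mechanism
  bool empty() const { return cmd == EMPTY; }
};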
所以在 Receiving 之中會調用 不一樣處理函數處理不一樣類型的消息:
線程內有兩個變量,由於其是在 while (true) 循環以外,因此屬於線程內的全局變量,這點在閱讀代碼時候須要注意。
Receiving 邏輯以下:
具體代碼以下
void Van::Receiving() {
  Meta nodes;           // These two can be regarded as thread-level globals
  Meta recovery_nodes;  // store recovery nodes, i.e. nodes that died and restarted
  recovery_nodes.control.cmd = Control::ADD_NODE;  // recovery nodes also use ADD_NODE

  while (true) {
    Message msg;
    int recv_bytes = RecvMsg(&msg);  // receive a message via receiver_
    // For debug, drop received message
    if (ready_.load() && drop_rate_ > 0) {
      unsigned seed = time(NULL) + my_node_.id;
      if (rand_r(&seed) % 100 < drop_rate_) {
        LOG(WARNING) << "Drop message " << msg.DebugString();
        continue;
      }
    }

    CHECK_NE(recv_bytes, -1);
    recv_bytes_ += recv_bytes;  // accumulate the number of received bytes
    if (Postoffice::Get()->verbose() >= 2) {
      PS_VLOG(2) << msg.DebugString();
    }
    // duplicated message
    if (resender_ && resender_->AddIncomming(msg)) continue;  // ACK / retransmission mechanism

    if (!msg.meta.control.empty()) {
      // control msg
      auto& ctrl = msg.meta.control;
      if (ctrl.cmd == Control::TERMINATE) {
        ProcessTerminateCommand();
        break;
      } else if (ctrl.cmd == Control::ADD_NODE) {
        ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes);
      } else if (ctrl.cmd == Control::BARRIER) {
        ProcessBarrierCommand(&msg);
      } else if (ctrl.cmd == Control::HEARTBEAT) {
        ProcessHearbeat(&msg);  // send back the heartbeat ACK
      } else {
        LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
      }
    } else {
      // non-control (data) message
      ProcessDataMsg(&msg);
    }
  }
}
ADD_NODE is the control message with which a worker/server registers itself with the scheduler.
Let us first recall the basic idea of registration: every worker/server sends an ADD_NODE message to the scheduler when it starts; once all nodes have registered, the scheduler assigns ids/ranks and broadcasts the complete node list; each node then connects to the nodes it has just learned about.
The logic of ProcessAddNodeCommand is shown in the following code:
void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
                                Meta* recovery_nodes) {
  auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);  // ids whose heartbeat timed out
  std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());  // store them in dead_set
  auto& ctrl = msg->meta.control;  // control info of the received message

  UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);

  if (is_scheduler_) {  // Scheduler node
    ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
  } else {  // Worker & Server node
    for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
        // The existing node looks this node up among its connections and does not find it;
        // it is a new node, so connect to it (nodes of a different role)
        Connect(node);                         // connect to the new node
        connected_nodes_[addr_str] = node.id;  // record it as connected
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
    }
    ready_ = true;
  }
}
UpdateLocalID updates the node-id information inside the node; it also distinguishes two cases. Its logic is shown in the following code:
void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
                        Meta* nodes, Meta* recovery_nodes) {
  auto& ctrl = msg->meta.control;
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // assign an id
  if (msg->meta.sender == Meta::kEmpty) {  // if the sender is unset, this message must be handled by the Scheduler
    CHECK(is_scheduler_);
    CHECK_EQ(ctrl.node.size(), 1);  // the node list in the control field is just the sending worker/server itself
    if (nodes->control.node.size() < num_nodes) {  // not all nodes have arrived yet
      nodes->control.node.push_back(ctrl.node[0]);
    } else {  // all workers and servers have arrived
      // some node dies and restarts
      CHECK(ready_.load());
      for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
        const auto& node = nodes->control.node[i];
        if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
            node.role == ctrl.node[0].role) {
          auto& recovery_node = ctrl.node[0];
          // assign previous node id
          recovery_node.id = node.id;
          recovery_node.is_recovery = true;
          nodes->control.node[i] = recovery_node;
          recovery_nodes->control.node.push_back(recovery_node);
          break;
        }
      }
    }
  }

  // update my id
  // For an ordinary node, update its rank; on the scheduler this has no effect (no match is found).
  // When the scheduler sends a message back to this worker/server, if the local ip and port match
  // one of the nodes in the message, the local node id (still kEmpty after initialization) is set
  // to the node id assigned by the scheduler.
  for (size_t i = 0; i < ctrl.node.size(); ++i) {
    const auto& node = ctrl.node[i];
    if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
      if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
        my_node_ = node;
        std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
        _putenv_s("DMLC_RANK", rank.c_str());
#else
        setenv("DMLC_RANK", rank.c_str(), true);
#endif
      }
    }
  }
}
ProcessAddNodeCommandAtScheduler runs inside the Scheduler and handles this control message there.
For the Scheduler node: after the scheduler has received the ADD_NODE messages from all workers and servers, it assigns node ids and replies; that is, it sets the latest global rank for every node and sends it to all Workers and Servers.
If all nodes have been collected (nodes->control.node.size() == num_nodes), the scheduler sorts the nodes by ip and port, assigns each one a node id, connects to it, and broadcasts an ADD_NODE reply containing the information of all nodes. It then sets ready_ = true, i.e. the scheduler is in the ready state, regardless of whether the workers and servers have confirmed receipt of the ADD_NODE message.
Otherwise, if !recovery_nodes->control.node.empty(), this indicates the registration of a restarted node: the scheduler connects to the recovery node, updates its heartbeat timestamp, sends the full node list to the recovery node, and sends only the recovery node's information to the nodes that already exist (CHECK_EQ(recovery_nodes->control.node.size(), 1) asserts that there is exactly one recovery node).
The code is as follows:
void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
                                           Meta* recovery_nodes) {
  recovery_nodes->control.cmd = Control::ADD_NODE;
  time_t t = time(NULL);
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // Once the scheduler has received the ADD_NODE messages of all workers and servers,
  // it assigns node ids and replies.
  if (nodes->control.node.size() == num_nodes) {  // all nodes collected
    // sort the nodes according to their ip and port
    std::sort(nodes->control.node.begin(), nodes->control.node.end(),
              [](const Node& a, const Node& b) {
                return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
              });
    // assign node rank
    for (auto& node : nodes->control.node) {
      // Connect, update the heartbeat timestamp, and assign a global rank to every node
      // connected to the scheduler.
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {  // ip:port not yet known to this Van
        CHECK_EQ(node.id, Node::kEmpty);  // must be an uninitialized node
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);  // generate an id from the current count
        node.id = id;   // assign the new node this id
        Connect(node);  // connect to the new node: create a socket and store it as senders_[id]
        Postoffice::Get()->UpdateHeartbeat(node.id, t);  // update the heartbeat record
        connected_nodes_[node_host_ip] = id;  // the worker/server has already sent a message, so mark it as connected
      } else {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      if (node.role == Node::SERVER) num_servers_++;  // update the rank counters
      if (node.role == Node::WORKER) num_workers_++;
    }
    nodes->control.node.push_back(my_node_);  // add the scheduler itself
    nodes->control.cmd = Control::ADD_NODE;
    Message back;
    back.meta = *nodes;
    // send the ADD_NODE reply to all workers and servers
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }
    ready_ = true;  // the scheduler is now ready
  } else if (!recovery_nodes->control.node.empty()) {  // not fully collected: handle a recovery node
    auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);  // ids whose heartbeat timed out
    std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());  // store them in dead_set
    // send back the recovery node
    CHECK_EQ(recovery_nodes->control.node.size(), 1);
    Connect(recovery_nodes->control.node[0]);
    Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
    Message back;
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      if (r != recovery_nodes->control.node[0].id &&
          dead_set.find(r) != dead_set.end()) {
        // do not try to send anything to dead node
        continue;
      }
      // only send recovery_node to nodes already exist
      // but send all nodes to the recovery_node
      back.meta = (r == recovery_nodes->control.node[0].id) ? *nodes
                                                            : *recovery_nodes;
      back.meta.recver = r;
      back.meta.timestamp = timestamp_++;
      Send(back);
    }
  }
}
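ServerRankToID, WorkerRankToID and IDtoRank used above are Postoffice helpers covered in the previous article. As a reminder, here is a small self-contained sketch of the id scheme they are assumed to implement (scheduler id 1; server rank r maps to 2r + 8, worker rank r to 2r + 9, with the small ids 1, 2, 4 reserved for the scheduler, server group and worker group); check postoffice.h for the authoritative definition.

// Sketch of the node-id scheme assumed from ps-lite's postoffice.h.
#include <algorithm>
#include <cstdio>

static int ServerRankToID(int rank) { return rank * 2 + 8; }
static int WorkerRankToID(int rank) { return rank * 2 + 9; }
static int IDtoRank(int id) { return std::max((id - 8) / 2, 0); }

int main() {
  // With 2 servers and 2 workers: servers get ids 8 and 10, workers get ids 9 and 11.
  for (int r = 0; r < 2; ++r) {
    printf("server rank %d -> id %d, worker rank %d -> id %d\n",
           r, ServerRankToID(r), r, WorkerRankToID(r));
  }
  printf("id 10 -> rank %d\n", IDtoRank(10));  // prints 1
  return 0;
}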
The flow of this part is as follows:
(Flow diagram, Scheduler on the left and Worker on the right: both go through Postoffice::Start -> Van::Start; the worker connects to the scheduler, starts receiver_thread_, and sends ADD_NODE; the scheduler's ProcessAddNodeCommand waits until all nodes have registered, assigns ranks, and replies with an ADD_NODE response carrying the information of all nodes; the worker's ProcessAddNodeCommand then sets ready_ = true; both sides leave the "wait until ready" loop and enter Postoffice::Barrier.)
The interconnection process can be divided into three steps.
Step 1: when a worker/server node initializes, it sends its connection information to the scheduler node; assume it is node 2.
if (!is_scheduler_) {
  // let the scheduler know myself
  Message msg;
  Node customer_specific_node = my_node_;
  customer_specific_node.customer_id = customer_id;
  msg.meta.recver = kScheduler;
  msg.meta.control.cmd = Control::ADD_NODE;
  msg.meta.control.node.push_back(customer_specific_node);
  msg.meta.timestamp = timestamp_++;
  Send(msg);  // send to the scheduler to establish the connection
}
Step 2: after receiving the message, the Scheduler node, inside ProcessAddNodeCommandAtScheduler, first establishes a connection to node 2. It then broadcasts this "node joined" information to all worker/server nodes that are already connected to the scheduler, putting node 2's connection information into the meta of the message.
// assign node rank
for (auto& node : nodes->control.node) {
  std::string node_host_ip = node.hostname + ":" + std::to_string(node.port);
  if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
    int id = node.role == Node::SERVER
                 ? Postoffice::ServerRankToID(num_servers_)
                 : Postoffice::WorkerRankToID(num_workers_);
    node.id = id;
    Connect(node);  // connect to the new node: create a socket and store it as senders_[id] for later use
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    connected_nodes_[node_host_ip] = id;
  } else {
    int id = node.role == Node::SERVER
                 ? Postoffice::ServerRankToID(num_servers_)
                 : Postoffice::WorkerRankToID(num_workers_);
    shared_node_mapping_[id] = connected_nodes_[node_host_ip];
    node.id = connected_nodes_[node_host_ip];
  }
  if (node.role == Node::SERVER) num_servers_++;
  if (node.role == Node::WORKER) num_workers_++;
}
nodes->control.node.push_back(my_node_);
nodes->control.cmd = Control::ADD_NODE;
Message back;
back.meta = *nodes;
// broadcast the "node joined" information to all worker/server nodes already connected
// to the scheduler, with node 2's connection info carried in the meta
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
  int recver_id = r;
  if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
    back.meta.recver = recver_id;
    back.meta.timestamp = timestamp_++;
    Send(back);
  }
}
Step 3: after receiving this command, the existing worker/server nodes connect to node 2 in ProcessAddNodeCommand.
for (const auto& node : ctrl.node) {
  std::string addr_str = node.hostname + ":" + std::to_string(node.port);
  if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
    // the new node is not among the existing connections
    Connect(node);  // connect to the new node
    connected_nodes_[addr_str] = node.id;
  }
  if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
  if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
}
This completes the whole process: after each new node joins, the nodes that joined earlier learn about it through the scheduler and establish connections to the new node.
Next we analyze the heartbeat mechanism.
To keep track of network reachability, PS Lite designs a heartbeat mechanism. Specifically, every worker/server periodically sends a HEARTBEAT message to the scheduler, and the scheduler answers with a HEARTBEAT of its own; each side records, per node, the time at which the last heartbeat was received, and nodes whose heartbeat has timed out are treated as dead. The details are as follows:
std::unordered_map<int, time_t> heartbeats_ stores the liveness information of the nodes involved in heartbeating: the key is the node id and the value is the timestamp at which the last HEARTBEAT message from that node was received.
UpdateHeartbeat refreshes this record.
void UpdateHeartbeat(int node_id, time_t t) {
  std::lock_guard<std::mutex> lk(heartbeat_mu_);
  heartbeats_[node_id] = t;
}

std::unordered_map<int, time_t> heartbeats_;
In both of these node types (worker and server) a thread is started: every PS_HEARTBEAT_INTERVAL seconds it sends a HEARTBEAT message to the Scheduler:
if (!is_scheduler_) {
  // start heartbeat thread
  heartbeat_thread_ =
      std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
}
The heartbeat function itself is:
void Van::Heartbeat() {
  const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
  const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
  while (interval > 0 && ready_.load()) {
    std::this_thread::sleep_for(std::chrono::seconds(interval));
    Message msg;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::HEARTBEAT;
    msg.meta.control.node.push_back(my_node_);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
}
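The sending interval comes from PS_HEARTBEAT_INTERVAL (in seconds; if it is 0 the loop body never runs), and the timeout used by GetDeadNodes comes from PS_HEARTBEAT_TIMEOUT, read into heartbeat_timeout_ in the Van class definition above. A minimal illustration of enabling the mechanism (the values are examples only):

#include <cstdlib>
#include "ps/ps.h"

int main() {
  // Hedged example: heartbeat every 5 seconds; a node silent for 60 seconds is
  // reported by Postoffice::GetDeadNodes. Both variables must be set before
  // ps::Start so that Van picks them up.
  setenv("PS_HEARTBEAT_INTERVAL", "5", 1);  // read in Van::Heartbeat
  setenv("PS_HEARTBEAT_TIMEOUT", "60", 1);  // read into Van::heartbeat_timeout_
  ps::Start(0);
  // ... application work ...
  ps::Finalize(0);
  return 0;
}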
After receiving a HEARTBEAT message, the Scheduler node responds with a HEARTBEAT message of its own; on both sides, UpdateHeartbeat refreshes the heartbeat record.
void Van::ProcessHearbeat(Message* msg) {
  auto& ctrl = msg->meta.control;
  time_t t = time(NULL);
  for (auto& node : ctrl.node) {
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    if (is_scheduler_) {
      Message heartbeat_ack;
      heartbeat_ack.meta.recver = node.id;
      heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
      heartbeat_ack.meta.control.node.push_back(my_node_);
      heartbeat_ack.meta.timestamp = timestamp_++;
      // send back heartbeat
      Send(heartbeat_ack);
    }
  }
}
When handling an ADD_NODE message, the Scheduler checks whether any node has already died; liveness is judged by the difference between the current timestamp and the timestamp at which the node's last heartbeat was received. For example, a node is treated as dead if no heartbeat has been recorded for it or its last heartbeat is older than the timeout t, provided the process itself has been running for longer than t.
std::vector<int> Postoffice::GetDeadNodes(int t) {
  std::vector<int> dead_nodes;
  if (!van_->IsReady() || t == 0) return dead_nodes;

  time_t curr_time = time(NULL);
  const auto& nodes = is_scheduler_
                          ? GetNodeIDs(kWorkerGroup + kServerGroup)
                          : GetNodeIDs(kScheduler);
  {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    for (int r : nodes) {
      auto it = heartbeats_.find(r);
      if ((it == heartbeats_.end() || it->second + t < curr_time) &&
          start_time_ + t < curr_time) {
        dead_nodes.push_back(r);
      }
    }
  }
  return dead_nodes;
}
The logic is as follows:
(Diagram: each Worker and Server runs heartbeat_thread_, whose Heartbeat loop sends HEARTBEAT messages to the Scheduler; the Scheduler's receiver_thread_ hands them to ProcessHearbeat, which updates heartbeats_ and sends a HEARTBEAT response back; the Worker/Server receiver_thread_ passes that response to its own ProcessHearbeat, which updates its local heartbeats_.)
ProcessTerminateCommand handles the termination message; concretely, it sets ready_ to false.
This indicates that the Van is no longer in a valid state and must not process anything further.
void Van::ProcessTerminateCommand() {
  PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
  ready_ = false;
}

inline bool IsReady() { return ready_; }
In a distributed system communication is also unreliable; packet loss and delays must be taken into account. PS Lite therefore designs the Resender class to improve communication reliability by introducing an ACK mechanism: every node buffers the (non-ACK, non-TERMINATE) messages it sends; the receiver replies with an ACK for each such message; when the ACK arrives, the sender removes the message from its buffer; and a monitoring thread periodically resends any buffered message whose ACK has not arrived in time.
The definition is as follows. send_buff_ is the send buffer, storing the messages that have been sent but not yet acknowledged; acked_ records the messages that have already been acknowledged.
class Resender {
  std::thread* monitor_;
  std::unordered_set<uint64_t> acked_;
  std::atomic<bool> exit_{false};
  std::mutex mu_;
  int timeout_;
  int max_num_retry_;
  Van* van_;
  using Time = std::chrono::milliseconds;
  // the buffer entry
  struct Entry {
    Message msg;
    Time send;
    int num_retry = 0;
  };
  std::unordered_map<uint64_t, Entry> send_buff_;
};
The monitoring thread and its function are shown below: every time it wakes up, it compares each buffered message's send timestamp in send_buff_ (the local cache) with the current time, resends the messages that have timed out, and increments their retry counters (the allowed delay grows with the retry count, timeout_ * (1 + num_retry)):
monitor_ = new std::thread(&Resender::Monitoring, this);

void Monitoring() {
  while (!exit_) {
    std::this_thread::sleep_for(Time(timeout_));
    std::vector<Message> resend;
    Time now = Now();
    mu_.lock();
    for (auto& it : send_buff_) {
      if (it.second.send + Time(timeout_) * (1 + it.second.num_retry) < now) {
        resend.push_back(it.second.msg);
        ++it.second.num_retry;
        CHECK_LT(it.second.num_retry, max_num_retry_);
      }
    }
    mu_.unlock();
    for (const auto& msg : resend) van_->Send(msg);
  }
}
When Van sends a message, if retransmission is configured, it calls AddOutgoing to put the message into the send buffer.
int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}
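Retransmission is disabled unless PS_RESEND is set; as seen in Van::Start, PS_RESEND_TIMEOUT (in milliseconds, default 1000) controls the ACK timeout passed to the Resender. A minimal illustration (the values are examples only):

#include <cstdlib>
#include "ps/ps.h"

int main() {
  // Hedged example: enable the Resender with a 500 ms ACK timeout. Van::Start then
  // creates resender_ = new Resender(timeout, 10, this), so every non-ACK message is
  // buffered until its ACK arrives and is retried up to max_num_retry_ (10) times.
  setenv("PS_RESEND", "1", 1);
  setenv("PS_RESEND_TIMEOUT", "500", 1);  // milliseconds
  ps::Start(0);
  // ... push / pull traffic now goes through the ACK mechanism ...
  ps::Finalize(0);
  return 0;
}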
The following function adds an outgoing message to the send buffer.
/**
 * \brief add an outgoing message
 */
void AddOutgoing(const Message& msg) {
  if (msg.meta.control.cmd == Control::ACK) return;
  CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
  auto key = GetKey(msg);
  std::lock_guard<std::mutex> lk(mu_);
  // already buffered, which is often due to Send being called by the monitor thread
  if (send_buff_.find(key) != send_buff_.end()) return;

  auto& ent = send_buff_[key];
  ent.msg = msg;
  ent.send = Now();
  ent.num_retry = 0;
}
The following function serves two purposes: when an ACK message arrives, it removes the corresponding entry from send_buff_; when an ordinary message arrives, it records the message key in acked_, sends an ACK back to the sender (even for duplicates, so the sender can stop resending even if an earlier ACK was lost), and returns whether the message had been received before.
/** * \brief add an incomming message * \brief return true if msg has been added before or a ACK message */ bool AddIncomming(const Message& msg) { // a message can be received by multiple times if (msg.meta.control.cmd == Control::TERMINATE) { return false; } else if (msg.meta.control.cmd == Control::ACK) { mu_.lock(); auto key = msg.meta.control.msg_sig; auto it = send_buff_.find(key); if (it != send_buff_.end()) send_buff_.erase(it); mu_.unlock(); return true; } else { mu_.lock(); auto key = GetKey(msg); auto it = acked_.find(key); bool duplicated = it != acked_.end(); if (!duplicated) acked_.insert(key); mu_.unlock(); // send back ack message (even if it is duplicated) Message ack; ack.meta.recver = msg.meta.sender; ack.meta.sender = msg.meta.recver; ack.meta.control.cmd = Control::ACK; ack.meta.control.msg_sig = key; van_->Send(ack); // warning if (duplicated) LOG(WARNING) << "Duplicated message: " << msg.DebugString(); return duplicated; } }
ProcessDataMsg handles the data messages sent by workers (for example, a worker pushing gradients to a server). It looks up the corresponding Customer and calls that Customer's method, which simply puts the msg into the Customer's processing queue.
We will cover this in the article on Customer.
void Van::ProcessDataMsg(Message* msg) {
  // data msg
  int app_id = msg->meta.app_id;
  int customer_id =
      Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
  auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
  obj->Accept(*msg);  // hand the message over to the Customer
}
ZMQVan is the ZeroMQ-based implementation of Van: it implements the low-level connection details using the zmq library (an open-source library that wraps sockets very well and makes socket programming simpler, cleaner, and more performant).
ZMQVan is defined as follows:
ZMQVan inherits from Van and, on top of that class, adds two key member variables: senders_, which maps a node_id to the socket used for sending data to that node, and receiver_, the socket on which this node receives messages. The definition is as follows:
class ZMQVan : public Van {
  void* context_ = nullptr;
  /**
   * \brief node_id to the socket for sending data to this node
   */
  std::unordered_map<int, void*> senders_;
  std::mutex mu_;
  void* receiver_ = nullptr;
};
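Before looking at Bind / Connect / SendMsg / RecvMsg, it may help to see the underlying ZeroMQ pattern in isolation: receiver_ is a ZMQ_ROUTER socket, and every entry of senders_ is a ZMQ_DEALER socket whose identity is "ps" plus the node id, which is how GetNodeID later recovers the sender. Below is a minimal, self-contained sketch of this pattern (plain libzmq, not ps-lite code; error handling omitted):

#include <zmq.h>
#include <cstdio>
#include <cstring>

int main() {
  void* ctx = zmq_ctx_new();

  // "receiver_" side: a ROUTER socket bound to a port (cf. ZMQVan::Bind)
  void* router = zmq_socket(ctx, ZMQ_ROUTER);
  zmq_bind(router, "tcp://*:5555");

  // "senders_[id]" side: a DEALER socket with identity "ps9" (cf. ZMQVan::Connect)
  void* dealer = zmq_socket(ctx, ZMQ_DEALER);
  const char* id = "ps9";
  zmq_setsockopt(dealer, ZMQ_IDENTITY, id, strlen(id));
  zmq_connect(dealer, "tcp://127.0.0.1:5555");

  zmq_send(dealer, "hello", 5, 0);

  // The ROUTER receives the sender identity frame first, then the payload frame;
  // in ps-lite, GetNodeID() parses "ps9" back into node id 9.
  char identity[16], payload[16];
  int id_len = zmq_recv(router, identity, sizeof(identity), 0);
  int payload_len = zmq_recv(router, payload, sizeof(payload), 0);
  printf("from %.*s: %.*s\n", id_len, identity, payload_len, payload);

  zmq_close(dealer);
  zmq_close(router);
  zmq_ctx_term(ctx);
  return 0;
}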
The Van class has the following functions that call into ZMQVan or are called by ZMQVan.
The Send function calls ZMQVan's SendMsg to send a message; after sending, if the ACK mechanism is configured, it calls resender_->AddOutgoing.
int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}
Meta encapsulates the metadata of a message: sender, receiver, timestamp, whether it is a request or a response, and so on.
/** * \brief meta info of a message */ struct Meta { /** \brief the empty value */ static const int kEmpty; /** \brief an int head */ int head; /** \brief the unique id of the application of messsage is for*/ int app_id; /** \brief customer id*/ int customer_id; /** \brief the timestamp of this message */ int timestamp; /** \brief the node id of the sender of this message */ int sender; /** \brief the node id of the receiver of this message */ int recver; /** \brief whether or not this is a request message*/ bool request; /** \brief whether or not a push message */ bool push; /** \brief whether or not a pull message */ bool pull; /** \brief whether or not it's for SimpleApp */ bool simple_app; /** \brief an string body */ std::string body; /** \brief data type of message.data[i] */ std::vector<DataType> data_type; /** \brief system control message */ Control control; /** \brief the byte size */ int data_size = 0; /** \brief message priority */ int priority = 0; };
To reduce communication overhead, ps-lite uses Protobuf to serialize Meta into a compact byte buffer.
PackMeta converts a Meta into the PBMeta protobuf message and serializes it into a buffer:
void Van::PackMeta(const Meta& meta, char** meta_buf, int* buf_size) { // convert into protobuf PBMeta pb; pb.set_head(meta.head); if (meta.app_id != Meta::kEmpty) pb.set_app_id(meta.app_id); if (meta.timestamp != Meta::kEmpty) pb.set_timestamp(meta.timestamp); if (meta.body.size()) pb.set_body(meta.body); pb.set_push(meta.push); pb.set_pull(meta.pull); pb.set_request(meta.request); pb.set_simple_app(meta.simple_app); pb.set_priority(meta.priority); pb.set_customer_id(meta.customer_id); for (auto d : meta.data_type) pb.add_data_type(d); if (!meta.control.empty()) { auto ctrl = pb.mutable_control(); ctrl->set_cmd(meta.control.cmd); if (meta.control.cmd == Control::BARRIER) { ctrl->set_barrier_group(meta.control.barrier_group); } else if (meta.control.cmd == Control::ACK) { ctrl->set_msg_sig(meta.control.msg_sig); } for (const auto& n : meta.control.node) { auto p = ctrl->add_node(); p->set_id(n.id); p->set_role(n.role); p->set_port(n.port); p->set_hostname(n.hostname); p->set_is_recovery(n.is_recovery); p->set_customer_id(n.customer_id); } } // to string *buf_size = pb.ByteSize(); *meta_buf = new char[*buf_size + 1]; CHECK(pb.SerializeToArray(*meta_buf, *buf_size)) << "failed to serialize protobuf"; }
UnpackMeta parses the buffer back according to the protobuf-generated PBMeta format and restores the Meta.
void Van::UnpackMeta(const char* meta_buf, int buf_size, Meta* meta) { // to protobuf PBMeta pb; CHECK(pb.ParseFromArray(meta_buf, buf_size)) << "failed to parse string into protobuf"; // to meta meta->head = pb.head(); meta->app_id = pb.has_app_id() ? pb.app_id() : Meta::kEmpty; meta->timestamp = pb.has_timestamp() ? pb.timestamp() : Meta::kEmpty; meta->request = pb.request(); meta->push = pb.push(); meta->pull = pb.pull(); meta->simple_app = pb.simple_app(); meta->priority = pb.priority(); meta->body = pb.body(); meta->customer_id = pb.customer_id(); meta->data_type.resize(pb.data_type_size()); for (int i = 0; i < pb.data_type_size(); ++i) { meta->data_type[i] = static_cast<DataType>(pb.data_type(i)); } if (pb.has_control()) { const auto& ctrl = pb.control(); meta->control.cmd = static_cast<Control::Command>(ctrl.cmd()); meta->control.barrier_group = ctrl.barrier_group(); meta->control.msg_sig = ctrl.msg_sig(); for (int i = 0; i < ctrl.node_size(); ++i) { const auto& p = ctrl.node(i); Node n; n.role = static_cast<Node::Role>(p.role()); n.port = p.port(); n.hostname = p.hostname(); n.id = p.has_id() ? p.id() : Node::kEmpty; n.is_recovery = p.is_recovery(); n.customer_id = p.customer_id(); meta->control.node.push_back(n); } } else { meta->control.cmd = Control::EMPTY; } }
Judging from the code comments, PackMetaPB was contributed by ByteDance and is mainly used by ibverbs_van.h, so we do not study it in depth here.
void Van::PackMetaPB(const Meta& meta, PBMeta* pb) { pb->set_head(meta.head); if (meta.app_id != Meta::kEmpty) pb->set_app_id(meta.app_id); if (meta.timestamp != Meta::kEmpty) pb->set_timestamp(meta.timestamp); if (meta.body.size()) pb->set_body(meta.body); pb->set_push(meta.push); pb->set_request(meta.request); pb->set_simple_app(meta.simple_app); pb->set_priority(meta.priority); pb->set_customer_id(meta.customer_id); for (auto d : meta.data_type) pb->add_data_type(d); if (!meta.control.empty()) { auto ctrl = pb->mutable_control(); ctrl->set_cmd(meta.control.cmd); if (meta.control.cmd == Control::BARRIER) { ctrl->set_barrier_group(meta.control.barrier_group); } else if (meta.control.cmd == Control::ACK) { ctrl->set_msg_sig(meta.control.msg_sig); } for (const auto& n : meta.control.node) { auto p = ctrl->add_node(); p->set_id(n.id); p->set_role(n.role); p->set_port(n.port); p->set_hostname(n.hostname); p->set_is_recovery(n.is_recovery); p->set_customer_id(n.customer_id); } } pb->set_data_size(meta.data_size); }
ZMQVan overrides the following important functions.
Bind works as follows: it creates receiver_ as a ZMQ_ROUTER socket and binds it to the node's hostname and port (or to an ipc address when DMLC_LOCAL is set); if binding fails, it retries with a random port up to max_retry times and returns the bound port, or -1 on failure:
int Bind(const Node& node, int max_retry) override {
  receiver_ = zmq_socket(context_, ZMQ_ROUTER);
  int local = GetEnv("DMLC_LOCAL", 0);
  std::string hostname = node.hostname.empty() ? "*" : node.hostname;
  int use_kubernetes = GetEnv("DMLC_USE_KUBERNETES", 0);
  if (use_kubernetes > 0 && node.role == Node::SCHEDULER) {
    hostname = "0.0.0.0";
  }
  std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":";
  int port = node.port;
  unsigned seed = static_cast<unsigned>(time(NULL) + port);
  for (int i = 0; i < max_retry + 1; ++i) {
    auto address = addr + std::to_string(port);
    if (zmq_bind(receiver_, address.c_str()) == 0) break;
    if (i == max_retry) {
      port = -1;
    } else {
      port = 10000 + rand_r(&seed) % 40000;
    }
  }
  return port;
}
Connect mainly initializes senders_; it is shown in the following code:
void Connect(const Node& node) override {
  int id = node.id;
  auto it = senders_.find(id);
  if (it != senders_.end()) {
    zmq_close(it->second);  // if a socket for this id already exists, close it
  }
  // worker doesn't need to connect to the other workers. same for server
  if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
    return;
  }
  void* sender = zmq_socket(context_, ZMQ_DEALER);  // create a socket
  // If the local node already knows its own id (the scheduler knows it is 1 from the
  // start), bind that id to the socket as its identity.
  if (my_node_.id != Node::kEmpty) {
    std::string my_id = "ps" + std::to_string(my_node_.id);
    zmq_setsockopt(sender, ZMQ_IDENTITY, my_id.data(), my_id.size());
    const char* watermark = Environment::Get()->find("DMLC_PS_WATER_MARK");
    if (watermark) {
      const int hwm = atoi(watermark);
      zmq_setsockopt(sender, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    }
  }
  // connect
  std::string addr = "tcp://" + node.hostname + ":" + std::to_string(node.port);
  if (GetEnv("DMLC_LOCAL", 0)) {
    addr = "ipc:///tmp/" + std::to_string(node.port);
  }
  if (zmq_connect(sender, addr.c_str()) != 0) {  // connect this sender socket to the target address
    LOG(FATAL) << "connect to " + addr + " failed: " + zmq_strerror(errno);
  }
  senders_[id] = sender;  // store the socket for the target id for later use
}
SendMsg sends a message to the node given by msg.meta.recver: it looks up the target socket in senders_, packs the meta with PackMeta and sends it as the first ZeroMQ frame, then sends each data block as a further frame (zero-copy), and returns the number of bytes sent:
int SendMsg(const Message& msg) override {
  std::lock_guard<std::mutex> lk(mu_);
  // find the socket
  int id = msg.meta.recver;
  CHECK_NE(id, Meta::kEmpty);
  auto it = senders_.find(id);
  if (it == senders_.end()) {
    LOG(WARNING) << "there is no socket to node " << id;
    return -1;
  }
  void* socket = it->second;

  // send meta
  int meta_size;
  char* meta_buf;
  PackMeta(msg.meta, &meta_buf, &meta_size);
  int tag = ZMQ_SNDMORE;
  int n = msg.data.size();
  if (n == 0) tag = 0;
  zmq_msg_t meta_msg;
  zmq_msg_init_data(&meta_msg, meta_buf, meta_size, FreeData, NULL);
  while (true) {
    if (zmq_msg_send(&meta_msg, socket, tag) == meta_size) break;
    if (errno == EINTR) continue;
    return -1;
  }
  // zmq_msg_close(&meta_msg);
  int send_bytes = meta_size;

  // send data
  for (int i = 0; i < n; ++i) {
    zmq_msg_t data_msg;
    SArray<char>* data = new SArray<char>(msg.data[i]);
    int data_size = data->size();
    zmq_msg_init_data(&data_msg, data->data(), data->size(), FreeData, data);
    if (i == n - 1) tag = 0;
    while (true) {
      if (zmq_msg_send(&data_msg, socket, tag) == data_size) break;
      if (errno == EINTR) continue;
      return -1;
    }
    // zmq_msg_close(&data_msg);
    send_bytes += data_size;
  }
  return send_bytes;
}
RecvMsg receives messages on the bound port.
While receiving, it checks which frame it is handling and treats each differently: the first frame identifies the sender (via GetNodeID), the second carries the packed meta (UnpackMeta), and any remaining frames are zero-copied into msg.data.
int RecvMsg(Message* msg) override {
  msg->data.clear();
  size_t recv_bytes = 0;
  for (int i = 0; ; ++i) {
    zmq_msg_t* zmsg = new zmq_msg_t;
    CHECK(zmq_msg_init(zmsg) == 0) << zmq_strerror(errno);
    while (true) {
      if (zmq_msg_recv(zmsg, receiver_, 0) != -1) break;
      if (errno == EINTR) {
        std::cout << "interrupted";
        continue;
      }
      return -1;
    }
    char* buf = CHECK_NOTNULL((char *)zmq_msg_data(zmsg));
    size_t size = zmq_msg_size(zmsg);
    recv_bytes += size;

    if (i == 0) {
      // identify
      msg->meta.sender = GetNodeID(buf, size);
      msg->meta.recver = my_node_.id;
      CHECK(zmq_msg_more(zmsg));
      zmq_msg_close(zmsg);
      delete zmsg;
    } else if (i == 1) {
      // task
      UnpackMeta(buf, size, &(msg->meta));
      zmq_msg_close(zmsg);
      bool more = zmq_msg_more(zmsg);
      delete zmsg;
      if (!more) break;
    } else {
      // zero-copy
      SArray<char> data;
      data.reset(buf, size, [zmsg, size](char* buf) {
        zmq_msg_close(zmsg);
        delete zmsg;
      });
      msg->data.push_back(data);
      if (!zmq_msg_more(zmsg)) { break; }
    }
  }
  return recv_bytes;
}
The GetNodeID function parses the node id out of the received ZeroMQ identity: if the buffer has the form "ps" followed only by digits (for example "ps8"), it returns the numeric id; otherwise it returns Meta::kEmpty.
/**
 * return the node id given the received identity
 * \return -1 if not find
 */
int GetNodeID(const char* buf, size_t size) {
  if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
    int id = 0;
    size_t i = 2;
    for (; i < size; ++i) {
      if (buf[i] >= '0' && buf[i] <= '9') {
        id = id * 10 + buf[i] - '0';
      } else {
        break;
      }
    }
    if (i == size) return id;
  }
  return Meta::kEmpty;
}
Finally, a brief summary. Once the post office has an address book, it needs vans to deliver the goods: Van is the communication module of the entire Parameter Server, responsible for establishing connections between nodes and for sending and receiving both control messages and data messages.