參數服務器是機器學習訓練一種範式,是爲了解決分佈式機器學習問題的一個編程框架,其主要包括服務器端,客戶端和調度器,與其餘範式相比,參數服務器把模型參數存儲和更新提高爲主要組件,而且使用多種方法提升了處理能力。node
本文是參數服務器系列第一篇,介紹ps-lite的整體設計和基礎模塊 Postoffice。
lpython
若是作一個類比,參數服務器是機器學習領域的分佈式內存數據庫,其做用是存儲模型和更新模型。c++
咱們來看看機器學習的幾個步驟,這些步驟不斷循環往復。git
若是使用參數服務器訓練,咱們能夠把如上步驟對應以下:github
具體以下圖:算法
FP/BP +--------+ Gather/Sum FP/BP +-------+ Gather/Sum +----------> | grad 1 +------+ +----------------------> |grad 2 +-----------+ | +--------+ | | +-------+ | +-----+----+ v +--------------+-------------------+ v | | +---+----------+ Update | | +------+-----+ Update +------------------+ | weight 1 | | total grad 1 +--------->+weight 2 = weight 1 - total grad 1| |total grad 2+--------> |weight 2 = ...... | | | +---+----------+ | | +------+-----+ +------------------+ +-----+----+ ^ +--------------+-------------------+ ^ | FP/BP +--------+ | | FP/BP +-------+ | +----------> | grad 2 +------+ +----------------------> |grad 2 +-----------+ +--------+ Gather/Sum +-------+ Gather/Sum
手機以下:shell
所以咱們能夠推導出參數服務器之中各個模塊的做用:數據庫
參數服務器屬於機器學習訓練的一個範式,具體能夠分爲三代(目前各大公司應該有本身內部最新實現,能夠算爲第四代)。編程
在參數服務器以前,大部分分佈式機器學習算法是經過按期同步來實現的,好比集合通訊的all-reduce,或者 map-reduce類系統的reduce步驟。可是按期同步有兩個問題:bash
所以,當async sgd出現以後,就有人提出了參數服務器。
參數服務器的概念最先來自於Alex Smola於2010年提出的並行LDA的框架。它經過採用一個分佈式的Memcached做爲存放共享參數的存儲,這樣就提供了有效的機制用於分佈式系統中不一樣的Worker之間同步模型參數,而每一個Worker只須要保存他計算時因此來的一小部分參數便可,也避免了全部進程在一個時間點上都停下來同步。可是獨立的kv對帶來了很大的通訊開銷,並且服務端端難以編程。
第二代由Google的Jeff Dean進一步提出了第一代Google大腦的解決方案:DistBelief。DistBelief將巨大的深度學習模型分佈存儲在全局的參數服務器中,計算節點經過參數服務器進行信息傳遞,很好地解決了SGD和L-BFGS算法的分佈式訓練問題。
再後來就是李沐所在的DMLC組所設計的參數服務器。根據論文中所寫,該parameter server屬於第三代參數服務器,就是提供了更加通用的設計。架構上包括一個Server Group和若干個Worker Group。
咱們首先用沐神論文中的圖來看看系統架構。
解釋一下圖中總體架構中每一個模塊:
在分佈式計算梯度時,系統的數據流以下:
圖中每一個步驟的做用爲:
上面兩個圖的依據是其原始代碼。ps-lite 是後來的精簡版代碼,因此有些功能在 ps-lite 之中沒有提供。
從網上找到了一些 ps-lite發展歷程,能夠看到其演進的思路。
第一代是parameter,針對特定算法(如邏輯迴歸和LDA)進行了設計和優化,以知足規模龐大的工業機器學習任務(數百億個示例和10-100TB數據大小的功能)。
後來嘗試爲機器學習算法構建一個開源通用框架。 該項目位於dmlc / parameter_server。
鑑於其餘項目的需求不斷增加,建立了ps-lite,它提供了一個乾淨的數據通訊API和一個輕量級的實現。 該實現基於dmlc / parameter_server,但爲不一樣的項目重構了做業啓動器,文件IO和機器學習算法代碼,如dmlc-core和wormhole
根據在開發dmlc / mxnet期間學到的經驗,從v1進一步重構了API和實現。 主要變化包括:
ps-lite 實際上是Paramter Server的實現的一個框架,其中參數處理具體相關策略需用戶本身實現。
Parameter Server包含三種角色:Worker,Server,Scheduler。具體關係以下圖:
具體角色功能爲:
其中引入scheduler的好處以下:
熟悉分佈式系統的同窗可能會擔憂 scheduler 模塊的單點問題,這個經過 raft、zab 等 paxos 協議能夠獲得比較好的解決。
ps-lite系統中的一些基礎模塊以下:
Environment:一個單例模式的環境變量類,它經過一個 std::unordered_map<std::string, std::string> kvs
維護了一組 kvs 藉以保存全部環境變量名以及值;
PostOffice:一個單例模式的全局管理類,一個 node 在生命期內具備一個PostOffice,依賴它的類成員對Node進行管理;
Van:通訊模塊,負責與其餘節點的網絡通訊和Message的實際收發工做。PostOffice持有一個Van成員;
SimpleApp:KVServer和KVWorker的父類,它提供了簡單的Request, Wait, Response,Process功能;KVServer和KVWorker分別根據本身的使命重寫了這些功能;
Customer:每一個SimpleApp對象持有一個Customer類的成員,且Customer須要在PostOffice進行註冊,該類主要負責:
Node :信息類,存儲了本節點的對應信息,每一個 Node 可使用 hostname + port 來惟一標識。
從源碼中的例子能夠看出,使用ps-lite 提供的腳本 local.sh 能夠啓動整個系統,這裏 test_connection 爲編譯好的可執行程序。
./local.sh 2 3 ./test_connection
具體 local.sh 代碼以下。注意,在shell腳本中,有三個shift,這就讓腳本中始終使用$1。
針對咱們的例子,腳本參數對應了就是
能夠從腳本中看到,本腳本作了兩件事:
具體以下:
#!/bin/bash # set -x if [ $# -lt 3 ]; then echo "usage: $0 num_servers num_workers bin [args..]" exit -1; fi # 對環境變量進行各類配置,此後不一樣節點都會從這些環境變量中獲取信息 export DMLC_NUM_SERVER=$1 shift export DMLC_NUM_WORKER=$1 shift bin=$1 shift arg="$@" # start the scheduler export DMLC_PS_ROOT_URI='127.0.0.1' export DMLC_PS_ROOT_PORT=8000 export DMLC_ROLE='scheduler' ${bin} ${arg} & # start servers export DMLC_ROLE='server' for ((i=0; i<${DMLC_NUM_SERVER}; ++i)); do export HEAPPROFILE=./S${i} ${bin} ${arg} & done # start workers export DMLC_ROLE='worker' for ((i=0; i<${DMLC_NUM_WORKER}; ++i)); do export HEAPPROFILE=./W${i} ${bin} ${arg} & done wait
咱們依然使用官方例子看看。
ps-lite 使用的是 C++語言,其中 worker, server, scheduler 都使用同一套代碼。這會讓習慣於Java,python的同窗很是不適應,你們須要適應一個階段。
針對這個示例程序,起初會讓人疑惑,爲何每次程序運行,代碼中都會啓動 scheduler,worker,server?其實,從下面註釋就能看出來,具體執行是依據環境變量來決定。若是環境變量設置了本次角色是 server,則不會啓動 scheduler 和 worker。
#include <cmath> #include "ps/ps.h" using namespace ps; void StartServer() { if (!IsServer()) { return; } auto server = new KVServer<float>(0); server->set_request_handle(KVServerDefaultHandle<float>()); //註冊functor RegisterExitCallback([server](){ delete server; }); } void RunWorker() { if (!IsWorker()) return; KVWorker<float> kv(0, 0); // init int num = 10000; std::vector<Key> keys(num); std::vector<float> vals(num); int rank = MyRank(); srand(rank + 7); for (int i = 0; i < num; ++i) { keys[i] = kMaxKey / num * i + rank; vals[i] = (rand() % 1000); } // push int repeat = 50; std::vector<int> ts; for (int i = 0; i < repeat; ++i) { ts.push_back(kv.Push(keys, vals)); //kv.Push()返回的是該請求的timestamp // to avoid too frequency push, which leads huge memory usage if (i > 10) kv.Wait(ts[ts.size()-10]); } for (int t : ts) kv.Wait(t); // pull std::vector<float> rets; kv.Wait(kv.Pull(keys, &rets)); // pushpull std::vector<float> outs; for (int i = 0; i < repeat; ++i) { // PushPull on the same keys should be called serially kv.Wait(kv.PushPull(keys, vals, &outs)); } float res = 0; float res2 = 0; for (int i = 0; i < num; ++i) { res += std::fabs(rets[i] - vals[i] * repeat); res2 += std::fabs(outs[i] - vals[i] * 2 * repeat); } CHECK_LT(res / repeat, 1e-5); CHECK_LT(res2 / (2 * repeat), 1e-5); LL << "error: " << res / repeat << ", " << res2 / (2 * repeat); } int main(int argc, char *argv[]) { // start system Start(0); // Postoffice::start(),每一個node都會調用到這裏,可是在 Start 函數之中,會依據本次設定的角色來不一樣處理,只有角色爲 scheduler 纔會啓動 Scheduler。 // setup server nodes StartServer(); // Server會在其中作有效執行,其餘節點不會有效執行。 // run worker nodes RunWorker(); // Worker 會在其中作有效執行,其餘節點不會有效執行。 // stop system Finalize(0, true); //結束。每一個節點都須要執行這個函數。 return 0; }
其中KVServerDefaultHandle是functor,用與處理server收到的來自worker的請求,具體以下:
/** * \brief an example handle adding pushed kv into store */ template <typename Val> struct KVServerDefaultHandle { //functor,用與處理server收到的來自worker的請求 // req_meta 是存儲該請求的一些元信息,好比請求來自於哪一個節點,發送給哪一個節點等等 // req_data 是發送過來的數據 // server 是指向當前server對象的指針 void operator()( const KVMeta& req_meta, const KVPairs<Val>& req_data, KVServer<Val>* server) { size_t n = req_data.keys.size(); KVPairs<Val> res; if (!req_meta.pull) { //收到的是pull請求 CHECK_EQ(n, req_data.vals.size()); } else { //收到的是push請求 res.keys = req_data.keys; res.vals.resize(n); } for (size_t i = 0; i < n; ++i) { Key key = req_data.keys[i]; if (req_meta.push) { //push請求 store[key] += req_data.vals[i]; //此處的操做是將相同key的value相加 } if (req_meta.pull) { //pull請求 res.vals[i] = store[key]; } } server->Response(req_meta, res); } std::unordered_map<Key, Val> store; };
Postoffice 是一個單例模式的全局管理類,其維護了系統的一個全局信息,具備以下特色:
請注意:這些代碼都是在 Postoffice 類內,沒有按照角色分開成多個模塊。
類 UML 圖以下:
下面咱們只給出關鍵變量和成員函數說明,由於每一個節點都包含一個 PostOffice,因此 PostOffice 的數據結構中包括了各類節點所須要的變量,會顯得比較繁雜。
主要變量做用以下:
主要函數做用以下:
具體以下:
class Postoffice { /** * \brief start the system * * This function will block until every nodes are started. * \param argv0 the program name, used for logging. * \param do_barrier whether to block until every nodes are started. */ void Start(int customer_id, const char* argv0, const bool do_barrier); /** * \brief terminate the system * * All nodes should call this function before existing. * \param do_barrier whether to do block until every node is finalized, default true. */ void Finalize(const int customer_id, const bool do_barrier = true); /** * \brief barrier * \param node_id the barrier group id */ void Barrier(int customer_id, int node_group); /** * \brief process a control message, called by van * \param the received message */ void Manage(const Message& recv); /** * \brief update the heartbeat record map * \param node_id the \ref Node id * \param t the last received heartbeat time */ void UpdateHeartbeat(int node_id, time_t t) { std::lock_guard<std::mutex> lk(heartbeat_mu_); heartbeats_[node_id] = t; } /** * \brief get node ids that haven't reported heartbeats for over t seconds * \param t timeout in sec */ std::vector<int> GetDeadNodes(int t = 60); private: void InitEnvironment(); Van* van_; mutable std::mutex mu_; // app_id -> (customer_id -> customer pointer) std::unordered_map<int, std::unordered_map<int, Customer*>> customers_; std::unordered_map<int, std::vector<int>> node_ids_; std::mutex server_key_ranges_mu_; std::vector<Range> server_key_ranges_; bool is_worker_, is_server_, is_scheduler_; int num_servers_, num_workers_; std::unordered_map<int, std::unordered_map<int, bool> > barrier_done_; int verbose_; std::mutex barrier_mu_; std::condition_variable barrier_cond_; std::mutex heartbeat_mu_; std::mutex start_mu_; int init_stage_ = 0; std::unordered_map<int, time_t> heartbeats_; Callback exit_callback_; /** \brief Holding a shared_ptr to prevent it from being destructed too early */ std::shared_ptr<Environment> env_ref_; time_t start_time_; DISALLOW_COPY_AND_ASSIGN(Postoffice); };
首先咱們介紹下 node id 映射功能,就是如何在邏輯節點和物理節點之間作映射,如何把物理節點劃分紅各個邏輯組,如何用簡便的方法作到給組內物理節點統一發消息。
三個邏輯組的定義以下:
/** \brief node ID for the scheduler */ static const int kScheduler = 1; /** * \brief the server node group ID * * group id can be combined: * - kServerGroup + kScheduler means all server nodes and the scheuduler * - kServerGroup + kWorkerGroup means all server and worker nodes */ static const int kServerGroup = 2; /** \brief the worker node group ID */ static const int kWorkerGroup = 4;
node id 是物理節點的惟一標示,rank 是每個邏輯概念(scheduler,work,server)內部的惟一標示。這兩個標示由一個算法來肯定。
以下面代碼所示,若是配置了 3 個worker,則 worker 的 rank 從 0 ~ 2,那麼這幾個 worker 實際對應的 物理 node ID 就會使用 WorkerRankToID 來計算出來。
for (int i = 0; i < num_workers_; ++i) { int id = WorkerRankToID(i); for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup, kWorkerGroup + kScheduler, kWorkerGroup + kServerGroup + kScheduler}) { node_ids_[g].push_back(id); } }
具體計算規則以下:
/** * \brief convert from a worker rank into a node id * \param rank the worker rank */ static inline int WorkerRankToID(int rank) { return rank * 2 + 9; } /** * \brief convert from a server rank into a node id * \param rank the server rank */ static inline int ServerRankToID(int rank) { return rank * 2 + 8; } /** * \brief convert from a node id into a server or worker rank * \param id the node id */ static inline int IDtoRank(int id) { #ifdef _MSC_VER #undef max #endif return std::max((id - 8) / 2, 0); }
這樣咱們能夠知道,1-7 的id表示的是node group,單個節點的id 就從 8 開始。
並且這個算法保證server id爲偶數,node id爲奇數。
由於有時請求要發送給多個節點,因此ps-lite用了一個 map 來存儲每一個 node group / single node 對應的實際的node節點集合,即 肯定每一個id值對應的節點id集。
std::unordered_map<int, std::vector<int>> node_ids_
如何使用這個node_ids_?咱們仍是須要看以前的代碼:
for (int i = 0; i < num_workers_; ++i) { int id = WorkerRankToID(i); for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup, kWorkerGroup + kScheduler, kWorkerGroup + kServerGroup + kScheduler}) { node_ids_[g].push_back(id); } }
咱們回憶一下以前的節點信息:
因此,爲了實現 「設置 1-7 內任意一個數字 能夠發送給其對應的 全部node」 這個功能,對於每個新節點,須要將其對應多個id(node,node group)上,這些id組就是本節點能夠與之通信的節點。例如對於 worker 2 來講,其 node id 是 2 * 2 + 8 = 12,因此須要將它與
這 5 個id 相對應,即須要在 node_ids_ 這個映射表中對應的 4, 4 + 1, 4 + 2, 4 +1 + 2, 12 這五個 item 之中添加。就是上面代碼中的內部 for 循環條件。即,node_ids_ [4], node_ids_ [5],node_ids_ [6],node_ids_ [7] ,node_ids_ [12] 之中,都須要把 12 添加到 vector 最後。
workers 跟 servers 之間經過 push 跟 pull 來通訊。worker 經過 push 將計算好的梯度發送到server,而後經過 pull 從server更新參數。
parameter server 中,參數都是能夠被表示成(key, value)的集合,好比一個最小化損失函數的問題,key就是feature ID,而value就是它的權值。對於稀疏參數來講,value不存在的key,就能夠認爲value是0。
把參數表示成 k-v, 形式更天然,易於理解和編程實現。
分佈式算法有兩個額外成本:數據通訊成本,負載均衡不理想和機器性能差別致使的同步成本。
對於高維機器學習訓練來講,由於高頻特徵更新極爲頻繁,所會致使網絡壓力極大。若是每個參數都設一個key而且按key更新,那麼會使得通訊變得更加頻繁低效,爲了抹平這個問題,就須要有折衷和平衡,即,
利用機器學習算法的特性,給每一個key對應的value賦予一個向量或者矩陣,這樣就能夠一次性傳遞多個參數,權衡了融合與同步的成本。
作這樣的操做的前提是假設參數是有順序的。缺點是在對於稀疏模型來講,總會在向量或者矩陣裏會有參數爲0,這在單個參數狀態下是不用存的,因此,形成了數據的冗餘。
但這樣作有兩點好處:
爲了提升計算性能和帶寬效率,參數服務器也會採用批次更新的辦法,來減輕高頻 key 的壓力。好比把minibatch之中高頻key合併成一個minibatch進行更新。
ps-lite 容許用戶使用 Range Push 跟 Range Pull 操做。
路由功能指的就是:Worker 在作 Push/Pull 時候,如何知道把消息發送給哪些 Servers。
咱們知道,ps-lite 是多 Server 架構,一個很重要的問題是如何分佈多個參數。好比給定一個參數的鍵,如何肯定其存儲在哪一臺 Server 上。因此必然有一個路由邏輯用來確立 key與server的對應關係。
PS Lite 將路由邏輯放置在 Worker 端,採用範圍劃分的策略,即每個 Server 有本身固定負責的鍵的範圍。這個範圍是在 Worker 啓動的時候肯定的。細節以下:
[MAX/N*i, MAX/N*(i+1))
。須要注意的是,在不能恰好整除的狀況下,鍵域上界的一小段被丟棄了。
具體實現以下:
首先,ps-lite的key只支持int類型。
#if USE_KEY32 /*! \brief Use unsigned 32-bit int as the key type */ using Key = uint32_t; #else /*! \brief Use unsigned 64-bit int as the key type */ using Key = uint64_t; #endif /*! \brief The maximal allowed key value */ static const Key kMaxKey = std::numeric_limits<Key>::max();
其次,將int範圍均分便可
const std::vector<Range>& Postoffice::GetServerKeyRanges() { if (server_key_ranges_.empty()) { for (int i = 0; i < num_servers_; ++i) { server_key_ranges_.push_back(Range( kMaxKey / num_servers_ * i, kMaxKey / num_servers_ * (i+1))); } } return server_key_ranges_; }
從以前分析中咱們能夠知道,ps-lite 是經過環境變量來控制具體節點。
具體某個節點屬於哪種取決於啓動節點以前設置了哪些環境變量以及其數值。
環境變量包括:節點角色,worker&server個數、ip、port等。
InitEnvironment 函數就是建立了 Van,獲得了 worker 和 server 的數量,獲得了本節點的類型。
void Postoffice::InitEnvironment() { const char* val = NULL; std::string van_type = GetEnv("DMLC_PS_VAN_TYPE", "zmq"); van_ = Van::Create(van_type); val = CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_WORKER")); num_workers_ = atoi(val); val = CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_SERVER")); num_servers_ = atoi(val); val = CHECK_NOTNULL(Environment::Get()->find("DMLC_ROLE")); std::string role(val); is_worker_ = role == "worker"; is_server_ = role == "server"; is_scheduler_ = role == "scheduler"; verbose_ = GetEnv("PS_VERBOSE", 0); }
主要就是:
具體代碼以下:
void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) { start_mu_.lock(); if (init_stage_ == 0) { InitEnvironment(); // init node info. // 對於全部的worker,進行node設置 for (int i = 0; i < num_workers_; ++i) { int id = WorkerRankToID(i); for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup, kWorkerGroup + kScheduler, kWorkerGroup + kServerGroup + kScheduler}) { node_ids_[g].push_back(id); } } // 對於全部的server,進行node設置 for (int i = 0; i < num_servers_; ++i) { int id = ServerRankToID(i); for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup, kServerGroup + kScheduler, kWorkerGroup + kServerGroup + kScheduler}) { node_ids_[g].push_back(id); } } // 設置scheduler的node for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup, kScheduler + kWorkerGroup, kScheduler + kServerGroup}) { node_ids_[g].push_back(kScheduler); } init_stage_++; } start_mu_.unlock(); // start van van_->Start(customer_id); start_mu_.lock(); if (init_stage_ == 1) { // record start time start_time_ = time(NULL); init_stage_++; } start_mu_.unlock(); // do a barrier here if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler); }
總的來說,schedular節點經過計數的方式實現各個節點的同步。具體來講就是:
ps-lite 使用 Barrier 來控制系統的初始化,就是你們都準備好了再一塊兒前進。這是一個可選項。具體以下:
Node會調用 Barrier 函數 告知Scheduler,隨即本身進入等待狀態。
注意,調用時候是
if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
這就是說,等待全部的 group,即 scheduler 節點也要給本身發送消息。
void Postoffice::Barrier(int customer_id, int node_group) { if (GetNodeIDs(node_group).size() <= 1) return; auto role = van_->my_node().role; if (role == Node::SCHEDULER) { CHECK(node_group & kScheduler); } else if (role == Node::WORKER) { CHECK(node_group & kWorkerGroup); } else if (role == Node::SERVER) { CHECK(node_group & kServerGroup); } std::unique_lock<std::mutex> ulk(barrier_mu_); barrier_done_[0][customer_id] = false; Message req; req.meta.recver = kScheduler; req.meta.request = true; req.meta.control.cmd = Control::BARRIER; req.meta.app_id = 0; req.meta.customer_id = customer_id; req.meta.control.barrier_group = node_group; // 記錄了等待哪些 req.meta.timestamp = van_->GetTimestamp(); van_->Send(req); // 給 scheduler 發給 BARRIER barrier_cond_.wait(ulk, [this, customer_id] { // 而後等待 return barrier_done_[0][customer_id]; }); }
處理等待的動做在 Van 類之中,咱們提早放出來。
具體ProcessBarrierCommand邏輯以下:
request==false
的BARRIER
消息。barrier_done_
置爲true,而後通知全部等待條件變量barrier_cond_.notify_all()
。void Van::ProcessBarrierCommand(Message* msg) { auto& ctrl = msg->meta.control; if (msg->meta.request) { // scheduler收到了消息,由於 Postoffice::Barrier函數 會在發送時候作設置爲true。 if (barrier_count_.empty()) { barrier_count_.resize(8, 0); } int group = ctrl.barrier_group; ++barrier_count_[group]; // Scheduler會對Barrier請求進行計數 if (barrier_count_[group] == static_cast<int>(Postoffice::Get()->GetNodeIDs(group).size())) { // 若是相等,說明已經收到了最後一個請求,因此發送解除 barrier 消息。 barrier_count_[group] = 0; Message res; res.meta.request = false; // 回覆時候,這裏就是false res.meta.app_id = msg->meta.app_id; res.meta.customer_id = msg->meta.customer_id; res.meta.control.cmd = Control::BARRIER; for (int r : Postoffice::Get()->GetNodeIDs(group)) { int recver_id = r; if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) { res.meta.recver = recver_id; res.meta.timestamp = timestamp_++; Send(res); } } } } else { // 說明這裏收到了 barrier respones,能夠解除 barrier了。具體見上面的設置爲false處。 Postoffice::Get()->Manage(*msg); } }
Manage 函數就是解除了 barrier。
void Postoffice::Manage(const Message& recv) { CHECK(!recv.meta.control.empty()); const auto& ctrl = recv.meta.control; if (ctrl.cmd == Control::BARRIER && !recv.meta.request) { barrier_mu_.lock(); auto size = barrier_done_[recv.meta.app_id].size(); for (size_t customer_id = 0; customer_id < size; customer_id++) { barrier_done_[recv.meta.app_id][customer_id] = true; } barrier_mu_.unlock(); barrier_cond_.notify_all(); // 這裏解除了barrier } }
具體示意以下:
+ Scheduler | Worker + | + | | | | | | +--------------------------------+ | +-----------------+ | | | | | | | | | | | | | | | | v | | v | receiver_thread_ | | receiver_thread_ | + | | | | | | | | v BARRIER | | BARRIER v | Postoffice::Barrier +-----------------> | <---------------------+ Postoffice::Barrier | + | | + | | | | | | | | | | | | | | | | | v | | | v | v | barrier_cond_.wait ProcessBarrierCommand | barrier_cond_.wait | | + | | | | | | | | | All Nodes OK | | | | | | | | | | +--------------+ | BARRIER | | | | +----------------------------------------------> | | | BARRIER | | | | | +------------> | | | | | | | | | | | | | | +<-------------------------------< | | <---------------+ | barrier_cond_.notify_all | | barrier_cond_.notify_all v | v +
手機以下:
至此,Postoffice的分析咱們初步完成,其他功能咱們將會結合 Van 和 Customer 在後續文章中分析。
★★★★★★關於生活和技術的思考★★★★★★
微信公衆帳號:羅西的思考
若是您想及時獲得我的撰寫文章的消息推送,或者想看看我的推薦的技術資料,敬請關注。
基於Parameter Server的可擴展分佈式機器學習架構
Talk - Scaling Distributed Machine Learning with System and Algorithm Co-design 筆記