Paracel是豆瓣開發的一個分佈式計算框架,它基於參數服務器範式來解決機器學習的問題:邏輯迴歸、SVD、矩陣分解(BFGS,sgd,als,cg),LDA,Lasso...。html
Paracel支持數據和模型的並行,爲用戶提供簡單易用的通訊接口,比mapreduce式的系統要更加靈活。Paracel同時支持異步的訓練模式,使迭代問題收斂地更快。此外,Paracel程序的結構與串行程序十分類似,用戶能夠更加專一於算法自己,不需將精力過多放在分佈式邏輯上。python
由於 ps-lite 沒有對 SSP 進行深刻,而 Paracel 對 SSP的實現比較深刻,因此咱們本文就看看SSP如何實現。c++
解析時候會刪除部分非主體代碼。算法
[源碼解析] 機器學習參數服務器ps-lite 之(1) ----- PostOffice緩存
[源碼解析] 機器學習參數服務器ps-lite(2) ----- 通訊模塊Van服務器
[源碼解析] 機器學習參數服務器ps-lite 之(3) ----- 代理人Customer網絡
[源碼解析]機器學習參數服務器ps-lite(4) ----- 應用節點實現架構
[源碼解析] 機器學習參數服務器 Paracel (1)-----整體架構框架
不一樣的worker同時並行運算的時候,可能由於網絡、機器配置等外界緣由,致使不一樣的worker的進度是不同的,如何控制worker的同步機制是一個比較重要的課題。less
許多機器學習問題能夠轉化爲迭代任務。對於迭代控制,通常來講,有三個級別的異步控制協議:BSP(Bulk Synchronous Parallel),SSP(Stalness Synchronous Parallel)和ASP(Asynchronous Parallel),它們的同步限制依次放寬。爲了追求更快的計算速度,算法能夠選擇更寬鬆的同步協議。
爲了更好的說明以及行文完整,咱們把ps-lite之中介紹過的段落再次拿出來。
這三個協議具體以下:
ASP:task之間徹底不用相互等待,徹底不顧worker之間的順序,每一個worker按照本身的節奏走,跑完一個迭代就update,先完成的task,繼續下一輪的訓練。
優勢:消除了等待慢task的時間,減小了GPU的空閒時間,所以與BSP相比提升了硬件效率。計算速度快,最大限度利用了集羣的計算能力,全部的worker所在的機器都不用等待
缺點:
BSP:是通常分佈式計算採用的同步協議,每一輪迭代中都須要等待全部的task計算完成。每一個worker都必須在同一個迭代運行,只有一個迭代任務全部的worker都完成了,纔會進行一次worker和server之間的同步和分片更新。
BSP的模式和單機串行由於僅僅是batch size的區別,因此在模型收斂性上是徹底同樣的。同時,由於每一個worker在一個週期內是能夠並行計算的,因此有了必定的並行能力。spark用的就是這種方式。
優勢:適用範圍廣;每一輪迭代收斂質量高
缺點:每一輪迭代中,,BSP要求每一個worker等待或暫停來自其餘worker的梯度,這樣就須要等待最慢的task,從而顯著下降了硬件效率,致使總體任務計算時間長。整個worker group的性能由其中最慢的worker決定;這個worker通常稱爲straggler。
SSP:容許必定程度的task進度不一致,但這個不一致有一個上限,稱爲staleness值,即最快的task最多領先最慢的task staleness輪迭代。
就是把將ASP和BSP作一下折中。既然ASP是容許不一樣worker之間的迭代次數間隔任意大,而BSP則只容許爲0,那我就取一個常數s。有了SSP,BSP就能夠經過指定s=0而獲得。而ASP一樣能夠經過制定s=∞來達到。
優勢:必定程度減小了task之間的等待時間,計算速度較快。
缺點:每一輪迭代的收斂質量不如BSP,達到一樣的收斂效果可能須要更多輪的迭代,適用性也不如BSP,部分算法不適用。
傳統的方法是使用BSP來完成迭代,這意味着咱們必須在每一個迭代器的末尾進行同步。這致使了straggler問題:因爲一些軟硬件的緣由,節點的計算能力每每不盡相同。對於迭代問題來講,每一輪結束時算得快的節點都需等待算得慢的節點算完,再進行下一輪迭代。這種等待在節點數增多時將變得尤其明顯,從而拖慢總體的性能。
有兩種方法能夠解決這個問題:
Paracel使用第二種方法,放寬了同步條件,即放寬了「每一個迭代步都等待」這個約束:
假設最快的worker與最慢的worker之間的同步不超過一個有界參數,這是每次迭代的收斂性和總收斂時間之間的折衷。當在一輪迭代結束時,算得快的節點能夠繼續下一輪迭代,但不能比最慢的節點領先參數s個迭代步。當領先超過s個迭代步,Paracel纔會強制進行等待。
這樣異步的控制方式既從總體上省去了等待時間,也能間接地幫助慢的節點遇上。從優化問題的角度來看,雖然單迭代步收斂得慢了,然而每一個迭代步的時間開銷變少了,整體上收斂也就變快了。
這種作法就是Staleness Synchronous Parallel (SSP),基本思想是容許各機器以不一樣步調對模型進行更新,可是加一個限制,使得最快的機器的進度和最慢機器的進度之差不要太大。這樣作的好處是:既減輕慢的機器拖整個系統的後腿,又能保證模型的最終收斂。
咱們首先回憶一下前文總結的架構。
ssp_switch 用來控制是否使用 ssp。
咱們以 include/ps.hpp 的 paracel_read 爲例。
若是啓用了 ssp,則:
(stale_cache + limit_s < clock)
,則 while 循環等待。
pull_int(paracel::str_type("server_clock")
來增長 server的時鐘。回憶一下前面講的 SSP 核心思想(容許必定程度的task進度不一致,但這個不一致有一個上限,稱爲staleness值,即最快的task最多領先最慢的task staleness輪迭代)。其中緩存定義:
paracel::dict_type<paracel::str_type, boost::any> cached_para;
具體代碼以下:
template <class V> bool paracel_read(const paracel::str_type & key, V & val, int replica_id = -1) { if(ssp_switch) { if(clock == 0 || clock == total_iters) { // check total_iters for last // 說明是ssp啓動或者時間間隔(迭代次數)到了,這時候須要從新獲取對應數值,更新cache。 cached_para[key] = boost::any_cast<V>(ps_obj-> kvm[ps_obj->p_ring->get_server(key)]. pull<V>(key)); val = boost::any_cast<V>(cached_para[key]); } else if(stale_cache + limit_s > clock) { // cache hit 若是命中緩存,則直接返回 val = boost::any_cast<V>(cached_para[key]); } else { // cache miss // 若是Miss,若是當前時鐘已經大於某個數值 ,則 while 循環等待 // pull from server until leading slowest less than s clocks while(stale_cache + limit_s < clock) { // 時間同步 stale_cache = ps_obj-> kvm[clock_server].pull_int(paracel::str_type("server_clock")); } // 獲取key對應權重的最新數值 cached_para[key] = boost::any_cast<V>(ps_obj-> kvm[ps_obj->p_ring->get_server(key)]. pull<V>(key)); val = boost::any_cast<V>(cached_para[key]); } return true; } return ps_obj->kvm[ps_obj->p_ring->get_server(key)].pull(key, val); }
kvclt 之中有pull_int方法,就是與Clock server交互,進行時間同步:
int pull_int(const paracel::str_type & key) { if(p_ssp_sock == nullptr) { p_ssp_sock.reset(create_req_sock(ports_lst[4])); } auto scrip = paste(paracel::str_type("pull_int"), key); int val = -1; bool r = req_send_recv(*p_ssp_sock, scrip, val); if(!r) ERROR_ABORT("key: pull_int does not exist"); return val; }
在 include/server.hpp之中,thrd_exec_ssp 是專門處理ssp的線程。
其用到的ssp_tbl 在 include/kv_def.hpp 之中。
namespace paracel { paracel::kvs<paracel::str_type, int> ssp_tbl; // 這裏是ssp專用KV存儲 paracel::kvs<paracel::str_type, paracel::str_type> tbl_store; }
以 pull_int 這個命令爲例,就是從服務器拉取 「ssp專用KV存儲」 對應的數據。
thrd_exec_ssp 具體代碼以下:
// thread entry for ssp void thrd_exec_ssp(zmq::socket_t & sock) { paracel::packer<> pk; paracel::ssp_tbl.set("server_clock", 0); while(1) { zmq::message_t s; sock.recv(&s); auto scrip = paracel::str_type(static_cast<const char *>(s.data()), s.size()); auto msg = paracel::str_split_by_word(scrip, paracel::seperator); auto indicator = pk.unpack(msg[0]); //std::cout << indicator << std::endl; if(indicator == "push_int") { // 推送數據 auto key = pk.unpack(msg[1]); paracel::packer<int> pk_i; auto val = pk_i.unpack(msg[2]); paracel::ssp_tbl.set(key, val); bool result = true; rep_pack_send(sock, result); } if(indicator == "incr_int") { // 更改數據 auto key = pk.unpack(msg[1]); if(paracel::startswith(key, "client_clock_")) { if(paracel::ssp_tbl.get(key)) { paracel::ssp_tbl.incr(key, 1); } else { paracel::ssp_tbl.set(key, 1); } if(paracel::ssp_tbl.get(key) >= paracel::ssp_tbl.get("worker_sz")) { paracel::ssp_tbl.incr("server_clock", 1); paracel::ssp_tbl.set(key, 0); } } paracel::packer<int> pk_i; int delta = pk_i.unpack(msg[2]); paracel::ssp_tbl.incr(key, delta); bool result = true; rep_pack_send(sock, result); } if(indicator == "pull_int") { // 拉取數據 auto key = pk.unpack(msg[1]); int result = 0; auto exist = paracel::ssp_tbl.get(key, result); // 獲取對應的key if(!exist) { paracel::str_type tmp = "nokey"; rep_send(sock, tmp); } rep_pack_send(sock, result); } } // while }
邏輯以下(注意,由於篇幅所限,這裏省略了上圖部分變量,加入了新的變量與邏輯):
+------------------+ worker + server | paralg | | | | | | | | | parasrv *ps_obj | | | + | | +------------------+ | | | | | start_server | +------------------+ | | | | | | | | | | | v | | | +------------+-----+ +------------------+ +---------+ | | | | parasrv | |kvclt | | kvclt | | | | | | | | | | | | thrd_exec | | | | host | | | | | | | servers | | | | | | | ssp_tbl | | | | ports_lst | | | | | | | kvm +-----------> | |.....| | | | tbl_store | | | | context | | | | | | | p_ring | | | | | | | thrd_exec_ssp | | + | | conn_prefix | | | | | | | | | | | | | | | ^ | +------------------+ | p_ssp_sock | | | | | | | | | + | | | | | | | | | | | | | | | | | | | | | | | | | | | v | | | | | | | | | +------------+------+ +------------------+ +---------+ | | | | | ring | | | +------------------+ | | | | | | | | | | | srv_hashring | | | | | | | | | | srv_hashring_dct | +------------------------------------+ | | | +-------------------+ +
手機以下:
用戶只需添加幾行代碼便可將BSP進程轉換爲異步進程。好比一個很是簡單的示例。
主要就是使用iter_commit() 在每次迭代結束以後,把本地更新結果提交到參數服務器。
class logistic_regression: public paracel::paralg { public: logistic_regression(paracel::Comm comm, std::string hosts_dct_str, std::string _output, int _rounds, int _limit_s, bool _ssp_switch) : paracel::paralg(hosts_dct_str, comm, _output, _rounds, _limit_s, _ssp_switch) {} void training() { theta = paracel::random_double_list(data_dim); paracel_write("theta", theta); // init push for(int iter = 0; iter < rounds; ++iter) { for(int i = 0; i < data_dim; ++i) { delta[i] = 0.; } random_shuffle(idx.begin(), idx.end()); // pull theta theta = paracel_read<vector<double> >("theta"); for(auto sample_id : idx) { for(int i = 0; i < data_dim; ++i) { delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i]; } } // traverse // update theta with delta paracel_bupdate("theta", delta, "update.so", "lg_theta_update"); // commit to server at the end of each iteration iter_commit(); // 這裏是添加的,在每次迭代結束以後,把本地更新結果提交到參數服務器 } // last pull theta = paracel_read<vector<double> >("theta"); } void solve() { // init training data auto parser = [](const std::vector<std::string>) { /* ... */ }; auto lines = paracel_load(input); parser(lines); paracel_sync(); // set total iterations of your training process set_total_iters(rounds); // training training(); } }; // class logistic regression
前面每一個部分咱們其實都講解得不透徹,須要在此串聯起來。
咱們假設有5個worker,limit_s 是 3,即最快的節點不能比最慢的節點領先參數 3 個迭代步。當領先超過 3 個迭代步,Paracel會強制進行等待。
在 paralg 構建函數中,會對各類數據進行初始化,這裏重要的是服務器端 key "worker_sz" 對應的數值被設置爲 worker_comm.get_size() ,就是worker 數值 5。
"worker_sz" 的意義是:目前應該有多少個worker一塊兒訓練。
paralg(paracel::str_type hosts_dct_str, paracel::Comm comm, paracel::str_type _output = "", int _rounds = 1, int _limit_s = 0, bool _ssp_switch = false) : worker_comm(comm), output(_output), nworker(comm.get_size()), rounds(_rounds), limit_s(_limit_s), ssp_switch(_ssp_switch) { ps_obj = new parasrv(hosts_dct_str); init_output(_output); clock = 0; stale_cache = 0; clock_server = 0; total_iters = rounds; if(worker_comm.get_rank() == 0) { paracel::str_type key = "worker_sz"; (ps_obj->kvm[clock_server]). push_int(key, worker_comm.get_size()); // 設置爲 5 } paracel_sync(); }
在 iter_commit 之中,邏輯以下。
// put where you want to control iter with ssp void iter_commit() { paracel::str_type clock_key; if(limit_s == 0) { clock_key = "client_clock_0"; } else { clock_key = "client_clock_" + std::to_string(clock % limit_s); } ps_obj->kvm[clock_server].incr_int(paracel::str_type(clock_key), 1); // value 1 is not important clock += 1; if(clock == total_iters) { // 若是已經達到了整體迭代數值,就減小服務器 "worker_sz" 數值 ps_obj->kvm[clock_server].incr_int(paracel::str_type("worker_sz"), -1); } }
kvclt 之中有以下代碼,其實就是給服務器轉發請求,因此咱們能夠略過:
bool incr_int(const paracel::str_type & key, int delta) { if(p_ssp_sock == nullptr) { p_ssp_sock.reset(create_req_sock(ports_lst[4])); } auto scrip = paste(paracel::str_type("incr_int"), key, delta); bool stat; auto r = req_send_recv(*p_ssp_sock, scrip, stat); return r && stat; } int pull_int(const paracel::str_type & key) { if(p_ssp_sock == nullptr) { p_ssp_sock.reset(create_req_sock(ports_lst[4])); } auto scrip = paste(paracel::str_type("pull_int"), key); int val = -1; bool r = req_send_recv(*p_ssp_sock, scrip, val); assert(val != -1); assert(r); if(!r) ERROR_ABORT("key: pull_int does not exist"); return val; }
服務器收到了kvclt 轉發的請求,處理舉例以下:
在 thread_exec_ssp 中,incr_int 部分代碼以下:
if(indicator == "incr_int") { auto key = pk.unpack(msg[1]); if(paracel::startswith(key, "client_clock_")) { if(paracel::ssp_tbl.get(key)) { paracel::ssp_tbl.incr(key, 1); // 把對應的key增長對應的數值 } else { paracel::ssp_tbl.set(key, 1 // 添加這個數值 } if(paracel::ssp_tbl.get(key) >= paracel::ssp_tbl.get("worker_sz")) //全部worker 都完成了一輪迭代 paracel::ssp_tbl.incr("server_clock", 1); //服務器迭代增長1 paracel::ssp_tbl.set(key, 0); //重置爲 0,說明須要考慮下次迭代了,由於本次迭代中,全部client都完成了,下次迭代又要從新計算 } } paracel::packer<int> pk_i; int delta = pk_i.unpack(msg[2]); paracel::ssp_tbl.incr(key, delta); bool result = true; rep_pack_send(sock, result); }
把全部邏輯串聯起來,名詞解釋以下:
具體以下:
limit_s 是 3,即最快的節點不能比最慢的節點領先參數 3 個迭代步。當領先超過 3 個迭代步,Paracel會強制進行等待。所以,有兩種迭代:
在 worker 的 paralg 構建函數中,會對各類數據進行初始化,這裏重要的是服務器端 key "worker_sz" 對應的數值被設置爲 worker_comm.get_size() ,就是worker 數值 5。
"worker_sz" 的意義是:目前應該有多少個worker一塊兒訓練。
在 worker 的 paracel_read 之中,一直用本地的 clock 與遠端 "server_clock
" 作比較,若是小於 limit_s 則強制本worker等待;
在worker 的 iter_commit 之中:
遞交 iter_commit 以後,在 server 之中:
咱們能夠看看邏輯圖:
worker 1 + Server 1 | 快 | +-----------------------------------------+ | +------------------------------------------+ | paracel_read() { | | | | | | | |auto key = pk.unpack(msg[1]); | | while(stale_cache + limit_s < clock) { | | |if(startswith(key, "client_clock_")){ | | stale_cache = get("server_clock") | | | if(ssp_tbl.get(key)) { | | } | | | incr(key, 1); | | } | | | } else { | +-----------------------------------------+ | | set(key, 1); | | | } | +---------------------------------------------+ | if(get(key) >= get("worker_sz")) { | worker 2 | | incr("server_clock", 1); | 慢 | | set(key, 0); | +-----------------------------------------+ | | } | | iter_commit() { | | |} | | | | |ssp_tbl.incr(key, delta); | | if(limit_s == 0) { | | | | | clock_key = "client_clock_0" | | +------------------------------------------+ | } else { | | | clock_key = "client_clock_" + | | | (clock % limit_s) | | | } | | | | | | incr_int(clock_key, 1); | | | | | | clock += 1; | | | | | | if(clock == total_iters) { | | | incr_int("worker_sz"), +1); | | | } | | | } | | | } | | +-----------------------------------------+ +
手機以下:
咱們也能夠用圖表展現下邏輯過程,其中:
首先開始啓動訓練,表格中從上到下順序執行。
第一個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 說明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker2 | ||||||
worker3 | ||||||
worker4 | ||||||
worker5 |
第二個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 說明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker3 | ||||||
worker4 | ||||||
worker5 |
第三個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 說明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker3 | 3 | 3 | 5 | 第三個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker4 | ||||||
worker5 |
第四個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 說明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker3 | 3 | 3 | 5 | 第三個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker4 | 4 | 4 | 5 | 第四個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker5 |
第五個worker開始訓練,實際訓練一步,增長c_c_0,由於已經完成了一輪實際迭代,因此server_clock增長 1。
此時,worker 5 落後了一個迭代(server_clock = 1)。
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 說明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 5 | 第一個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker2 | 2 | 2 | 5 | 第二個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker3 | 3 | 3 | 5 | 第三個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker4 | 4 | 4 | 5 | 第四個worker開始訓練,實際訓練兩步,增長c_c_0,c_c_1 | ||
worker5 | 5 --> 0 | 5 | 1 | 第五個worker開始訓練,實際訓練一步,增長c_c_0,由於全部5個worker都已經完成了一輪實際迭代,因此server_clock增長 1,而後對應的 "client_clock_0" 重置爲 0,則說明須要考慮下次迭代了。 |
下面看看特殊狀況。
首先,4個worker都運行完3步,可是worker 5沒有運行,情況以下:
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 說明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 1 | 5 | 本輪第一個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker2 | 2 | 2 | 2 | 5 | 本輪第二個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker3 | 3 | 3 | 3 | 5 | 本輪第三個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker4 | 4 | 4 | 4 | 5 | 本輪第四個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker5 |
假設worker 5 的 iter_commit 之中,若是worker 5 發現本身 (clock == total_iters),說明本 worker 5 已經達到了整體迭代數值,就減小服務器 "worker_sz" 數值。即:本worker已經跑完了訓練,因此下面一塊兒訓練的worker數目須要減小 1;
由於 worker 5 一會兒完成 3步訓練,因此 s_c 變成 3,即整體迭代次數爲 3。
由於 本次虛擬迭代中,5 個worker都完成了訓練,因此 c_c_1 ~ c_c_2 都先變成 5, 而後重置爲 0。
c_c_0 | c_c_1 | c_c_2 | w_sz | s_c | 說明 | |
---|---|---|---|---|---|---|
worker1 | 1 | 1 | 1 | 5 | 本輪第一個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker2 | 2 | 2 | 2 | 5 | 本輪第二個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker3 | 3 | 3 | 3 | 5 | 本輪第三個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker4 | 4 | 4 | 4 | 5 | 本輪第四個worker實際訓練三步,增長c_c_0,c_c_1,c_c_2 | |
worker5 | 5 --> 0 | 5 --> 0 | 5 --> 0 | 4 | 3 | 本輪第五個worker訓練完成,worker 5 又發現本身 (clock == total_iters),則"worker_sz" 數值減小1,之後只要看 4 個worker便可。 |
至此,SSP相關咱們分析完畢,下文解析數據/模型加載。