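This is the fourth article in the parameter server series. It covers KVWorker and KVServer.
The other articles in this series are:
[Source Code Analysis] Machine Learning Parameter Server ps-lite (1): PostOffice
[Source Code Analysis] Machine Learning Parameter Server ps-lite (2): The Communication Module Van
[Source Code Analysis] Machine Learning Parameter Server ps-lite (3): The Agent Customer
KVWorker and KVServer are the abstractions of the Worker and Server nodes respectively. They are started as part of the engine through the chain Van ---> Customer ---> recv_handle_.
This article first introduces some basic supporting classes, then SimpleApp, the base class of Server / Worker, and finally the concrete Server / Worker implementations.
The overall flow previewed at this point is the same one that is walked through step by step in the summary at the end of this article.
We first need to introduce a few basic classes.
The Range class determines which server holds the parameters to be pulled, and which range of keys each server is responsible for.
The Range class provides the following functions: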
class Range {
 public:
  Range() : Range(0, 0) {}
  Range(uint64_t begin, uint64_t end) : begin_(begin), end_(end) { }
  uint64_t begin() const { return begin_; }
  uint64_t end() const { return end_; }
  uint64_t size() const { return end_ - begin_; }
 private:
  uint64_t begin_;
  uint64_t end_;
};
ThreadsafePQueue is a queue that can be read by multiple threads. It achieves thread safety through a lock working together with a condition variable, and it is used as the message queue.
/**
 * \brief thread-safe queue allowing push and waited pop
 */
class ThreadsafePQueue {
 public:
  ThreadsafePQueue() { }
  ~ThreadsafePQueue() { }

  /**
   * \brief push an value into the end. threadsafe.
   * \param new_value the value
   */
  void Push(Message new_value) {
    mu_.lock();
    queue_.push(std::move(new_value));
    mu_.unlock();
    cond_.notify_all();
  }

  /**
   * \brief wait until pop an element from the beginning, threadsafe
   * \param value the poped value
   */
  void WaitAndPop(Message* value) {
    // wait until the queue is not empty, then pop by priority
    std::unique_lock<std::mutex> lk(mu_);
    cond_.wait(lk, [this]{return !queue_.empty();});
    *value = std::move(queue_.top());
    queue_.pop();
  }

 private:
  class Compare {
   public:
    bool operator()(const Message &l, const Message &r) {
      return l.meta.priority <= r.meta.priority;
    }
  };
  mutable std::mutex mu_;  // mutex protecting the queue
  std::priority_queue<Message, std::vector<Message>, Compare> queue_;  // priority queue of messages
  std::condition_variable cond_;  // signals that the queue is not empty
};
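The lock-plus-condition-variable pattern described above can be tried out without ps-lite. The sketch below is a simplified stand-in, not the library class: Task and BlockingPriorityQueue are hypothetical names, and the comparator uses a strict `<` because std::priority_queue expects a strict weak ordering.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for Message: only the priority matters here.
struct Task {
  int priority;
  int payload;
};

class BlockingPriorityQueue {
 public:
  // Insert a task and wake up any thread blocked in WaitAndPop.
  void Push(Task t) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      queue_.push(std::move(t));
    }
    cond_.notify_all();
  }

  // Block until the queue is non-empty, then return the task with the
  // highest priority.
  Task WaitAndPop() {
    std::unique_lock<std::mutex> lk(mu_);
    cond_.wait(lk, [this] { return !queue_.empty(); });
    Task t = queue_.top();
    queue_.pop();
    return t;
  }

 private:
  struct LowerPriority {
    bool operator()(const Task& l, const Task& r) const {
      return l.priority < r.priority;  // larger priority is popped first
    }
  };
  std::mutex mu_;
  std::condition_variable cond_;
  std::priority_queue<Task, std::vector<Task>, LowerPriority> queue_;
};
```

The predicate passed to wait() guards against spurious wake-ups: the thread only proceeds once the queue really holds an element.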
SimpleApp is a base class that provides a unified abstraction of what an application node does.
SimpleData defines the basic format of a Request and a Response.
struct SimpleData {
  /** \brief the int head */
  int head;
  /** \brief the string body */
  std::string body;
  /** \brief sender's node id */
  int sender;
  /** \brief the associated timestamp */
  int timestamp;
  /** \brief sender's customer id */
  int customer_id;
};
SimpleApp's main member variables are obj_ (the Customer of this app), request_handle_ and response_handle_. When SimpleApp::Process is called, it checks the flag in message.meta to decide whether the message is a request or a response and calls the corresponding handle.
class SimpleApp {
 public:
  /**
   * \brief constructor
   * @param app_id the app id, should match with the remote node app with which this app
   * is communicated
   * @param customer_id the customer_id, should be node-locally unique
   */
  explicit SimpleApp(int app_id, int customer_id);

  /** \brief deconstructor */
  virtual ~SimpleApp() { delete obj_; obj_ = nullptr; }

  /**
   * \brief send a request to a remote node
   *
   * \param req_head request head
   * \param req_body request body
   * \param recv_id remote node id
   *
   * @return the timestamp of this request
   */
  virtual inline int Request(int req_head, const std::string& req_body, int recv_id);

  /**
   * \brief wait until a request is finished
   *
   * \param timestamp
   */
  virtual inline void Wait(int timestamp) { obj_->WaitRequest(timestamp); }

  /**
   * \brief send back a response for a request
   * \param recv_req the received request
   * \param the response body
   */
  virtual inline void Response(const SimpleData& recv_req, const std::string& res_body = "");

  /**
   * \brief the handle to process a received request/response
   *
   * \param recved the received request or response
   * \param app this pointer
   */
  using Handle = std::function<void(const SimpleData& recved, SimpleApp* app)>;

  /**
   * \brief set the request handle
   * \param request_handle the request handle
   */
  virtual inline void set_request_handle(const Handle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }

  /**
   * \brief set the response handle
   * \param response_handle the response handle
   */
  virtual inline void set_response_handle(const Handle& response_handle) {
    CHECK(response_handle) << "invalid response handle";
    response_handle_ = response_handle;
  }

  /**
   * \brief returns the customer
   */
  virtual inline Customer* get_customer() { return obj_; }

 protected:
  /** \brief empty construct */
  inline SimpleApp() : obj_(nullptr) {
    request_handle_ = [](const SimpleData& recved, SimpleApp* app) {
      app->Response(recved);
    };
    response_handle_ = [](const SimpleData& recved, SimpleApp* app) { };
  }

  /** \brief process a received message */
  virtual inline void Process(const Message& msg);

  /** \brief ps internal object */
  Customer* obj_;

 private:
  /** \brief request handle */
  Handle request_handle_;
  /** \brief response handle */
  Handle response_handle_;
};
The three simple functions are as follows.
Request calls Van to send a message.
inline int SimpleApp::Request(int req_head, const std::string& req_body, int recv_id) {
  // setup message
  Message msg;
  msg.meta.head = req_head;
  if (req_body.size()) msg.meta.body = req_body;
  int ts = obj_->NewRequest(recv_id);
  msg.meta.timestamp = ts;
  msg.meta.request = true;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = obj_->customer_id();

  // send
  for (int r : Postoffice::Get()->GetNodeIDs(recv_id)) {
    msg.meta.recver = r;
    Postoffice::Get()->van()->Send(msg);
  }
  return ts;
}
Response calls Van to send a reply.
inline void SimpleApp::Response(const SimpleData& req, const std::string& res_body) {
  // setup message
  Message msg;
  msg.meta.head = req.head;
  if (res_body.size()) msg.meta.body = res_body;
  msg.meta.timestamp = req.timestamp;
  msg.meta.request = false;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.recver = req.sender;

  // send
  Postoffice::Get()->van()->Send(msg);
}
Process checks the flag in message.meta to decide whether the message is a request or a response and calls the corresponding handle.
inline void SimpleApp::Process(const Message& msg) {
  SimpleData recv;
  recv.sender = msg.meta.sender;
  recv.head = msg.meta.head;
  recv.body = msg.meta.body;
  recv.timestamp = msg.meta.timestamp;
  recv.customer_id = msg.meta.customer_id;
  if (msg.meta.request) {  // request or response? call the matching handle
    CHECK(request_handle_);
    request_handle_(recv, this);
  } else {
    CHECK(response_handle_);
    response_handle_(recv, this);
  }
}
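The request/response dispatch in Process can be illustrated in isolation. The following is a minimal sketch with no networking; Packet and MiniApp are hypothetical names, and the default handlers are no-ops, which is an assumption made only to keep the example small.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical, simplified message: just the fields the dispatch needs.
struct Packet {
  bool request;      // true for a request, false for a response
  int head;
  std::string body;
};

class MiniApp {
 public:
  using Handle = std::function<void(const Packet&)>;

  // Both handlers start out as no-ops so Process is always safe to call.
  MiniApp()
      : request_handle_([](const Packet&) {}),
        response_handle_([](const Packet&) {}) {}

  void set_request_handle(Handle h) {
    assert(static_cast<bool>(h));
    request_handle_ = std::move(h);
  }

  void set_response_handle(Handle h) {
    assert(static_cast<bool>(h));
    response_handle_ = std::move(h);
  }

  // Route the packet by its request flag, in the spirit of SimpleApp::Process.
  void Process(const Packet& p) {
    if (p.request) {
      request_handle_(p);
    } else {
      response_handle_(p);
    }
  }

 private:
  Handle request_handle_;
  Handle response_handle_;
};
```

Storing the handlers as std::function lets callers plug in lambdas, function pointers or functors without changing the dispatch code.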
KVServer is the abstraction of the Server node. Its job consists of three steps: receive a message, process it, and return the result. Its main parts are:
request_handle_, which processes requests. It is called from KVServer::Process, and ps-lite provides KVServerDefaultHandle as an example implementation.
Response, which returns data to the worker.
request_handle_ is the request-processing function and has to be supplied by the user.
/**
 * \brief A server node for maintaining key-value pairs
 */
template <typename Val>
class KVServer : public SimpleApp {
 public:
  /**
   * \brief constructor
   * \param app_id the app id, should match with \ref KVWorker's id
   */
  explicit KVServer(int app_id) : SimpleApp() {
    using namespace std::placeholders;
    obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
  }

  /** \brief deconstructor */
  virtual ~KVServer() { delete obj_; obj_ = nullptr; }

  /**
   * \brief the handle to process a push/pull request from a worker
   * \param req_meta meta-info of this request
   * \param req_data kv pairs of this request
   * \param server this pointer
   */
  using ReqHandle = std::function<void(const KVMeta& req_meta,
                                       const KVPairs<Val>& req_data,
                                       KVServer* server)>;
  void set_request_handle(const ReqHandle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }

  /**
   * \brief response to the push/pull request
   * \param req the meta-info of the request
   * \param res the kv pairs that will send back to the worker
   */
  void Response(const KVMeta& req, const KVPairs<Val>& res = KVPairs<Val>());

 private:
  /** \brief internal receive handle */
  void Process(const Message& msg);
  /** \brief request handle */
  ReqHandle request_handle_;  // must be implemented by the user
};
Response() sends a response message to the worker that made the call. Compared with SimpleApp, KVServer treats head and body differently: head carries the command (req.cmd), and the payload travels as key / value / length data arrays rather than as a string body.
Note that Response is meant to be called by the user-defined request_handle_: request_handle_ processes the received message and then calls Response to reply to the worker.
template <typename Val>
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {
  Message msg;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.request = false;
  msg.meta.push = req.push;
  msg.meta.pull = req.pull;
  msg.meta.head = req.cmd;
  msg.meta.timestamp = req.timestamp;
  msg.meta.recver = req.sender;
  if (res.keys.size()) {
    msg.AddData(res.keys);
    msg.AddData(res.vals);
    if (res.lens.size()) {
      msg.AddData(res.lens);
    }
  }
  Postoffice::Get()->van()->Send(msg);
}
Process() is registered with the Customer object. When the Customer's receiving thread receives a message, it calls Process() to handle the data.
The logic inside Process() is:
Extract the meta information of the message into a KVMeta.
Extract the data of the message into a KVPairs (keys, vals and, if present, lens).
Call the user-supplied request_handle_ with both. Process itself does not maintain any KV data.
template <typename Val>
void KVServer<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg);
    return;
  }
  KVMeta meta;
  meta.cmd = msg.meta.head;
  meta.push = msg.meta.push;
  meta.pull = msg.meta.pull;
  meta.sender = msg.meta.sender;
  meta.timestamp = msg.meta.timestamp;
  meta.customer_id = msg.meta.customer_id;
  KVPairs<Val> data;
  int n = msg.data.size();
  if (n) {
    CHECK_GE(n, 2);
    data.keys = msg.data[0];
    data.vals = msg.data[1];
    if (n > 2) {
      CHECK_EQ(n, 3);
      data.lens = msg.data[2];
      CHECK_EQ(data.lens.size(), data.keys.size());
    }
  }
  CHECK(request_handle_);
  request_handle_(meta, data, this);
}
KVServerDefaultHandle is an example provided by ps-lite. It demonstrates how to maintain the KV pairs, process messages and answer requests.
It keeps the server's parameters in a hash table, std::unordered_map store, and responds to push and pull requests: a push updates the entries in store, and a pull reads them back.
/**
 * \brief an example handle adding pushed kv into store
 */
template <typename Val>
struct KVServerDefaultHandle {
  void operator()(
      const KVMeta& req_meta, const KVPairs<Val>& req_data, KVServer<Val>* server) {
    size_t n = req_data.keys.size();
    KVPairs<Val> res;
    if (!req_meta.pull) {
      CHECK_EQ(n, req_data.vals.size());
    } else {
      res.keys = req_data.keys;
      res.vals.resize(n);
    }
    for (size_t i = 0; i < n; ++i) {
      Key key = req_data.keys[i];
      if (req_meta.push) {
        store[key] += req_data.vals[i];
      }
      if (req_meta.pull) {
        res.vals[i] = store[key];
      }
    }
    server->Response(req_meta, res);
  }
  std::unordered_map<Key, Val> store;
};
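We continue from the previous article and trace the flow in more detail.
At the very beginning of the program, a worker node or server node calls Postoffice::Start().
Postoffice::Start() initializes the node information and calls Van::Start().
Each node listens on a local port, and the connections it needs to other nodes are established during startup.
Van::Start() launches a local thread dedicated to receiving socket data. That thread runs Van::Receiving() to keep listening for incoming messages.
receiver_thread_ = std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
After Van::Receiving() receives a message, it acts according to the command the message carries. For a data message that needs further processing, it calls ProcessDataMsg, which:
Looks up the Customer that the message is addressed to.
Passes the message to Customer::Accept.
Customer::Accept() appends the message to the queue recv_queue_.
The Customer object also starts its own receiving thread, recv_thread_, which runs Customer::Receiving():
It takes a message from recv_queue_.
It calls the registered recv_handle_ function to process the message.
If the message is a response, it then executes tracker_[req.timestamp].second++.
For a worker, the registered recv_handle_ is KVWorker::Process(). The messages a worker's receiving thread gets are mainly KV pairs pulled from servers, so this Process() mainly collects the KV pairs in the message.
For a server, the registered recv_handle_ is KVServer::Process().
Since we are looking at KVServer here, and a server receives the KV pairs pushed by workers and has to process them, this Process() calls the function object that the user passed in through KVServer::set_request_handle().
Inside the user-defined request_handle_, if a response has to be sent to the worker, call KVServer::Response.
The logic so far is summarized below. In step 8, recv_handle_ points to KVServer::Process (or to KVWorker::Process on a worker).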
1. A request arrives at Van::Receiving.
2. Van::Receiving calls ProcessDataMsg.
3. ProcessDataMsg calls GetCustomer, which looks the Customer up in Postoffice's unordered_map customers_.
4. ProcessDataMsg hands the message to Customer::Accept.
5. Accept puts the message into recv_queue_.
6. recv_thread_, running Customer::Receiving, takes the message from recv_queue_.
7. Customer::Receiving calls recv_handle_.
8. recv_handle_ dispatches to KVWorker::Process or KVServer::Process.
9. KVServer::Process calls request_handle_.
10. request_handle_ calls Response, which sends the response back.
KVWorker is used to push key-value pairs to server nodes and pull them back. These key-value pairs are the parameters that have to be processed in parallel while the algorithm runs.
KVWorker's main member variables are:
recv_kvs_: the buffer of received KV pairs for each timestamp
callbacks_: the callback for each timestamp, run after all responses to a request have arrived
slicer_: the slicing function, DefaultSlicer by default
Its main functions are:
ZPush: zero-copy push
ZPull: zero-copy pull
AddPullCB: registers the callback that reassembles the pulled keys and values
Process: the message-handling function
DefaultSlicer: the default slicing function
set_slicer: sets the slicer_ member, which is called from Send to slice the KVPairs according to each server's Range
/**
 * \brief A worker node that can \ref Push (\ref Pull) key-value pairs to (from) server
 * nodes
 *
 * \tparam Val the type of value, which should be primitive types such as
 * int32_t and float
 */
template<typename Val>
class KVWorker : public SimpleApp {
 public:
  /** avoid too many this-> */
  using SimpleApp::obj_;  // the Customer object
  /**
   * \brief callback function for \ref Push and \ref Pull
   *
   * It is called by the data receiving thread of this instance when the push or
   * pull is actually finished. Namely the kv pairs have already written into
   * servers' data structure or the kv pairs have already pulled back.
   */
  using Callback = std::function<void()>;

  /**
   * \brief constructor
   *
   * \param app_id the app id, should match with \ref KVServer's id
   * \param customer_id the customer id which is unique locally
   */
  explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
    using namespace std::placeholders;
    slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
    obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
  }

  /** \brief deconstructor */
  virtual ~KVWorker() { delete obj_; obj_ = nullptr; }

  using SlicedKVs = std::vector<std::pair<bool, KVPairs<Val>>>;
  /**
   * \brief a slicer partitions a key-value list according to the key ranges
   * \param send the kv list for partitioning
   * \param ranges the key ranges, ranges[i] is the key range of server i
   * \param sliced the sliced lists. slices[i] should only contains keys in
   * ranges[i] and the according values
   */
  using Slicer = std::function<void(
      const KVPairs<Val>& send, const std::vector<Range>& ranges,
      SlicedKVs* sliced)>;

  /**
   * \brief set a user-defined slicer
   */
  void set_slicer(const Slicer& slicer) {
    CHECK(slicer);
    slicer_ = slicer;
  }

 private:
  /**
   * \brief add a callback for a request. threadsafe.
   * @param cb callback
   * @param timestamp the timestamp of the request
   */
  void AddCallback(int timestamp, const Callback& cb) {
    if (!cb) return;
    std::lock_guard<std::mutex> lk(mu_);
    callbacks_[timestamp] = cb;
  }

  /** \brief data buffer for received kvs for each timestamp */
  std::unordered_map<int, std::vector<KVPairs<Val>>> recv_kvs_;  // received kv values
  /** \brief callbacks for each timestamp */
  std::unordered_map<int, Callback> callbacks_;  // run after all responses to a request have arrived
  /** \brief lock */
  std::mutex mu_;
  /** \brief kv list slicer */
  Slicer slicer_;  // the slice function, DefaultSlicer by default
};
Because Push calls ZPush, we cover the two together.
Push copies the caller's vectors into SArray objects and then calls ZPush.
ZPush obtains a new request timestamp from obj_->NewRequest(kServerGroup), registers the callback for that timestamp, builds the KVPairs and calls Send.
int Push(const std::vector<Key>& keys,
         const std::vector<Val>& vals,
         const std::vector<int>& lens = {},
         int cmd = 0,
         const Callback& cb = nullptr,
         int priority = 0) {
  return ZPush(
      SArray<Key>(keys), SArray<Val>(vals), SArray<int>(lens), cmd, cb, priority);
}

int ZPush(const SArray<Key>& keys,
          const SArray<Val>& vals,
          const SArray<int>& lens = {},
          int cmd = 0,
          const Callback& cb = nullptr,
          int priority = 0) {
  int ts = obj_->NewRequest(kServerGroup);
  AddCallback(ts, cb);
  KVPairs<Val> kvs;
  kvs.keys = keys;
  kvs.vals = vals;
  kvs.lens = lens;
  kvs.priority = priority;
  Send(ts, true, false, cmd, kvs);
  return ts;
}
For how to call it, see the comment in the source:
 * Sample usage: the following codes push two KV pairs `{1, (1.1, 1.2)}` and `{3,
 * (3.1,3.2)}` to server nodes, where the value is a length-2 float vector
 * \code
 *   KVWorker<float> w;
 *   std::vector<Key> keys = {1, 3};
 *   std::vector<float> vals = {1.1, 1.2, 3.1, 3.2};
 *   w.Push(keys, vals);
 * \endcode
The Pull method follows much the same logic as Push:
int Pull(const std::vector<Key>& keys,
         std::vector<Val>* vals,
         std::vector<int>* lens = nullptr,
         int cmd = 0,
         const Callback& cb = nullptr,
         int priority = 0) {
  SArray<Key> skeys(keys);
  int ts = AddPullCB(skeys, vals, lens, cmd, cb);
  KVPairs<Val> kvs;
  kvs.keys = skeys;
  kvs.priority = priority;
  Send(ts, false, true, cmd, kvs);
  return ts;
}
ZPull has the same logic as Pull, except that it skips the copy into system-managed memory. The caller must therefore make sure that the key array is not modified before ZPull completes.
int ZPull(const SArray<Key>& keys,
          SArray<Val>* vals,
          SArray<int>* lens = nullptr,
          int cmd = 0,
          const Callback& cb = nullptr,
          int priority = 0) {
  int ts = AddPullCB(keys, vals, lens, cmd, cb);
  KVPairs<Val> kvs;
  kvs.keys = keys;
  kvs.priority = priority;
  Send(ts, false, true, cmd, kvs);
  return ts;
}
Push() and Pull() both end up calling Send(). Send() slices the KVPairs, because each server holds only part of the parameters; each slice is then sent to its own server.
If every slice is skipped, so that no server needs to be contacted, the callback is run directly.
Otherwise Send iterates over the slices and sends them.
template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
  // slice the message
  SlicedKVs sliced;
  slicer_(kvs, Postoffice::Get()->GetServerKeyRanges(), &sliced);

  // need to add response first, since it will not always trigger the callback
  int skipped = 0;
  for (size_t i = 0; i < sliced.size(); ++i) {
    if (!sliced[i].first) ++skipped;
  }
  obj_->AddResponse(timestamp, skipped);
  if ((size_t)skipped == sliced.size()) {
    RunCallback(timestamp);
  }

  for (size_t i = 0; i < sliced.size(); ++i) {
    const auto& s = sliced[i];
    if (!s.first) continue;
    Message msg;
    msg.meta.app_id = obj_->app_id();
    msg.meta.customer_id = obj_->customer_id();
    msg.meta.request = true;
    msg.meta.push = push;
    msg.meta.pull = pull;
    msg.meta.head = cmd;
    msg.meta.timestamp = timestamp;
    msg.meta.recver = Postoffice::Get()->ServerRankToID(i);
    msg.meta.priority = kvs.priority;
    const auto& kvs = s.second;
    if (kvs.keys.size()) {
      msg.AddData(kvs.keys);
      msg.AddData(kvs.vals);
      if (kvs.lens.size()) {
        msg.AddData(kvs.lens);
      }
    }
    Postoffice::Get()->van()->Send(msg);
  }
}
The slicing function can be overridden by the user; the default is DefaultSlicer. Each sliced KVPairs is wrapped in a Message object and then sent with Van::Send().
DefaultSlicer slices the data to be sent according to the range information in std::vector<Range>& ranges. By default the ranges come from Postoffice::GetServerKeyRanges.
template <typename Val>
void KVWorker<Val>::DefaultSlicer(
    const KVPairs<Val>& send, const std::vector<Range>& ranges,
    typename KVWorker<Val>::SlicedKVs* sliced) {
  sliced->resize(ranges.size());

  // find the positions in msg.key
  size_t n = ranges.size();
  std::vector<size_t> pos(n+1);
  const Key* begin = send.keys.begin();
  const Key* end = send.keys.end();
  for (size_t i = 0; i < n; ++i) {
    if (i == 0) {
      pos[0] = std::lower_bound(begin, end, ranges[0].begin()) - begin;
      begin += pos[0];
    } else {
      CHECK_EQ(ranges[i-1].end(), ranges[i].begin());
    }
    size_t len = std::lower_bound(begin, end, ranges[i].end()) - begin;
    begin += len;
    pos[i+1] = pos[i] + len;

    // don't send it to servers for empty kv
    sliced->at(i).first = (len != 0);
  }
  CHECK_EQ(pos[n], send.keys.size());
  if (send.keys.empty()) return;

  // the length of value
  size_t k = 0, val_begin = 0, val_end = 0;
  if (send.lens.empty()) {
    k = send.vals.size() / send.keys.size();
    CHECK_EQ(k * send.keys.size(), send.vals.size());
  } else {
    CHECK_EQ(send.keys.size(), send.lens.size());
  }

  // slice
  for (size_t i = 0; i < n; ++i) {
    if (pos[i+1] == pos[i]) {
      sliced->at(i).first = false;
      continue;
    }
    sliced->at(i).first = true;
    auto& kv = sliced->at(i).second;
    kv.keys = send.keys.segment(pos[i], pos[i+1]);
    if (send.lens.size()) {
      kv.lens = send.lens.segment(pos[i], pos[i+1]);
      for (int l : kv.lens) val_end += l;
      kv.vals = send.vals.segment(val_begin, val_end);
      val_begin = val_end;
    } else {
      kv.vals = send.vals.segment(pos[i]*k, pos[i+1]*k);
    }
  }
}
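The core of DefaultSlicer is a pair of std::lower_bound calls per server range over the sorted key list. The sketch below isolates just that step with standard containers; KeyRange and SliceByRange are hypothetical names, and the function returns index pairs instead of SArray segments.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using Key = uint64_t;

// Half-open key interval [begin, end), one per server.
struct KeyRange {
  Key begin;
  Key end;
};

// Hypothetical helper: for ascending keys, return for each range the
// index pair [first, last) of the keys that fall inside it. An empty
// pair (first == last) means that server would be skipped.
std::vector<std::pair<size_t, size_t>> SliceByRange(
    const std::vector<Key>& sorted_keys,
    const std::vector<KeyRange>& ranges) {
  assert(std::is_sorted(sorted_keys.begin(), sorted_keys.end()));
  std::vector<std::pair<size_t, size_t>> out;
  for (const KeyRange& r : ranges) {
    auto lo = std::lower_bound(sorted_keys.begin(), sorted_keys.end(), r.begin);
    auto hi = std::lower_bound(lo, sorted_keys.end(), r.end);
    out.emplace_back(static_cast<size_t>(lo - sorted_keys.begin()),
                     static_cast<size_t>(hi - sorted_keys.begin()));
  }
  return out;
}
```

With fixed-length values of k elements per key, the value slice for a server would then be the index pair multiplied by k, which mirrors the `pos[i]*k` arithmetic in the listing above.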
PushPull simply combines push and pull in one request.
int PushPull(const std::vector<Key>& keys,
             const std::vector<Val>& vals,
             std::vector<Val>* outs,
             std::vector<int>* lens = nullptr,
             int cmd = 0,
             const Callback& cb = nullptr,
             int priority = 0) {
  CHECK_NOTNULL(outs);
  if (outs->empty())
    outs->resize(vals.size());
  else
    CHECK_EQ(vals.size(), outs->size());

  SArray<Key> skeys(keys);
  SArray<Val> svals(vals);
  auto souts = new SArray<Val>(outs->data(), outs->size());
  SArray<int>* slens = lens ?
      new SArray<int>(lens->data(), lens->size()) : nullptr;
  int ts = ZPushPull(skeys, svals, souts, slens, cmd,
      [this, cb, souts, slens]() {
        delete souts;
        delete slens;
        if (cb) cb();
      }, priority);
  return ts;
}

int ZPushPull(const SArray<Key>& keys,
              const SArray<Val>& vals,
              SArray<Val>* outs,
              SArray<int>* lens = nullptr,
              int cmd = 0,
              const Callback& cb = nullptr,
              int priority = 0) {
  int ts = AddPullCB(keys, outs, lens, cmd, cb);
  KVPairs<Val> kvs;
  kvs.keys = keys;
  kvs.vals = vals;
  kvs.priority = priority;
  if (lens)
    kvs.lens = *lens;
  Send(ts, true, true, cmd, kvs);
  return ts;
}
We mentioned setting several callbacks earlier. Let us now look at how they are used.
As we can see, one callback is set per timestamp, which together form a table of callbacks.
Each time a request is sent, its callback is registered in this table.
using Callback = std::function<void()>;

/** \brief callbacks for each timestamp */
std::unordered_map<int, Callback> callbacks_;  // the callback table

void AddCallback(int timestamp, const Callback& cb) {
  if (!cb) return;
  std::lock_guard<std::mutex> lk(mu_);
  callbacks_[timestamp] = cb;  // register the callback
}
AddPullCB registers the callback that runs once a pull has been answered. The callback copies the returned data.
But what happens when several servers each have to reply? Whether the request is a push or a pull, the values pulled from the servers are written into the local vals only after all responses have been received.
template <typename Val>
template <typename C, typename D>
int KVWorker<Val>::AddPullCB(
    const SArray<Key>& keys, C* vals, D* lens, int cmd,
    const Callback& cb) {
  int ts = obj_->NewRequest(kServerGroup);
  AddCallback(ts, [this, ts, keys, vals, lens, cb]() mutable {
      mu_.lock();
      auto& kvs = recv_kvs_[ts];
      mu_.unlock();

      // do check
      size_t total_key = 0, total_val = 0;
      for (const auto& s : kvs) {  // validate each server's reply
        Range range = FindRange(keys, s.keys.front(), s.keys.back()+1);
        CHECK_EQ(range.size(), s.keys.size())
            << "unmatched keys size from one server";
        if (lens) CHECK_EQ(s.lens.size(), s.keys.size());
        total_key += s.keys.size();
        total_val += s.vals.size();
      }
      CHECK_EQ(total_key, keys.size()) << "lost some servers?";

      // fill vals and lens
      std::sort(kvs.begin(), kvs.end(), [](
          const KVPairs<Val>& a, const KVPairs<Val>& b) {
                  return a.keys.front() < b.keys.front();
        });
      CHECK_NOTNULL(vals);
      if (vals->empty()) {
        vals->resize(total_val);
      } else {
        CHECK_EQ(vals->size(), total_val);
      }
      Val* p_vals = vals->data();
      int *p_lens = nullptr;
      if (lens) {
        if (lens->empty()) {
          lens->resize(keys.size());
        } else {
          CHECK_EQ(lens->size(), keys.size());
        }
        p_lens = lens->data();
      }
      for (const auto& s : kvs) {  // copy the returned data
        memcpy(p_vals, s.vals.data(), s.vals.size() * sizeof(Val));
        p_vals += s.vals.size();
        if (p_lens) {
          memcpy(p_lens, s.lens.data(), s.lens.size() * sizeof(int));
          p_lens += s.lens.size();
        }
      }

      mu_.lock();
      recv_kvs_.erase(ts);
      mu_.unlock();
      if (cb) cb();
    });

  return ts;
}
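The reassembly step inside the pull callback (sort the per-server replies by their first key, then concatenate the values) can be shown on its own. This is a minimal sketch with hypothetical names (Shard, MergeShards); it assumes every reply is non-empty and that the servers own disjoint, ordered key ranges.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

using Key = uint64_t;

// Hypothetical stand-in for one server's reply to a pull.
struct Shard {
  std::vector<Key> keys;
  std::vector<float> vals;
};

// Sort the replies by their first key, then concatenate the values so
// that they line up with the ascending key order of the request.
std::vector<float> MergeShards(std::vector<Shard> shards) {
  for (const Shard& s : shards) {
    assert(!s.keys.empty());  // assumption: no empty replies
  }
  std::sort(shards.begin(), shards.end(),
            [](const Shard& a, const Shard& b) {
              return a.keys.front() < b.keys.front();
            });
  std::vector<float> merged;
  for (const Shard& s : shards) {
    merged.insert(merged.end(), s.vals.begin(), s.vals.end());
  }
  return merged;
}
```

Sorting is needed because replies arrive in network order, not in server-rank order.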
RunCallback looks up the callback by timestamp, runs it and then erases it.
It is called from Process, which we cover next.
template <typename Val>
void KVWorker<Val>::RunCallback(int timestamp) {
  mu_.lock();
  auto it = callbacks_.find(timestamp);
  if (it != callbacks_.end()) {
    mu_.unlock();

    CHECK(it->second);
    it->second();

    mu_.lock();
    callbacks_.erase(it);
  }
  mu_.unlock();
}
For a pull response, the values returned in each response are first stored in recv_kvs_ via recv_kvs_[ts].push_back(kvs);
Whether the request is a push or a pull, the values pulled from the servers are written into the local vals only after all responses have been received.
template <typename Val>
void KVWorker<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg);
    return;
  }
  // store the data for pulling
  int ts = msg.meta.timestamp;
  if (msg.meta.pull) {
    CHECK_GE(msg.data.size(), (size_t)2);
    KVPairs<Val> kvs;
    kvs.keys = msg.data[0];
    kvs.vals = msg.data[1];
    if (msg.data.size() > (size_t)2) {
      kvs.lens = msg.data[2];
    }
    mu_.lock();
    recv_kvs_[ts].push_back(kvs);
    mu_.unlock();
  }

  // finished, run callbacks: only once all responses have been received.
  // The "- 1" is there because the Customer counts the current response
  // only after Process returns.
  if (obj_->NumResponse(ts) == Postoffice::Get()->num_servers() - 1) {
    RunCallback(ts);  // RunCallback is invoked here
  }
}
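The idea of firing a callback only after the last response has arrived can be sketched independently of ps-lite. PendingRequests below is a hypothetical class, not the library's tracker: it counts the current response before checking, so it compares against the full expected count rather than the count minus one.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical tracker: one entry per request timestamp.
class PendingRequests {
 public:
  // Register a request that expects `expected` responses and return its
  // timestamp. The callback runs once, after the last response.
  int NewRequest(int expected, std::function<void()> cb) {
    std::lock_guard<std::mutex> lk(mu_);
    int ts = next_ts_++;
    pending_[ts] = Entry{expected, 0, std::move(cb)};
    return ts;
  }

  // Record one response. Returns true if this response completed the
  // request (and the callback was run), false otherwise.
  bool OnResponse(int ts) {
    std::function<void()> cb;
    {
      std::lock_guard<std::mutex> lk(mu_);
      auto it = pending_.find(ts);
      if (it == pending_.end()) return false;  // unknown or already done
      if (++it->second.received < it->second.expected) return false;
      cb = std::move(it->second.cb);
      pending_.erase(it);
    }
    if (cb) cb();  // run outside the lock so the callback may re-enter
    return true;
  }

 private:
  struct Entry {
    int expected;
    int received;
    std::function<void()> cb;
  };
  std::mutex mu_;
  int next_ts_ = 0;
  std::unordered_map<int, Entry> pending_;
};
```

Moving the callback out of the map before releasing the lock avoids touching the map entry while the callback runs.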
Finally, we summarize with one complete message round trip to see how each part is used. The overall flow is:
1. A worker wants to send a message. A new request is registered through Customer::NewRequest, and KVWorker::Send is called.
2. Send calls Van::Send to transmit the message over the network.
3. The message reaches the server, where it is a request and is picked up by Van::Receiving(). After receiving a message, Van::Receiving() acts according to the command it carries. For a data message that needs further processing, it calls ProcessDataMsg.
4. ProcessDataMsg calls GetCustomer, which looks up the target Customer in Postoffice's customers_.
5. The message is handed to Customer::Accept().
6. Customer::Accept() appends the message to recv_queue_.
7. The Customer's own receiving thread recv_thread_, running Customer::Receiving(), takes the message from recv_queue_ and calls the registered recv_handle_ function to process it. For a response it also executes tracker_[req.timestamp].second++.
8. On the server, the registered recv_handle_ is KVServer::Process().
9. KVServer::Process() calls request_handle_, which processes the request and calls Response to send the reply to the worker.
10. The response travels over the network back to the worker's Van. Van::Receiving() again acts according to the command; for a data message that needs further processing it calls ProcessDataMsg.
11. Customer::Accept() appends the message to recv_queue_.
12. This queue decouples receiving from processing: the message is handled by recv_thread_, which runs Customer::Receiving().
13. On the worker, the registered recv_handle_ is KVWorker::Process().
14. KVWorker::Process() handles the response message.