[源碼解析]機器學習參數服務器ps-lite(4) ----- 應用節點實現

[源碼解析]機器學習參數服務器ps-lite(4) ----- 應用節點實現

0x00 摘要

本文是參數服務器的第四篇,介紹KVWorker, KVServer。html

本系列其餘文章是:node

[源碼解析] 機器學習參數服務器ps-lite 之(1) ----- PostOfficepython

[源碼解析] 機器學習參數服務器ps-lite(2) ----- 通訊模塊Vanc++

[源碼解析] 機器學習參數服務器ps-lite 之(3) ----- 代理人Customer算法

KVWorker, KVServer這兩個分別是 Server / Worker 節點的抽象,是被 Van ---> Customer ---> recv_handle_ 來做爲引擎的一部分來啓動的。安全

本文會先介紹一些基礎支撐類,而後介紹 Server / Worker的基類 SimpleApp,最後介紹 Server / Worker 的具體實現。服務器

整體流程圖提早劇透以下:微信

0x01 基礎類

咱們首先須要介紹一些基礎類。網絡

1.1 Range

Range 類做用是:根據這個Range肯定要拉取的參數在哪一個server上,以及一個server對應的key的range。app

Range 類提供以下函數:

  • begin()和end()兩個uint64的位置;
  • size() 得到 本 range 的大小,即 end_ - begin_;
class Range {
 public:
  Range() : Range(0, 0) {}
  Range(uint64_t begin, uint64_t end) : begin_(begin), end_(end) { }

  uint64_t begin() const { return begin_; }
  uint64_t end() const { return end_; }
  uint64_t size() const { return end_ - begin_; }
 private:
  uint64_t begin_;
  uint64_t end_;
};

1.2 TreadsafeQueue

TreadsafeQueue 是一個能夠供多個線程讀取的隊列,經過鎖和條件量合做來達到線程安全,用來作消息隊列。

/**
 * \brief thread-safe queue allowing push and waited pop
 */
class ThreadsafePQueue {
 public:
  ThreadsafePQueue() { }
  ~ThreadsafePQueue() { }

  /**
   * \brief push an value into the end. threadsafe.
   * \param new_value the value
   */
  void Push(Message new_value) {
    mu_.lock();
    queue_.push(std::move(new_value));
    mu_.unlock();
    cond_.notify_all();
  }

  /**
   * \brief wait until pop an element from the beginning, threadsafe
   * \param value the poped value
   */
  void WaitAndPop(Message* value) { // 等待隊列不爲空,按照優先級pop message
    std::unique_lock<std::mutex> lk(mu_);
    cond_.wait(lk, [this]{return !queue_.empty();});
    *value = std::move(queue_.top());
    queue_.pop();
  }

 private:
  class Compare {
   public:
    bool operator()(const Message &l, const Message &r) {
      return l.meta.priority <= r.meta.priority;
    }
  };
  mutable std::mutex mu_; //數據同步互斥變量
  std::priority_queue<Message, std::vector<Message>, Compare> queue_; // message優先隊列
  std::condition_variable cond_; //隊列不爲空條件變量
};

0x02 SimpleApp

2.1 概述

SimpleApp是一個基類,把應用節點功能作了一個統一抽象。

  • 提供了基本發送功能和簡單消息處理函數(Request, Wait, Response)。
  • 消息類型爲:int型的head和string型的body。
  • 它有2個派生類。KVServer和KVWorker。

2.2 定義

2.2.1 支撐類

SimpleData 定義了 Request 和 Response 的基本格式。

struct SimpleData {
  /** \brief the int head */
  int head;
  /** \brief the string body */
  std::string body;
  /** \brief sender's node id */
  int sender;
  /** \brief the associated timestamp */
  int timestamp;
  /** \brief sender's customer id */
  int customer_id;
};

2.2.2 成員變量

SimpleApp 主要有以下成員變量:

  • Customer* obj_ :本 App 的 Customer,控制請求鏈接;
  • Handle request_handle_ :request 處理函數;
  • Handle response_handle_ :response 處理函數;
  • set_request_handle,set_response_handle:設置成員request_handle_, response_handle_。在客戶端調用SimpleApp::Process時,根據message.meta中的指示變量判斷是request仍是response,調用相應handle處理;
class SimpleApp {
 public:
  /**
   * \brief constructor
   * @param app_id the app id, should match with the remote node app with which this app
   * @param customer_id the customer_id, should be node-locally unique
   * is communicated
   */
  explicit SimpleApp(int app_id, int customer_id);

  /** \brief deconstructor */
  virtual ~SimpleApp() { delete obj_; obj_ = nullptr; }

  /**
   * \brief send a request to a remote node
   *
   * \param req_head request head
   * \param req_body request body
   * \param recv_id remote node id
   *
   * @return the timestamp of this request
   */
  virtual inline int Request(int req_head, const std::string& req_body, int recv_id);

  /**
   * \brief wait until a request is finished
   *
   * \param timestamp
   */
  virtual inline void Wait(int timestamp) { obj_->WaitRequest(timestamp); }


  /**
   * \brief send back a response for a request
   * \param recv_req the received request
   * \param the response body
   */
  virtual inline void Response(const SimpleData& recv_req, const std::string& res_body = "");

  /**
   * \brief the handle to proces a received request/respoonse
   *
   * \param recved the received request or response
   * \param app this pointer
   */
  using Handle = std::function<void(const SimpleData& recved, SimpleApp* app)>;

  /**
   * \brief set the request handle
   * \param request_handle the request handle
   */
  virtual inline void set_request_handle(const Handle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }

  /**
   * \brief set the response handle
   * \param response_handle the response handle
   */
  virtual inline void set_response_handle(const Handle& response_handle) {
    CHECK(response_handle) << "invalid response handle";
    response_handle_ = response_handle;
  }

  /**
   * \brief returns the customer
   */
  virtual inline Customer* get_customer() { return obj_; }

 protected:
  /** \brief empty construct */
  inline SimpleApp() : obj_(nullptr) {
    request_handle_ = [](const SimpleData& recved, SimpleApp* app) {
      app->Response(recved);
    };
    response_handle_ = [](const SimpleData& recved, SimpleApp* app) { };
  }

  /** \brief process a received message */
  virtual inline void Process(const Message& msg);

  /** \brief ps internal object */
  Customer* obj_;

 private:
  /** \brief request handle */
  Handle request_handle_;
  /** \brief request handle */
  Handle response_handle_;
};

2.3 功能函數

三個簡單功能函數以下:

Request 就是調用 Van 發送消息。

inline int SimpleApp::Request(int req_head, const std::string& req_body, int recv_id) {
  // setup message
  Message msg;
  msg.meta.head = req_head;
  if (req_body.size()) msg.meta.body = req_body;
  int ts = obj_->NewRequest(recv_id);
  msg.meta.timestamp = ts;
  msg.meta.request = true;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = obj_->customer_id();

  // send
  for (int r : Postoffice::Get()->GetNodeIDs(recv_id)) {
    msg.meta.recver = r;
    Postoffice::Get()->van()->Send(msg);
  }
  return ts;
}

Response 是調用 Van 回覆消息。

inline void SimpleApp::Response(const SimpleData& req, const std::string& res_body) {
  // setup message
  Message msg;
  msg.meta.head = req.head;
  if (res_body.size()) msg.meta.body = res_body;
  msg.meta.timestamp = req.timestamp;
  msg.meta.request = false;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.recver = req.sender;

  // send
  Postoffice::Get()->van()->Send(msg);
}

Process 函數根據message.meta中的指示變量判斷是request仍是response,調用相應handle處理。

inline void SimpleApp::Process(const Message& msg) {
  SimpleData recv;
  recv.sender    = msg.meta.sender;
  recv.head      = msg.meta.head;
  recv.body      = msg.meta.body;
  recv.timestamp = msg.meta.timestamp;
  recv.customer_id = msg.meta.customer_id;
  if (msg.meta.request) { // 判斷是request仍是response,調用相應handle處理
    CHECK(request_handle_);
    request_handle_(recv, this);
  } else {
    CHECK(response_handle_);
    response_handle_(recv, this);
  }
}

0x03 KVServer

KVServer 是 Server 節點的抽象,其做用是 接收信息處理信息返回結果三個步驟,主要功能是:

  • 維護 key-value pairs 數據;
  • 處理 & 應答 客戶端的 push & pull 請求;
    • 函數request_handle_ 處理請求:
      • 在調用KVServer::Process時 會調用到 request_handle_
      • request_handle_默認爲KVServerDefaultHandle
    • 函數Response用於返回數據;

3.1 定義

request_handle_ 是 request 處理函數,須要自定義。

  • 在該回調函數中使用者則須要實現各類優化器的的模型權重梯度更新算法和模型權重返回操做
  • 可直接參考ps-lite已實現的默認版本KVServerDefaultHandle。
/**
 * \brief A server node for maintaining key-value pairs
 */
template <typename Val>
class KVServer : public SimpleApp {
 public:
  /**
   * \brief constructor
   * \param app_id the app id, should match with \ref KVWorker's id
   */
  explicit KVServer(int app_id) : SimpleApp() {
    using namespace std::placeholders;
    obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
  }

  /** \brief deconstructor */
  virtual ~KVServer() { delete obj_; obj_ = nullptr; }

  /**
   * \brief the handle to process a push/pull request from a worker
   * \param req_meta meta-info of this request
   * \param req_data kv pairs of this request
   * \param server this pointer
   */
  using ReqHandle = std::function<void(const KVMeta& req_meta,
                                       const KVPairs<Val>& req_data,
                                       KVServer* server)>;
  void set_request_handle(const ReqHandle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }

  /**
   * \brief response to the push/pull request
   * \param req the meta-info of the request
   * \param res the kv pairs that will send back to the worker
   */
  void Response(const KVMeta& req, const KVPairs<Val>& res = KVPairs<Val>());

 private:
  /** \brief internal receive handle */
  void Process(const Message& msg);
  /** \brief request handle */
  ReqHandle request_handle_; // 須要用戶本身實現
};

3.2 功能函數

3.2.1 Response

Response()就是向調用的worker發送 response 信息。與SimpleApp 比較下,發現 KVServer 這裏對於 head 和 body 都有了新的處理。

須要注意的是:Response 函數應該是被用戶自定義的 request_handle_ 調用,即 request_handle_ 處理收到的消息,而後調用 Response 對 worker 進行回覆應答

template <typename Val>
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {
  Message msg;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.request     = false;
  msg.meta.push        = req.push;
  msg.meta.pull        = req.pull;
  msg.meta.head        = req.cmd;
  msg.meta.timestamp   = req.timestamp;
  msg.meta.recver      = req.sender;
  if (res.keys.size()) {
    msg.AddData(res.keys);
    msg.AddData(res.vals);
    if (res.lens.size()) {
      msg.AddData(res.lens);
    }
  }
  Postoffice::Get()->van()->Send(msg);
}

3.2.2 Process

Process()被註冊到Customer對象中,當Customer對象的receiving thread接受到消息時,就調用Process()對數據進行處理。

Process()內部的邏輯是:

  • 提取消息的元信息,構建一個 KVMeta。
  • 能夠看到,在 Process 中沒有對 KV 數據的維護。
  • Process 調用 用戶自行實現的一個request_handle_ (std::function函數對象)對數據進行處理。
  • 在回調函數 request_handle_ 中使用者則須要實現各類優化器的的模型權重梯度更新算法和模型權重返回操做
template <typename Val>
void KVServer<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg); return;
  }
  KVMeta meta;
  meta.cmd       = msg.meta.head;
  meta.push      = msg.meta.push;
  meta.pull      = msg.meta.pull;
  meta.sender    = msg.meta.sender;
  meta.timestamp = msg.meta.timestamp;
  meta.customer_id = msg.meta.customer_id;
  KVPairs<Val> data;
  int n = msg.data.size();
  if (n) {
    CHECK_GE(n, 2);
    data.keys = msg.data[0];
    data.vals = msg.data[1];
    if (n > 2) {
      CHECK_EQ(n, 3);
      data.lens = msg.data[2];
      CHECK_EQ(data.lens.size(), data.keys.size());
    }
  }
  CHECK(request_handle_);
  request_handle_(meta, data, this);
}

3.2.3 例子函數

KVServerDefaultHandle 是 ps-lite 提供的例子,用於演示如何維護 KV,處理消息,返回請求。

這裏維護一個哈希表 unordered_map,記錄key和value,並對push和pull請求進行響應。

使用std::unordered_map store保存server的參數,當請求爲push時,對store參數作更新,請求爲pull時對參數進行拉取;

/**
 * \brief an example handle adding pushed kv into store
 */
template <typename Val>
struct KVServerDefaultHandle {
  void operator()(
      const KVMeta& req_meta, const KVPairs<Val>& req_data, KVServer<Val>* server) {
    size_t n = req_data.keys.size();
    KVPairs<Val> res;
    if (!req_meta.pull) {
      CHECK_EQ(n, req_data.vals.size());
    } else {
      res.keys = req_data.keys; res.vals.resize(n);
    }
    for (size_t i = 0; i < n; ++i) {
      Key key = req_data.keys[i];
      if (req_meta.push) {
        store[key] += req_data.vals[i];
      }
      if (req_meta.pull) {
        res.vals[i] = store[key];
      }
    }
    server->Response(req_meta, res);
  }
  std::unordered_map<Key, Val> store;
};

3.2.4 流程

咱們接着上文繼續梳理細化流程。

  • worker節點 或者 server節點 在程序的最開始會執行Postoffice::start()

  • Postoffice::start()會初始化節點信息,而且調用Van::start()

  • 每一個節點都監聽了本地一個端口;該鏈接的節點在啓動時已經鏈接。

  • Van::start() 啓動一個本地線程專門接收socket的信息,使用Van::Receiving()來持續監聽收到的message。

    • receiver_thread_ = std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
  • Van::Receiving()接收後消息以後,根據不一樣命令執行不一樣動做。針對數據消息,若是須要下一步處理,會調用 ProcessDataMsg:

    • 依據消息中的app id找到 Customer(每一個app 任務會綁定一個custom類),即會根據customer id的不一樣將message發給不一樣的customer的recv thread。
    • 將消息傳遞給Customer::Accept函數。
  • Customer::Accept() 函數將消息添加到一個隊列recv_queue_

  • Customer 對象自己也會啓動一個接受線程 recv_thread_,使用 Customer::Receiving() :

    • 不斷的從recv_queue_隊列取消息。
    • 若是 (!recv.meta.request) ,就說明是 response,則tracker_[req.timestamp].second++
    • 調用註冊的用戶自定義的recv_handle_函數對消息進行處理。
  • 對於worker來講,其註冊的recv_handle_KVWorker::Process()函數。由於worker的recv thread接受到的消息主要是從server處pull下來的KV對,所以該Process()主要是接收message中的KV對;

  • 而對於Server來講,其註冊的recv_handle_KVServer::Process()函數。

  • 由於咱們這裏是 KVServer,並且server接受的是worker們push上來的KV對,須要對其進行處理,所以該Process()函數中調用的用戶經過KVServer::set_request_handle()傳入的函數對象。

  • 在 用戶自定義的 request_handle_ 函數中,若是須要發送 response 給 worker,則調用 KVServer ::Response。

目前邏輯以下圖,在 第 8 步,recv_handle_ 指向 KVServer ::Process 或者 KVWorker ::Process(本節是server,因此對應的是KVServer ::Process)。在第10步,返回 response 給 worker。

+--------------------------+
            | Van                      |
            |                          |
Request +----------->  Receiving       |
            |  1           +           |             +---------------------------+
            |              |           |             | Postoffice                |
            |              | 2         |             |                           |
            |              v           | GetCustomer |                           |
            |        ProcessDataMsg <------------------> unordered_map customers_|
            |              +           |      3      |                           |
            |              |           |             +---------------------------+
            +--------------------------+
                           |
                           | 4
                           |
 +------------------------------------+
 | Customer                |          |
 |                         |          |
 |                         v          |
 |                      Accept        |
 |                         +          |
 |                         |          |
 |                         | 5        |
 |                         v          |
 |                    recv_queue_     |                +------------------+
 |                         +          |                |KVWorker          |
 |                         | 6        |     +--------> |                  |
 |                         |          |     |    8     |         Process  |
 |                         v          |     |          +------------------+
 |  recv_thread_ +---> Receiving      |     |
 |                         +          |     |
 |                         | 7        |     |
 |                         |          |     |          +------------------+
 |                         v          |     |          |KVServer          |
 |                    recv_handle_+---------+--------> |                  |
 |                                    |          8     |         Process  |
 +------------------------------------+                |           +      |
                                                       +------------------+
                                                                   |
                                                                   |  9
                                                                   v
                                                       +-----------+-------+
                                                       | request_handle_   |
                                    10                 |                   |
Response <----------------------------------------------------+ Response   |
                                                       |                   |
                                                       +-------------------+

0x04 KVWorker

4.1 概述

KVWorker用於向server節點push,pull key-value對,就是在算法過程當中,須要並行處理的各類參數。

  • Worker中的push和pull操做都是異步返回一個ID,而後使用ID進行wait阻塞等待,即同步操做。
  • 或者異步調用時傳入一個Callback進行後續操做。

4.2 定義

KVWorker 主要變量爲:

  • std::unordered_map<int, std::vector<KVPairs >> recv_kvs :收到的pull 結果: kv value ;
  • std::unordered_map<int, Callback> callbacks :收到 request 的全部 response 以後執行的回調函數;
  • Slicer slicer_ :默認 slice 函數變量,該函數在調用Send函數時,將KVPairs按照每一個server的Range切片;

主要函數爲:

  • ZPush 零拷貝push函數

  • ZPull 零拷貝pull函數

  • AddPullCB key重組函數

  • Process 消息處理函數

  • DefaultSlicer 默認的slice 處理函數

  • set_slicer:設置slicer_成員,該函數在調用Send函數時,將KVPairs按照每一個server的Range切片;

/**
 * \brief A worker node that can \ref Push (\ref Pull) key-value pairs to (from) server
 * nodes
 *
 * \tparam Val the type of value, which should be primitive types such as
 * int32_t and float
 */
template<typename Val>
class KVWorker : public SimpleApp {
 public:
  /** avoid too many this-> */
  using SimpleApp::obj_; // Customer 對象
  /**
   * \brief callback function for \ref Push and \ref Pull
   *
   * It is called by the data receiving thread of this instance when the push or
   * pull is actually finished. Namely the kv pairs have already written into
   * servers' data structure or the kv pairs have already pulled back.
   */
  using Callback = std::function<void()>;

  /**
   * \brief constructor
   *
   * \param app_id the app id, should match with \ref KVServer's id
   * \param customer_id the customer id which is unique locally
   */
  explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
    using namespace std::placeholders;
    slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
    obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
  }

  /** \brief deconstructor */
  virtual ~KVWorker() { delete obj_; obj_ = nullptr; }

  using SlicedKVs = std::vector<std::pair<bool, KVPairs<Val>>>;
  /**
   * \brief a slicer partitions a key-value list according to the key ranges
   * \param send the kv list for partitioning
   * \param ranges the key ranges, ranges[i] is the key range of server i
   * \param sliced the sliced lists. slices[i] should only contains keys in
   * ranges[i] and the according values
   */
  using Slicer = std::function<void(
      const KVPairs<Val>& send, const std::vector<Range>& ranges,
      SlicedKVs* sliced)>;

  /**
   * \brief set a user-defined slicer
   */
  void set_slicer(const Slicer& slicer) {
    CHECK(slicer); slicer_ = slicer;
  }

 private:
  /**
   * \brief add a callback for a request. threadsafe.
   * @param cb callback
   * @param timestamp the timestamp of the request
   */
  void AddCallback(int timestamp, const Callback& cb) {
    if (!cb) return;
    std::lock_guard<std::mutex> lk(mu_);
    callbacks_[timestamp] = cb;
  }

  /** \brief data buffer for received kvs for each timestamp */
  std::unordered_map<int, std::vector<KVPairs<Val>>> recv_kvs_; // 收到的 kv value 
  /** \brief callbacks for each timestamp */
  std::unordered_map<int, Callback> callbacks_; // 收到 request 的全部 response 以後執行的回調函數
  /** \brief lock */
  std::mutex mu_;
  /** \brief kv list slicer */
  Slicer slicer_; // 默認 slice 函數變量
};

4.3 功能函數

4.3.1 Push & ZPush

由於 Push 調用了 ZPush,因此咱們放在一塊兒介紹。

Push方法主要就是:

  • 把數據(KV列表)發送到對應的服務器節點;
  • KV列表是依據每一個服務器維護的 Key range 來進行分區發送;
  • Push 是異步直接返回,若是想知道返回結果如何,則能夠:
    • 使用 Wait 來等待,即利用tracker_來記錄發送的請求量和對應的響應請求量,當發送量等於接收量的時候,表示每一個請求都成功發送了,以此來達到同步的目的;
    • 使用 callback,這樣當結束時候就能夠回調到。

ZPush 方法是:

  • 使用obj_(Customer類型)的 NewRequest 方法來記錄記錄發送的請求量和對應的響應請求量,而且返回一個時間戳;
  • 設置好對應 timestamp 的 callback;
  • 使用傳入的參數構造KVPair對象,調用Send送出該對象;
int Push(const std::vector<Key>& keys,
           const std::vector<Val>& vals,
           const std::vector<int>& lens = {},
           int cmd = 0,
           const Callback& cb = nullptr,
           int priority = 0) {
    return ZPush(
        SArray<Key>(keys), SArray<Val>(vals), SArray<int>(lens), cmd, cb,
        priority);
  }
  
  int ZPush(const SArray<Key>& keys,
            const SArray<Val>& vals,
            const SArray<int>& lens = {},
            int cmd = 0,
            const Callback& cb = nullptr,
            int priority = 0) {
    int ts = obj_->NewRequest(kServerGroup);
    AddCallback(ts, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.vals = vals;
    kvs.lens = lens;
    kvs.priority = priority;
    Send(ts, true, false, cmd, kvs);
    return ts;
  }

如何調用能夠參考其註釋:

* Sample usage: the following codes push two KV pairs `{1, (1.1, 1.2)}` and `{3,
   * (3.1,3.2)}` to server nodes, where the value is a length-2 float vector
   * \code
   *   KVWorker<float> w;
   *   std::vector<Key> keys = {1, 3};
   *   std::vector<float> vals = {1.1, 1.2, 3.1, 3.2};
   *   w.Push(keys, vals);
   * \endcode

4.3.2 Pull

pull方法跟push的邏輯大致相似:

  • 綁定一個回調函數,用於拷貝數據,而且獲得一個時間戳。
  • 根據key_vector從Server上拉取val_vector,
  • 最終返回timestamp,
  • 該函數不阻塞,可用worker.Wait(timestamp)等待;
int Pull(const std::vector<Key>& keys,
           std::vector<Val>* vals,
           std::vector<int>* lens = nullptr,
           int cmd = 0,
           const Callback& cb = nullptr,
           int priority = 0) {
    SArray<Key> skeys(keys);
    int ts = AddPullCB(skeys, vals, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = skeys;
    kvs.priority = priority;
    Send(ts, false, true, cmd, kvs);
    return ts;
  }

4.3.3 ZPull

邏輯與 Pull 一致,只是省略了拷貝到系統這個過程。所以須要保證在ZPull完成前,調用者沒有改變key_vector;

int ZPull(const SArray<Key>& keys,
            SArray<Val>* vals,
            SArray<int>* lens = nullptr,
            int cmd = 0,
            const Callback& cb = nullptr,
            int priority = 0) {
    int ts = AddPullCB(keys, vals, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.priority = priority;
    Send(ts, false, true, cmd, kvs);
    return ts;
  }

4.3.4 Send

Push()Pull()最後都會調用Send()函數,Send()對KVPairs進行切分,由於每一個Server只保留一部分參數,所以切分後的SlicedKVpairs就會被髮送給不一樣的Server。

若是是 skipped,則會直接調用 callback。

不然遍歷發送。

template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
  // slice the message
  SlicedKVs sliced;
  slicer_(kvs, Postoffice::Get()->GetServerKeyRanges(), &sliced);

  // need to add response first, since it will not always trigger the callback
  int skipped = 0;
  for (size_t i = 0; i < sliced.size(); ++i) {
    if (!sliced[i].first) ++skipped;
  }
  obj_->AddResponse(timestamp, skipped);
  if ((size_t)skipped == sliced.size()) {
    RunCallback(timestamp);
  }

  for (size_t i = 0; i < sliced.size(); ++i) {
    const auto& s = sliced[i];
    if (!s.first) continue;
    Message msg;
    msg.meta.app_id = obj_->app_id();
    msg.meta.customer_id = obj_->customer_id();
    msg.meta.request     = true;
    msg.meta.push        = push;
    msg.meta.pull        = pull;
    msg.meta.head        = cmd;
    msg.meta.timestamp   = timestamp;
    msg.meta.recver      = Postoffice::Get()->ServerRankToID(i);
    msg.meta.priority    = kvs.priority;
    const auto& kvs = s.second;
    if (kvs.keys.size()) {
      msg.AddData(kvs.keys);
      msg.AddData(kvs.vals);
      if (kvs.lens.size()) {
        msg.AddData(kvs.lens);
      }
    }
    Postoffice::Get()->van()->Send(msg);
  }
}

4.3.5 DefaultSlicer

切分函數能夠由用戶自行重寫,默認爲DefaultSlicer,每一個SlicedKVPairs被包裝成Message對象,而後用van::send()發送。

根據std::vector& ranges分片範圍信息,將要發送的數據進行分片。目前默認的使用 Postoffice::GetServerKeyRanges來劃分分片範圍。

template <typename Val>
void KVWorker<Val>::DefaultSlicer(
    const KVPairs<Val>& send, const std::vector<Range>& ranges,
    typename KVWorker<Val>::SlicedKVs* sliced) {
  sliced->resize(ranges.size());

  // find the positions in msg.key
  size_t n = ranges.size();
  std::vector<size_t> pos(n+1);
  const Key* begin = send.keys.begin();
  const Key* end = send.keys.end();
  for (size_t i = 0; i < n; ++i) {
    if (i == 0) {
      pos[0] = std::lower_bound(begin, end, ranges[0].begin()) - begin;
      begin += pos[0];
    } else {
      CHECK_EQ(ranges[i-1].end(), ranges[i].begin());
    }
    size_t len = std::lower_bound(begin, end, ranges[i].end()) - begin;
    begin += len;
    pos[i+1] = pos[i] + len;

    // don't send it to servers for empty kv
    sliced->at(i).first = (len != 0);
  }
  CHECK_EQ(pos[n], send.keys.size());
  if (send.keys.empty()) return;

  // the length of value
  size_t k = 0, val_begin = 0, val_end = 0;
  if (send.lens.empty()) {
    k = send.vals.size() / send.keys.size();
    CHECK_EQ(k * send.keys.size(), send.vals.size());
  } else {
    CHECK_EQ(send.keys.size(), send.lens.size());
  }

  // slice
  for (size_t i = 0; i < n; ++i) {
    if (pos[i+1] == pos[i]) {
      sliced->at(i).first = false;
      continue;
    }
    sliced->at(i).first = true;
    auto& kv = sliced->at(i).second;
    kv.keys = send.keys.segment(pos[i], pos[i+1]);
    if (send.lens.size()) {
      kv.lens = send.lens.segment(pos[i], pos[i+1]);
      for (int l : kv.lens) val_end += l;
      kv.vals = send.vals.segment(val_begin, val_end);
      val_begin = val_end;
    } else {
      kv.vals = send.vals.segment(pos[i]*k, pos[i+1]*k);
    }
  }
}

4.3.6 PushPull & ZPushPull

就是把 push,pull 聚合在一塊兒。

int PushPull(const std::vector<Key>& keys,
               const std::vector<Val>& vals,
               std::vector<Val>* outs,
               std::vector<int>* lens = nullptr,
               int cmd = 0,
               const Callback& cb = nullptr,
               int priority = 0) {
    CHECK_NOTNULL(outs);
    if (outs->empty())
      outs->resize(vals.size());
    else
      CHECK_EQ(vals.size(), outs->size());

    SArray<Key> skeys(keys);
    SArray<Val> svals(vals);
    auto souts = new SArray<Val>(outs->data(), outs->size());
    SArray<int>* slens = lens ?
        new SArray<int>(lens->data(), lens->size()) : nullptr;
    int ts = ZPushPull(skeys, svals, souts, slens, cmd,
        [this, cb, souts, slens]() {
          delete souts;
          delete slens;
          if (cb) cb();
        }, priority);
    return ts;
  }

  int ZPushPull(const SArray<Key>& keys,
                const SArray<Val>& vals,
                SArray<Val>* outs,
                SArray<int>* lens = nullptr,
                int cmd = 0,
                const Callback& cb = nullptr,
                int priority = 0) {
    int ts = AddPullCB(keys, outs, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.vals = vals;
    kvs.priority = priority;
    if (lens)
      kvs.lens = *lens;
    Send(ts, true, true, cmd, kvs);
    re

4.3.7 Callback 相關

前面提到了一些回調函數的設置,下面咱們看看如何使用。

4.3.7.1 設置

咱們能夠看到,針對每一個時間戳,設置了一個回調函數,進而構成了一個回調函數列表。

每次發送請求以後,都會往這個列表中註冊回調函數。

using Callback = std::function<void()>;
  
  /** \brief callbacks for each timestamp */
  std::unordered_map<int, Callback> callbacks_;  // 回調函數列表
  
  void AddCallback(int timestamp, const Callback& cb) {
    if (!cb) return;
    std::lock_guard<std::mutex> lk(mu_);
    callbacks_[timestamp] = cb; // 添加回調函數
  }
4.3.7.2 AddPullCB

這是 pull 以後,獲得應答的回調函數,用於拷貝返回的數據。

可是,若是是多個 Server 都應該有返回,應該如何處理?不管是 push 仍是 pull,只有在收到了全部的Response以後,纔會將從各個server上拉取的value填入本地的vals裏。

template <typename Val>
template <typename C, typename D>
int KVWorker<Val>::AddPullCB(
    const SArray<Key>& keys, C* vals, D* lens, int cmd,
    const Callback& cb) {
  int ts = obj_->NewRequest(kServerGroup);
  AddCallback(ts, [this, ts, keys, vals, lens, cb]() mutable {
      mu_.lock();
      auto& kvs = recv_kvs_[ts];
      mu_.unlock();

      // do check
      size_t total_key = 0, total_val = 0;
      for (const auto& s : kvs) { // 進行有效性驗證
        Range range = FindRange(keys, s.keys.front(), s.keys.back()+1);
        CHECK_EQ(range.size(), s.keys.size())
            << "unmatched keys size from one server";
        if (lens) CHECK_EQ(s.lens.size(), s.keys.size());
        total_key += s.keys.size();
        total_val += s.vals.size();
      }
      CHECK_EQ(total_key, keys.size()) << "lost some servers?";

      // fill vals and lens
      std::sort(kvs.begin(), kvs.end(), [](
          const KVPairs<Val>& a, const KVPairs<Val>& b) {
                  return a.keys.front() < b.keys.front();
        });
      CHECK_NOTNULL(vals);
      if (vals->empty()) {
        vals->resize(total_val);
      } else {
        CHECK_EQ(vals->size(), total_val);
      }
      Val* p_vals = vals->data();
      int *p_lens = nullptr;
      if (lens) {
        if (lens->empty()) {
          lens->resize(keys.size());
        } else {
          CHECK_EQ(lens->size(), keys.size());
        }
        p_lens = lens->data();
      }
      for (const auto& s : kvs) { // 拷貝返回的數據
        memcpy(p_vals, s.vals.data(), s.vals.size() * sizeof(Val));
        p_vals += s.vals.size();
        if (p_lens) {
          memcpy(p_lens, s.lens.data(), s.lens.size() * sizeof(int));
          p_lens += s.lens.size();
        }
      }

      mu_.lock();
      recv_kvs_.erase(ts);
      mu_.unlock();
      if (cb) cb();
    });

  return ts;
}
4.3.7.3 運行

就是依據時間戳找到回調函數,運行,而後刪除。

什麼時候調用,就是在 Process 之中會調用,咱們立刻介紹。

template <typename Val>
void KVWorker<Val>::RunCallback(int timestamp) {
  mu_.lock();
  auto it = callbacks_.find(timestamp);
  if (it != callbacks_.end()) {
    mu_.unlock();

    CHECK(it->second);
    it->second();

    mu_.lock();
    callbacks_.erase(it);
  }
  mu_.unlock();
}

4.3.8 Process

若是是 Pull 的 response, 在每次收到的Response返回的values,會先保存recv_kvs_裏,recv_kvs_[ts].push_back(kvs);

不管是 push 仍是 pull,只有在收到了全部的Response以後,纔會將從各個server上拉取的value填入本地的vals裏。

template <typename Val>
void KVWorker<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg); return;
  }
  // store the data for pulling
  int ts = msg.meta.timestamp;
  if (msg.meta.pull) {
    CHECK_GE(msg.data.size(), (size_t)2);
    KVPairs<Val> kvs;
    kvs.keys = msg.data[0];
    kvs.vals = msg.data[1];
    if (msg.data.size() > (size_t)2) {
      kvs.lens = msg.data[2];
    }
    mu_.lock();
    recv_kvs_[ts].push_back(kvs);
    mu_.unlock();
  }

  // finished, run callbacks,只有在收到了全部的Response以後
  if (obj_->NumResponse(ts) == Postoffice::Get()->num_servers() - 1)  {
    RunCallback(ts); // 在這裏調用了 RunCallback。
  }
}

0x05 總結

最後咱們用一個消息傳遞流程作一下總結,看看各個部分在其中如何使用。整體流程圖以下:

  1. worker節點 要發送消息,因此調用了 Send 方法。
  2. Send 方法會調用到了 Customer的 NewRequest,來創建一個新請求。
  3. Postoffice::start()會初始化節點信息,而且調用Van::start()
  4. Send方法會調用 Van 的 send 方法來進行網絡交互。
  5. 通過網絡傳遞以後,流程來到了 Server 處,對於 Server 來講,這是一個 Request,調用到了 Van 的 Receiving。Van::Receiving()接收後消息以後,根據不一樣命令執行不一樣動做。針對數據消息,若是須要下一步處理,會調用 ProcessDataMsg。
  6. 繼續調用到 Van 的 ProcessDataMsg,而後調用 GetCustomer。
  7. GetCustomer 會調用到Postoffice,對於 customers_ 進行相應處理。
  8. Customer 會使用 Accept 來處理消息。
  9. Customer::Accept() 函數將消息添加到一個隊列recv_queue_
  10. Customer 對象自己也會啓動一個接受線程 recv_thread_,使用 Customer::Receiving() :
    1. 不斷的從recv_queue_隊列取消息。
    2. 若是 (!recv.meta.request) ,就說明是 response,則tracker_[req.timestamp].second++
    3. 調用註冊的用戶自定義的recv_handle_函數對消息進行處理。
  11. Van::Receiving() 調用註冊的用戶自定義的recv_handle_函數對消息進行處理。
  12. 對於Server來講,其註冊的recv_handle_KVServer::Process()函數。
  13. Process 函數調用 request_handle_ 繼續處理,生成 Response,返回給 Worker。
  14. Response 通過網絡傳遞給 Worker。
  15. 運行回到了 Worker,來到了 Worker 的 Van。對於 worker 來講,這是一個 Request,調用到了 Van 的 Receiving。(如下操做序列和 Server 相似
  16. Van::Receiving()接收後消息以後,根據不一樣命令執行不一樣動做。針對數據消息,若是須要下一步處理,會調用 ProcessDataMsg。
  17. Customer 會使用 Accept 來處理消息。
  18. Customer::Accept() 函數將消息添加到一個隊列recv_queue_
  19. 這裏有個解耦合,由一個新線程 recv_thread_處理。
  20. Customer 對象自己已經啓動一個新線程 recv_thread_,使用 Customer::Receiving() 。
  21. 對於Worker來講,其註冊的recv_handle_KVWorker::Process()函數。
  22. 調用到KVWorker::Process()函數處理響應消息Response。
+---------------------+       +------------------------+   Worker   +  Server            +--------------------------+
| KVWorker            |  1    |  Van                   |      3     |                    | Van                      |
|          Send  +--------+--------------->  send +-----------------+----->  Request +----------->  Receiving       |
|                     |   |   |                        |                                 |              +           |
|                     |   |   |       Receiving   <---------+       |           4        |              |           |             +---------------------------+
|                     |   |   |           +            |    |       |                    |              |           |             | Postoffice                |
|    Process          |   |   |           | 16         |    |       |                    |              | 5         |             |                           |
|      ^              |   |   |           v            |    | 15    |                    |              v           | GetCustomer |                           |
|      |              |   |   |     ProcessDataMsg     |    |       |                    |        ProcessDataMsg <------------------> unordered_map customers_|
|      |              |   |   |           +            |    |       |                    |              +           |      6      |                           |
|      |              |   |   |           |            |    |       |                    |              |           |             +---------------------------+
+---------------------+   |   +------------------------+    |       |                    +--------------------------+
       |                  |               |                 |       |                                   |
       |                  |2              |  17             |       |                                   | 7
       |                  |               |                 |       |                                   |
       |     +---------------------------------------+      |       |         +------------------------------------+
       |     | Customer   |               |          |      |       |         | Customer                |          |
       |     |            |               v          |      |       |         |                         |          |
       |     |            v                          |      |       |         |                         v          |
       |     |        NewRequest        Accept       |      |       |         |                      Accept        |
       |     |                            +          |      |       |         |                         +          |
       |     |                            |  18      |      |       |         |                         |          |
       |     |                            |          |      |       |         |                         | 8        |
       |     |                            v          |      |       |         |                         v          |
       |     |                      revc_queue_      |      |       |         |                    recv_queue_     |
       |     |                            +          |      |       |         |                         +          |
    22 |     |                            |  19      |      |       |         |                         | 9        |
       |     |                            |          |      |       |         |                         |          |
       |     |                  20        v          |      |       |         |                10       v          |
       |     | recv_thread_ +-------> Receving       |      |       |         |  recv_thread_ +---> Receiving      |
       |     |                            |          |      |       |         |                         +          |
       |     |                            |  21      |      |       |         |                         | 11       |
       |     |                            |          |      |       |         |                         |          |                +------------------+
       |     |                            v          |      |       |         |                         v          |                |KVServer          |
       +---------------------------+ recv_handle     |      |       |         |                    recv_handle_+------------------> |                  |
             |                                       |      |       |         |                                    |          12    |         Process  |
             +---------------------------------------+      |       |         +------------------------------------+                |           +      |
                                                            |       |                                                               +------------------+
                                                            |       |                                                                           |
                                                            |       |                                                                           |  13
                                                            |       |                                                                           v
                                                            |       |                                                               +-----------+-------+
                                                            |       |                                                               | request_handle_   |
                                                            |       |                                            14                 |                   |
                                                            +<-----------+   Response <----------------------------------------------------+ Response   |
                                                                    |                                                               |                   |
                                                                    |                                                               +-------------------+
                                                                    +

手機以下:

0xEE 我的信息

★★★★★★關於生活和技術的思考★★★★★★

微信公衆帳號:羅西的思考

若是您想及時獲得我的撰寫文章的消息推送,或者想看看我的推薦的技術資料,敬請關注。

在這裏插入圖片描述

0xFF 參考

史上最全面的ps-lite理解

從零實現機器學習參數服務器框架(二)

相關文章
相關標籤/搜索