淺談 rpc 通訊協議設計

在講 rpc 以前,先講講通訊協議的設計。什麼是通信協議,簡單地說,是指通訊雙方對數據傳送控制的一種約定。通訊不只僅是把消息傳過去,還要保證消息的準確和可靠。node

能夠先看看 amp 協議的設計, 對於首個字節,咱們把版本存到它的低四位,多少條數據存到它的高四位,恰好消耗完一個字節。對於接下來的每一個數據,都是以 <length> / <data> 這種形式存在,每一個數據的長度以4個字節表示,而數據的長度是不固定的。git

All multi-byte integers are big endian. The version and argc integers are stored in the first byte, followed by a sequence of zero or more <length> / <data> pairs, where length is a 32-bit unsigned integer.github

0        1 2 3 4     <length>    ...
+------------+----------+------------+
| <ver/argc> | <length> | <data>     | additional arguments
+------------+----------+------------+
複製代碼

須要注意的一點是,字節的高地位和高低地址是兩回事,這樣就設計到採用大端序仍是小端序存儲數據的問題。固然,對於網絡協議的設計,咱們須要採用大端序。api

大端序和小端序

它們的區別在於:Int8 只須要一個字節就能夠表示,而 Short,Int32,Double 這些類型一個字節放不下,咱們就要用多個字節表示,這就要引入「字節序」的概念,也就是字節存儲的順序。對於某一個要表示的值,是把它的低位存到低地址,仍是把它的高位存到低地址,前者叫小端字節序(Little Endian),後者叫大端字節序(Big Endian)。大端和小端各有優缺點,不一樣的CPU廠商並無達成一致,可是當網絡通信的時候你們必須統一標準,否則沒法通信了。爲了統一網絡傳輸時候的字節的順序,TCP/IP 協議 RFC1700 裏規定使用「大端」字節序做爲網絡字節序,因此,咱們在開發網絡通信協議的時候操做 Buffer 都應該用大端序的 API,也就是 BE 結尾的。參考來自:聊聊 Node.js RPC(一)— 協議數組

amp 的使用方法也很簡單,encode 對數據進行編碼,decode 對數據進行解碼。bash

var bin = amp.encode([Buffer.from('hello'), Buffer.from('world')]);
var msg = amp.decode(bin);
console.log(msg);
複製代碼

接下來咱們就來看看 encode 是怎麼實現的~ 首先咱們須要確認,咱們一共須要多少字節,上面咱們已經講到了協議的組成,因此咱們須要一個字節存儲協議的版本號和數據條數的,加上每條數據須要4個字節存儲數據長度以及數據自己長度須要的字節數。而對於首個字節,咱們經過 version << 4 | argc, 來實現版本在低四位,數據條數在高四位。網絡

對於每條數據的存儲,咱們前面說過,首先把數據的長度32位無符號大端序寫入,接着再把數據寫入。循環結束之時,就是咱們的封印,不,咱們的數據寫完之時~函數

/** * Protocol version. */

var version = 1;

/** * Encode `msg` and `args`. * * @param {Array} args * @return {Buffer} * @api public */

module.exports = function(args){
  var argc = args.length;
  var len = 1;
  var off = 0;

  // data length
  for (var i = 0; i < argc; i++) {
    len += 4 + args[i].length;
  }

  // buffer
  var buf = Buffer.allocUnsafe(len);

  // pack meta
  buf[off++] = version << 4 | argc;

  // pack args
  for (var i = 0; i < argc; i++) {
    var arg = args[i];

    buf.writeUInt32BE(arg.length, off);
    off += 4;

    arg.copy(buf, off);
    off += arg.length;
  }

  return buf;
};
複製代碼

咱們再來看看怎麼把數據解碼 decode ,不論是編碼仍是解碼,須要牢記於心的是,咱們前面通信協議的結構是怎樣的~ui

首先咱們從 buf 從拿出首個字節,meta >> 4 拿出低四位的版本號,meta & 0xf 經過讓高四位相與,拿到數據條數 argv。this

接着開始循環,首先取出 readUInt32BE 數據的長度,接着經過 buf.slice(off, off += len) 拿到數據,最後把數據數組返回去,打完收工 ~

/** * Decode the given `buf`. * * @param {Buffer} buf * @return {Object} * @api public */

module.exports = function(buf){
  var off = 0;

  // unpack meta
  var meta = buf[off++];
  var version = meta >> 4;
  var argv = meta & 0xf;
  var args = new Array(argv);

  // unpack args
  for (var i = 0; i < argv; i++) {
    var len = buf.readUInt32BE(off);
    off += 4;

    var arg = buf.slice(off, off += len);
    args[i] = arg;
  }

  return args;
};
複製代碼

可是在實際編碼過程當中,數據的傳輸都是經過流的信息傳輸,因此咱們須要從流的數據中拿到咱們編碼後的數據~例如像下面這樣,咱們把客戶端傳過來的數據 pipe 過去給 parser 處理,拿出完整編碼數據。

var server = net.createServer(function(sock){
  var parser = new amp.Stream;
  
  parser.on('data', function(chunk){
    var args = chunk.map(function(c){
      return c.toString();
    });

    var meth = args.shift();
    console.log('.%s(%s)', meth, args.join(', '));
  });

  sock.pipe(parser);
});
複製代碼

So, 咱們看看 amp 中的 Stream 怎麼實現的。 Stream 繼承可寫流,同時自身維護 state 表示對 chunk 怎麼處理,_lenbuf 表示協議中當前數據的長度~

state 一共有三種狀態,message 表示開始解析協議,咱們從中拿出協議版本和多少條數據,arglen 表示拿出當前數據的長度,arg 表示解析當前數據。

咱們看看 _write 函數,一開始 state 的狀態是 mesaage,取出協議版本號和數據條數,_nargs 表示取出多少條數據了,_leni 是對當前數據長度的計數,由於數據長度使用四個字節存儲的,_bufs 用來暫時amp 數據,同時把 state 狀態設置爲 arglen,由於 amp 協議 <ver/argc> 後面跟着是數據的長度。

進入 arglen,同時取出數據長度低位,由於是數據的長度是用四個字節大端序存儲的,因此有可能數據長度的其餘字節可能在下一個chunk裏面,因此不是一會兒取出四個字節。當 _leni 爲4時,說明拿到所有的四個字節了,從臨時數據長度 _lenbuf 大端序取出數據的長度,賦值給 _arglen,用 _argcur 來計數是否讀取完單條數據了,最後把 state 狀態設置 爲 arg,表示下個階段要去拿數據了。

若是 chunk 還有數據,這時進去 arg 的分支,首先拿出當前數據剩餘的字節數 this._arglen - this._argcur, 再一次解釋,緣由跟 arglen 同樣,數據可能殘餘在下一個 chunk 裏面,而後企圖從 chunk 裏面拿出最多的數據 Math.min(rem + i, chunk.length), 講切割下來的數據塞入 _bufs 數組,若是當前數據取完,this._argcur == this._arglen 則成立,同時對 amp 中已讀取的數據 _nargs++, 若是 _nargs 已經等於 amp 頭字節說提示的全部數據的條數,這是把狀態置爲 state,讀取下一個 amp 同時觸發 data 監聽,把當前 amp 的數據傳遞出去。而若是 done 成立,則說明當前數據讀取完畢,重置數據長度計數器 _leni 和 state 重置爲 arglen 去讀取下一個數據的長度。而若是done條件不知足,則說明當前的數據在下一個 chunk 裏面,但願下一個 chunk 的時候繼續讀取數當前數據剩餘的字節。

最後 chunk 讀取完畢,觸發 fn 回調~

/** * Module dependencies. */

var Stream = require('stream').Writable;
var encode = require('./encode');

/** * Expose parser. */

module.exports = Parser;

/** * Initialize parser. * * @param {Options} [opts] * @api public */

function Parser(opts) {
  Stream.call(this, opts);
  this.state = 'message';
  this._lenbuf = Buffer.allocUnsafe(4);
}

/** * Inherit from `Stream.prototype`. */

Parser.prototype.__proto__ = Stream.prototype;

/** * Write implementation. */

Parser.prototype._write = function(chunk, encoding, fn){
  for (var i = 0; i < chunk.length; i++) {
    switch (this.state) {
      case 'message':
        var meta = chunk[i];
        this.version = meta >> 4;
        this.argv = meta & 0xf;
        this.state = 'arglen';
        this._bufs = [Buffer.from([meta])];
        this._nargs = 0;
        this._leni = 0;
        break;

      case 'arglen':
        this._lenbuf[this._leni++] = chunk[i];

        // done
        if (4 == this._leni) {
          this._arglen = this._lenbuf.readUInt32BE(0);
          var buf = Buffer.allocUnsafe(4);
          buf[0] = this._lenbuf[0];
          buf[1] = this._lenbuf[1];
          buf[2] = this._lenbuf[2];
          buf[3] = this._lenbuf[3];
          this._bufs.push(buf);
          this._argcur = 0;
          this.state = 'arg';
        }
        break;

      case 'arg':
        // bytes remaining in the argument
        var rem = this._arglen - this._argcur;

        // consume the chunk we need to complete
        // the argument, or the remainder of the
        // chunk if it's not mixed-boundary
        var pos = Math.min(rem + i, chunk.length);

        // slice arg chunk
        var part = chunk.slice(i, pos);
        this._bufs.push(part);

        // check if we have the complete arg
        this._argcur += pos - i;
        var done = this._argcur == this._arglen;
        i = pos - 1;

        if (done) this._nargs++;

        // no more args
        if (this._nargs == this.argv) {
          this.state = 'message';
          this.emit('data', Buffer.concat(this._bufs));
          break;
        }

        if (done) {
          this.state = 'arglen';
          this._leni = 0;
        }
        break;
    }
  }


  fn();
};
複製代碼

上面就是 amp 協議設計的所有細節,但願經過這對協議設計的細節有進一步的認識~ 寫

相關文章
相關標籤/搜索