在講 rpc 以前,先講講通訊協議的設計。什麼是通信協議,簡單地說,是指通訊雙方對數據傳送控制的一種約定。通訊不只僅是把消息傳過去,還要保證消息的準確和可靠。node
能夠先看看 amp 協議的設計, 對於首個字節,咱們把版本存到它的低四位,多少條數據存到它的高四位,恰好消耗完一個字節。對於接下來的每一個數據,都是以 <length>
/ <data>
這種形式存在,每一個數據的長度以4個字節表示,而數據的長度是不固定的。git
All multi-byte integers are big endian. The
version
andargc
integers are stored in the first byte, followed by a sequence of zero or more<length>
/<data>
pairs, wherelength
is a 32-bit unsigned integer.github
0 1 2 3 4 <length> ...
+------------+----------+------------+
| <ver/argc> | <length> | <data> | additional arguments
+------------+----------+------------+
複製代碼
須要注意的一點是,字節的高地位和高低地址是兩回事,這樣就設計到採用大端序仍是小端序存儲數據的問題。固然,對於網絡協議的設計,咱們須要採用大端序。api
它們的區別在於:Int8 只須要一個字節就能夠表示,而 Short,Int32,Double 這些類型一個字節放不下,咱們就要用多個字節表示,這就要引入「字節序」的概念,也就是字節存儲的順序。對於某一個要表示的值,是把它的低位存到低地址,仍是把它的高位存到低地址,前者叫小端字節序(Little Endian),後者叫大端字節序(Big Endian)。大端和小端各有優缺點,不一樣的CPU廠商並無達成一致,可是當網絡通信的時候你們必須統一標準,否則沒法通信了。爲了統一網絡傳輸時候的字節的順序,TCP/IP 協議 RFC1700 裏規定使用「大端」字節序做爲網絡字節序,因此,咱們在開發網絡通信協議的時候操做 Buffer 都應該用大端序的 API,也就是 BE 結尾的。參考來自:聊聊 Node.js RPC(一)— 協議數組
amp 的使用方法也很簡單,encode 對數據進行編碼,decode 對數據進行解碼。bash
var bin = amp.encode([Buffer.from('hello'), Buffer.from('world')]);
var msg = amp.decode(bin);
console.log(msg);
複製代碼
接下來咱們就來看看 encode 是怎麼實現的~ 首先咱們須要確認,咱們一共須要多少字節,上面咱們已經講到了協議的組成,因此咱們須要一個字節存儲協議的版本號和數據條數的,加上每條數據須要4個字節存儲數據長度以及數據自己長度須要的字節數。而對於首個字節,咱們經過 version << 4 | argc
, 來實現版本在低四位,數據條數在高四位。網絡
對於每條數據的存儲,咱們前面說過,首先把數據的長度32位無符號大端序寫入,接着再把數據寫入。循環結束之時,就是咱們的封印,不,咱們的數據寫完之時~函數
/** * Protocol version. */
var version = 1;
/** * Encode `msg` and `args`. * * @param {Array} args * @return {Buffer} * @api public */
module.exports = function(args){
var argc = args.length;
var len = 1;
var off = 0;
// data length
for (var i = 0; i < argc; i++) {
len += 4 + args[i].length;
}
// buffer
var buf = Buffer.allocUnsafe(len);
// pack meta
buf[off++] = version << 4 | argc;
// pack args
for (var i = 0; i < argc; i++) {
var arg = args[i];
buf.writeUInt32BE(arg.length, off);
off += 4;
arg.copy(buf, off);
off += arg.length;
}
return buf;
};
複製代碼
咱們再來看看怎麼把數據解碼 decode ,不論是編碼仍是解碼,須要牢記於心的是,咱們前面通信協議的結構是怎樣的~ui
首先咱們從 buf 從拿出首個字節,meta >> 4
拿出低四位的版本號,meta & 0xf
經過讓高四位相與,拿到數據條數 argv。this
接着開始循環,首先取出 readUInt32BE
數據的長度,接着經過 buf.slice(off, off += len)
拿到數據,最後把數據數組返回去,打完收工 ~
/** * Decode the given `buf`. * * @param {Buffer} buf * @return {Object} * @api public */
module.exports = function(buf){
var off = 0;
// unpack meta
var meta = buf[off++];
var version = meta >> 4;
var argv = meta & 0xf;
var args = new Array(argv);
// unpack args
for (var i = 0; i < argv; i++) {
var len = buf.readUInt32BE(off);
off += 4;
var arg = buf.slice(off, off += len);
args[i] = arg;
}
return args;
};
複製代碼
可是在實際編碼過程當中,數據的傳輸都是經過流的信息傳輸,因此咱們須要從流的數據中拿到咱們編碼後的數據~例如像下面這樣,咱們把客戶端傳過來的數據 pipe 過去給 parser 處理,拿出完整編碼數據。
var server = net.createServer(function(sock){
var parser = new amp.Stream;
parser.on('data', function(chunk){
var args = chunk.map(function(c){
return c.toString();
});
var meth = args.shift();
console.log('.%s(%s)', meth, args.join(', '));
});
sock.pipe(parser);
});
複製代碼
So, 咱們看看 amp 中的 Stream 怎麼實現的。 Stream 繼承可寫流,同時自身維護 state 表示對 chunk 怎麼處理,_lenbuf 表示協議中當前數據的長度~
state 一共有三種狀態,message 表示開始解析協議,咱們從中拿出協議版本和多少條數據,arglen 表示拿出當前數據的長度,arg 表示解析當前數據。
咱們看看 _write 函數,一開始 state 的狀態是 mesaage,取出協議版本號和數據條數,_nargs 表示取出多少條數據了,_leni 是對當前數據長度的計數,由於數據長度使用四個字節存儲的,_bufs 用來暫時amp 數據,同時把 state 狀態設置爲 arglen,由於 amp 協議 <ver/argc>
後面跟着是數據的長度。
進入 arglen,同時取出數據長度低位,由於是數據的長度是用四個字節大端序存儲的,因此有可能數據長度的其餘字節可能在下一個chunk裏面,因此不是一會兒取出四個字節。當 _leni 爲4時,說明拿到所有的四個字節了,從臨時數據長度 _lenbuf 大端序取出數據的長度,賦值給 _arglen,用 _argcur 來計數是否讀取完單條數據了,最後把 state 狀態設置 爲 arg,表示下個階段要去拿數據了。
若是 chunk 還有數據,這時進去 arg 的分支,首先拿出當前數據剩餘的字節數 this._arglen - this._argcur
, 再一次解釋,緣由跟 arglen 同樣,數據可能殘餘在下一個 chunk 裏面,而後企圖從 chunk 裏面拿出最多的數據 Math.min(rem + i, chunk.length)
, 講切割下來的數據塞入 _bufs 數組,若是當前數據取完,this._argcur == this._arglen
則成立,同時對 amp 中已讀取的數據 _nargs++, 若是 _nargs 已經等於 amp 頭字節說提示的全部數據的條數,這是把狀態置爲 state,讀取下一個 amp 同時觸發 data 監聽,把當前 amp 的數據傳遞出去。而若是 done 成立,則說明當前數據讀取完畢,重置數據長度計數器 _leni 和 state 重置爲 arglen 去讀取下一個數據的長度。而若是done條件不知足,則說明當前的數據在下一個 chunk 裏面,但願下一個 chunk 的時候繼續讀取數當前數據剩餘的字節。
最後 chunk 讀取完畢,觸發 fn 回調~
/** * Module dependencies. */
var Stream = require('stream').Writable;
var encode = require('./encode');
/** * Expose parser. */
module.exports = Parser;
/** * Initialize parser. * * @param {Options} [opts] * @api public */
function Parser(opts) {
Stream.call(this, opts);
this.state = 'message';
this._lenbuf = Buffer.allocUnsafe(4);
}
/** * Inherit from `Stream.prototype`. */
Parser.prototype.__proto__ = Stream.prototype;
/** * Write implementation. */
Parser.prototype._write = function(chunk, encoding, fn){
for (var i = 0; i < chunk.length; i++) {
switch (this.state) {
case 'message':
var meta = chunk[i];
this.version = meta >> 4;
this.argv = meta & 0xf;
this.state = 'arglen';
this._bufs = [Buffer.from([meta])];
this._nargs = 0;
this._leni = 0;
break;
case 'arglen':
this._lenbuf[this._leni++] = chunk[i];
// done
if (4 == this._leni) {
this._arglen = this._lenbuf.readUInt32BE(0);
var buf = Buffer.allocUnsafe(4);
buf[0] = this._lenbuf[0];
buf[1] = this._lenbuf[1];
buf[2] = this._lenbuf[2];
buf[3] = this._lenbuf[3];
this._bufs.push(buf);
this._argcur = 0;
this.state = 'arg';
}
break;
case 'arg':
// bytes remaining in the argument
var rem = this._arglen - this._argcur;
// consume the chunk we need to complete
// the argument, or the remainder of the
// chunk if it's not mixed-boundary
var pos = Math.min(rem + i, chunk.length);
// slice arg chunk
var part = chunk.slice(i, pos);
this._bufs.push(part);
// check if we have the complete arg
this._argcur += pos - i;
var done = this._argcur == this._arglen;
i = pos - 1;
if (done) this._nargs++;
// no more args
if (this._nargs == this.argv) {
this.state = 'message';
this.emit('data', Buffer.concat(this._bufs));
break;
}
if (done) {
this.state = 'arglen';
this._leni = 0;
}
break;
}
}
fn();
};
複製代碼
上面就是 amp 協議設計的所有細節,但願經過這對協議設計的細節有進一步的認識~ 寫