對於有繼承關係的進程,nodejs自己爲咱們提供了進程間通訊的方式,可是對於沒有繼承關係的進程,好比兄弟進程,想要通訊最簡單的方式就是經過主進程中轉,相似前端框架中子組件經過更新父組件的數據,而後父通知其餘子組件。由於nodejs內置的進程間通訊須要通過序列化和反序列化,因此這種方式可能會帶來必定的性能損耗,並且在實現上也比較麻煩。今天介紹的是實現兄弟進程通訊的另一種方式,在windows上使用命名管道,在非windows上使用unix域,另外本文還會介紹基於tcp的遠程進程通訊的實現。下面具體介紹一下設計和實現。前端
1 IPC的實現
ipc的實現比較簡單,主要是對nodejs提供的功能進行封裝。首先咱們須要處理一下path,由於在命名管道和unix域中他的格式是不同的。node
const os = require('os');
module.exports = { path: os.platform() === 'win32' ? '\\\\?\\pipe\\ipc' : '/tmp/unix.sock',};
接着咱們看看客戶端和服務器的實現。git
1.1 IPCClient的實現
const net = require('net');const { EventEmitter } = require('events');const { path } = require('../config');
class Client extends EventEmitter { constructor(options) { super(); this.options = { path, ...options }; const socket = net.connect(this.options); socket.on('error', (error) => { console.error(error); }); return socket; }}module.exports = { Client,};
1.2 IPCServer的實現
const fs = require('fs');const net = require('net');const { EventEmitter } = require('events');const { path } = require('../config');
class Server extends EventEmitter { constructor(options, connectionListener) { super(); if (typeof options === 'function') { options = { connectionListener: options, }; } else { options = { ...options, connectionListener }; } try { fs.existsSync(options.path) && fs.unlinkSync(options.path); } catch(e) {
} this.options = { path, ...options }; return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => { client.on('error', (error) => { console.error(error); }); typeof this.options.connectionListener === 'function' && this.options.connectionListener(client); }).listen(this.options); }}
module.exports = { Server,};
2 RPC的實現
咱們知道tcp是面向流的服務,他自己只負責傳輸數據,不負責數據的解析和解釋。經過tcp傳輸數據時,須要本身解析數據,咱們須要從一串字節流中解析出一個個數據包。這就涉及到協議的設計。因此首先咱們要定義一個應用層協議。github
2.1 應用層協議的設計和實現
應用層協議的設計很是簡單 npm
1 總長度是除了開頭標記以外的其餘數據長度。由於數據部分是變長的,因此咱們須要一個總長度來判斷後續的數據長度是多少。編程
2 序列號是用於關聯請求和響應,由於咱們在一個鏈接上可能會串行發送多個數據包,當咱們收到一個回包的時候,咱們不知道是來自哪一個請求的響應,經過響應體中的seq,咱們就知道是來自哪一個請求的響應。設計了通訊協議後,咱們就須要對協議進行封包解包。首先咱們看一下封包邏輯。windows
function seq() { return ~~(Math.random() * Math.pow(2, 31))}
function packet(data, sequnce) { // 轉成buffer const bufferData = Buffer.from(data, 'utf-8'); // 開始標記長度 const startFlagLength = Buffer.from([PACKET_START]).byteLength; // 序列號 const _seq = sequnce || seq(); // 分配一個buffer存儲數據 let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN); // 設計開始標記 buffer[0] = 0x3; // 寫入總長度字段的值 buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, 1, TOTAL_LENGTH); // 寫入序列號的值 buffer.writeUIntBE(_seq, startFlagLength + TOTAL_LENGTH, SEQ_LEN); // 把協議元數據和數據組裝到一塊兒 buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength); return buffer;}
接着咱們看一下解包的邏輯,由於數據的傳輸是字節流,因此有可能多個數據包的數據會粘在一塊兒,因此咱們第一步首先要根據協議解析出一個個數據包,而後再解析每個數據包。咱們經過有限狀態機實現數據的解析。下面是狀態機的狀態集。前端框架
const PARSE_STATE = { PARSE_INIT: 0, PARSE_HEADER: 1, PARSE_DATA: 2, PARSE_END: 3,};
接着咱們定義狀態集的轉換規則。服務器
class StateSwitcher { constructor(options) { this.options = options; }
[PARSE_STATE.PARSE_INIT](data) { // 數據不符合預期 if (data[0] !== PACKET_START) { // 跳過部分數據,找到開始標記 const position = data.indexOf(PACKET_START); // 沒有開始標記,說明這部分數據無效,丟棄 if (position === -1) { return [NEED_MORE_DATA, null]; } // 不然返回有效數據部分,繼續解析 return [PARSE_STATE.PACKET_START, data.slice(position)]; } // 保存當前正在解析的數據包 this.packet = new Packet(); // 跳過開始標記的字節數,進入解析協議頭階段 return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)]; }
[PARSE_STATE.PARSE_HEADER](data) { // 數據不夠頭部的大小則等待數據到來 if (data.length < TOTAL_LENGTH + SEQ_LEN) { return [NEED_MORE_DATA, data]; } // 有效數據包的長度 = 整個數據包長度 - 頭部長度 this.packet.set('length', data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN)); // 序列號 this.packet.set('seq', data.readUInt32BE(TOTAL_LENGTH)); // 解析完頭部了,跳過去 data = data.slice(TOTAL_LENGTH + SEQ_LEN); // 進入解析數據階段 return [PARSE_STATE.PARSE_DATA, data]; }
[PARSE_STATE.PARSE_DATA](data) { const len = this.packet.get('length'); // 數據部分的長度小於協議頭中定義的長度,則繼續等待 if (data.length < len) { return [NEED_MORE_DATA, data]; } // 截取數據部分 this.packet.set('data', data.slice(0, len)); // 解析完數據了,完成一個包的解析,跳過數據部分 data = data.slice(len); typeof this.options.cb === 'function' && this.options.cb(this.packet); this.packet = null; // 解析完一個數據包,進入結束標記階段 return [PARSE_STATE.PARSE_INIT, data]; }}
咱們再看一下狀態機的實現微信
class FSM { constructor(options) { this.options = options; // 狀態處理機,定義了狀態轉移集合 this.stateSwitcher = new StateSwitcher({cb: options.cb}); // 當前狀態 this.state = PARSE_STATE.PARSE_INIT; // 結束狀態 this.endState = PARSE_STATE.PARSE_END; // 當前待解析的數據 this.buffer = null; }
run(data) { // 沒有數據或者解析結束了直接返回 if (this.state === this.endState || !data || !data.length) { return; } // 保存待解析的數據 this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data; // 還沒結束,而且還有數據能夠處理則繼續執行 while(this.state !== this.endState && this.buffer && this.buffer.length) { // 執行狀態處理函數,返回[下一個狀態, 剩下的數據] const result = this.stateSwitcher[this.state](this.buffer); // 若是下一個狀態是NEED_MORE_DATA則說明須要更多的數據才能繼續解析,並保持當前狀態 if (result[0] === NEED_MORE_DATA) { return; } // 記錄下一個狀態和數據 [this.state, this.buffer] = result; }
}}
狀態機就是對開始狀態、結束狀態、狀態轉換集的封裝。實現了協議的封包和解析後咱們看一下如何使用。
2.2 RPC客戶端實現
const net = require('net');const { EventEmitter } = require('events');const { FSM } = require('tiny-application-layer-protocol');class Client extends EventEmitter { constructor(options) { super(); this.options = { ...options }; const socket = net.connect(this.options); socket.on('error', (error) => { console.error(error); }); const fsm = new FSM({ cb: (packet) => { socket.emit('message', packet); } }); socket.on('data', fsm.run.bind(fsm)); return socket; }}module.exports = { Client,};
咱們作的事情主要是負責數據的解析。
2.3 RPC服務器實現
const fs = require('fs');const net = require('net');const { EventEmitter } = require('events')const { FSM } = require('tiny-application-layer-protocol');
class Server extends EventEmitter { constructor(options, connectionListener) { super(); if (typeof options === 'function') { options = { connectionListener: options, }; } else { options = { ...options, connectionListener }; } this.options = { ...options }; return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => { const fsm = new FSM({ cb: function(packet) { client.emit('message', packet); } }) client.on('data', fsm.run.bind(fsm)); client.on('error', (error) => { console.error(error); }); typeof this.options.connectionListener === 'function' && this.options.connectionListener(client); }).listen(this.options); }}
module.exports = { Server,};
一樣,服務器也是負責數據的解析
3 使用
接下來咱們看一下如何使用。
3.1 ipc的使用
server.js
const { IPCServer } = require('../../src');const { packet } = require('tiny-application-layer-protocol');new IPCServer(function(client) { console.log(1) client.on('data', (data) => { console.log('receive', data); client.write(packet('world', data.seq)); });});
client.js
const { IPCClient } = require('../../src');const { packet, seq } = require('tiny-application-layer-protocol');const client = new IPCClient();client.write(packet('hello', seq()));client.on('data', function(res) { console.log('receive', res);})
服務器輸出
客戶端輸出
3.2 RPC的使用
server.js
const { RPCServer } = require('../../src');const { packet } = require('tiny-application-layer-protocol');new RPCServer({host: '127.0.0.1', port: 80}, function(client) { client.on('message', (data) => { console.log('receive', data); client.write(packet('world', data.seq)); });});
client.js
const { RPCClient } = require('../../src');const { packet, seq } = require('tiny-application-layer-protocol');const client = new RPCClient({host: '127.0.0.1', port: 80});client.write(packet('hello', seq()));client.on('message', function(res) { console.log('receive', res);})
服務器輸出客戶端輸出
4 RPC拓展
咱們實現了數據的傳輸和解析,可是如何咱們但願數據的請求和響應是一一對應的怎麼辦呢?好比像http在tcp上能夠併發發起多個請求同樣,響應是否能夠亂序返回,咱們又如何知道某個響應對應的是哪一個請求?接下來介紹如何解決這個問題。首先咱們實現一個請求管理的類。
class RequestManager { constructor(options) { this.options = { timeout: 10000, ...options }; this.map = {}; this.timerId = null; this.startPollTimeout(); } set(key, context) { if (typeof context.cb !== 'function') { throw new Error('cb is required'); } this.map[key] = { startTime: Date.now(), ...context, }; } get(key) { return this.map[key]; } del(key) { return delete this.map[key]; } // 執行上下文 exec(key, data) { const context = this.get(key); if (context) { this.del(key); context.cb(data); } } execAll(data) { for (const [key] of Object.entries(this.map)) { this.exec(key, data); } } // 定時輪詢是否超時 startPollTimeout() { this.timerId = setTimeout(() => { if (!this.timerId) { return; } const nextMap = {}; for (const [key, context] of Object.entries(this.map)) { if (Date.now() - context.startTime < (context.timeout || this.options.timeout)) { nextMap[key] = context; } else { context.cb(new Error('timeout')); } } this.map = nextMap; this.startPollTimeout(); }, 1000); }}
該類的邏輯主要是請求的seq保存對應的上下文,而後收到響應的時候,咱們根據響應的seq拿到對應的上下文,從而執行對應的回調。咱們看看如何使用該類。server.js
const { RPCServer } = require('../../src');const { packet } = require('tiny-application-layer-protocol');new RPCServer({host: '127.0.0.1', port: 80}, function(client) { client.on('message', (data) => { console.log('receive', data); client.end(packet('world', data.seq)); }); client.on('end', (data) => { client.end(); });});
client.js
const { RPCClient, RequestManager } = require('../../src');const { packet, seq } = require('tiny-application-layer-protocol');const requestManager = new RequestManager({timeout: 3000});const client = new RPCClient({host: '127.0.0.1', port: 80});const _seq = seq(); requestManager.set(_seq, { cb: function() { console.log(...arguments); }})client.write(packet('hello', _seq));client.on('message', function(packet) { requestManager.exec(packet.seq, packet);})
輸出 服務器輸出客戶端輸出
github倉庫:https://github.com/theanarkh/nodejs-ipc
github倉庫:https://github.com/theanarkh/tiny-application-layer-protocol
npm install nodejs-i-p-c(ipc和rpc庫,依賴tiny-application-layer-protocol)
npm install tiny-application-layer-protocol(基於tcp的小型應用層協議,包含協議的定義、封包、解包功能)
本文分享自微信公衆號 - 編程雜技(theanarkh)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。