在好久以前的單機時代,一臺電腦中跑着多個進程,進程之間沒有交流各幹各的,就這樣過了不少年。忽然有一天有了新需求,A進程須要實現一個畫圖的功能,剛好鄰居B進程已經有了這個功能,偷懶的程序員C想出了一個辦法:A進程調B進程的畫圖功能。因而出現了IPC
(Inter-process communication,進程間通訊)。就這樣程序員C愉快的去吃早餐去了!javascript
又過了幾年,到了互聯網時代,每一個電腦都實現了互聯互通。這時候僱主又有了新需求,當時還沒掛的A進程須要實現使用tensorflow
識別出笑臉 >_< 。說巧不巧,遠在幾千裏的一臺快速運行的電腦上已經實現了這個功能,睡眼惺忪的程序媛D接手了這個A進程後借鑑以前IPC
的實現,把IPC
擴展到了互聯網上,這就是RPC
(Remote Procedure Call,遠程過程調用)。RPC
其實就是一臺電腦上的進程調用另一臺電腦上的進程的工具。成熟的RPC
方案大多數會具有服務註冊、服務發現、熔斷降級和限流等機制。目前市面上的RPC已經有不少成熟的了,好比Facebook
家的Thrift
、Google
家的gRPC
、阿里家的Dubbo
和螞蟻家的SOFA
。java
接口定義語言,簡稱IDL,
是實現端對端之間可靠通信的一套編碼方案。這裏有涉及到傳輸數據的序列化和反序列化,咱們經常使用的http的請求通常用json當作序列化工具,定製rpc
協議的時候由於要求響應迅速等特色,因此大多數會定義一套序列化協議。好比:node
Protobuf
:git
// protobuf 版本
syntax = "proto3";
package testPackage;
service testService {
// 定義一個ping方法,請求參數集合pingRequest, 響應參數集合pingReply
rpc ping (pingRequest) returns (pingReply) {}
}
message pingRequest {
// string 是類型,param是參數名,1是指參數在方法的第1個位置
string param = 1;
}
message pingReply {
string message = 1;
string content = 2;
}
複製代碼
講到Protobuf
就得講到該庫做者的另外一個做品Cap'n proto
了,號稱性能是直接秒殺Google Protobuf
,直接上官方對比:程序員
雖然知道不少比Protobuf
更快的編碼方案,可是快到這種地步也是厲害了,爲啥這麼快,Cap'n Proto的文檔裏面就馬上說明了,由於Cap'n Proto
沒有任何序列號和反序列化步驟,Cap'n Proto
編碼的數據格式跟在內存裏面的佈局是一致的,因此能夠直接將編碼好的structure直接字節存放到硬盤上面。貼個栗子:github
@0xdbb9ad1f14bf0b36; # unique file ID, generated by `capnp id`
struct Person {
name @0 :Text;
birthdate @3 :Date;
email @1 :Text;
phones @2 :List(PhoneNumber);
struct PhoneNumber {
number @0 :Text;
type @1 :Type;
enum Type {
mobile @0;
home @1;
work @2;
}
}
}
struct Date {
year @0 :Int16;
month @1 :UInt8;
day @2 :UInt8;
}
複製代碼
咱們這裏要定製的編碼方案就是基於protobuf
和Cap'n Proto
結合的相似的語法。由於本人比較喜歡刀劍神域裏的男主角,因此就給這個庫起了個名字 —— Kiritobuf
。shell
首先咱們定義kirito
的語法:json
# test
service testService {
method ping (reqMsg, resMsg)
}
struct reqMsg {
@0 age = Int16;
@1 name = Text;
}
struct resMsg {
@0 age = Int16;
@1 name = Text;
}
複製代碼
#
開頭的是註釋service
、method
、struct
,{}
裏是一個塊結構()
裏有兩個參數,第一個是請求的參數結構,第二個是返回值的結構@
是定義參數位置的描述符,0
表示在首位=
號左邊是參數名,右邊是參數類型參數類型:數組
Bool
Int8
, Int16
, Int32
, Int64
UInt8
, UInt16
, UInt32
, UInt64
Float32
, Float64
Text
, Data
List(T)
定義好了語法和參數類型,咱們先過一下生成有抽象關係代碼的流程:緩存
取到.kirito
後綴的文件,讀取所有字符,經過詞法分析器生成token
,獲得的token
傳入語法分析器生成AST (抽象語法樹)
。
首先咱們新建一個kirito.js
文件:
'use strict';
const fs = require('fs');
const tokenizer = Symbol.for('kirito#tokenizer');
const parser = Symbol.for('kirito#parser');
const transformer = Symbol.for('kirito#transformer');
// 定義詞法分析Token類型
const TYPE = {
// 保留字,service、struct、method...
KEYWORD: 'keyword',
// 變量
VARIABLE: 'variable',
// 符號,{ } ( ) ; # @ ,
SYMBOL: 'symbol',
// 參數位置,數值表示0、一、二、3...
INDEX: 'index'
};
// 定義語法分析字段類型
const EXP = {
// 變量
VARIABLE: 'Identifier',
// 結構申明,service、struct、method
STRUCT_DECLARATIONL: 'StructDeclaration',
// 變量申明,@
VAR_DECLARATION: 'VariableDeclaration',
// 數據類型, Int1六、UInt1六、Bool、Text...
TYPE: 'DataType',
};
複製代碼
定義好了一些必要的字面量,接下來首先是詞法分析階段。
咱們設計詞法分析獲得的Token
是這樣子的:
[ { type: 'keyword', value: 'service' },
{ type: 'variable', value: 'testService' },
{ type: 'symbol', value: '{' },
{ type: 'keyword', value: 'method' },
{ type: 'variable', value: 'ping' },
{ type: 'symbol', value: '(' },
{ type: 'variable', value: 'reqMsg' },
{ type: 'variable', value: 'resMsg' },
{ type: 'symbol', value: ')' },
{ type: 'symbol', value: '}' },
{ type: 'keyword', value: 'struct' },
{ type: 'variable', value: 'reqMsg' },
{ type: 'symbol', value: '{' },
{ type: 'symbol', value: '@' },
{ type: 'index', value: '1' },
{ type: 'variable', value: 'age' },
{ type: 'symbol', value: '=' },
{ type: 'variable', value: 'Int16' },
{ type: 'symbol', value: ';' },
{ type: 'symbol', value: '@' },
{ type: 'index', value: '2' },
{ type: 'variable', value: 'name' },
{ type: 'symbol', value: '=' },
{ type: 'variable', value: 'Text' },
{ type: 'symbol', value: ';' },
{ type: 'symbol', value: '}' },
{ type: 'keyword', value: 'struct' },
{ type: 'variable', value: 'resMsg' },
{ type: 'symbol', value: '{' },
{ type: 'symbol', value: '@' },
{ type: 'index', value: '1' },
{ type: 'variable', value: 'age' },
{ type: 'symbol', value: '=' },
{ type: 'variable', value: 'Int16' },
{ type: 'symbol', value: ';' },
{ type: 'symbol', value: '@' },
{ type: 'index', value: '2' },
{ type: 'variable', value: 'name' },
{ type: 'symbol', value: '=' },
{ type: 'variable', value: 'Text' },
{ type: 'symbol', value: ';' },
{ type: 'symbol', value: '}' } ]
複製代碼
詞法分析步驟:
kirito
代碼串按照\n
分割組合成數組A,數組的每一個元素就是一行代碼代碼以下:
[tokenizer] (input) {
// 保留關鍵字
const KEYWORD = ['service', 'struct', 'method'];
// 符號
const SYMBOL = ['{', '}', '(', ')', '=', '@', ';'];
// 匹配全部空字符
const WHITESPACE = /\s/;
// 匹配全部a-z的字符、不限大小寫
const LETTERS = /^[a-z]$/i;
// 匹配數值
const NUMBER = /\d/;
// 以換行符分割成數組
const source = input.split('\n');
// 最終生成的token數組
const tokens = [];
source.some(line => {
// 聲明一個 `current` 變量做爲指針
let current = 0;
// 是否繼續當前循環、移動到下一行,用於忽略註釋
let isContinue = false;
while (current < line.length) {
let char = line[current];
// 匹配任何空字符
if (WHITESPACE.test(char)) {
current++;
continue;
}
// 忽略註釋
if (char === '#') {
isContinue = true;
break;
}
// 匹配a-z|A-Z的字符
if (LETTERS.test(char)) {
// 定義一個字符串變量,用來存儲連續匹配成功的字符
let value = '';
// 匹配字符(變量/保留字)、字符加數字(參數類型)
while (LETTERS.test(char) || NUMBER.test(char)) {
// 追加字符
value += char;
// 移動指針
char = line[++current];
}
if (KEYWORD.indexOf(value) !== -1) {
// 匹配保留關鍵字
tokens.push({
type: TYPE.KEYWORD,
value: value
});
} else {
// 匹配變量名、類型
tokens.push({
type: TYPE.VARIABLE,
value: value
});
}
continue;
}
// 匹配符號 { } ( ) = @
if (SYMBOL.indexOf(char) !== -1) {
tokens.push({
type: TYPE.SYMBOL,
value: char
});
// 匹配@ 參數位置符號
if (char === '@') {
char = line[++current];
// 匹配參數位置0-9
if (NUMBER.test(char)) {
// 定義參數位置字符串,用來存儲連續匹配成功的參數位置
let index = '';
// 匹配參數位置0-9
while (NUMBER.test(char)) {
// 追加參數位置 `1`+`2`=`12`
index += char;
char = line[++current];
}
tokens.push({
type: TYPE.INDEX,
value: index
});
}
continue;
}
current++;
continue;
}
current++;
}
// 跳過註釋
if (isContinue) return false;
});
return tokens;
}
複製代碼
獲得上面的詞法分析的token後,咱們就能夠對該token作語法分析,咱們須要最終生成的AST的格式以下:
{
"type": "Program",
"body": [
{
"type": "StructDeclaration",
"name": "service",
"value": "testService",
"params": [
{
"type": "StructDeclaration",
"name": "method",
"value": "ping",
"params": [
{
"type": "Identifier",
"value": "reqMsg"
},
{
"type": "Identifier",
"value": "resMsg"
}
]
}
]
},
{
"type": "StructDeclaration",
"name": "struct",
"value": "reqMsg",
"params": [
{
"type": "VariableDeclaration",
"name": "@",
"value": "1",
"params": [
{
"type": "Identifier",
"value": "age"
},
{
"type": "DataType",
"value": "Int16"
}
]
},
{
"type": "VariableDeclaration",
"name": "@",
"value": "2",
"params": [
{
"type": "Identifier",
"value": "name"
},
{
"type": "DataType",
"value": "Text"
}
]
}
]
},
{
"type": "StructDeclaration",
"name": "struct",
"value": "resMsg",
"params": [
{
"type": "VariableDeclaration",
"name": "@",
"value": "1",
"params": [
{
"type": "Identifier",
"value": "age"
},
{
"type": "DataType",
"value": "Int16"
}
]
},
{
"type": "VariableDeclaration",
"name": "@",
"value": "2",
"params": [
{
"type": "Identifier",
"value": "name"
},
{
"type": "DataType",
"value": "Text"
}
]
}
]
}
]
}
複製代碼
看上圖咱們能友好的獲得結構、參數、數據類型、函數之間的依賴和關係,步驟:
代碼以下:
[parser] (tokens) {
// 聲明ast對象,做爲分析過程當中的節點存儲器
const ast = {
type: 'Program',
body: []
};
// 定義token數組指針變量
let current = 0;
// 定義函數、用例遞歸分析節點之間的依賴和存儲
function walk() {
// 當前指針位置的token節點
let token = tokens[current];
// 檢查變量、數據類型
if (token.type === TYPE.VARIABLE) {
current++;
return {
type: EXP.VARIABLE,
struct: tokens[current].value === '=' ? false : true,
value: token.value
};
}
// 檢查符號
if (token.type === TYPE.SYMBOL) {
// 檢查@,添加參數位置綁定
if (token.value === '@') {
// 移動到下一個token, 一般是個數值,也就是參數位置
token = tokens[++current];
// 定義參數節點,用來存儲位置、變量名、數據類型
let node = {
type: EXP.VAR_DECLARATION,
name: '@',
value: token.value,
params: []
};
// 移動到下一個token, 準備開始讀取參數變量名和數據類型
token = tokens[++current];
// 每一個參數節點以;符號結束
// 這個循環中會匹配參數變量名和參數數據類型並把他們添加到當前的參數節點上
while (token.value !== ';') {
// 遞歸匹配參數變量名、數據類型
node.params.push(walk());
// 指定當前指針的token
token = tokens[current];
}
// 移動token數組指針
current++;
// 返回參數節點
return node;
}
// 檢查=,匹配該符號右邊的參數數據類型
if (token.value === '=') {
// 移動到下一個token
token = tokens[++current];
current++;
return {
type: EXP.TYPE,
value: token.value
};
}
current++;
}
// 檢查保留字
if (token.type === TYPE.KEYWORD) {
// 檢查service、struct
if (['struct', 'service'].indexOf(token.value) !== -1) {
// 緩存保留字
let keywordName = token.value;
// 移動到下一個token,一般是結構名
token = tokens[++current];
// 定義結構節點,用來儲存結構保留字、結構名、結構參數數組
let node = {
type: EXP.STRUCT_DECLARATIONL,
// 保留字
name: keywordName,
// 結構名
value: token.value,
// 參數數組
params: []
};
// 移動到下一個token
token = tokens[++current];
// 匹配符號且是{,準備解析{裏的參數
if (token.type === TYPE.SYMBOL && token.value === '{') {
// 移動到下一個token
token = tokens[++current];
// 等於}是退出參數匹配,完成參數儲存
while (token.value !== '}') {
// 遞歸調用分析函數,獲取參數數組
node.params.push(walk());
// 移動token到當前指針
token = tokens[current];
}
current++;
}
// 返回結構節點
return node;
}
if (token.value === 'method') {
// 檢查method,匹配請求函數名
token = tokens[++current];
// 定義請求函數節點,用來儲存函數入參和返回參數
let node = {
type: EXP.STRUCT_DECLARATIONL,
name: 'method',
value: token.value,
params: []
};
// 移動到下一個token
token = tokens[++current];
// 匹配(符號,準備儲存入參和返回參數
if (token.type === TYPE.SYMBOL && token.value === '(') {
// 移動到入參token
token = tokens[++current];
// 等於)時退出匹配,完成函數匹配
while (token.value !== ')') {
// 遞歸調用分析函數
node.params.push(walk());
token = tokens[current];
}
current++;
}
// 返回函數節點
return node;
}
}
// 拋出未匹配到的錯誤
throw new TypeError(token.type);
}
// 遍歷token數組
while (current < tokens.length) {
ast.body.push(walk());
}
// 返回ast
return ast;
}
複製代碼
獲得了語法分析的AST
後咱們須要進一步對AST
轉換爲更易操做的js對象
。格式以下:
{
testService: {
ping: {
[Function]
param: {
reqMsg: {
age: 'Int16',
name: 'Text'
},
resMsg: {
age: 'Int16',
name: 'Text'
}
}
}
}
}
複製代碼
經過上面這個格式,咱們能夠更容易的知道有幾個service
、service
裏有多少個函數以及函數的參數。
代碼以下:
// 轉換器
[transformer] (ast) {
// 定義彙總的service
const services = {};
// 定義彙總的struct,用來儲存參數結構,以便最後和service合併
const structs = {};
// 轉換數組
function traverseArray(array, parent) {
// 遍歷數組
array.some((child) => {
// 分治轉換單個節點
traverseNode(child, parent);
});
}
function traverseNode (node, parent) {
switch (node.type) {
case 'Program':
// 根節點
traverseArray(node.body, parent);
break;
case 'StructDeclaration':
// 匹配service、struct、method類型節點
if (node.name === 'service') {
// 定義service的父節點爲對象,爲了更好的添加屬性
parent[node.value] = {};
// 調用數組轉換函數解析,並把父節點傳入以便添加子節點
traverseArray(node.params, parent[node.value]);
} else if (node.name === 'method') {
// 定義一個空函數給method節點
parent[node.value] = function () {};
// 在該函數下掛載一個param屬性做爲函數的參數列表
parent[node.value].param = {};
traverseArray(node.params, parent[node.value].param);
} else if (node.name === 'struct') {
// 定義struct的父節點爲一個對象
structs[node.value] = {};
// 解析struct
traverseArray(node.params, structs[node.value]);
}
break;
case 'Identifier':
// 定義參數變量
parent[node.value] = {};
break;
case 'VariableDeclaration':
// 解析參數數組
traverseArray(node.params, parent);
break;
case 'DataType':
// 參數數據類型
parent[Object.keys(parent).pop()] = node.value;
break;
default:
// 拋出未匹配到的錯誤
throw new TypeError(node.type);
}
}
traverseNode(ast, services);
// 合併service和struct
const serviceKeys = Object.getOwnPropertyNames(services);
serviceKeys.some(service => {
const methodKeys = Object.getOwnPropertyNames(services[service]);
methodKeys.some(method => {
Object.keys(services[service][method].param).some(p => {
if (structs[p] !== null) {
services[service][method].param[p] = structs[p];
delete structs[p];
}
});
});
});
return services;
}
複製代碼
RPC
協議有多種,能夠是json、xml、http2
,相對於http1.x這種文本協議,http2.0這種二進制協議更適合做爲RPC
的應用層通訊協議。不少成熟的RPC
框架通常都會定製本身的協議已知足各類變化莫測的需求。
好比Thrift
的TBinaryProtocol
、TCompactProtocol
等,用戶能夠自主選擇適合本身的傳輸協議。
大多數計算機都是以字節編址的(除了按字節編址還有按字編址和按位編址),咱們這裏只討論字節編址。每一個機器由於不一樣的系統或者不一樣的CPU對內存地址的編碼有不同的規則,通常分爲兩種字節序:大端序和小端序。
大端序: 數據的高字節保存在低地址
小端序: 數據的低字節保存在高地址
舉個栗子:
好比一個整數:258
,用16進製表示爲0x0102
,咱們把它分爲兩個字節0x01
和ox02
,對應的二進制爲0000 0001
和0000 0010
。在大端序的電腦上存放形式以下:
小端序則相反。爲了保證在不一樣機器之間傳輸的數據是同樣的,開發一個通信協議時會首先約定好使用一種做爲通信方案。java虛擬機
採用的是大端序。在機器上咱們稱爲主機字節序
,網絡傳輸時咱們稱爲網絡字節序
。網絡字節序是TCP/IP
中規定好的一種數據表示格式,它與具體的CPU
類型、操做系統等無關,從而能夠保證數據在不一樣主機之間傳輸時可以被正確解釋。網絡字節序採用大端排序方式。
咱們這裏就不造新應用層協議的輪子了,咱們直接使用MQTT
協議做爲咱們的默認應用層協議。MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議)
,是一種基於發佈/訂閱
(publish/subscribe
)模式的「輕量級」通信協議,採用大端序的網絡字節序傳輸,該協議構建於TCP/IP
協議上。
先貼下實現完的代碼調用流程,首先是server端:
'use strict';
const pRPC = require('..');
const path = require('path');
const kiritoProto = './protocol/test.kirito';
const server = new pRPC.Server();
// 解析kirito文件生成js對象
const proto = pRPC.load(path.join(__dirname, kiritoProto));
// 定義client端能夠調用的函數
function test(call, cb) {
cb(null, {age: call.age, name: call.name});
}
// 加載kirito解析出來的對象和函數綁定,這裏聲明瞭ping的執行函數test
server.addKiritoService(proto.testService, {ping: test});
server.listen(10003);
複製代碼
client端:
'use strict';
const pRPC = require('..');
const path = require('path');
const kiritoProto = './protocol/test.kirito';
// 解析kirito文件生成js對象
const proto = pRPC.load(path.join(__dirname, kiritoProto));
// 分配一個client實例綁定kirito解析的對象並鏈接server
const client = new pRPC.Client({host: 'localhost', port: 10003}, proto.testService);
// 調用server端的函數
client.ping({age: 23, name: 'ricky 澤陽'}, function (err, result) {
if (err) {
throw new Error(err.message);
}
console.log(result);
});
複製代碼
不管是server端定義函數或者client端調用函數都是比較簡潔的步驟。接下來咱們慢慢剖析具體的邏輯實現。
貼下具體的調用流程架構圖:
調用流程總結:
說完了調用流程,如今開始講解具體的實現。
server:
// protocol/mqtt.js
'use strict';
const net = require('net');
const debug = require('debug')('polix-rpc:mqtt');
const EventEmitter = require('events').EventEmitter;
const mqttCon = require('mqtt-connection');
// 定義server類,繼承EventEmitter是爲了更好的將模塊解耦
class MQTT extends EventEmitter {
constructor () {
super();
// 是否已經開啓服務
this.inited = false;
// 函數集合
this.events = {};
}
// 監聽端口並開啓服務
listen (port, cb) {
// 已經初始化了就不用再次init
if (this.inited) {
cb && cb(new Error('already inited.', null));
return;
}
// 賦值當前做用域上下文的指針給self對象,用來在非當前做用的函數執行當前做用域的代碼
const self = this;
// 設置初始化
this.inited = true;
// 實例化一個net服務
this.server = new net.Server();
this.port = port || 10003;
// 監聽端口
this.server.listen(this.port);
debug('MQTT Server is started for port: %d', this.port);
// 監聽error事件
this.server.on('error', (err) => {
debug('rpc server is error: %j', err.stack);
self.emit('error', err);
});
// 監聽鏈接事件
this.server.on('connection', (stream) => {
// 實例化mqtt對象
const socket = mqttCon(stream);
debug('=========== new connection ===========');
// 監聽mqtt服務connect事件
socket.on('connect', () => {
debug('connected');
socket.connack({ returnCode: 0 });
});
socket.on('error', (err) => {
debug('error : %j', err);
socket.destroy();
});
socket.on('close', () => {
debug('=========== close ============');
socket.destroy();
});
socket.on('disconnect', () => {
debug('=========== disconnect ============');
socket.destroy();
});
// 監聽mqtt服務publish事件,接收client端請求
socket.on('publish', (pkg) => {
// 消費client端的請求
self.consumers(pkg, socket);
});
});
}
// 消費client端的請求
consumers (pkg, socket) {
// 賦值當前做用的指針給self對象
const self = this;
// 將client的數據包轉成json字符,字節序不一樣的處理已經在mqtt的底層轉換好了
let content = pkg.payload.toString();
debug(content);
content = JSON.parse(content);
// 定義響應數據包
const respMsg = {
msgId: content.msgId
};
// 若是請求調用的函數不存在則加上錯誤消息響應回去client端
if (this.events[content.method] === null) {
// 定義調用錯誤消息
respMsg.error = {
message: `not found ${content.method} method`
};
// 推送到client端
self.response(socket, {messageId: pkg.messageId, body: respMsg});
} else {
// 若是存在有效的函數則準備調用
const fn = this.events[content.method].method;
// 設置調用函數的回調事件,用來處理調用函數完成後的參數返回
const callback = function (err, result) {
// 獲取調用完後的參數結果
respMsg.body = result;
// 推送到client端
self.response(socket, {messageId: pkg.messageId, body: respMsg});
};
// 執行調用參數
fn.call(fn, content.body, callback);
}
}
// 推送調用結果數據包給client端
response (socket, result) {
socket.publish({
topic: 'rpc',
qos: 1,
messageId: result.messageId,
payload: JSON.stringify(result.body)
});
}
// 綁定kirito定義的函數集合
addEvent (events) {
const eventKeys = Object.getOwnPropertyNames(events);
eventKeys.some(event => {
this.events[event] = {
method: events[event].method,
param: events[event].param
};
});
}
}
module.exports.create = function () {
return new MQTT();
};
複製代碼
定義protocol接口,加上這一層是爲了之後的多協議,mqtt只是默認使用的協議:
// protocol.js
'use strict';
const mqtt = require('./protocol/mqtt');
module.exports.create = function (opts = {}) {
return mqtt.create(opts);
};
複製代碼
接下來是server端的暴露出去的接口:
// index.js
'use strict';
const protocol = require('./protocol.js');
class Server {
constructor () {
// 實例化協議對象
this.server = protocol.create();
}
// 將kirito定義的接口和函數集合綁定
addKiritoService (service, methods) {
const serviceKeys = Object.getOwnPropertyNames(service);
const methodKeys = Object.getOwnPropertyNames(methods);
const events = {};
serviceKeys.some(method => {
let idx = -1;
if ((idx = methodKeys.indexOf(method)) !== -1) {
events[method] = {
method: methods[method],
param: service[method].param
};
methodKeys.splice(idx, 1);
}
});
if (Object.keys(events).length > 0) {
this.server.addEvent(events);
}
}
listen (port) {
this.server.listen(port);
}
}
module.exports = Server;
複製代碼
client:
// protocol/mqtt.js
'use strict';
const net = require('net');
const debug = require('debug')('polix-rpc:mqtt');
const EventEmitter = require('events').EventEmitter;
const mqttCon = require('mqtt-connection');
class MQTT extends EventEmitter {
constructor (server) {
super();
// 獲取server端鏈接信息
this.host = server.host || 'localhost';
this.port = server.port || 10003;
// 是否服務已鏈接
this.connected = false;
// 是否服務已關閉
this.closed = false;
}
// 鏈接server服務
connect (cb) {
// 鏈接了就不用再次執行鏈接
if (this.connected) {
cb && cb (new Error('mqtt rpc has already connected'), null);
return;
}
// 複製當前做用域上下文的指針給self變量
const self = this;
// 獲取net服務鏈接流
const stream = net.createConnection(this.port, this.host);
// 初始化mqtt服務
this.socket = mqttCon(stream);
// 監聽conack事件
this.socket.on('connack', (pkg) => {
debug('conack: %j', pkg);
});
// 監聽error事件
this.socket.on('error', function (err) {
debug('error: %j', err);
});
// 監聽publish事件,接收server端調用函數結果的返回數據
this.socket.on('publish', (pkg) => {
// 將數據包轉成json字符
const content = pkg.payload.toString();
debug(content);
// 將數據轉發到MQTT的對象事件上
this.emit('data', JSON.parse(content));
});
// 監聽puback事件
this.socket.on('puback', (pkg) => {
debug('puback: %j', pkg);
});
// 發起鏈接
this.socket.connect({
clientId: 'MQTT_RPC_' + Math.round(new Date().getTime() / 1000)
}, () => {
if (self.connected) {
return;
}
// 設置已鏈接
self.connected = true;
cb && cb(null, {connected: self.connected});
});
}
// 發起調用函數請求
send (param) {
this.socket.publish({
topic: 'rpc',
qos: 1,
messageId: 1,
payload: JSON.stringify(param || {})
});
}
// 關閉鏈接
close () {
if (this.closed) {
return;
}
this.closed = true;
this.connected = false;
this.socket.destroy();
}
}
module.exports.create = function (server) {
return new MQTT(server || {});
};
複製代碼
定義protocol接口:
// protocol.js
'use strict';
const mqtt = require('./protocol/mqtt');
module.exports.create = function (opts = {}) {
return mqtt.create(opts);
};
複製代碼
最後是client端暴露的接口:
'use strict';
const protocol = require('./protocol.js');
const connect = Symbol.for('connect');
const uuid = require('uuid/v1');
class Client {
constructor(opts, service) {
// 聲明client實例
this.client = void(0);
// 調用協議鏈接接口
this[connect](opts, service);
// 定義回調參數集合
this.callQueues = {};
}
// 鏈接server
[connect] (opts, service) {
// 初始化協議服務
this.client = protocol.create(opts);
// 發起鏈接
this.client.connect((err) => {
if (err) {
throw new Error(err);
}
});
// 複製當前做用域的上下文指針給self對象
const self = this;
// 監聽協議data時間,接收協議轉發server端響應的數據
this.client.on('data', function (result) {
// 聽過msgId取出回調函數
const fn = self.callQueues[result.msgId];
// 若是有調用錯誤信息,則直接回調錯誤
if (result.error) {
return fn.call(fn, result.error, null);
}
// 執行回調
fn.call(fn, null, result.body);
});
// 綁定kirito定義的接口參數到協議對象中
const serviceKeys = Object.getOwnPropertyNames(service);
serviceKeys.some(method => {
// 增長client端的函數,對應server端的調用函數
self[method] = function () {
// 取出發送的數據
const reqMsg = arguments[0];
// 取出回調函數
const fn = arguments[1];
const paramKey = Object.getOwnPropertyNames(service[method].param);
paramKey.some((param) => {
if (reqMsg[param] === null) {
throw new Error(`Parameters '${param}' are missing`);
}
// todo 類型判斷及轉換
});
// 爲每一個請求標記
const msgId = uuid();
// 註冊該請求的回調函數到回調隊列中
self.callQueues[msgId] = fn;
// 發起調用函數請求
self.client.send({method, msgId, body: reqMsg});
};
});
}
}
module.exports = Client;
複製代碼
就這樣,一個簡單的IDL+RPC框架就這樣搭建完成了。這裏只是描述RPC的原理和經常使用的調用方式,要想用在企業級的開發上,還得加上服務發現、註冊,服務熔斷,服務降級等,讀者若是有興趣能夠在Github上fork下來或者提PR來改進這個框架,有什麼問題也能夠提Issue, 固然PR是最好的 : ) 。
倉庫地址:
有什麼問題能夠在CNode上問:cnodejs.org/topic/5b63b…