一個瀏覽器和NodeJS通用的RPC框架

歡迎關注個人知乎專欄: https://zhuanlan.zhihu.com/starkwangnode


starkwang/Maus: A Simple JSON-RPC Framework running in NodeJS or Browser, based on websocket.webpack

這幾天寫了個小型的RPC框架,最初只是想用 TCP-JSON 寫個純 NodeJS 平臺的東西,後來無心中開了個腦洞,若是基於 Websocket 把瀏覽器當作 RPC Server ,那豈不是隻要是能運行瀏覽器(或者nodejs)的設備,均可以做爲分佈式計算中的一個 Worker 了嗎?git

打開一張網頁,就能成爲分佈式計算的一個節點,看起來仍是挺酷炫的。github

1、什麼是RPC

能夠參考:誰能用通俗的語言解釋一下什麼是RPC框架? - 知乎web

簡單地說就是你能夠這樣註冊一個任意數量的worker(姑且叫這個名字好了),它裏面聲明瞭具體的方法實現:算法

var rpcWorker = require('maus').worker;
rpcWorker.create({
    add: (x, y) => x + y
}, 'http://192.168.1.100:8124');

而後你能夠在另外一個node進程裏這樣調用:編程

var rpcManager = require('maus').manager;
rpcManager.create(workers => {
    workers.add(1, 2, result => console.log(result));
}, 8124)

這裏咱們封裝了底層的通訊細節(能夠是tcp、http、websocket等等)和任務分配,只須要用異步的方式去調用worker提供的方法便可,經過這個咱們能夠垂手可得地作到分佈式計算的mapreduce數組

rpcManager.create(workers => {
    //首先定義一個promise化的add
    var add = function(x, y){
        return new Promise((resolve, reject)=>{
            workers.add(x, y, result => resolve(result));
        })
    }
    //map&reduce
    Promise.all([add(1,2), add(3,4), add(4,5)])
        .then(result => result.reduce((x, y) => x + y))
        .then(sum => console.log(sum)) //19
}, 8124)

若是咱們有三個已經註冊的Worker(多是本地的另外一個nodejs進程、某個設備上的瀏覽器、另外一個機器上的nodejs),那麼咱們這裏會分別在這三個機器上分別計算三個add,而且將三個結果在本地相加,獲得最後的值,這就是分佈式計算的基礎。promise

2、Manager的實現

0、通訊標準

要實現雙向的通訊,咱們首先要定義這樣一個「遠程調用」的通訊標準,在個人實現中比較簡單:瀏覽器

{
    [id]: uuid          //在某些通訊中須要惟一標識碼
    message: '......'   //消息類別
    body: ......        //攜帶的數據
}

一、初始化

首先咱們要解決的問題是,如何讓Manager知道Worker提供了哪些方法可供調用?

這個問題其實很簡單,只要在 websocket 創建的時刻發送一個init消息就能夠了,init消息大概長這樣:

{
    message: 'init',
    body: ['add', 'multiply'] //body是方法名組成的數組
}

同時,咱們要將Manager傳入的回調函數,記錄到Manager.__workersStaticCallback中,以便延遲調用:

manager.create(callback, port) //記錄下這個callback

//一段時間後。。。。。。

manager.start() //任務開始

二、生成workers實例

如今咱們的Manager收到了一個遠程可調用的方法名組成的數組,咱們接下來須要在Manager中生成一個workers實例,它應該包含全部這些方法名,但底層依然是調用一個webpack通訊。這裏咱們能夠用相似元編程的奇技淫巧,下面的是部分代碼:

//收到worker發來的init消息以後
var workers = {
    __send: this.__send.bind(this), //這個this指向Manager,而不是本身
    __functionCall: this.__functionCall.bind(this) //同上
};
var funcNames = data.body; //好比['add', 'multiply']
funcNames.forEach(funcName => {
    //使用new Function的奇技淫巧
    rpc[funcName] = new Function(`
        //截取參數
        var params = Array.prototype.slice.call(arguments,0,arguments.length-1);
        var callback = arguments[arguments.length-1];
        
        //這個__functionCall調用了Manager底層的通訊,具體在後面解釋
        this.__functionCall('${funcName}',params,callback);
    `)
})
//將workers註冊到Manager內部
this.__workers = workers;
//若是此時Manager已經在等待開始了,那麼開始任務
if (this.__waitingForInit) {
    this.start();
}

還記得上面咱們有個start方法麼?它是這樣寫的:

start: function() {
    if (this.__workers != undefined) {
        //若是初始化完畢,workers實例存在
        this.__workersStaticCallback(this.__workers);
        this.__waitingForInit = false;
    } else {
        //不然將等待初始化完畢
        this.__waitingForInit = true;
    }
},

三、序列化

若是隻是單個Worker和單個Manager,而且遠程方法都是同步而非異步的,那麼咱們顯然不須要考慮返回值順序的問題:

好比咱們的Manager調用了下面一堆方法:

workers.add(1, 1, callback);
workers.add(2, 2, callback);
workers.add(3, 3, callback);

因爲Workeradd的是同步的方法,那麼顯然咱們收到返回值的順序是:

2
4
6

但若是Worker中存在一個異步調用,那麼這個順序就會被打亂:

workers.readFile('xxx', callback);
workers.add(1, 1, callback);
workers.add(2, 2, callback);

顯然咱們收到的返回值順序是:

2
4
content of xxx

因此這裏就須要對發出的函數調用作一個序列化,具體的方法就是對於每個調用都給一個uuid(惟一標識碼)。

好比咱們調用了:

workers.add(1, 1, stupid_callback);

那麼首先Manager會對這個調用生成一個 uuid :

9557881b-25d7-4c94-84c8-2463c53b67f4

而後在__callbackStore中將這個 uuid 和stupid_callback 綁定,而後向選中的某個Worker發送函數調用信息(具體怎麼選Worker咱們後面再說):

{
    id: '9557881b-25d7-4c94-84c8-2463c53b67f4',
    message: 'function call',
    body: { 
        funcName: 'add', 
        params: [1, 1] 
    }
}

Worker執行這個函數以後,發送回來一個函數返回值的信息體,大概是這樣:

{
    id: '9557881b-25d7-4c94-84c8-2463c53b67f4',
    message: 'function call',
    body: { 
        result: 2 
    }
}

而後咱們就能夠在__callbackStore中找到這個 uuid 對應的 callback ,而且執行它:

this.__callbackStore[id](result);

這就是workers.add(1, 1, stupid_callback)這行代碼背後的原理。

四、任務分配

若是存在多個Worker,顯然咱們不能把全部的調用都傻傻地發送到第一個Worker身上,因此這裏就須要有一個任務分配機制,個人機制比較簡單,大概說就是在一張表裏對每一個Worker記錄下它是否繁忙的狀態,每次當有調用需求的時候,先遍歷這張表,

  1. 若是找到有空閒的Worker,那麼就將對它發送調用;

  2. 若是全部Worker都繁忙,那麼先把這個調用暫存在一個隊列之中;

  3. 當收到某個Worker的返回值後,會檢查隊列中是否有任務,有的話,那麼就對這個Worker發送最前的函數調用,若沒有,就把這個Worker設爲空閒狀態。

具體任務分配的代碼比較冗餘,分散在各個方法內,因此只介紹方法,就不貼上來了/w\

所有的Manager代碼在這裏(抱歉還沒時間補註釋):

Maus/manager.js at master · starkwang/Maus

3、Worker的實現

這裏要再說一遍,咱們的RPC框架是基於websocket的,因此Worker能夠是一個PC瀏覽器!!!能夠是一個手機瀏覽器!!!能夠是一個平板瀏覽器!!!

Worker的實現遠比Manager簡單,由於它只須要對惟一一個Manager通訊,它的邏輯只有:

  1. 接收Manager發來的數據;

  2. 根據數據作出相應的反應(函數調用、初始化等等);

  3. 發送返回值

因此咱們也不放代碼了,有興趣的能夠看這裏:

Maus/worker.js at master · starkwang/Maus

4、寫一個分佈式算法

假設咱們的加法是經過這個框架異步調用的,那麼咱們該怎麼寫算法呢?

在單機狀況下,寫個斐波拉契數列簡直跟喝水同樣簡單(事實上這種暴力遞歸的寫法很是很是傻逼且性能低下,只是做爲範例演示用):

var fib = x => x>1 ? fib(x-1)+fib(x-2) : x

可是在分佈式環境下,咱們要將workers.add方法封裝成一個Promise化的add

//這裏的x, y多是數字,也多是個Promise,因此要先調用Promise.all
var add = function(x, y){
    return Promise.all([x, y])
        .then(arr => new Promise((resolve, reject) => {
            workers.add(arr[0], arr[1], result => resolve(result));
        }))
}

而後咱們就能夠用相似同步的遞歸方法這樣寫一個分佈式的fib算法:

var fib = x => x>1 ? add(fib(x-1), fib(x-2)) : x;

而後你能夠嘗試用你的電腦裏、樹莓派裏、服務器裏的nodejs、手機平板上的瀏覽器做爲一個Worker,總之集合全部的計算能力,一塊兒來計算這個傻傻的算法(事實上相比於單機算法會慢不少不少,由於通訊上的延遲遠大於單機的加法計算,但只是爲了演示啦):

//分佈式計算fib(40)
fib(40).then(result => console.log(result));
相關文章
相關標籤/搜索