關於IPC-Message通訊

阿里eggjs中有個核心就是egg-cluster來作基本的啓動流程,裏面的通訊仍是比較有意思的。仔細研究了下nodejs官方的cluster再加上eggjs的Agent理念,若是要保持通訊,仍是踩了很多的坑。這個坑其實來自cluster自身。若是是剛研究cluster,那麼不少人都是被其迷惑,究竟是如何監聽,如何發送呢?javascript

概念

先理解一些概念:html

  • master 主進程 cluster.isMaster 肯定的進程
  • worker 子進程 cluster.isWorker 肯定的進程
  • agent child_process 建立的進程

想要在這些進程上互相通訊,咱們須要理清楚發送的方式。java

  • worker與agent通訊,須要master做爲橋樑中轉
  • worker與master通訊
  • master與worker通訊
  • master與agent通訊
  • agent與master通訊
  • agent與worker通訊,須要master做爲橋樑中轉

如何讓其完美通訊,我這裏推薦一個庫 github.com/cevio/ipc-m…。它能讓你在無感知的狀況下幫你綁定完畢全部的事件,同時打通消息通道。node

建立

它是一個類,須要被繼承後使用。git

const IPCMessage = require('ipc-message');
module.exports = class NodeBase extends IPCMessage {
  constructor() {
    // If it is a `agent` type process, you need to set the parameter to `true`. 
    // super(true);
    super();

    // receive message from other processes.
    this.on('message', msg => {
      console.log(`[${this.type}] Receive Message:`, msg);
    });

    if (this.type === 'master') {
      // do master ...
    } else {
      // do worker
    }
  }
}
複製代碼

咱們經過綁定message事件來得到消息,經過registAgent來註冊agent,經過cluster.fork()來建立子進程,固然這個建立是被自動監聽的,你無需關心。github

消息

很簡單,他們在任意的代碼中經過send方法來發送消息。好比如上的例子(假設已經設置來一個名字爲staticAgent的agent和創建了4個worker,如今是在worker運行的代碼上):json

const base = new NodeBase();
base.send('staticAgent', 'worker-ready', {
    a: 1,
    b: 2
});
複製代碼

agent經過master的轉發就收到了該信息api

[staticAgent] agent receive message:app

{
    to: [19678],
    from: 19679,
    transfer: true,
    action: 'worker-ready',
    body: {
        a: 1,
        b: 2
    }
}
複製代碼

你能夠經過這個數據來解析,具體如何解決全靠我的想法了。koa

使用

咱們來看2段實際代碼

test/index.js

const IPCMessage = require('ipc-message');
const ChildProcess = require('child_process');
const path = require('path');
const cluster = require('cluster');
const Koa = require('koa');
const os = require('os');

class Nodebase extends IPCMessage {
  constructor() {
    super();
    if (this.type === 'master') {
      const agentWorkerRuntimeFile = path.resolve(__dirname, 'agent.js');
      // 建立一個agent
      const agent = ChildProcess.fork(agentWorkerRuntimeFile, null, {
        cwd: process.cwd(),
        stdout: process.stdout,
        stderr: process.stderr,
        stdin: process.stdin,
        stdio: process.stdio
      });
      // 註冊該agent
      this.registAgent('agent', agent);

      let cpus = os.cpus().length;
      while (cpus--) {
        // 根據內核數來建立子進程
        cluster.fork();
      }
    } else {
      // 如下爲實際開發的代碼
      const app = new Koa();
      app.use(async (ctx, next) => {
        if (ctx.req.url === '/favicon.ico') return;
        this.send('agent', '/test/agent', { a:1, c:3 });
        ctx.body = 'hello world';
      });
      app.listen(3000, () => {
        console.log('server start at 3000');
      });
    }

    this.on('message', msg => {
      console.log(`[${this.type}] onMessageReceive:`, msg);
    });
  }
}

const nodebase = new Nodebase();

if (nodebase.type === 'master') {
  setTimeout(() => {
    // 發送消息給agent
    nodebase.send('agent', '/a/b', {
      a:1
    });
  }, 5000)
}
複製代碼

test/agent.js

const IPCMessage = require('ipc-message');

class Agent extends IPCMessage {
  constructor() {
    // 若是是個agent啓動的文件,這裏必須爲true
    super(true);
    this.timer = setInterval(() => {
      console.log('agent alive');
    }, 1000);

    process.on('SIGINT', () => {
      clearInterval(this.timer);
      process.exit(0);
    });

    this.on('message', msg => {
      console.log('[Agent] onMessageReceive:', msg);
      this.send([msg.from, 'master'], '/reply/agent', 'done');
    })
  }
}

const agent = new Agent();
// 發送消息
agent.send('master', '/agent/ready', { a: 1, b: 2 });
複製代碼

最後

有了通訊處理的簡化模塊,你也能夠嘗試使用其來建立相似egg的啓動流程,egg的核心啓動也就再也不神祕了。

相關文章
相關標籤/搜索