理解Egg.js中的多進程模型(egg-cluster)

背景

咱們知道,js是單線程的,意味着一個nodejs進程只能運行在單個cpu上面。nodejs在io處理方面是很是優秀的,可是在密集運算型應用中仍然有不足之處,而解決的辦法,就是利用多核cpu的併發優點,將nodejs進程運行在各個cpu上面。而egg爲咱們提供了egg-cluster模塊,用於多進程管理以及進程間通訊。html

介紹(egg的多進程模型)

egg-cluster介紹

借用官網的文檔,cluster是什麼呢? 簡單的說:node

  • 在服務器上同時啓動多個進程。
  • 每一個進程裏都跑的是同一份源代碼(比如把之前一個進程的工做分給多個進程去作)。
  • 更神奇的是,這些進程能夠同時監聽一個端口。

其中:git

  • 負責啓動其餘進程的叫作 Master 進程,他比如是個『包工頭』,不作具體的工做,只負責啓動其餘進程。
  • 其餘被啓動的叫 Worker 進程,顧名思義就是幹活的『工人』。它們接收請求,對外提供服務。
  • Worker 進程的數量通常根據服務器的 CPU 核數來定,這樣就能夠完美利用多核資源。

ps: 由於官網講得很詳細,因此這一部分都是借鑑官網的。github

多進程模型

下面,咱們經過文檔中的圖,來看看多進程模型windows

與咱們使用pm2開啓進程守護相似。master進程充當了進程管理的工做,不會運行任何業務代碼,它負責agent,worker進程的start,reload工做以及進程間消息轉發等工做。
而看到這裏,相信有部分讀者會有疑問,爲何須要agent進程呢?
那麼文檔上其實也作出了詳細的說明。答案就是

有些工做其實不須要每一個 Worker 都去作,若是都作,一來是浪費資源,更重要的是可能會致使多進程間資源訪問衝突api

Agent
在大部分狀況下,咱們在寫業務代碼的時候徹底不用考慮 Agent 進程的存在,可是當咱們遇到一些場景,只想讓代碼運行在一個進程上的時候,Agent 進程就到了發揮做用的時候了。 因爲 Agent 只有一個,並且會負責許多維持鏈接的髒活累活,所以它不能輕易掛掉和重啓,因此 Agent 進程在監聽到未捕獲異常時不會退出,可是會打印出錯誤日誌,咱們須要對日誌中的未捕獲異常提升警戒。bash

爲何不會端口衝突?

Q: 當fork進程時,明明代碼中已經監聽了一個端口,爲何fork時沒有報端口占用?
A: cluster的工做原理推薦這一篇文章《經過源碼解析 Node.js 中 cluster 模塊的主要功能實現》, 結合樸靈老師的《深刻淺出nodejs》中的多進程架構,這裏作一下總結:服務器

  1. 在master-worker模式中,建立子進程後,父子進程將會建立ipc通道,進程間經過ipc通道,使用message和send進行消息傳遞。用法以下:
// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
n.on('message', function(m){
    console.log('parent get msg:'+  m)  
})
n.send({hello: 'world'})
// child.js
process.on('message', function(m){
    console.log('child get msg' + m)
})
process.send({hello: 'world'})
複製代碼
  1. 爲了解決端口不能重複監聽的問題,在nodev0.5.9中引入了進程間發送句柄的功能(句柄是一種能夠用來標識資源的引用,它的內部 包含了指向對象的文件描述符,好比句柄能夠用來標識一個socket對象 ,一個UDP套接字,一個管道等)。send方法出了能夠發送數據,還能夠發送句柄。
child.send(params, [sendHandle])
複製代碼

詳細用法:架構

// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
var server = require('net').createServer();
server.listen(8080, function(){
    n.send('server', server)
})
// child.js
process.on('message', function(m, server){
    server.on('connection', function(socket){
        console.log('child get msg' + m)
        socket.end('handle by child process')
    })
})
複製代碼

經過傳遞 TCP server,咱們能夠發現,沒有異常了,多個子進程能夠監聽同個端口 。 在node句柄發送的過程當中,多個進程能夠監聽到相同的端口,而不引發EADDRINUSE異常,這是由於,咱們獨立啓動的進程中,tcp服務端套接字socket的文件描述符並不相同,致使監聽相同的端口時會拋出異常,因爲獨立啓動的進程互相之間並不知道文件描述符,因此監聽相同端口就會失敗,但對於send()發送的句柄還原出來的服務而言,他們的文件描述符是相同的,因此監聽相同端口不會引發異常。併發

總結下來,一句話:經過進程間ipc通訊傳遞句柄從而共享文件描述符

進程的啓動順序

egg-cluster/master.js中承擔了初始化,啓動agent和app進程,檢測狀態等工做,只詳細講解一下master.js中作了什麼?咱們看一下構造函數中的代碼,整個流程在constructor中已經很好的提現出來了。

constructor(options) {
  super();
  this.options = parseOptions(options);
  // new 一個 Messenger 實例
  this.messenger = new Messenger(this);
  // 借用 ready 模塊的方法
  ready.mixin(this);
  this.isProduction = isProduction();
  this.isDebug = isDebug();
  ...
  ...
  // 根據不一樣運行環境(localtest、prod)設置日誌輸出級別
  this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });
  ...
}
// master 啓動成功後通知 parent、app worker、agent
this.ready(() => {
  this.isStarted = true;
  const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
  this.logger.info('[master] %s started on %s://127.0.0.1:%s (%sms)%s',
  frameworkPkg.name, this.options.https ? 'https' : 'http',
  this.options.port, Date.now() - startTime, stickyMsg);

  const action = 'egg-ready';
  this.messenger.send({ action, to: 'parent' });
  this.messenger.send({ action, to: 'app', data: this.options });
  this.messenger.send({ action, to: 'agent', data: this.options });
});
// 監聽 agent 退出
this.on('agent-exit', this.onAgentExit.bind(this));
// 監聽 agent 啓動
this.on('agent-start', this.onAgentStart.bind(this));
// 監聽 app worker 退出
this.on('app-exit', this.onAppExit.bind(this));
// 監聽 app worker 啓動
this.on('app-start', this.onAppStart.bind(this));
// 開發環境下監聽 app worker 重啓
this.on('reload-worker', this.onReload.bind(this));

// 監聽 agent 啓動,注意這裏只執行一次
this.once('agent-start', this.forkAppWorkers.bind(this));
// master監聽自身的退出及退出後的處理

// kill(2) Ctrl-C     監聽 SIGINT 信號
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\     監聽 SIGQUIT 信號
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default   監聽 SIGTERM 信號
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));

// 監聽 exit 事件
process.once('exit', this.onExit.bind(this));
// 監聽端口衝突
detectPort((err, port) => {
  /* istanbul ignore if */
  if (err) {
    err.name = 'ClusterPortConflictError';
    err.message = '[master] try get free port error, ' + err.message;
    this.logger.error(err);
    process.exit(1);
    return;
  }
  this.options.clusterPort = port;
  this.forkAgentWorker(); // 若是端口沒有衝突則執行該方法
});
複製代碼

master繼承eventEmitter,使用發佈訂閱模式監聽消息。 構造函數中的流程以下:

  1. 使用detect-port來獲取空閒的端口
  2. forkAgentWorker使用child_process.fork()來啓動agent進程,啓動後經過 process.send通知master agent已經啓動
agent.ready(() => {
 agent.removeListener('error', startErrorHandler);
 process.send({ action: 'agent-start', to: 'master' });
});
複製代碼
  1. forkAppWorkers: agent啓動後,經過 cluster.fork()啓動app_worker進程。
// fork app workers after agent started
this.once('agent-start', this.forkAppWorkers.bind(this));
複製代碼

這裏是使用了cfork模塊,其本質也是cluster.fork(),默認啓動進程數爲os.cpu.length,也能夠經過啓動參數來指定worker進程的數量。

cfork({
  exec: this.getAppWorkerFile(),
  args,
  silent: false,
  count: this.options.workers,
  // don't refork in local env refork: this.isProduction, windowsHide: process.platform === 'win32', }); 複製代碼

啓動成功後,經過messenger告知master,worker進程已經ready

this.messenger.send({
    action: 'app-start',
    data: {
      workerPid: worker.process.pid,
      address,
    },
    to: 'master',
    from: 'app',
  });
複製代碼
  1. onAppStart: app worker 啓動成功後通知 agent。並告知parent,egg-ready了,並帶上port,address,protocol等參數
this.ready(() => {
      this.isStarted = true;
      const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
      this.logger.info('[master] %s started on %s (%sms)%s',
        frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);

      const action = 'egg-ready';
      this.messenger.send({
        action,
        to: 'parent',
        data: {
          port: this[REAL_PORT],
          address: this[APP_ADDRESS],
          protocol: this[PROTOCOL],
        },
      });
      this.messenger.send({
        action,
        to: 'app',
        data: this.options,
      });
      this.messenger.send({
        action,
        to: 'agent',
        data: this.options,
      });

      // start check agent and worker status
      if (this.isProduction) {
        this.workerManager.startCheck();
      }
    });
複製代碼
  1. startCheck: 若是在生產環境,,每隔10s檢測agent和worker,若有異常則上報。
// check agent and worker must both alive
  // if exception appear 3 times, emit an exception event
  startCheck() {
    this.exception = 0;
    this.timer = setInterval(() => {
      const count = this.count();
      if (count.agent && count.worker) {
        this.exception = 0;
        return;
      }
      this.exception++;
      if (this.exception >= 3) {
        this.emit('exception', count);
        clearInterval(this.timer);
      }
    }, 10000);
  }
複製代碼

egg文檔上的流程圖很好的總結了以上過程:

進程間消息通信

進程間通訊原理(ipc)

IPC的全稱是Inter-Process Communication,即進程間通訊。進程間通訊的目的是爲了讓不一樣的進程可以互相訪問資源,並進程協調工做。實現進程間通訊的技術有不少,如命名管道、匿名管道、socket、信號量、共享內存、消息隊列、Domain Socket等,node中實現IPC通道的是管道技術(pipe)。在node中管道是個抽象層面的稱呼,具體細節實現由libuv提供,在win下是命名管道(named pipe)實現,在*nix下,採用unix Domain Socket來實現。

Q:那麼,進程間是如何經過ipc通道去連接的呢?

父進程在實際建立子進程前,會建立IPC通道並監聽它,而後才真正建立出子進程,並經過環境變量(NODE_CHANNEL_FD)告訴子進程這個IPC通訊的文件描述符。子進程在啓動的過程當中,根據文件描述符去鏈接這個已存在的IPC通道,從而完成父子進程之間的鏈接。

egg-cluster中的進程間通訊

cluster 的 IPC 通道只存在於 Master 和 Worker/Agent 之間,Worker 與 Agent 進程互相間是沒有的。那麼 Worker 之間想通信該怎麼辦呢?是的,經過 Master 來轉發

在egg-cluster的源碼中,封裝了一個messageer類來處理進程間通訊, 代碼傳送門

show the code

/**
 * master messenger,provide communication between parent, master, agent and app.
 *             ┌────────┐
 *             │ parent │
 *            /└────────┘\
 *           /     |      \
 *          /  ┌────────┐  \
 *         /   │ master │   \
 *        /    └────────┘    \
 *       /     /         \    \
 *     ┌───────┐         ┌───────┐
 *     │ agent │ ------- │  app  │
 *     └───────┘         └───────┘
 */
  send(data) {
    if (!data.from) {
      data.from = 'master';
    }

    // recognise receiverPid is to who
    if (data.receiverPid) {
      if (data.receiverPid === String(process.pid)) {
        data.to = 'master';
      } else if (data.receiverPid === String(this.master.agentWorker.pid)) {
        data.to = 'agent';
      } else {
        data.to = 'app';
      }
    }

    // default from -> to rules
    if (!data.to) {
      if (data.from === 'agent') data.to = 'app';
      if (data.from === 'app') data.to = 'agent';
      if (data.from === 'parent') data.to = 'master';
    }

    // app -> master
    // agent -> master
    if (data.to === 'master') {
      debug('%s -> master, data: %j', data.from, data);
      // app/agent to master
      this.sendToMaster(data);
      return;
    }

    // master -> parent
    // app -> parent
    // agent -> parent
    if (data.to === 'parent') {
      debug('%s -> parent, data: %j', data.from, data);
      this.sendToParent(data);
      return;
    }

    // parent -> master -> app
    // agent -> master -> app
    if (data.to === 'app') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAppWorker(data);
      return;
    }

    // parent -> master -> agent
    // app -> master -> agent,可能不指定 to
    if (data.to === 'agent') {
      debug('%s -> %s, data: %j', data.from, data.to, data);
      this.sendToAgentWorker(data);
      return;
    }
  }
複製代碼
  1. app/agent -> master: 經過master.emit(data.action, data.data)(master 繼承自 EventEmitter)
  2. app/master/agent -> parent: process.send(data)
  3. parent/agent -> master -> app: sendmessage(worker, data)
  4. parent/agent -> master -> agent: sendmessage(this.master.agentWorker, data)

注: [sendmessage]是一個蘇千大大寫的一個用於處理進程間通訊的module(Send a cross process message if message channel is connected.),感興趣的同窗能夠看看源碼https://github.com/node-modules/sendmessage/blob/master/index.js

可能有同窗會想,爲何多了一個parent?

原來,parent就是master進程的parent進程,通常是CLI,好比egg-script start和egg-bin,egg-script中經過('child_process').spawn 建立的master進程。child_process.spawn() 方法使用給定的 command 衍生一個新進程,並帶上 args 中的命令行參數。同時,經過傳遞detached參數,可使得在父進程退出後子進程繼續執行。
spawn文檔傳送門

(感謝@天豬 的解答,源碼連接

Ref

相關文章
相關標籤/搜索