咱們知道,js是單線程的,意味着一個nodejs進程只能運行在單個cpu上面。nodejs在io處理方面是很是優秀的,可是在密集運算型應用中仍然有不足之處,而解決的辦法,就是利用多核cpu的併發優點,將nodejs進程運行在各個cpu上面。而egg爲咱們提供了egg-cluster模塊,用於多進程管理以及進程間通訊。html
借用官網的文檔,cluster是什麼呢? 簡單的說:node
其中:git
ps: 由於官網講得很詳細,因此這一部分都是借鑑官網的。github
下面,咱們經過文檔中的圖,來看看多進程模型windows
與咱們使用pm2開啓進程守護相似。master進程充當了進程管理的工做,不會運行任何業務代碼,它負責agent,worker進程的start,reload工做以及進程間消息轉發等工做。有些工做其實不須要每一個 Worker 都去作,若是都作,一來是浪費資源,更重要的是可能會致使多進程間資源訪問衝突api
Agent
在大部分狀況下,咱們在寫業務代碼的時候徹底不用考慮 Agent 進程的存在,可是當咱們遇到一些場景,只想讓代碼運行在一個進程上的時候,Agent 進程就到了發揮做用的時候了。 因爲 Agent 只有一個,並且會負責許多維持鏈接的髒活累活,所以它不能輕易掛掉和重啓,因此 Agent 進程在監聽到未捕獲異常時不會退出,可是會打印出錯誤日誌,咱們須要對日誌中的未捕獲異常提升警戒。bash
Q: 當fork進程時,明明代碼中已經監聽了一個端口,爲何fork時沒有報端口占用?
A: cluster的工做原理推薦這一篇文章《經過源碼解析 Node.js 中 cluster 模塊的主要功能實現》, 結合樸靈老師的《深刻淺出nodejs》中的多進程架構,這裏作一下總結:服務器
// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
n.on('message', function(m){
console.log('parent get msg:'+ m)
})
n.send({hello: 'world'})
// child.js
process.on('message', function(m){
console.log('child get msg' + m)
})
process.send({hello: 'world'})
複製代碼
child.send(params, [sendHandle])
複製代碼
詳細用法:架構
// parent.js
var n = require('child_process').fork(__dirname + 'child.js')
var server = require('net').createServer();
server.listen(8080, function(){
n.send('server', server)
})
// child.js
process.on('message', function(m, server){
server.on('connection', function(socket){
console.log('child get msg' + m)
socket.end('handle by child process')
})
})
複製代碼
經過傳遞 TCP server,咱們能夠發現,沒有異常了,多個子進程能夠監聽同個端口 。 在node句柄發送的過程當中,多個進程能夠監聽到相同的端口,而不引發EADDRINUSE異常,這是由於,咱們獨立啓動的進程中,tcp服務端套接字socket的文件描述符並不相同,致使監聽相同的端口時會拋出異常,因爲獨立啓動的進程互相之間並不知道文件描述符,因此監聽相同端口就會失敗,但對於send()發送的句柄還原出來的服務而言,他們的文件描述符是相同的,因此監聽相同端口不會引發異常。
併發
總結下來,一句話:經過進程間ipc通訊傳遞句柄從而共享文件描述符
egg-cluster/master.js中承擔了初始化,啓動agent和app進程,檢測狀態等工做,只詳細講解一下master.js中作了什麼?咱們看一下構造函數中的代碼,整個流程在constructor中已經很好的提現出來了。
constructor(options) {
super();
this.options = parseOptions(options);
// new 一個 Messenger 實例
this.messenger = new Messenger(this);
// 借用 ready 模塊的方法
ready.mixin(this);
this.isProduction = isProduction();
this.isDebug = isDebug();
...
...
// 根據不一樣運行環境(local、test、prod)設置日誌輸出級別
this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });
...
}
// master 啓動成功後通知 parent、app worker、agent
this.ready(() => {
this.isStarted = true;
const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
this.logger.info('[master] %s started on %s://127.0.0.1:%s (%sms)%s',
frameworkPkg.name, this.options.https ? 'https' : 'http',
this.options.port, Date.now() - startTime, stickyMsg);
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent' });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
});
// 監聽 agent 退出
this.on('agent-exit', this.onAgentExit.bind(this));
// 監聽 agent 啓動
this.on('agent-start', this.onAgentStart.bind(this));
// 監聽 app worker 退出
this.on('app-exit', this.onAppExit.bind(this));
// 監聽 app worker 啓動
this.on('app-start', this.onAppStart.bind(this));
// 開發環境下監聽 app worker 重啓
this.on('reload-worker', this.onReload.bind(this));
// 監聽 agent 啓動,注意這裏只執行一次
this.once('agent-start', this.forkAppWorkers.bind(this));
// master監聽自身的退出及退出後的處理
// kill(2) Ctrl-C 監聽 SIGINT 信號
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\ 監聽 SIGQUIT 信號
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default 監聽 SIGTERM 信號
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
// 監聽 exit 事件
process.once('exit', this.onExit.bind(this));
// 監聽端口衝突
detectPort((err, port) => {
/* istanbul ignore if */
if (err) {
err.name = 'ClusterPortConflictError';
err.message = '[master] try get free port error, ' + err.message;
this.logger.error(err);
process.exit(1);
return;
}
this.options.clusterPort = port;
this.forkAgentWorker(); // 若是端口沒有衝突則執行該方法
});
複製代碼
master繼承eventEmitter,使用發佈訂閱模式監聽消息。 構造函數中的流程以下:
detect-port
來獲取空閒的端口forkAgentWorker
使用child_process.fork()來啓動agent進程,啓動後經過 process.send
通知master agent已經啓動agent.ready(() => {
agent.removeListener('error', startErrorHandler);
process.send({ action: 'agent-start', to: 'master' });
});
複製代碼
forkAppWorkers
: agent啓動後,經過 cluster.fork()
啓動app_worker進程。// fork app workers after agent started
this.once('agent-start', this.forkAppWorkers.bind(this));
複製代碼
這裏是使用了cfork
模塊,其本質也是cluster.fork()
,默認啓動進程數爲os.cpu.length
,也能夠經過啓動參數來指定worker進程的數量。
cfork({
exec: this.getAppWorkerFile(),
args,
silent: false,
count: this.options.workers,
// don't refork in local env refork: this.isProduction, windowsHide: process.platform === 'win32', }); 複製代碼
啓動成功後,經過messenger告知master,worker進程已經ready
this.messenger.send({
action: 'app-start',
data: {
workerPid: worker.process.pid,
address,
},
to: 'master',
from: 'app',
});
複製代碼
onAppStart
: app worker 啓動成功後通知 agent。並告知parent,egg-ready了,並帶上port,address,protocol
等參數this.ready(() => {
this.isStarted = true;
const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
this.logger.info('[master] %s started on %s (%sms)%s',
frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);
const action = 'egg-ready';
this.messenger.send({
action,
to: 'parent',
data: {
port: this[REAL_PORT],
address: this[APP_ADDRESS],
protocol: this[PROTOCOL],
},
});
this.messenger.send({
action,
to: 'app',
data: this.options,
});
this.messenger.send({
action,
to: 'agent',
data: this.options,
});
// start check agent and worker status
if (this.isProduction) {
this.workerManager.startCheck();
}
});
複製代碼
startCheck
: 若是在生產環境,,每隔10s檢測agent和worker,若有異常則上報。// check agent and worker must both alive
// if exception appear 3 times, emit an exception event
startCheck() {
this.exception = 0;
this.timer = setInterval(() => {
const count = this.count();
if (count.agent && count.worker) {
this.exception = 0;
return;
}
this.exception++;
if (this.exception >= 3) {
this.emit('exception', count);
clearInterval(this.timer);
}
}, 10000);
}
複製代碼
egg文檔上的流程圖很好的總結了以上過程:
IPC的全稱是Inter-Process Communication,即進程間通訊。進程間通訊的目的是爲了讓不一樣的進程可以互相訪問資源,並進程協調工做。實現進程間通訊的技術有不少,如命名管道、匿名管道、socket、信號量、共享內存、消息隊列、Domain Socket等,node中實現IPC通道的是管道技術(pipe)。在node中管道是個抽象層面的稱呼,具體細節實現由libuv提供,在win下是命名管道(named pipe)實現,在*nix下,採用unix Domain Socket來實現。
Q:那麼,進程間是如何經過ipc通道去連接的呢?
父進程在實際建立子進程前,會建立IPC通道並監聽它,而後才真正建立出子進程,並經過環境變量(NODE_CHANNEL_FD)告訴子進程這個IPC通訊的文件描述符。子進程在啓動的過程當中,根據文件描述符去鏈接這個已存在的IPC通道,從而完成父子進程之間的鏈接。
cluster 的 IPC 通道只存在於 Master 和 Worker/Agent 之間,Worker 與 Agent 進程互相間是沒有的。那麼 Worker 之間想通信該怎麼辦呢?是的,經過 Master 來轉發
在egg-cluster的源碼中,封裝了一個messageer類來處理進程間通訊, 代碼傳送門show the code
/**
* master messenger,provide communication between parent, master, agent and app.
* ┌────────┐
* │ parent │
* /└────────┘\
* / | \
* / ┌────────┐ \
* / │ master │ \
* / └────────┘ \
* / / \ \
* ┌───────┐ ┌───────┐
* │ agent │ ------- │ app │
* └───────┘ └───────┘
*/
send(data) {
if (!data.from) {
data.from = 'master';
}
// recognise receiverPid is to who
if (data.receiverPid) {
if (data.receiverPid === String(process.pid)) {
data.to = 'master';
} else if (data.receiverPid === String(this.master.agentWorker.pid)) {
data.to = 'agent';
} else {
data.to = 'app';
}
}
// default from -> to rules
if (!data.to) {
if (data.from === 'agent') data.to = 'app';
if (data.from === 'app') data.to = 'agent';
if (data.from === 'parent') data.to = 'master';
}
// app -> master
// agent -> master
if (data.to === 'master') {
debug('%s -> master, data: %j', data.from, data);
// app/agent to master
this.sendToMaster(data);
return;
}
// master -> parent
// app -> parent
// agent -> parent
if (data.to === 'parent') {
debug('%s -> parent, data: %j', data.from, data);
this.sendToParent(data);
return;
}
// parent -> master -> app
// agent -> master -> app
if (data.to === 'app') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAppWorker(data);
return;
}
// parent -> master -> agent
// app -> master -> agent,可能不指定 to
if (data.to === 'agent') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAgentWorker(data);
return;
}
}
複製代碼
master.emit(data.action, data.data)
(master 繼承自 EventEmitter)process.send(data)
sendmessage(worker, data)
sendmessage(this.master.agentWorker, data)
注: [sendmessage]是一個蘇千大大寫的一個用於處理進程間通訊的module(Send a cross process message if message channel is connected.),感興趣的同窗能夠看看源碼https://github.com/node-modules/sendmessage/blob/master/index.js
可能有同窗會想,爲何多了一個parent?
原來,parent就是master進程的parent進程,通常是CLI,好比egg-script start和egg-bin,egg-script中經過('child_process').spawn
建立的master進程。child_process.spawn()
方法使用給定的 command 衍生一個新進程,並帶上 args 中的命令行參數。同時,經過傳遞detached
參數,可使得在父進程退出後子進程繼續執行。
spawn文檔傳送門
(感謝@天豬 的解答,源碼連接)