最近用Egg做爲底層框架開發項目,好奇其多進程模型的管理實現,因而學習瞭解了一些東西,順便記錄下來。文章若有錯誤, 請輕噴html
伴隨科技的發展, 如今的服務器基本上都是多核cpu
的了。然而,Node是一個單進程單線程
語言(對於開發者來講是單線程,實際上不是)。咱們都知道,cpu的調度單位是線程
,而基於Node的特性,那麼咱們每次只能利用一個cpu。這樣不只僅利用率極低,並且容錯更是不能接受(出錯時會崩潰整個程序)。因此,Node有了cluster來協助咱們充分利用服務器的資源。 前端
cluster工做原理
關於cluster的工做原理推薦你們看這篇文章,這裏簡單總結一下:node
hack
掉,而是統一由master的內部TCP監聽
,因此不會出現多個子進程監聽同一端口而報錯的現象。請求統一通過master的內部TCP
,TCP的請求處理邏輯中,會挑選一個worker進程向其發送一個newconn內部消息,隨消息發送客戶端句柄
。(這裏的挑選有兩種方式,第一種是除Windows外全部平臺的默認方法循環法,即由主進程負責監聽端口,接收新鏈接後再將鏈接循環分發給工做進程。在分發中使用了一些內置技巧防止工做進程任務過載。第二種是主進程建立監聽socket後發送給感興趣的工做進程,由工做進程負責直接接收鏈接。)建立客戶端實例(net.socket)執行具體的業務邏輯
,而後返回。如圖:
圖引用出處git
先看一下Egg官方文檔的進程模型github
+--------+ +-------+ | Master |<-------->| Agent | +--------+ +-------+ ^ ^ ^ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
類型 | 進程數量 | 做用 | 穩定性 | 是否運行業務代碼 |
---|---|---|---|---|
Master | 1 | 進程管理,進程間消息轉發 | 很是高 | 否 |
Agent | 1 | 後臺運行工做(長鏈接客戶端) | 高 | 少許 |
Worker | 通常爲cpu核數 | 執行業務代碼 | 通常 | 是 |
大體上就是利用Master
做爲主線程,啓動Agent
做爲祕書進程協助Worker
處理一些公共事務(日誌之類),啓動Worker
進程執行真正的業務代碼。npm
首先從Master
入手,這裏暫時認爲Master是最頂級的進程(事實上還有一個parent
進程,待會再說)。後端
/** * start egg app * @method Egg#startCluster * @param {Object} options {@link Master} * @param {Function} callback start success callback */ exports.startCluster = function(options, callback) { new Master(options).ready(callback); };
先從Master的構造函數
看起api
constructor(options) { super(); // 初始化參數 this.options = parseOptions(options); // worker進程的管理類 詳情見 Manager及Messenger篇 this.workerManager = new Manager(); // messenger類, 詳情見 Manager及Messenger篇 this.messenger = new Messenger(this); // 設置一個ready事件 詳情見get-ready npm包 ready.mixin(this); // 是否爲生產環境 this.isProduction = isProduction(); this.agentWorkerIndex = 0; // 是否關閉 this.closed = false; ... 接下來看的是ready的回調函數及註冊的各種事件: this.ready(() => { // 將開始狀態設置爲true this.isStarted = true; const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : ''; this.logger.info('[master] %s started on %s (%sms)%s', frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg); // 發送egg-ready至各個進程並觸發相關事件 const action = 'egg-ready'; this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } }); this.messenger.send({ action, to: 'app', data: this.options }); this.messenger.send({ action, to: 'agent', data: this.options }); // start check agent and worker status this.workerManager.startCheck(); }); // 註冊各種事件 this.on('agent-exit', this.onAgentExit.bind(this)); this.on('agent-start', this.onAgentStart.bind(this)); ... // 檢查端口並 Fork一個Agent detectPort((err, port) => { ... this.forkAgentWorker(); } }); }
綜上, 能夠看到Master的構造函數主要是初始化和註冊各種相應的事件
, 最後運行的是forkAgentWorker
函數, 該函數的關鍵代碼能夠看到:服務器
const agentWorkerFile = path.join(__dirname, 'agent_worker.js'); // 經過child_process執行一個Agent const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
繼續到agent_worker.js
上面看,agent_worker
實例化一個agent
對象,agent_worker.js
有一句關鍵代碼:app
agent.ready(() => { agent.removeListener('error', startErrorHandler); // 清除錯誤監聽的事件 process.send({ action: 'agent-start', to: 'master' }); // 向master發送一個agent-start的動做 });
能夠看到, agent_worker.js
中的代碼向master
發出了一個信息, 動做爲agent-start
, 再回到Master
中, 能夠看到其註冊了兩個事件, 分別爲once的forkAppWorkers和 on的onAgentStart
this.on('agent-start', this.onAgentStart.bind(this)); this.once('agent-start', this.forkAppWorkers.bind(this));
先看onAgentStart
函數, 這個函數相對簡單, 就是一些信息的傳遞:
onAgentStart() { this.agentWorker.status = 'started'; // Send egg-ready when agent is started after launched if (this.isAllAppWorkerStarted) { this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options }); } this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] }); // should send current worker pids when agent restart if (this.isStarted) { this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() }); } this.messenger.send({ action: 'agent-start', to: 'app' }); this.logger.info('[master] agent_worker#%s:%s started (%sms)', this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime); }
而後會執行forkAppWorkers
函數,該函數主要是藉助cfork包fork
對應的工做進程, 並註冊一系列相關的監聽事件,
... cfork({ exec: this.getAppWorkerFile(), args, silent: false, count: this.options.workers, // don't refork in local env refork: this.isProduction, }); ... // 觸發app-start事件 cluster.on('listening', (worker, address) => { this.messenger.send({ action: 'app-start', data: { workerPid: worker.process.pid, address }, to: 'master', from: 'app', }); });
能夠看到forkAppWorkers
函數在監聽Listening
事件時,會觸發master
上的app-start
事件。
this.on('app-start', this.onAppStart.bind(this)); ... // master ready回調觸發 if (this.options.sticky) { this.startMasterSocketServer(err => { if (err) return this.ready(err); this.ready(true); }); } else { this.ready(true); } // ready回調 發送egg-ready狀態到各個進程 const action = 'egg-ready'; this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } }); this.messenger.send({ action, to: 'app', data: this.options }); this.messenger.send({ action, to: 'agent', data: this.options }); // start check agent and worker status if (this.isProduction) { this.workerManager.startCheck(); }
總結下:
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|
根據官方文檔,進程守護主要是依賴於graceful和egg-cluster這兩個庫。
未捕獲異常
+---------+ +---------+ | Worker | | Master | +---------+ +----+----+ | uncaughtException | +------------+ | | | | +---------+ | <----------+ | | Worker | | | +----+----+ | disconnect | fork a new worker | +-------------------------> + ---------------------> | | wait... | | | exit | | +-------------------------> | | | | | die | | | | | |
由執行的app文件可知, app
其實是繼承於Application類, 該類下面調用了graceful()
。
onServer(server) { ...... graceful({ server: [ server ], error: (err, throwErrorCount) => { ...... }, }); ...... }
繼續看graceful
, 能夠看到它捕獲了process.on('uncaughtException')
事件, 並在回調函數裏面關閉TCP
鏈接, 關閉自己進程, 斷開與master
的IPC
通道。
process.on('uncaughtException', function (err) { ...... // 對http鏈接設置 Connection: close響應頭 servers.forEach(function (server) { if (server instanceof http.Server) { server.on('request', function (req, res) { // Let http server set `Connection: close` header, and close the current request socket. req.shouldKeepAlive = false; res.shouldKeepAlive = false; if (!res._header) { res.setHeader('Connection', 'close'); } }); } }); // 設置一個定時函數關閉子進程, 並退出自己進程 // make sure we close down within `killTimeout` seconds var killtimer = setTimeout(function () { console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid); if (process.env.NODE_ENV !== 'test') { // kill children by SIGKILL before exit killChildren(function() { // 退出自己進程 process.exit(1); }); } }, killTimeout); // But don't keep the process open just for that! // If there is no more io waitting, just let process exit normally. if (typeof killtimer.unref === 'function') { // only worked on node 0.10+ killtimer.unref(); } var worker = options.worker || cluster.worker; // cluster mode if (worker) { try { // 關閉TCP鏈接 for (var i = 0; i < servers.length; i++) { var server = servers[i]; server.close(); } } catch (er1) { ...... } try { // 關閉ICP通道 worker.disconnect(); } catch (er2) { ...... } } });
ok, 關閉了IPC
通道後, 咱們繼續看cfork
文件, 即上面提到的fork worker
的包, 裏面監聽了子進程的disconnect
事件, 他會根據條件判斷是否從新fork
一個新的子進程
cluster.on('disconnect', function (worker) { ...... // 存起該pid disconnects[worker.process.pid] = utility.logDate(); if (allow()) { // fork一個新的子進程 newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; } else { ...... } });
通常來講, 這個時候會繼續等待一會而後就執行了上面說到的定時函數了, 即退出進程
。
OOM、系統異常
關於這種系統異常
, 有時候在子進程中是不能捕獲到
的, 咱們只能在master中進行處理, 也就是cfork
包。
cluster.on('exit', function (worker, code, signal) { // 是程序異常的話, 會經過上面提到的uncatughException從新fork一個子進程, 因此這裏就不須要了 var isExpected = !!disconnects[worker.process.pid]; if (isExpected) { delete disconnects[worker.process.pid]; // worker disconnect first, exit expected return; } // 是master殺死的子進程, 無需fork if (worker.disableRefork) { // worker is killed by master return; } if (allow()) { newWorker = forkWorker(worker._clusterSettings); newWorker._clusterSettings = worker._clusterSettings; } else { ...... } cluster.emit('unexpectedExit', worker, code, signal); });
上面一直提到各類進程間通訊,細心的你可能已經發現 cluster 的 IPC 通道只存在於 Master 和 Worker/Agent 之間,Worker 與 Agent 進程互相間是沒有的。那麼 Worker 之間想通信該怎麼辦呢?是的,經過 Master 來轉發。
廣播消息: agent => all workers +--------+ +-------+ | Master |<---------| Agent | +--------+ +-------+ / | \ / | \ / | \ / | \ v v v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+ 指定接收方: one worker => another worker +--------+ +-------+ | Master |----------| Agent | +--------+ +-------+ ^ | send to / | worker 2 / | / | / v +----------+ +----------+ +----------+ | Worker 1 | | Worker 2 | | Worker 3 | +----------+ +----------+ +----------+
在master
中, 能夠看到當agent和app被fork時
, 會監聽他們的信息, 同時將信息轉化成一個對象:
agentWorker.on('message', msg => { if (typeof msg === 'string') msg = { action: msg, data: msg }; msg.from = 'agent'; this.messenger.send(msg); }); worker.on('message', msg => { if (typeof msg === 'string') msg = { action: msg, data: msg }; msg.from = 'app'; this.messenger.send(msg); });
能夠看到最後調用的是messenger.send
, 而messengeer.send就是根據from和to來決定將信息發送到哪裏
send(data) { if (!data.from) { data.from = 'master'; } ...... // app -> master // agent -> master if (data.to === 'master') { debug('%s -> master, data: %j', data.from, data); // app/agent to master this.sendToMaster(data); return; } // master -> parent // app -> parent // agent -> parent if (data.to === 'parent') { debug('%s -> parent, data: %j', data.from, data); this.sendToParent(data); return; } // parent -> master -> app // agent -> master -> app if (data.to === 'app') { debug('%s -> %s, data: %j', data.from, data.to, data); this.sendToAppWorker(data); return; } // parent -> master -> agent // app -> master -> agent,可能不指定 to if (data.to === 'agent') { debug('%s -> %s, data: %j', data.from, data.to, data); this.sendToAgentWorker(data); return; } }
master
則是直接根據action
信息emit
對應的註冊事件
sendToMaster(data) { this.master.emit(data.action, data.data); }
而agent和worker則是經過一個sendmessage
包, 實際上就是調用下面相似的方法
// 將信息傳給子進程 agent.send(data) worker.send(data)
最後, 在agent和app都繼承的基礎類EggApplication
上, 調用了Messenger
類, 該類內部的構造函數以下:
constructor() { super(); ...... this._onMessage = this._onMessage.bind(this); process.on('message', this._onMessage); } _onMessage(message) { if (message && is.string(message.action)) { // 和master同樣根據action信息emit對應的註冊事件 this.emit(message.action, message.data); } }
總結一下:
思路就是利用事件機制和IPC通道來達到各個進程之間的通訊。
學習過程當中有遇到一個timeout.unref()的函數, 關於該函數推薦你們參考這個問題的6樓回答
從前端思惟轉到後端思惟其實仍是很吃力的,加上Egg的進程管理實現確實很是厲害, 因此花了不少時間在各類api和思路思考上。