目錄html
示例代碼託管在:http://www.github.com/dashnowords/blogs前端
博客園地址:《大史住在大前端》原創博文目錄git
華爲雲社區地址:【你要的前端打怪升級指南】github
閱讀本章須要先閱讀本系列前兩章內容預熱一下。windows
前兩篇博文中已經分別介紹了使用cluster
模塊創建集羣時主進程執行cluster.fork( )
方法時的執行邏輯,以及net
模塊在不一樣場景下創建通信的基本原理。本篇繼續分析cluster
模塊,從第一個子進程開始創建服務器講起,cluster
基本用法示例代碼再來一遍:數組
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { console.log(`主進程 ${process.pid} 正在運行`); // 衍生工做進程。 for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`工做進程 ${worker.process.pid} 已退出`); }); } else { // 工做進程能夠共享任何 TCP 鏈接。 // 在本例子中,共享的是 HTTP 服務器。 http.createServer((req, res) => { res.writeHead(200); res.end('你好世界\n'); }).listen(8000); console.log(`工做進程 ${process.pid} 已啓動`); }
代碼是足夠精簡的,實現過程也確實是很龐大的工程。每個子進程中執行的邏輯都是http.createServer().listen()
,咱們來看看它是如何一步一步運做而最終創建通信機制的,你會發現它和上一節中的簡易模型很是類似。緩存
在http
模塊的源碼中很容易找到http.createServer( )
方法的邏輯就是透傳參數生成了一個net.Server
實例,這個實例在上一節中就已經介紹過,實際上就只是生成了一個server
的實例,因此這裏跳轉到net.Server.prototype.listen()
(net.js
文件1306-1404行),基本邏輯以下:服務器
Server.prototype.listen = function(...args){ const normalized = normalizeArgs(args); var options = normalized[0]; /*..獲取監聽參數中的句柄對象..*/ options = options._handle || options.handle || options; //若是options上有句柄,句柄是一個TCP實例 if(options instanceof TCP){ //...... listenInCluster(......); } //若是配置參數中有fd(file descriptor) if(typeof options.fd === 'number' && options.fd >=0){ //...... listenInCluster(......); } //若是參數中有port端口號 if(typeof options.port === 'number' || typeof options.port === 'string'){ //..... listenInCluster(......); } //若是參數中有port端口號 或 字符型的pipe名稱 if(typeof options.port === 'number' || typeof options.port === 'string'){ //..... listenInCluster(......); } }
這裏不難看出它的邏輯就和net
模塊官方文檔中描述的server.listen( )
的幾種場景對應,能夠監聽帶有非空handle
屬性的句柄對象,數字型端口號,字符串型命名管道地址,或者直接傳入配置參數合集options
,而後分別根據幾種不一樣的狀況來調用listenInCluster
方法(集羣功能的邏輯主線是數字型port,假設傳入了12315
)。併發
listenInCluster
方法定義以下:socket
大體能夠看出,若是是主進程,就直接調用server._listen2()
方法而後return
了,不然(也就是在工做進程中的邏輯,敲黑板!!!這裏是重點了),構造一個serverQuery
的參數集,能夠看到裏面記錄了以各類不一樣姿式調用這個方法時傳入的參數,因此有的參數爲null
也很正常,而後調用了cluster._getServer( )
方法,這就是工做進程在引用cluster
模塊時引入的child.js
中定義並掛載在cluster
上的方法,最後一個參數listenOnMasterHandle
是一個回調函數,也是一個錯誤前置風格的函數,能夠看到,它接收了一個句柄對象,並把這個句柄對象掛載在了子進程這個server
實例的_handle
屬性上,接着也調用了server._listen2( )
方法,能夠看到兩種狀況下調用這個方法時傳入的參數是同樣的。接着來到server._listen2( )
方法,它綁定了setupListenHandle
方法(別抓狂,這是net
模塊中相關邏輯的最後一步了),簡化代碼以下:
function setupListenHandle(......){ if (this._handle) { //工做進程在執行上一步邏輯時,在cluster._getServer()回調函數中把一個handle傳遞給了server._handle debug('setupListenHandle: have a handle already'); } else { //主進程會執行的邏輯 debug('setupListenHandle: create a handle'); //...... rval = createServerHandle(address, port, addressType, fd, flags); //...... this._handle = rval; } //...... this._handle.onconnection = onconnection; this._handle[owner_symbol] = this; //.... }
工做進程經過cluster._getServer( )
方法拿到了一個handle
,因此不會再生成,而主進程server.listen(port)
執行時會走到else分支,而後生成一個新的綁定了端口號的特殊的socket句柄而後掛載到主進程server._handle
上,這裏對句柄的connection
事件回調邏輯進行了修改,相關代碼以下:
這裏須要注意的是,server._handle
的connection
事件和server
的connection
事件是兩碼事,server._handle
指向的是一個綁定了端口的特殊的socket
句柄,當客戶端connect一個server
時實際上底層是客戶端socket
與服務端這個socket
的對接,因此須要在server._handle
這個的connection
回調函數中,將客戶端的socket
句柄clientHandle
從新包裝,而後再經過觸發server
的connection
事件將其轉發給server
實例。因此在使用server
實例時能夠直接添加connectionListener
:
let server = net.createServer(socket=>{ /*這個回調函數就是server的connection事件回調 * 這裏接收到的socket就是server._handle的connection收到的客戶端句柄clientHandle封裝成的socket實例 */ })
不管是主進程仍是子進程都會觸發這個邏輯,只須要當作是一種功能性質的封裝便可,並不影響業務邏輯。
下面回到cluster
模塊繼續,_getServer( )
方法只存在於子進程代碼中,源碼位於lib/internal/cluster/child.js
,方法定義在54-106行,基本邏輯以下:
cluster._getServer = function(obj, options, cb){ /* 這裏的obj就是子進程中運行上面listenInCluster方法中傳入的server, * options就是serverQuery, * cb就是最後要把主進程handle傳回去的回調函數listenOnMasterHandler */ //先構建index而後進行了一通記錄,就是根據監聽的參數來構建一個識別這個server的索引 //而後構建消息 const message = { act: 'queryServer', index, data: null, ...options }; //...... /* 發送act:queryServer消息,並傳一個回調函數, * 從形參命名就能夠看出,這個回調函數被調用時會被傳入一個句柄, * 最後根據不一樣的調度策略來執行不一樣的函數,這裏主要看Round-robin */ send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); //...... }
rr
方法將響應reply
和前一個調用者傳入的回調函數cb
進行了透傳,rr
的函數體就是實現listen
方法偷樑換柱的地方了:
// Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; function listen(backlog) { return 0; } function close() { if (key === undefined) return; send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); key = undefined; } function getsockname(out) { if (key) Object.assign(out, message.sockname); return 0; } const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } assert(handles.has(key) === false); handles.set(key, handle); cb(0, handle); //這裏的cb其實就是listenInCluster方法中定義的那個listenOnMasterHandler回調 }
能夠看到rr
方法中構建了一個假的handle
句柄,並調用cb將它傳了回去,而後執行邏輯回回到net
模塊,前文已經提這個handle在回調函數中被掛載在了server._handle
上,因而setupListenHandle( )
的邏輯中也不會從新構建句柄。
從新梳理一下這部分的邏輯,就是子進程中調用listen
方法時,會經過cluster._getServer( )
拿到一個假句柄,而後執行一個空的listen
方法,這樣就避免了端口的重複監聽。因此咱們能夠推測,cluster._getServer( )
必然會觸發主進程啓動一個監聽端口的服務器,並創建對子進程的調度,進程之間的IPC通信能夠直接經過process
對象來完成,不須要再從新構建跨進程通信管道。
繼續進行後續內容前,先來看一個獨立的跨進程通信工具,源碼放在lib/internal/cluster/utils.js
。
它是cluster
模塊發送跨進程消息的內部代理,這個模塊對外暴露了消息發送方法sendHelper
和內部消息監聽器的預處理方法internal
,源碼很短就不貼了。當子進程調用sendHelper
發送消息時,utils
內部會把這條消息處理完成後須要執行的回調函數先緩存起來,而後給消息添加一些包裝標記,而後再發出去;internal
會對傳入的內部消息監聽器進行代理,過濾掉非NODE_CLUSTER
類別的消息,若是消息攜帶的message
對象沒有ack
屬性則最終會執行綁定監聽時傳入的回調函數,不然會從緩存中找出以前暫存的回調函數來執行。
發個消息爲何要搞這麼複雜呢?這個ack
屬性又是哪來的呢?其實這個utils
模塊主要是在跨進程的雙向消息通信時實現了方法複用,同一個message
從工做進程發往主進程時和主進程發回給工做進程時是由同一個事件名internalMessage
攜帶的,那如何來區分消息發送的方向呢,就是ack
屬性,若是消息帶有ack
屬性,就表示它是由主進程發給子進程的,那麼就要調用子進程中的後續處理方法,這個方法其實就是子進程發送消息給主進程以前暫存在utils
內部callbacks
裏的方法,也就是child.js
中cluster._getServer()
中調用send
方法時傳入的回調方法,也就是net
模塊中listenInCluster( )
方法中的listenOnMasterHandle
方法,這個方法漂洋過海透傳了N個函數,的確不容易看懂,「回調地獄」也的確不是鬧着玩的。再看看沒有ack
屬性的狀況,沒有這個屬性時消息是從子進程發給主進程的,天然要調用主進程的方法,從邏輯裏不難看出,這種狀況下方法引用的就是internal
方法執行時傳入的第二個參數(master.js
源碼213行執行的internal(worker, onmessage)
的onmessage
這個函數),源碼中就是利用高階函數這種分步執行的特色實現了引用。
故事再回到第三節工做進程中發出act:'queryServer
的消息後,來看主進程master.js
中的代碼,主進程中在調用cluster.fork( )
時就綁定了對worker線程internalMessage
的監聽,對於act:queryServer
類型的集羣消息,主進程已經定義了queryServer
這個方法來處理。這段源代碼的主要邏輯以下:
1.根據重要參數組拼接出一個惟一的key 2.1.根據key查詢是否有已經存在的調度句柄round-robin-handle,若是有則直接進行後續邏輯 2.2.若是沒有已經存在的調度句柄,則選擇調度策略,實例化一個調度句柄,並把它添加進記錄裏 3.把消息數據message.data掛載在調度句柄的handle.data字段上 4.執行調度句柄的add方法,把子進程和一個回調方法傳進實例,回調方法被執行時會從調度句柄中取得數據,並組裝返回消息(帶有ack屬性和其餘數據的消息)發給子進程,子進程收到這個消息後執行的方法,就是前文分析過的返回假句柄給net模塊中的`listenInCluster()`邏輯。
從開篇的多進程代碼能夠看到,每一個子進程中執行的listen
方法監聽的端口號都是同樣的,因此每一個子進程發送queryServer
消息給主進程並執行這段邏輯時,其實對應的key
都是同樣的,因此調度對象RoundRobinHandle
只會實例化一次,在以後的過程當中,每個子進程會根據key
獲取到同一個調度實例,並調用add
方法將worker
對象和一個回調函數添加進調度實例,能夠看到回調函數執行時,就會將原message
中的seq
屬性的值添加給ack
屬性再掛載上處理後的數據併發送給子進程。那麼剩下的事情,就剩下調度對象RoundRobinHandle
的源碼了。
咱們不妨來推測一下,它的主要邏輯就是在主進程中創建真正監聽目標端口的服務器,並添加當客戶端請求到達時對於工做進程的調度代碼,下一節咱們就一塊兒來驗證一下。
調度方法的源碼是internal/cluster/round_robin_handle.js
,另外一種shared_handle.js
是windows下使用的調度策略,先不作分析(主要是沒研究過,不敢瞎說)。先從構造函數開始:
16行,bingo,終於看到主進程啓動服務器了。接着就是根據參數而分流的監聽方法,集羣代碼中對應的是20行的帶有有效port
參數的狀況,因此服務器就在主進程啓動了,最後來看看server
開始觸發listening
事件時執行的邏輯(此處調用的是once
方法,因此只會執行一次):
1.將主進程server的內部_handle句柄,掛載給round-robin-handle實例 2.當這個句柄被鏈接時(也就是客戶端socket執行connect方法鏈接後),會觸發它的`connection`事件,回調函數會調用`distribute`方法來分發這個客戶端socket句柄,注意32行後面半句的箭頭函數方法,這裏的handle就是指客戶端`socket`實例。 3.將server._handle指向null 4.將server屬性指向null
若是你還記得net
模塊中listen
方法的邏輯的話可能會有印象,_handle
的connection
事件回調其實本來已經被複寫過一次了,也就是說單進程運行的程序在創建服務器時,server._handle
的connection
事件會觸發server
實例的connection
事件,而在集羣模式下,主進程中調度實例中服務器句柄server._handle
的connection
再次被複寫,將邏輯改變爲分發socket
,而子進程中的server._handle
仍是保持原來的邏輯。
最後一步指向null
的邏輯還涉及到add
方法,繼續看主進程中調用的add
方法:
這個send
形參實際上就是主進程中傳入的最終向子進程發送返回消息的那個回調函數,它被封裝進了done
函數,這裏須要着重看一下55行的邏輯,this.server === null
這個條件實際上對應的就是構造函數中服務器開始監聽的事件,因此55-59行的代碼以及構造函數中添加的listening
事件的回調函數須要聯合在一塊兒來理解,也就是每一個子進程的send
方法都被包裹在一個獨立的done
函數中,這個函數會在主進程的server
處於listening
狀態後觸發執行,而且只觸發一次。當它觸發時,會從實例的handle
屬性(也就是server
的_handle
句柄)上取得socket
名稱而後調用send
方法,這個特殊socket
的名稱在回調函數中對應reply
形參,最終掛載在message
中發回了子進程。
至此其實主進程和子進程創建服務器的消息已經完成了閉環。最後再看一下RoundRobinHandle
中最後兩個方法:
當客戶端socket
執行connect
方法鏈接到主進程server
的句柄後,主進程會調用round-robin-handle
實例的distribute
方法,這個方法的邏輯比較簡單,把這個客戶端句柄加入到待處理隊列,而後從空閒進程隊列頭部取出一個worker
進程,把它做爲參數傳給handoff
方法。
handoff
方法中,從客戶端請求句柄隊列的頭部取出下一個待處理的socket
,若是已經沒有要處理的請求,就把傳進來的worker
放回空閒子進程隊列free
中。在add
方法內部封裝的done
方法中也執行了這個handoff
方法,如今再回過頭來看這個add
方法的做用,就是當主進程處於監聽狀態後,將每個子進程對象worker
依次添加到空閒進程隊列free
中。最後夠早了一個新的act:newconn
消息,並經過調度選出的worker.process
對象實現跨進程通信來將待處理句柄和【新鏈接】消息發送給子進程。
集羣創建過程的邏輯大體的跳轉路徑以下,細節部分直接參考前文的講解便可。