最近作了點nodejs項目,對nodejs的cluster怎麼利用多進程處理請求產生了疑問,因而着手進行了研究,以後發現這其中竟大有文章!一切仍是先從遙遠的TCP提及吧。。。javascript
說到TCP,相信不少人都至關了解了,大學已經教過,可是又相信有不少人也不是很瞭解,要不是當時沒聽,要不也多是自身的編程能力不足以去實踐相關內容,寫到這我還特地去翻了一下大學的計算機網絡教材,內容是很豐富的,但教人實踐的內容仍是太少了,裏面的內容都把學生當成了有至關的Linux編程能力的人了,因此結果就是大部分只上了一年編程課剛學會幾個Hello world程序的大二學生,聽了這門課後一臉懵逼,即便記住了也由於沒什麼實踐很快忘了,當年我就是這麼懵逼過來的。
因此,扯了這些,結果是什麼呢,結果就是咱們要多動手!而要動手創建一條TCP鏈接能夠用socket來實現,不過這裏不是要說socket用法,只是來簡單聊一聊他們之間的一點小聯繫,以便於理解後面的內容。java
應用層經過傳輸層進行TCP通訊時,有時TCP須要爲多個應用程序進程提供併發服務。多個TCP鏈接或多個應用程序進程可能須要經過同一個TCP協議端口傳輸數據。爲了區別不一樣的應用程序進程和鏈接,許多計算機操做系統爲應用程序與TCP協議交互提供了稱爲套接字 (Socket)的接口,區分不一樣應用程序進程間的網絡通訊和鏈接。node
咱們能夠用一個四元組來肯定一條TCP鏈接(源ip,源端口,目標ip,目標端口),而鏈接是經過socket來創建的(服務端進行bind和listen->客戶端發起connect->服務端accept),計算機系統就是經過socket來區分不一樣的TCP鏈接的。因此咱們能夠看出來,只要目標ip/端口不一樣,服務端能夠用同一個端口生成多個socket,創建多條鏈接。
可是,一個進程只能監聽一個端口,一個端口怎麼生成多個socket呢?其實服務器端程序通常會把socket和服務器某個端口(ip+端口)bind起來, 這樣構成了一個特殊的socket, 這個socket沒有目標ip和端口。socket進行listen以後當有新的鏈接進來時, 系統將請求存進隊列(此時TCP握三次手完成), 後續能夠再調用accept拿到隊列的請求,返回一個新的socket, 這個socket是由四元組創建的, 也就對應了一個惟一的鏈接。程序員
說完這些,能夠來聊一聊nodejs是怎樣創建一個TCP服務的了。編程
通常咱們用nodejs啓動一個TCP服務多是這樣的:服務器
require('net').createServer(function(sock) { sock.on('data', function(data) { sock.write('Hello world'); }); }).listen(8080, '127.0.0.1');
進到createServer
一看(代碼都在net模塊中),裏面return了一個Server
對象,Server
繼承EventEmitter
,將createServer
的參數作爲connection
事件的回調函數,這塊比較簡單就不貼代碼了。咱們須要關注的是Server
的listen
方法,其不一樣的參數最終都會調用到listenInCluster
方法。cluster!是的這和cluster有關,但先無論它,咱們先管在主進程中它的執行:網絡
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... if (cluster.isMaster || exclusive) { // ... server._listen2(address, port, addressType, backlog, fd); return; } // ... }
從代碼咱們能夠看到listenInCluster
最終是調用了_listen2
方法,它就是服務啓動的關鍵,其定義以下:併發
function setupListenHandle(address, port, addressType, backlog, fd) { // ... var rval = null; // ... if (rval === null) rval = createServerHandle(address, port, addressType, fd); // ... this._handle = rval; // ... this._handle.onconnection = onconnection; this._handle.owner = this; var err = this._handle.listen(backlog || 511); // ... }
其中createServerHandle
方法就不展開了,它就如以前所說的:把socket和服務器某個端口(ip+端口)bind起來, 這樣構成了一個特殊的socket, 這個socket沒有目標ip和端口。它綁定了address+port
並返回了一個特殊socket(句柄)rval
,能夠看到最後它調用了listen對端口進行監聽,而且指定了一個回調函數onconnection
,函數會在C++層當accept請求時觸發,其回調參數之一就是前面提到的accept後與客戶端鏈接的新socket句柄。到這裏再看一下onconnection
的代碼:負載均衡
function onconnection(err, clientHandle) { // ... var self = handle.owner; var socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen, pauseOnCreate: self.pauseOnConnect }); socket.readable = socket.writable = true; // ... self.emit('connection', socket); }
能夠看到nodejs在對socket句柄進一步封裝後(封裝成nodejs的Socket對象),再觸發server(由createServer建立)的connection
事件。這時咱們再回到前面createServer
的介紹,其監聽了connection
事件,因此最終流程走下來createServer
的的方法參數將被觸發,而且能夠拿到一個nodejs的Socket對象進行write與read操做,與客戶端進行通訊。socket
至此咱們已經對nodejs啓動一個TCP服務的流程有了瞭解,接下來就到主題cluster了。
開始說代碼以前,先來聊一聊喂鴿子吧。假設你坐在布拉格廣場前靜靜地坐着,而後往前面撒了一把狗糧,喔不對是鴿糧,而後周圍的一羣鴿子都震驚了並往你這邊飛搶東西吃。這個現象能夠用一個詞來形容就是「驚羣「。然而這只是個人瞎掰,咱們程序員理解的驚羣應該是:多個進程/線程同時阻塞等待某個事件,當事件發生時喚醒了全部等待的進程/線程,但最終只有一個能對事件進行處理。很明顯這對cpu形成了浪費,而cluster的多進程模型對此作了處理:只用一個master進程等待請求,而後有請求到來時使用round-robin輪詢分配請求給各個子進程進行處理,這塊後面提到的源碼會涉及到,這裏就不深刻了。除了round-robin,還有其餘的一些cluster爲咱們作的,就用代碼來talk吧:
const cluster = require('cluster'); const http = require('http'); if (cluster.isMaster) { const numCPUs = require('os').cpus().length; for (let i = 0; i < numCPUs; i++) { cluster.fork(); } } else { // Worker processes have a http server. http.Server((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); }
以上代碼就是cluster的典型用法,在nodejs啓動文件判斷當前進程,若是當前進程是master進程,那麼就根據cpu的核數fork出相同數量的進程,不然(worker進程)就啓動一個http服務,因此通常這樣會給一個核心分配一個worker進程來啓動一個服務,搭起一個小服務集羣。可是問題來了,爲何這裏能夠有多個進程同時監聽一個端口呢,是由於listen作的一些文章,下面再一步步深刻解析。因爲http.Server實際上是繼承了net.Server,因此跟前面建立TCP服務同樣,listen
最終也是調用到listenInCluster
,咱們從這裏從新開始。
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { // ... const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // Get the master's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); // ... }
listenInCluster
在worker進程中調用cluster._getServer
,而且傳入了一個函數listenOnMasterHandle
。這裏還不知道它作了什麼,因此再進入cluster._getServer
看看(因爲當前是在worker進程,cluster模塊文件是lib/internal/cluster/child.js
):
cluster._getServer = function(obj, options, cb) { // ... const message = util._extend({ act: 'queryServer', index: indexes[indexesKey], data: null }, options); send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); // ... };
關注send
方法,它調用了sendHelper
方法,該方法是在internal/cluster/utils
定義的,至關一個消息轉發器處理進程間通訊,它發送一個「進程內部消息「(internalMessage
),而worker進程在master進程被fork出來的時候監聽了internalMessage
:
// lib/internal/cluster/master.js worker.process.on('internalMessage', internal(worker, onmessage));
因此最終在worker進程發送的消息,觸發了master進程執行了onmessage
方法,onmessage
判斷message.act === 'queryServer'
執行queryServer
,而就是在這個方法中,新建了一個RoundRobinHandle
調度器,就是這個東西分配請求作了負載均衡。這裏用地址和端口號做爲key將調度器存儲起來,調度器不會被worker建立兩次,最後將worker進程add到隊列。相關代碼以下:
// lib/internal/cluster/master.js function queryServer(worker, message) { // ... var handle = handles[key]; if (handle === undefined) { var constructor = RoundRobinHandle; // ... handles[key] = handle = new constructor(key, message.address, message.port, message.addressType, message.fd, message.flags); } // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { // ... }); }
而後咱們再來看看RoundRobinHandle
,它裏面調用net.createServer
方法新建了一個server,而且開始監聽,這塊能夠看前面內容。不過與前面不一樣的是,server在listening
事件完成時拿到監聽端口的那個特殊socket句柄,重置了onconnection
方法,當新的鏈接創建時方法被調用,將accept鏈接的socket句柄分發到隊列裏的worker進行處理(distribute)。對於listening
事件,它在Server.listen
執行後就會觸發,代碼就在setupListenHandle
方法裏面。RoundRobinHandle
代碼以下:
// lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { // ... this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); // ... }); } RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function(worker) { // ... const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { // ... }); };
從代碼上看到最終調度器調用handoff
方法,經過sendHelper
向worker進程發送一個新鏈接到達的消息newconn
,執行worker進程的server的onconnection
方法,worker進程相關代碼以下:
// lib/internal/cluster/child.js cluster._setupWorker = function() { // ... process.on('internalMessage', internal(worker, onmessage)); send({ act: 'online' }); function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } }; // Round-robin connection. function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); }
走到這裏worker進程的server就拿到了鏈接的socket句柄能夠進行處理,可是好像有點問題,worker進程的server好像還沒起起來啊,前面講的只是在master進程的調度器啓動了一個server,worker進程並無server。咱們又得翻回前面的內容看一看了,看看以前提到的workder進程的cluster._getServer
,裏面send
方法發送了一個函數,函數裏面的rr(reply, indexesKey, cb);
就是建立了workder進程server的代碼。
先來看看cluster._getServer
中發送的函數怎麼被調用的。這裏須要來了解一下以前出現了幾回的sendHelper
,它是cluster模塊用來作進程間通訊的,另外還有一個internal
方法用來處理通訊的回調。cluster._getServer
的send
會調用sendHelper
,它會用message.seq
當key把send的函數存儲起來。而後在internal
方法處理通訊的回調時判斷message是否有這個key,是否能找到這個函數,能夠的話就執行。而在master進程執行queryServer
把worker進程添加到調度器中時添加了一個回調函數,回調函數send了一個帶seq的消息,而且handle爲null,就是這個消息觸發了cluster._getServer
發送的函數的執行。相關代碼以下:
// `internal/cluster/utils.js` const callbacks = {}; var seq = 0; function sendHelper(proc, message, handle, cb) { // ... if (typeof cb === 'function') callbacks[seq] = cb; message.seq = seq; // ... } function internal(worker, cb) { return function onInternalMessage(message, handle) { // ... var fn = cb; if (message.ack !== undefined && callbacks[message.ack] !== undefined) { fn = callbacks[message.ack]; delete callbacks[message.ack]; } // ... }; } // lib/internal/cluster/master.js function queryServer(worker, message) { // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ // ... ack: message.seq, // ... }, reply); // ... send(worker, reply, handle); });
最終,rr(reply, indexesKey, cb);
執行,它構造了一個假的socket句柄,句柄設置了一個不作操做的listen方法。而後執行cb,這個cb也就是前面提到過的listenOnMasterHandle
,它會把假socket句柄賦值給worker進程的server._handle
,隨後因爲server._handle
的存在,server._listen2(address, port, addressType, backlog, fd);
也不會作任何操做,也就是說worker進程建立的server是不會對端口進行監聽的。相關代碼以下:
// lib/internal/cluster/child.js function rr(message, indexesKey, cb) { function listen(backlog) { // ... return 0; } // ... cb(0, handle); } // lib/net.js function listenOnMasterHandle(err, handle) { // ... server._handle = handle; server._listen2(address, port, addressType, backlog, fd); } // setupListenHandle就是_listen2 function setupListenHandle(address, port, addressType, backlog, fd) { // ... if (this._handle) { debug('setupListenHandle: have a handle already'); } // ...
至此,cluster模塊如何創建多進程服務的就算講完了。畫個草圖總結下吧: