衆所周知Node基於V8,而在V8中JavaScript是單線程運行的,這裏的單線程不是指Node啓動的時候就只有一個線程,而是說運行JavaScript代碼是在單線程上,Node還有其餘線程,好比進行異步IO操做的IO線程。這種單線程模型帶來的好處就是系統調度過程當中不會頻繁進行上下文切換,提高了單核CPU的利用率。javascript
可是這種作法有個缺陷,就是咱們沒法利用服務器CPU多核的性能,一個Node進程只能利用一個CPU。並且單線程模式下一旦代碼崩潰就是整個程序崩潰。一般解決方案就是使用Node的cluster模塊,經過master-worker
模式啓用多個進程實例。下面咱們詳細講述下,Node如何使用多進程模型利用多核CPU,以及自帶的cluster模塊具體的工做原理。html
node提供了child_process
模塊用來進行子進程的建立,該模塊一共有四個方法用來建立子進程。java
const { spawn, exec, execFile, fork } = require('child_process') spawn(command[, args][, options]) exec(command[, options][, callback]) execFile(file[, args][, options][, callback]) fork(modulePath[, args][, options])
首先認識一下spawn方法,下面是Node文檔的官方實例。node
const { spawn } = require('child_process'); const child = spawn('ls', ['-lh', '/home']); child.on('close', (code) => { console.log(`子進程退出碼:${code}`); }); const { stdin, stdout, stderr } = child stdout.on('data', (data) => { console.log(`stdout: ${data}`); }); stderr.on('data', (data) => { console.log(`stderr: ${data}`); });
經過spawn建立的子進程,繼承自EventEmitter,因此能夠在上面進行事件(discount
,error
,close
,message
)的監聽。同時子進程具備三個輸入輸出流:stdin、stdout、stderr,經過這三個流,能夠實時獲取子進程的輸入輸出和錯誤信息。linux
這個方法的最終實現基於libuv,這裏再也不展開討論,感興趣能夠查看源碼。nginx
// 調用libuv的api,初始化一個進程 int err = uv_spawn(env->event_loop(), &wrap->process_, &options);
之因此把這兩個放到一塊兒,是由於exec最後調用的就是execFile方法,源碼在這裏)。惟一的區別是,exec中調用的normalizeExecArgs
方法會將opts的shell屬性默認設置爲true。git
exports.exec = function exec(/* command , options, callback */) { const opts = normalizeExecArgs.apply(null, arguments); return exports.execFile(opts.file, opts.options, opts.callback); }; function normalizeExecArgs(command, options, callback) { options = { ...options }; options.shell = typeof options.shell === 'string' ? options.shell : true; return { options }; }
在execFile中,最終調用的是spawn
方法。github
exports.execFile = function execFile(file /* , args, options, callback */) { let args = []; let callback; let options; var child = spawn(file, args, { // ... some options }); return child; }
exec會將spawn的輸入輸出流轉換成String,默認使用UTF-8的編碼,而後傳遞給回調函數,使用回調方式在node中較爲熟悉,比流更容易操做,因此咱們能使用exec方法執行一些shell
命令,而後在回調中獲取返回值。有點須要注意,這裏的buffer是有最大緩存區的,若是超出會直接被kill掉,可用經過maxBuffer屬性進行配置(默認: 200*1024)。shell
const { exec } = require('child_process'); exec('ls -lh /home', (error, stdout, stderr) => { console.log(`stdout: ${stdout}`); console.log(`stderr: ${stderr}`); });
fork最後也是調用spawn來建立子進程,可是fork是spawn的一種特殊狀況,用於衍生新的 Node.js 進程,會產生一個新的V8實例,因此執行fork方法時須要指定一個js文件。數據庫
exports.fork = function fork(modulePath /* , args, options */) { // ... options.shell = false; return spawn(options.execPath, args, options); };
經過fork建立子進程以後,父子進程直接會建立一個IPC(進程間通訊)通道,方便父子進程直接通訊,在js層使用 process.send(message)
和 process.on('message', msg => {})
進行通訊。而在底層,實現進程間通訊的方式有不少,Node的進程間通訊基於libuv實現,不一樣操做系統實現方式不一致。在*unix系統中採用Unix Domain Socket方式實現,Windows中使用命名管道的方式實現。
常見進程間通訊方式:消息隊列、共享內存、pipe、信號量、套接字
下面是一個父子進程通訊的實例。
parent.js
const path = require('path') const { fork } = require('child_process') const child = fork(path.join(__dirname, 'child.js')) child.on('message', msg => { console.log('message from child', msg) }); child.send('hello child, I\'m master')
child.js
process.on('message', msg => { console.log('message from master:', msg) }); let counter = 0 setInterval(() => { process.send({ child: true, counter: counter++ }) }, 1000);
其實能夠看到,這些方法都是對spawn方法的複用,而後spawn方法底層調用了libuv進行進程的管理,具體能夠看下圖。
master-worker
模型首先來看看,若是咱們在child.js
中啓動一個http服務會發生什麼狀況。
// master.js const { fork } = require('child_process') for (let i = 0; i < 2; i++) { const child = fork('./child.js') } // child.js const http = require('http') http.createServer((req, res) => { res.end('Hello World\n'); }).listen(8000)
+--------------+ | | | master | | | +--------+--------------+- -- -- - | | | Error: listen EADDRINUSE | | | +----v----+ +-----v---+ | | | | | worker1 | | worker2 | | | | | +---------+ +---------+ :8000 :8000
咱們fork了兩個子進程,由於兩個子進程同時對一個端口進行監聽,Node會直接拋出一個異常(Error: listen EADDRINUSE
),如上圖所示。那麼咱們能不能使用代理模式,同時監聽多個端口,讓master進程監聽80端口收到請求時,再將請求分發給不一樣服務,並且master進程還能作適當的負載均衡。
+--------------+ | | | master | | :80 | +--------+--------------+---------+ | | | | | | | | +----v----+ +-----v---+ | | | | | worker1 | | worker2 | | | | | +---------+ +---------+ :8000 :8001
可是這麼作又會帶來另外一個問題,代理模式中十分消耗文件描述符(linux系統默認的最大文件描述符限制是1024),文件描述符在windows系統中稱爲句柄(handle),習慣性的咱們也能夠稱linux中的文件描述符爲句柄。當用戶進行訪問,首先鏈接到master進程,會消耗一個句柄,而後master進程再代理到worker進程又會消耗掉一個句柄,因此這種作法十分浪費系統資源。爲了解決這個問題,Node的進程間通訊能夠發送句柄,節省系統資源。
句柄是一種特殊的智能指針 。當一個應用程序要引用其餘系統(如數據庫、操做系統)所管理的內存塊或對象時,就要使用句柄。
咱們能夠在master進程啓動一個tcp服務,而後經過IPC將服務的句柄發送給子進程,子進程再對服務的鏈接事件進行監聽,具體代碼以下:
// master.js var { fork } = require('child_process') var server = require('net').createServer() server.on('connection', function(socket) { socket.end('handled by master') // 響應來自master }) server.listen(3000, function() { console.log('master listening on: ', 3000) }) for (var i = 0; i < 2; i++) { var child = fork('./child.js') child.send('server', server) // 發送句柄給worker console.log('worker create, pid is ', child.pid) } // child.js process.on('message', function (msg, handler) { if (msg !== 'server') { return } // 獲取到句柄後,進行請求的監聽 handler.on('connection', function(socket) { socket.end('handled by worker, pid is ' + process.pid) }) })
下面咱們經過curl
連續請求 5 次服務。
for varible1 in {1..5} do curl "localhost:3000" done
能夠看到,響應請求的能夠是父進程,也能夠是不一樣子進程,多個進程對同一個服務響應的鏈接事件監聽,誰先搶佔,就由誰進行響應。這裏就會出現一個Linux網絡編程中很常見的事件,當多個進程同時監聽網絡的鏈接事件,當這個有新的鏈接到達時,這些進程被同時喚醒,這被稱爲「驚羣」。這樣致使的狀況就是,一旦事件到達,每一個進程同時去響應這一個事件,而最終只有一個進程能處理事件成功,其餘的進程在處理該事件失敗後從新休眠,形成了系統資源的浪費。
ps:在windows系統上,永遠都是最後定義的子進程搶佔到句柄,這可能和libuv的實現機制有關,具體緣由往有大佬可以指點。
出現這樣的問題確定是你們都不肯意的嘛,這個時候咱們就想起了nginx
的好了,這裏有篇文章講解了nginx是如何解決「驚羣」的,利用nginx的反向代理能夠有效地解決這個問題,畢竟nginx原本就很擅長這種問題。
http { upstream node { server 127.0.0.1:8000; server 127.0.0.1:8001; server 127.0.0.1:8002; server 127.0.0.1:8003; keepalive 64; } server { listen 80; server_name shenfq.com; location / { proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $http_host; proxy_set_header X-Nginx-Proxy true; proxy_set_header Connection ""; proxy_pass http://node; # 這裏要和最上面upstream後的應用名一致,能夠自定義 } } }
若是咱們本身用Node原生來實現一個多進程模型,存在這樣或者那樣的問題,雖然最終咱們藉助了nginx達到了這個目的,可是使用nginx的話,咱們須要另外維護一套nginx的配置,並且若是有一個Node服務掛了,nginx並不知道,仍是會將請求轉發到那個端口。
除了用nginx作反向代理,node自己也提供了一個cluster
模塊,用於多核CPU環境下多進程的負載均衡。cluster模塊建立子進程本質上是經過child_procee.fork,利用該模塊能夠很容易的建立共享同一端口的子進程服務器。
有了這個模塊,你會感受實現Node的單機集羣是多麼容易的一件事情。下面看看官方實例,短短的十幾行代碼就實現了一個多進程的Node服務,且自帶負載均衡。
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // 判斷是否爲主進程 console.log(`主進程 ${process.pid} 正在運行`); // 衍生工做進程。 for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`工做進程 ${worker.process.pid} 已退出`); }); } else { // 子進程進行服務器建立 // 工做進程能夠共享任何 TCP 鏈接。 // 在本例子中,共享的是一個 HTTP 服務器。 http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); console.log(`工做進程 ${process.pid} 已啓動`); }
首先看代碼,經過isMaster
來判斷是否爲主進程,若是是主進程進行fork操做,子進程建立服務器。這裏cluster進行fork操做時,執行的是當前文件。cluster.fork
最終調用的child_process.fork
,且第一個參數爲process.argv.slice(2)
,在fork子進程以後,會對其internalMessage事件進行監聽,這個後面會提到,具體代碼以下:
const { fork } = require('child_process'); cluster.fork = function(env) { cluster.setupMaster(); const id = ++ids; const workerProcess = createWorkerProcess(id, env); const worker = new Worker({ id: id, process: workerProcess }); // 監聽子進程的消息 worker.process.on('internalMessage', internal(worker, onmessage)); // ... }; // 配置master進程 cluster.setupMaster = function(options) { cluster.settings = { args: process.argv.slice(2), exec: process.argv[1], execArgv: process.execArgv, silent: false, ...cluster.settings, ...options }; }; // 建立子進程 function createWorkerProcess(id, env) { return fork(cluster.settings.exec, cluster.settings.args, { // some options }); }
這裏會有一個問題,子進程所有都在監聽同一個端口,咱們以前已經試驗過,服務監聽同一個端口會出現端口占用的問題,那麼cluster模塊如何保證端口不衝突的呢? 查閱源碼發現,http模塊的createServer繼承自net模塊。
util.inherits(Server, net.Server);
而在net模塊中,listen方法會調用listenInCluster方法,listenInCluster判斷當前是否爲master進程。
Server.prototype.listen = function(...args) { // ... if (typeof options.port === 'number' || typeof options.port === 'string') { // 若是listen方法只傳入了端口號,最後會走到這裏 listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive); return this; } // ... }; function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) { if (cluster === undefined) cluster = require('cluster'); if (cluster.isMaster) { // 若是是主進程則啓動一個服務 // 可是主進程沒有調用過listen方法,因此沒有走這裏一步 server._listen2(address, port, addressType, backlog, fd, flags); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; // 子進程獲取主進程服務的句柄 cluster._getServer(server, serverQuery, listenOnMasterHandle); function listenOnMasterHandle(err, handle) { server._handle = handle; // 重寫handle,對listen方法進行了hack server._listen2(address, port, addressType, backlog, fd, flags); } }
看上面代碼能夠知道,真正啓動服務的方法爲server._listen2
。在_listen2
方法中,最終調用的是_handle
下的listen方法。
function setupListenHandle(address, port, addressType, backlog, fd, flags) { // ... this._handle.onconnection = onconnection; var err = this._handle.listen(backlog || 511); // ... } Server.prototype._listen2 = setupListenHandle; // legacy alias
那麼cluster._getServer
方法到底作了什麼呢?
搜尋它的源碼,首先向master進程發送了一個消息,消息類型爲queryServer
。
// child.js cluster._getServer = function(obj, options, cb) { // ... const message = { act: 'queryServer', index, data: null, ...options }; // 發送消息到master進程,消息類型爲 queryServer send(message, (reply, handle) => { rr(reply, indexesKey, cb); // Round-robin. }); // ... };
這裏的rr方法,對前面提到的_handle.listen
進行了hack,全部子進程的listen實際上是不起做用的。
function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; function listen(backlog) { // listen方法直接返回0,再也不進行端口監聽 return 0; } function close() { send({ act: 'close', key }); } function getsockname(out) { return 0; } const handle = { close, listen, ref: noop, unref: noop }; handles.set(key, handle); // 根據key將工做進程的 handle 進行緩存 cb(0, handle); } // 這裏的cb回調就是前面_getServer方法傳入的。 參考以前net模塊的listen方法 function listenOnMasterHandle(err, handle) { server._handle = handle; // 重寫handle,對listen方法進行了hack // 該方法調用後,會對handle綁定一個 onconnection 方法,最後會進行調用 server._listen2(address, port, addressType, backlog, fd, flags); }
那麼到底在哪裏對端口進行了監聽呢?
前面提到過,fork子進程的時候,對子進程進行了internalMessage事件的監聽。
worker.process.on('internalMessage', internal(worker, onmessage));
子進程向master進程發送消息,通常使用process.send
方法,會被監聽的message
事件所接收。這裏是由於發送的message指定了cmd: 'NODE_CLUSTER'
,只要cmd字段以NODE_
開頭,這樣消息就會認爲是內部通訊,被internalMessage事件所接收。
// child.js function send(message, cb) { return sendHelper(process, message, null, cb); } // utils.js function sendHelper(proc, message, handle, cb) { if (!proc.connected) return false; // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js message = { cmd: 'NODE_CLUSTER', ...message, seq }; if (typeof cb === 'function') callbacks.set(seq, cb); seq += 1; return proc.send(message, handle); }
master進程接收到消息後,根據act的類型開始執行不一樣的方法,這裏act爲queryServer
。queryServer方法會構造一個key,若是這個key(規則主要爲地址+端口+文件描述符)以前不存在,則對RoundRobinHandle
構造函數進行了實例化,RoundRobinHandle構造函數中啓動了一個TCP服務,並對以前指定的端口進行了監聽。
// master.js const handles = new Map(); function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); else if (message.act === 'queryServer') queryServer(worker, message); // other act logic } function queryServer(worker, message) { // ... const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; var handle = handles.get(key); // 若是以前沒有對該key進行實例化,則進行實例化 if (handle === undefined) { let address = message.address; // const RoundRobinHandle = require('internal/cluster/round_robin_handle'); var constructor = RoundRobinHandle; handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); handles.set(key, handle); } // ... } // internal/cluster/round_robin_handle function RoundRobinHandle(key, address, port, addressType, fd, flags) { this.server = net.createServer(assert.fail); // 這裏啓動一個TCP服務器 this.server.listen({ port, host }); // TCP服務器啓動時的事件 this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); }); // ... }
能夠看到TCP服務啓動後,立馬對connection
事件進行了監聽,會調用RoundRobinHandle的distribute方法。
// RoundRobinHandle this.handle.onconnection = (err, handle) => this.distribute(err, handle); // distribute 對工做進程進行分發 RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); // 存入TCP服務的句柄 const worker = this.free.shift(); // 取出第一個工做進程 if (worker) this.handoff(worker); // 切換到工做進程 }; RoundRobinHandle.prototype.handoff = function(worker) { const handle = this.handles.shift(); // 獲取TCP服務句柄 if (handle === undefined) { this.free.push(worker); // 將該工做進程從新放入隊列中 return; } const message = { act: 'newconn', key: this.key }; // 向工做進程發送一個類型爲 newconn 的消息以及TCP服務的句柄 sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // 工做進程不能正常運行,啓動下一個 this.handoff(worker); }); };
在子進程中也有對內部消息進行監聽,在cluster/child.js
中,有個cluster._setupWorker
方法,該方法會對內部消息監聽,該方法的在lib/internal/bootstrap/node.js
中調用,這個文件是每次啓動node命令後,由C++模塊調用的。
function startup() { // ... startExecution(); } function startExecution() { // ... prepareUserCodeExecution(); } function prepareUserCodeExecution() { if (process.argv[1] && process.env.NODE_UNIQUE_ID) { const cluster = NativeModule.require('cluster'); cluster._setupWorker(); delete process.env.NODE_UNIQUE_ID; } } startup()
下面看看_setupWorker方法作了什麼。
cluster._setupWorker = function() { // ... process.on('internalMessage', internal(worker, onmessage)); function onmessage(message, handle) { // 若是act爲 newconn 調用onconnection方法 if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } }; function onconnection(message, handle) { const key = message.key; const server = handles.get(key); const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); // 調用net中的onconnection方法 }
最後子進程獲取到客戶端句柄後,調用net模塊的onconnection,對Socket進行實例化,後面就與其餘http請求的邏輯一致了,再也不細講。
至此,cluster模塊的邏輯就走通了。