Node.js默認單進程運行,對於32位系統最高可使用512MB內存,對於64位最高可使用1GB內存。對於多核CPU的計算機來講,這樣作效率很低,由於只有一個核在運行,其餘核都在閒置。cluster模塊就是爲了解決這個問題而提出的。node
cluster模塊容許設立一個主進程和若干個worker進程,由主進程監控和協調worker進程的運行。worker之間採用進程間通訊交換消息,cluster模塊內置一個負載均衡器,採用Round-robin算法協調各個worker進程之間的負載。運行時,全部新創建的連接都由主進程完成,而後主進程再把TCP鏈接分配給指定的worker進程。nginx
var cluster = require('cluster');
var os = require('os');
if (cluster.isMaster){
for (var i = 0, n = os.cpus().length; i < n; i += 1){
cluster.fork();
}
} else {
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}
複製代碼
上面代碼先判斷當前進程是否爲主進程(cluster.isMaster),若是是的,就按照CPU的核數,新建若干個worker進程;若是不是,說明當前進程是worker進程,則在該進程啓動一個服務器程序。算法
上面這段代碼有一個缺點,就是一旦work進程掛了,主進程沒法知道。爲了解決這個問題,能夠在主進程部署online事件和exit事件的監聽函數。express
var cluster = require('cluster');
if(cluster.isMaster) {
var numWorkers = require('os').cpus().length;
console.log('Master cluster setting up ' + numWorkers + ' workers...');
for(var i = 0; i < numWorkers; i++) {
cluster.fork();
}
cluster.on('online', function(worker) {
console.log('Worker ' + worker.process.pid + ' is online');
});
cluster.on('exit', function(worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
cluster.fork();
});
}
複製代碼
上面代碼中,主進程一旦監聽到worker進程的exit事件,就會重啓一個worker進程。worker進程一旦啓動成功,能夠正常運行了,就會發出online事件。bash
worker對象是cluster.fork()
的返回值,表明一個worker進程。服務器
它的屬性和方法以下。app
(1)worker.id負載均衡
worker.id返回當前worker的獨一無二的進程編號。這個編號也是cluster.workers中指向當前進程的索引值。socket
(2)worker.process函數
全部的worker進程都是用child_process.fork()生成的。child_process.fork()返回的對象,就被保存在worker.process之中。經過這個屬性,能夠獲取worker所在的進程對象。
(3)worker.send()
該方法用於在主進程中,向子進程發送信息。
if (cluster.isMaster) {
var worker = cluster.fork();
worker.send('hi there');
} else if (cluster.isWorker) {
process.on('message', function(msg) {
process.send(msg);
});
}
複製代碼
上面代碼的做用是,worker進程對主進程發出的每一個消息,都作回聲。
在worker進程中,要向主進程發送消息,使用process.send(message)
;要監聽主進程發出的消息,使用下面的代碼。
process.on('message', function(message) {
console.log(message);
});
複製代碼
發出的消息能夠字符串,也能夠是JSON對象。下面是一個發送JSON對象的例子。
worker.send({
type: 'task 1',
from: 'master',
data: {
// the data that you want to transfer
}
});
複製代碼
該對象只有主進程纔有,包含了全部worker進程。每一個成員的鍵值就是一個worker進程對象,鍵名就是該worker進程的worker.id屬性。
function eachWorker(callback) {
for (var id in cluster.workers) {
callback(cluster.workers[id]);
}
}
eachWorker(function(worker) {
worker.send('big announcement to all workers');
});
複製代碼
上面代碼用來遍歷全部worker進程。
當前socket的data事件,也能夠用id屬性識別worker進程。
socket.on('data', function(id) {
var worker = cluster.workers[id];
});
複製代碼
isMaster屬性返回一個布爾值,表示當前進程是否爲主進程。這個屬性由process.env.NODE_UNIQUE_ID決定,若是process.env.NODE_UNIQUE_ID爲未定義,就表示該進程是主進程。
isWorker屬性返回一個布爾值,表示當前進程是否爲work進程。它與isMaster屬性的值正好相反。
fork方法用於新建一個worker進程,上下文都複製主進程。只有主進程才能調用這個方法。
該方法返回一個worker對象。
kill方法用於終止worker進程。它能夠接受一個參數,表示系統信號。
若是當前是主進程,就會終止與worker.process的聯絡,而後將系統信號法發向worker進程。若是當前是worker進程,就會終止與主進程的通訊,而後退出,返回0。
在之前的版本中,該方法也叫作 worker.destroy() 。
worker進程調用listening方法之後,「listening」事件就傳向該進程的服務器,而後傳向主進程。
該事件的回調函數接受兩個參數,一個是當前worker對象,另外一個是地址對象,包含網址、端口、地址類型(IPv四、IPv六、Unix socket、UDP)等信息。這對於那些服務多個網址的Node應用程序很是有用。
cluster.on('listening', function (worker, address) {
console.log("A worker is now connected to " + address.address + ":" + address.port);
});
複製代碼
重啓服務須要關閉後再啓動,利用cluster模塊,能夠作到先啓動一個worker進程,再把原有的全部work進程關閉。這樣就能實現不中斷地重啓Node服務。
首先,主進程向worker進程發出重啓信號。
workers[wid].send({type: 'shutdown', from: 'master'});
複製代碼
worker進程監聽message事件,一旦發現內容是shutdown,就退出。
process.on('message', function(message) {
if(message.type === 'shutdown') {
process.exit(0);
}
});
複製代碼
下面是一個關閉全部worker進程的函數。
function restartWorkers() {
var wid, workerIds = [];
for(wid in cluster.workers) {
workerIds.push(wid);
}
workerIds.forEach(function(wid) {
cluster.workers[wid].send({
text: 'shutdown',
from: 'master'
});
setTimeout(function() {
if(cluster.workers[wid]) {
cluster.workers[wid].kill('SIGKILL');
}
}, 5000);
});
};
複製代碼
下面是一個完整的實例,先是主進程的代碼master.js。
var cluster = require('cluster');
console.log('started master with ' + process.pid);
// 新建一個worker進程
cluster.fork();
process.on('SIGHUP', function () {
console.log('Reloading...');
var new_worker = cluster.fork();
new_worker.once('listening', function () {
// 關閉全部其餘worker進程
for(var id in cluster.workers) {
if (id === new_worker.id.toString()) continue;
cluster.workers[id].kill('SIGTERM');
}
});
});
複製代碼
上面代碼中,主進程監聽SIGHUP事件,若是發生該事件就關閉其餘全部worker進程。之因此是SIGHUP事件,是由於nginx服務器監聽到這個信號,會創造一個新的worker進程,從新加載配置文件。另外,關閉worker進程時,主進程發送SIGTERM信號,這是由於Node容許多個worker進程監聽同一個端口。
下面是worker進程的代碼server.js。
var cluster = require('cluster');
if (cluster.isMaster) {
require('./master');
return;
}
var express = require('express');
var http = require('http');
var app = express();
app.get('/', function (req, res) {
res.send('ha fsdgfds gfds gfd!');
});
http.createServer(app).listen(8080, function () {
console.log('http://localhost:8080');
});
複製代碼
使用時代碼以下。
$ node server.js
started master with 10538
http://localhost:8080
複製代碼
而後,向主進程連續發出兩次SIGHUP信號。
$ kill -SIGHUP 10538
$ kill -SIGHUP 10538
複製代碼
主進程會連續兩次新建一個worker進程,而後關閉全部其餘worker進程,顯示以下。
Reloading...
http://localhost:8080
Reloading...
http://localhost:8080
複製代碼
最後,向主進程發出SIGTERM信號,關閉主進程。
$ kill 10538
複製代碼
PM2模塊是cluster模塊的一個包裝層。它的做用是儘可能將cluster模塊抽象掉,讓用戶像使用單進程同樣,部署多進程Node應用。
// app.js
var http = require('http');
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world");
}).listen(8080);
複製代碼
上面代碼是標準的Node架設Web服務器的方式,而後用PM2從命令行啓動這段代碼。
$ pm2 start app.js -i 4
複製代碼
上面代碼的i參數告訴PM2,這段代碼應該在cluster_mode啓動,且新建worker進程的數量是4個。若是i參數的值是0,那麼當前機器有幾個CPU內核,PM2就會啓動幾個worker進程。
若是一個worker進程因爲某種緣由掛掉了,會馬上重啓該worker進程。
# 重啓全部worker進程
$ pm2 reload all
複製代碼
每一個worker進程都有一個id,能夠用下面的命令查看單個worker進程的詳情。
$ pm2 show <worker id>
複製代碼
正確狀況下,PM2採用fork模式新建worker進程,即主進程fork自身,產生一個worker進程。pm2 reload
命令則會用spawn方式啓動,即一個接一個啓動worker進程,一個新的worker啓動成功,再殺死一箇舊的worker進程。採用這種方式,從新部署新版本時,服務器就不會中斷服務。
$ pm2 reload <腳本文件名>
複製代碼
關閉worker進程的時候,能夠部署下面的代碼,讓worker進程監聽shutdown消息。一旦收到這個消息,進行完畢收尾清理工做再關閉。
process.on('message', function(msg) {
if (msg === 'shutdown') {
close_all_connections();
delete_logs();
server.close();
process.exit(0);
}
});
複製代碼