cluster
是一個nodejs
內置的模塊,用於nodejs
多核處理。cluster
模塊,能夠幫助咱們簡化多進程並行化程序的開發難度,輕鬆構建一個用於負載均衡的集羣。html
環境:node
MacOS 10.14
Node v8.11.3
npm 6.4.0npm
master
是總控節點,worker
是運行節點。而後根據CPU
的數量,啓動worker
。
咱們能夠先查看一下本身電腦CPU的核數和線程:bash
sysctl machdep.cpu
複製代碼
machdep.cpu.core_count: 6
machdep.cpu.thread_count: 12
複製代碼
6核12線程併發
新建app.js
app
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
// 是不是主節點
if (cluster.isMaster) {
console.log("master start...");
// Fork workers.
for (var i = 0; i < numCPUs; i++) {
// 監聽建立worker進程事件
cluster.fork();
}
// 監聽worker
cluster.on('listening', function (worker, address) {
// address對象包含鏈接屬性信息
console.log('listening: worker ' + worker.process.pid + ', Address: ' + address.address + ":" + address.port + "," + address.addressType);
// 3秒後殺掉全部worker
setTimeout(() => {
worker.kill()
}, 3000)
});
// 監聽worker退出事件,code進程非正常退出的錯誤code,signal致使進程被殺死的信號名稱
cluster.on('exit', function (worker, code, signal) {
// console.log('worker ' + worker.process.pid + ' died');
console.log('工做進程 %d 關閉 (%s)(%s). 重啓中...', worker.process.pid, signal || code);
// 退出以後重啓
// cluster.fork();
});
} else {
console.log('createServer...')
http.createServer(function (req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(0);
}
複製代碼
咱們設定啓動以後三秒再殺掉全部進程
啓動負載均衡
➜ node app.js
master start...
createServer...
createServer...
createServer...
listening: worker 29374, Address: null:61186,4
createServer...
listening: worker 29375, Address: null:61186,4
listening: worker 29376, Address: null:61186,4
createServer...
listening: worker 29377, Address: null:61186,4
createServer...
createServer...
listening: worker 29378, Address: null:61186,4
createServer...
listening: worker 29379, Address: null:61186,4
listening: worker 29380, Address: null:61186,4
createServer...
createServer...
listening: worker 29381, Address: null:61186,4
createServer...
listening: worker 29382, Address: null:61186,4
listening: worker 29383, Address: null:61186,4
createServer...
listening: worker 29384, Address: null:61186,4
listening: worker 29385, Address: null:61186,4
工做進程 29374 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29375 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29376 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29377 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29378 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29379 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29380 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29381 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29382 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29383 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29384 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29385 關閉 (SIGTERM)(%s). 重啓中...
複製代碼
電腦有12個線程,因此在總控結點master
啓動後,生成了12個運行節點worker
殺掉進程後,能夠經過exit
事件監聽到,還能夠經過fork()
重啓curl
每一個worker
進程經過使用child_process.fork()
函數,基於IPC
(Inter-Process Communication
,進程間通訊),實現與master
進程間通訊。ide
當worker
使用server.listen(...)
函數時 ,將參數序列傳遞給master
進程。若是master
進程已經匹配workers
,會將傳遞句柄給工人。若是master
沒有匹配好worker
,那麼會建立一個worker
,再傳遞並句柄傳遞給worker
。函數
由於worker
都是獨立運行的,根據程序的須要,它們能夠被獨立刪除或者重啓,worker
並不相互影響。只要還有worker
存活,則master
將繼續接收鏈接。Node
不會自動維護workers
的數目。咱們能夠創建本身的鏈接池。
cluster
對象cluster
的各類屬性和函數
cluster.setttings
:配置集羣參數對象cluster.isMaster
:判斷是否是master
節點cluster.isWorker
:判斷是否是worker
節點Event: 'fork'
: 監聽建立worker
進程事件Event: 'online'
: 監聽worker
建立成功事件Event: 'listening'
: 監聽worker
向master
狀態事件Event: 'disconnect'
: 監聽worker
斷線事件Event: 'exit'
: 監聽worker
退出事件Event: 'setup'
: 監聽setupMaster
事件cluster.setupMaster([settings])
: 設置集羣參數cluster.fork([env])
: 建立worker
進程cluster.disconnect([callback])
: 關閉worket
進程cluster.worker
: 得到當前的worker
對象cluster.workers
: 得到集羣中全部存活的worker
對象worker
對象worker
的各類屬性和函數:能夠經過cluster.workers
, cluster.worker
得到。
worker.id
: 進程ID
號worker.process
: ChildProcess
對象worker.suicide
: 在disconnect()
後,判斷worker
是否自殺worker.send(message, [sendHandle])
: master
給worker
發送消息。注:worker
給發master
發送消息要用process.send(message)
worker.kill([signal='SIGTERM'])
: 殺死指定的worker
,別名destory()
worker.disconnect()
: 斷開worker
鏈接,讓worker
自殺Event: 'message'
: 監聽master
和worker
的message
事件Event: 'online'
: 監聽指定的worker
建立成功事件Event: 'listening'
: 監聽master
向worker
狀態事件Event: 'disconnect'
: 監聽worker
斷線事件Event: 'exit'
: 監聽worker
退出事件新建cluster.js
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log('[master] ' + "start master...");
// 線程太多,起3個
for (var i = 0; i < numCPUs / 4; i++) {
var wk = cluster.fork();
wk.send('[master] ' + 'hi worker' + wk.id);
}
// 監聽worker生成
cluster.on('fork', function (worker) {
console.log('[master] fork: worker' + worker.id);
});
// 當衍生一個新的工做進程後,工做進程應當響應一個上線消息。 當主進程收到上線消息後將會觸發此事件。 'fork' 事件和 'online' 事件的區別在於,當主進程衍生工做進程時觸發 'fork',當工做進程運行時觸發 'online'。
cluster.on('online', function (worker) {
console.log('[master] online: worker' + worker.id);
});
// 當一個工做進程調用 listen() 後,工做進程上的 server 會觸發 'listening' 事件,同時主進程上的 cluster 也會觸發 'listening' 事件。
cluster.on('listening', function (worker, address) {
console.log('[master] listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
});
// 在工做進程的 IPC 管道被斷開後觸發。 可能致使事件觸發的緣由包括:工做進程優雅地退出、被殺死、或手動斷開鏈接(如調用 worker.disconnect())。
cluster.on('disconnect', function (worker) {
console.log('[master] disconnect: worker' + worker.id);
});
// 當任何一個工做進程關閉的時候,cluster 模塊都將會觸發 'exit' 事件。
cluster.on('exit', function (worker, code, signal) {
console.log('[master] exit worker' + worker.id + ' died');
});
function eachWorker(callback) {
for (var id in cluster.workers) {
callback(cluster.workers[id]);
}
}
// 3秒後向全部worker推送信息
setTimeout(function () {
eachWorker(function (worker) {
worker.send('[master] ' + 'send message to worker' + worker.id);
});
}, 3000);
Object.keys(cluster.workers).forEach(function (id) {
// 當 cluster 主進程接收任意工做進程發送的消息時觸發。
cluster.workers[id].on('message', function (msg) {
console.log('[master] ' + 'message ' + msg);
});
});
} else if (cluster.isWorker) {
console.log('[worker] ' + "start worker ..." + cluster.worker.id);
// 相似於 cluster.on('message') 事件,但特定於此工做進程。
process.on('message', function (msg) {
// 接收的是master發出來的初始化worker信息
console.log('[worker] ' + msg);
// 而後告訴master接收到了,觸發了cluster.workers[id].on('message',fn)
process.send('[worker] worker' + cluster.worker.id + ' received!');
});
http.createServer(function (req, res) {
res.writeHead(200, { "content-type": "text/html" });
res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
}).listen(3000);
console.log(`工做進程 ${process.pid} 已啓動`);
}
複製代碼
➜ node cluster.js
[master] start master...
[master] fork: worker1
[master] fork: worker2
[master] fork: worker3
[master] online: worker1
[master] online: worker2
[master] online: worker3
[worker] start worker ...1
[worker] [master] hi worker1
[master] message [worker] worker1 received!
[worker] start worker ...2
[master] listening: worker1,pid:30288, Address:null:3000
[worker] start worker ...3
[worker] [master] hi worker2
[master] message [worker] worker2 received!
[master] listening: worker2,pid:30289, Address:null:3000
[worker] [master] hi worker3
[master] message [worker] worker3 received!
[master] listening: worker3,pid:30290, Address:null:3000
[worker] [master] send message to worker1
[worker] [master] send message to worker2
[worker] [master] send message to worker3
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] message [worker] worker3 received!
複製代碼
咱們先模擬一下訪問,看下是否會自動分配
load-balance.js
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log('[master] ' + "start master...");
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('listening', function (worker, address) {
console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
});
} else if (cluster.isWorker) {
console.log('[worker] ' + "start worker ..." + cluster.worker.id);
http.createServer(function (req, res) {
console.log('worker' + cluster.worker.id);
res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
}).listen(3000);
}
複製代碼
node load-balance.js
[master] start master...
[worker] start worker ...1
[worker] start worker ...3
[worker] start worker ...2
[worker] start worker ...4
[worker] start worker ...6
[worker] start worker ...7
[master] listening: worker3,pid:96592, Address:null:3000
[master] listening: worker2,pid:96591, Address:null:3000
[master] listening: worker1,pid:96590, Address:null:3000
[master] listening: worker4,pid:96593, Address:null:3000
[master] listening: worker6,pid:96595, Address:null:3000
[worker] start worker ...5
[master] listening: worker7,pid:96596, Address:null:3000
[worker] start worker ...8
[worker] start worker ...9
[worker] start worker ...10
[master] listening: worker5,pid:96594, Address:null:3000
[master] listening: worker8,pid:96597, Address:null:3000
[master] listening: worker9,pid:96598, Address:null:3000
[worker] start worker ...11
[worker] start worker ...12
[master] listening: worker10,pid:96599, Address:null:3000
[master] listening: worker11,pid:96600, Address:null:3000
[master] listening: worker12,pid:96601, Address:null:3000
複製代碼
咱們使用curl
訪問一下
➜ cluster curl http://172.16.78.185:3000/
worker1,PID:96590%
➜ cluster curl http://172.16.78.185:3000/
worker3,PID:96592%
➜ cluster curl http://172.16.78.185:3000/
worker2,PID:96591%
➜ cluster curl http://172.16.78.185:3000/
worker4,PID:96593%
➜ cluster curl http://172.16.78.185:3000/
worker6,PID:96595%
複製代碼
隨機分配是沒問題的,可是這樣的請求量沒法看出是否均衡分配,咱們要模擬下併發請求
➜ node load-balance.js > server.log
複製代碼
而後用siege
模擬壓測,併發量每秒50
siege -c 50 http://localhost:3000
複製代碼
HTTP/1.1 200 0.00 secs: 16 bytes ==> GET /
^C
Lifting the server siege...
Transactions: 16276 hits
Availability: 100.00 %
Elapsed time: 31.65 secs
Data transferred: 0.25 MB
Response time: 0.03 secs
Transaction rate: 514.25 trans/sec
Throughput: 0.01 MB/sec
Concurrency: 14.72
Successful transactions: 16276
Failed transactions: 0
Longest transaction: 0.18
Shortest transaction: 0.00
複製代碼
耗時31.65
秒
發送16276
次請求
每秒處理514.25
個請求
咱們能夠查看下server.log
文件
須要下載一下R語言包 教程
~ R
> df<-read.table(file="server.log",skip=9,header=FALSE)
> summary(df)
V1
worker9 :1361
worker2 :1359
worker5 :1358
worker1 :1357
worker12:1356
worker11:1354
(Other) :8122
複製代碼
咱們能夠看到,請求被分配到worker數據量至關。 因此,cluster的負載均衡的策略,應該是隨機分配的。
學習連接 粉絲日誌