Nodejs多核處理模塊cluster

cluster是一個nodejs內置的模塊,用於nodejs多核處理。cluster模塊,能夠幫助咱們簡化多進程並行化程序的開發難度,輕鬆構建一個用於負載均衡的集羣。html

環境:node

MacOS 10.14
Node v8.11.3
npm 6.4.0

實踐

master是總控節點,worker是運行節點。而後根據CPU的數量,啓動worker
咱們能夠先查看一下本身電腦CPU的核數和線程:bash

sysctl machdep.cpu
複製代碼
machdep.cpu.core_count: 6
machdep.cpu.thread_count: 12
複製代碼

6核12線程併發

新建app.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

// Master process: fork one worker per logical CPU and watch their lifecycle.
if (cluster.isMaster) {
  console.log("master start...");

  // Fork workers.
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Fired when a worker's server starts listening; `address` carries the
  // connection info (address / port / addressType).
  cluster.on('listening', function (worker, address) {
    console.log('listening: worker ' + worker.process.pid + ', Address: ' + address.address + ":" + address.port + "," + address.addressType);
    // Kill every worker 3 seconds after it starts listening.
    setTimeout(() => {
      worker.kill()
    }, 3000)
  });

  // Fired when any worker exits. `code` is the exit code on abnormal exit,
  // `signal` is the name of the signal that killed the process.
  cluster.on('exit', function (worker, code, signal) {
    // BUG FIX: the original format string had three placeholders
    // ('%d', '%s', '%s') but only two arguments, so a literal "(%s)"
    // leaked into the output (see the sample run in this article).
    // Use a single '%s' fed with signal-or-code.
    console.log('工做進程 %d 關閉 (%s). 重啓中...', worker.process.pid, signal || code);
    // Restart the worker after it exits (disabled in this demo):
    // cluster.fork();
  });

} else {
  // Worker process: start an HTTP server. Port 0 requests an ephemeral port,
  // but under cluster all workers share the single port chosen by the first
  // listen — that is why the transcript shows one common port for all workers.
  console.log('createServer...')
  http.createServer(function (req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(0);
}


複製代碼

咱們設定啓動以後三秒再殺掉全部進程
啓動負載均衡

➜  node app.js
master start...
createServer...
createServer...
createServer...
listening: worker 29374, Address: null:61186,4
createServer...
listening: worker 29375, Address: null:61186,4
listening: worker 29376, Address: null:61186,4
createServer...
listening: worker 29377, Address: null:61186,4
createServer...
createServer...
listening: worker 29378, Address: null:61186,4
createServer...
listening: worker 29379, Address: null:61186,4
listening: worker 29380, Address: null:61186,4
createServer...
createServer...
listening: worker 29381, Address: null:61186,4
createServer...
listening: worker 29382, Address: null:61186,4
listening: worker 29383, Address: null:61186,4
createServer...
listening: worker 29384, Address: null:61186,4
listening: worker 29385, Address: null:61186,4
工做進程 29374 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29375 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29376 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29377 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29378 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29379 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29380 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29381 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29382 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29383 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29384 關閉 (SIGTERM)(%s). 重啓中...
工做進程 29385 關閉 (SIGTERM)(%s). 重啓中...
複製代碼

電腦有12個線程,因此在總控結點master啓動後,生成了12個運行節點worker
殺掉進程後,能夠經過exit事件監聽到,還能夠經過fork()重啓curl

每一個worker進程經過使用child_process.fork()函數,基於IPC(Inter-Process Communication,進程間通訊),實現與master進程間通訊。

worker使用server.listen(...)函數時,將參數序列傳遞給master進程。若是master進程已經匹配了worker,會將句柄傳遞給該worker。若是master沒有匹配好worker,那麼會建立一個worker,再將句柄傳遞給worker。

由於worker都是獨立運行的,根據程序的須要,它們能夠被獨立刪除或者重啓,worker並不相互影響。只要還有worker存活,則master將繼續接收鏈接。Node不會自動維護workers的數目。咱們能夠創建本身的鏈接池。

cluster對象

cluster的各類屬性和函數

  • cluster.settings:配置集羣參數對象
  • cluster.isMaster:判斷是否是master節點
  • cluster.isWorker:判斷是否是worker節點
  • Event: 'fork': 監聽建立worker進程事件
  • Event: 'online': 監聽worker建立成功事件
  • Event: 'listening': 監聽worker進入listening狀態事件
  • Event: 'disconnect': 監聽worker斷線事件
  • Event: 'exit': 監聽worker退出事件
  • Event: 'setup': 監聽setupMaster事件
  • cluster.setupMaster([settings]): 設置集羣參數
  • cluster.fork([env]): 建立worker進程
  • cluster.disconnect([callback]): 關閉worker進程
  • cluster.worker: 得到當前的worker對象
  • cluster.workers: 得到集羣中全部存活的worker對象

worker對象

worker的各類屬性和函數:能夠經過cluster.workers, cluster.worker得到。

  • worker.id: worker的編號(注意:這是集羣內的編號,並非進程PID)
  • worker.process: ChildProcess對象
  • worker.suicide: 在disconnect()後,判斷worker是否自殺
  • worker.send(message, [sendHandle]): master向worker發送消息。注:worker向master發送消息要用process.send(message)
  • worker.kill([signal='SIGTERM']): 殺死指定的worker,別名destroy()
  • worker.disconnect(): 斷開worker鏈接,讓worker自殺
  • Event: 'message': 監聽master與worker之間的message事件
  • Event: 'online': 監聽指定的worker建立成功事件
  • Event: 'listening': 監聽該worker進入listening狀態事件
  • Event: 'disconnect': 監聽worker斷線事件
  • Event: 'exit': 監聽worker退出事件

master和worker之間的通訊

新建cluster.js

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // ---- master: fork a few workers and exchange IPC messages with them ----
  console.log(`[master] start master...`);

  // A full set of workers would be too many for this demo — spawn
  // numCPUs / 4 of them (3 on a 12-thread machine).
  for (let i = 0; i < numCPUs / 4; i++) {
    const wk = cluster.fork();
    wk.send(`[master] hi worker${wk.id}`);
  }

  // 'fork' fires as soon as a worker process has been spawned.
  cluster.on('fork', (worker) => {
    console.log(`[master] fork: worker${worker.id}`);
  });

  // 'online' fires once the spawned worker is actually running: 'fork' marks
  // the spawn itself, 'online' the worker's first message back to the master.
  cluster.on('online', (worker) => {
    console.log(`[master] online: worker${worker.id}`);
  });

  // 'listening' fires on the master cluster object whenever a worker's
  // server calls listen().
  cluster.on('listening', (worker, address) => {
    console.log(`[master] listening: worker${worker.id},pid:${worker.process.pid}, Address:${address.address}:${address.port}`);
  });

  // 'disconnect' fires when a worker's IPC channel closes — graceful exit,
  // kill, or a manual worker.disconnect().
  cluster.on('disconnect', (worker) => {
    console.log(`[master] disconnect: worker${worker.id}`);
  });

  // 'exit' fires whenever any worker process dies.
  cluster.on('exit', (worker, code, signal) => {
    console.log(`[master] exit worker${worker.id} died`);
  });

  // After 3 seconds, push a message to every live worker.
  setTimeout(() => {
    for (const id in cluster.workers) {
      const worker = cluster.workers[id];
      worker.send(`[master] send message to worker${worker.id}`);
    }
  }, 3000);

  // Log every message any worker sends back to the master.
  for (const id of Object.keys(cluster.workers)) {
    cluster.workers[id].on('message', (msg) => {
      console.log(`[master] message ${msg}`);
    });
  }

} else if (cluster.isWorker) {
  // ---- worker: echo the master's messages and serve HTTP on port 3000 ----
  console.log(`[worker] start worker ...${cluster.worker.id}`);

  // Like cluster's 'message' event, but scoped to this worker process.
  process.on('message', (msg) => {
    // First receives the master's init greeting...
    console.log(`[worker] ${msg}`);
    // ...then acknowledge, which triggers cluster.workers[id].on('message', fn).
    process.send(`[worker] worker${cluster.worker.id} received!`);
  });

  http.createServer((req, res) => {
    res.writeHead(200, { "content-type": "text/html" });
    res.end(`worker${cluster.worker.id},PID:${process.pid}`);
  }).listen(3000);
  console.log(`工做進程 ${process.pid} 已啓動`);

}
複製代碼
➜ node cluster.js
[master] start master...
[master] fork: worker1
[master] fork: worker2
[master] fork: worker3
[master] online: worker1
[master] online: worker2
[master] online: worker3
[worker] start worker ...1
[worker] [master] hi worker1
[master] message [worker] worker1 received!
[worker] start worker ...2
[master] listening: worker1,pid:30288, Address:null:3000
[worker] start worker ...3
[worker] [master] hi worker2
[master] message [worker] worker2 received!
[master] listening: worker2,pid:30289, Address:null:3000
[worker] [master] hi worker3
[master] message [worker] worker3 received!
[master] listening: worker3,pid:30290, Address:null:3000
[worker] [master] send message to worker1
[worker] [master] send message to worker2
[worker] [master] send message to worker3
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] message [worker] worker3 received!

複製代碼

負載均衡

咱們先模擬一下訪問,看下是否會自動分配

load-balance.js

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Master: fork one worker per logical CPU and report each as it listens.
  console.log(`[master] start master...`);

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('listening', (worker, address) => {
    console.log(`[master] listening: worker${worker.id},pid:${worker.process.pid}, Address:${address.address}:${address.port}`);
  });

} else if (cluster.isWorker) {
  // Worker: all workers listen on the same port; the master hands incoming
  // connections out among them. Each response identifies which worker served it.
  console.log(`[worker] start worker ...${cluster.worker.id}`);
  http.createServer((req, res) => {
    console.log(`worker${cluster.worker.id}`);
    res.end(`worker${cluster.worker.id},PID:${process.pid}`);
  }).listen(3000);
}
複製代碼
node load-balance.js
[master] start master...
[worker] start worker ...1
[worker] start worker ...3
[worker] start worker ...2
[worker] start worker ...4
[worker] start worker ...6
[worker] start worker ...7
[master] listening: worker3,pid:96592, Address:null:3000
[master] listening: worker2,pid:96591, Address:null:3000
[master] listening: worker1,pid:96590, Address:null:3000
[master] listening: worker4,pid:96593, Address:null:3000
[master] listening: worker6,pid:96595, Address:null:3000
[worker] start worker ...5
[master] listening: worker7,pid:96596, Address:null:3000
[worker] start worker ...8
[worker] start worker ...9
[worker] start worker ...10
[master] listening: worker5,pid:96594, Address:null:3000
[master] listening: worker8,pid:96597, Address:null:3000
[master] listening: worker9,pid:96598, Address:null:3000
[worker] start worker ...11
[worker] start worker ...12
[master] listening: worker10,pid:96599, Address:null:3000
[master] listening: worker11,pid:96600, Address:null:3000
[master] listening: worker12,pid:96601, Address:null:3000
複製代碼

咱們使用curl訪問一下

➜  cluster curl http://172.16.78.185:3000/
worker1,PID:96590%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker3,PID:96592%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker2,PID:96591%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker4,PID:96593%                                                                                     
➜  cluster curl http://172.16.78.185:3000/
worker6,PID:96595% 
複製代碼

隨機分配是沒問題的,可是這樣的請求量沒法看出是否均衡分配,咱們要模擬下併發請求

➜  node load-balance.js > server.log
複製代碼

而後用siege模擬壓測,併發量每秒50

siege -c 50 http://localhost:3000
複製代碼
HTTP/1.1 200     0.00 secs:      16 bytes ==> GET  /
^C
Lifting the server siege...
Transactions:		       16276 hits
Availability:		      100.00 %
Elapsed time:		       31.65 secs
Data transferred:	        0.25 MB
Response time:		        0.03 secs
Transaction rate:	      514.25 trans/sec
Throughput:		        0.01 MB/sec
Concurrency:		       14.72
Successful transactions:       16276
Failed transactions:	           0
Longest transaction:	        0.18
Shortest transaction:	        0.00
複製代碼

耗時31.65
發送16276次請求
每秒處理514.25個請求

咱們能夠查看下server.log文件

須要下載一下R語言包 教程

~ R 

> df<-read.table(file="server.log",skip=9,header=FALSE)
> summary(df)
       V1
 worker9 :1361  
 worker2 :1359  
 worker5 :1358  
 worker1 :1357  
 worker12:1356  
 worker11:1354  
 (Other) :8122 
複製代碼

咱們能夠看到,請求被分配到各worker的數據量至關。因此,cluster的負載均衡策略是接近均勻的分配——實際上Node在非Windows平臺上默認採用輪詢(round-robin,即SCHED_RR)的調度策略,而非純隨機分配。


學習連接 粉絲日誌

相關文章
相關標籤/搜索