起源,從官方實例中看多進程共用端口node
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { console.log(`Master ${process.pid} is running`); for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`worker ${worker.process.pid} died`); }); } else { http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); console.log(`Worker ${process.pid} started`); }
執行結果:服務器
$ node server.js Master 3596 is running Worker 4324 started Worker 4520 started Worker 6056 started Worker 5644 started
瞭解http.js模塊:網絡
// lib/net.js 'use strict'; ... Server.prototype.listen = function(...args) { ... if (options instanceof TCP) { this._handle = options; this[async_id_symbol] = this._handle.getAsyncId(); listenInCluster(this, null, -1, -1, backlogFromArgs); // 注意這個方法調用了cluster模式下的處理辦法 return this; } ... }; function listenInCluster(server, address, port, addressType,backlog, fd, exclusive) { // 若是是master 進程或者沒有開啓cluster模式直接啓動listen if (cluster.isMaster || exclusive) { //_listen2,細心的人必定會發現爲何是listen2而不直接使用listen // _listen2 包裹了listen方法,若是是Worker進程,會調用被hack後的listen方法,從而避免出錯端口被佔用的錯誤 server._listen2(address, port, addressType, backlog, fd); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // 是fork 出來的進程,獲取master上的handel,而且監聽, // 如今是否是很好奇_getServer方法作了什麼 cluster._getServer(server, serverQuery, listenOnMasterHandle); } ...
答案很快就能夠經過cluster._getServer 這個函數找到負載均衡
// lib/internal/cluster/child.js cluster._getServer = function(obj, options, cb) { // ... const message = util._extend({ act: 'queryServer', // 關鍵點:構建一個queryServer的消息 index: indexes[indexesKey], data: null }, options); message.address = address; // 發送queryServer消息給master進程,master 在收到這個消息後,會建立一個開始一個server,而且listen send(message, (reply, handle) => { rr(reply, indexesKey, cb); // Round-robin. }); obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = address && address.port || options.port; send(message); }); }; //... // Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; // 這裏hack 了listen方法 // 子進程調用的listen方法,就是這個,直接返回0,因此不會報端口被佔用的錯誤 function listen(backlog) { return 0; } // ... const handle = { close, listen, ref: noop, unref: noop }; handles[key] = handle; // 這個cb 函數是net.js 中的listenOnMasterHandle 方法 cb(0, handle); } // lib/net.js /* function listenOnMasterHandle(err, handle) { err = checkBindError(err, port, handle); server._handle = handle; // _listen2 函數中,調用的handle.listen方法,也就是上面被hack的listen server._listen2(address, port, addressType, backlog, fd); } */
master進程收到queryServer消息後進行啓動服務socket
// lib/internal/cluster/master.js function queryServer(worker, message) { const args = [ message.address, message.port, message.addressType, message.fd, message.index ]; const key = args.join(':'); var handle = handles[key]; // 若是地址沒被監聽過,經過RoundRobinHandle監聽開啓服務 if (handle === undefined) { var constructor = RoundRobinHandle; if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } handles[key] = handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); } // 若是地址已經被監聽,直接綁定handel到已經監聽到服務上,去消費請求 // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ errno: errno, key: key, ack: message.seq, data: handles[key].data }, reply); if (errno) delete handles[key]; // Gives other workers a chance to retry. send(worker, reply, handle); }); }
看到這一步,已經很明顯,咱們知道了多進行端口共享的實現原理async
那如今問題來了,既然Worker進程是如何獲取到master進程監聽服務接收到的connect呢?函數
// lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. this.server.once('listening', () => { this.handle = this.server._handle; // 監聽onconnection方法 this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); } RoundRobinHandle.prototype.add = function (worker, send) { // ... }; RoundRobinHandle.prototype.remove = function (worker) { // ... }; RoundRobinHandle.prototype.distribute = function (err, handle) { // 負載均衡地挑選出一個worker this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function (worker) { const handle = this.handles.shift(); const message = { act: 'newconn', key: this.key }; // 向work進程其發送newconn內部消息和客戶端的句柄handle sendHelper(worker.process, message, handle, (reply) => { // ... this.handoff(worker); }); };
下面讓咱們看看Worker進程接收到newconn消息後進行了哪些操做oop
// lib/child.js function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } // Round-robin connection. // 接收鏈接,而且處理 function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); }
總結學習
分享出於共享學習的目的,若有錯誤,歡迎你們留言指導,不喜勿噴。ui