原文連接html
分析的是從 node 源碼的角度進行的,因此須要先配置源碼的調試環境。node
須要準備的內容爲:git
node 源碼的獲取,經過如下命令行:github
git clone https://github.com/nodejs/node.git
# 本文針對的版本
git checkout tags/v11.6.0
複製代碼
獲取了 node 源碼以後,須要在 CLion 中導入項目,詳細能夠參考此文 使用 cLion 調試 node.js 源碼。windows
上文中提到的源碼編譯指令爲:api
make -C out BUILDTYPE=Debug -j 4
複製代碼
-j 4
的意義是同時執行的任務數,通常設定爲 CPU 核數,能夠經過下面指令得到 CPU 核數:bash
[ $(uname) = 'Darwin' ] && sysctl -n hw.logicalcpu_max || lscpu -p | egrep -v '^#' | wc -l
複製代碼
若是但願加快 node 源碼的編譯速度的話,能夠先嚐試獲取核數,而後調整 -j
的值。app
node 中的 js 實現都在 lib 目錄下,須要注意的是,當 node 編譯完成以後,這些 js 文件是都會被打包到編譯結果中的。當 node 在運行中要引入 lib 下的 js 文件時,並不會從咱們的源碼目錄中讀取了,而是採用的編譯時打包進去的 js 內容。因此在修改了 lib 目錄下的 js 文件後,須要從新對 node 進行編譯。less
爲了測試 Cluster 的運行,須要準備一小段測試代碼,保存到 ./test-cluster.js
:socket
若是沒有特別說明,接下來文件路徑中的
.
都表示的是 node 源碼目錄
const cluster = require('cluster');
const http = require('http');
// 下面的代碼會依據 numCPUs 的值來建立對應數量的子進程,
// 因爲目前硬件核數都會比較多,爲了使調試時的輸出內容儘量的清晰,因此設定爲 2
// const numCPUs = require('os').cpus().length;
const numCPUs = 2;
if (cluster.isMaster) {
// 若是是 master 進程:
// 1. 打印進程號
// 2. 根據 numCPUs 的值來建立對應數量的子進程
console.log("master pid: " + process.pid);
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
// 若是是子進程:
// 1. 打印進程號
// 2. 建立 http server 實例
// 3. 並調用上一步建立的實例的 listen 方法
console.log("isMaster: " + cluster.isMaster + " pid: " + process.pid);
const srv = http.createServer(function (req, res) {
res.writeHead(200);
res.end("hello world\n" + process.pid);
});
srv.listen(8000);
}
複製代碼
接下來須要使用在 預備工做 中編譯好的 node 來運行上述代碼:
./out/Debug/node test-cluster.js
複製代碼
會獲得以下的輸出:
master pid: 30094
isMaster: false pid: 30095
isMaster: false pid: 30096
複製代碼
經過對比輸出內容,代碼執行的過程相似:
cluster.isMaster
分支被執行,建立了 2 個子進程else
分支被執行這裏有幾個值得思考的問題:
listen
方法,爲何沒有報錯 ERR_SERVER_ALREADY_LISTEN
對於問題3,準備下面兩個文件
./test-fork-exit.js
:
const numCPUs = 2;
const {fork} = require('child_process');
console.log("master pid: " + process.pid);
for (let i = 0; i < numCPUs; i++) {
fork("./test-fork-sub.js")
}
複製代碼
./test-fork-sub.js
:
console.log("sub: " + process.pid);
複製代碼
運行 ./out/Debug/node test-fork-exit.js
後,會發現 master 進程在建立了兩個進程後退出了。
接下來將對上述幾個問題進行分析。
在運行了 ./out/Debug/node test-cluster.js
命令以後,master 進程即被啓動,在該進程中,運行 test-cluster.js
中的代碼。
首先執行的就是 const cluster = require('cluster');
。打開 cluster 模塊的源碼 ./lib/child_process.js
:
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
複製代碼
能夠看到,代碼會根據 childOrMaster
的值來決定引入的是 internal/cluster/child
仍是 internal/cluster/master
模塊。而 childOrMaster
的值取決於環境變量中是否設定了 NODE_UNIQUE_ID
,若是設定了,那麼加載對應的 child
模塊,不然加載對應的 master
模塊。
顯然,默認環境變量中是沒有對 NODE_UNIQUE_ID
標識進行設定的,因而引入的就是 ./internal/cluster/master.js
注意下面的代碼片斷:
cluster.isMaster = true;
複製代碼
因而,在接下來的條件判斷 cluster.isMaster
爲 true
,進而會執行子進程的建立,也就是調用 master 模塊中的 fork
方法。
注意 fork 方法中的片斷:
cluster.setupMaster();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
複製代碼
只須要注意自增的 id,接下來看下 createWorkerProcess
的代碼片斷:
workerEnv.NODE_UNIQUE_ID = '' + id;
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid
});
複製代碼
因而發現實際上是調用的 child_process
模塊中的 fork
方法,並設置了環境變量 NODE_UNIQUE_ID
,上文提到 cluster
模塊被引入的時候,會根據環境變量是否存在 NODE_UNIQUE_ID
標識而決定引入 child
仍是 master
。
另外,child_process.fork 方法第一個參數爲 modulePath
,也就是須要在子進程中執行的 js 文件路徑,對應上述代碼中 cluster.settings.exec
的值,對該變量的設定代碼在 setupMaster
方法中:
var settings = {
args: process.argv.slice(2),
exec: process.argv[1],
execArgv: process.execArgv,
silent: false
};
複製代碼
process.argv[1]
爲當前進程的入口文件,對於這個例子中的主進程而言,即爲: ./test-cluster.js
(實際值爲對應的絕對路徑)
因而 cluster 模塊做用下的 master 進程中的 fork 方法執行的內容能夠簡單部分概括爲:
NODE_UNIQUE_ID
child_process.fork
,參數 modulePath
爲主進程入口文件接下來就是子進程中執行的過程。
子進程進來執行的仍是與主進程相同的文件,之因此執行了 cluster.isMaster
爲 false
的分支,是由於 ./internal/cluster/child.js
的代碼片斷:
cluster.isMaster = false;
複製代碼
子進程中都執行了 listen
方法,可是卻沒有報錯,因而嘗試分析 listen
的執行細節。
http
模塊中的 Server
是繼承於 net.Server
,見 ./lib/_http_server.js
中:
function Server(options, requestListener) {
// ...
net.Server.call(this, { allowHalfOpen: true });
// ...
}
複製代碼
而 listen 方法存在於 net.Server
上。查看 net.Server.listen
中主要的動做都是對參數的 normalization,而後調用 net.Server::listenInCluster
方法:
if (cluster.isMaster || exclusive) {
// Will create a new handle
// _listen2 sets up the listened handle, it is still named like this
// to avoid breaking code that wraps this method
server._listen2(address, port, addressType, backlog, fd, flags);
return;
}
cluster._getServer(server, serverQuery, listenOnMasterHandle);
複製代碼
這裏須要注意的是,listen 方法都是在子進程中執行的,因此 cluster.isMaster
爲 false
,而 exclusive
是未設定的,故也爲 false
。因而,子進程中的 listen 實際執行的是 cluster._getServer
方法,而且這裏的 cluster
模塊實際是引入的 ./lib/internal/cluster/child.js
,因而查看該文件中的 _getServer
方法片斷:
const message = util._extend({
act: 'queryServer',
index,
data: null
}, options);
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
複製代碼
send
方法最終會調用 ./lib/internal/cluster/utils.js
中的 sendHelper
方法,而該方法會向父進程發送 { cmd: 'NODE_CLUSTER' }
消息,根據文檔的 描述,NODE_
起頭的 cmd
爲 內部消息(internalMessage
),須要經過 .on('internalMessage', lister)
來監聽它。
因爲這個消息是從子進程發往父進程的、即主進程的,因而在 ./lib/internal/cluster/master.js
中找到了相關的監聽代碼片斷:
worker.process.on('internalMessage', internal(worker, onmessage));
複製代碼
接着經過 onmessage
方法定位到 queryServer
方法中的代碼片斷:
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
var handle = handles.get(key);
if (handle === undefined) {
let address = message.address;
// Find shortest path for unix sockets because of the ~100 byte limit
if (message.port < 0 && typeof address === 'string' &&
process.platform !== 'win32') {
address = path.relative(process.cwd(), address);
if (message.address.length < address.length)
address = message.address;
}
var constructor = RoundRobinHandle;
// UDP is exempt from round-robin connection balancing for what should
// be obvious reasons: it's connectionless. There is nothing to send to
// the workers except raw datagrams and that's pointless.
if (schedulingPolicy !== SCHED_RR ||
message.addressType === 'udp4' ||
message.addressType === 'udp6') {
constructor = SharedHandle;
}
handle = new constructor(key,
address,
message.port,
message.addressType,
message.fd,
message.flags);
handles.set(key, handle);
}
複製代碼
當這段代碼首次被運行時,會建立一個 handle,並將其和 key 關聯起來。對於 TCP 鏈路,在沒有特別指定 schedulingPolicy
的狀況下,handle 均爲 RoundRobinHandle
的實例。而查看 ./lib/internal/cluster/round_robin_handle.js
文件中的 RoundRobinHandle
構造函數細節,則發現具體的 listen 綁定動做:
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) {
this.server.listen({
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address); // UNIX socket path.
複製代碼
因爲兩個子進程都在前後順序不肯定的狀況下向 master 發送 queryServer 內部消息,因此上面的代碼會被執行兩次。若是兩次的 key 不同,就會致使 handle === undefined
的條件判斷爲 true
,進而 listen 兩次,最終發生 ERR_SERVER_ALREADY_LISTEN
錯誤。可是在上面的運行過程當中,並無報錯,說明了兩次的 key 是相同的。
來看下 key 的內容:
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
複製代碼
很顯然,對於分別來自兩個子進程的消息而言,除了 message.index
以外,其他項的內容都是相同的,那麼 message.index
的生成過程在 ./lib/internal/cluster/child.js
中的代碼片斷:
const indexesKey = [address,
options.port,
options.addressType,
options.fd ].join(':');
let index = indexes.get(indexesKey);
if (index === undefined)
index = 0;
else
index++;
複製代碼
可見,兩個子進程在分別執行這段代碼的時候,index
首次都將爲 0
,從而印證了上面的 key 是相同的假設。
之因此子進程中都會執行 listen 方法,而不報錯的緣由小結以下:
答案現階段只能先從文檔中尋找答案,詳細見 options.stdio,如下爲節選:
It is worth noting that when an IPC channel is established between the parent and child processes, and the child is a Node.js process, the child is launched with the IPC channel unreferenced (using unref()) until the child registers an event handler for the 'disconnect' event or the 'message' event. This allows the child to exit normally without the process being held open by the open IPC channel.
簡單的說,若是子進程中沒有對事件 disconnect
和 message
進行監聽的話,那麼主進程在等待子進程執行完畢以後,會正常的退出。後半句的「主進程會等待...」見 options.detached,如下爲節選:
By default, the parent will wait for the detached child to exit.
爲了印證,能夠先將 ./test-fork-sub.js
代碼改成:
const {execSync} = require('child_process');
execSync("sleep 3");
複製代碼
執行 ./out/Debug/node test-fork-exit.js
會發現,在大約等待了幾秒以後,也就是子進程執行完畢以後,主進程進行了退出。
再次將 ./test-fork-sub.js
代碼改成:
const {execSync} = require('child_process');
execSync("sleep 3");
console.log(`child: ${process.pid} resumed`);
process.on("message", () => {});
複製代碼
能夠發現,因爲子進程監聽了 message
事件,使得主進程和子進程之間的 IPC channel 阻止了主進程的退出。
在 ./lib/internal/cluster/child.js
中的 _setupWorker
方法中的片斷:
process.once('disconnect', () => {
// ...
});
複製代碼
也印證了這一點。
回到 ./lib/internal/cluster/round_robin_handle.js
文件,注意構造函數 RoundRobinHandle
中的代碼片斷,注意該代碼是在主進程中調用的:
// ...
this.server = net.createServer(assert.fail);
// ...
this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
複製代碼
net.createServer
的參數若是是函數的話,那麼該函數的做用實際是用來處理 connection
的回調函數。之因此會達到回調的效果,是由於在 ./lib/net.js
中的 Server
構造函數的片斷:
if (typeof options === 'function') {
connectionListener = options;
options = {};
this.on('connection', connectionListener);
} else if (options == null || typeof options === 'object') {
// ...
} else {
// ...
}
複製代碼
那麼上面傳遞的是 assert.fail
,也就是說,若是該方法成功地被回調了的話,那麼進程應該報錯。 既然沒有報錯,那麼說明在新的 connection
進來以後,沒有觸發 connection
事件。要搞清楚這點,就要看看 net.Server
上的 connection
事件是如何被觸發的。
net.Server
上的 connection
事件是在該文件內的 onconnection
方法中被觸發的:
self.emit('connection', socket);
複製代碼
而 onconnection
顧名思義也是一個 event listener,它是在相同文件內的 setupListenHandle
中被引用的:
this._handle.onconnection = onconnection;
複製代碼
而 setupListenHandle
函數是在調用 net.Server
上的 listen 方法被逐步調用到的:
最終在 setupListenHandle
的代碼片斷中發現:
this._handle.onconnection = onconnection;
複製代碼
回到上面列出的 RoundRobinHandle
中的代碼:
this.server.once('listening', () => {
// 這裏的 this.server._handle 是對 listen fd 的 wrapper
this.handle = this.server._handle;
// 回調中的 handle 是對 connection fd 的 wrapper
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
複製代碼
能夠看出,在真正的 listen 動做執行成功以後,listening
事件被觸發,進入到上面的代碼中,而後上面的代碼複寫了 handle 對象上的 onconnection
屬性的值,在此以前,該屬性的值即爲 assert.fail
。handle 對象此時仍是一個 TCP_Wrapper 對象 (對 CPP 層面對象的包裹的一個對象)。
因爲 master 環境下 RoundRobinHandle
構造 net.Server
對象的目的僅僅是但願得到其內部的 listen fd handle 對象,由於 master 只須要將接下來的 connection fd handle 派發給 works 便可,因此上面的回調中在得到了該對象以後,取消了對 net.Server
對象的引用。
跟進上述代碼中的 distribute
方法,發現其會調用 handoff
方法,向 work 發送 connection fd handle。也就是說,當 master 進程接收到新鏈接以後,會將其派發給 work。接下來須要在 ./lib/internal/cluster/child.js
文件中找到事件監聽函數:
function onmessage(message, handle) {
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
_disconnect.call(worker, true);
}
複製代碼
進一步發現 onconnection
方法中:
const server = handles.get(key);
// ...
server.onconnection(0, handle);
複製代碼
接來下先搞清楚 server
是何時被添加進 handles
中的,而後再看看 server.onconnection
作了什麼。
回顧下子進程中所作的事情,能夠結合上面的章節。另外,爲了方便結合代碼進行理解,故同時給出具體的代碼位置:
net.Server
codenet.Server
上的 listen 方法 codenet.Server::listenInCluster
code_getServer
,並指望被回調 codequeryServer
消息,並指望被回調 codeRoundRobinHandle
實例,並將發來 queryServer
消息的 work 註冊到其中,並指望被回調。在回調中,會向 work 發送消息,觸發第 5 步中 work 指望的回調 coderr
方法,該方法會僞造一個 handle 對象,加入到 handles 中,並以該對象回調第 4 步 codelistenOnMasterHandle
,該函數中設定了 server._handle = handle
,注意這裏的 handle 即爲上一步產生的 handle;並調用了 listen2
codelisten2
即爲 setupListenHandle
,而 setupListenHandle
內部設定了 handle 對象的 onconnection
屬性 code接下來 work 進程中處理請求的邏輯就都與不使用 clsuter 模塊時的請求處理邏輯一致了,由於是使用的一樣的處理函數,只不過是在 work 進程中執行。
請求的處理邏輯能夠小結爲:
最後看下 RoundRobinHandle 中的派發機制:
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.all = new Map();
this.free = [];
this.handles = [];
// ...
this.server.once('listening', () => {
// ...
// 1. 當接收到新的客戶端鏈接後,調用 this.distribute 方法
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
// ...
});
}
// master 進程接收到 work 進程的 `queryServer` 消息後,會調用該方法。
// 1. 先將 work 記錄到 this.all 這個 map 中
// 2. 調用 this.handoff 方法,該方法致使兩個結果:
// 2.1 若是此時有 pending handle 的話,那麼即刻使用 work 處理
// 2.2 不然,則將 work 加入到 this.free 這個 map 中
RoundRobinHandle.prototype.add = function(worker, send) {
//...
this.all.set(worker.id, worker);
const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker); // In case there are connections pending.
};
// ...
};
// 該方法的做用就是講 connection handle 進行派發
// 1. 先將 handle 加入到 pending 隊列中
// 2. 嘗試使用 this.free 的第一個 work 處理 pending 隊列。若是存在 free work 的話,
// 該 work 還將會被移出 this.free
RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
const worker = this.free.shift();
if (worker)
this.handoff(worker);
};
// 該方法包含了具體的派發動做
// 1. 從 pending handle 隊列取出第一個項目,若是隊列爲空,則將 work 加入到
// this.free map 中,不然進行派發動做
// 2. 派發是經過將 handle 經由消息發送給 work 進程的,即 sendHelper 部分
RoundRobinHandle.prototype.handoff = function(worker) {
if (this.all.has(worker.id) === false) {
return; // Worker is closing (or has closed) the server.
}
const handle = this.handles.shift();
if (handle === undefined) {
this.free.push(worker); // Add to ready queue again.
return;
}
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
// 該回調由 ./lib/internal/cluster/child.js#L180 觸發
if (reply.accepted)
// work 進程表示它能夠處理該 handle,handle 發送到 work 進程中時應該是
// 使用的副本的形式。因此主進程則能夠關閉屬於其上下文的 handle。handle 內部的
// fd 被加入到 work 進程的 event loop 中
handle.close();
else
// 如註釋所描述的,從新調用一次 this.distribute,嘗試其餘的 work
this.distribute(0, handle); // Worker is shutting down. Send to another.
// 再次調用 this.handoff
// 若是還有 pending handle,則處理之,不然將 work 從新加入到 this.free 中
this.handoff(worker);
});
};
複製代碼
調度的機制能夠簡單理解爲:
另外須要注意的是,cluster 模塊中對 works 進程沒有重啓的機制,work 進程若是遇到沒有主動處理的異常就會退出,master 進程不會自動的補齊 works 數量,當全部的 works 都退出後,即不存在任一個 IPC channel 了,master 進程也將退出。
掘金自動抓取的內容不自動更新,因此以後的文章都會手動拷貝到專欄中。