node Cluster 模塊分析

原文連接html

預備工做

分析的是從 node 源碼的角度進行的,因此須要先配置源碼的調試環境。node

須要準備的內容爲:git

  1. node 源碼
  2. CLion

node 源碼的獲取,經過如下命令行:github

git clone https://github.com/nodejs/node.git
# 本文針對的版本
git checkout tags/v11.6.0
複製代碼

獲取了 node 源碼以後,須要在 CLion 中導入項目,詳細能夠參考此文 使用 cLion 調試 node.js 源碼windows

上文中提到的源碼編譯指令爲:api

make -C out BUILDTYPE=Debug -j 4
複製代碼

-j 4 的意義是同時執行的任務數,通常設定爲 CPU 核數,能夠經過下面指令得到 CPU 核數:bash

[ $(uname) = 'Darwin' ] && sysctl -n hw.logicalcpu_max || lscpu -p | egrep -v '^#' | wc -l
複製代碼

若是但願加快 node 源碼的編譯速度的話,能夠先嚐試獲取核數,而後調整 -j 的值。app

node 中的 js 實現都在 lib 目錄下,須要注意的是,當 node 編譯完成以後,這些 js 文件是都會被打包到編譯結果中的。當 node 在運行中要引入 lib 下的 js 文件時,並不會從咱們的源碼目錄中讀取了,而是採用的編譯時打包進去的 js 內容。因此在修改了 lib 目錄下的 js 文件後,須要從新對 node 進行編譯。less

發現問題

爲了測試 Cluster 的運行,須要準備一小段測試代碼,保存到 ./test-cluster.js:socket

若是沒有特別說明,接下來文件路徑中的 . 都表示的是 node 源碼目錄

const cluster = require('cluster');
const http = require('http');
// 下面的代碼會依據 numCPUs 的值來建立對應數量的子進程,
// 因爲目前硬件核數都會比較多,爲了使調試時的輸出內容儘量的清晰,因此設定爲 2
// const numCPUs = require('os').cpus().length;
const numCPUs = 2;

if (cluster.isMaster) {
    // 若是是 master 進程:
    // 1. 打印進程號
    // 2. 根據 numCPUs 的值來建立對應數量的子進程
    console.log("master pid: " + process.pid);

    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
} else {
    // 若是是子進程:
    // 1. 打印進程號
    // 2. 建立 http server 實例
    // 3. 並調用上一步建立的實例的 listen 方法
    console.log("isMaster: " + cluster.isMaster + " pid: " + process.pid);

    const srv = http.createServer(function (req, res) {
        res.writeHead(200);
        res.end("hello world\n" + process.pid);
    });

    srv.listen(8000);
}
複製代碼

接下來須要使用在 預備工做 中編譯好的 node 來運行上述代碼:

./out/Debug/node test-cluster.js
複製代碼

會獲得以下的輸出:

master pid: 30094
isMaster: false pid: 30095
isMaster: false pid: 30096
複製代碼

經過對比輸出內容,代碼執行的過程相似:

  1. master 開始運行,即條件分支中的 cluster.isMaster 分支被執行,建立了 2 個子進程
  2. 在 2 個子進程中 else 分支被執行

這裏有幾個值得思考的問題:

  1. cluster 模塊是如何區別 master 和 work 進程的;換言之,work 進程是如何被建立的
  2. 多個 work 進程中,都執行了 listen 方法,爲何沒有報錯 ERR_SERVER_ALREADY_LISTEN
  3. 爲何 master 進程在完成了建立進程的任務後沒有退出
  4. 請求是如何傳遞到 work 進程中並被處理的

對於問題3,準備下面兩個文件

./test-fork-exit.js:

const numCPUs = 2;
const {fork} = require('child_process');

console.log("master pid: " + process.pid);

for (let i = 0; i < numCPUs; i++) {
    fork("./test-fork-sub.js")
}
複製代碼

./test-fork-sub.js:

console.log("sub: " + process.pid);
複製代碼

運行 ./out/Debug/node test-fork-exit.js 後,會發現 master 進程在建立了兩個進程後退出了。

接下來將對上述幾個問題進行分析。

問題1. work 進程的建立

在運行了 ./out/Debug/node test-cluster.js 命令以後,master 進程即被啓動,在該進程中,運行 test-cluster.js 中的代碼。

首先執行的就是 const cluster = require('cluster');。打開 cluster 模塊的源碼 ./lib/child_process.js:

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
複製代碼

能夠看到,代碼會根據 childOrMaster 的值來決定引入的是 internal/cluster/child 仍是 internal/cluster/master 模塊。而 childOrMaster 的值取決於環境變量中是否設定了 NODE_UNIQUE_ID,若是設定了,那麼加載對應的 child 模塊,不然加載對應的 master 模塊。

顯然,默認環境變量中是沒有對 NODE_UNIQUE_ID 標識進行設定的,因而引入的就是 ./internal/cluster/master.js

注意下面的代碼片斷:

cluster.isMaster = true;
複製代碼

因而,在接下來的條件判斷 cluster.isMastertrue,進而會執行子進程的建立,也就是調用 master 模塊中的 fork 方法。

注意 fork 方法中的片斷:

cluster.setupMaster();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
複製代碼

只須要注意自增的 id,接下來看下 createWorkerProcess 的代碼片斷:

workerEnv.NODE_UNIQUE_ID = '' + id;

return fork(cluster.settings.exec, cluster.settings.args, {
  cwd: cluster.settings.cwd,
  env: workerEnv,
  silent: cluster.settings.silent,
  windowsHide: cluster.settings.windowsHide,
  execArgv: execArgv,
  stdio: cluster.settings.stdio,
  gid: cluster.settings.gid,
  uid: cluster.settings.uid
});
複製代碼

因而發現實際上是調用的 child_process 模塊中的 fork 方法,並設置了環境變量 NODE_UNIQUE_ID,上文提到 cluster 模塊被引入的時候,會根據環境變量是否存在 NODE_UNIQUE_ID 標識而決定引入 child 仍是 master

另外,child_process.fork 方法第一個參數爲 modulePath,也就是須要在子進程中執行的 js 文件路徑,對應上述代碼中 cluster.settings.exec 的值,對該變量的設定代碼在 setupMaster 方法中:

var settings = {
  args: process.argv.slice(2),
  exec: process.argv[1],
  execArgv: process.execArgv,
  silent: false
};
複製代碼

process.argv[1] 爲當前進程的入口文件,對於這個例子中的主進程而言,即爲: ./test-cluster.js(實際值爲對應的絕對路徑)

因而 cluster 模塊做用下的 master 進程中的 fork 方法執行的內容能夠簡單部分概括爲:

  1. 設置環境變量 NODE_UNIQUE_ID
  2. 執行 child_process.fork,參數 modulePath 爲主進程入口文件

接下來就是子進程中執行的過程。

子進程進來執行的仍是與主進程相同的文件,之因此執行了 cluster.isMasterfalse 的分支,是由於 ./internal/cluster/child.js 的代碼片斷:

cluster.isMaster = false;
複製代碼

問題2. listen 方法

子進程中都執行了 listen 方法,可是卻沒有報錯,因而嘗試分析 listen 的執行細節。

http 模塊中的 Server 是繼承於 net.Server,見 ./lib/_http_server.js 中:

function Server(options, requestListener) {
 // ...
 net.Server.call(this, { allowHalfOpen: true });
 // ...
}
複製代碼

而 listen 方法存在於 net.Server 上。查看 net.Server.listen 中主要的動做都是對參數的 normalization,而後調用 net.Server::listenInCluster 方法:

if (cluster.isMaster || exclusive) {
  // Will create a new handle
  // _listen2 sets up the listened handle, it is still named like this
  // to avoid breaking code that wraps this method
  server._listen2(address, port, addressType, backlog, fd, flags);
  return;
}

cluster._getServer(server, serverQuery, listenOnMasterHandle);
複製代碼

這裏須要注意的是,listen 方法都是在子進程中執行的,因此 cluster.isMasterfalse,而 exclusive 是未設定的,故也爲 false。因而,子進程中的 listen 實際執行的是 cluster._getServer 方法,而且這裏的 cluster 模塊實際是引入的 ./lib/internal/cluster/child.js,因而查看該文件中的 _getServer 方法片斷:

const message = util._extend({
  act: 'queryServer',
  index,
  data: null
}, options);

send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);

  if (handle)
    shared(reply, handle, indexesKey, cb);  // Shared listen socket.
  else
    rr(reply, indexesKey, cb);              // Round-robin.
});
複製代碼

send 方法最終會調用 ./lib/internal/cluster/utils.js 中的 sendHelper 方法,而該方法會向父進程發送 { cmd: 'NODE_CLUSTER' } 消息,根據文檔的 描述NODE_ 起頭的 cmd 爲 內部消息(internalMessage),須要經過 .on('internalMessage', lister) 來監聽它。

因爲這個消息是從子進程發往父進程的、即主進程的,因而在 ./lib/internal/cluster/master.js 中找到了相關的監聽代碼片斷:

worker.process.on('internalMessage', internal(worker, onmessage));
複製代碼

接着經過 onmessage 方法定位到 queryServer 方法中的代碼片斷:

const key = `${message.address}:${message.port}:${message.addressType}:` +
            `${message.fd}:${message.index}`;
var handle = handles.get(key);

if (handle === undefined) {
  let address = message.address;

  // Find shortest path for unix sockets because of the ~100 byte limit
  if (message.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32') {

    address = path.relative(process.cwd(), address);

    if (message.address.length < address.length)
      address = message.address;
  }

  var constructor = RoundRobinHandle;
  // UDP is exempt from round-robin connection balancing for what should
  // be obvious reasons: it's connectionless. There is nothing to send to
  // the workers except raw datagrams and that's pointless.
  if (schedulingPolicy !== SCHED_RR ||
      message.addressType === 'udp4' ||
      message.addressType === 'udp6') {
    constructor = SharedHandle;
  }

  handle = new constructor(key,
                           address,
                           message.port,
                           message.addressType,
                           message.fd,
                           message.flags);
  handles.set(key, handle);
}
複製代碼

當這段代碼首次被運行時,會建立一個 handle,並將其和 key 關聯起來。對於 TCP 鏈路,在沒有特別指定 schedulingPolicy 的狀況下,handle 均爲 RoundRobinHandle 的實例。而查看 ./lib/internal/cluster/round_robin_handle.js 文件中的 RoundRobinHandle 構造函數細節,則發現具體的 listen 綁定動做:

if (fd >= 0)
  this.server.listen({ fd });
else if (port >= 0) {
  this.server.listen({
    port,
    host: address,
    // Currently, net module only supports `ipv6Only` option in `flags`.
    ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
  });
} else
  this.server.listen(address);  // UNIX socket path.
複製代碼

因爲兩個子進程都在前後順序不肯定的狀況下向 master 發送 queryServer 內部消息,因此上面的代碼會被執行兩次。若是兩次的 key 不同,就會致使 handle === undefined 的條件判斷爲 true,進而 listen 兩次,最終發生 ERR_SERVER_ALREADY_LISTEN 錯誤。可是在上面的運行過程當中,並無報錯,說明了兩次的 key 是相同的。

來看下 key 的內容:

const key = `${message.address}:${message.port}:${message.addressType}:` +
            `${message.fd}:${message.index}`;
複製代碼

很顯然,對於分別來自兩個子進程的消息而言,除了 message.index 以外,其他項的內容都是相同的,那麼 message.index 的生成過程在 ./lib/internal/cluster/child.js 中的代碼片斷:

const indexesKey = [address,
                    options.port,
                    options.addressType,
                    options.fd ].join(':');

let index = indexes.get(indexesKey);

if (index === undefined)
  index = 0;
else
  index++;
複製代碼

可見,兩個子進程在分別執行這段代碼的時候,index 首次都將爲 0,從而印證了上面的 key 是相同的假設。

之因此子進程中都會執行 listen 方法,而不報錯的緣由小結以下:

  1. 子進程中並無執行實際的 listen 動做,取而代之的是經過發送消息,請求父進程來執行 listen
  2. 父進程中的 listen 因爲相同的 key 使得屢次動做被合併,最終只 listen 了一次

問題3. 不退出

答案現階段只能先從文檔中尋找答案,詳細見 options.stdio,如下爲節選:

It is worth noting that when an IPC channel is established between the parent and child processes, and the child is a Node.js process, the child is launched with the IPC channel unreferenced (using unref()) until the child registers an event handler for the 'disconnect' event or the 'message' event. This allows the child to exit normally without the process being held open by the open IPC channel.

簡單的說,若是子進程中沒有對事件 disconnectmessage 進行監聽的話,那麼主進程在等待子進程執行完畢以後,會正常的退出。後半句的「主進程會等待...」見 options.detached,如下爲節選:

By default, the parent will wait for the detached child to exit.

爲了印證,能夠先將 ./test-fork-sub.js 代碼改成:

const {execSync} = require('child_process');

execSync("sleep 3");
複製代碼

執行 ./out/Debug/node test-fork-exit.js 會發現,在大約等待了幾秒以後,也就是子進程執行完畢以後,主進程進行了退出。

再次將 ./test-fork-sub.js 代碼改成:

const {execSync} = require('child_process');

execSync("sleep 3");

console.log(`child: ${process.pid} resumed`);
process.on("message", () => {});
複製代碼

能夠發現,因爲子進程監聽了 message 事件,使得主進程和子進程之間的 IPC channel 阻止了主進程的退出。

./lib/internal/cluster/child.js 中的 _setupWorker 方法中的片斷:

process.once('disconnect', () => {
  // ...
});
複製代碼

也印證了這一點。

問題4. 處理請求

回到 ./lib/internal/cluster/round_robin_handle.js 文件,注意構造函數 RoundRobinHandle 中的代碼片斷,注意該代碼是在主進程中調用的:

// ...
this.server = net.createServer(assert.fail);
// ...
this.server.once('listening', () => {
  this.handle = this.server._handle;
  this.handle.onconnection = (err, handle) => this.distribute(err, handle);
  this.server._handle = null;
  this.server = null;
});
複製代碼

net.createServer 的參數若是是函數的話,那麼該函數的做用實際是用來處理 connection 的回調函數。之因此會達到回調的效果,是由於在 ./lib/net.js 中的 Server 構造函數的片斷:

if (typeof options === 'function') {
  connectionListener = options;
  options = {};
  this.on('connection', connectionListener);
} else if (options == null || typeof options === 'object') {
  // ...
} else {
  // ...
}
複製代碼

那麼上面傳遞的是 assert.fail,也就是說,若是該方法成功地被回調了的話,那麼進程應該報錯。 既然沒有報錯,那麼說明在新的 connection 進來以後,沒有觸發 connection 事件。要搞清楚這點,就要看看 net.Server 上的 connection 事件是如何被觸發的。

net.Server 上的 connection 事件是在該文件內的 onconnection 方法中被觸發的:

self.emit('connection', socket);
複製代碼

onconnection 顧名思義也是一個 event listener,它是在相同文件內的 setupListenHandle 中被引用的:

this._handle.onconnection = onconnection;
複製代碼

setupListenHandle 函數是在調用 net.Server 上的 listen 方法被逐步調用到的:

  1. RoundRobinHandle 構造函數內部的 listen
  2. net.Server 上的 listen
  3. net.Server 上的 listenInCluster
  4. net.Server 上的 _listen2
  5. net.Server 內的 setupListenHandle

最終在 setupListenHandle 的代碼片斷中發現:

this._handle.onconnection = onconnection;
複製代碼

回到上面列出的 RoundRobinHandle 中的代碼:

this.server.once('listening', () => {
  // 這裏的 this.server._handle 是對 listen fd 的 wrapper
  this.handle = this.server._handle;
  // 回調中的 handle 是對 connection fd 的 wrapper
  this.handle.onconnection = (err, handle) => this.distribute(err, handle);
  this.server._handle = null;
  this.server = null;
});
複製代碼

能夠看出,在真正的 listen 動做執行成功以後,listening 事件被觸發,進入到上面的代碼中,而後上面的代碼複寫了 handle 對象上的 onconnection 屬性的值,在此以前,該屬性的值即爲 assert.fail。handle 對象此時仍是一個 TCP_Wrapper 對象 (對 CPP 層面對象的包裹的一個對象)。

因爲 master 環境下 RoundRobinHandle 構造 net.Server 對象的目的僅僅是但願得到其內部的 listen fd handle 對象,由於 master 只須要將接下來的 connection fd handle 派發給 works 便可,因此上面的回調中在得到了該對象以後,取消了對 net.Server 對象的引用。

跟進上述代碼中的 distribute 方法,發現其會調用 handoff 方法,向 work 發送 connection fd handle。也就是說,當 master 進程接收到新鏈接以後,會將其派發給 work。接下來須要在 ./lib/internal/cluster/child.js 文件中找到事件監聽函數:

function onmessage(message, handle) {
  if (message.act === 'newconn')
    onconnection(message, handle);
  else if (message.act === 'disconnect')
    _disconnect.call(worker, true);
}
複製代碼

進一步發現 onconnection 方法中:

const server = handles.get(key);
// ...
server.onconnection(0, handle);
複製代碼

接來下先搞清楚 server 是何時被添加進 handles 中的,而後再看看 server.onconnection 作了什麼。

回顧下子進程中所作的事情,能夠結合上面的章節。另外,爲了方便結合代碼進行理解,故同時給出具體的代碼位置:

  1. work 進程調用 httpServer 上的 listen 方法,該方法繼承自父類 net.Server code
  2. work 進程調用 net.Server 上的 listen 方法 code
  3. work 進程調用 net.Server::listenInCluster code
  4. work 進程調用 cluster child 模塊上的 _getServer,並指望被回調 code
  5. work 向 master 進程發送 queryServer 消息,並指望被回調 code
  6. master 構造 RoundRobinHandle 實例,並將發來 queryServer 消息的 work 註冊到其中,並指望被回調。在回調中,會向 work 發送消息,觸發第 5 步中 work 指望的回調 code
  7. 第 6 步中的回調參數 handle 都將是 false 值 code
  8. 從而當回調到第 5 步時,work 進程將執行 rr 方法,該方法會僞造一個 handle 對象,加入到 handles 中,並以該對象回調第 4 步 code
  9. work 進程中開始執行第 4 步的回調函數 listenOnMasterHandle,該函數中設定了 server._handle = handle,注意這裏的 handle 即爲上一步產生的 handle;並調用了 listen2 code
  10. listen2 即爲 setupListenHandle,而 setupListenHandle 內部設定了 handle 對象的 onconnection 屬性 code

接下來 work 進程中處理請求的邏輯就都與不使用 clsuter 模塊時的請求處理邏輯一致了,由於是使用的一樣的處理函數,只不過是在 work 進程中執行。

請求的處理邏輯能夠小結爲:

  1. master 進程進行實際的 listen 動做,並等待客戶端鏈接
  2. 客戶端鏈接由 master 進程,經過消息派發給 work 進程
  3. work 進程中複用通常狀況下的請求處理代碼、對請求進行處理

最後看下 RoundRobinHandle 中的派發機制:

function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.key = key;
  this.all = new Map();
  this.free = [];
  this.handles = [];

  // ...

  this.server.once('listening', () => {
    // ...
    
    // 1. 當接收到新的客戶端鏈接後,調用 this.distribute 方法
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    
    // ...
  });
}

// master 進程接收到 work 進程的 `queryServer` 消息後,會調用該方法。
// 1. 先將 work 記錄到 this.all 這個 map 中
// 2. 調用 this.handoff 方法,該方法致使兩個結果:
// 2.1 若是此時有 pending handle 的話,那麼即刻使用 work 處理
// 2.2 不然,則將 work 加入到 this.free 這個 map 中
RoundRobinHandle.prototype.add = function(worker, send) {
  //...
  
  this.all.set(worker.id, worker);
  
  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  // ...
};

// 該方法的做用就是講 connection handle 進行派發
// 1. 先將 handle 加入到 pending 隊列中
// 2. 嘗試使用 this.free 的第一個 work 處理 pending 隊列。若是存在 free work 的話,
// 該 work 還將會被移出 this.free 
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();

  if (worker)
    this.handoff(worker);
};

// 該方法包含了具體的派發動做
// 1. 從 pending handle 隊列取出第一個項目,若是隊列爲空,則將 work 加入到
// this.free map 中,不然進行派發動做
// 2. 派發是經過將 handle 經由消息發送給 work 進程的,即 sendHelper 部分
RoundRobinHandle.prototype.handoff = function(worker) {
  if (this.all.has(worker.id) === false) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = this.handles.shift();

  if (handle === undefined) {
    this.free.push(worker);  // Add to ready queue again.
    return;
  }

  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    // 該回調由 ./lib/internal/cluster/child.js#L180 觸發
    if (reply.accepted)
      // work 進程表示它能夠處理該 handle,handle 發送到 work 進程中時應該是
      // 使用的副本的形式。因此主進程則能夠關閉屬於其上下文的 handle。handle 內部的
      // fd 被加入到 work 進程的 event loop 中
      handle.close();
    else
      // 如註釋所描述的,從新調用一次 this.distribute,嘗試其餘的 work
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    // 再次調用 this.handoff
    // 若是還有 pending handle,則處理之,不然將 work 從新加入到 this.free 中
    this.handoff(worker);
  });
};
複製代碼

調度的機制能夠簡單理解爲:

  1. 鏈接 conn 到來以後,若是有空閒的 work,則告知其處理 conn
  2. 不然將 conn 加入 pending 隊列
  3. 因爲 work 的激活是由 connection 事件觸發的,因此在 work 處理完 conn 以後,須要主動的再次消化 pending 隊列中的內容,該過程連續進行,直到當發現隊列爲空時,將自身從新標記爲 free,等待下一次的 connection 事件對其進行激活

另外須要注意的是,cluster 模塊中對 works 進程沒有重啓的機制,work 進程若是遇到沒有主動處理的異常就會退出,master 進程不會自動的補齊 works 數量,當全部的 works 都退出後,即不存在任一個 IPC channel 了,master 進程也將退出。

掘金自動抓取的內容不自動更新,因此以後的文章都會手動拷貝到專欄中。

相關文章
相關標籤/搜索