nodejs cluster模塊分析

最近作了點nodejs項目,對nodejs的cluster怎麼利用多進程處理請求產生了疑問,因而着手進行了研究,以後發現這其中竟大有文章!一切仍是先從遙遠的TCP提及吧。。。javascript

TCP與Socket

說到TCP,相信不少人都至關了解了,大學已經教過,可是又相信有不少人也不是很瞭解,要不是當時沒聽,要不也多是自身的編程能力不足以去實踐相關內容,寫到這我還特地去翻了一下大學的計算機網絡教材,內容是很豐富的,但教人實踐的內容仍是太少了,裏面的內容都把學生當成了有至關的Linux編程能力的人了,因此結果就是大部分只上了一年編程課剛學會幾個Hello world程序的大二學生,聽了這門課後一臉懵逼,即便記住了也由於沒什麼實踐很快忘了,當年我就是這麼懵逼過來的。
因此,扯了這些,結果是什麼呢,結果就是咱們要多動手!而要動手創建一條TCP鏈接能夠用socket來實現,不過這裏不是要說socket用法,只是來簡單聊一聊他們之間的一點小聯繫,以便於理解後面的內容。java

應用層經過傳輸層進行TCP通訊時,有時TCP須要爲多個應用程序進程提供併發服務。多個TCP鏈接或多個應用程序進程可能須要經過同一個TCP協議端口傳輸數據。爲了區別不一樣的應用程序進程和鏈接,許多計算機操做系統爲應用程序與TCP協議交互提供了稱爲套接字 (Socket)的接口,區分不一樣應用程序進程間的網絡通訊和鏈接。node

咱們能夠用一個四元組來肯定一條TCP鏈接(源ip,源端口,目標ip,目標端口),而鏈接是經過socket來創建的(服務端進行bind和listen->客戶端發起connect->服務端accept),計算機系統就是經過socket來區分不一樣的TCP鏈接的。因此咱們能夠看出來,只要目標ip/端口不一樣,服務端能夠用同一個端口生成多個socket,創建多條鏈接。
可是,一個進程只能監聽一個端口,一個端口怎麼生成多個socket呢?其實服務器端程序通常會把socket和服務器某個端口(ip+端口)bind起來, 這樣構成了一個特殊的socket, 這個socket沒有目標ip和端口。socket進行listen以後當有新的鏈接進來時, 系統將請求存進隊列(此時TCP握三次手完成), 後續能夠再調用accept拿到隊列的請求,返回一個新的socket, 這個socket是由四元組創建的, 也就對應了一個惟一的鏈接。程序員

說完這些,能夠來聊一聊nodejs是怎樣創建一個TCP服務的了。編程

nodejs createServer啓動TCP服務小解析

通常咱們用nodejs啓動一個TCP服務多是這樣的:服務器

require('net').createServer(function(sock) {
    sock.on('data', function(data) {
        sock.write('Hello world');
    });
}).listen(8080, '127.0.0.1');

進到createServer一看(代碼都在net模塊中),裏面return了一個Server對象,Server繼承EventEmitter,將createServer的參數作爲connection事件的回調函數,這塊比較簡單就不貼代碼了。咱們須要關注的是Serverlisten方法,其不一樣的參數最終都會調用到listenInCluster方法。cluster!是的這和cluster有關,但先無論它,咱們先管在主進程中它的執行:網絡

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive) {
  // ...

  if (cluster.isMaster || exclusive) {
    // ...
    server._listen2(address, port, addressType, backlog, fd);
    return;
  }

  // ...

}

從代碼咱們能夠看到listenInCluster最終是調用了_listen2方法,它就是服務啓動的關鍵,其定義以下:併發

function setupListenHandle(address, port, addressType, backlog, fd) {
  
    // ...

    var rval = null;

    // ...

    if (rval === null)
        rval = createServerHandle(address, port, addressType, fd);

    // ...
    this._handle = rval;

  // ...
  this._handle.onconnection = onconnection;
  this._handle.owner = this;

  var err = this._handle.listen(backlog || 511);

  // ...
}

其中createServerHandle方法就不展開了,它就如以前所說的:把socket和服務器某個端口(ip+端口)bind起來, 這樣構成了一個特殊的socket, 這個socket沒有目標ip和端口。它綁定了address+port並返回了一個特殊socket(句柄)rval,能夠看到最後它調用了listen對端口進行監聽,而且指定了一個回調函數onconnection,函數會在C++層當accept請求時觸發,其回調參數之一就是前面提到的accept後與客戶端鏈接的新socket句柄。到這裏再看一下onconnection的代碼:負載均衡

function onconnection(err, clientHandle) {
  // ...
  var self = handle.owner;
  var socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect
  });
  socket.readable = socket.writable = true;


  // ...
  self.emit('connection', socket);
}

能夠看到nodejs在對socket句柄進一步封裝後(封裝成nodejs的Socket對象),再觸發server(由createServer建立)的connection事件。這時咱們再回到前面createServer的介紹,其監聽了connection事件,因此最終流程走下來createServer的的方法參數將被觸發,而且能夠拿到一個nodejs的Socket對象進行write與read操做,與客戶端進行通訊。socket

至此咱們已經對nodejs啓動一個TCP服務的流程有了瞭解,接下來就到主題cluster了。

cluster爲咱們作了什麼

開始說代碼以前,先來聊一聊喂鴿子吧。假設你坐在布拉格廣場前靜靜地坐着,而後往前面撒了一把狗糧,喔不對是鴿糧,而後周圍的一羣鴿子都震驚了並往你這邊飛搶東西吃。這個現象能夠用一個詞來形容就是「驚羣「。然而這只是個人瞎掰,咱們程序員理解的驚羣應該是:多個進程/線程同時阻塞等待某個事件,當事件發生時喚醒了全部等待的進程/線程,但最終只有一個能對事件進行處理。很明顯這對cpu形成了浪費,而cluster的多進程模型對此作了處理:只用一個master進程等待請求,而後有請求到來時使用round-robin輪詢分配請求給各個子進程進行處理,這塊後面提到的源碼會涉及到,這裏就不深刻了。除了round-robin,還有其餘的一些cluster爲咱們作的,就用代碼來talk吧:

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {

  const numCPUs = require('os').cpus().length;
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
}

以上代碼就是cluster的典型用法,在nodejs啓動文件判斷當前進程,若是當前進程是master進程,那麼就根據cpu的核數fork出相同數量的進程,不然(worker進程)就啓動一個http服務,因此通常這樣會給一個核心分配一個worker進程來啓動一個服務,搭起一個小服務集羣。可是問題來了,爲何這裏能夠有多個進程同時監聽一個端口呢,是由於listen作的一些文章,下面再一步步深刻解析。因爲http.Server實際上是繼承了net.Server,因此跟前面建立TCP服務同樣,listen最終也是調用到listenInCluster,咱們從這裏從新開始。

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive) {
  // ...

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // ...
}

listenInCluster在worker進程中調用cluster._getServer,而且傳入了一個函數listenOnMasterHandle。這裏還不知道它作了什麼,因此再進入cluster._getServer看看(因爲當前是在worker進程,cluster模塊文件是lib/internal/cluster/child.js):

cluster._getServer = function(obj, options, cb) {
  // ...

  const message = util._extend({
    act: 'queryServer',
    index: indexes[indexesKey],
    data: null
  }, options);

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
  // ...
};

關注send方法,它調用了sendHelper方法,該方法是在internal/cluster/utils定義的,至關一個消息轉發器處理進程間通訊,它發送一個「進程內部消息「(internalMessage),而worker進程在master進程被fork出來的時候監聽了internalMessage:

// lib/internal/cluster/master.js
worker.process.on('internalMessage', internal(worker, onmessage));

因此最終在worker進程發送的消息,觸發了master進程執行了onmessage方法,onmessage判斷message.act === 'queryServer'執行queryServer,而就是在這個方法中,新建了一個RoundRobinHandle調度器,就是這個東西分配請求作了負載均衡。這裏用地址和端口號做爲key將調度器存儲起來,調度器不會被worker建立兩次,最後將worker進程add到隊列。相關代碼以下:

// lib/internal/cluster/master.js
function queryServer(worker, message) {
  // ...
  var handle = handles[key];

  if (handle === undefined) {
    var constructor = RoundRobinHandle;
    // ...

    handles[key] = handle = new constructor(key,
                                            message.address,
                                            message.port,
                                            message.addressType,
                                            message.fd,
                                            message.flags);
  }
  // ...
  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    // ...
  });
}

而後咱們再來看看RoundRobinHandle,它裏面調用net.createServer方法新建了一個server,而且開始監聽,這塊能夠看前面內容。不過與前面不一樣的是,server在listening事件完成時拿到監聽端口的那個特殊socket句柄,重置了onconnection方法,當新的鏈接創建時方法被調用,將accept鏈接的socket句柄分發到隊列裏的worker進行處理(distribute)。對於listening事件,它在Server.listen執行後就會觸發,代碼就在setupListenHandle方法裏面。RoundRobinHandle代碼以下:

// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, port, addressType, fd) {
  // ...
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0)
    this.server.listen(port, address);
  else
    this.server.listen(address);  // UNIX socket path.

  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    // ...
  });
}
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();

  if (worker)
    this.handoff(worker);
};
RoundRobinHandle.prototype.handoff = function(worker) {
  // ...
  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    // ...
  });
};

從代碼上看到最終調度器調用handoff方法,經過sendHelper向worker進程發送一個新鏈接到達的消息newconn,執行worker進程的server的onconnection方法,worker進程相關代碼以下:

// lib/internal/cluster/child.js
cluster._setupWorker = function() {
  // ...
  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });

  function onmessage(message, handle) {
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      _disconnect.call(worker, true);
  }
};
// Round-robin connection.
function onconnection(message, handle) {
  const key = message.key;
  const server = handles[key];
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle);
}

走到這裏worker進程的server就拿到了鏈接的socket句柄能夠進行處理,可是好像有點問題,worker進程的server好像還沒起起來啊,前面講的只是在master進程的調度器啓動了一個server,worker進程並無server。咱們又得翻回前面的內容看一看了,看看以前提到的workder進程的cluster._getServer,裏面send方法發送了一個函數,函數裏面的rr(reply, indexesKey, cb);就是建立了workder進程server的代碼。

先來看看cluster._getServer中發送的函數怎麼被調用的。這裏須要來了解一下以前出現了幾回的sendHelper,它是cluster模塊用來作進程間通訊的,另外還有一個internal方法用來處理通訊的回調。cluster._getServersend會調用sendHelper,它會用message.seq當key把send的函數存儲起來。而後在internal方法處理通訊的回調時判斷message是否有這個key,是否能找到這個函數,能夠的話就執行。而在master進程執行queryServer把worker進程添加到調度器中時添加了一個回調函數,回調函數send了一個帶seq的消息,而且handle爲null,就是這個消息觸發了cluster._getServer發送的函數的執行。相關代碼以下:

// `internal/cluster/utils.js`
const callbacks = {};
var seq = 0;
function sendHelper(proc, message, handle, cb) {
  // ...
  if (typeof cb === 'function')
    callbacks[seq] = cb;

  message.seq = seq;
  // ...
}
function internal(worker, cb) {
  return function onInternalMessage(message, handle) {
    // ...
    var fn = cb;

    if (message.ack !== undefined && callbacks[message.ack] !== undefined) {
      fn = callbacks[message.ack];
      delete callbacks[message.ack];
    }
    // ...
  };
}
// lib/internal/cluster/master.js
function queryServer(worker, message) {
  // ...
  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    reply = util._extend({
      // ...
      ack: message.seq,
      // ...
    }, reply);
    // ...
    send(worker, reply, handle);
  });

最終,rr(reply, indexesKey, cb);執行,它構造了一個假的socket句柄,句柄設置了一個不作操做的listen方法。而後執行cb,這個cb也就是前面提到過的listenOnMasterHandle,它會把假socket句柄賦值給worker進程的server._handle,隨後因爲server._handle的存在,server._listen2(address, port, addressType, backlog, fd);也不會作任何操做,也就是說worker進程建立的server是不會對端口進行監聽的。相關代碼以下:

// lib/internal/cluster/child.js
function rr(message, indexesKey, cb) {
  function listen(backlog) {
    // ...
    return 0;
  }
  // ...
  cb(0, handle);
}
// lib/net.js
function listenOnMasterHandle(err, handle) {
  // ...
  server._handle = handle;
  server._listen2(address, port, addressType, backlog, fd);
}
// setupListenHandle就是_listen2
function setupListenHandle(address, port, addressType, backlog, fd) {
  // ...
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  }
  // ...

至此,cluster模塊如何創建多進程服務的就算講完了。畫個草圖總結下吧:
圖片描述

相關文章
相關標籤/搜索