從源碼分析Node的Cluster模塊

前段時間,公司的老哥遇到一個問題,大概就是本機有個node的http服務器,可是每次請求這個服務器的端口返回的數據都報錯,一看返回的數據根本不是http的報文格式,而後通過一番排查發現是另一個服務器同時監聽了http服務器的這個端口。這個時候老哥就很奇怪,爲啥我這個端口明明使用了,卻仍是能夠啓動呢?這個時候我根據之前看libuv源碼的經驗解釋了這個問題,由於uv__tcp_bind中,對socket會設置SO_REUSEADDR選項,使得端口能夠複用,可是tcp中地址不能複用,由於那兩個監聽雖然是同一個端口,可是地址不一樣,因此能夠同時存在。這個問題讓我不由想到了以前看一篇文章裏有人留言說這個選項是cluster內部複用端口的緣由,當時沒有細細研究覺得說的是SO_REUSEPORT也就沒有細想,可是此次由於這個問題仔細看了下結果是設置的SO_REUSEADDR選項,這個選項雖然能複用端口,可是前提是每一個ip地址不一樣,好比能夠同時監聽'0.0.0.0'和'192.168.0.12'的端口,但不能兩個都是'0.0.0.0'的同一個 端口,若是cluster是用這個來實現的,那要是多起幾個子進程很明顯ip地址不夠用啊,因而就用node文檔中的例子試了下:node

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}
複製代碼

在使用cluster的在幾個子進程同時監聽了8000端口後,查看了一下只有主進程監聽了這個端口,其餘都沒有。這個時候,我猜想node仍是使用在父進程中建立sever的io可是這個父進程應該就是經過Unix域套接字的cmsg_data將父進程中收到客戶端套接字描述符傳遞給子進程而後讓子進程來處理具體的數據與邏輯,可是node究竟是如何經過在子進程中createServer而且listen可是隻在父進程中真的監聽了該端口來實現這個邏輯的呢?這個問題引發了個人好奇,讓我不得不到源碼中一探究竟。編程

從net模塊出發

按理說,這個問題咱們應該直接經過cluster模塊來分析,可是很明顯,在加載http模塊的時候並不會像cluster模塊啓動時同樣經過去判斷NODE_ENV來加載不一樣的模塊,可是從上面的分析,我能夠得出子進程中的createServer執行了跟父進程不一樣的操做,因此只能說明http模塊中經過isMaster這樣的判斷來進行了不一樣的操做,不過http.js_http_server.js中都沒有這個判斷,可是經過對createServer向上的查找我在net.jslistenInCluster中找到了isMaster的判斷,listenInCluster會在createServer後的server.listen(8000)中調用,因此咱們能夠看下他的關鍵邏輯。服務器

if (cluster === null) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  };

  // 獲取父進程的server句柄,並監聽它
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
複製代碼

從這段代碼中咱們能夠看出,若是是在父進程中,直接經過_listen2的邏輯就能開始正常的監聽了,可是在子進程中,會經過cluster._getServer的方式獲取父進程的句柄,並經過回調函數listenOnMasterHandle監聽它。看到這裏我其實比較疑惑,由於在我對於網絡編程的學習中,只據說過傳遞描述符的,這個傳遞server的句柄實在是太新鮮了,因而趕忙繼續深刻研究了起來。網絡

深刻cluster的代碼

首先,來看一下_gerServer的方法的代碼。socket

const message = util._extend({
    act: 'queryServer',
    index: indexes[indexesKey],
    data: null
  }, options);
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
複製代碼

這個方法經過send像主進程發送一個包,由於在send函數中有這樣一句代碼:tcp

message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
複製代碼

經過Node的文檔,咱們能夠知道這種cmd帶了Node字符串的包,父進程會經過internalMessage事件來響應,因此咱們能夠從internal/cluster/master.js中看到找到,對應於act: 'queryServer'的處理函數queryServer的代碼。函數

...
  var constructor = RoundRobinHandle;
  ...
  handle = new constructor(key, message.address,message.port,message.addressType,message.fd,message.flags);
  ...
  handle.add(worker, (errno, reply, handle) => {
    reply = util._extend({
      errno: errno,
      key: key,
      ack: message.seq,
      data: handles[key].data
    }, reply);

    if (errno)
      delete handles[key];  // Gives other workers a chance to retry.

    send(worker, reply, handle);
  });
複製代碼

這裏建立了一個RoundRobinHandle實例,在該實例的構造函數中經過代碼:oop

this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0)
    this.server.listen(port, address);
  else
    this.server.listen(address);  // UNIX socket path.

  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
複製代碼

在父進程中生成了一個server,而且經過註冊listen的方法將有心的客戶端鏈接到達時執行的onconnection改爲了使用自身的this.distribute函數,這個函數咱們先記下由於他是後來父進程給子進程派發任務的重要函數。說回getServer的代碼,這裏經過RoundRobinHandle實例的add方法:學習

const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  // Still busy binding.
  this.server.once('listening', done);
複製代碼

會給子進程的getServer以回覆。從這裏咱們能夠看到在給子進程的回覆中handle一直都是null。那這個所謂的去取得父進程的server是怎麼取得的呢?這個地方讓我困惑了一下,不事後來看子進程的代碼我就明白了,實際上根本不存在什麼取得父進程server的句柄,這個地方的註釋迷惑了閱讀者,從以前子進程的回調中咱們能夠看到,返回的handle只是決定子進程是用shared方式仍是Round-robin的方式來處理父進程派下來的任務。從這個回調函數咱們就能夠看出,子進程是沒有任何獲取句柄的操做的,那它是如何處理的呢?咱們經過該例子中的rr方法能夠看到:ui

const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  handles[key] = handle;
  cb(0, handle);
複製代碼

這個函數中生成了一個自帶listen和close方法的對象,並傳遞給了函數listenOnMasterHandle,雖然這個名字寫的是在父進程的server句柄上監聽,實際上咱們這個例子中是子進程自建了一個handle,可是若是是udp的狀況下這個函數名字還確實就是這麼回事,緣由在於SO_REUSEADDR選項,裏面有這樣一個解釋:

SO_REUSEADDR容許徹底相同的地址和端口的重複綁定。但這隻用於UDP的多播,不用於TCP。
複製代碼

因此,在udp狀況同一個地址和端口是能夠重複監聽的(以前網上看到那個哥們兒說的也沒問題,只是一葉障目了),因此能夠共享父進程的handle,跟TCP的狀況不一樣。咱們繼續來看當前這個TCP的狀況,在這個狀況下listenOnMasterHandle會將咱們在子進程中本身生成的handle對象傳入子進程中經過createServer建立的server的_handle屬性中並經過

server._listen2(address, port, addressType, backlog, fd);
複製代碼

作了一個假的監聽操做,實際上由於_handle的存在這裏只會爲以前_handle賦值一個onconnection函數,這個函數的觸發則跟父進程中經過真實的客戶端鏈接觸發的時機不一樣,而是經過

process.on('internalMessage', (message, handle) {
  if (message.act === 'newconn')
    onconnection(message, handle);
  else if (message.act === 'disconnect')
    _disconnect.call(worker, true);
}
複製代碼

中註冊的internalMessage事件中的對父進程傳入的act爲newconn的包觸發。而父進程中就經過咱們剛剛說到的改寫了server對象的onconnection函數的distribute函數,這個函數中會調用一個叫handoff的函數,經過代碼:

const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
複製代碼

其中send到子進程的handle就是新鏈接客戶端的句柄,Node中父子進程之間的通訊最後是經過src/stream_base.cc中的StreamBase::WriteString函數實現的,從這段代碼咱們能夠看出:

...
//當進程間通訊時
uv_handle_t* send_handle = nullptr;

if (!send_handle_obj.IsEmpty()) {
  HandleWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
  send_handle = wrap->GetHandle();
  // Reference LibuvStreamWrap instance to prevent it from being garbage
  // collected before `AfterWrite` is called.
  CHECK_EQ(false, req_wrap->persistent().IsEmpty());
  req_wrap_obj->Set(env->handle_string(), send_handle_obj);
}

err = DoWrite(
    req_wrap,
    &buf,
    1,
    reinterpret_cast<uv_stream_t*>(send_handle));
複製代碼

能夠看到,在調用此方式時,若是傳入了一個客戶端的句柄則經過Dowrite方法最後經過輔助數據cmsg_data將客戶端句柄的套接字fd傳送到子進程中進行處理。看到這裏我不由恍然大悟,原來仍是走的是我熟悉的那套網絡編程的邏輯啊。

總結

經過上面的一輪分析,咱們能夠總結出如下兩個結論:

  1. 建立TCP服務器時,會在父進程中建立一個server並監聽目標端口,新鏈接到達後,會經過ipc的方式將新鏈接的句柄分配到子進程中而後處理新鏈接的數據和請求,因此只有主進程會監聽目標ip和端口。
  2. 建立UDP服務器,會共享在父進程中建立的server的句柄對象,而且在子進程中都會監聽到跟對象相同的ip地址和端口上,因此建立n個子進程則會有n+1個進程同時監聽到目標ip和端口上。
相關文章
相關標籤/搜索