An original close reading: following createServer and load balancing through the Node.js source, from scratch

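Preface

I used to rely heavily on Node.js as the access layer for an instant-messaging client, so debugging V8 and helping develop addons was unavoidable. That is how I became very interested in the Node.js source code.

A side complaint about Node's memory management: memory is reclaimed automatically by the garbage collector. The product I worked on was an IM product with super groups of 200,000 members, and at around 1,000 messages per second, memory and CPU usage still became a problem once the load was sustained for a long time.

I have previously written about the cluster module source, how PM2 works, and so on. If you are interested, have a look through my public account.

I have read most of the JavaScript part of the Node.js source. Today I am writing up the createServer process. For readers who are not very familiar with the Node.js source, this may be a good place to start. The source is on GitHub, so just clone it. I have been busy lately, both at work and with side projects, so there have been few original posts.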


Creating a basic server with plain Node.js:

var http = require('http');
http.createServer(function (request, response) {
  // Send the HTTP header
  // HTTP status: 200 (OK)
  // Content type: text/plain
  response.writeHead(200, {'Content-Type': 'text/plain'});
  // Send the response body "Hello World"
  response.end('Hello World\n');
}).listen(8888);
// Print the following message to the terminal
console.log('Server running at http://127.0.0.1:8888/');

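For now we only analyze the JavaScript part of the Node.js source.

First, find the lib folder in the Node.js source.

Then find the http.js file.

createServer actually returns new Server, and Server comes from _http_server.

So open _http_server.js in the same directory. The whole file is about 800 lines. Search it for Server to find the function: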

function Server(options, requestListener) {
  if (!(this instanceof Server)) return new Server(options, requestListener);
  if (typeof options === 'function') {
    requestListener = options;
    options = {};
  } else if (options == null || typeof options === 'object') {
    options = { ...options };
  } else {
    throw new ERR_INVALID_ARG_TYPE('options', 'object', options);
  }
  this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
  this[kServerResponse] = options.ServerResponse || ServerResponse;
  net.Server.call(this, { allowHalfOpen: true });
  if (requestListener) {
    this.on('request', requestListener);
  }
  // ... (rest of the constructor not shown)
}

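Breaking down createServer:

  • The argument handling is a bit like initState and reducer in the redux source: the function responds differently depending on the types of the arguments passed in.
  • Every time a request arrives, the requestListener callback is invoked, because it was registered with:

    this.on('request', requestListener);

  • As for IncomingMessage and ServerResponse: the request and the response are both streams. The request is a readable stream and the response is a writable stream, as mentioned when I wrote the static resource server.
  • So how can the calls be chained? Some readers may wonder. The Node.js source follows the CommonJS convention and attaches most methods to the prototype, so every instance can call them. The guard at the top of the function makes sure an instance is returned even when Server is called without new, which keeps the chain working:

    if (!(this instanceof Server))
      return new Server(options, requestListener);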
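
The guard from the last bullet can be tried in isolation. The following is only a minimal sketch, not the Node.js code: TinyServer and its listen method are hypothetical names, and listen merely records the port instead of binding anything.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for http.Server; it never opens a socket.
function TinyServer(requestListener) {
  // Calling TinyServer() without `new` still yields an instance.
  if (!(this instanceof TinyServer)) return new TinyServer(requestListener);
  EventEmitter.call(this);
  if (requestListener) this.on('request', requestListener);
}
Object.setPrototypeOf(TinyServer.prototype, EventEmitter.prototype);

// Methods live on the prototype and return `this` so calls can be chained.
TinyServer.prototype.listen = function (port) {
  this.port = port;
  return this;
};

const seen = [];
const server = TinyServer((req) => seen.push(req)).listen(8888);
server.emit('request', 'GET /');
```

Even though TinyServer is called without new, the result is a real instance, so both .listen and the 'request' listener work.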

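The request event and its callback are now covered. But what about the chained call to listen: how does listening on a port work?

With traditional chaining, the jQuery source returns this, while a hand-written Promises/A+ implementation returns a brand-new Promise. Since then lives on the Promise prototype, the calls can be chained.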
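
The two chaining styles just mentioned can be put side by side. This is only an illustration: Counter is a made-up class, and the second half uses the built-in Promise rather than a hand-written A+ implementation.

```javascript
// Style 1: each method returns `this`, so the same object is reused.
class Counter {
  constructor() { this.value = 0; }
  add(n) { this.value += n; return this; }
}
const counter = new Counter();
const sameObject = counter.add(1).add(2);

// Style 2: then() returns a brand-new Promise on every call.
const first = Promise.resolve(1);
const second = first.then((v) => v + 1);
```

net.Server's listen uses the first style: every successful branch ends with return this.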


How is the .listen chaining implemented? The key is this line:

net.Server.call(this, { allowHalfOpen: true });

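Experimental conclusions about allowHalfOpen (TCP itself is not covered in depth here):

(1) allowHalfOpen is true and one end sends a FIN:
if the receiving process has exited, a FIN is certainly sent back;
if the receiving process is still running, no FIN is sent back automatically.
(2) allowHalfOpen is false and one end sends a FIN:
if the receiving process has exited, a FIN is certainly sent back;
if the receiving process is still running, a FIN is sent back as well.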
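
To see where the option ends up, here is a small sketch that only inspects the servers without starting them. The handler at the end is an assumption about how one might close the connection by hand when allowHalfOpen is true; it is not taken from the Node.js source.

```javascript
const net = require('net');
const http = require('http');

// Neither server is started; we only inspect how the option is stored.
const plain = net.createServer();
const httpServer = http.createServer();

console.log(plain.allowHalfOpen);      // value chosen by net when no option is given
console.log(httpServer.allowHalfOpen); // value passed via net.Server.call(this, ...)

// With allowHalfOpen: true, the handler decides when to send its own FIN.
const halfOpen = net.createServer({ allowHalfOpen: true }, (socket) => {
  socket.on('end', () => {
    // The peer has sent its FIN; our writable side stays open until end().
    socket.end('bye\n');
  });
});
```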

So find the Server function in the net.js module:

function Server(options, connectionListener) {
  if (!(this instanceof Server))
    return new Server(options, connectionListener);

  EventEmitter.call(this);

  if (typeof options === 'function') {
    connectionListener = options;
    options = {};
    this.on('connection', connectionListener);
  } else if (options == null || typeof options === 'object') {
    options = { ...options };

    if (typeof connectionListener === 'function') {
      this.on('connection', connectionListener);
    }
  } else {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  this._connections = 0;

  Object.defineProperty(this, 'connections', {
    get: deprecate(() => {

      if (this._usingWorkers) {
        return null;
      }
      return this._connections;
    }, 'Server.connections property is deprecated. ' +
       'Use Server.getConnections method instead.', 'DEP0020'),
    set: deprecate((val) => (this._connections = val),
                   'Server.connections property is deprecated.',
                   'DEP0020'),
    configurable: true, enumerable: false
  });

  this[async_id_symbol] = -1;
  this._handle = null;
  this._usingWorkers = false;
  this._workers = [];
  this._unref = false;

  this.allowHalfOpen = options.allowHalfOpen || false;
  this.pauseOnConnect = !!options.pauseOnConnect;
}

Here the Server function of the net module is cleverly invoked through .call, which keeps this pointing at the same object.
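
The same pattern, reduced to its essentials. BaseServer and WebServer are hypothetical stand-ins for net.Server and http.Server; only the .call and Object.setPrototypeOf steps mirror what the source does.

```javascript
// Hypothetical stand-ins for net.Server and http.Server.
function BaseServer(options) {
  this.allowHalfOpen = options.allowHalfOpen || false;
  this._handle = null;
}
BaseServer.prototype.describe = function () {
  return `allowHalfOpen=${this.allowHalfOpen}`;
};

function WebServer() {
  // Run the parent constructor against *this* object, not a new one.
  BaseServer.call(this, { allowHalfOpen: true });
}
// Link the prototypes so WebServer instances see BaseServer's methods.
Object.setPrototypeOf(WebServer.prototype, BaseServer.prototype);

const web = new WebServer();
console.log(web.describe());
```

The fields set by BaseServer land directly on the WebServer instance, which is why http.Server can later use listen from net.Server.prototype.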

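this._handle = null is there because Node.js takes multiple processes into account and hacks this property: the .listen method ultimately calls methods on _handle, and with multiple processes only one process really listens on the port and then dispatches connections to the others. More on this later.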

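Some characteristics of the Node.js source:

  1. It follows the CommonJS convention, and many methods are attached to prototypes.
  2. It makes heavy use of Object.defineProperty to intercept property access.
  3. It rebinds this and combines that with point 1 to allow chaining.
  4. It ships its own custom event module; many built-in functions inherit from it or wrap custom events via Object.setPrototypeOf.
  5. Modules depend on each other a lot. Tracing a single .listen call is tedious, and beginners can easily doze off.
  6. Reading source code is dry by nature. There is not much more to say.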
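
Point 2 is what the connections property in net.Server does. The sketch below reproduces the idea on a plain object; the object and the warning text are made up for illustration, while util.deprecate and Object.defineProperty are the real APIs.

```javascript
const util = require('util');

// Hypothetical object standing in for a net.Server instance.
const server = { _connections: 0 };

Object.defineProperty(server, 'connections', {
  // Reads and writes are routed through functions, so they can warn.
  get: util.deprecate(function () { return this._connections; },
                      'connections is deprecated (illustrative message).'),
  set: util.deprecate(function (val) { this._connections = val; },
                      'connections is deprecated (illustrative message).'),
  configurable: true,
  enumerable: false,
});

server.connections = 3;             // goes through the setter
const current = server.connections; // goes through the getter
```

Because the property is not enumerable, it stays out of Object.keys(server) while still being readable and writable.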

In the net.js module I found a listen method on the prototype:

Server.prototype.listen = function(...args) {
  const normalized = normalizeArgs(args);
  var options = normalized[0];
  const cb = normalized[1];

  if (this._handle) {
    throw new ERR_SERVER_ALREADY_LISTEN();
  }

  if (cb !== null) {
    this.once('listening', cb);
  }
  const backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]);  // (port, host, backlog)

  options = options._handle || options.handle || options;
  const flags = getFlags(options.ipv6Only);
  // (handle[, backlog][, cb]) where handle is an object with a handle
  if (options instanceof TCP) {
    this._handle = options;
    this[async_id_symbol] = this._handle.getAsyncId();
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }
  // (handle[, backlog][, cb]) where handle is an object with a fd
  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  // ([port][, host][, backlog][, cb]) where port is omitted,
  // that is, listen(), listen(null), listen(cb), or listen(null, cb)
  // or (options[, cb]) where options.port is explicitly set as undefined or
  // null, bind to an arbitrary unused port
  if (args.length === 0 || typeof args[0] === 'function' ||
      (typeof options.port === 'undefined' && 'port' in options) ||
      options.port === null) {
    options.port = 0;
  }
  // ([port][, host][, backlog][, cb]) where port is specified
  // or (options[, cb]) where options.port is specified
  // or if options.port is normalized as 0 before
  var backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    if (!isLegalPort(options.port)) {
      throw new ERR_SOCKET_BAD_PORT(options.port);
    }
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  // (path[, backlog][, cb]) or (options[, cb])
  // where path or options.path is a UNIX domain socket or Windows pipe
  if (options.path && isPipeName(options.path)) {
    var pipeName = this._pipeName = options.path;
    backlog = options.backlog || backlogFromArgs;
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);

    if (!this._handle) {
      // Failed and an error shall be emitted in the next tick.
      // Therefore, we directly return.
      return this;
    }

    let mode = 0;
    if (options.readableAll === true)
      mode |= PipeConstants.UV_READABLE;
    if (options.writableAll === true)
      mode |= PipeConstants.UV_WRITABLE;
    if (mode !== 0) {
      const err = this._handle.fchmod(mode);
      if (err) {
        this._handle.close();
        this._handle = null;
        throw errnoException(err, 'uv_pipe_chmod');
      }
    }
    return this;
  }

  if (!(('port' in options) || ('path' in options))) {
    throw new ERR_INVALID_ARG_VALUE('options', options,
                                    'must have the property "port" or "path"');
  }

  throw new ERR_INVALID_OPT_VALUE('options', inspect(options));
};

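This is the listen method we were looking for. It contains a lot of IPv4/IPv6 and argument handling, but the most important call is listenInCluster.


This function deserves a careful read. It is only a few dozen lines: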

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      var ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

If this is the master process (or exclusive is set), _listen2 is called directly:

Server.prototype._listen2 = setupListenHandle;

Find the setupListenHandle function. The createServerHandle call inside it is the key part:

function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    var rval = null;

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);

      if (typeof rval === 'number') {
        rval = null;
        address = DEFAULT_IPV4_ADDR;
        addressType = 4;
      } else {
        address = DEFAULT_IPV6_ADDR;
        addressType = 6;
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);

    if (typeof rval === 'number') {
      var error = uvExceptionWithHostPort(rval, 'listen', address, port);
      process.nextTick(emitErrorNT, this, error);
      return;
    }
    this._handle = rval;
  }

  this[async_id_symbol] = getNewAsyncId(this._handle);
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511);

  if (err) {
    var ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}

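We are now down at the TCP handle, one step closer to actually binding and listening on the port.

In the end the port is bound through one of these calls:

handle.bind6(address, port, flags);
or
handle.bind(address, port);

IPv6 binding is tried first because an IPv6 socket can also accept IPv4 connections, whereas an IPv4 socket cannot accept IPv6 connections. There are ways around that too, but they are more cumbersome.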
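
The "try IPv6 first, fall back to IPv4" step in setupListenHandle can be modeled as follows. This is a sketch under assumptions: bindWithFallback and tryBind are hypothetical names, and tryBind follows the convention seen above where a numeric return value means an error.

```javascript
// `tryBind` is a hypothetical callback: it returns a handle object on
// success or a numeric error code on failure, like createServerHandle.
function bindWithFallback(tryBind, port) {
  let result = tryBind('::', port, 6);
  if (typeof result === 'number') {
    // IPv6 is unavailable; fall back to the unspecified IPv4 address.
    result = tryBind('0.0.0.0', port, 4);
  }
  return result;
}

// Fake binder that pretends the host has no IPv6 support.
// The error value is arbitrary; any number stands for a failure here.
const attempts = [];
const ipv4Only = (address, port, type) => {
  attempts.push(address);
  return type === 6 ? -1 : { address, port, type };
};

const handle = bindWithFallback(ipv4Only, 8888);
```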


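Please read the part above carefully, because what follows is more complex and involves how Node.js does multi-process load balancing.

If this is not the master process, cluster._getServer is called. Find the cluster source: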

'use strict';

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
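
The selection above depends only on whether NODE_UNIQUE_ID is present in the environment, which the master sets for each worker it forks. A tiny sketch with a hypothetical helper, clusterRole, makes the rule testable without forking anything:

```javascript
// Same test as lib/cluster.js, but against an explicit env object so it
// can be tried without forking anything.
function clusterRole(env) {
  return 'NODE_UNIQUE_ID' in env ? 'child' : 'master';
}

const roleInMaster = clusterRole({});
const roleInWorker = clusterRole({ NODE_UNIQUE_ID: '1' });
```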

Find the source of the _getServer function:

// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  const indexesKey = [address,
                      options.port,
                      options.addressType,
                      options.fd ].join(':');

  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  indexes.set(indexesKey, index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

Earlier we passed three arguments to it:

server, serverQuery, listenOnMasterHandle
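
Before sending anything, _getServer builds indexesKey from the query and counts how many times the same combination has been requested. A stripped-down sketch of that bookkeeping, where nextIndex is a hypothetical helper name:

```javascript
const indexes = new Map();

// Mirrors the bookkeeping at the top of _getServer: one counter per
// distinct (address, port, addressType, fd) combination.
function nextIndex(options) {
  const key = [options.address, options.port,
               options.addressType, options.fd].join(':');
  const index = indexes.has(key) ? indexes.get(key) + 1 : 0;
  indexes.set(key, index);
  return { key, index };
}

const a = nextIndex({ address: null, port: 8888, addressType: 4, fd: undefined });
const b = nextIndex({ address: null, port: 8888, addressType: 4, fd: undefined });
const c = nextIndex({ address: null, port: 9999, addressType: 4, fd: undefined });
```

Array.prototype.join turns null and undefined into empty strings, so a query without address or fd still produces a stable key.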


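This part is fairly complicated. I was lost here for a while myself, but decided to keep reading. Hang in there. If you cannot go on at this point, take a break, bookmark the page, and come back to it once you have calmed down.


First, we passed in server, serverQuery, and cb (the callback listenOnMasterHandle). The most important part of _getServer in the cluster module is: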

if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

First any custom data on the server is collected (if _getServerData exists), then the send function is called:

function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

send calls sendHelper, a function in the cluster module's utils file, and passes process as the default first argument:

function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}
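
What seq and callbacks are for becomes clearer with the receiving side added. The sketch below is an assumption-laden model, not the cluster code: channel is a plain array standing in for the IPC channel, sendWithReply and onReply are hypothetical names, and the ack field is assumed to carry the seq of the message being answered.

```javascript
let seq = 0;
const callbacks = new Map();

// `channel` is a hypothetical stand-in for the IPC channel (process.send).
function sendWithReply(channel, message, cb) {
  const outgoing = { cmd: 'NODE_CLUSTER', ...message, seq };
  if (typeof cb === 'function') callbacks.set(seq, cb);
  seq += 1;
  channel.push(outgoing);
}

// The receiving side looks up the callback by the acknowledged seq.
function onReply(reply) {
  const cb = callbacks.get(reply.ack);
  callbacks.delete(reply.ack);
  if (cb) cb(reply);
}

const channel = [];
const answered = [];
sendWithReply(channel, { act: 'queryServer' }, (r) => answered.push(['first', r.ack]));
sendWithReply(channel, { act: 'queryServer' }, (r) => answered.push(['second', r.ack]));

// Replies may arrive in any order; seq still pairs them correctly.
onReply({ ack: 1 });
onReply({ ack: 0 });
```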

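Look closely here: the third argument we pass to sendHelper is null, so the worker sends no handle to the master.

Under the round-robin scheduling policy (the default on every platform except Windows), the master's reply carries no handle either, so handle in the callback is empty: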

send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

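So we end up in the rr branch. The cb passed to rr here is the listenOnMasterHandle function defined in net.js.


Node.js's load-balancing algorithm is round-robin. The official explanation is that it is simple, blunt, and efficient.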
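
As a rough mental model of round-robin distribution, consider the sketch below. It is a heavily simplified, hypothetical dispatcher (the RoundRobin class is made up for this post) and leaves out everything the real master process has to deal with, such as workers dying or refusing a connection.

```javascript
// Hypothetical, heavily simplified model of a round-robin dispatcher.
class RoundRobin {
  constructor(workers) {
    this.free = [...workers]; // queue of workers waiting for work
  }
  // Give the next connection to the worker at the head of the queue,
  // then move that worker to the back.
  dispatch(connection) {
    const worker = this.free.shift();
    worker.accepted.push(connection);
    this.free.push(worker);
    return worker.id;
  }
}

const workers = [1, 2, 3].map((id) => ({ id, accepted: [] }));
const dispatcher = new RoundRobin(workers);
const order = ['a', 'b', 'c', 'd'].map((c) => dispatcher.dispatch(c));
```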

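Note that the seq += 1 in sendHelper above is not the round-robin itself. seq is only a sequence number that pairs each reply with the callback stored for it:

if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;

Distributing connections among the workers is the job of the master process. On the worker side, rr only builds a stand-in handle:

function rr(message, indexesKey, cb) {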
  if (message.errno)
    return cb(message.errno, null);

  var key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      Object.assign(out, message.sockname);

    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}

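The handle has now been replaced by a fake: calling its listen method just returns 0 and no longer occupies the port. So even with multiple Node.js processes, only one process actually listens on the port.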
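
The effect can be modeled in a few lines. Everything here is a hypothetical miniature: setupListen imitates only the "create a handle unless one already exists" step of setupListenHandle, and realBinds counts how often a real bind would have happened.

```javascript
// Hypothetical minimal server that, like net.Server, only creates a real
// handle when none has been assigned yet.
function setupListen(server, createRealHandle) {
  if (!server._handle) server._handle = createRealHandle();
  return server._handle.listen(511);
}

let realBinds = 0;
const createRealHandle = () => ({
  listen() { realBinds += 1; return 0; }, // would bind the port for real
});

// Worker side: a fake handle whose listen() is a no-op returning 0.
const fauxHandle = { listen: () => 0, close() {}, ref() {}, unref() {} };

const masterServer = { _handle: null };
const workerServer = { _handle: fauxHandle };

const masterResult = setupListen(masterServer, createRealHandle);
const workerResult = setupListen(workerServer, createRealHandle);
```

Both calls report success, but only the server without a pre-assigned handle triggers a real bind.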

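At this point cb is still listenOnMasterHandle from net.js: it stores this handle on server._handle and then calls _listen2, i.e. setupListenHandle. Because _handle is already set, no real handle is created.

The official comment:

Faux handle. Mimics a TCPWrap with just enough fidelity to get away with it.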

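I spent a night putting this together. There are also earlier posts on the cluster module source, how PM2 does load balancing, and so on; have a look if you are interested. If you found this post useful, please give it a like. Thank you. I wrote it in a hurry, so please point out anything that is wrong.

You can follow the public account 前端巔峯 and recommend the article to anyone who needs it.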
