寫在開頭:前端
做爲一名曾經重度使用Node.js做爲即時通信客戶端接入層的開發人員,沒法避免調試V8,配合開發addon。因而對Node.js源碼產生了很大的興趣~ git
順便吐槽一句,Node的內存控制,因爲是自動回收,我以前作的產品是20萬人超級羣的IM產品,像一秒鐘1000條消息,持續時間長了內存和CPU佔用仍是會有一些問題算法
以前寫過cluster模塊源碼分析、PM2原理等,感受興趣的能夠去公衆號翻一翻redux
Node.js的js部分源碼基本看得差很少了,今天寫一個createServer過程給你們,對於不怎麼熟悉Node.js源碼的朋友來講,多是一個不錯的開始,源碼在gitHub上有,直接克隆便可,最近一直比較忙,公司和業餘的工做也是,因此原創比較少。服務器
原生Node.js建立一個基本的服務:負載均衡
var http = require('http'); http.createServer(function (request, response) { // 發送 HTTP 頭部 // HTTP 狀態值: 200 : OK // 內容類型: text/plain response.writeHead(200, {'Content-Type': 'text/plain'}); // 發送響應數據 "Hello World" response.end('Hello World\n'); }).listen(8888); // 終端打印以下信息 console.log('Server running at http://127.0.0.1:8888/');
咱們目前只分析Node.js源碼的js部分的dom
首先找到Node.js源碼的lib文件夾 socket
而後找到http.js文件async
發現createServer真正返回的是new Server,而Server來自_http_server ide
因而找到同目錄下的_http_server.js文件,發現整個文件有800行的樣子,全局搜索Server找到函數
function Server(options, requestListener) { if (!(this instanceof Server)) return new Server(options, requestListener); if (typeof options === 'function') { requestListener = options; options = {}; } else if (options == null || typeof options === 'object') { options = { ...options }; } else { throw new ERR_INVALID_ARG_TYPE('options', 'object', options); } this[kIncomingMessage] = options.IncomingMessage || IncomingMessage; this[kServerResponse] = options.ServerResponse || ServerResponse; net.Server.call(this, { allowHalfOpen: true }); if (requestListener) { this.on('request', requestListener); }
createServer函數解析:
參數控制有點像redux源碼裏的initState和reducer,根據傳入類型不一樣,作響應的處理
this.on('request', requestListener);}
if (!(this instanceof Server)) return new Server(options, requestListener);
上面已經將onrequest事件觸發回調函數講清楚了,那麼鏈式調用listen方法,監聽端口是怎麼回事呢?
傳統的鏈式調用,像JQ源碼是return this, 手動實現A+規範的Promise則是返回一個全新的Promise,而後Promise原型上有then方法,因而能夠鏈式調用
怎麼實現.listen鏈式調用,重點在這行代碼:
net.Server.call(this, { allowHalfOpen: true });
allowHalfOpen實驗結論: 這裏TCP的知識再也不作過分的講解
(1)allowHalfOpen爲true,一端發送FIN報文: 進程結束了,那麼確定會發送FIN報文; 進程未結束,不會發送FIN報文 (2)allowHalfOpen爲false,一端發送FIN報文: 進程結束了,確定發送FIN報文; 進程未結束,也會發送FIN報文;
因而找到net.js文件模塊中的Server函數
function Server(options, connectionListener) { if (!(this instanceof Server)) return new Server(options, connectionListener); EventEmitter.call(this); if (typeof options === 'function') { connectionListener = options; options = {}; this.on('connection', connectionListener); } else if (options == null || typeof options === 'object') { options = { ...options }; if (typeof connectionListener === 'function') { this.on('connection', connectionListener); } } else { throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); } this._connections = 0; Object.defineProperty(this, 'connections', { get: deprecate(() => { if (this._usingWorkers) { return null; } return this._connections; }, 'Server.connections property is deprecated. ' + 'Use Server.getConnections method instead.', 'DEP0020'), set: deprecate((val) => (this._connections = val), 'Server.connections property is deprecated.', 'DEP0020'), configurable: true, enumerable: false }); this[async_id_symbol] = -1; this._handle = null; this._usingWorkers = false; this._workers = []; this._unref = false; this.allowHalfOpen = options.allowHalfOpen || false; this.pauseOnConnect = !!options.pauseOnConnect; }
這裏巧妙的經過.call調用net模塊Server函數,保證了this指向一致
this.\_handle = null
這裏是由於Node.js考慮到多進程問題,因此會hack掉這個屬性,由於.listen方法最終會調用_handle中的方法,多個進程只會啓動一個真正進程監聽端口,而後負責分發給不一樣進程,這個後面會講
Node.js源碼的幾個特點:
我在net.js文件模塊中發現了一個原型上.listen的方法:
Server.prototype.listen = function(...args) { const normalized = normalizeArgs(args); var options = normalized[0]; const cb = normalized[1]; if (this._handle) { throw new ERR_SERVER_ALREADY_LISTEN(); } if (cb !== null) { this.once('listening', cb); } const backlogFromArgs = // (handle, backlog) or (path, backlog) or (port, backlog) toNumber(args.length > 1 && args[1]) || toNumber(args.length > 2 && args[2]); // (port, host, backlog) options = options._handle || options.handle || options; const flags = getFlags(options.ipv6Only); // (handle[, backlog][, cb]) where handle is an object with a handle if (options instanceof TCP) { this._handle = options; this[async_id_symbol] = this._handle.getAsyncId(); listenInCluster(this, null, -1, -1, backlogFromArgs); return this; } // (handle[, backlog][, cb]) where handle is an object with a fd if (typeof options.fd === 'number' && options.fd >= 0) { listenInCluster(this, null, null, null, backlogFromArgs, options.fd); return this; } // ([port][, host][, backlog][, cb]) where port is omitted, // that is, listen(), listen(null), listen(cb), or listen(null, cb) // or (options[, cb]) where options.port is explicitly set as undefined or // null, bind to an arbitrary unused port if (args.length === 0 || typeof args[0] === 'function' || (typeof options.port === 'undefined' && 'port' in options) || options.port === null) { options.port = 0; } // ([port][, host][, backlog][, cb]) where port is specified // or (options[, cb]) where options.port is specified // or if options.port is normalized as 0 before var backlog; if (typeof options.port === 'number' || typeof options.port === 'string') { if (!isLegalPort(options.port)) { throw new ERR_SOCKET_BAD_PORT(options.port); } backlog = options.backlog || backlogFromArgs; // start TCP server listening on host:port if (options.host) { lookupAndListen(this, options.port | 0, options.host, backlog, options.exclusive, flags); } else { // Undefined host, listens on unspecified address // Default addressType 4 will be used to search for master server listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive); } return this; } // (path[, backlog][, cb]) or (options[, cb]) // where path or options.path is a UNIX domain socket or Windows pipe if (options.path && isPipeName(options.path)) { var pipeName = this._pipeName = options.path; backlog = options.backlog || backlogFromArgs; listenInCluster(this, pipeName, -1, -1, backlog, undefined, options.exclusive); if (!this._handle) { // Failed and an error shall be emitted in the next tick. // Therefore, we directly return. return this; } let mode = 0; if (options.readableAll === true) mode |= PipeConstants.UV_READABLE; if (options.writableAll === true) mode |= PipeConstants.UV_WRITABLE; if (mode !== 0) { const err = this._handle.fchmod(mode); if (err) { this._handle.close(); this._handle = null; throw errnoException(err, 'uv_pipe_chmod'); } } return this; } if (!(('port' in options) || ('path' in options))) { throw new ERR_INVALID_ARG_VALUE('options', options, 'must have the property "port" or "path"'); } throw new ERR_INVALID_OPT_VALUE('options', inspect(options)); };
這個就是咱們要找的listen方法,但是裏面不少ipv4和ipv6的處理,最重要的方法是listenInCluster
這個函數須要好好看一下,只有幾十行
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) { exclusive = !!exclusive; if (cluster === undefined) cluster = require('cluster'); if (cluster.isMaster || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; // Get the master's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); function listenOnMasterHandle(err, handle) { err = checkBindError(err, port, handle); if (err) { var ex = exceptionWithHostPort(err, 'bind', address, port); return server.emit('error', ex); } // Reuse master's server handle server._handle = handle; // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); } }
若是是主進程,那麼就直接調用_.listen2方法了
Server.prototype._listen2 = setupListenHandle;
找到setupListenHandle函數
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
裏面的createServerHandle是重點
function setupListenHandle(address, port, addressType, backlog, fd, flags) { debug('setupListenHandle', address, port, addressType, backlog, fd); // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already'); } else { debug('setupListenHandle: create a handle'); var rval = null; // Try to bind to the unspecified IPv6 address, see if IPv6 is available if (!address && typeof fd !== 'number') { rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags); if (typeof rval === 'number') { rval = null; address = DEFAULT_IPV4_ADDR; addressType = 4; } else { address = DEFAULT_IPV6_ADDR; addressType = 6; } } if (rval === null) rval = createServerHandle(address, port, addressType, fd, flags); if (typeof rval === 'number') { var error = uvExceptionWithHostPort(rval, 'listen', address, port); process.nextTick(emitErrorNT, this, error); return; } this._handle = rval; } this[async_id_symbol] = getNewAsyncId(this._handle); this._handle.onconnection = onconnection; this._handle[owner_symbol] = this; // Use a backlog of 512 entries. We pass 511 to the listen() call because // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); // which will thus give us a backlog of 512 entries. const err = this._handle.listen(backlog || 511); if (err) { var ex = uvExceptionWithHostPort(err, 'listen', address, port); this._handle.close(); this._handle = null; defaultTriggerAsyncIdScope(this[async_id_symbol], process.nextTick, emitErrorNT, this, ex); return; } // Generate connection key, this should be unique to the connection this._connectionKey = addressType + ':' + address + ':' + port; // Unref the handle if the server was unref'ed prior to listening if (this._unref) this.unref(); defaultTriggerAsyncIdScope(this[async_id_symbol], process.nextTick, emitListeningNT, this); }
已經能夠看到TCP了,離真正的綁定監聽端口,更近了一步
最終經過下面的方法綁定監聽端口
handle.bind6(address, port, flags); 或者 handle.bind(address, port);
首選ipv6綁定,是由於ipv6能夠接受到ipv4的套接字,而ipv4不能夠接受ipv6的套接字,固然也有方法能夠接收,就是麻煩了一點
上面的內容,請你認真看,由於下面會更復雜,設計到Node.js的多進程負載均衡原理
若是不是主進程,就調用cluster._getServer,找到cluster源碼
'use strict'; const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master'; module.exports = require(`internal/cluster/${childOrMaster}`);
找到_getServer函數源碼
// `obj` is a net#Server or a dgram#Socket object. cluster._getServer = function(obj, options, cb) { let address = options.address; // Resolve unix socket paths to absolute paths if (options.port < 0 && typeof address === 'string' && process.platform !== 'win32') address = path.resolve(address); const indexesKey = [address, options.port, options.addressType, options.fd ].join(':'); let index = indexes.get(indexesKey); if (index === undefined) index = 0; else index++; indexes.set(indexesKey, index); const message = { act: 'queryServer', index, data: null, ...options }; message.address = address; // Set custom data on handle (i.e. tls tickets key) if (obj._getServerData) message.data = obj._getServerData(); send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = (address && address.port) || options.port; send(message); }); };
咱們以前傳入了三個參數給它,分別是
server,serverQuery,listenOnMasterHandle
這裏是比較複雜的,曾經我也在這裏迷茫過一段時間,可是想着仍是看下去吧。堅持下,你們若是看到這裏看不下去了,先休息下,保存着。後面等心情平復了再靜下來接下去看
首先咱們傳入了Server、serverQuery和cb(回調函數listenOnMasterHandle),整個cluster模塊的_getServer中最重要的就是:
if (obj._getServerData) message.data = obj._getServerData(); send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. });
首先咱們會先獲取server上的data數據,而後調用send函數
function send(message, cb) { return sendHelper(process, message, null, cb); }
send函數調用的是cluster模塊的utills文件內的函數,傳入了一個默認值process
function sendHelper(proc, message, handle, cb) { if (!proc.connected) return false; // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js message = { cmd: 'NODE_CLUSTER', ...message, seq }; if (typeof cb === 'function') callbacks.set(seq, cb); seq += 1; return proc.send(message, handle); }
這裏要看清楚,咱們調用sendHelper傳入的第三個參數是null !!!
那麼主進程返回也是null
send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. });
因此咱們會進入rr函數調用的這個判斷,這裏調用rr傳入的cb就是在net.js模塊定義的listenOnMasterHandle函數
Node.js的負載均衡算法是輪詢,官方給出的解釋是簡單粗暴效率高
上面的sendHelper函數就是作到了這點,每次+1
if (typeof cb === 'function') callbacks.set(seq, cb); seq += 1;
function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; function listen(backlog) { // TODO(bnoordhuis) Send a message to the master that tells it to // update the backlog size. The actual backlog should probably be // the largest requested size by any worker. return 0; } function close() { // lib/net.js treats server._handle.close() as effectively synchronous. // That means there is a time window between the call to close() and // the ack by the master process in which we can still receive handles. // onconnection() below handles that by sending those handles back to // the master. if (key === undefined) return; send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); key = undefined; } function getsockname(out) { if (key) Object.assign(out, message.sockname); return 0; } // Faux handle. Mimics a TCPWrap with just enough fidelity to get away // with it. Fools net.Server into thinking that it's backed by a real // handle. Use a noop function for ref() and unref() because the control // channel is going to keep the worker alive anyway. const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } assert(handles.has(key) === false); handles.set(key, handle); cb(0, handle); }
此時的handle已經被重寫,listen方法調用會返回0,不會再佔用端口了。因此這樣Node.js多個進程也只是一個進程監聽端口而已
此時的cb仍是net.js模塊的setupListenHandle即 - _listen2方法。
官方的註釋:
Faux handle. Mimics a TCPWrap with just enough fidelity to get away
花了一夜整理,以前還有一些像cluster模塊源碼、pm2負載均衡原理等,有興趣的能夠翻一翻。以爲寫得不錯的能夠點個在看,謝謝。時間匆忙,若是有寫得不對的地方能夠指出。
能夠關注下公衆號:前端巔峯 把文章推薦給須要的人