【nodejs原理&源碼賞析(6)】深度剖析cluster模塊源碼與node.js多進程(下)

示例代碼託管在:http://www.github.com/dashnowords/blogs前端

博客園地址:《大史住在大前端》原創博文目錄git

華爲雲社區地址:【你要的前端打怪升級指南】github

閱讀本章須要先閱讀本系列前兩章內容預熱一下。windows

一. 引言

前兩篇博文中已經分別介紹了使用cluster模塊創建集羣時主進程執行cluster.fork( )方法時的執行邏輯,以及net模塊在不一樣場景下創建通信的基本原理。本篇繼續分析cluster模塊,從第一個子進程開始創建服務器講起,cluster基本用法示例代碼再來一遍:數組

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主進程 ${process.pid} 正在運行`);

  // 衍生工做進程。
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工做進程 ${worker.process.pid} 已退出`);
  });
} else {
  // 工做進程能夠共享任何 TCP 鏈接。
  // 在本例子中,共享的是 HTTP 服務器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工做進程 ${process.pid} 已啓動`);
}

代碼是足夠精簡的,實現過程也確實是很龐大的工程。每個子進程中執行的邏輯都是http.createServer().listen(),咱們來看看它是如何一步一步運做而最終創建通信機制的,你會發現它和上一節中的簡易模型很是類似。緩存

二.server.listen方法

http模塊的源碼中很容易找到http.createServer( )方法的邏輯就是透傳參數生成了一個net.Server實例,這個實例在上一節中就已經介紹過,實際上就只是生成了一個server的實例,因此這裏跳轉到net.Server.prototype.listen()net.js文件1306-1404行),基本邏輯以下:服務器

Server.prototype.listen = function(...args){
    const normalized = normalizeArgs(args);
    var options = normalized[0];
    /*..獲取監聽參數中的句柄對象..*/
     options = options._handle || options.handle || options;
    
    //若是options上有句柄,句柄是一個TCP實例
    if(options instanceof TCP){
        //......
        listenInCluster(......);
    }
                        
    //若是配置參數中有fd(file descriptor)
    if(typeof options.fd === 'number' && options.fd >=0){
            //......
        listenInCluster(......);
    }
                        
    //若是參數中有port端口號
    if(typeof options.port === 'number' || typeof options.port === 'string'){
         //.....
         listenInCluster(......);
    }
                         
    //若是參數中有port端口號 或 字符型的pipe名稱
    if(typeof options.port === 'number' || typeof options.port === 'string'){
         //.....
         listenInCluster(......);
    }
}

這裏不難看出它的邏輯就和net模塊官方文檔中描述的server.listen( )的幾種場景對應,能夠監聽帶有非空handle屬性的句柄對象,數字型端口號,字符串型命名管道地址,或者直接傳入配置參數合集options,而後分別根據幾種不一樣的狀況來調用listenInCluster方法(集羣功能的邏輯主線是數字型port,假設傳入了12315)。併發

listenInCluster方法定義以下:socket

大體能夠看出,若是是主進程,就直接調用server._listen2()方法而後return了,不然(也就是在工做進程中的邏輯,敲黑板!!!這裏是重點了),構造一個serverQuery的參數集,能夠看到裏面記錄了以各類不一樣姿式調用這個方法時傳入的參數,因此有的參數爲null也很正常,而後調用了cluster._getServer( )方法,這就是工做進程在引用cluster模塊時引入的child.js中定義並掛載在cluster上的方法,最後一個參數listenOnMasterHandle是一個回調函數,也是一個錯誤前置風格的函數,能夠看到,它接收了一個句柄對象,並把這個句柄對象掛載在了子進程這個server實例的_handle屬性上,接着也調用了server._listen2( )方法,能夠看到兩種狀況下調用這個方法時傳入的參數是同樣的。接着來到server._listen2( )方法,它綁定了setupListenHandle方法(別抓狂,這是net模塊中相關邏輯的最後一步了),簡化代碼以下:

function setupListenHandle(......){
  if (this._handle) {
    //工做進程在執行上一步邏輯時,在cluster._getServer()回調函數中把一個handle傳遞給了server._handle
    debug('setupListenHandle: have a handle already');
  } else {
    //主進程會執行的邏輯
    debug('setupListenHandle: create a handle');
      //......
       rval = createServerHandle(address, port, addressType, fd, flags);
      //......
      this._handle = rval;
}
  //......
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;
  //....
}

工做進程經過cluster._getServer( )方法拿到了一個handle,因此不會再生成,而主進程server.listen(port)執行時會走到else分支,而後生成一個新的綁定了端口號的特殊的socket句柄而後掛載到主進程server._handle上,這裏對句柄的connection事件回調邏輯進行了修改,相關代碼以下:

這裏須要注意的是,server._handleconnection事件和serverconnection事件是兩碼事,server._handle指向的是一個綁定了端口的特殊的socket句柄,當客戶端connect一個server時實際上底層是客戶端socket與服務端這個socket的對接,因此須要在server._handle這個的connection回調函數中,將客戶端的socket句柄clientHandle從新包裝,而後再經過觸發serverconnection事件將其轉發給server實例。因此在使用server實例時能夠直接添加connectionListener:

let server = net.createServer(socket=>{
    /*這個回調函數就是server的connection事件回調
    * 這裏接收到的socket就是server._handle的connection收到的客戶端句柄clientHandle封裝成的socket實例
    */
})

不管是主進程仍是子進程都會觸發這個邏輯,只須要當作是一種功能性質的封裝便可,並不影響業務邏輯

三.cluster._getServer( )方法

下面回到cluster模塊繼續,_getServer( )方法只存在於子進程代碼中,源碼位於lib/internal/cluster/child.js,方法定義在54-106行,基本邏輯以下:

cluster._getServer = function(obj, options, cb){
 /* 這裏的obj就是子進程中運行上面listenInCluster方法中傳入的server,
  * options就是serverQuery,
  * cb就是最後要把主進程handle傳回去的回調函數listenOnMasterHandler   
  */
  
  //先構建index而後進行了一通記錄,就是根據監聽的參數來構建一個識別這個server的索引
  //而後構建消息
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };
    
  //......
    
  /* 發送act:queryServer消息,並傳一個回調函數,
   * 從形參命名就能夠看出,這個回調函數被調用時會被傳入一個句柄,
   * 最後根據不一樣的調度策略來執行不一樣的函數,這裏主要看Round-robin
  */ 
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
    
  //......
}

rr方法將響應reply和前一個調用者傳入的回調函數cb進行了透傳,rr的函數體就是實現listen方法偷樑換柱的地方了:

// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  var key = message.key;

  function listen(backlog) {
    return 0;
  }

  function close() {
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      Object.assign(out, message.sockname);

    return 0;
  }

  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle); //這裏的cb其實就是listenInCluster方法中定義的那個listenOnMasterHandler回調
}

能夠看到rr方法中構建了一個假的handle句柄,並調用cb將它傳了回去,而後執行邏輯回回到net模塊,前文已經提這個handle在回調函數中被掛載在了server._handle上,因而setupListenHandle( )的邏輯中也不會從新構建句柄。

從新梳理一下這部分的邏輯,就是子進程中調用listen方法時,會經過cluster._getServer( )拿到一個假句柄,而後執行一個空的listen方法,這樣就避免了端口的重複監聽。因此咱們能夠推測,cluster._getServer( )必然會觸發主進程啓動一個監聽端口的服務器,並創建對子進程的調度,進程之間的IPC通信能夠直接經過process對象來完成,不須要再從新構建跨進程通信管道。

四.跨進程通信工具方法Utils

繼續進行後續內容前,先來看一個獨立的跨進程通信工具,源碼放在lib/internal/cluster/utils.js

它是cluster模塊發送跨進程消息的內部代理,這個模塊對外暴露了消息發送方法sendHelper和內部消息監聽器的預處理方法internal,源碼很短就不貼了。當子進程調用sendHelper發送消息時,utils內部會把這條消息處理完成後須要執行的回調函數先緩存起來,而後給消息添加一些包裝標記,而後再發出去;internal會對傳入的內部消息監聽器進行代理,過濾掉非NODE_CLUSTER類別的消息,若是消息攜帶的message對象沒有ack屬性則最終會執行綁定監聽時傳入的回調函數,不然會從緩存中找出以前暫存的回調函數來執行。

發個消息爲何要搞這麼複雜呢?這個ack屬性又是哪來的呢?其實這個utils模塊主要是在跨進程的雙向消息通信時實現了方法複用,同一個message從工做進程發往主進程時和主進程發回給工做進程時是由同一個事件名internalMessage攜帶的,那如何來區分消息發送的方向呢,就是ack屬性,若是消息帶有ack屬性,就表示它是由主進程發給子進程的,那麼就要調用子進程中的後續處理方法,這個方法其實就是子進程發送消息給主進程以前暫存在utils內部callbacks裏的方法,也就是child.jscluster._getServer()中調用send方法時傳入的回調方法,也就是net模塊中listenInCluster( )方法中的listenOnMasterHandle方法,這個方法漂洋過海透傳了N個函數,的確不容易看懂,「回調地獄」也的確不是鬧着玩的。再看看沒有ack屬性的狀況,沒有這個屬性時消息是從子進程發給主進程的,天然要調用主進程的方法,從邏輯裏不難看出,這種狀況下方法引用的就是internal方法執行時傳入的第二個參數(master.js源碼213行執行的internal(worker, onmessage)onmessage這個函數),源碼中就是利用高階函數這種分步執行的特色實現了引用。

五.act:queryServer消息

故事再回到第三節工做進程中發出act:'queryServer的消息後,來看主進程master.js中的代碼,主進程中在調用cluster.fork( )時就綁定了對worker線程internalMessage的監聽,對於act:queryServer類型的集羣消息,主進程已經定義了queryServer這個方法來處理。這段源代碼的主要邏輯以下:

1.根據重要參數組拼接出一個惟一的key
2.1.根據key查詢是否有已經存在的調度句柄round-robin-handle,若是有則直接進行後續邏輯
2.2.若是沒有已經存在的調度句柄,則選擇調度策略,實例化一個調度句柄,並把它添加進記錄裏
3.把消息數據message.data掛載在調度句柄的handle.data字段上
4.執行調度句柄的add方法,把子進程和一個回調方法傳進實例,回調方法被執行時會從調度句柄中取得數據,並組裝返回消息(帶有ack屬性和其餘數據的消息)發給子進程,子進程收到這個消息後執行的方法,就是前文分析過的返回假句柄給net模塊中的`listenInCluster()`邏輯。

從開篇的多進程代碼能夠看到,每一個子進程中執行的listen方法監聽的端口號都是同樣的,因此每一個子進程發送queryServer消息給主進程並執行這段邏輯時,其實對應的key都是同樣的,因此調度對象RoundRobinHandle只會實例化一次,在以後的過程當中,每個子進程會根據key獲取到同一個調度實例,並調用add方法將worker對象和一個回調函數添加進調度實例,能夠看到回調函數執行時,就會將原message中的seq屬性的值添加給ack屬性再掛載上處理後的數據併發送給子進程。那麼剩下的事情,就剩下調度對象RoundRobinHandle的源碼了。

咱們不妨來推測一下,它的主要邏輯就是在主進程中創建真正監聽目標端口的服務器,並添加當客戶端請求到達時對於工做進程的調度代碼,下一節咱們就一塊兒來驗證一下。

六.輪詢調度Round-Robin-Handle

調度方法的源碼是internal/cluster/round_robin_handle.js,另外一種shared_handle.js是windows下使用的調度策略,先不作分析(主要是沒研究過,不敢瞎說)。先從構造函數開始:

16行,bingo,終於看到主進程啓動服務器了。接着就是根據參數而分流的監聽方法,集羣代碼中對應的是20行的帶有有效port參數的狀況,因此服務器就在主進程啓動了,最後來看看server開始觸發listening事件時執行的邏輯(此處調用的是once方法,因此只會執行一次):

1.將主進程server的內部_handle句柄,掛載給round-robin-handle實例
2.當這個句柄被鏈接時(也就是客戶端socket執行connect方法鏈接後),會觸發它的`connection`事件,回調函數會調用`distribute`方法來分發這個客戶端socket句柄,注意32行後面半句的箭頭函數方法,這裏的handle就是指客戶端`socket`實例。
3.將server._handle指向null
4.將server屬性指向null

若是你還記得net模塊中listen方法的邏輯的話可能會有印象,_handleconnection事件回調其實本來已經被複寫過一次了,也就是說單進程運行的程序在創建服務器時,server._handleconnection事件會觸發server實例的connection事件,而在集羣模式下,主進程中調度實例中服務器句柄server._handleconnection再次被複寫,將邏輯改變爲分發socket,而子進程中的server._handle仍是保持原來的邏輯。

最後一步指向null的邏輯還涉及到add方法,繼續看主進程中調用的add方法:

這個send形參實際上就是主進程中傳入的最終向子進程發送返回消息的那個回調函數,它被封裝進了done函數,這裏須要着重看一下55行的邏輯,this.server === null這個條件實際上對應的就是構造函數中服務器開始監聽的事件,因此55-59行的代碼以及構造函數中添加的listening事件的回調函數須要聯合在一塊兒來理解,也就是每一個子進程的send方法都被包裹在一個獨立的done函數中,這個函數會在主進程的server處於listening狀態後觸發執行,而且只觸發一次。當它觸發時,會從實例的handle屬性(也就是server_handle句柄)上取得socket名稱而後調用send方法,這個特殊socket的名稱在回調函數中對應reply形參,最終掛載在message中發回了子進程。

至此其實主進程和子進程創建服務器的消息已經完成了閉環。最後再看一下RoundRobinHandle中最後兩個方法:

當客戶端socket執行connect方法鏈接到主進程server的句柄後,主進程會調用round-robin-handle實例的distribute方法,這個方法的邏輯比較簡單,把這個客戶端句柄加入到待處理隊列,而後從空閒進程隊列頭部取出一個worker進程,把它做爲參數傳給handoff方法。

handoff方法中,從客戶端請求句柄隊列的頭部取出下一個待處理的socket,若是已經沒有要處理的請求,就把傳進來的worker放回空閒子進程隊列free中。在add方法內部封裝的done方法中也執行了這個handoff方法,如今再回過頭來看這個add方法的做用,就是當主進程處於監聽狀態後,將每個子進程對象worker依次添加到空閒進程隊列free中。最後夠早了一個新的act:newconn消息,並經過調度選出的worker.process對象實現跨進程通信來將待處理句柄和【新鏈接】消息發送給子進程。

七. 圖解集羣創建過程的邏輯跳轉

集羣創建過程的邏輯大體的跳轉路徑以下,細節部分直接參考前文的講解便可。

相關文章
相關標籤/搜索