EggCluster 是如何解決多進程模式下相關問題的

背景

Node 官方提供了 cluster 模塊來提供多進程的解決方案,以儘量提高服務器資源使用效率。javascript

總體而言,在這個問題域裏,要解決的子問題有三塊html

  • 重啓機制
  • 負載均衡
  • 狀態共享,即通訊機制

image.png
(上圖總結自 《深刻淺出Node》第9章)

Egg 做爲企業級框架,也針對這些問題,提供了 egg-cluster 模塊來作了些加強java

通訊機制

爲何先講通訊?啓動流程要用嘛。node

image.png

能夠看到,主要的實體包括了 Master、Agent、Worker三個, Master、Agent、Worker,其實三者更多完成的是通訊的執行工做,真正提供通訊管理能力的是 Manager 和 Messenger。git

在看這倆模塊以前,能夠先看些基礎知識github

Messenger

Messenger 是一個消息發送器,負責:接收消息 -> 定向轉發。npm

那爲何要單獨搞這個模塊?api

  • 協議格式統一:Agent 和 Worker 都有 exit 和 message 事件
  • 通訊方法統一:與 Parent 通訊要走 process.send,與 Worker / Agent 通訊要走 sendmessage 模塊,和Master 走 EventEmitter

它包含兩個部分:bash

  • 信息收集
  • 路由轉發

以一個 worker 啓動的例子爲例。服務器

首先,信息收集,使用的是訂閱/通知模式,是以 master 顯式調用 messenger 來處理的。

// 在 cluster 啓動完畢後,會告知父進程啓動成功
const action = 'egg-ready';
this.messenger.send({
  action,
  to: 'parent',
  data: {
    port: this[REAL_PORT],
    address: this[APP_ADDRESS],
    protocol: this[PROTOCOL],
  },
});
複製代碼

其次是經過 send 作定向轉發,包括兩部分

// 路由識別
if (data.to === 'parent') {
  this.sendToParent(data);
  return;
}

// 調用指定方法
sendToParent(data) {
  if (!this.hasParent) {
    return;
  }
  process.send(data);
}
複製代碼

更多信息能夠,查看這篇文檔: Messenger 模塊

Manager

Manager 模塊比較簡單,主要是針對Agent、Worker 提供管理操做。值得一提的是,它的存活檢查代碼

// agent.status的修改操做在master的onAgentStart中完成
count() {
  return {
    agent: (this.agent && this.agent.status === 'started') ? 1 : 0,
    worker: this.listWorkerIds().length,
  };
}

startCheck() {
  this.exception = 0;
  // 每10秒檢查一次
  this.timer = setInterval(() => {
    const count = this.count();
    if (count.agent && count.worker) {
      this.exception = 0;
      return;
    }
    // 若是agent和worker都不符合要求,超過3次就觸發exception,master那邊收到消息後會退出
    this.exception++;
    if (this.exception >= 3) {
      this.emit('exception', count);
      clearInterval(this.timer);
    }
  }, 10000);
}
複製代碼

詳見文檔:Manager

啓動流程

從 npm run dev 開始

先從啓動流程入手,來看看 npm run dev 這個命令到底發生了什麼。

它實際上是執行了 egg-bin 的 lib/cmd/dev.js 文件的 run 方法

// lib/cmd/dev.js 文件
constructor(rawArgv) {
  // 省略其餘初始化代碼
  this.serverBin = path.join(__dirname, '../start-cluster');
}

* run(context) {
  // 省略參數格式化過程
  yield this.helper.forkNode(this.serverBin, devArgs, options);
}

// start-cluster.js文件,執行框架的 startCluster
require(options.framework).startCluster(options);
複製代碼

若是框架是egg,那最後就會執行 egg 的這段代碼

exports.startCluster = require('egg-cluster').startCluster;
複製代碼

所以最終是執行了,egg-cluster 模塊的 index.js

exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};
複製代碼

以後的流程不難,可是內容很是細碎,能夠去看 啓動和退出分析,主要是介紹如何實現下面的流程的

+---------+           +---------+          +---------+
    |  Master |           |  Agent  |          |  Worker |
    +---------+           +----+----+          +----+----+
         |      fork agent     |                    |
         +-------------------->|                    |
         |      agent ready    |                    |
         |<--------------------+                    |
         |                     |     fork worker    |
         +----------------------------------------->|
         |     worker ready    |                    |
         |<-----------------------------------------+
         |      Egg ready      |                    |
         +-------------------->|                    |
         |      Egg ready      |                    |
         +----------------------------------------->|
複製代碼

Agent的平滑重啓

首先,在啓動Agent的時候就會去註冊回調

forkAgentWorker(){
  // 得到agent
  const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
  // 監聽退出事件,轉發給master
  agentWorker.once('exit', (code, signal) => {
    this.messenger.send({
      action: 'agent-exit',
      data: {
        code,
        signal,
      },
      to: 'master',
      from: 'agent',
    });
  });
}

constructor(){
	this.on('agent-exit', this.onAgentExit.bind(this));
}
複製代碼

接着在 onAgentExit 中去處理重啓邏輯

onAgentExit(data) {
  if (this.closed) return;
  // 清理工做
  const agentWorker = this.agentWorker;
  this.workerManager.deleteAgent(this.agentWorker);
  agentWorker.removeAllListeners();

  // 若是已經啓動過,就自動重啓
  if (this.isStarted) {
    setTimeout(() => {
      this.forkAgentWorker();
    }, 1000);
 
		// 省略一段轉發消息給parent的代碼
  } else {
    process.exit(1);
  }
}
複製代碼

isStarted 標誌,是用來記錄總體是否啓動成功,它在 ready 回調用中被賦值

// 這個ready是由 get-ready模塊提供的,主要是解決異步任務註冊問題的,便於自由添加啓動前的異步任務
this.ready(() => {
	this.isStarted = true;
});
複製代碼

Worker的平滑重啓

Worker的平滑重啓主要是交給 cfork 模塊完成的,egg-cluster 中對於exit 事件的監聽只是作個轉發。

大體思路是經過 cluster 模塊去監聽 exit 事件 和 disconnect 事件,而後來根據 disableRefork 配置,判斷是否要重啓,這其中會處理一些重啓邏輯

cluster.on('disconnect', function (worker) {
  // API參考:https://nodejs.org/api/cluster.html#cluster_worker_isdead
  var isDead = worker.isDead && worker.isDead();
  if (isDead) {
    // worker has terminated before disconnect
    return;
  }
  // 配置不重啓就不會繼續進行
  if (worker.disableRefork) {
    return;
  }
	
  // disconnect 用來保存失聯的進程,下文會用到
  disconnects[worker.process.pid] = utility.logDate();
 
  // 重啓邏輯
  if (allow()) {
    newWorker = forkWorker(worker._clusterSettings);
    newWorker._clusterSettings = worker._clusterSettings;
  } else {
  	// 省略
  }
});

cluster.on('exit', function (worker, code, signal) {
  var isExpected = !!disconnects[worker.process.pid];

  // 若是已經先響應了disconnect事件,就不用再走後續退出流程了
  if (isExpected) {
    delete disconnects[worker.process.pid];
    // worker disconnect first, exit expected
    return;
  }

  // 相似的判斷 disableRefork 的邏輯,省略

  unexpectedCount++;
	
  // 相似的重啓邏輯,省略
  
  cluster.emit('unexpectedExit', worker, code, signal);
});
複製代碼

負載均衡 Sticky Mode

背景:最先 Session 等狀態信息是保存在 Worker 內存裏的,因此一旦用戶的屢次請求打到不一樣的Worker上的時候,必然會出現登陸態失效的問題。

解決方案:經過必定的方式保證同一個用戶的請求打到同一個 Worker 上,Sticky Mode 就是爲了解決這個問題

:::info 其實查看 egg-bin 的README.md 文件就能夠發現其實默認是不啓動的,但出於有趣,仍是想介紹下。 :::

轉發實現

首先,若是啓用了 sticky 模式,在 master 當中會分配一個 stickyWorkerPort

// master.js 
detectPorts() {
  return GetFreePort()
		// 省略中間一段設置主端口的代碼
    .then(port => {
    if (this.options.sticky) {
      this.options.stickyWorkerPort = port;
    }
  })
}
複製代碼

同時,會啓動一個內部的 net.Server,用來作消息轉發給Worker

if (this.options.sticky) {
  this.startMasterSocketServer(err => {
 		// 省略
  });
}

startMasterSocketServer(cb) {
  
  // 內部 net server
  require('net').createServer({
    pauseOnConnect: true,
  }, connection => {
     // 這段涉及到 TCP_reset_attack,有興趣能夠自查,不介紹
    if (!connection.remoteAddress) {
      connection.destroy();
    } else {
      // 選出一個worker
      const worker = this.stickyWorker(connection.remoteAddress);
      worker.send('sticky-session:connection', connection);
    }
  }).listen(this[REAL_PORT], cb);
}
複製代碼

:::info 題外話:爲何這裏 listen 不會報端口重複監聽?個人理解是,按照這篇文章的介紹,Master 在初次給 Worker 傳遞 Socket 的時候,纔會去啓動內部 TCP 服務,比 startMasterSocketServer要晚。 :::

在 worker 當中,若是是有 配置 sticky,就會使用該 stickyWorkerPort 端口進行監聽,同時只監聽 父進程(也就是master)轉發過來的 sticky-session:connection消息

if (options.sticky) {
  server.listen(options.stickyWorkerPort, '127.0.0.1');
  
  process.on('message', (message, connection) => {
    if (message !== 'sticky-session:connection') {
      return;
    }

    server.emit('connection', connection);
    connection.resume();
  });
}
// 省略正常監聽的代碼
複製代碼

這其中有個細節,那如何保證轉發的過程當中,數據不丟失呢?

:::info 爲何會丟失?由於 net.Socket 是個 Duplex Stream 對象,在 Flowing Mode 下面會自動讀取數據,若是 不響應 data 事件,數據就丟了) :::

首先在建立 socket 的時候,開啓 pauseOnConnect 選項;

If pauseOnConnect is set to true, then the socket associated with each incoming connection will be paused, and no data will be read from its handle.

其次在接受到 socket 的時候,恢復執行 resume

轉發策略

轉發是經過 stickyWorker 函數實現的,本質上就是把 remoteAddress 對 Worker 數量取餘數,做爲索引去 Worker 列表裏隨機取一個 Worker

stickyWorker(ip) {
    const workerNumbers = this.options.workers;
    // ws是一組pid列表
    const ws = this.workerManager.listWorkerIds();

    let s = '';
    // IP處理:127.0.0.1 -> 127001
    for (let i = 0; i < ip.length; i++) {
      
      // 這個判斷能夠過濾掉字母和符號,這樣就能夠兼容IPv4和IPv6
      if (!isNaN(ip[i])) {
        s += ip[i];
      }
    }
    s = Number(s);
    // 取餘數
    const pid = ws[s % workerNumbers];
    return this.workerManager.getWorker(pid);
  }

複製代碼

背景詳見issue:Sticky Mode 引起的問題

其餘有趣的小發現

agent/worker 啓動方式差別

回去翻代碼,能夠發現 agent 是 child_process.fork 方法啓動的,worker 是經過 cluster 啓動的。 猜想多是若是須要提供一些相似管理頁面的本地服務,通常是 agent作,所以agent 要有獨立監聽端口的能力

如何檢測進程真的退出

對進程強制執行process.exit而且用try-catch包裹住,若是有報錯,說明是真的退出了。

使用場景:一開始使用 SIGTERM 要求退出(對比 kill -1),後來退出超過必定時間了,使用 SIGKILL 強制退出(相似 kill -9)

function getUnterminatedProcesses(pids) {
  return pids.filter(pid => {
    try {
      // success means it's still alive
      process.kill(pid, 0);
      return true;
    } catch (err) {
      // error means it's dead
      return false;
    }
  });
}
複製代碼

小結

EggCluster涉及知識體系大圖

image.png
相關文章
相關標籤/搜索