workerman-todpole 執行流程(1)

該系列文章主要是完全扒一下 workerman todpole 遊戲的實現原理。

提早打個預防針:php

因爲 Worker 類的靜態屬性和子類對象的混用(當前類的靜態屬性存放當前類對象,靜態方法循環靜態屬性遍歷調用子類對象的方法),特別再加上 Master-Worker 進程組織模型,對一些剛接觸的人來講,很容易形成理解上的偏差,當形成理解上的混亂時,咱們須要明確:html

進程和類對象沒有直接關係,進程內不論是靜態方法、普通對象都屬於該進程空間,靜態屬性也不會跨進程共享。當 fork 以後,咱們所閱讀的代碼已經被複制到了另外一個進程空間內(棧/堆/常量等),不要被那些 Worker 子類誤導了。linux

另外,Master-Worker 進程模型中的 Worker 是子進程的表述,屬於進程概念,跟 Worker 類不是一回事。web

入口

入口文件 start.php,包含了 composer autoload,並執行下面的入口,建立相應的對象:編程

  • start_web.php(WebServer)
  • start_register.php(Register)
  • start_gateway.php(Gateway)
  • start_businessworker.php(BusinessWorker)

這 4 個子類其實都繼承自 Worker 類(咱們暫且把它們叫作 「角色」):後端

看一下 Worker 的構造方法:api

public function __construct($socket_name = '', $context_option = array())
{
// Save all worker instances.
$this->workerId = spl_object_hash($this);
static::$_workers[$this->workerId] = $this;
static::$_pidMap[$this->workerId] = array();
 
// Get autoload root path.
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
 
// Context for socket.
if ($socket_name) {
$this->_socketName = $socket_name;
if (!isset($context_option['socket']['backlog'])) {
$context_option[ 'socket']['backlog'] = static::DEFAULT_BACKLOG;
}
$this->_context = stream_context_create($context_option);
}
}

能夠看到入口中建立對象時,實際上是放到了當前類的靜態屬性 $_workers 中,這裏的 workerId 用 get_class($this) 也是同樣的效果,爲了方便表述,暫且把 workerId 叫作 「角色」。另外,子進程的 pid 在靜態屬性 $_pidMap 中是以子類名進行歸類的。服務器

建立完對象,全部的子類對象都到了 $_workers 中,接下來就是執行靜態方法 runAll網絡

runAll

該靜態方法把功能都拆了出去,子方法都爲 protected static 方便 override 重寫:session

public static function runAll()
{
static::checkSapiEnv();
static::init();
static::parseCommand();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
}

看一下每一個方法的大致功能:

checkSapiEnv()

主要是限制只能在 cli 模式下運行。

init()

完成一些初始化操做:指定 .pid 文件、建立日誌文件、設置主進程狀態爲已開始、記錄開始時間、設置主進程名稱、調用靜態方法 initId() 按照每一個角色的數量建立 $_idMap 靜態屬性(用來佔位),最後經過 Timer::init() 安裝一個 alarm 鬧鐘信號處理(定時器)。

parseCommand()

該方法用來解析命令行參數,start 並無作太多事情,只是判斷了一下 .pid 文件是否存在,防止重複啓動,並判斷是否有 -d 肯定是否在後臺執行。

daemonize()

該方法是讓一個進程成爲守護進程的核心,因爲 linux 做爲分時多用戶系統,須要標記每一個登錄的用戶,因此須要 「會話」 的概念,當用戶經過終端登錄後,通常會存在如下幾類進程:

看一下 daemonize() 的代碼:

protected static function daemonize()
{
if (!static::$daemonize || static::$_OS !== 'linux') {
return;
}
umask( 0);
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception('fork fail');
} elseif ($pid > 0) {
exit(0);
}
if (-1 === posix_setsid()) {
throw new Exception("setsid fail");
}
// Fork again avoid SVR4 system regain the control of terminal.
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception("fork fail");
} elseif (0 !== $pid) {
exit(0);
}
}

該方法的核心是 posix_setsid(),它能夠建立一個會話,因爲會話一般是獨佔終端的,致使當前會話期內的某個進程建立會話後沒法 「搶到」 終端,進而變成後端進程,也就是咱們所說的 「守護進程」。

另外,在 php 中能夠經過 pcntl 擴展的 pcntl_fork() 方法從當前進程 fork 一個新的進程,daemonize() 方法之因此須要在建立會話先後各 fork 一次,是因爲:

  • 組長進程沒法建立會話,因此會在建立會話以前先 fork 一次,這樣保證子進程必定不是組長,以後退出主進程,在子進程中建立會話
  • 在子進程中建立會話以後,此時的子進程已是會話的首進程(session leader),爲了不當前的 leader 再次鏈接終端,所以須要再 fork 一次以後退出主進程

通過這些步驟以後,獲得的子進程即 Master 進程。

initWorkers()

首先遍歷全部 workers 對象,對一些名稱和權限作下判斷,接着若是端口非複用($reusePort 屬性),會直接調用對象方法 listen() 開啓監聽:

if (!$worker->reusePort) {
$worker->listen();
}

注意這裏的 listen() 是循環出的 Worker 對象調用的,例如當前有 2 個 WebServer、1 個 Register、4 個 Gateway、4 個 BusinessWorker 進程,若是端口是非複用,在 forkWorkers() 以前,只會建立 3 個 socket 監聽(BusinessWorker 沒有),這一點從端口號上就能夠看出來(圖片來自 WebSocket實戰之————Workerman服務器的安裝啓動):

這些監聽地址對應的是建立角色對象時的構造參數 $socket_name(後面咱們遇到的內部監聽地址基於 lanIp 屬性)。

如今以 Gateway 爲例,此時端口是非複用的,那爲何 4 個 Gateway 進程依然能夠監聽同一個端口?事實上這 4 個 socket 是在 forkWorkers() 以後子進程從父進程的 Gateway 對象那裏繼承來的。

這種狀況下,當監聽的端口有客戶端請求過來時,就會形成 「驚羣」 現象(Linux網絡編程「驚羣」問題總結),即這 4 個 Gateway 進程都會被喚醒,但通過內核調度以後,只有一個進程可以處理請求,其他的失敗後繼續休眠。

而開啓監聽端口複用後,能夠容許多個無親緣關係的進程監聽相同的端口,而且由系統內核作負載均衡,決定將 socket 鏈接交給哪一個進程處理,避免了驚羣效應,能夠提高多進程短鏈接應用的性能,listen() 中開啓複用的代碼:

if ($this->reusePort) {
stream_context_set_option( $this->_context, 'socket', 'so_reuseport', 1);
}

這段代碼實際上是在下面的 forkWorkers() 子進程中開啓端口複用的狀況下才會被調用;另外,開啓端口複用須要 php 版本 >=7.0。

installSignal()

接下來就是爲當前主進程安裝信號處理,handler 的代碼:

public static function signalHandler($signal)
{
switch ($signal) {
// Stop.
case SIGINT:
static::$_gracefulStop = false;
static::stopAll();
break;
// Graceful stop.
case SIGTERM:
static::$_gracefulStop = true;
static::stopAll();
break;
// Reload.
case SIGQUIT:
case SIGUSR1:
if($signal === SIGQUIT){
static::$_gracefulStop = true;
} else{
static::$_gracefulStop = false;
}
static::$_pidsToRestart = static::getAllWorkerPids();
static::reload();
break;
// Show status.
case SIGUSR2:
static::writeStatisticsToStatusFile();
break;
// Show connection status.
case SIGIO:
static::writeConnectionsStatisticsToStatusFile();
break;
}
}

能夠看到 SIGINT/SIGTERM 都是中止,SIGQUIT/SIGUSR1 爲重啓,SIGUSR2 爲輸出進程狀態,SIGIO輸出鏈接狀態。

saveMasterPid()

建立當前主進程的 .pid 文件。

displayUI()

輸出命令行下的界面。

forkWorkers()

這裏只討論在 linux 下的狀況,經過遍歷 $_workers 全部的角色對象,判斷 $_pidMap 中當前角色已經有的 pid 數量和角色對象 count 屬性須要的工做進程數量,直到每一個角色都建立出 count 須要的數量。

看一下 linux 下建立工做進程的邏輯:

protected static function forkOneWorkerForLinux($worker)
{
// Get available worker id.
$id = static::getId($worker->workerId, 0);
if ($id === false) {
return;
}
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
static::$_pidMap[$worker->workerId][$pid] = $pid;
static::$_idMap[$worker->workerId][$id] = $pid;
} // For child processes.
elseif (0 === $pid) {
if ($worker->reusePort) {
$worker->listen();
}
if (static::$_status === static::STATUS_STARTING) {
static::resetStd();
}
static::$_pidMap = array();
static::$_workers = array($worker->workerId => $worker);
Timer::delAll();
static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
$worker->run();
$err = new Exception('event-loop exited');
static::log($err);
exit(250);
} else {
throw new Exception("forkOneWorker fail");
}
}

首先會調用 getId() 判斷:

protected static function getId($worker_id, $pid)
{
return array_search($pid, static::$_idMap[$worker_id]);
}

在以前的靜態方法 init() 中已經爲每一個角色對象開闢了相應 count 數量的位置,這裏判斷 $_idMap就是佔座的過程;而且以後若是子進程退出,$_idMap 對應 id 的位置就會空缺,從新建立子進程就是填坑的過程。

回到 forkOneWorkerForLinux() 的邏輯,fork 以後,主進程記錄下 pid 就結束了,子進程首先會判斷端口是否可複用,可複用則調用 listen() 開啓監聽,不過須要注意的是,以前的 initWorkers() 中非複用狀況下建立的是屬於角色對象的 socket(3 個),而這裏若是開啓複用,是每一個子進程中的當前角色對象本身建立出來的 socket,而不是像非複用那樣從主進程的相應對象繼承;也就是說有幾個子進程就建立幾個 socket(前提是子進程對應角色對象建立時有 $socket_name),請注意和非複用狀況進行區分。

接着重定向 std 輸出和錯誤,因爲 $_pidMap 是提供給主進程監控用的,對子進程來講沒什麼意義,因此直接清空;緊接着調用 Timer::delAll() 結束可能的鬧鐘信號,並清除全部可能存在的 event loop。

設置完當前進程的一些屬性以後,調用 $worker->run() 子進程就正式執行它本身的邏輯了,run() 裏面的流程咱們放到後面來解析,先接着原有的方法往下看。

resetStd()

重定向主進程的 STDOUT 和 STDERR,由於進程已經脫離了終端,輸出可能致使一些未知的異常,因此須要重定向到 /dev/null 這臺黑洞設備上;從上面的 forkWorkers() 能夠看到,這個方法也會在子進程中使用。

monitorWorkers()

通過 forkWorkers() 以後,子進程已經使用定時器或者進入 event loop 處理本身的業務去了,所以這個方法只會在主進程內執行。

該方法的做用就是經過 wait 掛起,直到收到子進程退出信號以後,來決定是補上仍是直接退出,看一下監控邏輯:

protected static function monitorWorkersForLinux()
{
static::$_status = static::STATUS_RUNNING;
while (1) {
// Calls signal handlers for pending signals.
pcntl_signal_dispatch();
 
// Suspends execution of the current process until a child has exited, or until a signal is delivered
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
 
// Calls signal handlers for pending signals again.
pcntl_signal_dispatch();
// If a child has already exited.
if ($pid > 0) {
// Find out witch worker process exited.
foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
if (isset($worker_pid_array[$pid])) {
$worker = static::$_workers[$worker_id];
// Exit status.
if ($status !== 0) {
static::log("worker[" . $worker->name . ":$pid] exit with status $status");
}
 
// For Statistics.
if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
static::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
 
// Clear process data.
unset(static::$_pidMap[$worker_id][$pid]);
 
// Mark id is available.
$id = static::getId($worker_id, $pid);
static::$_idMap[$worker_id][$id] = 0;
 
break;
}
}
// Is still running state then fork a new worker process.
if (static::$_status !== static::STATUS_SHUTDOWN) {
static::forkWorkers();
// If reloading continue.
if (isset(static::$_pidsToRestart[$pid])) {
unset(static::$_pidsToRestart[$pid]);
static::reload();
}
} else {
// If shutdown state and all child processes exited then master process exit.
if (!static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
} else {
// If shutdown state and all child processes exited then master process exit.
if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
}

這裏先提一下 pcntl_signal_dispatch(),該方法是爲了去信號隊列看一下有沒有發送給當前進程的信號,並觸發主進程的 signalHandler(),固然咱們也可使用 declare(ticks) 來決定執行 ticks 行基礎代碼後自動調用 handler,但畢竟不是每次自動調用時都會有信號,因此咱們須要主動調用 pcntl_signal_dispatch() 來觸發以減小性能損失。

回到 monitorWorkersForLinux() 的邏輯,該方法爲一個死循環,核心方法爲 pcntl_wait($status, WUNTRACED),經過該方法將主進程掛起,直到收到子進程退出的信號;引用類型的參數 $status 能夠拿到 exit 值,好比代碼裏常常看到的 exit(250)

拿到退出的子進程號以後,將 $_pidMap 中相應的記錄刪除,並經過 pid 和 getId() 找到 id 以後把 $_idMap 中的佔位騰出來(置爲 0)。

緊接着判斷當前進程狀態,因爲 shutdown 會標記主進程狀態爲 STATUS_SHUTDOWN,因此若是是非結束狀態,會繼續 fork 工做進程來填補 $_idMap 以前騰出來的空缺,而且若是 $_pidsToRestart 中有此 pid,即表明該工做進程須要重啓。

若是此時主進程狀態被標記爲 STATUS_SHUTDOWN 而且 getAllWorkerPids() 存在工做進程,則調用 exitAndClearAll() 來結束掉全部進程。

至此,主進程邏輯分析完畢。

listen()

該方法既有可能在主進程,也可能在子進程中調用,但無論調用時機如何,必定是在 run() 方法開始前調用的,因此仍是放到本文來分析,看一下代碼:

public function listen()
{
if (!$this->_socketName) {
return;
}
 
// Autoload.
Autoloader::setRootPath( $this->_autoloadRootPath);
 
if (!$this->_mainSocket) {
// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
if (!isset(static::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
 
if (!isset(static::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
} else {
$this->transport = $scheme;
}
 
$local_socket = static::$_builtinTransports[$this->transport] . ":" . $address;
 
// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
$errmsg = '';
// SO_REUSEPORT.
if ($this->reusePort) {
stream_context_set_option( $this->_context, 'socket', 'so_reuseport', 1);
}
 
// Create an Internet or Unix domain server socket.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}
 
if ($this->transport === 'ssl') {
stream_socket_enable_crypto( $this->_mainSocket, false);
} elseif ($this->transport === 'unix') {
$socketFile = substr($address, 2);
if ($this->user) {
chown($socketFile, $this->user);
}
if ($this->group) {
chgrp($socketFile, $this->group);
}
}
 
// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream( $this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
}
 
// Non blocking.
stream_set_blocking( $this->_mainSocket, 0);
}
 
$this->resumeAccept();
}

能夠看到主要工做是根據建立對象時的 $socket_name 參數值,建立對應的 socket-server 放到當前對象的 $this->_mainSocket 屬性中,再調用 resumeAccept() 方法:

public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
} else {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}

能夠看到該方法是向 event-loop 註冊當前 $this->_mainSocket 的可讀事件,即當有客戶端鏈接上時,根據鏈接類型調用 acceptConnection() 或 acceptUdpConnection(),因爲本系列文章分析的是 todpole遊戲,因此這裏暫且只分析 tcp 長連接:

public function acceptConnection($socket)
{
// Accept a connection on server socket.
$new_socket = @stream_socket_accept($socket, 0, $remote_address);
// Thundering herd.
if (!$new_socket) {
return;
}
 
// TcpConnection.
$connection = new TcpConnection($new_socket, $remote_address);
$this->connections[$connection->id] = $connection;
$connection->worker = $this;
$connection->protocol = $this->protocol;
$connection->transport = $this->transport;
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
$connection->onBufferDrain = $this->onBufferDrain;
$connection->onBufferFull = $this->onBufferFull;
 
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
call_user_func( $this->onConnect, $connection);
} catch (\Exception $e) {
static::log($e);
exit(250);
} catch (\Error $e) {
static::log($e);
exit(250);
}
}
}

該方法主要工做是經過 stream_socket_accept() 包裝一下以前的 socket-server,並針對每一個 socket-accept 建立一個 Connection 對象放到 $this->connections 屬性中,而且這裏遇到了第二個重要的回調 onConnect(),該回調每一個鏈接事件只會觸發一次。

ConnectionInterface

到了這裏,咱們已經能夠監聽客戶端的鏈接,可是當客戶端上傳數據時,應該如何處理呢?這時咱們須要沿着上面建立的 Connection 對象繼續分析下去。

上面方法中的 TcpConnection 繼承自 ConnectionInterface(看名字覺得是個接口,實際上是個抽象類),看一下 TcpConnection 的構造方法:

public function __construct($socket, $remote_address = '')
{
self::$statistics['connection_count']++;
$this->id = $this->_id = self::$_idRecorder++;
if(self::$_idRecorder === PHP_INT_MAX){
self::$_idRecorder = 0;
}
$this->_socket = $socket;
stream_set_blocking( $this->_socket, 0);
// Compatible with hhvm
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer( $this->_socket, 0);
}
Worker::$globalEvent->add( $this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
$this->_remoteAddress = $remote_address;
static::$connections[$this->id] = $this;
}

該方法中咱們找到了 socket-accept 的讀取回調 baseRead(),如今咱們知道數據的讀取是在建立 Connection 對象時經過註冊 event-loop 可讀事件來觸發的。

baseRead() 邏輯很長,這裏就不貼出來了,不過在代碼中找到了第三個重要的回調 onMessage(),回顧一下 listen() 的主要工做,簡單的歸納爲:

  1. 向 event-loop 註冊 socket-server 的可讀事件,回調 acceptConnection() 經過 stream_socket_accept() 獲得 socket-accept 並建立 Connection 對象;
  2. 建立 Connection 對象的構造方法中向 event-loop 註冊 socket-accept 的可讀事件,回調 baseRead()中解析數據,並觸發回調 onMessage()

須要注意的是,event-loop 在子進程調用 run() 以前尚未開始循環,因此上面分析的回調邏輯只是註冊,還不會觸發。

本文結束,下一篇文章開始分析子進程的工做流程。

相關文章
相關標籤/搜索