提早打個預防針:php
因爲 Worker 類的靜態屬性和子類對象的混用(當前類的靜態屬性存放當前類對象,靜態方法循環靜態屬性遍歷調用子類對象的方法),特別再加上 Master-Worker 進程組織模型,對一些剛接觸的人來講,很容易形成理解上的偏差,當形成理解上的混亂時,咱們須要明確:html
進程和類對象沒有直接關係,進程內不論是靜態方法、普通對象都屬於該進程空間,靜態屬性也不會跨進程共享。當 fork 以後,咱們所閱讀的代碼已經被複制到了另外一個進程空間內(棧/堆/常量等),不要被那些 Worker 子類誤導了。linux
另外,Master-Worker 進程模型中的 Worker 是子進程的表述,屬於進程概念,跟 Worker 類不是一回事。web
入口
入口文件 start.php
,包含了 composer autoload,並執行下面的入口,建立相應的對象:編程
- start_web.php(WebServer)
- start_register.php(Register)
- start_gateway.php(Gateway)
- start_businessworker.php(BusinessWorker)
這 4 個子類其實都繼承自 Worker
類(咱們暫且把它們叫作 「角色」):後端
看一下 Worker
的構造方法:api
public function __construct($socket_name = '', $context_option = array())
{
// Save all worker instances.
$this->workerId = spl_object_hash($this);
static::$_workers[$this->workerId] = $this;
static::$_pidMap[$this->workerId] = array();
// Get autoload root path.
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
// Context for socket.
if ($socket_name) {
$this->_socketName = $socket_name;
if (!isset($context_option['socket']['backlog'])) {
$context_option[
'socket']['backlog'] = static::DEFAULT_BACKLOG;
}
$this->_context = stream_context_create($context_option);
}
}
|
能夠看到入口中建立對象時,實際上是放到了當前類的靜態屬性 $_workers
中,這裏的 workerId
用 get_class($this)
也是同樣的效果,爲了方便表述,暫且把 workerId
叫作 「角色」。另外,子進程的 pid 在靜態屬性 $_pidMap
中是以子類名進行歸類的。服務器
建立完對象,全部的子類對象都到了 $_workers
中,接下來就是執行靜態方法 runAll
。網絡
runAll
該靜態方法把功能都拆了出去,子方法都爲 protected static
方便 override 重寫:session
public static function runAll()
{
static::checkSapiEnv();
static::init();
static::parseCommand();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
}
|
看一下每一個方法的大致功能:
checkSapiEnv()
主要是限制只能在 cli 模式下運行。
init()
完成一些初始化操做:指定 .pid 文件、建立日誌文件、設置主進程狀態爲已開始、記錄開始時間、設置主進程名稱、調用靜態方法 initId()
按照每一個角色的數量建立 $_idMap
靜態屬性(用來佔位),最後經過 Timer::init()
安裝一個 alarm
鬧鐘信號處理(定時器)。
parseCommand()
該方法用來解析命令行參數,start 並無作太多事情,只是判斷了一下 .pid 文件是否存在,防止重複啓動,並判斷是否有 -d
肯定是否在後臺執行。
daemonize()
該方法是讓一個進程成爲守護進程的核心,因爲 linux 做爲分時多用戶系統,須要標記每一個登錄的用戶,因此須要 「會話」 的概念,當用戶經過終端登錄後,通常會存在如下幾類進程:
看一下 daemonize()
的代碼:
protected static function daemonize()
{
if (!static::$daemonize || static::$_OS !== 'linux') {
return;
}
umask(
0);
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception('fork fail');
}
elseif ($pid > 0) {
exit(0);
}
if (-1 === posix_setsid()) {
throw new Exception("setsid fail");
}
// Fork again avoid SVR4 system regain the control of terminal.
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception("fork fail");
}
elseif (0 !== $pid) {
exit(0);
}
}
|
該方法的核心是 posix_setsid()
,它能夠建立一個會話,因爲會話一般是獨佔終端的,致使當前會話期內的某個進程建立會話後沒法 「搶到」 終端,進而變成後端進程,也就是咱們所說的 「守護進程」。
另外,在 php 中能夠經過 pcntl
擴展的 pcntl_fork()
方法從當前進程 fork 一個新的進程,daemonize()
方法之因此須要在建立會話先後各 fork 一次,是因爲:
- 組長進程沒法建立會話,因此會在建立會話以前先 fork 一次,這樣保證子進程必定不是組長,以後退出主進程,在子進程中建立會話
- 在子進程中建立會話以後,此時的子進程已是會話的首進程(session leader),爲了不當前的 leader 再次鏈接終端,所以須要再 fork 一次以後退出主進程
通過這些步驟以後,獲得的子進程即 Master 進程。
initWorkers()
首先遍歷全部 workers 對象,對一些名稱和權限作下判斷,接着若是端口非複用($reusePort
屬性),會直接調用對象方法 listen()
開啓監聽:
if (!$worker->reusePort) {
$worker->listen();
}
|
注意這裏的 listen()
是循環出的 Worker
對象調用的,例如當前有 2 個 WebServer
、1 個 Register
、4 個 Gateway
、4 個 BusinessWorker
進程,若是端口是非複用,在 forkWorkers()
以前,只會建立 3 個 socket 監聽(BusinessWorker
沒有),這一點從端口號上就能夠看出來(圖片來自 WebSocket實戰之————Workerman服務器的安裝啓動):
這些監聽地址對應的是建立角色對象時的構造參數 $socket_name
(後面咱們遇到的內部監聽地址基於 lanIp
屬性)。
如今以 Gateway 爲例,此時端口是非複用的,那爲何 4 個 Gateway 進程依然能夠監聽同一個端口?事實上這 4 個 socket 是在 forkWorkers()
以後子進程從父進程的 Gateway
對象那裏繼承來的。
這種狀況下,當監聽的端口有客戶端請求過來時,就會形成 「驚羣」 現象(Linux網絡編程「驚羣」問題總結),即這 4 個 Gateway
進程都會被喚醒,但通過內核調度以後,只有一個進程可以處理請求,其他的失敗後繼續休眠。
而開啓監聽端口複用後,能夠容許多個無親緣關係的進程監聽相同的端口,而且由系統內核作負載均衡,決定將 socket 鏈接交給哪一個進程處理,避免了驚羣效應,能夠提高多進程短鏈接應用的性能,listen()
中開啓複用的代碼:
if ($this->reusePort) {
stream_context_set_option(
$this->_context, 'socket', 'so_reuseport', 1);
}
|
這段代碼實際上是在下面的 forkWorkers()
子進程中開啓端口複用的狀況下才會被調用;另外,開啓端口複用須要 php 版本 >=7.0。
installSignal()
接下來就是爲當前主進程安裝信號處理,handler 的代碼:
public static function signalHandler($signal)
{
switch ($signal) {
// Stop.
case SIGINT:
static::$_gracefulStop = false;
static::stopAll();
break;
// Graceful stop.
case SIGTERM:
static::$_gracefulStop = true;
static::stopAll();
break;
// Reload.
case SIGQUIT:
case SIGUSR1:
if($signal === SIGQUIT){
static::$_gracefulStop = true;
}
else{
static::$_gracefulStop = false;
}
static::$_pidsToRestart = static::getAllWorkerPids();
static::reload();
break;
// Show status.
case SIGUSR2:
static::writeStatisticsToStatusFile();
break;
// Show connection status.
case SIGIO:
static::writeConnectionsStatisticsToStatusFile();
break;
}
}
|
能夠看到 SIGINT
/SIGTERM
都是中止,SIGQUIT
/SIGUSR1
爲重啓,SIGUSR2
爲輸出進程狀態,SIGIO
輸出鏈接狀態。
saveMasterPid()
建立當前主進程的 .pid 文件。
displayUI()
輸出命令行下的界面。
forkWorkers()
這裏只討論在 linux 下的狀況,經過遍歷 $_workers
全部的角色對象,判斷 $_pidMap
中當前角色已經有的 pid 數量和角色對象 count
屬性須要的工做進程數量,直到每一個角色都建立出 count
須要的數量。
看一下 linux 下建立工做進程的邏輯:
protected static function forkOneWorkerForLinux($worker)
{
// Get available worker id.
$id =
static::getId($worker->workerId, 0);
if ($id === false) {
return;
}
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
static::$_pidMap[$worker->workerId][$pid] = $pid;
static::$_idMap[$worker->workerId][$id] = $pid;
}
// For child processes.
elseif (0 === $pid) {
if ($worker->reusePort) {
$worker->listen();
}
if (static::$_status === static::STATUS_STARTING) {
static::resetStd();
}
static::$_pidMap = array();
static::$_workers = array($worker->workerId => $worker);
Timer::delAll();
static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
$worker->run();
$err =
new Exception('event-loop exited');
static::log($err);
exit(250);
}
else {
throw new Exception("forkOneWorker fail");
}
}
|
首先會調用 getId()
判斷:
protected static function getId($worker_id, $pid)
{
return array_search($pid, static::$_idMap[$worker_id]);
}
|
在以前的靜態方法 init()
中已經爲每一個角色對象開闢了相應 count
數量的位置,這裏判斷 $_idMap
就是佔座的過程;而且以後若是子進程退出,$_idMap
對應 id 的位置就會空缺,從新建立子進程就是填坑的過程。
回到 forkOneWorkerForLinux()
的邏輯,fork 以後,主進程記錄下 pid 就結束了,子進程首先會判斷端口是否可複用,可複用則調用 listen()
開啓監聽,不過須要注意的是,以前的 initWorkers()
中非複用狀況下建立的是屬於角色對象的 socket(3 個),而這裏若是開啓複用,是每一個子進程中的當前角色對象本身建立出來的 socket,而不是像非複用那樣從主進程的相應對象繼承;也就是說有幾個子進程就建立幾個 socket(前提是子進程對應角色對象建立時有 $socket_name
),請注意和非複用狀況進行區分。
接着重定向 std 輸出和錯誤,因爲 $_pidMap
是提供給主進程監控用的,對子進程來講沒什麼意義,因此直接清空;緊接着調用 Timer::delAll()
結束可能的鬧鐘信號,並清除全部可能存在的 event loop。
設置完當前進程的一些屬性以後,調用 $worker->run()
子進程就正式執行它本身的邏輯了,run()
裏面的流程咱們放到後面來解析,先接着原有的方法往下看。
resetStd()
重定向主進程的 STDOUT 和 STDERR,由於進程已經脫離了終端,輸出可能致使一些未知的異常,因此須要重定向到 /dev/null
這臺黑洞設備上;從上面的 forkWorkers()
能夠看到,這個方法也會在子進程中使用。
monitorWorkers()
通過 forkWorkers()
以後,子進程已經使用定時器或者進入 event loop 處理本身的業務去了,所以這個方法只會在主進程內執行。
該方法的做用就是經過 wait
掛起,直到收到子進程退出信號以後,來決定是補上仍是直接退出,看一下監控邏輯:
protected static function monitorWorkersForLinux()
{
static::$_status = static::STATUS_RUNNING;
while (1) {
// Calls signal handlers for pending signals.
pcntl_signal_dispatch();
// Suspends execution of the current process until a child has exited, or until a signal is delivered
$status =
0;
$pid = pcntl_wait($status, WUNTRACED);
// Calls signal handlers for pending signals again.
pcntl_signal_dispatch();
// If a child has already exited.
if ($pid > 0) {
// Find out witch worker process exited.
foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
if (isset($worker_pid_array[$pid])) {
$worker =
static::$_workers[$worker_id];
// Exit status.
if ($status !== 0) {
static::log("worker[" . $worker->name . ":$pid] exit with status $status");
}
// For Statistics.
if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
static::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
// Clear process data.
unset(static::$_pidMap[$worker_id][$pid]);
// Mark id is available.
$id =
static::getId($worker_id, $pid);
static::$_idMap[$worker_id][$id] = 0;
break;
}
}
// Is still running state then fork a new worker process.
if (static::$_status !== static::STATUS_SHUTDOWN) {
static::forkWorkers();
// If reloading continue.
if (isset(static::$_pidsToRestart[$pid])) {
unset(static::$_pidsToRestart[$pid]);
static::reload();
}
}
else {
// If shutdown state and all child processes exited then master process exit.
if (!static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
else {
// If shutdown state and all child processes exited then master process exit.
if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
}
|
這裏先提一下 pcntl_signal_dispatch()
,該方法是爲了去信號隊列看一下有沒有發送給當前進程的信號,並觸發主進程的 signalHandler()
,固然咱們也可使用 declare(ticks)
來決定執行 ticks
行基礎代碼後自動調用 handler,但畢竟不是每次自動調用時都會有信號,因此咱們須要主動調用 pcntl_signal_dispatch()
來觸發以減小性能損失。
回到 monitorWorkersForLinux()
的邏輯,該方法爲一個死循環,核心方法爲 pcntl_wait($status, WUNTRACED)
,經過該方法將主進程掛起,直到收到子進程退出的信號;引用類型的參數 $status
能夠拿到 exit 值,好比代碼裏常常看到的 exit(250)
。
拿到退出的子進程號以後,將 $_pidMap
中相應的記錄刪除,並經過 pid 和 getId()
找到 id 以後把 $_idMap
中的佔位騰出來(置爲 0)。
緊接着判斷當前進程狀態,因爲 shutdown 會標記主進程狀態爲 STATUS_SHUTDOWN
,因此若是是非結束狀態,會繼續 fork 工做進程來填補 $_idMap
以前騰出來的空缺,而且若是 $_pidsToRestart
中有此 pid,即表明該工做進程須要重啓。
若是此時主進程狀態被標記爲 STATUS_SHUTDOWN
而且 getAllWorkerPids()
存在工做進程,則調用 exitAndClearAll()
來結束掉全部進程。
至此,主進程邏輯分析完畢。
listen()
該方法既有可能在主進程,也可能在子進程中調用,但無論調用時機如何,必定是在 run()
方法開始前調用的,因此仍是放到本文來分析,看一下代碼:
public function listen()
{
if (!$this->_socketName) {
return;
}
// Autoload.
Autoloader::setRootPath(
$this->_autoloadRootPath);
if (!$this->_mainSocket) {
// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
if (!isset(static::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
if (!isset(static::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
}
else {
$this->transport = $scheme;
}
$local_socket =
static::$_builtinTransports[$this->transport] . ":" . $address;
// Flag.
$flags =
$this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno =
0;
$errmsg =
'';
// SO_REUSEPORT.
if ($this->reusePort) {
stream_context_set_option(
$this->_context, 'socket', 'so_reuseport', 1);
}
// Create an Internet or Unix domain server socket.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}
if ($this->transport === 'ssl') {
stream_socket_enable_crypto(
$this->_mainSocket, false);
}
elseif ($this->transport === 'unix') {
$socketFile = substr($address,
2);
if ($this->user) {
chown($socketFile,
$this->user);
}
if ($this->group) {
chgrp($socketFile,
$this->group);
}
}
// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream(
$this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE,
1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY,
1);
}
// Non blocking.
stream_set_blocking(
$this->_mainSocket, 0);
}
$this->resumeAccept();
}
|
能夠看到主要工做是根據建立對象時的 $socket_name
參數值,建立對應的 socket-server 放到當前對象的 $this->_mainSocket
屬性中,再調用 resumeAccept()
方法:
public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
}
else {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}
|
能夠看到該方法是向 event-loop 註冊當前 $this->_mainSocket
的可讀事件,即當有客戶端鏈接上時,根據鏈接類型調用 acceptConnection()
或 acceptUdpConnection()
,因爲本系列文章分析的是 todpole
遊戲,因此這裏暫且只分析 tcp 長連接:
public function acceptConnection($socket)
{
// Accept a connection on server socket.
$new_socket = @stream_socket_accept($socket,
0, $remote_address);
// Thundering herd.
if (!$new_socket) {
return;
}
// TcpConnection.
$connection =
new TcpConnection($new_socket, $remote_address);
$this->connections[$connection->id] = $connection;
$connection->worker =
$this;
$connection->protocol =
$this->protocol;
$connection->transport =
$this->transport;
$connection->onMessage =
$this->onMessage;
$connection->onClose =
$this->onClose;
$connection->onError =
$this->onError;
$connection->onBufferDrain =
$this->onBufferDrain;
$connection->onBufferFull =
$this->onBufferFull;
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
call_user_func(
$this->onConnect, $connection);
}
catch (\Exception $e) {
static::log($e);
exit(250);
}
catch (\Error $e) {
static::log($e);
exit(250);
}
}
}
|
該方法主要工做是經過 stream_socket_accept()
包裝一下以前的 socket-server,並針對每一個 socket-accept 建立一個 Connection 對象放到 $this->connections
屬性中,而且這裏遇到了第二個重要的回調 onConnect()
,該回調每一個鏈接事件只會觸發一次。
ConnectionInterface
到了這裏,咱們已經能夠監聽客戶端的鏈接,可是當客戶端上傳數據時,應該如何處理呢?這時咱們須要沿着上面建立的 Connection 對象繼續分析下去。
上面方法中的 TcpConnection
繼承自 ConnectionInterface
(看名字覺得是個接口,實際上是個抽象類),看一下 TcpConnection
的構造方法:
public function __construct($socket, $remote_address = '')
{
self::$statistics['connection_count']++;
$this->id = $this->_id = self::$_idRecorder++;
if(self::$_idRecorder === PHP_INT_MAX){
self::$_idRecorder = 0;
}
$this->_socket = $socket;
stream_set_blocking(
$this->_socket, 0);
// Compatible with hhvm
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer(
$this->_socket, 0);
}
Worker::$globalEvent->add(
$this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
$this->_remoteAddress = $remote_address;
static::$connections[$this->id] = $this;
}
|
該方法中咱們找到了 socket-accept 的讀取回調 baseRead()
,如今咱們知道數據的讀取是在建立 Connection 對象時經過註冊 event-loop 可讀事件來觸發的。
baseRead()
邏輯很長,這裏就不貼出來了,不過在代碼中找到了第三個重要的回調 onMessage()
,回顧一下 listen()
的主要工做,簡單的歸納爲:
- 向 event-loop 註冊 socket-server 的可讀事件,回調
acceptConnection()
經過stream_socket_accept()
獲得 socket-accept 並建立 Connection 對象; - 建立 Connection 對象的構造方法中向 event-loop 註冊 socket-accept 的可讀事件,回調
baseRead()
中解析數據,並觸發回調onMessage()
;
須要注意的是,event-loop 在子進程調用 run()
以前尚未開始循環,因此上面分析的回調邏輯只是註冊,還不會觸發。
本文結束,下一篇文章開始分析子進程的工做流程。