workerman源碼-workerman啓動流程

前面咱們跟着代碼看了一遍workerman的初始化流程.但對於如何監聽端口.等操做尚未具體的實現.咱們此次就來看一下.workerman是如何監聽端口並運行的.php

runAll

在前面咱們初始化方法事後,就開始執行runAll方法.在runAll方法中,有以下幾個方法.咱們會一個一個方法介紹.注意.在這裏,全部的都是以static來運行的.因此,就是Worker類自己.linux

// 初始化環境變量.是否爲linux.  
static::checkSapiEnv();  
// 執行初始化流程  
static::init();  
// 鎖定命令  
static::lock();  
// 解析命令行  
static::parseCommand();  
// 是否後臺運行  
static::daemonize();  
// 初始化worker  
static::initWorkers();  
// 安裝信號處理  
static::installSignal();  
// 保存主進程ID  
static::saveMasterPid();  
// 解鎖  
static::unlock();  
// 展現ui  
static::displayUI();  
// fork子進程  
static::forkWorkers();  
// 重定義輸入輸出  
static::resetStd();  
// 監控子進程.  
static::monitorWorkers();

checkSapiEnv

檢查當前命令行的環境.至於爲何會有這個函數,主要是因爲window下.只能開啓一個進程.因此在不少地方都須要對於windows來進行特殊的處理.咱們來看看代碼.ubuntu

// 若是不是cli運行,就直接提示.  
if (php_sapi_name() != "cli") {  
   exit("only run in command line mode n");
}  
// 若是根據文件分隔符來判斷,windows下爲,linux爲/.這是一種經常使用的方式.  
if (DIRECTORY_SEPARATOR === '') {  
   self::$_OS = OS_TYPE_WINDOWS;
}

init

這裏主要是對錯誤處理,pid文件,和日誌文件的初始化.還有初始化計時器windows

set_error_handler(function($code, $msg, $file, $line){  
 Worker::safeEcho("$msg in file $file on line $linen");
});    
// 這裏指定了啓動文件.啓動文件將用於後續的啓動信息.  
$backtrace        = debug_backtrace();  
// startFile格式爲全路徑.例如在/var/www/study/https.php,那麼startFile就爲/var/www/study/https.php.  
static::$_startFile = $backtrace[count($backtrace) - 1]['file'];  
// 根據啓動生成惟一的前綴. 根據startFile的全路徑生成惟一的前綴.  
$unique_prefix = str_replace('/', '_', static::$_startFile);  
  
// 根據前綴生成pidFile.存放於vendor/workerman下面.  
if (empty(static::$pidFile)) {  
 static::$pidFile = __DIR__ . "/../$unique_prefix.pid";
}  
  
// 生成日誌文件.  
if (empty(static::$logFile)) {  
 static::$logFile = __DIR__ . '/../workerman.log';
}  
  
$log_file = (string)static::$logFile;  
// 若是沒有日誌文件,就初始話日誌文件.  
if (!is_file($log_file)) {  
    touch($log_file); 
    chmod($log_file, 0622);
}  
  
// 初始化狀態爲啓動中...  
static::$_status = static::STATUS_STARTING;  
  
// 設置啓動時間和狀態文件  
static::$_globalStatistics['start_timestamp'] = time();  
static::$_statisticsFile                      = sys_get_temp_dir() . "/$unique_prefix.status";  
  
// 設置進程標題  
static::setProcessTitle('WorkerMan: master process  start_file=' . static::$_startFile);  
  
// 初始化worker的idmap.  
static::initId();  
  
// 初始化計時器.  
Timer::init();

這裏面其中最主要的是初始化worker的idmap,用於記錄worker的對應關係. 這裏主要存儲子worker的pid信息.咱們來看下函數.api

protected static function initId()  
{  
 // 循環全部的worker,並將全部的進程信息初始化賦值.  
 foreach (static::$_workers as $worker_id => $worker){ 
    $new_id_map = array(); 
    $worker->count = $worker->count <= 0 ? 1 : $worker->count; 
    for($key = 0; $key < $worker->count; $key++) {  
        $new_id_map[$key] = isset(static::$_idMap[$worker_id][$key]) ? static::$_idMap[$worker_id][$key] : 0; 
    } 
    static::$_idMap[$worker_id] = $new_id_map; 
 }
}

初始化完成會造成,worker_id => 子進程信息.例如數組

array(1) {  
 ["000000001364d94e00000000462a38e7"]=> array(4) { 
 [0]=> int(0) 
 [1]=> int(0) 
 [2]=> int(0) 
 [3]=> int(0) 
 }
}

這樣的結構便於將id對應到對應的信息中.socket

lock

protected static function lock()  
{  
 // 打開鎖定的啓動文件.  
 $fd = fopen(static::$_startFile, 'r'); // 若是文件沒法打卡.則默認認爲workerman已經啓動了.  
 if (!$fd || !flock($fd, LOCK_EX)) {            
    static::log("Workerman[".static::$_startFile."] already running"); exit; 
 }
}

爲什麼須要鎖定啓動文件.這裏會引起一個問題,若是同一時間啓動worker.這樣會致使沒法預知的錯誤.因此在這裏就先鎖定啓動文件.就保存的原子性操做.tcp

parseCommand

完成了文件鎖定後,就開始解析咱們的命令了.因爲這次信息較多.會單獨寫一個文章來解讀.函數

daemonize

daemonize是否運行在後臺,若是咱們在命令行中使用了-d參數,就會執行此函數.可是咱們執行的命令行沒有加上-d.因此,就不會執行此函數.咱們能夠來看一下.oop

protected static function daemonize()  
{  
 // 若是不是後臺運行,若是不是linux.就直接返回.  
 if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) { 
    return; 
 } 
 // 設定umask.  
 umask(0); // 開始分叉,並返回pid.若是分叉失敗,pid就爲-1.若是成功pid就爲0.  
 $pid = pcntl_fork(); 
 if (-1 === $pid) { 
    throw new Exception('fork fail'); 
 } elseif ($pid > 0) { exit(0); } 
 // 將當前會話設置爲進程組長,並管理其餘子進程. 設置父進程先與子進程退出,子進程則會被1號進程收養,這個子進程就會成爲init的子進程.  
 if (-1 === posix_setsid()) { 
    throw new Exception("setsid fail"); 
 } 
 // 再次分叉進程.以免SVR4系統從新得到終端的控制.  
 $pid = pcntl_fork(); 
 if (-1 === $pid) { 
    throw new Exception("fork fail"); } elseif (0 !== $pid) { exit(0); 
 }
}

initWorkers

執行完後臺運行事後,就開始初始化worker了.

protected static function initWorkers()  
{  
 // 若是系統不是linux就直接返回.  
 if (static::$_OS !== OS_TYPE_LINUX) { 
     return; 
 } 
 // 開始初始化worker  
 foreach (static::$_workers as $worker) { 
     // 設置worker name.  
     if (empty($worker->name)) { 
        $worker->name = 'none'; 
     }  
     // 設置worker的user  
     if (empty($worker->user)) { 
        $worker->user = static::getCurrentUser(); 
     } else { 
         if (posix_getuid() !== 0 && $worker->user != static::getCurrentUser()) { 
             static::log('Warning: You must have the root privileges to change uid and gid.'); 
         } 
     }  
     // 設置socket名稱.  
     $worker->socket = $worker->getSocketName();  
     // 設置狀態.  
     $worker->status = '<g> [OK] </g>';  
     // 設置ui.  
     foreach(static::getUiColumns() as $column_name => $prop){ 
        !isset($worker->{$prop}) && $worker->{$prop}= 'NNNN'; 
        $prop_length = strlen($worker->{$prop}); 
        $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength'; 
        static::$$key = max(static::$$key, $prop_length); 
     }  
     // 監聽.  
     if (!$worker->reusePort) { $worker->listen(); } }}

installSignal

初始化完worker事後,就監聽信號集了.在這裏監聽的信號集.信號集只能在linux中使用.

protected static function installSignal()  
{  
 if (static::$_OS !== OS_TYPE_LINUX) { 
    return; 
 } 
 // 進程關閉,Ctrl+c時觸發  
 pcntl_signal(SIGINT, array('WorkermanWorker', 'signalHandler'), false); 
 // 進程強制關閉,例如使用kill.等命令關閉進程時觸發.  
 pcntl_signal(SIGTERM, array('WorkermanWorker', 'signalHandler'), false); 
 // 進程重啓  
 pcntl_signal(SIGUSR1, array('WorkermanWorker', 'signalHandler'), false); 
 // 進程退出  
 pcntl_signal(SIGQUIT, array('WorkermanWorker', 'signalHandler'), false); 
 // 查看進程的狀態  
 pcntl_signal(SIGUSR2, array('WorkermanWorker', 'signalHandler'), false); 
 // 查看連接狀態  
 pcntl_signal(SIGIO, array('WorkermanWorker', 'signalHandler'), false); 
 // 屏蔽SIGPIPE消息  
 pcntl_signal(SIGPIPE, SIG_IGN, false);
}

saveMasterPid

將主進程的ID保存到pidFile文件中.

protected static function saveMasterPid()  
{  
 if (static::$_OS !== OS_TYPE_LINUX) { 
    return; 
 }  
 static::$_masterPid = posix_getpid(); // 將主進程ID保存到pidFile中,pidFile在init時已經指定爲vendor/workerman/下面的pid文件中去了.  
 if (false === file_put_contents(static::$pidFile, static::$_masterPid)) { 
    throw new Exception('can not save pid to ' . static::$pidFile); 
 }
}

unlock

當以上都執行完成事後就解除咱們啓動文件的鎖定狀態.這個函數主要就是進行解鎖操做.

protected static function unlock()  
{  
     $fd = fopen(static::$_startFile, 'r'); $fd && flock($fd, LOCK_UN);
}

displayUI

顯示ui.workerman在處理的時候,是先展現的ui.就是打印出相似於如下狀況的信息

Workerman[http.php] start in DEBUG mode  
----------------------------------------- WORKERMAN -----------------------------------------  
Workerman version:3.5.22          PHP version:7.1.23-4+ubuntu18.04.1+deb.sury.org+1  
------------------------------------------ WORKERS ------------------------------------------  
proto   user            worker          listen                 processes    status 
tcp     adolph          none            http://0.0.0.0:2345    4             [OK] 
---------------------------------------------------------------------------------------------  
Press Ctrl+C to stop. Start success.

因此,信息打印出來事後也不必定會正確的監聽事件.下面咱們就繼續查看.

forkWorkers

以上的準備工做完成,就開始fork咱們的子進程了.並設置子進程的監聽.

rotected static function forkWorkers()  
{  
    if (static::$_OS === OS_TYPE_LINUX) { 
        // 若是是linux就按照fork  
        static::forkWorkersForLinux(); 
    } else { 
        // windows下監聽  
        static::forkWorkersForWindows(); 
    }
}

根據平臺來處理對應的子進程.咱們是在linux中,因此咱們就看一下static::forkWorkersForLinux這個函數.

protected static function forkWorkersForLinux()  
{  
    // 根據全部的worker,咱們的狀態信息在init中存放信息.  
    foreach (static::$_workers as $worker) { 
        if (static::$_status === static::STATUS_STARTING) { 
            if (empty($worker->name)) { 
                $worker->name = $worker->getSocketName(); 
            } 
            $worker_name_length = strlen($worker->name); 
            if (static::$_maxWorkerNameLength < $worker_name_length) { 
                static::$_maxWorkerNameLength = $worker_name_length; 
            } 
        } 
        // 開始fork.從$_pidMap來獲取信息並判斷.這裏的$_pidMap則是由workerId => 子進程的進程號組成的數組.  
        // 而_pidMap則是在new Worker的時候將Worker直接存儲進去的.因此在這裏咱們就解決了在初始化流程的時候疑問了.  
        while (count(static::$_pidMap[$worker->workerId]) < $worker->count) { 
        // 開始fork  
            static::forkOneWorkerForLinux($worker); 
        } 
    }
}

static::forOneWorkerForLinux函數主要是來處理worker信息.這裏咱們來看一下.

protected static function forkOneWorkerForLinux($worker)  
{  
    // 開始獲取id信息. 從idMap中獲取   
    // array_search($pid, static::$_idMap[$worker_id]);  
    // 因爲咱們的idMap是由workerId=>子進程號組成的數組.因此,第一次查出來的id指定爲0.  
    $id = static::getId($worker->workerId, 0); 
    if ($id === false) { 
        return; 
    } 
    // 開始fork
    $pid = pcntl_fork(); 
    // 若是pid大於0. 則是主進程.  
    if ($pid > 0) { 
        // 存到pidMap中,  
        static::$_pidMap[$worker->workerId][$pid] = $pid; 
        // 將idMap中的id換爲進程號.這裏的進程號就包含了主進程的信息.  
        static::$_idMap[$worker->workerId][$id]   = $pid; 
    } elseif (0 === $pid) {  
        // 若是是子進程.  
        srand(); 
        mt_srand(); 
        // 開始監聽事件.  
        if ($worker->reusePort) { 
            $worker->listen(); 
        } 
        // 若是狀態爲STATUS_STARTING,就重定向輸入輸出.  
        if (static::$_status === static::STATUS_STARTING) { 
            static::resetStd(); 
        } 
        // 從新初始化pid. 因此,在子進程中.子進程的pidMap爲空.  
        static::$_pidMap  = array(); 
        // 開始處理解除掉其餘worker的監聽.保證當前子進程中,只有一個worker來監聽事件.  
        foreach(static::$_workers as $key => $one_worker) { 
            if ($one_worker->workerId !== $worker->workerId) { 
            $one_worker->unlisten(); unset(static::$_workers[$key]); 
            } 
        } 
        // 刪除全部的定時器.  
        Timer::delAll();
        static::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());    
        // 設置用戶和組.  
        $worker->setUserAndGroup(); 
        $worker->id = $id; 
        // 執行run方法.  
        $worker->run(); 
        $err = new Exception('event-loop exited'); 
        static::log($err); 
        exit(250); 
    } else { 
        throw new Exception("forkOneWorker fail"); 
    }
}

上面代碼若是在linux中運行,就已經分爲主進程和子進程來進行處理了.在主進程中包含全部對子進程信息的保存.而子進程則在這裏監聽方法.須要注意一點的是,子進程中對全部父進程的變量是共享的.咱們來看看子進程是如何監聽事件的.$worker->run();

public function run()  
{  
    //更新狀態爲運行中.  
    static::$_status = static::STATUS_RUNNING;  
    // 註冊關閉事件.  
    register_shutdown_function(array("WorkermanWorker", 'checkErrors'));  
    // 設置root的路徑  
    Autoloader::setRootPath($this->_autoloadRootPath);  
    // 建立事件監聽  
    if (!static::$globalEvent) { 
        $event_loop_class = static::getEventLoopName(); 
        static::$globalEvent = new $event_loop_class;           $this->resumeAccept(); 
    }  
    // 從新安裝信號集.  
    static::reinstallSignal();  
    // 初始化計時器  
    Timer::init(static::$globalEvent);  
    // 設置消息時間.  
    if (empty($this->onMessage)) { 
        $this->onMessage = function () {}; 
    } 
    // 還原錯誤處理函數.  
    restore_error_handler(); 
    // 觸發workerStart事件.  
    if ($this->onWorkerStart) { 
        try { 
            call_user_func($this->onWorkerStart, $this); 
        } catch (Exception $e) { 
            static::log($e); 
            // Avoid rapid infinite loop exit.
            sleep(1); 
            exit(250); 
        } catch (Error $e) { 
            static::log($e); 
            // Avoid rapid infinite loop exit. 
            sleep(1); 
            exit(250); 
        } 
    }  
    // 等待事件觸發.  
    static::$globalEvent->loop();
}

run函數開始觸發workerStart事件,並等待worker事件觸發.因此這裏的globalEvent就是咱們的事件處理函數.以上都是在子進程中完成的處理.子進程就會在這裏中止.

resetStd

當咱們fork完全部的子進程事後,就執行咱們的重定向輸入輸出.

public static function resetStd()  
{  
    // 若是不是守護進程.  
    if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) { 
        return; 
    }  
    global $STDOUT, $STDERR; 
    // 打開輸出的文件.  
    $handle = fopen(static::$stdoutFile, "a"); 
    if ($handle) { 
        unset($handle); 
        set_error_handler(function(){});    
        fclose($STDOUT); 
        fclose($STDERR); 
        fclose(STDOUT); 
        fclose(STDERR); 
        $STDOUT = fopen(static::$stdoutFile, "a"); 
        $STDERR = fopen(static::$stdoutFile, "a"); 
        // 從新定向到本身定義輸出裏面.  
        static::outputStream($STDOUT);      
        restoe_error_handler(); 
    } else { 
        throw new Exception('can not open stdoutFile ' . static::$stdoutFile); 
    }
}

monitorWorkers()

最後,就是監聽咱們的子進程信息.這裏因爲咱們的篇幅過長,就另起一張寫入.

未完成

workerman對事件的監聽  
workerman對子進程的監控
相關文章
相關標籤/搜索