workerman源碼-workerman事件監聽

在咱們啓動了workerman事後,按照咱們前面的理解.若是是在linux下.worker子進程啓動端口複用,並監聽事件和處理事件(win忽略).那workerman是如何對事件完成監聽和處理的呢.咱們來看一下.linux

worker listen

在咱們看源碼的時候,在forkOneWorkerForLinux有這樣一行代碼.算法

if ($worker->reusePort) {  
 $worker->listen();}

這裏,一看名字就知道是對端口進行監聽.咱們來看一下源碼swoole

public function listen()  
{  
    // 這裏的socketName就是咱們監聽的地址信息.  
    if (!$this->_socketName) { return; }  
    // 設置自動加載的信息.  
    Autoloader::setRootPath($this->_autoloadRootPath); 
    // 須要監聽的socket.  
    if (!$this->_mainSocket) { 
        // 解析socket地址.  
        $local_socket = $this->parseSocketAddress();  
        // 設置基礎協議.若是是udp.就爲STREAM_SERVER_BIND,若是不是udp就是,就是4 | 8.注意,這裏是按位或算法.則爲 100(4) | 1000(8) = 1100. 
        $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN; $errno = 0; $errmsg = ''; 
        // 設置端口複用.  
        if ($this->reusePort) {
            \stream_context_set_option($this->\context, 'socket', 'so_reuseport', 1); 
        }
        // 開始建立socket.  
        $this->_mainSocket = \stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context); 
        if (!$this->_mainSocket) { 
            throw new Exception($errmsg); 
        } 
        // 若是transport爲ssl,就要特殊處理.  
        if ($this->transport === 'ssl') { 
            \stream_socket_enable_crypto($this->_mainSocket, false); 
        } elseif ($this->transport === 'unix') { 
            // 若是是unix.  
            $socket_file = \substr($local_socket, 7); 
            if ($this->user) { 
                chown($socket_file, $this->user); 
            } 
            if ($this->group) { 
                chgrp($socket_file, $this->group); 
            } 
        }
        // 嘗試打開tcp的keepalive並禁用掉Nagle算法.  
        if (\function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') { 
            \set_error_handler(function(){}); 
            $socket = \socket_import_stream($this->_mainSocket); 
            \socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); 
            \socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1); 
            \restore_error_handler(); 
        } // 爲資源流設置爲非阻塞模式.  
        \stream_set_blocking($this->_mainSocket, 0); 
    } 
    // 設置監聽.  
    $this->resumeAccept();}

在這裏,workerman首先是建立了一個_mainSocket的socket監聽信息.並設置爲非阻塞模式.並設置監聽.socket

public function resumeAccept()  
{  
    // 設置事件監聽.  
    if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) { 
        // 若是不是udp協議.就採用acceptConnection來才聽.  
        if ($this->transport !== 'udp') {
            static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); 
        } else { 
            // udp協議,就採用acceptUdpConnection來監聽  
            static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); 
        } 
        // 設置pauseAccept爲false.  
        $this->_pauseAccept = false; 
    }
}

咱們來看看acceptConnection.tcp

public function acceptConnection($socket)  
{  
    // 設置錯誤信息.  
    \set_error_handler(function(){}); 
    // 接收信息.  
    $new_socket = stream_socket_accept($socket, 0, $remote_address); 
    // 恢復錯誤信息.  
    \restore_error_handler();  
    // 若是沒有接收成功.  
    if (!$new_socket) { return; } 
    // 底層採用Tcp來監聽.  
    $connection                         = new TcpConnection($new_socket, $remote_address); 
    // 設置connections來監聽.  
    $this->connections[$connection->id] = $connection; 
    // 把worker複製進去.  
    $connection->worker                 = $this;
    $connection->protocol               = $this->protocol; 
    $connection->transport              = $this->transport; 
    // 設置事件監聽.  
    $connection->onMessage              = $this->onMessage; 
    // 關閉連接時的處理.  
    $connection->onClose                = $this->onClose; 
    // 設置錯誤信息.  
    $connection->onError                = $this->onError; 
    $connection->onBufferDrain          = $this->onBufferDrain; 
    $connection->onBufferFull           = $this->onBufferFull;  
    // 若是有onConnect鏈接處理.就直接調用.  
    if ($this->onConnect) { 
        try { 
        \call_user_func($this->onConnect, $connection); 
        } catch (\Exception $e) { 
            static::log($e); 
            exit(250); 
        } catch (\Error $e) { 
            static::log($e); 
            exit(250); 
        } 
    }
}

這裏,咱們就完成了對事件的監聽.而且每一個連接下面都將worker的信息賦值給TcpConnection的worker中.因此,每個Connection都會有worker信息.完成了事件的監聽.那如何處理事件的呢.oop

event loop

咱們在worker的run方法中,有這樣一句話.ui

static::$globalEvent->loop();

這裏採用了死循環來等待來處理事件.咱們在前面看了.$globalEvent是採用workerman本身自己的來監聽.它是在run方法中初始化的.this

// run 
if (!static::$globalEvent) {  
    $event_loop_class = static::getEventLoopName(); 
    static::$globalEvent = new $event_loop_class; 
    $this->resumeAccept();
}  
  
// Worker::getEventLoopName()  
protected static function getEventLoopName()  
{  
    // 若是存在就直接返回.  
    if (static::$eventLoopClass) { 
        return static::$eventLoopClass; 
    } 
    // 若是存在就刪除Swoole\Event.  
    if (!\class_exists('\Swoole\Event', false)) {
           unset(static::$_availableEventLoops['swoole']); 
    }     
    $loop_name = '';
    
    // protected static $\_availableEventLoops = array(  
    //     'libevent' => '\\Workerman\\Events\\Libevent', 
    //     'event'    => '\\Workerman\\Events\\Event'     
    //     'swoole'   => '\\Workerman\\Events\\Swoole' 
    // ); 
    foreach (static::$\_availableEventLoops as $name=>$class) { 
        if (\extension_loaded($name)) { 
            $loop_name = $name; 
            break; 
        } 
    }     
    if ($loop\_name) {  
        // 若是存在名稱.就使用React下面的事件處理.  
        if  (\interface_exists('\React\EventLoop\LoopInterface')) { 
            switch ($loop_name) { 
                case 'libevent': 
                    static::$eventLoopClass = '\Workerman\Events\React\ExtLibEventLoop'; 
                    break; 
                case 'event': 
                    static::$eventLoopClass = '\Workerman\Events\React\ExtEventLoop'; 
                    break; 
                default : 
                    static::$eventLoopClass = '\Workerman\Events\React\StreamSelectLoop'; 
                    break; 
            } 
        } else { 
            // 就默認. 咱們這裏採用的Libevent.  
            static::$eventLoopClass = static::$\_availableEventLoops[$loop_name]; 
        } 
    } else { 
        static::$eventLoopClass = \interface_exists('\React\EventLoop\LoopInterface')? '\Workerman\Events\React\StreamSelectLoop':'\Workerman\Events\Select'; 
    } 
    return static::$eventLoopClass;
}

以上就是初始化$globalEvent.我當前的環境是libevent.因此咱們就看一下.WorkermanEventsLibevent的loop方法.unix

public function loop()  
{  
    // 調用了一個event_base_loop.  
    \event_base_loop($this->_eventBase);
}

只有一個方法.event_base_loop就等待事件被觸發,而後觸發他們的事件.這裏的是$this->_eventBase.而設置事件則在咱們的add方法中.在listen的時候,咱們調用了以下的方法.rest

static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));

因此咱們來看一下add.

public function add($fd, $flag, $func, $args = array())  
{  
    // 判斷信息.  
    switch ($flag) { 
        case self::EV_SIGNAL: 
            $fd_key                      = (int)$fd;               $real_flag                   = EV_SIGNAL | EV_PERSIST; 
            $this->_eventSignal[$fd_key] = event_new(); 
            if (!\event_set($this->_eventSignal[$fd_key], $fd, $real_flag, $func, null)) { 
                return false; 
            } 
            if (!\event_base_set($this->_eventSignal[$fd_key], $this->_eventBase)) { 
                return false; 
            } 
            if (!\event_add($this->_eventSignal[$fd_key])) { 
                return false; 
            } 
            return true; 
        case self::EV_TIMER: 
        case self::EV_TIMER_ONCE: 
            $event    = \event_new(); 
            $timer_id = (int)$event; 
            if (!\event_set($event, 0, EV_TIMEOUT, array($this, 'timerCallback'), $timer_id)) { 
                return false; 
            }  
            if (!\event_base_set($event, $this->_eventBase)) { 
                return false; 
            }  
            $time_interval = $fd * 1000000; 
            if (!\event_add($event, $time_interval)) { 
                return false; 
            } 
            $this->_eventTimer[$timer_id] = array($func, (array)$args, $event, $flag, $time_interval); 
            return $timer_id; 
        default : 
            $fd_key    = (int)$fd; 
            $real_flag = $flag === self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST;  
            $event = \event_new();  
            if (!\event_set($event, $fd, $real_flag, $func, null)) { 
                return false; 
            }  
            if (!\event_base_set($event, $this->_eventBase)) { 
                return false; 
            }  
            if (!\event_add($event)) { 
                return false; 
            }  
            $this->_allEvents[$fd_key][$flag] = $event;  
            return true; 
    }
}

咱們從以上方法能夠精簡出大概的幾個語句

// 建立一個新的事件.  
$event = \event_new();  
// 設置事件的監聽.  
if (!\event_set($event, $fd, $real_flag, $func, null)) {  
    return false;
}  
// 從新設置event的綁定事件.  
if (!\event_base_set($event, $this->_eventBase)) {  
    return false;
}  
// 添加事件.  
if (!\event_add($event)) {  
    return false;
}  
// 保存到全局的事件中去.  
$this->_allEvents[$fd_key][$flag] = $event;

因此.咱們在run方法中,先設置事件的監聽.而後在調用事件的處理.而完成事件的處理則是由咱們worker自帶的onMessage方法來處理.

最後

event_set和event_base_set的做用.  
workerman 對 worker的監控.
相關文章
相關標籤/搜索