在咱們啓動了workerman事後,按照咱們前面的理解.若是是在linux下.worker子進程啓動端口複用,並監聽事件和處理事件(win忽略).那workerman是如何對事件完成監聽和處理的呢.咱們來看一下.linux
在咱們看源碼的時候,在forkOneWorkerForLinux有這樣一行代碼.算法
if ($worker->reusePort) { $worker->listen();}
這裏,一看名字就知道是對端口進行監聽.咱們來看一下源碼swoole
public function listen() { // 這裏的socketName就是咱們監聽的地址信息. if (!$this->_socketName) { return; } // 設置自動加載的信息. Autoloader::setRootPath($this->_autoloadRootPath); // 須要監聽的socket. if (!$this->_mainSocket) { // 解析socket地址. $local_socket = $this->parseSocketAddress(); // 設置基礎協議.若是是udp.就爲STREAM_SERVER_BIND,若是不是udp就是,就是4 | 8.注意,這裏是按位或算法.則爲 100(4) | 1000(8) = 1100. $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN; $errno = 0; $errmsg = ''; // 設置端口複用. if ($this->reusePort) { \stream_context_set_option($this->\context, 'socket', 'so_reuseport', 1); } // 開始建立socket. $this->_mainSocket = \stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context); if (!$this->_mainSocket) { throw new Exception($errmsg); } // 若是transport爲ssl,就要特殊處理. if ($this->transport === 'ssl') { \stream_socket_enable_crypto($this->_mainSocket, false); } elseif ($this->transport === 'unix') { // 若是是unix. $socket_file = \substr($local_socket, 7); if ($this->user) { chown($socket_file, $this->user); } if ($this->group) { chgrp($socket_file, $this->group); } } // 嘗試打開tcp的keepalive並禁用掉Nagle算法. if (\function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') { \set_error_handler(function(){}); $socket = \socket_import_stream($this->_mainSocket); \socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); \socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1); \restore_error_handler(); } // 爲資源流設置爲非阻塞模式. \stream_set_blocking($this->_mainSocket, 0); } // 設置監聽. $this->resumeAccept();}
在這裏,workerman首先是建立了一個_mainSocket的socket監聽信息.並設置爲非阻塞模式.並設置監聽.socket
public function resumeAccept() { // 設置事件監聽. if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) { // 若是不是udp協議.就採用acceptConnection來才聽. if ($this->transport !== 'udp') { static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); } else { // udp協議,就採用acceptUdpConnection來監聽 static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); } // 設置pauseAccept爲false. $this->_pauseAccept = false; } }
咱們來看看acceptConnection.tcp
public function acceptConnection($socket) { // 設置錯誤信息. \set_error_handler(function(){}); // 接收信息. $new_socket = stream_socket_accept($socket, 0, $remote_address); // 恢復錯誤信息. \restore_error_handler(); // 若是沒有接收成功. if (!$new_socket) { return; } // 底層採用Tcp來監聽. $connection = new TcpConnection($new_socket, $remote_address); // 設置connections來監聽. $this->connections[$connection->id] = $connection; // 把worker複製進去. $connection->worker = $this; $connection->protocol = $this->protocol; $connection->transport = $this->transport; // 設置事件監聽. $connection->onMessage = $this->onMessage; // 關閉連接時的處理. $connection->onClose = $this->onClose; // 設置錯誤信息. $connection->onError = $this->onError; $connection->onBufferDrain = $this->onBufferDrain; $connection->onBufferFull = $this->onBufferFull; // 若是有onConnect鏈接處理.就直接調用. if ($this->onConnect) { try { \call_user_func($this->onConnect, $connection); } catch (\Exception $e) { static::log($e); exit(250); } catch (\Error $e) { static::log($e); exit(250); } } }
這裏,咱們就完成了對事件的監聽.而且每一個連接下面都將worker的信息賦值給TcpConnection的worker中.因此,每個Connection都會有worker信息.完成了事件的監聽.那如何處理事件的呢.oop
咱們在worker的run方法中,有這樣一句話.ui
static::$globalEvent->loop();
這裏採用了死循環來等待來處理事件.咱們在前面看了.$globalEvent是採用workerman本身自己的來監聽.它是在run方法中初始化的.this
// run if (!static::$globalEvent) { $event_loop_class = static::getEventLoopName(); static::$globalEvent = new $event_loop_class; $this->resumeAccept(); } // Worker::getEventLoopName() protected static function getEventLoopName() { // 若是存在就直接返回. if (static::$eventLoopClass) { return static::$eventLoopClass; } // 若是存在就刪除Swoole\Event. if (!\class_exists('\Swoole\Event', false)) { unset(static::$_availableEventLoops['swoole']); } $loop_name = ''; // protected static $\_availableEventLoops = array( // 'libevent' => '\\Workerman\\Events\\Libevent', // 'event' => '\\Workerman\\Events\\Event' // 'swoole' => '\\Workerman\\Events\\Swoole' // ); foreach (static::$\_availableEventLoops as $name=>$class) { if (\extension_loaded($name)) { $loop_name = $name; break; } } if ($loop\_name) { // 若是存在名稱.就使用React下面的事件處理. if (\interface_exists('\React\EventLoop\LoopInterface')) { switch ($loop_name) { case 'libevent': static::$eventLoopClass = '\Workerman\Events\React\ExtLibEventLoop'; break; case 'event': static::$eventLoopClass = '\Workerman\Events\React\ExtEventLoop'; break; default : static::$eventLoopClass = '\Workerman\Events\React\StreamSelectLoop'; break; } } else { // 就默認. 咱們這裏採用的Libevent. static::$eventLoopClass = static::$\_availableEventLoops[$loop_name]; } } else { static::$eventLoopClass = \interface_exists('\React\EventLoop\LoopInterface')? '\Workerman\Events\React\StreamSelectLoop':'\Workerman\Events\Select'; } return static::$eventLoopClass; }
以上就是初始化$globalEvent.我當前的環境是libevent.因此咱們就看一下.WorkermanEventsLibevent的loop方法.unix
public function loop() { // 調用了一個event_base_loop. \event_base_loop($this->_eventBase); }
只有一個方法.event_base_loop就等待事件被觸發,而後觸發他們的事件.這裏的是$this->_eventBase.而設置事件則在咱們的add方法中.在listen的時候,咱們調用了以下的方法.rest
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
因此咱們來看一下add.
public function add($fd, $flag, $func, $args = array()) { // 判斷信息. switch ($flag) { case self::EV_SIGNAL: $fd_key = (int)$fd; $real_flag = EV_SIGNAL | EV_PERSIST; $this->_eventSignal[$fd_key] = event_new(); if (!\event_set($this->_eventSignal[$fd_key], $fd, $real_flag, $func, null)) { return false; } if (!\event_base_set($this->_eventSignal[$fd_key], $this->_eventBase)) { return false; } if (!\event_add($this->_eventSignal[$fd_key])) { return false; } return true; case self::EV_TIMER: case self::EV_TIMER_ONCE: $event = \event_new(); $timer_id = (int)$event; if (!\event_set($event, 0, EV_TIMEOUT, array($this, 'timerCallback'), $timer_id)) { return false; } if (!\event_base_set($event, $this->_eventBase)) { return false; } $time_interval = $fd * 1000000; if (!\event_add($event, $time_interval)) { return false; } $this->_eventTimer[$timer_id] = array($func, (array)$args, $event, $flag, $time_interval); return $timer_id; default : $fd_key = (int)$fd; $real_flag = $flag === self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST; $event = \event_new(); if (!\event_set($event, $fd, $real_flag, $func, null)) { return false; } if (!\event_base_set($event, $this->_eventBase)) { return false; } if (!\event_add($event)) { return false; } $this->_allEvents[$fd_key][$flag] = $event; return true; } }
咱們從以上方法能夠精簡出大概的幾個語句
// 建立一個新的事件. $event = \event_new(); // 設置事件的監聽. if (!\event_set($event, $fd, $real_flag, $func, null)) { return false; } // 從新設置event的綁定事件. if (!\event_base_set($event, $this->_eventBase)) { return false; } // 添加事件. if (!\event_add($event)) { return false; } // 保存到全局的事件中去. $this->_allEvents[$fd_key][$flag] = $event;
因此.咱們在run方法中,先設置事件的監聽.而後在調用事件的處理.而完成事件的處理則是由咱們worker自帶的onMessage方法來處理.
event_set和event_base_set的做用. workerman 對 worker的監控.