gateway-workman

最外層start.php,設置全局啓動模式,加載Application裏的個子服務目錄下應用的啓動文件(start開頭,這些文件都是workman\work類的子類,在載入文件的同時,這些子服務會生成對象,work類的構造方法會把生成的work對象都存入work類的靜態變量$_workers,方便主文件後續設置以及啓動子服務,全局啓動模式子服務是不會啓動的),設置時區,註冊自動加載,調用workman\Worker:runall啓動全部服務。php

主文件:nginx

// 標記是全局啓動
define('GLOBAL_START', 1);

require_once __DIR__ . '/Workerman/Autoloader.php';

// 加載全部Applications/*/start.php,以便啓動全部服務
foreach(glob(__DIR__.'/Applications/*/start*.php') as $start_file)
{
    require_once $start_file;
}
// 運行全部服務
Worker::runAll();

 子服務文件start_gateway,提供接入請求的服務(class Gateway extends Worker):web

// gateway 進程,這裏使用Text協議,能夠用telnet測試
$gateway = new Gateway("Text://0.0.0.0:8282");
// gateway名稱,status方便查看
$gateway->name = 'YourAppGateway';
// gateway進程數
$gateway->count = 4;
// 本機ip,分佈式部署時使用內網ip
$gateway->lanIp = '127.0.0.1';
// 內部通信起始端口,假如$gateway->count=4,起始端口爲4000
// 則通常會使用4001 4002 4003 4004 4個端口做爲內部通信端口 
$gateway->startPort = 2300;
// 心跳間隔
//$gateway->pingInterval = 10;
// 心跳數據
//$gateway->pingData = '{"type":"ping"}';

/* 
// 當客戶端鏈接上來時,設置鏈接的onWebSocketConnect,即在websocket握手時的回調
$gateway->onConnect = function($connection)
{
    $connection->onWebSocketConnect = function($connection , $http_header)
    {
        // 能夠在這裏判斷鏈接來源是否合法,不合法就關掉鏈接
        // $_SERVER['HTTP_ORIGIN']標識來自哪一個站點的頁面發起的websocket連接
        if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net')
        {
            $connection->close();
        }
        // onWebSocketConnect 裏面$_GET $_SERVER是可用的
        // var_dump($_GET, $_SERVER);
    };
}; 
*/

// 若是不是在根目錄啓動,則運行runAll方法
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

 接入服務除了註冊到全局靜態$_workers變量,還設置了路由:redis

    public function __construct($socket_name, $context_option = array())
    {
        parent::__construct($socket_name, $context_option);
        //隨機返回一個bussness的鏈接
        $this->router = array("\\GatewayWorker\\Gateway", 'routerRand');
        
        $backrace = debug_backtrace();
        $this->_appInitPath = dirname($backrace[0]['file']);
    }

 

子服務文件start_bussinessworker,提供實際的業務處理,和gateway服務內部通信相似nginx與php(class BusinessWorker extends Worker):安全

// bussinessWorker 進程
$worker = new BusinessWorker();
// worker名稱
$worker->name = 'YourAppBusinessWorker';
// bussinessWorker進程數量
$worker->count = 4;


// 若是不是在根目錄啓動,則運行runAll方法
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

 附帶基類work的構造函數:websocket

/**
     * worker構造函數
     *
     * @param string $socket_name
     * @param array  $context_option
     */
    public function __construct($socket_name = '', $context_option = array())
    {
        // 保存worker實例
        $this->workerId = spl_object_hash($this);
        self::$_workers[$this->workerId] = $this;
        self::$_pidMap[$this->workerId] = array();
        
        // 得到實例化文件路徑,用於自動加載設置根目錄
        $backrace = debug_backtrace();
        $this->_appInitPath = dirname($backrace[0]['file']);
        
        // 設置socket上下文
        if($socket_name)
        {
            $this->_socketName = $socket_name;
            if(!isset($context_option['socket']['backlog']))
            {
                $context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;
            }
            $this->_context = stream_context_create($context_option);
        }
    }

 這裏能夠看到self::$_workers[$this->workerId] = $this;記錄全局worker實例,
 self::$_pidMap這個用來記錄各個子服務開始fork後的全部子進程id
 $context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;設置嵌套字上下文裏的未accept隊列長度
 在後面運行實例的listen方法監聽的時候會傳遞到stream_socket_server方法裏。app

runAll的流程以下:socket

/**
     * 運行全部worker實例
     * @return void
     */
    public static function runAll()
    {
        // 初始化環境變量(pid文件,log文件,status文件,定時器信號回調)
        self::init();
        // 解析命令(運行,重啓,中止,重載,狀態,從命令判斷主進程是否以守護進程啓動)
	// 啓動以後經過php start.php XXX命令會到這裏!由於第一步設置了這個文件的pid(這裏能夠看到pid對應到文件位置的重要性),因此後面的命令會對應爲發送信號
        self::parseCommand();
        // 嘗試以守護進程模式運行(fork兩次進程,重置進程sid)
        self::daemonize();
        // 初始化全部worker實例,主要是監聽端口(記錄全部子服務worker實例的最長名稱name長度,最長嵌套字名socket_name長度,最長運行用戶名user長度;
	// 全部定義了協議的子服務(Gate)開始監聽也就是啓動服務,子服務實例監聽嵌套字對象mainSoctke,尚未註冊accept回調)
	// 到這裏全部Gate子服務啓動監聽,可是沒有accept
	self::initWorkers();
        //  初始化全部信號處理函數(爲主進程註冊stop,stats,reload的信號回調signalHandler)
        self::installSignal();
        // 保存主進程pid(將獲取daemonize方法後的新的主進程sid,存入init方法後的pid文件)
        self::saveMasterPid();
        // 建立子進程(worker進程)並運行(主進程經過self::$_pidMap用來記錄子服務進程建立的各自進程號,方便後面發送信號,
        // 生成的子進程置空主進程的全局變量,self::$_pidMap,self::$_workers,若是在子服務文件裏定義了self::$stdoutFile文件地址,
        // 會重定向子服務子進程的stdout和stderr,直接運行work實例的run方法)
        // 到這裏子服務已經在子進程中運行,後面的代碼就只有主服務執行
        
        // 子進程的run方法會經過libevent綁定子服務mainSocket的accept回調,在accept回調方法裏纔有定義後面怎麼處理請求socket
        // 子進程的run方法會經過libevent綁定從新綁定信號量,以及用libevent來注入定時器
        // 子進程的run方法會回調用戶在子服務文件裏的onWorkStar方法
        // 子進程進入事件監聽輪詢
        // 上面是基類中的run方法,基於gateway的子服務,會實現本身的onworkStar方法,而後在調用基類的run,這樣能夠在onWorkStar裏實現gate與worker的鏈接
        // 這裏不知道怎麼處理子進程對accpet時候的驚羣 self::forkWorkers(); // 展現啓動界面(打印全部啓動的子服務的信息,因爲initWorkers獲取了各個子服務實例的名稱等信息長度能夠很好的格式化展現) self::displayUI(); // 嘗試重定向標準輸入輸出(重定向主服務進程) self::resetStd(); // 監控全部子進程(worker進程)(處理主進程的信號量;經過pcntl_wait循環監聽子進程狀態,保持子進程的運行) /* 什麼是平滑重啓? 平滑重啓不一樣於普通的重啓,平滑重啓能夠作到在不影響用戶的狀況下重啓服務,以便從新載入PHP程序,完成業務代碼更新。 平滑重啓通常應用於業務更新或者版本發佈過程當中,可以避免由於代碼發佈重啓服務致使的暫時性服務不可用的影響。 注意:只有在on{...}回調中載入的文件平滑重啓後纔會自動更新,啓動腳本中直接載入的文件或者寫死的代碼運行reload不會自動更新。 平滑重啓原理 WorkerMan分爲主進程和子進程,主進程負責監控子進程,子進程負責接收客戶端的鏈接和鏈接上發來的請求數據, 作相應的處理並返回數據給客戶端。當業務代碼更新時,其實咱們只要更新子進程,即可以達到更新代碼的目的。 當WorkerMan主進程收到平滑重啓信號時,主進程會向其中一個子進程發送安全退出(讓對應進程處理完畢當前請求後才退出)信號, 當這個進程退出後,主進程會從新建立一個新的子進程(這個子進程載入了新的PHP代碼),而後主進程再次向另一箇舊的進程發送中止 命令,這樣一個進程一個進程的重啓,直到全部舊的進程所有被置換爲止。 咱們看到平滑重啓其實是讓舊的業務進程逐個退出而後並逐個建立新的進程作到的。爲了在平滑重啓時不影響客用戶,這就要求進程中不 要保存用戶相關的狀態信息,即業務進程最好是無狀態的,避免因爲進程退出致使信息丟失。 */ //上面是官網對平滑啓動的說明,設計的代碼就是reload方法的 $one_worker_pid = current(self::$_pidsToRestart );這裏是處理主進程的平滑啓動信號的 // 在主進程裏獲取全部設置爲能夠平滑啓動的子進程的pid,而後取一個發送平滑啓動信號信號,這個信號到子進程,其實子進程會經過stopAll方法中止運行 // exit(0); // 主進程監聽到子進程退出,而後從新生成一個新的子進程,而後把這個子進程的id從self::$_pidsToRestart裏刪除,而後再次調用reload方法去殺掉下一個子進程 // self::monitorWorkers(); }

 主要的過程已經描述清楚了,主服務在主進程裏,子服務開啓監聽以後,主服務開始fork,而後記錄子服務進程的對應的pid,而後經過信號量來處理用戶命令以及管理子服務進程。子服務在子進程裏實現accpet監聽回調。tcp

work基類的主要代碼片斷:分佈式

子服務listen:
 // 得到應用層通信協議以及監聽的地址,udp會轉換爲傳輸協議
        list($scheme, $address) = explode(':', $this->_socketName, 2);
        // 若是有指定應用層協議,則檢查對應的協議類是否存在
        if($scheme != 'tcp' && $scheme != 'udp')
        {
            $scheme = ucfirst($scheme);
            $this->_protocol = '\\Protocols\\'.$scheme;
            if(!class_exists($this->_protocol))
            {
                $this->_protocol = "\\Workerman\\Protocols\\$scheme";
                if(!class_exists($this->_protocol))
                {
                    throw new Exception("class \\Protocols\\$scheme not exist");
                }
            }
        }
        elseif($scheme === 'udp')
        {
            $this->transport = 'udp';
        }
        
        // flag
        $flags =  $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
        $errno = 0;
        $errmsg = '';
        $this->_mainSocket = stream_socket_server($this->transport.":".$address, $errno, $errmsg, $flags, $this->_context);
        if(!$this->_mainSocket)
        {
            throw new Exception($errmsg);
        }

 

建立子進程:
        // 建立子進程
            while(count(self::$_pidMap[$worker->workerId]) < $worker->count)
            {
                static::forkOneWorker($worker);
            }
        }
    }

    /**
     * 建立一個子進程
     * @param Worker $worker
     * @throws Exception
     */
    protected static function forkOneWorker($worker)
    {
        $pid = pcntl_fork();
        // 主進程記錄子進程pid
        if($pid > 0)
        {
            self::$_pidMap[$worker->workerId][$pid] = $pid;
        }
        // 子進程運行
        elseif(0 === $pid)
        {
            // 啓動過程當中嘗試重定向標準輸出
            if(self::$_status === self::STATUS_STARTING)
            {
                self::resetStd();
            }
            self::$_pidMap = array();
            self::$_workers = array($worker->workerId => $worker);
            Timer::delAll();
            self::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
            self::setProcessUser($worker->user);
            $worker->run();
            exit(250);
        }
        else
        {
            throw new Exception("forkOneWorker fail");
        }
    }

 

子進程執行的基類run方法:
/**
     * 運行worker實例
     */
    public function run()
    {
        // 註冊進程退出回調,用來檢查是否有錯誤
        register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors'));
        
        // 設置自動加載根目錄
        Autoloader::setRootPath($this->_appInitPath);
        
        // 若是沒有全局事件輪詢,則建立一個
        if(!self::$globalEvent)
        {
            if(extension_loaded('libevent'))
            {
                self::$globalEvent = new Libevent();
            }
            else
            {
                self::$globalEvent = new Select();
            }
            // 監聽_mainSocket上的可讀事件(客戶端鏈接事件)也只有Gate纔有這個事件
            if($this->_socketName)
            {
                if($this->transport !== 'udp')
                {
                    self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
                }
                else
                {
                    self::$globalEvent->add($this->_mainSocket,  EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
                }
            }
        }
        
        // 從新安裝事件處理函數,使用全局事件輪詢監聽信號事件
        self::reinstallSignal();
        
        // 用全局事件輪詢初始化定時器
        Timer::init(self::$globalEvent);
        
        // 若是有設置進程啓動回調,則執行
        if($this->onWorkerStart)
        {
            call_user_func($this->onWorkerStart, $this);
        }
        
        // 子進程主循環
        self::$globalEvent->loop();
    }

 

主服務監聽子服務進程:
pcntl_signal_dispatch();
            // 掛起進程,直到有子進程退出或者被信號打斷
            $status = 0;
            $pid = pcntl_wait($status, WUNTRACED);
            // 若是有信號到來,嘗試觸發信號處理函數
            pcntl_signal_dispatch();
            // 有子進程退出
            if($pid > 0)
            {
                // 查找是哪一個進程組的,而後再啓動新的進程補上
                foreach(self::$_pidMap as $worker_id => $worker_pid_array)
                {
                    if(isset($worker_pid_array[$pid]))
                    {
                        $worker = self::$_workers[$worker_id];
                        // 檢查退出狀態
                        if($status !== 0)
                        {
                            self::log("worker[".$worker->name.":$pid] exit with status $status");
                        }
                       
                        // 統計,運行status命令時使用
                        if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status]))
                        {
                            self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
                        }
                        self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
                        
                        // 清除子進程信息
                        unset(self::$_pidMap[$worker_id][$pid]);
                        
                        break;
                    }
                }
                // 若是不是關閉狀態,則補充新的進程
                if(self::$_status !== self::STATUS_SHUTDOWN)
                {
                    self::forkWorkers();
                    // 若是該進程是由於運行reload命令退出,則繼續執行reload流程
                    if(isset(self::$_pidsToRestart[$pid]))
                    {
                        unset(self::$_pidsToRestart[$pid]);
                        self::reload();
                    }
                }

 

平滑啓動過程:
 /**
     * 執行平滑重啓流程
     * @return void
     */
    protected static function reload()
    {
        // 主進程部分
        if(self::$_masterPid === posix_getpid())
        {
            // 設置爲平滑重啓狀態
            if(self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN)
            {
                self::log("Workerman[".basename(self::$_startFile)."] reloading");
                self::$_status = self::STATUS_RELOADING;
            }
            
            // 若是有worker設置了reloadable=false,則過濾掉
            $reloadable_pid_array = array();
            foreach(self::$_pidMap as $worker_id =>$worker_pid_array)
            {
                $worker = self::$_workers[$worker_id];
                if($worker->reloadable)
                {
                    foreach($worker_pid_array as $pid)
                    {
                        $reloadable_pid_array[$pid] = $pid;
                    }
                }
            }
            
            // 獲得全部能夠重啓的進程
            self::$_pidsToRestart = array_intersect(self::$_pidsToRestart , $reloadable_pid_array);
            
            // 平滑重啓完畢
            if(empty(self::$_pidsToRestart))
            {
                if(self::$_status !== self::STATUS_SHUTDOWN)
                {
                    self::$_status = self::STATUS_RUNNING;
                }
                return;
            }
            // 繼續執行平滑重啓流程
            $one_worker_pid = current(self::$_pidsToRestart );
            // 給子進程發送平滑重啓信號
            posix_kill($one_worker_pid, SIGUSR1);
            // 定時器,若是子進程在KILL_WORKER_TIMER_TIME秒後沒有退出,則強行殺死
            Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false);
        }
        // 子進程部分
        else
        {
            // 若是當前worker的reloadable屬性爲真,則執行退出
            $worker = current(self::$_workers);
            if($worker->reloadable)
            {
                self::stopAll();
            }
        }
    } 

 到這裏主進程已經準備好了,子進程(Gate)已經開始監聽了(還未講gate與worker的鏈接通訊,以及gate怎麼接受請求,而後worker怎麼處理請求)。上面說了gate與worker的鏈接是在OnWorkStart裏實現的。後面就來看看

gate的run方法裏保存了用戶自定義的方法,而後本身的onWorkStart,onConnect,onMessage,onClose,onWorkstop都已定義好

/**
     * 運行
     * @see Workerman.Worker::run()
     */
    public function run()
    {
        // 保存用戶的回調,當對應的事件發生時觸發
        $this->_onWorkerStart = $this->onWorkerStart;
        $this->onWorkerStart = array($this, 'onWorkerStart');
        // 保存用戶的回調,當對應的事件發生時觸發
        $this->_onConnect = $this->onConnect;
        $this->onConnect = array($this, 'onClientConnect');
        
        // onMessage禁止用戶設置回調
        $this->onMessage = array($this, 'onClientMessage');
        
        // 保存用戶的回調,當對應的事件發生時觸發
        $this->_onClose = $this->onClose;
        $this->onClose = array($this, 'onClientClose');
        // 保存用戶的回調,當對應的事件發生時觸發
        $this->_onWorkerStop = $this->onWorkerStop;
        $this->onWorkerStop = array($this, 'onWorkerStop');
        
        // 記錄進程啓動的時間
        $this->_startTime = time();
        // 運行父方法
        parent::run();
    }

 在看看他的OnworkStart方法,也就是子進程運行後執行的方法

/**
     * 當Gateway啓動的時候觸發的回調函數
     * @return void
     */
    public function onWorkerStart()
    {
        // 分配一個內部通信端口
		// 主進程pid-子進程pid+startPort保證每一個子進程的內部端口不一樣
        $this->lanPort = function_exists('posix_getppid') ? $this->startPort - posix_getppid() + posix_getpid() : $this->startPort;
        if($this->lanPort<0 || $this->lanPort >=65535)
        {
            $this->lanPort = rand($this->startPort, 65535);
        }
        
        // 若是有設置心跳,則定時執行
        if($this->pingInterval > 0)
        {
            $timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval/2 : $this->pingInterval;
            Timer::add($timer_interval, array($this, 'ping'));
        }
		//別名內部通信協議
        if(!class_exists('\Protocols\GatewayProtocol'))
        {
            class_alias('\GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
        }
        // 初始化gateway內部的監聽,用於監聽worker的鏈接已經鏈接上發來的數據
        // 這裏內部連接在同一個ip+端口的狀況下有兩個服務
        // 這個時候listen因爲全局的事件self::$globalEvent在子進程的run方法裏在回調OnWorkStart以前已經定義,因此不像主進程同樣在listen的不監聽accept事件 $this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}"); $this->_innerTcpWorker->listen(); $this->_innerUdpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}"); $this->_innerUdpWorker->transport = 'udp'; $this->_innerUdpWorker->listen(); // 從新設置自動加載根目錄 Autoloader::setRootPath($this->_appInitPath); // 設置內部監聽的相關回調 $this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage'); $this->_innerUdpWorker->onMessage = array($this, 'onWorkerMessage'); $this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect'); $this->_innerTcpWorker->onClose = array($this, 'onWorkerClose'); // 註冊gateway的內部通信地址,worker去連這個地址,以便gateway與worker之間創建起TCP長鏈接 if(!$this->registerAddress()) { $this->log('registerAddress fail and exit'); Worker::stopAll(); } if($this->_onWorkerStart) { call_user_func($this->_onWorkerStart, $this); } }

 能夠看到他有新建了兩個個work實例,innerTcpWorker,innerUdpWorker,而且在同一個地址{$this->lanIp}:{$this->lanPort}下開啓了兩個服務,而且註冊了與worker通訊的回調事件onWorkerConnect,onWorkerMessage,onWorkerClose。使用的協議是內部通信協議GatewayProtocol。registerAddress方法經過文件鎖把這個子進程的內部通信服務地址{$this->lanIp}:{$this->lanPort}記錄到一個公共地方,多是文件,多是memcache,多是redis,後兩種支持分佈式部署gate與work,否則就要走同一臺機器上。用後面兩種能夠部署多個gate,而後其餘機器部署work。這時候定義的內部服務經過listen方法已經有accept監聽事件了,若是work跟與gate鏈接就會進入到設置的Worker的回調方法裏,客戶端與Gate鏈接就會進入到Client方法裏,由於他們是兩種不一樣的work實例,監聽的不一樣的端口。

 

到這裏Gate子服務已經準備好了,除了本身是work實例提供給客戶端的鏈接服務,被主進程管理以外;每一個子Gate進程都會在新建一個work實例來提供對worker子進程的訪問服務;對客戶端的服務有new Gate的時候指定協議,對子worker進程的服務是默認協議,而且tcp與udp都監聽了。後面的步奏應該是子worker進程在workstart方法裏從子Gate服務創建內部服務是註冊全局的內部通信服務地址,鏈接到Gate,這樣gate的內部服務監聽就把子worker服務的地址記錄下來。

子worker進程服務經過tcp與gate進程服務通訊,經過在鏈接上的監聽,實現消息的傳遞,worker進程在經過與Event文件的回調來通知客戶端的請求,Event處理完畢以後,經過lib/gateway文件,直接udp到Gate來實現信息的傳遞。

分佈式中的每臺機器有主進程管理子進程,子Gate進程處理監聽客戶度與內部Work進程,Gate記錄全局的客戶端id與Gate的對應關係到全局儲存器,也記錄本身的內部服務地址到全局存儲器。每一個子Gate進程記錄鏈接本身的內部worker地址。而後子worker啓動時候從全局內部服務地址取地址進行tcp鏈接,記錄與本身鏈接的Gate地址,client,gate,work直接的通訊就打通了,若是一個客戶端要與另外一個客戶端通訊,在Event處理時從全局的client與gate的對以關係裏獲得要發送的client鏈接的gate,而後給這個gate發送udp信息,再由gate轉發到對的client。

 

下面看看基類的accept方法:

/**
     * 接收一個客戶端鏈接
     * @param resource $socket
     * @return void
     */
    public function acceptConnection($socket)
    {
        // 得到客戶端鏈接
        $new_socket = @stream_socket_accept($socket, 0);
        // 驚羣現象,忽略
        if(false === $new_socket)
        {
            return;
        }
        
        // 初始化鏈接對象
        $connection = new TcpConnection($new_socket);
        $this->connections[$connection->id] = $connection;
        $connection->worker = $this;
        $connection->protocol = $this->_protocol;
        $connection->onMessage = $this->onMessage;
        $connection->onClose = $this->onClose;
        $connection->onError = $this->onError;
        $connection->onBufferDrain = $this->onBufferDrain;
        $connection->onBufferFull = $this->onBufferFull;
        
        // 若是有設置鏈接回調,則執行
        if($this->onConnect)
        {
            try
            {
                call_user_func($this->onConnect, $connection);
            }
            catch(Exception $e)
            {
                ConnectionInterface::$statistics['throw_exception']++;
                self::log($e);
            }
        }
    }

    /**
     * 處理udp鏈接(udp實際上是無鏈接的,這裏爲保證和tcp鏈接接口一致)
     *
     * @param resource $socket
     * @return bool
     */
    public function acceptUdpConnection($socket)
    {
        $recv_buffer = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $remote_address);
        if(false === $recv_buffer || empty($remote_address))
        {
            return false;
        }
        // 模擬一個鏈接對象
        $connection = new UdpConnection($socket, $remote_address);
        if($this->onMessage)
        {
            if($this->_protocol)
            {
                /** @var \Workerman\Protocols\ProtocolInterface $parser */
                $parser = $this->_protocol;
                $recv_buffer = $parser::decode($recv_buffer, $connection);
            }
            ConnectionInterface::$statistics['total_request']++;
            try
            {
               call_user_func($this->onMessage, $connection, $recv_buffer);
            }
            catch(Exception $e)
            {
                ConnectionInterface::$statistics['throw_exception']++;
            }
        }
    }

 總體上來講就是tcp就是新建一個客戶端鏈接,而後使用tcpConnecttion類封裝,包括通訊協議,而後回調onConnect事件;udp直接從鏈接獲取數據,而後經過協議解析數據,回調onMessage方法。

 

gate的內部服務創建好了,再看看business子服務的run方法:

    /**
     * 運行
     * @see Workerman.Worker::run()
     */
    public function run()
    {
        $this->_onWorkerStart = $this->onWorkerStart;
        $this->onWorkerStart = array($this, 'onWorkerStart');
        parent::run();
    }
    
    /**
     * 當進程啓動時一些初始化工做
     * @return void
     */
    protected function onWorkerStart()
    {
        if(!class_exists('\Protocols\GatewayProtocol'))
        {
            class_alias('\GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
        }
        Timer::add(1, array($this, 'checkGatewayConnections'));
        $this->checkGatewayConnections();
        \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
        if($this->_onWorkerStart)
        {
            call_user_func($this->_onWorkerStart, $this);
        }
    }

 這裏business子服務直接就是循環鏈接全部的Gate了。

鏈接打通以後,怎麼通訊呢,這就要看事件驅動管理以及協議了。

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息