剖析下聊天室

由來

環境:PHP七、Swoole、linuxphp

對聊天室有點感興趣,對於網絡協議有一點只知其一;不知其二,因此決定藉助swoole實現個簡單的聊天室,來簡單剖析下原理,知道原理之後就能夠考慮用其餘語言或者本身造輪子寫個,固然這是後話。react

源碼我放置github( https://github.com/WalkingSun/SwooleServer ),有興趣能夠借鑑借鑑。linux

系統設計

即時通信的網絡通訊基於長鏈接,通訊方式有TCP、UDP、socket、websocket等,本次實現是websocket,系統創建常駐內存的websocket服務,客戶端即瀏覽器二者創建鏈接通訊。通訊過程以下:git

image

關於客戶端鏈接websocket服務,本文不作細述,websocket服務的創建藉助swoole,須要在服務端的open、recieve、send、close創建回調處理,爲了方便我將鏈接的客戶端信息放入swoole_table(一個基於共享內存和鎖實現的超高性能,併發數據結構)。github

代碼僅供參考:web

websocket 服務類:shell

<?php
/**
 * Created by PhpStorm.
 * User: WalkingSun
 * Date: 2018/10/28
 * Time: 15:54
 */

class WsServer
{
    const host = '0.0.0.0';
    const port = '9501';

    public $swoole;

    public $config = ['gcSessionInterval' => 60000];

    public $openCallback;       //open回調

    public $messageCallback;    //message回調

    public $runApp;    //request回調

    public $workStartCallback;       //work回調

    public $finishCallback;     //finish回調

    public $closeCallback;       //close回調

    public $taskCallback;       //task回調

    public function __construct( $host, $port, $mode, $socketType, $swooleConfig=[], $config=[])
    {
        $host = $host?:self::host;
        $port = $port?:self::port;
        $this->swoole = new Swoole_websocket_server($host,$port,$mode,$socketType);
        $this->webRoot = $swooleConfig['document_root'];
        if( !empty($this->config) ) $this->config = array_merge($this->config, $config);
        $this->swoole->set($swooleConfig);

        $this->swoole->on('open',[$this,'onOpen']);
        $this->swoole->on('message',[$this,'onMessage']);
        $this->swoole->on('request',[$this,'onRequest']);
        $this->swoole->on('WorkerStart',[$this,'onWorkerStart']);            //增長work進程
        $this->swoole->on('task',[$this,'onTask']);            //增長task任務進程
        $this->swoole->on('finish',[$this,'onFinish']);
        $this->swoole->on('close',[$this,'onClose']);
    }

    public function run(){
        $this->swoole->start();
    }

    /**
     * 當WebSocket客戶端與服務器創建鏈接並完成握手後會回調此函數
     * @param $serv swoole_websocket_server 服務對象
     * @param $request swoole_http_server 服務對象
     */
    public function onOpen( swoole_websocket_server $serv,  $request){
        call_user_func_array( $this->openCallback, [ $serv, $request ] );

        //定時器(異步執行)
//        if($request->fd == 1){
//            swoole_timer_tick(2000,function($timer_id){
//                echo time().PHP_EOL;
//            });
//        }
    }

    /**
     *當服務器收到來自客戶端的數據幀時會回調此函數。
     * @param $server  swoole_websocket_server 服務對象
     * @param $frame  swoole_websocket_frame對象,包含了客戶端發來的數據幀信息
     * $frame->fd,客戶端的socket id,使用$server->push推送數據時須要用到
    $frame->data,數據內容,能夠是文本內容也能夠是二進制數據,能夠經過opcode的值來判斷
    $frame->opcode,WebSocket的OpCode類型,能夠參考WebSocket協議標準文檔
    $frame->finish, 表示數據幀是否完整,一個WebSocket請求可能會分紅多個數據幀進行發送(底層已經實現了自動合併數據幀,如今不用擔憂接收到的數據幀不完整)
     */
    public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame ){

        call_user_func_array( $this->messageCallback, [ $serv, $frame ]);

    }

    /**
     * @param $serv  swoole_websocket_server 服務對象
     * @param $fd  鏈接的文件描述符
     * @param $reactorId  來自那個reactor線程
     * onClose回調函數若是發生了致命錯誤,會致使鏈接泄漏。經過netstat命令會看到大量CLOSE_WAIT狀態的TCP鏈接
     * 當服務器主動關閉鏈接時,底層會設置此參數爲-1,能夠經過判斷$reactorId < 0來分辨關閉是由服務器端仍是客戶端發起的。
     */
    public function onClose( swoole_websocket_server $serv , $fd , $reactorId ){

        call_user_func_array( $this->closeCallback ,[ $serv , $fd , $reactorId ]);

    }

    /**
     * 在task_worker進程內被調用。worker進程可使用swoole_server_task函數向task_worker進程投遞新的任務。當前的Task進程在調用onTask回調函數時會將進程狀態切換爲忙碌,這時將再也不接收新的Task,當onTask函數返回時會將進程狀態切換爲空閒而後繼續接收新的Task。
     * @param $serv  swoole_websocket_server 服務對象
     * @param $task_id int 任務id,由swoole擴展內自動生成,用於區分不一樣的任務。$task_id和$src_worker_id組合起來纔是全局惟一的,不一樣的worker進程投遞的任務ID可能會有相同
     * @param $src_worker_id int 來自於哪一個worker進程
     * @param $data mixed 任務的內容
     */
    public function onTask(swoole_server $serv,  $task_id,  $src_worker_id,  $data){

        call_user_func_array( $this->taskCallback , [ $serv, $task_id, $src_worker_id, $data ]);

//        sleep(10);
//        onTask函數中 return字符串,表示將此內容返回給worker進程。worker進程中會觸發onFinish函數,表示投遞的task已完成。
//        return "task {$src_worker_id}-{$task_id} success";
    }

    /**
     * 當worker進程投遞的任務在task_worker中完成時,task進程會經過swoole_server->finish()方法將任務處理的結果發送給worker進程。
     * @param $serv  swoole_websocket_server 服務對象
     * @param $task_id int  任務id
     * @param $data string  task任務處理的結果內容
     * task進程的onTask事件中沒有調用finish方法或者return結果,worker進程不會觸發onFinish
    執行onFinish邏輯的worker進程與下發task任務的worker進程是同一個進程
     */
    public function onFinish(swoole_server $serv,  $task_id,  $data){

        call_user_func_array( $this->finishCallback ,[ $serv,$task_id,$data]);
//        echo $data;
//        return $data;
    }

    public function onRequest( $request, $response ){
        call_user_func_array( $this->runApp, [ $request, $response ]);
    }

    public function onWorkerStart( $server,  $worker_id ){
        call_user_func_array( $this->workStartCallback , [$server,  $worker_id]);
    }
}

起服務和回調設置:json

class SwooleController extends BasicController{

       public $host;

       public $port;

       public  $swoole_config=[];

       public static $table;


       public function actionStart(){

           $config = include __DIR__ . '/../config/console.php';

           if( isset($config['swoole']['log_file']) ) $this->swoole_config['log_file'] = $config['swoole']['log_file'];

           if( isset($config['swoole']['pid_file']) ) $this->swoole_config['pid_file'] = $config['swoole']['pid_file'];

           $this->swoole_config = array_merge(
               [
                   'document_root' => $config['swoole']['document_root'],
                   'enable_static_handler'     => true,
   //                'daemonize'=>1,
                   'worker_num'=>4,
                   'max_request'=>2000,
   //            'task_worker_num'=>100,
                   //檢查死連接  使用操做系統提供的keepalive機制來踢掉死連接
                   'open_tcp_keepalive'=>1,
                   'tcp_keepidle'=> 1*60,     //鏈接在n秒內沒有數據請求,將開始對此鏈接進行探測
                   'tcp_keepcount' => 3,       //探測的次數,超過次數後將close此鏈接
                   'tcp_keepinterval' => 0.5*60,     //探測的間隔時間,單位秒

                   //swoole實現的心跳機制,只要客戶端超過必定時間沒發送數據,無論這個鏈接是否是死連接,都會關閉這個鏈接
   //            'heartbeat_check_interval' => 10*60,        //每m秒偵測一次心跳
   //            'heartbeat_idle_time' => 30*60,            //一個TCP鏈接若是在n秒內未向服務器端發送數據,將會被切斷
               ],$this->swoole_config
           );

           $this->host = $config['swoole']['host'];
           $this->port = $config['swoole']['port'];
           $swooleServer = new WsServer(  $this->host,$this->port,$config['swoole']['mode'],$config['swoole']['socketType'],$this->swoole_config,$config);

           //鏈接信息保存到swoole_table
           self::$table = new \swoole_table(10);
           self::$table->column('username',\Swoole\Table::TYPE_STRING, 10);
           self::$table->column('avatar',\Swoole\Table::TYPE_STRING, 255);
           self::$table->column('msg',\Swoole\Table::TYPE_STRING, 255);
           self::$table->column('fd',\Swoole\Table::TYPE_INT, 6);
           self::$table->create();

           $swooleServer->openCallback = function( $server , $request ){
               echo "server handshake with fd={$request->fd}\n";
           };

           $swooleServer->runApp = function( $request , $response ) use($config,$swooleServer){

               //全局變量設置及app.log
               $this->globalParam( $request );
               $_SERVER['SERVER_SWOOLE'] = $swooleServer;

               //記錄日誌
               $apiData = $_SERVER;
               unset($apiData['SERVER_SWOOLE']);
               Common::addLog( $config['log'] , ($apiData) );

               //解析路由
               $r = $_GET['r'];
               $r = $r?:( isset($config['defaultRoute'])?$config['defaultRoute']:'index/index');
               $params = explode('/',$r);
               $controller = __DIR__.'/../controllers/'.ucfirst($params[0]).'Controller.php';

               $result = '';
               if( file_exists( $controller ) ){
                   require_once $controller;
                   $class = new ReflectionClass(ucfirst($params[0]).'Controller');
                   if( $class->hasMethod( 'action'.ucfirst($params[1]) ) ){
                       $instance  = $class->newInstanceArgs();
                       $method = $class->getmethod('action'.ucfirst($params[1])); // 獲取類中方法
                       ob_start();
                       $method->invoke($instance);    // 執行方法
                       $result = ob_get_contents();
                       ob_clean();
                   }else{
                       $result = 'NOT FOUND!';
                   }
               }else{
                   $result = "$controller not exist!";
               }

               $response->end( $result );
           };

           $swooleServer->workStartCallback = function( $server,  $worker_id ){

           };

           $swooleServer->taskCallback = function( $server , $request ){
               //發送通知或者短信、郵件等

           };

           $swooleServer->finishCallback = function( $serv,  $task_id,  $data ){

   //            return $data;
           };

           $swooleServer->messageCallback = function( $server,  $iframe  ){
               //記錄客戶端信息
               echo "Client connection fd {$iframe->fd} ".PHP_EOL;

               $data = json_decode( $iframe->data ,1 );

               if( !empty($data['token']) ){
                   if( $data['token']== 'simplechat_open' ){
                       if( !self::$table->exist($iframe->fd) ){
                           $user = array_merge($data,['fd'=>$iframe->fd]);
                           self::$table->set($iframe->fd,$user);

                           //發送鏈接用戶信息
                           foreach (self::$table as $v){
                               if($v['fd']!=$iframe->fd){
                                   $pushData = array_merge($user,['action'=>'connect']);
                                   $server->push($v['fd'],json_encode($pushData));
                               }
                           }
                       }

                   }

                   if( $data['token']=='simplechat' ){
                       //查詢全部鏈接用戶,分發消息

                       foreach (self::$table as $v){
                           if($v['fd']!=$iframe->fd){
                               $pushData = ['username'=>$data['username'],'avatar'=>$data['avatar'],'time'=>date('H:i'),'data'=>$data['data'],'action'=>'send'];
                               $server->push($v['fd'],json_encode($pushData));
                           }
                       }
                   }
               }

               //接受消息,對消息進行解析,發送給組內人其餘人

           };

           $swooleServer->closeCallback = function(  $server,  $fd,  $reactorId ){

               if(  self::$table->exist($fd) ){
                   //退出房間處理
                   self::$table->del($fd);
                   foreach (self::$table as $v){
                       $pushData = ['fd'=>$fd,'username'=>'','avatar'=>'','time'=>date('H:i'),'data'=>'','action'=>'remove'];
                       $server->push($v['fd'],json_encode($pushData));
                   }
               }

               echo  "Client close fd {$fd}".PHP_EOL;
           };

           $this->stdout("server is running, listening {$this->host}:{$this->port}" . PHP_EOL);
           $swooleServer->run();
       }


       public function actionStop(){
           $r = $this->sendSignal( SIGTERM );
           if( $r ){
               $this->stdout("server is stopped, stop listening {$this->host}:{$this->port}" . PHP_EOL);
           }
       }

       public function actionRestart(){
           $this->sendSignal(SIGTERM);     //向主進程發送SIGTERM實現關閉服務器
           $this->actionStart();
       }

       public function actionReload(){
           $this->sendSignal(SIGUSR1);   //向主進程/管理進程發送SIGUSR1信號,將平穩地restart全部Worker進程
       }

   }

起了服務,客戶端就能夠鏈接通訊了。api

心跳檢測

起了半天后服務常會斷掉,查看監聽端口進程狀態,服務器輸入:瀏覽器

$ netstat -anp |grep 9501

發現大量CLOSE_WAIT狀態,經常使用狀態有 ESTABLISHED 表示正在通訊,TIME_WAIT 表示主動關閉,CLOSE_WAIT 表示被動關閉。

TIME_WAIT和CLOSE_WAIT兩種狀態若是一直被保持,意味着對應數目的通道就一直被佔用,且「佔着茅坑不使勁」,一旦句柄數達到上限,新的請求就沒法處理。並且由於swoole是master-worker模式,
基本上http、tcp通訊都是在worker進程,CLOSE_WAIT一直在,子進程將一直沒法釋放,隨着時間的推移CLOSE_WAIT狀態的進程愈來愈多,阻礙新的鏈接進來,websocket服務不可用。

主動關閉 和 被動關閉
TCP關閉 四次揮手過程以下:

image

揮手流程:

一、 客戶端是調用函數close(),這時,客戶端會發送一個FIN給服務器。

二、 服務器收到FIN,關閉套接字讀通道,並將本身狀態設置爲CLOSE_WAIT(表示被動關閉),
並返回一個ACK給客戶端。

三、 客戶端收到ACK,關閉套接字寫通道
接下來,服務器會調用close():

一、 服務器close(),發送一個FIN到客戶端。

二、 客戶端收到FIN,關閉讀通道,並將本身狀態設置成TIME_WAIT,發送一個ACK給服務器。

三、 服務器收到ACK,關閉寫通道,並將本身狀態設置爲CLOSE。

四、 客戶端等待兩個最大數據傳輸時間,而後將本身狀態設置成CLOSED。

由此咱們看到CLOSE-WAIT 狀態,TIME-WAIT 狀態 產生的過程,產生的緣由是複雜的,好比說網絡通訊中斷、用戶手機網絡切換wifi網絡、網絡通訊丟包等,故此tcp揮手過程會出現中斷,繼而
產生這些關閉狀態。

爲了解決這些佔用鏈接數的異常鏈接,須要檢測鏈接是不是活動的,對於死鏈接咱們須要釋放關閉它。

TIME_WAIT 主動關閉

主動關閉的一方在發送最後一個ACK包後,不管對方是否收到都會進入狀態,等待2MSL(Maximum Segment Lifetime數據包的最大生命週期,是一個數據包能在互聯網上生存的最長時間,若超過這個時間則該數據包將會消失在網絡中)
的時間,纔會釋放網絡資源。

TIME_WAIT狀態的存在主要有兩個緣由:

1)可靠地實現TCP全雙工鏈接的終止。在關TCP閉鏈接時,最後的ACK包是由主動關閉方發出的,若是這個ACK包丟失,則被動關閉方將重發FIN包,所以主動方必須維護狀態信息,以容許它重發這個
ACK包。若是不維持這個狀態信息,那麼主動方將回到CLOSED狀態,並對被動方重發的FIN包響應RST包,而被動關閉方將此包解釋成一個錯誤。於是,要實現TCP全雙工鏈接的正常終止,必須可以處
理四次握手協議中任意一個包丟失的狀況,主動關閉方必須維持狀態信息進入TIME_WAIT狀態。

2)確保迷路重複數據包在網絡中消失,防止上一次鏈接中的包迷路後從新出現,影響新鏈接。TCP數據包可能因爲路由器異常而迷路,在迷路期間,數據包發送方可能因超時而重發這個包,迷路的
數據包在路由器恢復後也會被送到目的地,這個迷路的數據包就稱爲Lost Duplicate。在關閉一個TCP鏈接後,若是立刻使用相同的IP地址和端口創建新的TCP鏈接,那麼有可能出現前一個鏈接的迷
路重複數據包在前一個鏈接關閉後再次出現,影響新創建的鏈接。爲了不這一狀況,TCP協議不容許使用處於TIME_WAIT狀態的鏈接的IP和端口啓動一個新鏈接,只有通過2MSL的時間,確保上一次
鏈接中全部的迷路重複數據包都已消失在網絡中,才能安全地創建新鏈接。

若是Server主動關閉鏈接,一樣會有大量的鏈接在關閉後處於TIME_WAIT狀態,等待2MSL的時間後才能釋放網絡資源。對於併發鏈接,出現大量等待鏈接,新的鏈接進不來,會下降系統性能。

time_wait問題能夠經過調整內核參數和適當的設置web服務器的keep-Alive值來解決。由於time_wait是本身可控的,要麼就是對方鏈接的異常,要麼就是本身沒有快速的回收資源,總之不是因爲本身程序錯誤引發的。

解決方式:

  • 試圖讓Client主動關閉鏈接,因爲每一個Client的併發量都比較低,於是不會產生性能瓶頸
  • 優化Server的系統TCP參數,使其網絡資源的最大值、消耗速度和恢復速度達到平衡;

    修改/etc/sysctl.conf

    net.ipv4.tcp_tw_recycle = 1       #啓用TIME-WAIT狀態sockets的快速回收
    
    net.ipv4.tcp_tw_reuse = 1         #容許將TIME-WAIT sockets從新用於新的TCP鏈接,默認爲0,表示關閉
    
    #緩存每一個鏈接最新的時間戳,後續請求中若是時間戳小於緩存的時間戳,即視爲無效,相應的數據包會被丟棄,啓用這種行爲取決於tcp_timestamps和tcp_tw_recycle
    net.ipv4.tcp_timestamps = 1

CLOSE_WAIT 被動關閉

對方發送一個FIN後,程序本身這邊沒有進一步發送ACK以確認。換句話說就是在對方關閉鏈接後,程序裏沒有檢測到,或者程序裏自己就已經忘了這個時候須要關閉鏈接,因而這個資源就一直被程序佔用着。

解決辦法:

  • 釋放關閉掉異常的鏈接;
  • 修復程序的bug,從新發布;

Keep-Alive

TCP中有一個Keep-Alive的機制能夠檢測死鏈接,LINUX內核包含對keepalive的支持,其中使用了三個參數:tcp_keepalive_time(開啓keepalive的閒置時長)tcp_keepalive_intvl(keepalive探測包的發送
間隔)和tcp_keepalive_probes(若是對方不予應答,探測包的發送次數);如此服務端會隔斷時間發送個探測包給客戶端,能夠是屢次,若是在超出設置閒置時長,內核會關閉這個鏈接。

客戶端主動發心跳

經過程序設置最大鏈接時長,若是客戶端在這段時間內沒有發送過數據,則關閉釋放這個鏈接。

解決關閉問題

TIME_WAIT 卻是沒有出現過, CLOSE_WAIT狀態總會出現。

就看看文檔,swoole有這些設置,當前使用的是TCP的keep-alive檢測,只需改配置便可:

...
    //檢查死連接  使用操做系統提供的keepalive機制來踢掉死連接
                'open_tcp_keepalive'=>1,
                'tcp_keepidle'=> 1*60,     //鏈接在n秒內沒有數據請求,將開始對此鏈接進行探測
                'tcp_keepcount' => 3,       //探測的次數,超過次數後將close此鏈接
                'tcp_keepinterval' => 0.5*60,     //探測的間隔時間,單位秒
   ...

我設置的週期比較短,方便測試。

設置了這些看似穩定了,卻仍是會出現CLOSE_WAIT,後來查了日誌,發生錯誤中斷了,大概意思,代碼中出現exit、die,顯然常駐內存的swoole不支持這些,會立馬中斷程序。因此改些這些代碼,
剛開始藉助YII2.0寫的,框架源碼的問題,因此swoole這塊服務須要單獨出來,嗯。。。因此索性直接本身擼個。如今看來,服務跑起來穩定多了,一直沒掛呢。

貼下臨時地址:http://47.99.189.105:91/

系統監控及優化

系統很簡單,可是做爲研究,應該更透徹點。

咱們的系統如何監控?若是說系統崩潰怎麼辦?能支撐多大併發?高併發下如何保持系統穩定。。。 一個高性能的即時通信是如何架構的?

額,留待之後再研究下補充。

參考資料:

https://juejin.im/post/5c3b21e4e51d455231347349

相關文章
相關標籤/搜索