im-cloud分佈式中間件分析(二)-cloud節點實現

github.com/brewlin/im-…php

1.概述

cloud 節點對外提供websockettcp client 註冊。並維護每一個鏈接對應的客戶端信息。做爲Grpc server,接受grpc推送數據,並推送到client端node

  • 數據流程圖
    cloud

2.@Grpc server

grpc server 基於swoole 的http2協議,而後經過config/router.php 配置項註冊路由既可使用如rest模式下的交互流程git

grpc 路由註冊

配置文件 config/router.phpgithub

<?php
//Grpc server router
HttpRouter::post('/im.cloud.Cloud/Ping', '/Grpc/Cloud/ping');
HttpRouter::post('/im.cloud.Cloud/Close', '/Grpc/Cloud/close');
HttpRouter::post('/im.cloud.Cloud/PushMsg', '/Grpc/Cloud/pushMsg');
HttpRouter::post('/im.cloud.Cloud/Broadcast', '/Grpc/Cloud/broadcast');
HttpRouter::post('/im.cloud.Cloud/Rooms', '/Grpc/Cloud/rooms');
複製代碼

和rest 路由同樣只須要註冊路由到對應的方法便可,當使用grpc-client進行請求時,就能分發到最遠的控制器去,web

grpc 參數解析

當接收到請求後,能夠根據協程上下文獲取當前鏈接的請求參數,grpc傳輸的協議是二進制,因此不能經過get,post方法直接得到對應的參數,須要採用grpc提供的方法進行解包redis

use Grpc\Parser;
use Core\Context\Context;
public function pushMsg() {
    $rawbody = Context::get()->getRequest()->getRawBody();
    /** @var PushMsgReq $pushMsgReq */
    $pushMsgReq = Parser::deserializeMessage(
            [PushMsgReq::class,null],
            $rawbody
    );
}
複製代碼
  • 獲取請求參數能夠經過協程上下文獲取
    • Context::get()->getRequest()->getRawBody();
    • request()->getRawBody();
  • Grpc\Parser 方法使用的是 swoole\grpc-client 組件包提供的方法,使用swoole對原生grpc 進行了封裝

3.@websocket server

基於websocket 協議註冊到cloud節點,cloud 進行認證,經過grpc將註冊信息傳遞到logic統一管理,認證成功後cloud節點將保存改鏈接的基礎信息算法

握手階段

命名空間:App/Websocket/HandshakeListener.classjson

該事件爲swoole 監聽事件,因此須要註冊監聽回調函數,配置文件爲config/event.phpsegmentfault

use \Core\Swoole\SwooleEvent;
use \App\Websocket\HandshakeListener;

return [
    //websocket握手事件
    SwooleEvent::HANDSHAKE        => new HandshakeListener(),
];
複製代碼

接下來是握手流程api

/**
     * token check '{"mid":123, "room_id":"live://1000", "platform":"web", "accepts":[1000,1001,1002]}'
     * @param Request $request
     * @param Response $response
     * @return bool
     */
    public function onHandshake(Request $request, Response $response): bool
    {
        $httpRequest = HttpRequest::new($request);
        //握手失敗
        if($httpRequest->getUriPath() != self::upgradeUrl){
            $response->end();
            return false;
        }
        // websocket握手鍊接算法驗證
        $secWebSocketKey = $request->header['sec-websocket-key'];
        $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
        if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) {
            $response->end();
            return false;
        }
        $key = base64_encode(sha1(
            $request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
            true
        ));

        $headers = [
            'Upgrade' => 'websocket',
            'Connection' => 'Upgrade',
            'Sec-WebSocket-Accept' => $key,
            'Sec-WebSocket-Version' => '13',
        ];

        // WebSocket connection to 'ws://127.0.0.1:9502/'
        // failed: Error during WebSocket handshake:
        // Response must not include 'Sec-WebSocket-Protocol' header if not present in request: websocket
        if (isset($request->header['sec-websocket-protocol'])) {
            $headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol'];
        }

        foreach ($headers as $key => $val) {
            $response->header($key, $val);
        }
        $response->status(101);
        $response->end();
        return true;
    }
複製代碼

該方法在握手階段對於http請求進行校驗,若是路徑不爲 '/sub' 則認證失敗關閉鏈接,成功後校驗websocekt協議並升級爲websocket,

主事件處理

一樣須要註冊websocket的onmessage事件 配置文件:config/envent.php

@step1 解包

使用 App\Packet\Packet::class 進行解包, 通信協議爲二進制傳輸,會有單獨一章分析im-cloud通信協議的設計

@step2 處理分發(註冊)

根據協議,若是爲註冊請求,則進行註冊流程,心跳則進行心跳流程 im-cloud暫時不支持雙向推送,也就是該鏈接不支持接受推送消息,推送請走logic節點push

@step3 註冊

  • 1.進行auth參數校驗
  • 2.經過grpc 註冊到logic節點
$server = LogicClient::getLogicClient();
if(empty($server))
    throw  new \Exception("not find any logic node");
$connectReq = new ConnectReq();
/** @var \Im\Logic\LogicClient $rpcClient */
$rpcClient  = null;
$serverId = env("APP_HOST","127.0.0.1").":".env("GRPC_PORT",9500);
$connectReq->setServer($serverId);
$connectReq->setCookie("");
$connectReq->setToken(json_encode($data));
/** @var ConnectReply $rpy */
$rpy = GrpcLogicClient::Connect($server,$connectReq)[0];
複製代碼
  • 3.註冊成功後將當前用戶信息 寫入bucket進程,獨立維護全部的用戶信息和鏈接
[$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body);
/** @var Task $task */
\bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]);
複製代碼

4.@tcp server

tcp 處理流程和websocket大體類似,走一樣的流程,只是監聽對應的api有些區別

5.自定義進程

cloud節點 默認啓動了兩個自定義進程伴隨swoole啓動而啓動

discoveryProcess 註冊發現進程

該進程 在啓動時註冊到 註冊中心(默認consul,能夠擴展其餘的註冊中心),而後進行事件輪訓,獲取健康狀態的實例節點

  • 配置文件
    • config/process.php 註冊進程到進程管理器
    • config/consul.php 配置發現中心的配置
  • 獲取到實例節點後 更新swoole全部的worker進程裏的實例節點信息使用sendMessage()進行進程間通訊
/** * 自定義子進程 執行入口 * @param Process $process */
public function run(Process $process) {
    provider()->select()->registerService();
    $config = config("discovery");
    $discovery = $config["consul"]["discovery"]["name"];
    while (true){
        $services = provider()->select()->getServiceList($discovery);
        if(empty($services)){
            CLog::error("not find any instance node:$discovery");
            goto SLEEP;
        }
        for($i = 0; $i < (int)env("WORKER_NUM",4);$i++)
        {
            //將能夠用的服務同步到全部的worker進程
            Cloud::server()->getSwooleServer()->sendMessage($services,$i);
        }
SLEEP:
        sleep(10);
    }
}
複製代碼

bucketProcess 用戶緩存池

配置文件 config/process.php 註冊該進程

該進程兩個任務:

  • 1 註冊成功後緩存用戶信息,管理用戶鏈接
//step 1
[$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body);
//step 2
/** @var Task $task */
\bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]);
使用deliver進程間通訊,發送到bucketProcess進程處理
複製代碼
  • 2.做爲主要的推送進程

當cloud節點grpcserver 接收到推送請求,則建立一個協程寫入bucketprocess進程,當前進程消費管道里的數據,每一個數據建立一個協程,處理推送問題

  • 3.使用自定義進程管理用戶信息的選擇
出版採用的redis緩存用戶信息,在實際壓測的時候發現即便是redis緩存仍是會影響併發處理。
致使慢了4-5倍,而採用自定義進程處理的好處有以下兩點,多進程下對數據不須要加鎖。針對每一個請求單首創建一個協程反而效率要高些
複製代碼

5.監聽事件,生命週期管理

swoole 相關生命週期執行管理都依賴監聽事件,例如 進程啓動 請求事件 握手鍊接 關閉鏈接 等等。。

/** * set event to base swoole * 給swoole 設置基礎的監聽事件, */
use \Core\Swoole\SwooleEvent;
use \App\Event\PipeMessageListener;
use \App\Event\WorkerStopListener;
use \App\Event\ShutdownListener;
use \App\Websocket\MessageListener;
use \App\Websocket\HandshakeListener;
use App\Tcp\ReceiveListener;
use App\Event\OnCloseListener;
use App\Event\WorkerStartListener;

return [
    //監聽onpipmessage事件
    SwooleEvent::PIPE_MESSAGE => new PipeMessageListener(),
    
    //監聽進程啓動事件
    SwooleEvent::WORKER_START => new WorkerStartListener(),
    
    //監聽進程關閉事件
    SwooleEvent::WORKER_STOP  => new WorkerStopListener(),
    SwooleEvent::SHUTDOWN     => new ShutdownListener(),

    //監聽tcp事件
    SwooleEvent::RECEIVE      => new ReceiveListener(),

    //監聽websocket 事件
    SwooleEvent::MESSAGE      => new MessageListener(),
    //websocket握手事件
    SwooleEvent::HANDSHAKE    => new HandshakeListener(),

    //server監聽關閉鏈接事件而後grpc通知logic銷燬鏈接信息
    SwooleEvent::CLOSE        => new OnCloseListener(),
];
複製代碼
相關文章
相關標籤/搜索