http://github.com/brewlin/im-...
im-cloud分佈式中間件分析(二)-cloud節點實現
cloud 節點對外提供websocket
、tcp
client 註冊。並維護每一個鏈接對應的客戶端信息。做爲Grpc server,接受grpc推送數據,並推送到client端
grpc server 基於swoole 的
http2
協議,而後經過config/router.php 配置項註冊路由既可使用如rest模式下的交互流程
配置文件
config/router.php
<?php //Grpc server router HttpRouter::post('/im.cloud.Cloud/Ping', '/Grpc/Cloud/ping'); HttpRouter::post('/im.cloud.Cloud/Close', '/Grpc/Cloud/close'); HttpRouter::post('/im.cloud.Cloud/PushMsg', '/Grpc/Cloud/pushMsg'); HttpRouter::post('/im.cloud.Cloud/Broadcast', '/Grpc/Cloud/broadcast'); HttpRouter::post('/im.cloud.Cloud/Rooms', '/Grpc/Cloud/rooms');
和rest 路由同樣只須要註冊路由到對應的方法便可,當使用grpc-client進行請求時,就能分發到最遠的控制器去,php
當接收到請求後,能夠根據協程上下文獲取當前鏈接的請求參數,
grpc
傳輸的協議是二進制,因此不能經過get,post方法直接得到對應的參數,須要採用grpc提供的方法進行解包
use Grpc\Parser; use Core\Context\Context; public function pushMsg() { $rawbody = Context::get()->getRequest()->getRawBody(); /** @var PushMsgReq $pushMsgReq */ $pushMsgReq = Parser::deserializeMessage( [PushMsgReq::class,null], $rawbody ); }
獲取請求參數能夠經過協程上下文獲取node
Context::get()->getRequest()->getRawBody();
request()->getRawBody();
Grpc\Parser
方法使用的是 swoolegrpc-client 組件包提供的方法,使用swoole對原生grpc 進行了封裝基於websocket 協議註冊到cloud節點,cloud 進行認證,經過grpc將註冊信息傳遞到logic統一管理,認證成功後cloud節點將保存改鏈接的基礎信息
命名空間:
App/Websocket/HandshakeListener.class
該事件爲swoole 監聽事件,因此須要註冊監聽回調函數,配置文件爲config/event.php
git
use \Core\Swoole\SwooleEvent; use \App\Websocket\HandshakeListener; return [ //websocket握手事件 SwooleEvent::HANDSHAKE => new HandshakeListener(), ];
接下來是握手流程github
/** * token check '{"mid":123, "room_id":"live://1000", "platform":"web", "accepts":[1000,1001,1002]}' * @param Request $request * @param Response $response * @return bool */ public function onHandshake(Request $request, Response $response): bool { $httpRequest = HttpRequest::new($request); //握手失敗 if($httpRequest->getUriPath() != self::upgradeUrl){ $response->end(); return false; } // websocket握手鍊接算法驗證 $secWebSocketKey = $request->header['sec-websocket-key']; $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#'; if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) { $response->end(); return false; } $key = base64_encode(sha1( $request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true )); $headers = [ 'Upgrade' => 'websocket', 'Connection' => 'Upgrade', 'Sec-WebSocket-Accept' => $key, 'Sec-WebSocket-Version' => '13', ]; // WebSocket connection to 'ws://127.0.0.1:9502/' // failed: Error during WebSocket handshake: // Response must not include 'Sec-WebSocket-Protocol' header if not present in request: websocket if (isset($request->header['sec-websocket-protocol'])) { $headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol']; } foreach ($headers as $key => $val) { $response->header($key, $val); } $response->status(101); $response->end(); return true; }
該方法在握手階段對於http請求進行校驗,若是路徑不爲 '/sub' 則認證失敗關閉鏈接,成功後校驗websocekt協議並升級爲websocket,web
一樣須要註冊websocket的onmessage事件 配置文件:
config/envent.php
使用App\Packet\Packet::class
進行解包,
通信協議爲二進制傳輸,會有單獨一章分析im-cloud通信協議的設計
根據協議,若是爲註冊請求,則進行註冊流程,心跳則進行心跳流程
im-cloud暫時不支持雙向推送,也就是該鏈接不支持接受推送消息,推送請走logic節點push
$server = LogicClient::getLogicClient(); if(empty($server)) throw new \Exception("not find any logic node"); $connectReq = new ConnectReq(); /** @var \Im\Logic\LogicClient $rpcClient */ $rpcClient = null; $serverId = env("APP_HOST","127.0.0.1").":".env("GRPC_PORT",9500); $connectReq->setServer($serverId); $connectReq->setCookie(""); $connectReq->setToken(json_encode($data)); /** @var ConnectReply $rpy */ $rpy = GrpcLogicClient::Connect($server,$connectReq)[0];
[$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body); /** @var Task $task */ \bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]);
tcp 處理流程和websocket大體類似,走一樣的流程,只是監聽對應的api有些區別
cloud節點 默認啓動了兩個自定義進程伴隨swoole啓動而啓動redis
該進程 在啓動時註冊到 註冊中心(默認consul,能夠擴展其餘的註冊中心),而後進行事件輪訓,獲取健康狀態的實例節點
配置文件算法
config/process.php
註冊進程到進程管理器config/consul.php
配置發現中心的配置sendMessage()
進行進程間通訊/** * 自定義子進程 執行入口 * @param Process $process */ public function run(Process $process) { provider()->select()->registerService(); $config = config("discovery"); $discovery = $config["consul"]["discovery"]["name"]; while (true){ $services = provider()->select()->getServiceList($discovery); if(empty($services)){ CLog::error("not find any instance node:$discovery"); goto SLEEP; } for($i = 0; $i < (int)env("WORKER_NUM",4);$i++) { //將能夠用的服務同步到全部的worker進程 Cloud::server()->getSwooleServer()->sendMessage($services,$i); } SLEEP: sleep(10); } }
配置文件
config/process.php
註冊該進程
該進程兩個任務:json
//step 1 [$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body); //step 2 /** @var Task $task */ \bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]); 使用deliver進程間通訊,發送到bucketProcess進程處理
當cloud節點grpcserver 接收到推送請求,則建立一個協程寫入bucketprocess進程,當前進程消費管道里的數據,每一個數據建立一個協程,處理推送問題
出版採用的redis緩存用戶信息,在實際壓測的時候發現即便是redis緩存仍是會影響併發處理。 致使慢了4-5倍,而採用自定義進程處理的好處有以下兩點,多進程下對數據不須要加鎖。針對每一個請求單首創建一個協程反而效率要高些
swoole 相關生命週期執行管理都依賴監聽事件,例如進程啓動
請求事件
握手鍊接
關閉鏈接
等等。。
/** * set event to base swoole * 給swoole 設置基礎的監聽事件, */ use \Core\Swoole\SwooleEvent; use \App\Event\PipeMessageListener; use \App\Event\WorkerStopListener; use \App\Event\ShutdownListener; use \App\Websocket\MessageListener; use \App\Websocket\HandshakeListener; use App\Tcp\ReceiveListener; use App\Event\OnCloseListener; use App\Event\WorkerStartListener; return [ //監聽onpipmessage事件 SwooleEvent::PIPE_MESSAGE => new PipeMessageListener(), //監聽進程啓動事件 SwooleEvent::WORKER_START => new WorkerStartListener(), //監聽進程關閉事件 SwooleEvent::WORKER_STOP => new WorkerStopListener(), SwooleEvent::SHUTDOWN => new ShutdownListener(), //監聽tcp事件 SwooleEvent::RECEIVE => new ReceiveListener(), //監聽websocket 事件 SwooleEvent::MESSAGE => new MessageListener(), //websocket握手事件 SwooleEvent::HANDSHAKE => new HandshakeListener(), //server監聽關閉鏈接事件而後grpc通知logic銷燬鏈接信息 SwooleEvent::CLOSE => new OnCloseListener(), ];