im-cloud分佈式中間件分析(二)-cloud節點實現
cloud 節點對外提供
websocket
、tcp
client 註冊。並維護每一個鏈接對應的客戶端信息。做爲Grpc server,接受grpc推送數據,並推送到client端node
grpc server 基於swoole 的
http2
協議,而後經過config/router.php 配置項註冊路由既可使用如rest模式下的交互流程git
配置文件
config/router.php
github
<?php
//Grpc server router
HttpRouter::post('/im.cloud.Cloud/Ping', '/Grpc/Cloud/ping');
HttpRouter::post('/im.cloud.Cloud/Close', '/Grpc/Cloud/close');
HttpRouter::post('/im.cloud.Cloud/PushMsg', '/Grpc/Cloud/pushMsg');
HttpRouter::post('/im.cloud.Cloud/Broadcast', '/Grpc/Cloud/broadcast');
HttpRouter::post('/im.cloud.Cloud/Rooms', '/Grpc/Cloud/rooms');
複製代碼
和rest 路由同樣只須要註冊路由到對應的方法便可,當使用grpc-client進行請求時,就能分發到最遠的控制器去,web
當接收到請求後,能夠根據協程上下文獲取當前鏈接的請求參數,
grpc
傳輸的協議是二進制,因此不能經過get,post方法直接得到對應的參數,須要採用grpc提供的方法進行解包redis
use Grpc\Parser;
use Core\Context\Context;
public function pushMsg() {
$rawbody = Context::get()->getRequest()->getRawBody();
/** @var PushMsgReq $pushMsgReq */
$pushMsgReq = Parser::deserializeMessage(
[PushMsgReq::class,null],
$rawbody
);
}
複製代碼
Context::get()->getRequest()->getRawBody();
request()->getRawBody();
Grpc\Parser
方法使用的是 swoole\grpc-client 組件包提供的方法,使用swoole對原生grpc 進行了封裝基於websocket 協議註冊到cloud節點,cloud 進行認證,經過grpc將註冊信息傳遞到logic統一管理,認證成功後cloud節點將保存改鏈接的基礎信息算法
命名空間:
App/Websocket/HandshakeListener.class
json
該事件爲swoole 監聽事件,因此須要註冊監聽回調函數,配置文件爲config/event.php
segmentfault
use \Core\Swoole\SwooleEvent;
use \App\Websocket\HandshakeListener;
return [
//websocket握手事件
SwooleEvent::HANDSHAKE => new HandshakeListener(),
];
複製代碼
接下來是握手流程api
/**
* token check '{"mid":123, "room_id":"live://1000", "platform":"web", "accepts":[1000,1001,1002]}'
* @param Request $request
* @param Response $response
* @return bool
*/
public function onHandshake(Request $request, Response $response): bool
{
$httpRequest = HttpRequest::new($request);
//握手失敗
if($httpRequest->getUriPath() != self::upgradeUrl){
$response->end();
return false;
}
// websocket握手鍊接算法驗證
$secWebSocketKey = $request->header['sec-websocket-key'];
$patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) {
$response->end();
return false;
}
$key = base64_encode(sha1(
$request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
true
));
$headers = [
'Upgrade' => 'websocket',
'Connection' => 'Upgrade',
'Sec-WebSocket-Accept' => $key,
'Sec-WebSocket-Version' => '13',
];
// WebSocket connection to 'ws://127.0.0.1:9502/'
// failed: Error during WebSocket handshake:
// Response must not include 'Sec-WebSocket-Protocol' header if not present in request: websocket
if (isset($request->header['sec-websocket-protocol'])) {
$headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol'];
}
foreach ($headers as $key => $val) {
$response->header($key, $val);
}
$response->status(101);
$response->end();
return true;
}
複製代碼
該方法在握手階段對於http請求進行校驗,若是路徑不爲 '/sub' 則認證失敗關閉鏈接,成功後校驗websocekt協議並升級爲websocket,
一樣須要註冊websocket的onmessage事件 配置文件:
config/envent.php
使用
App\Packet\Packet::class
進行解包, 通信協議爲二進制傳輸,會有單獨一章分析im-cloud通信協議的設計
根據協議,若是爲註冊請求,則進行註冊流程,心跳則進行心跳流程
im-cloud暫時不支持雙向推送,也就是該鏈接不支持接受推送消息,推送請走logic節點push
$server = LogicClient::getLogicClient();
if(empty($server))
throw new \Exception("not find any logic node");
$connectReq = new ConnectReq();
/** @var \Im\Logic\LogicClient $rpcClient */
$rpcClient = null;
$serverId = env("APP_HOST","127.0.0.1").":".env("GRPC_PORT",9500);
$connectReq->setServer($serverId);
$connectReq->setCookie("");
$connectReq->setToken(json_encode($data));
/** @var ConnectReply $rpy */
$rpy = GrpcLogicClient::Connect($server,$connectReq)[0];
複製代碼
[$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body);
/** @var Task $task */
\bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]);
複製代碼
tcp 處理流程和websocket大體類似,走一樣的流程,只是監聽對應的api有些區別
cloud節點 默認啓動了兩個自定義進程伴隨swoole啓動而啓動
該進程 在啓動時註冊到 註冊中心(默認consul,能夠擴展其餘的註冊中心),而後進行事件輪訓,獲取健康狀態的實例節點
config/process.php
註冊進程到進程管理器config/consul.php
配置發現中心的配置sendMessage()
進行進程間通訊/** * 自定義子進程 執行入口 * @param Process $process */
public function run(Process $process) {
provider()->select()->registerService();
$config = config("discovery");
$discovery = $config["consul"]["discovery"]["name"];
while (true){
$services = provider()->select()->getServiceList($discovery);
if(empty($services)){
CLog::error("not find any instance node:$discovery");
goto SLEEP;
}
for($i = 0; $i < (int)env("WORKER_NUM",4);$i++)
{
//將能夠用的服務同步到全部的worker進程
Cloud::server()->getSwooleServer()->sendMessage($services,$i);
}
SLEEP:
sleep(10);
}
}
複製代碼
配置文件
config/process.php
註冊該進程
該進程兩個任務:
//step 1
[$mid,$key,$roomId,$accepts,$heartbeat] = $this->registerLogic($body);
//step 2
/** @var Task $task */
\bean(Task::class)->deliver(Bucket::class,"put",[$roomId,$key,$fd]);
使用deliver進程間通訊,發送到bucketProcess進程處理
複製代碼
當cloud節點grpcserver 接收到推送請求,則建立一個協程寫入bucketprocess進程,當前進程消費管道里的數據,每一個數據建立一個協程,處理推送問題
出版採用的redis緩存用戶信息,在實際壓測的時候發現即便是redis緩存仍是會影響併發處理。
致使慢了4-5倍,而採用自定義進程處理的好處有以下兩點,多進程下對數據不須要加鎖。針對每一個請求單首創建一個協程反而效率要高些
複製代碼
swoole 相關生命週期執行管理都依賴監聽事件,例如
進程啓動
請求事件
握手鍊接
關閉鏈接
等等。。
/** * set event to base swoole * 給swoole 設置基礎的監聽事件, */
use \Core\Swoole\SwooleEvent;
use \App\Event\PipeMessageListener;
use \App\Event\WorkerStopListener;
use \App\Event\ShutdownListener;
use \App\Websocket\MessageListener;
use \App\Websocket\HandshakeListener;
use App\Tcp\ReceiveListener;
use App\Event\OnCloseListener;
use App\Event\WorkerStartListener;
return [
//監聽onpipmessage事件
SwooleEvent::PIPE_MESSAGE => new PipeMessageListener(),
//監聽進程啓動事件
SwooleEvent::WORKER_START => new WorkerStartListener(),
//監聽進程關閉事件
SwooleEvent::WORKER_STOP => new WorkerStopListener(),
SwooleEvent::SHUTDOWN => new ShutdownListener(),
//監聽tcp事件
SwooleEvent::RECEIVE => new ReceiveListener(),
//監聽websocket 事件
SwooleEvent::MESSAGE => new MessageListener(),
//websocket握手事件
SwooleEvent::HANDSHAKE => new HandshakeListener(),
//server監聽關閉鏈接事件而後grpc通知logic銷燬鏈接信息
SwooleEvent::CLOSE => new OnCloseListener(),
];
複製代碼