公司原有的業務消息推送是靠前端 ajax 輪眉請求後端接口完成的。而後我新來的,讓我改爲 websocket 雙向通訊的來作消息推送。php
業務場景 => PC 端瀏覽器打開後臺系統後,若是有業務訂單,而後時時推送到 PC 端上,PC 進行時時的語音播報。
安裝框架前端
composer create-project hyperf/hyperf-skeleton
【注意要符合框架的使用環境,在安裝】安裝 websocket 服務端的 composer 安裝包node
1. `composer require hyperf/websocket-server`
安裝 websocket 的客戶端,安裝的緣由是 經過 http 請求後,websocket 客戶端直接向 websocket 服務端創建鏈接,而後推送消息。react
1. `composer require hyperf/websocket-client`
修改 config 文件中的 server.php 配置文件,有時候,配置文件不存在,須要本身手動建立。git
1. server.php 配置文件代碼:
<?php declare(strict\_types=1); /\*\* \* This file is part of Hyperf. \* \* @link https://www.hyperf.io \* @document https://doc.hyperf.io \* @contact group@hyperf.io \* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE \*/ use Hyperf\\Server\\Server; use Hyperf\\Server\\SwooleEvent; return [ 'mode' => SWOOLE_PROCESS, 'servers' => [ [ 'name' => 'http', 'type' => Server::SERVER_HTTP, 'host' => '0.0.0.0', 'port' => 8098, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_REQUEST => [Hyperf\\HttpServer\\Server::class, 'onRequest'], ], ], [ 'name' => 'ws', 'type' => Server::SERVER\_WEBSOCKET, 'host' => '0.0.0.0', 'port' => 8099, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_HAND_SHAKE => [Hyperf\\WebSocketServer\\Server::class, 'onHandShake'], SwooleEvent::ON_MESSAGE => [Hyperf\\WebSocketServer\\Server::class, 'onMessage'], SwooleEvent::ON_CLOSE => [Hyperf\\WebSocketServer\\Server::class, 'onClose'], ], ], ], 'settings' => [ 'enable_coroutine' => true, 'worker_num' => swoole_cpu_num(), 'pid_file' => BASE_PATH . '/runtime/hyperf.pid', 'open_tcp_nodelay' => true, 'max_coroutine' => 100000, 'open_http2\_protocol' => true, 'max_request' => 100000, 'socket_buffer_size' => 2 * 1024 * 1024, ], 'callbacks' => [ SwooleEvent::ON_BEFORE_START => [Hyperf\\Framework\\Bootstrap\\ServerStartCallback::class, 'beforeStart'], SwooleEvent::ON_WORKER_START => [Hyperf\\Framework\\Bootstrap\\WorkerStartCallback::class, 'onWorkerStart'], SwooleEvent::ON_PIPE_MESSAGE => [Hyperf\\Framework\\Bootstrap\\PipeMessageCallback::class, 'onPipeMessage'], ], ];
換成本身的路由對應的控制器和方法
<?php declare(strict\_types=1); namespace App\\Controller\\WebSocket; use App\\Service\\SendWebSocketQueueService; use Hyperf\\Contract\\OnCloseInterface; use Hyperf\\Contract\\OnMessageInterface; use Hyperf\\Contract\\OnOpenInterface; use Hyperf\\Validation\\ValidationException; use Swoole\\Exception; use Swoole\\Http\\Request; use Swoole\\Server; use Swoole\\Websocket\\Frame; use Swoole\\WebSocket\\Server as WebSocketServer; use App\\Exception\\WebSocketException; use Hyperf\\Utils\\ApplicationContext; use Hyperf\\Logger\\LoggerFactory; use Hyperf\\Di\\Annotation\\Inject; class VoiceBroadcastWebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface { protected $redis; /** * @Inject * @var SendWebSocketQueueService */ protected $service; /** * @var \\Psr\\Log\\LoggerInterface */ protected $logger; // 初始化 public function __construct(LoggerFactory $loggerFactory) { $container = ApplicationContext::getContainer(); $this->redis = $container->get(\Redis::class); $this->logger = $loggerFactory->get('log','default'); } // onmessage 方法接收 客戶端或者服務端消息 public function onMessage(WebSocketServer $server, Frame $frame): void { $recvData = json_decode($frame->data); if(!is_object($recvData)) { $this->checkData($frame->data,$frame->fd); } else { $this->sendData($server,$frame->data,$frame->fd); } } /** * 校驗數據 * @param $string * @param $fd * @return string */ function checkData ($string, $fd) { if (!is_string($string)) { $this->logger->error('字符串類型錯誤'); return '字符串類型錯誤'; } $strArray = explode('_',$string); $shopIds = json_decode($string[1],true); if(!is_array($shopIds) || empty($shopIds)) { $this->logger->error('參數錯誤'); return '參數有誤'; } echo "所有映射成功" } /** * 發送消息到 PC 端 */ public function sendData($server,$sendData,$fd) { $recvData = json_encode($sendData,true); $uid = $recvData['uid']; $data = $recvData['data']; $fdsArr = $this->redis->sMembers('jiayouwa:websocket:voiceSet_'.$uid); echo 'voiceSet_'.$uid; $data = [ 'result' => true, 'code'=>0, 'msg'=>'操做成功', 'data'=>$data, ]; if(count($fdsArr)) { foreach ($fdsArr as $key=>$value) { try { $server->push(intval($value),json_encode($data)); echo "線程:$fd 向線程 $value 發送信息\\n"; } catch (\\Throwable $e) { // 增長 重試次數 $this->service->push($recvData,1); // 把數據刪除 $this->redis->sRem('jiayouwa:websocket:voiceSet_'.$uid,$value); continue; } }} } public function onClose(Server $server, int $fd, int $reactorId): void { echo "$fd\-closed\\n"; } public function onOpen(WebSocketServer $server, Request $request): void { echo "線程:$request\->fd\-打開\\n"; } }
/** * 向指定瀏覽器發送數據 * * @param VoiceBroadcastRequest $request * @param ResponseInterface $response * @return \\Psr\\Http\\Message\\ResponseInterface */ public function voiceBroadcast(VoiceBroadcastRequest $request, ResponseInterface $response) { $serviceType = $request->input('service_type',0); $name = $request->input('name',''); $shopNameId = $request->input('shop_name_id',0); // 判斷是在redis 中存在 $fdsArr = $this->redis->sMembers('jiayouwa:websocket:voiceSet_'.$shopNameId); if(!count($fdsArr)) { return $this->returnSuccess([],true,0,'shop_name_id 不存在'); } $data = [ 'uid'=>$shopNameId, 'service'=>'voiceBroadcast', 'data'=> array( array( 'service_type'=>$serviceType, 'name'=>$name, ), ), ]; // 發送數據到 webSocket $this->connectWebSocket($data); return $this->returnSuccess(); } /** * 連接websocket 併發送數據 * @param $data * @return array */ public function connectWebSocket($data) { $client = $this->clientFactory->create($this->webSocketIp); $client->push(json_encode($data)); }
<script> //這裏的ip地址改成本身服務器的ip地址 var ws = new WebSocket('ws://192.168.0.0:9502/voiceBroadcast'); ws.onopen = function(){ var uid = 'jiayouwa_[1,2,3]'; ws.send(uid); }; ws.onmessage = function(e){ var message_info =e.data console.log(message_info); }; </script>
用 hyperf 框架實現太簡單了。只需下載 websocket 的客戶端 和 服務端 composer 包。
若是要實現點對點的 消息推送,只須要將你的 uid 和 fd 進程 redis 的 key=> value 的映射便可
若是要實現 一個推送到 多個 pc 端,將 uid => [1,2,3,4,5] 用 redis 的集合類型就好
代碼僅供參考,有什麼問題評論就好
寫文章今年纔開始,寫的很差,請多多包涵
hyperf 框架地址: https://doc.hyperf.io/#/