使用 Hyperf 框架的 websocket 客戶端和服務端,完成消息推送!

使用場景

公司原有的業務消息推送是靠前端 ajax 輪眉請求後端接口完成的。而後我新來的,讓我改爲 websocket 雙向通訊的來作消息推送。php

業務場景 => PC 端瀏覽器打開後臺系統後,若是有業務訂單,而後時時推送到 PC 端上,PC 進行時時的語音播報。

步驟

  1. 安裝框架前端

    1. composer create-project hyperf/hyperf-skeleton 【注意要符合框架的使用環境,在安裝】
    2. 安裝 websocket 服務端的 composer 安裝包node

      1. `composer require hyperf/websocket-server`
    3. 安裝 websocket 的客戶端,安裝的緣由是 經過 http 請求後,websocket 客戶端直接向 websocket 服務端創建鏈接,而後推送消息。react

      1. `composer require hyperf/websocket-client`
    4. 修改 config 文件中的 server.php 配置文件,有時候,配置文件不存在,須要本身手動建立。git

      1. server.php 配置文件代碼:

server.php 配置文件代碼【改爲本身的端口號】

<?php  
  
declare(strict\_types=1);  
/\*\*  
 \* This file is part of Hyperf. \* \* @link     https://www.hyperf.io  
  \* @document https://doc.hyperf.io  
  \* @contact  group@hyperf.io  
  \* @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE  
  \*/  
  
use Hyperf\\Server\\Server;  
use Hyperf\\Server\\SwooleEvent;

return [  
  'mode' => SWOOLE_PROCESS,  
  'servers' => [  
 [  'name' => 'http',  
  'type' => Server::SERVER_HTTP,  
  'host' => '0.0.0.0',  
  'port' => 8098,  
  'sock_type' => SWOOLE_SOCK_TCP,  
  'callbacks' => [  
        SwooleEvent::ON_REQUEST => [Hyperf\\HttpServer\\Server::class, 'onRequest'],  
    ],  
  ],  
  [  
  'name' => 'ws',  
  'type' => Server::SERVER\_WEBSOCKET,  
  'host' => '0.0.0.0',  
  'port' => 8099,  
  'sock_type' => SWOOLE_SOCK_TCP,  
  'callbacks' => [  
        SwooleEvent::ON_HAND_SHAKE => [Hyperf\\WebSocketServer\\Server::class, 'onHandShake'],  
        SwooleEvent::ON_MESSAGE => [Hyperf\\WebSocketServer\\Server::class, 'onMessage'],  
        SwooleEvent::ON_CLOSE => [Hyperf\\WebSocketServer\\Server::class, 'onClose'],  
        ],  
      ],  
  ],  
  'settings' => [  
  'enable_coroutine' => true,  
  'worker_num' => swoole_cpu_num(),  
  'pid_file' => BASE_PATH . '/runtime/hyperf.pid',  
  'open_tcp_nodelay' => true,  
  'max_coroutine' => 100000,  
  'open_http2\_protocol' => true,  
  'max_request' => 100000,  
  'socket_buffer_size' => 2 * 1024 * 1024,  
  ],  
  'callbacks' => [  
     SwooleEvent::ON_BEFORE_START => [Hyperf\\Framework\\Bootstrap\\ServerStartCallback::class, 'beforeStart'],  
     SwooleEvent::ON_WORKER_START => [Hyperf\\Framework\\Bootstrap\\WorkerStartCallback::class, 'onWorkerStart'],  
     SwooleEvent::ON_PIPE_MESSAGE => [Hyperf\\Framework\\Bootstrap\\PipeMessageCallback::class, 'onPipeMessage'],  
  ],  
];

api 和 websocket 服務端連接的路由代碼,routes.php

image.png

換成本身的路由對應的控制器和方法

websocket 服務端代碼 VoiceBroadcastWebSocketController.php

<?php  
declare(strict\_types=1);  
  
namespace App\\Controller\\WebSocket;  
  
use App\\Service\\SendWebSocketQueueService;  
use Hyperf\\Contract\\OnCloseInterface;  
use Hyperf\\Contract\\OnMessageInterface;  
use Hyperf\\Contract\\OnOpenInterface;  
use Hyperf\\Validation\\ValidationException;  
use Swoole\\Exception;  
use Swoole\\Http\\Request;  
use Swoole\\Server;  
use Swoole\\Websocket\\Frame;  
use Swoole\\WebSocket\\Server as WebSocketServer;  
use App\\Exception\\WebSocketException;  
use Hyperf\\Utils\\ApplicationContext;  
use Hyperf\\Logger\\LoggerFactory;  
  
use Hyperf\\Di\\Annotation\\Inject;

class VoiceBroadcastWebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface  
{  
  protected $redis;
  
  /**  
   * @Inject  
   * @var SendWebSocketQueueService  
  */
  protected $service;
  
  /**
   * @var \\Psr\\Log\\LoggerInterface
   */
   protected $logger;
   
   // 初始化
   public function __construct(LoggerFactory $loggerFactory)  
   {  
      $container = ApplicationContext::getContainer();  
      $this->redis = $container->get(\Redis::class);  
      $this->logger = $loggerFactory->get('log','default');  
   }
   
   
   // onmessage 方法接收 客戶端或者服務端消息
    public function onMessage(WebSocketServer $server, Frame $frame): void  
    {  

      $recvData = json_decode($frame->data);  
      if(!is_object($recvData)) {  
        $this->checkData($frame->data,$frame->fd);  
      } else {  
        $this->sendData($server,$frame->data,$frame->fd);  
      }  
     }
   
   
   /**  
     * 校驗數據  
     * @param $string  
     * @param $fd  
     * @return string  
    */
   function checkData ($string, $fd) {
        if (!is_string($string)) {
            $this->logger->error('字符串類型錯誤');  
            return '字符串類型錯誤';
        }
        
        $strArray = explode('_',$string);
        $shopIds = json_decode($string[1],true);
        
        if(!is_array($shopIds) || empty($shopIds)) {
           $this->logger->error('參數錯誤');
           return '參數有誤';
        }
        
        echo "所有映射成功"
     
   }
   
   
   /**
     * 發送消息到 PC 端
   */
   public function sendData($server,$sendData,$fd) {
       $recvData = json_encode($sendData,true);
       $uid = $recvData['uid'];
       $data = $recvData['data'];
       
       $fdsArr = $this->redis->sMembers('jiayouwa:websocket:voiceSet_'.$uid);
       
       echo 'voiceSet_'.$uid;
       
       $data = [
            'result' => true,
            'code'=>0,
            'msg'=>'操做成功',
            'data'=>$data,
       ];
       
       
       if(count($fdsArr)) {  
          foreach ($fdsArr as $key=>$value) {  
          try {  
              $server->push(intval($value),json_encode($data));  
             echo "線程:$fd 向線程 $value 發送信息\\n";  
          } catch (\\Throwable $e) {  
          // 增長 重試次數  
              $this->service->push($recvData,1);  
              // 把數據刪除  
              $this->redis->sRem('jiayouwa:websocket:voiceSet_'.$uid,$value);  
              continue;  
         }  
       }}
   }
   
   
   public function onClose(Server $server, int $fd, int $reactorId): void  
    {  
      echo "$fd\-closed\\n";  
    }  
  
    public function onOpen(WebSocketServer $server, Request $request): void  
    {  
      echo "線程:$request\->fd\-打開\\n";  
    }
}

http 客戶端連接到 websocket 文件 VoiceBroadcastController.php

/**  
 * 向指定瀏覽器發送數據  
  *  
 * @param VoiceBroadcastRequest $request  
 * @param ResponseInterface $response  
 * @return \\Psr\\Http\\Message\\ResponseInterface  
 */
 
 public function voiceBroadcast(VoiceBroadcastRequest $request, ResponseInterface $response)  
    {  
      $serviceType = $request->input('service_type',0);  
      $name = $request->input('name','');  
      $shopNameId = $request->input('shop_name_id',0);  

      // 判斷是在redis 中存在  

      $fdsArr = $this->redis->sMembers('jiayouwa:websocket:voiceSet_'.$shopNameId);  
      if(!count($fdsArr)) {  
        return $this->returnSuccess([],true,0,'shop_name_id 不存在');  
      }  

      $data  = [  
          'uid'=>$shopNameId,  
          'service'=>'voiceBroadcast',  
          'data'=> array(  
          array(  
          'service_type'=>$serviceType,  
          'name'=>$name,  
          ),  
         ),  
      ];  

      // 發送數據到 webSocket  
     $this->connectWebSocket($data);  
     return $this->returnSuccess();  
    }
    
    /**  
     * 連接websocket 併發送數據  
     * @param $data  
     * @return array  
    */
    public function connectWebSocket($data) {  
      $client = $this->clientFactory->create($this->webSocketIp);  
      $client->push(json_encode($data));  
    }

pc 端請求 websocket 代碼

<script>  
  //這裏的ip地址改成本身服務器的ip地址  
  var ws = new WebSocket('ws://192.168.0.0:9502/voiceBroadcast');  
  ws.onopen = function(){  
        var uid = 'jiayouwa_[1,2,3]';  
  ws.send(uid);  
  };  
  ws.onmessage = function(e){  
        var message_info =e.data  
  console.log(message_info);  
  };  
  
</script>

注意

用 hyperf 框架實現太簡單了。只需下載 websocket 的客戶端 和 服務端 composer 包。
若是要實現點對點的 消息推送,只須要將你的 uid 和 fd 進程 redis 的 key=> value 的映射便可
若是要實現 一個推送到 多個 pc 端,將 uid => [1,2,3,4,5] 用 redis 的集合類型就好
代碼僅供參考,有什麼問題評論就好
寫文章今年纔開始,寫的很差,請多多包涵
hyperf 框架地址: https://doc.hyperf.io/#/
相關文章
相關標籤/搜索