php實現redis消息發佈訂閱

基礎介紹

Pub/Sub功能(means Publish, Subscribe)即發佈及訂閱功能php

  • 基於事件的系統中,Pub/Sub是目前普遍使用的通訊模型,它採用事件做爲基本的通訊機制,提供大規模系統所要求的鬆散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發佈者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。
  • 消息發佈者,即publish客戶端,無需獨佔連接,你能夠在publish消息的同時,使用同一個redis-client連接進行其餘操做(例如:INCR等)
  • 消息訂閱者,即subscribe客戶端,須要獨佔連接,即進行subscribe期間,redis-client沒法穿插其餘操做,此時client以阻塞的方式等待「publish端」的消息;這一點很好理解,所以subscribe端須要使用單獨的連接,甚至須要在額外的線程中使用。

當使用銀行卡消費的時候,銀行每每會經過微信、短信或郵件通知用戶這筆交易的信息,這即是一種發佈訂閱模式,這裏的發佈是交易信息的發佈,訂閱則是各個渠道。這在實際工做中十分經常使用,Redis 支持這樣的一個模式。html

發佈訂閱模式首先須要消息源,也就是要有消息發佈出來,好比例子中的銀行通知。首先是銀行的記帳系統,收到了交易的命令,成功記帳後,它就會把消息發送出來,這個時候,訂閱者就能夠收到這個消息進行處理了,觀察者模式就是這個模式的典型應用了。redis

終端實現

訂閱,頻道爲'chat'
圖片描述
發佈消息
圖片描述數組

代碼實現

subscribe.php服務器

<?php
ini_set('default_socket_timeout', -1);  //php配置設置不超時
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
//$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);  //redis方式設置不超時,推薦

$redis->subscribe(['chan'],'callback');     //callback爲回調函數名稱
//$redis->subscribe(['chan'],array(new TestCall(),'callback') ); //若是回調函數是類中的方法名,這樣寫

// 回調函數,這裏寫處理邏輯
function callback($instance, $channelName, $message)
{
         echo $channelName, "==>", $message, PHP_EOL;
         
         //$instance,即爲上面建立的redis實例對象,在回調函數中,默認的這個參數就是,所以不需專門傳參。 這裏除了SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE這4條命令以外其它命令都不能使用
         //若是要使用redis中的其餘命令,這樣實現
         $newredis = new Redis();
        $newredis->connect("127.0.0.1", 6379);
        echo $newredis->get('test') . PHP_EOL;
        $newredis->close();
        
          //能夠根據$channelName, $message,處理不一樣的業務邏輯
          switch($chan) {
               case 'chan-1':
                  ...
                  break;
         
               case 'chan-2':
                              ...
                   break;
           }
           
           switch($message) {
               case 'msg1':
                  ...
                  break;
         
               case 'msg2':
                              ...
                   break;
           }
    
}

publish.php微信

<?php

$redis = new Redis();
$redis->connect("127.0.0.1",6379);

$redis->publish('chan','this is a message');

代碼介紹

  1. subscribe.php中設置不超時
方法1:ini_set('default_socket_timeout', -1);

方法2: $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);socket

若是不設置不超時,60s後會報一個錯誤函數

PHP Fatal error:  Uncaught RedisException: read error on connection to 127.0.0.1:6379 in subscribe.php:6

方式一的實現,是經過臨時修改ini的配置值,default_socket_timeout默認爲60s,default_socket_timeout是socket流的超時參數,即socket流從創建到傳輸再到關閉整個過程必需要在這個參數設置的時間之內完成,若是不能完成,那麼PHP將自動結束這個socket並返回一個警告。
圖片描述this

方式二是經過修改redis的配置項,所以僅對redis鏈接生效,相對於方式1,不會產生意外的對其餘方法的影響。spa

批量訂閱

redis的psubscribe支持經過模式匹配的方式實現批量訂閱,訂閱方式

$redis->psubscribe(['my*'],'psubscribe'); //回調函數寫函數名
或者
$redis->psubscribe(['my*'],array(new TestCall(),'psubscribe')); //回調函數爲類中的方法,類名寫你本身定義的類

subscribe.php

<?php
//ini_set('default_socket_timeout', -1);  //不超時
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

//匹配方式1:發佈可用$redis->publish('mymest','this is a message');
//$redis->psubscribe(['my*'],'psubscribe');    

//匹配方式2:發佈可用$redis->publish('mydest','this is a message');
//$redis->psubscribe(['my?est'],'psubscribe');

//匹配方式3:發佈可用$redis->publish('myaest','this is a message');或$redis->publish('myeest','this is a message');
$redis->psubscribe(['my[ae]est'],'psubscribe');

function psubscribe($redis, $pattern, $chan, $msg) {
      echo "Pattern: $pattern\n";
      echo "Channel: $chan\n";
      echo "Payload: $msg\n";
}

模式匹配規則

支持如下幾種,以hello舉例:

h?llo subscribes to hello, hallo and hxllo
h*llo subscribes to hllo and heeeello
h[ae]llo subscribes to hello and hallo, but not hillo
特殊字符用\轉義

pubsub方法介紹

public function pubsub( $keyword, $argument )

pubsub獲取pub/sub系統的信息,$keyword可用爲"channels", "numsub", 或者"numpat",三種,傳入不一樣的keyword返回的數據不一樣

* $redis->pubsub('channels'); // All channels 獲取全部的頻道,返回數組
     * $redis->pubsub('channels', '*pattern*'); // Just channels matching your pattern,返回符合匹配模式的頻道
     * $redis->pubsub('numsub', array('chan1', 'chan2')); // Get subscriber counts for 'chan1' and 'chan2'    //返回每一個訂閱頻道的數量,返回數組
     * $redis->pubsub('numpat'); // Get the number of pattern subscribers 獲取模式匹配方式的訂閱的數量,即$redis->psubscribe(['my[ae]est'],'psubscribe');返回數量爲1,$redis->subscribe(['chan'],'callback');    這種方式獲取不到,所以返回數量爲0

參考:

Redis發佈訂閱模式

相關文章
相關標籤/搜索