使用python來搞定redis的訂閱功能

很久沒寫博客了。
 
最近公司開了新項目,我負責的內容之一是系統的後端。具體項目內容我就不介紹了,可是用到的技術有些仍是頗有趣的,值得記錄一下。今天介紹的就是其中一個:利用redis的pubsub訂閱消息功能作消息隊列。
 
對於這個功能自己,仍是比較簡單的。redis自己支持了publish/subscribe的功能,publish是廣播消息,subscribe是訂閱消息。服務端使用
publish [channel] [content]

 

發佈了一條消息,若是客戶端已經提早訂閱了這個頻道,這個時候就能夠收到消息了。訂閱的命令也很簡單
subscribe [channel]

 

以後客戶端就開始進入監聽狀態了。
 
這個功能用python實現起來也很簡單,直接使用redis庫就能夠。至於基本的使用方法,我就不介紹了,這個隨便百度一下就一大片。重點來講說redis裏面的pubsub功能——其實也是百度翻到的,寫一個輔助類:
 
class RedisSubscriber(object):
    """
    Redis頻道訂閱輔助類
    """

    def __init__(self, channel):
        self._sentinel = Sentinel(config.RedisConfig.HOST_PORT, password=config.RedisConfig.PASSWORD)
        self.conn = self._sentinel.master_for(config.RedisConfig.MASTER)
        self.channel = channel  # 定義頻道名稱

    def psubscribe(self):
        """
        訂閱方法
        """
        pub = self.conn.pubsub()
        pub.psubscribe(self.channel)  # 同時訂閱多個頻道,要用psubscribe
        pub.listen()
        return pub

 

這個類裏面須要解釋的有兩個地方:
  • 一是鏈接方式。使用python鏈接redis有三種方式:①使用庫中的Redis類(或StrictRedis類,其實差很少);②使用ConnectionPool鏈接池(可保持長鏈接);③使用Sentinel類(若是有多個redis作集羣時,程序會本身選擇一個合適的鏈接)。我項目中的redis就是個集羣,因此使用了第三種方式。
  • 二是訂閱方法。這裏使用的是StrictRedis類中的pubsub方法。鏈接好以後,可以使用subscribe或psubscribe方法來訂閱redis消息。其中subscribe是訂閱一個頻道,psubscribe可訂閱多個頻道(這樣寫的時候,做爲參數的頻道應該是一個列表)。以後就能夠開始監聽了。
 
接收的地方是這樣:
 
def test():
  subscriber = RedisSubscriber([channel1, channel2, ...])
  redis_sub = subscriber.psubscribe()   # 調用訂閱方法
 
  while True:
      msg = redis_sub.parse_response(block=False, timeout=60)
      print("收到訂閱消息 %s" % msg)

 

注意:
  1. 剛開始監聽的時候,會收到一條消息,相似於 [b'psubscribe', b'#你訂閱的頻道#', 1] 這樣。出現了這條消息,說明訂閱成功了。
  2. parse_response像這麼使用的話,是非阻塞的,若是收不到消息,60秒收不到消息就會返回None。這倆參數能夠不加,變成阻塞的。
 
這就完了。
 
這就完了?大多數文章就只是簡單的介紹到這裏了。可是我在使用的時候發現一個很是噁心的問題: 訂閱消息過一段時間後就沒動靜了。沒有任何異常,就是簡單的停下了。時間不定,比較常見的是2-4個小時,長的話可能兩三天(python羣裏有位朋友也出現了一毛同樣的問題,也是找了不少資料無果)。我也找了不少資料,有的說是redis服務器緩存滿了,就斷開了,能夠經過修改redis-server的緩存大小來解決。但是,這不科學啊!
再通過幾天的實驗和研究,我猜想了這種狀況可能發生的緣由: 客戶端只是主動鏈接了服務器,而服務器是不在乎的,過段時間發現這個客戶端沒啥用,就主動斷開了。以後,客戶端也不會有報錯,只是尷尬地訂閱着空氣。。。
 
這個世界好安靜啊!
 
因而我又嘗試了各類方法,好比:訂閱返回None的時候把訂閱取消,從新訂閱——無論用;把鏈接斷掉從新創建鏈接——無論用;隨便給redis發一條消息——也無論用。
 
 
因此我不開心了。我決定採用比較暴力的方式:redis鏈接創建後,就開一條線程,每分鐘主動給服務器發送一條消息(這就比如你睡覺的時候,有人在你身邊,每分鐘問你一遍,喂,你還活着嗎?)。我在RedisSubscriber這個輔助類裏面加了個方法:
 
    def keep_alive(self):
        """
        保持客戶端長鏈接
        """
        ka_thread = threading.Thread(target=self._ping)
        ka_thread.start()

    def _ping(self):
        """
        發個消息,刷存在感
        """
        while True:
            time.sleep(60)
            # 嘗試向redis-server發一條消息
            if not self.conn.ping():
                print("oops~ redis-server get lost. call him back now!")
                del self._sentinel
                self._sentinel = Sentinel(config.RedisConfig.HOST_PORT, password=config.RedisConfig.PASSWORD)
                self.conn = self._sentinel.master_for(config.RedisConfig.MASTER)

 

而後,在test()中,建立好RedisSubscriber類對象以後,加一句
 
subscriber.keep_alive()

 

就好。
 
通過了一個禮拜的測試,訂閱消息還活着。我想這差很少能夠算是我猜對了。暫時當作這個問題解決了吧。
相關文章
相關標籤/搜索