很久沒寫博客了。
最近公司開了新項目,我負責的內容之一是系統的後端。具體項目內容我就不介紹了,可是用到的技術有些仍是頗有趣的,值得記錄一下。今天介紹的就是其中一個:利用redis的pubsub訂閱消息功能作消息隊列。
對於這個功能自己,仍是比較簡單的。redis自己支持了publish/subscribe的功能,publish是廣播消息,subscribe是訂閱消息。服務端使用
publish [channel] [content]
發佈了一條消息,若是客戶端已經提早訂閱了這個頻道,這個時候就能夠收到消息了。訂閱的命令也很簡單
以後客戶端就開始進入監聽狀態了。
這個功能用python實現起來也很簡單,直接使用redis庫就能夠。至於基本的使用方法,我就不介紹了,這個隨便百度一下就一大片。重點來講說redis裏面的pubsub功能——其實也是百度翻到的,寫一個輔助類:
class RedisSubscriber(object):
"""
Redis頻道訂閱輔助類
"""
def __init__(self, channel):
self._sentinel = Sentinel(config.RedisConfig.HOST_PORT, password=config.RedisConfig.PASSWORD)
self.conn = self._sentinel.master_for(config.RedisConfig.MASTER)
self.channel = channel # 定義頻道名稱
def psubscribe(self):
"""
訂閱方法
"""
pub = self.conn.pubsub()
pub.psubscribe(self.channel) # 同時訂閱多個頻道,要用psubscribe
pub.listen()
return pub
這個類裏面須要解釋的有兩個地方:
- 一是鏈接方式。使用python鏈接redis有三種方式:①使用庫中的Redis類(或StrictRedis類,其實差很少);②使用ConnectionPool鏈接池(可保持長鏈接);③使用Sentinel類(若是有多個redis作集羣時,程序會本身選擇一個合適的鏈接)。我項目中的redis就是個集羣,因此使用了第三種方式。
- 二是訂閱方法。這裏使用的是StrictRedis類中的pubsub方法。鏈接好以後,可以使用subscribe或psubscribe方法來訂閱redis消息。其中subscribe是訂閱一個頻道,psubscribe可訂閱多個頻道(這樣寫的時候,做爲參數的頻道應該是一個列表)。以後就能夠開始監聽了。
接收的地方是這樣:
def test():
subscriber = RedisSubscriber([channel1, channel2, ...])
redis_sub = subscriber.psubscribe() # 調用訂閱方法
while True:
msg = redis_sub.parse_response(block=False, timeout=60)
print("收到訂閱消息 %s" % msg)
注意:
- 剛開始監聽的時候,會收到一條消息,相似於 [b'psubscribe', b'#你訂閱的頻道#', 1] 這樣。出現了這條消息,說明訂閱成功了。
- parse_response像這麼使用的話,是非阻塞的,若是收不到消息,60秒收不到消息就會返回None。這倆參數能夠不加,變成阻塞的。
這就完了。
這就完了?大多數文章就只是簡單的介紹到這裏了。可是我在使用的時候發現一個很是噁心的問題:
訂閱消息過一段時間後就沒動靜了。沒有任何異常,就是簡單的停下了。時間不定,比較常見的是2-4個小時,長的話可能兩三天(python羣裏有位朋友也出現了一毛同樣的問題,也是找了不少資料無果)。我也找了不少資料,有的說是redis服務器緩存滿了,就斷開了,能夠經過修改redis-server的緩存大小來解決。但是,這不科學啊!
再通過幾天的實驗和研究,我猜想了這種狀況可能發生的緣由:
客戶端只是主動鏈接了服務器,而服務器是不在乎的,過段時間發現這個客戶端沒啥用,就主動斷開了。以後,客戶端也不會有報錯,只是尷尬地訂閱着空氣。。。
這個世界好安靜啊!
因而我又嘗試了各類方法,好比:訂閱返回None的時候把訂閱取消,從新訂閱——無論用;把鏈接斷掉從新創建鏈接——無論用;隨便給redis發一條消息——也無論用。
因此我不開心了。我決定採用比較暴力的方式:redis鏈接創建後,就開一條線程,每分鐘主動給服務器發送一條消息(這就比如你睡覺的時候,有人在你身邊,每分鐘問你一遍,喂,你還活着嗎?)。我在RedisSubscriber這個輔助類裏面加了個方法:
def keep_alive(self):
"""
保持客戶端長鏈接
"""
ka_thread = threading.Thread(target=self._ping)
ka_thread.start()
def _ping(self):
"""
發個消息,刷存在感
"""
while True:
time.sleep(60)
# 嘗試向redis-server發一條消息
if not self.conn.ping():
print("oops~ redis-server get lost. call him back now!")
del self._sentinel
self._sentinel = Sentinel(config.RedisConfig.HOST_PORT, password=config.RedisConfig.PASSWORD)
self.conn = self._sentinel.master_for(config.RedisConfig.MASTER)
而後,在test()中,建立好RedisSubscriber類對象以後,加一句
就好。
通過了一個禮拜的測試,訂閱消息還活着。我想這差很少能夠算是我猜對了。暫時當作這個問題解決了吧。