Redis實現消息隊列之生產消費模式

簡單的介紹下消息隊列,使用消息隊列首先咱們得有一個隊列,那麼這個隊列以前講過就是先進先出的一個數據結構;那麼有了隊列之後咱們還須要有人在隊列裏面放東西,那麼這個放東西的人咱們稱之爲生產者;有了生產者對應的須要一個消費者,沒有消費者這個隊列滿了就會溢出。css

簡單隊列實現

那麼咱們Redis恰好有一個數據類型符合這個就是List。list能夠實現隊列(先進先出)和棧(先進後出),那麼這個list又有兩種插入數據的方式:頭插法和尾插法。因此咱們今天使用的結構是隊列,使用尾插法,關鍵的命令有rpush(尾插)和lpop(頭部獲取)。以下圖所示:
git

> rpush squeue zhangsan lisi mango    #插入隊列(integer) 3> lpop squeue    #獲取隊列"zhangsan"> rpush squeue wangwu(integer) 3> lpop squeue"lisi"> lpop squeue"mango"> lpop squeue"wangwu"> lpop squeue    #空隊列(nil)

上面是rpush/lpop結合使用的例子。還可使用lpush/rpop結合使用,效果是同樣的。github

注意:咱們這裏思考一個問題,在我們實際開發中咱們獲取隊列數據的時候若是這個隊列裏面沒有任何值了,咱們會一直pop,這樣咱們的程序出現了一個死循環,並且此時的redis會不斷的處理服務器的pop指令使以內存增高。redis

此時mango靈光一閃,每次pop的時候判斷,若是隊列有值咱們獲取這個值進行操做,若是隊列是空隊列,那麼咱們此時休息三秒鐘再請求,emmm,加雞腿。shell

> lpop squeue    #空隊列(nil).......sleep 3s old> lpop squeue...
隊列阻塞

經過後端程序控制redis服務器休息時間是一個好辦法,可是此處有一個問題,若是說服務器在休眠的時候隊列忽然進來一個值,而此時須要及時反應獲取這個值該如何實現呢?後端

固然,redis早就考慮到這個問題,so提供了一個叫作隊列阻塞讀,其命令blpopbrpop,就是lpop和rpop的阻塞讀方法
bash

blpop [第一個參數:key]... [第二個參數:time]key能夠有多個key等待time就是阻塞時間,單位默認爲秒

嗯,看似完美的解決了上面的方法服務器

問題又來了,若是說此處設置的時間稍微長一點,阻塞請求的客戶端鏈接再多一點那麼會出現下一個問題,那就是空閒鏈接問題。若是線程一直阻塞在哪裏,Redis的客戶端鏈接就成了閒置鏈接,閒置太久,服務器通常會主動斷開鏈接,減小閒置資源佔用。這個時候blpop/brpop會拋出異常來。微信

因此,魚與熊掌不可兼得,開發者當注意此處須要捕獲異常,而後從新請求。數據結構

延遲隊列

上面咱們介紹了阻塞隊列,深知空閒鏈接會被回收出現異常問題,那麼咱們可不能夠實現延遲隊列,我提早一段時間好比5秒獲取隊列中的元素,當元素記錄的時間到達了我再去執行這個值,而後又提早5秒時間去獲取隊列中的值,依次反覆。便可保證我在某一時刻去執行隊列中對應時間的值

咱們來分析,首先保證隊列是一個有序的咱們才能依次執行,這裏咱們使用ZSet由於它帶有排序且不重複,保證客戶端沒有提交重複數據,那麼值保證了,這個排序如何設計呢?咱們不能使用'yyyyMMddHHmmssSSS'這種,第一個不適應這個排序類型可能會超出,第二就是每次轉換會帶來運算轉換的消耗,全部這裏咱們使用時間戳。那麼zset提供一個獲取某段存在數據指令zrangebyscore,這個指令可以獲取到咱們想要的數據,而後經過zrem刪除zset裏面的值便可完成消費。

####假設當前時間戳是0> zadd dqueue 4 zhangsan 8 lisi 12 mango    #添加元素(integer) 3####提早獲取後5秒的數據> zrangebyscore dqueue -inf 5 withscores    #提早獲取5秒內的數據1) "zhangsan"    #值2) 4.0           #當前排序索引> zrem dqueue zhangsan     #消費後刪除元素1####當前時間戳來到了5秒,提早5秒獲取就加上這個> zrangebyscore dqueue -inf 10 withscores    #獲取10之內的數據,輪訓調用1) "lisi"2) 8.0

寫在最後

咱們首先實現簡單的生成消費模式,針對這種簡單輪詢咱們經過有數據和無數據讓這個輪詢實現休息策略來優化,需求更改須要及時響應生產者的數據咱們使用了阻塞隊列來減小響應延時,經過分析咱們又發現阻塞隊列會被回收的問題,咱們又雙叒叕進行了優化,經過zset來實現延遲隊列。

你覺得這樣就萬無一失了,其實還存在一個問題那就是不能保證原子性,咱們的zrangebyscore和zrem不能在同一原子內執行,這裏只是針對的一個客戶端,若是有多個客戶端執行那就會出現屢次消費問題,也就是資源爭搶,這裏咱們可使用lua腳本來執行保證原子性,即獲取後刪除,其值和時間戳保存到內存中,經過後端程序控制時間消費。固然,問題還會有的,咱們在下一篇來說解Redis消息隊列的發佈訂閱模式。


一名正在搶救的coder

筆名:mangolove

CSDN地址:https://blog.csdn.net/mango_love

GitHub地址:https://github.com/mangoloveYu


本文分享自微信公衆號 - 架構技術棧(gh_f036ff0c58eb)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索