Redis 5 新特性中,Streams 數據結構的引入,能夠說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 做爲消息隊列使用時,獲得更完善,更強大的原生支持,其中尤其明顯的是持久化消息隊列。同時,stream 借鑑了 kafka 的消費組模型概念和設計,使消費消息處理上更加高效快速。本文就 Streams 數據結構中經常使用 API 進行分析。
本文所使用 Redis 版本爲 5.0.5 。若是使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不一樣。segmentfault
Streams 添加數據使用 XADD 指令進行添加,消息中的數據以 K-V 鍵值對的形式進行操做。一條消息能夠存在多個鍵值對,添加命令格式:bash
XADD key ID field string [field string ...]
其中 key 爲 Streams 的名稱,ID 爲消息的惟一標誌,不可重複,field string 就爲鍵值對。下面咱們就添加以 person 爲名稱的流,進行操做。服務器
XADD person * name ytao des https://ytao.top
上面添加案例中,ID 使用 * 號複製,這裏表明着服務端自動生成 Id,添加後返回數據 "1578238486193-0"
數據結構
這裏自動生成的 Id 格式爲 <millisecondsTime>-<sequenceNumber>
Id 是由兩部分組成:優化
好比:1578238486193-3 表示在 1578238486193 毫秒的時間戳時,添加的第 4 條消息。spa
除了服務端自動生成 Id 方式外,也支持指定 Id 的生成,可是指定 Id 有如下條件限制:設計
不然,當不知足上述條件時,添加後會拋出異常:指針
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
實際上,當添加一條消息時,會進行兩部操做。第一步,先判斷若是不存在 Streams,則建立 Streams 的名稱,再添加消息到 Streams 中。即便添加消息時,因爲 Id 異常,也能夠在 Redis 中存在以當前 Streams 的名稱。
Streams 中 Id 也可做爲指針使用,由於它是一個有序的標記。code
生產中,若是這樣使用添加消息,會存在一個問題,那就是消息數量太大時,會使服務宕機。這裏 Streams 的設計初期也有考慮到這個問題,那就是能夠指定 Streams 的容量。若是容量操做這個設定的值,就會對調舊的消息。在添加消息時,設置 MAXLEN
參數。blog
XADD person MAXLEN 5 * name ytao des https://ytao.top
這樣就指定該了 Streams 中的容量爲 5 條消息。也可以使用 XTRIM 截取消息,從小到大剔除多餘的消息:
XTRIM person MAXLEN 8
查看消息數量使用 XLEN 指令進行操做。
XLEN key
例:查看 person 流中的消息數量:
> XLEN person (integer) 5
查詢 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。
查詢數據時,能夠按照指定 Id 範圍進行查詢,XRANGE 查詢指令格式:
XRANGE key start end [COUNT count]
參數說明:
這裏 start 和 end 有-
和+
兩個非指定值,他們分別表示無窮小和無窮大,因此當使用這個兩個值時,會查詢出所有的消息。
> XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!"
上面查詢的消息數據,能夠看到是按照先進先出的順序查詢出來的。
使用 COUNT 指定查詢返回的數量:
# 查詢全部的消息,而且返回一條數據 > XRANGE person - + COUNT 1 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
在範圍查詢中,Id 的後半部分可省略,後半部分中的數據會所有查詢到。
XREVRANGE 的查詢和 XRANGE 指令中的使用相似,但查詢的 start 和 end 參數順序進行了調換:
XREVRANGE key end start [COUNT count]
使用案例:
> XREVRANGE person + - 1) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
查詢後的結果與 XRANGE 的結果順序恰好相反,其餘都同樣,這兩個指令可進行消息的升序和降序的返回。
刪除消息使用 XDEL 指令操做,只需指定將要刪除的 Streams 名稱和 Id 便可,支持一次刪除多個消息 。
XDEL key ID [ID ...]
刪除案例:
# 查詢全部消息 > XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" # 刪除消息 > XDEL person 2-0 (integer) 1 # 再次查詢刪除後的全部消息 > XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" # 查詢刪除後的長度 > XLEN person (integer) 2
從上面能夠看到,刪除消息後,長度也會減小相應的數量。
在 Redis 的 PUB/SUB 中,咱們是經過訂閱來消費消息,在 Streams 數據結構中,一樣也能實現同等功能,當沒有新的消息時,可進行阻塞等待。不只支持單獨消費,並且還能夠支持羣組消費。
單獨消費使用 XREAD 指令。能夠看到,下面命令中,STREAMS,key, 以及 ID 爲必填項。ID 表示將要讀取大於該 ID 的消息。當 ID 值使用 $
賦予時,表示已存在消息的最大 Id 值。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
上面的 COUNT
參數用來指定讀取的最大數量,與 XRANGE 的用法同樣。
> XREAD COUNT 1 STREAMS person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" > XREAD COUNT 2 STREAMS person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!"
在 XREAD 裏面還有個 BLOCK
參數,這個是用來阻塞訂閱消息的,BLOCK
攜帶的參數爲阻塞時間,單位爲毫秒,若是在這個時間內沒有新的消息消費,那麼就會釋放該阻塞。當這裏的時間指定爲 0 時,會一直阻塞,直到有新的消息來消費到。
# 窗口 1 開啓阻塞,等待新消息的到來 > XREAD BLOCK 0 STREAMS person $ # 另開一個鏈接窗口 2,添加一條新的消息 > XADD person 2-2 name tao des coder "2-2" # 窗口 1,獲取到有新的消息來消費,而且帶有阻塞的時間 > XREAD BLOCK 0 STREAMS person $ 1) 1) "person" 2) 1) 1) "2-2" 2) 1) "name" 2) "tao" 3) "des" 4) "coder" (60.81s)
當使用 XREAD 進行順序消費時,須要額外記錄下讀取到位置的 Id,方便下次繼續消費。
羣組消費的主要目的也就是爲了分流消息給不一樣的客戶端處理,以更高效的速率處理消息。爲達到這一肝功能需求,咱們須要作三件事:建立羣組,羣組讀取消息,向服務端確認消息以處理。
操做羣組使用 XGROUP 指令:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
上面命令中,包含操做有:
咱們當前須要使用的是建立消費組:
# 以當前存在的最大 Id 做爲消費起始 > XGROUP CREATE person group1 $ OK
羣組讀取使用 XREADGROUP 指令,COUNT
和BLOCK
的使用相似 XREAD 的操做,只是多了個羣組和消費者的指定:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
因爲羣組消費和單獨消費相似,這裏只進行個阻塞分析,這裏 Id 也有個特殊值>
,表示還未進行消費的消息:
# 窗口 1,消費羣組中,taotao 消費者創建阻塞監聽 XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person > # 窗口 2,消費羣組中,yangyang 消費者創建阻塞監聽 XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person > # 窗口 3,添加消費消息 > XADD person 3-1 name tony des 666 "3-1" # 窗口 1,讀取到新消息,此時 窗口 2 沒有任何反應 > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person > 1) 1) "person" 2) 1) 1) "3-1" 2) 1) "name" 2) "tony" 3) "des" 4) "666" (77.54s) # 窗口 3,再次添加消費消息 > XADD person 3-2 name james des abc! "3-2" # 窗口 2,讀取到新消息,此時 窗口 1 沒有任何反應 > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person > 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!" (76.36s)
以上執行流程中,group1 羣組中有兩個消費者,當添加兩條消息後,這兩個消費者輪流消費。
消息消費後,爲避免再次重複消費,這是須要向服務端發送 ACK,確保消息被消費後的標記。
例以下列狀況,咱們上面咱們將最新兩條消息已進行了消費,可是當咱們再次讀取消息時,仍是被讀到:
> XREADGROUP GROUP group1 yangyang STREAMS person 0 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!"
這時,咱們使用 XACK 指令告訴服務器,咱們已處理的消息:
XACK key group ID [ID ...]0
讓服務器標記 3-2 已處理:
> XACK person group1 3-2 (integer) 1
再次獲取羣組讀取消息:
> XREADGROUP GROUP group1 yangyang STREAMS person 0 1) 1) "person" 2) (empty list or set)
隊列中沒有了可讀消息。
除了上面以講解到的 API 外,查看消費羣組信息可以使用 XINFO 指令查看,本文不作分析。
上面對 Streams 經常使用 API 進行了分析,咱們能夠感覺到 Redis 在消息隊列支持的道路上,也愈來愈強大。若是使用過它的 PUB/SUB 功能的話,就會感覺到 5.x 迭代正是將你的一些痛點進行了優化。
我的博客: https://ytao.top
關注公衆號 【ytao】,更多原創好文