Redis 5 新特性中,Streams 數據結構的引入,能夠說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 做爲消息隊列使用時,獲得更完善,更強大的原生支持,其中尤其明顯的是持久化消息隊列。同時,stream 借鑑了 kafka 的消費組模型概念和設計,使消費消息處理上更加高效快速。本文就 Streams 數據結構中經常使用 API 進行分析。bash
本文所使用 Redis 版本爲 5.0.5 。若是使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不一樣。服務器
Streams 添加數據使用 XADD 指令進行添加,消息中的數據以 K-V 鍵值對的形式進行操做。一條消息能夠存在多個鍵值對,添加命令格式:數據結構
XADD key ID field string [field string ...]
複製代碼
其中 key 爲 Streams 的名稱,ID 爲消息的惟一標誌,不可重複,field string 就爲鍵值對。下面咱們就添加以 person 爲名稱的流,進行操做。優化
XADD person * name ytao des https://ytao.top
複製代碼
上面添加案例中,ID 使用 * 號複製,這裏表明着服務端自動生成 Id,添加後返回數據 "1578238486193-0"
ui
這裏自動生成的 Id 格式爲 <millisecondsTime>-<sequenceNumber>
Id 是由兩部分組成:spa
好比:1578238486193-3 表示在 1578238486193 毫秒的時間戳時,添加的第 4 條消息。設計
除了服務端自動生成 Id 方式外,也支持指定 Id 的生成,可是指定 Id 有如下條件限制:指針
不然,當不知足上述條件時,添加後會拋出異常:code
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
複製代碼
實際上,當添加一條消息時,會進行兩部操做。第一步,先判斷若是不存在 Streams,則建立 Streams 的名稱,再添加消息到 Streams 中。即便添加消息時,因爲 Id 異常,也能夠在 Redis 中存在以當前 Streams 的名稱。 Streams 中 Id 也可做爲指針使用,由於它是一個有序的標記。cdn
生產中,若是這樣使用添加消息,會存在一個問題,那就是消息數量太大時,會使服務宕機。這裏 Streams 的設計初期也有考慮到這個問題,那就是能夠指定 Streams 的容量。若是容量操做這個設定的值,就會對調舊的消息。在添加消息時,設置 MAXLEN
參數。
XADD person MAXLEN 5 * name ytao des https://ytao.top
複製代碼
這樣就指定該了 Streams 中的容量爲 5 條消息。也可以使用 XTRIM 截取消息,從小到大剔除多餘的消息:
XTRIM person MAXLEN 8
複製代碼
查看消息數量使用 XLEN 指令進行操做。
XLEN key
複製代碼
例:查看 person 流中的消息數量:
> XLEN person
(integer) 5
複製代碼
查詢 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。
查詢數據時,能夠按照指定 Id 範圍進行查詢,XRANGE 查詢指令格式:
XRANGE key start end [COUNT count]
複製代碼
參數說明:
這裏 start 和 end 有-
和+
兩個非指定值,他們分別表示無窮小和無窮大,因此當使用這個兩個值時,會查詢出所有的消息。
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
複製代碼
上面查詢的消息數據,能夠看到是按照先進先出的順序查詢出來的。
使用 COUNT 指定查詢返回的數量:
# 查詢全部的消息,而且返回一條數據
> XRANGE person - + COUNT 1
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
複製代碼
在範圍查詢中,Id 的後半部分可省略,後半部分中的數據會所有查詢到。
XREVRANGE 的查詢和 XRANGE 指令中的使用相似,但查詢的 start 和 end 參數順序進行了調換:
XREVRANGE key end start [COUNT count]
複製代碼
使用案例:
> XREVRANGE person + -
1) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
複製代碼
查詢後的結果與 XRANGE 的結果順序恰好相反,其餘都同樣,這兩個指令可進行消息的升序和降序的返回。
刪除消息使用 XDEL 指令操做,只需指定將要刪除的 Streams 名稱和 Id 便可,支持一次刪除多個消息 。
XDEL key ID [ID ...]
複製代碼
刪除案例:
# 查詢全部消息
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
3) 1) "2-0"
2) 1) "name"
2) "gaga"
3) "des"
4) "fishion!"
# 刪除消息
> XDEL person 2-0
(integer) 1
# 再次查詢刪除後的全部消息
> XRANGE person - +
1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
# 查詢刪除後的長度
> XLEN person
(integer) 2
複製代碼
從上面能夠看到,刪除消息後,長度也會減小相應的數量。
在 Redis 的 PUB/SUB 中,咱們是經過訂閱來消費消息,在 Streams 數據結構中,一樣也能實現同等功能,當沒有新的消息時,可進行阻塞等待。不只支持單獨消費,並且還能夠支持羣組消費。
單獨消費使用 XREAD 指令。能夠看到,下面命令中,STREAMS,key, 以及 ID 爲必填項。ID 表示將要讀取大於該 ID 的消息。當 ID 值使用 $
賦予時,表示已存在消息的最大 Id 值。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
複製代碼
上面的 COUNT
參數用來指定讀取的最大數量,與 XRANGE 的用法同樣。
> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
2) 1) 1) "0-1"
2) 1) "name"
2) "ytao"
3) "des"
4) "https://ytao.top"
2) 1) "0-2"
2) 1) "name"
2) "luffy"
3) "des"
4) "valiant!"
複製代碼
在 XREAD 裏面還有個 BLOCK
參數,這個是用來阻塞訂閱消息的,BLOCK
攜帶的參數爲阻塞時間,單位爲毫秒,若是在這個時間內沒有新的消息消費,那麼就會釋放該阻塞。當這裏的時間指定爲 0 時,會一直阻塞,直到有新的消息來消費到。
# 窗口 1 開啓阻塞,等待新消息的到來
> XREAD BLOCK 0 STREAMS person $
# 另開一個鏈接窗口 2,添加一條新的消息
> XADD person 2-2 name tao des coder
"2-2"
# 窗口 1,獲取到有新的消息來消費,而且帶有阻塞的時間
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
2) 1) 1) "2-2"
2) 1) "name"
2) "tao"
3) "des"
4) "coder"
(60.81s)
複製代碼
當使用 XREAD 進行順序消費時,須要額外記錄下讀取到位置的 Id,方便下次繼續消費。
羣組消費的主要目的也就是爲了分流消息給不一樣的客戶端處理,以更高效的速率處理消息。爲達到這一肝功能需求,咱們須要作三件事:建立羣組,羣組讀取消息,向服務端確認消息以處理。
操做羣組使用 XGROUP 指令:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
複製代碼
上面命令中,包含操做有:
咱們當前須要使用的是建立消費組:
# 以當前存在的最大 Id 做爲消費起始
> XGROUP CREATE person group1 $
OK
複製代碼
羣組讀取使用 XREADGROUP 指令,COUNT
和BLOCK
的使用相似 XREAD 的操做,只是多了個羣組和消費者的指定:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
複製代碼
因爲羣組消費和單獨消費相似,這裏只進行個阻塞分析,這裏 Id 也有個特殊值>
,表示還未進行消費的消息:
# 窗口 1,消費羣組中,taotao 消費者創建阻塞監聽
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
# 窗口 2,消費羣組中,yangyang 消費者創建阻塞監聽
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
# 窗口 3,添加消費消息
> XADD person 3-1 name tony des 666
"3-1"
# 窗口 1,讀取到新消息,此時 窗口 2 沒有任何反應
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-1"
2) 1) "name"
2) "tony"
3) "des"
4) "666"
(77.54s)
# 窗口 3,再次添加消費消息
> XADD person 3-2 name james des abc!
"3-2"
# 窗口 2,讀取到新消息,此時 窗口 1 沒有任何反應
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"
(76.36s)
複製代碼
以上執行流程中,group1 羣組中有兩個消費者,當添加兩條消息後,這兩個消費者輪流消費。
消息消費後,爲避免再次重複消費,這是須要向服務端發送 ACK,確保消息被消費後的標記。 例以下列狀況,咱們上面咱們將最新兩條消息已進行了消費,可是當咱們再次讀取消息時,仍是被讀到:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) 1) 1) "3-2"
2) 1) "name"
2) "james"
3) "des"
4) "abc!"
複製代碼
這時,咱們使用 XACK 指令告訴服務器,咱們已處理的消息:
XACK key group ID [ID ...]0
複製代碼
讓服務器標記 3-2 已處理:
> XACK person group1 3-2
(integer) 1
複製代碼
再次獲取羣組讀取消息:
> XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
2) (empty list or set)
複製代碼
隊列中沒有了可讀消息。 除了上面以講解到的 API 外,查看消費羣組信息可以使用 XINFO 指令查看,本文不作分析。
上面對 Streams 經常使用 API 進行了分析,咱們能夠感覺到 Redis 在消息隊列支持的道路上,也愈來愈強大。若是使用過它的 PUB/SUB 功能的話,就會感覺到 5.x 迭代正是將你的一些痛點進行了優化。
我的博客: ytao.top
關注公衆號 【ytao】,更多原創好文