Redis5新特性Streams做消息隊列

前言

Redis 5 新特性中,Streams 數據結構的引入,能夠說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 做爲消息隊列使用時,獲得更完善,更強大的原生支持,其中尤其明顯的是持久化消息隊列。同時,stream 借鑑了 kafka 的消費組模型概念和設計,使消費消息處理上更加高效快速。本文就 Streams 數據結構中經常使用 API 進行分析。bash

準備

本文所使用 Redis 版本爲 5.0.5 。若是使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不一樣。服務器

添加消息

Streams 添加數據使用 XADD 指令進行添加,消息中的數據以 K-V 鍵值對的形式進行操做。一條消息能夠存在多個鍵值對,添加命令格式:數據結構

XADD key ID field string [field string ...]

其中 key 爲 Streams 的名稱,ID 爲消息的惟一標誌,不可重複,field string 就爲鍵值對。下面咱們就添加以 person 爲名稱的流,進行操做。優化

XADD person * name ytao des https://ytao.top

上面添加案例中,ID 使用 * 號複製,這裏表明着服務端自動生成 Id,添加後返回數據 "1578238486193-0"設計

這裏自動生成的 Id 格式爲 <millisecondstime>-<sequencenumber> Id 是由兩部分組成:指針

  1. millisecondsTime 爲當前服務器時間毫秒時間戳。
  2. sequenceNumber 當前序列號,取值來源於當前毫秒內,生成消息的順序,默認從 0 開始加 1 遞增。

好比:1578238486193-3 表示在 1578238486193 毫秒的時間戳時,添加的第 4 條消息。code

除了服務端自動生成 Id 方式外,也支持指定 Id 的生成,可是指定 Id 有如下條件限制:blog

  1. Id 中的先後部分必須爲數字。
  2. 最小 Id 爲 0-1,不能爲 0-0,可是 2-0,3-0 .... 是被容許的。
  3. 添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 後半部分不能比存在前半部分相同的最大後半部分小。

不然,當不知足上述條件時,添加後會拋出異常:隊列

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

實際上,當添加一條消息時,會進行兩部操做。第一步,先判斷若是不存在 Streams,則建立 Streams 的名稱,再添加消息到 Streams 中。即便添加消息時,因爲 Id 異常,也能夠在 Redis 中存在以當前 Streams 的名稱。 Streams 中 Id 也可做爲指針使用,由於它是一個有序的標記。ci

生產中,若是這樣使用添加消息,會存在一個問題,那就是消息數量太大時,會使服務宕機。這裏 Streams 的設計初期也有考慮到這個問題,那就是能夠指定 Streams 的容量。若是容量操做這個設定的值,就會對調舊的消息。在添加消息時,設置 MAXLEN 參數。

XADD person MAXLEN 5 * name ytao des https://ytao.top

這樣就指定該了 Streams 中的容量爲 5 條消息。也可以使用 XTRIM 截取消息,從小到大剔除多餘的消息:

XTRIM person MAXLEN 8

消息數量

查看消息數量使用 XLEN 指令進行操做。

XLEN key

例:查看 person 流中的消息數量:

> XLEN person
(integer) 5

查詢消息

查詢 Streams 中的消息使用 XRANGEXREVRANGE 指令。

XRANGE

查詢數據時,能夠按照指定 Id 範圍進行查詢,XRANGE 查詢指令格式:

XRANGE key start end [COUNT count]

參數說明:

  • key 爲 Streams 的名稱
  • start 爲範圍查詢開始 Id,包含本 Id。
  • start 爲範圍查詢結束 Id,包含本 Id。
  • Count 爲查詢返回最大的消息數量,非必填。

這裏 start 和 end 有-+兩個非指定值,他們分別表示無窮小和無窮大,因此當使用這個兩個值時,會查詢出所有的消息。

> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"

上面查詢的消息數據,能夠看到是按照先進先出的順序查詢出來的。

使用 COUNT 指定查詢返回的數量:

# 查詢全部的消息,而且返回一條數據
> XRANGE person - + COUNT 1
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

在範圍查詢中,Id 的後半部分可省略,後半部分中的數據會所有查詢到。

XREVRANGE

XREVRANGE 的查詢和 XRANGE 指令中的使用相似,但查詢的 start 和 end 參數順序進行了調換:

XREVRANGE key end start [COUNT count]

使用案例:

> XREVRANGE person +  -
1) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

查詢後的結果與 XRANGE 的結果順序恰好相反,其餘都同樣,這兩個指令可進行消息的升序和降序的返回。

刪除消息

刪除消息使用 XDEL 指令操做,只需指定將要刪除的 Streams 名稱和 Id 便可,支持一次刪除多個消息 。

XDEL key ID [ID ...]

刪除案例:

# 查詢全部消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
# 刪除消息      
> XDEL person 2-0
(integer) 1
# 再次查詢刪除後的全部消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
# 查詢刪除後的長度      
> XLEN person
(integer) 2

從上面能夠看到,刪除消息後,長度也會減小相應的數量。

消費消息

在 Redis 的 PUB/SUB 中,咱們是經過訂閱來消費消息,在 Streams 數據結構中,一樣也能實現同等功能,當沒有新的消息時,可進行阻塞等待。不只支持單獨消費,並且還能夠支持羣組消費。

單獨消費

單獨消費使用 XREAD 指令。能夠看到,下面命令中,STREAMS,key, 以及 ID 爲必填項。ID 表示將要讀取大於該 ID 的消息。當 ID 值使用 $ 賦予時,表示已存在消息的最大 Id 值。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 參數用來指定讀取的最大數量,與 XRANGE 的用法同樣。

> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"

> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"
      2) 1) "0-2"
         2) 1) "name"
            2) "luffy"
            3) "des"
            4) "valiant!"

XREAD 裏面還有個 BLOCK 參數,這個是用來阻塞訂閱消息的,BLOCK 攜帶的參數爲阻塞時間,單位爲毫秒,若是在這個時間內沒有新的消息消費,那麼就會釋放該阻塞。當這裏的時間指定爲 0 時,會一直阻塞,直到有新的消息來消費到。

# 窗口 1 開啓阻塞,等待新消息的到來
> XREAD BLOCK 0 STREAMS person $

# 另開一個鏈接窗口 2,添加一條新的消息
> XADD person 2-2 name tao des coder
"2-2"

# 窗口 1,獲取到有新的消息來消費,而且帶有阻塞的時間
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
   2) 1) 1) "2-2"
         2) 1) "name"
            2) "tao"
            3) "des"
            4) "coder"
(60.81s)

當使用 XREAD 進行順序消費時,須要額外記錄下讀取到位置的 Id,方便下次繼續消費。

羣組消費

羣組消費的主要目的也就是爲了分流消息給不一樣的客戶端處理,以更高效的速率處理消息。爲達到這一肝功能需求,咱們須要作三件事:建立羣組羣組讀取消息向服務端確認消息以處理

羣組操做

操做羣組使用 XGROUP 指令:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操做有:

  • CREATE 建立消費組。
  • SETID 修改下一個處理消息的 Id。
  • DESTROY 銷燬消費組。
  • DELCONSUMER 刪除消費組中指定的消費者。

咱們當前須要使用的是建立消費組:

# 以當前存在的最大 Id 做爲消費起始 
> XGROUP CREATE person group1 $
OK

羣組讀取消息

羣組讀取使用 XREADGROUP 指令,COUNTBLOCK的使用相似 XREAD 的操做,只是多了個羣組和消費者的指定:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

因爲羣組消費和單獨消費相似,這裏只進行個阻塞分析,這裏 Id 也有個特殊值>,表示還未進行消費的消息:

# 窗口 1,消費羣組中,taotao 消費者創建阻塞監聽
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >

# 窗口 2,消費羣組中,yangyang 消費者創建阻塞監聽 
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >

# 窗口 3,添加消費消息
> XADD person 3-1 name tony des 666
"3-1"

# 窗口 1,讀取到新消息,此時 窗口 2 沒有任何反應
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-1"
         2) 1) "name"
            2) "tony"
            3) "des"
            4) "666"
(77.54s)

# 窗口 3,再次添加消費消息
> XADD person 3-2 name james des abc!
"3-2"

# 窗口 2,讀取到新消息,此時 窗口 1 沒有任何反應
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"
(76.36s)

以上執行流程中,group1 羣組中有兩個消費者,當添加兩條消息後,這兩個消費者輪流消費。

消息ACK

消息消費後,爲避免再次重複消費,這是須要向服務端發送 ACK,確保消息被消費後的標記。 例以下列狀況,咱們上面咱們將最新兩條消息已進行了消費,可是當咱們再次讀取消息時,仍是被讀到:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"

這時,咱們使用 XACK 指令告訴服務器,咱們已處理的消息:

XACK key group ID [ID ...]0

讓服務器標記 3-2 已處理:

> XACK person group1 3-2
(integer) 1

再次獲取羣組讀取消息:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) (empty list or set)

隊列中沒有了可讀消息。 除了上面以講解到的 API 外,查看消費羣組信息可以使用 XINFO 指令查看,本文不作分析。

總結

上面對 Streams 經常使用 API 進行了分析,咱們能夠感覺到 Redis 在消息隊列支持的道路上,也愈來愈強大。若是使用過它的 PUB/SUB 功能的話,就會感覺到 5.x 迭代正是將你的一些痛點進行了優化。

我的博客: https://ytao.top

關注公衆號 【ytao】,更多原創好文

個人公衆號

相關文章
相關標籤/搜索