求不更學不動之Redis5.0新特性Stream嚐鮮

Redis5.0最近被做者忽然放出來了,增長了不少新的特點功能。而Redis5.0最大的新特性就是多出了一個數據結構Stream,它是一個新的強大的支持多播的可持久化的消息隊列,做者坦言Redis Stream狠狠地借鑑了Kafka的設計。node

Redis Stream的結構如上圖所示,它有一個消息鏈表,將全部加入的消息都串起來,每一個消息都有一個惟一的ID和對應的內容。消息是持久化的,Redis重啓後,內容還在。數組

每一個Stream都有惟一的名稱,它就是Redis的key,在咱們首次使用xadd指令追加消息時自動建立。服務器

每一個Stream均可以掛多個消費組,每一個消費組會有個遊標last_delivered_id在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。每一個消費組都有一個Stream內惟一的名稱,消費組不會自動建立,它須要單獨的指令xgroup create進行建立,須要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。網絡

每一個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每一個消費組都消費到。數據結構

同一個消費組(Consumer Group)能夠掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了消息都會使遊標last_delivered_id往前移動。每一個消費者者有一個組內惟一名稱。ui

消費者(Consumer)內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取的消息,可是尚未ack。若是客戶端沒有ack,這個變量裏面的消息ID會愈來愈多,一旦某個消息被ack,它就開始減小。這個pending_ids變量在Redis官方被稱之爲PEL,也就是Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。設計

消息ID

消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的消息在毫米時間戳1527846880572時產生,而且是該毫秒內產生的第5條消息。消息ID能夠由服務器自動生成,也能夠由客戶端本身指定,可是形式必須是整數-整數,並且必須是後面加入的消息的ID要大於前面的消息ID。code

消息內容

消息內容就是鍵值對,形如hash結構的鍵值對,這沒什麼特別之處。blog

增刪改查

  1. xadd 追加消息
  2. xdel 刪除消息,這裏的刪除僅僅是設置了標誌位,不影響消息總長度
  3. xrange 獲取消息列表,會自動過濾已經刪除的消息
  4. xlen 消息長度
  5. del 刪除Stream
# *號表示服務器自動生成ID,後面順序跟着一堆key/value
127.0.0.1:6379> xadd codehole * name laoqian age 30  #  名字叫laoqian,年齡30歲
1527849609889-0  # 生成的消息ID
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
127.0.0.1:6379> xrange codehole - +  # -表示最小值, +表示最大值
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
3) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
127.0.0.1:6379> xrange codehole 1527849629172-0 +  # 指定最小消息ID的列表
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
127.0.0.1:6379> xrange codehole - 1527849629172-0  # 指定最大消息ID的列表
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
127.0.0.1:6379> xlen codehole  # 長度不受影響
(integer) 3
127.0.0.1:6379> xrange codehole - +  # 被刪除的消息沒了
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
127.0.0.1:6379> del codehole  # 刪除整個Stream
(integer) 1

獨立消費

咱們能夠在不定義消費組的狀況下進行Stream消息的獨立消費,當Stream沒有新消息時,甚至能夠阻塞等待。Redis設計了一個單獨的消費指令xread,能夠將Stream當成普通的消息隊列(list)來使用。使用xread時,咱們能夠徹底忽略消費組(Consumer Group)的存在,就比如Stream就是一個普通的列表(list)。隊列

# 從Stream頭部讀取兩條消息
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
      2) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
# 從Stream尾部讀取一條消息,毫無疑問,這裏不會返回任何消息
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# 從尾部阻塞等待新消息到來,下面的指令會堵住,直到新消息到來
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# 咱們重新打開一個窗口,在這個窗口往Stream裏塞消息
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# 再切換到前面的窗口,咱們能夠看到阻塞解除了,返回了新的消息內容
# 並且還顯示了一個等待時間,這裏咱們等待了93s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
   2) 1) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
(93.11s)

客戶端若是想要使用xread進行順序消費,必定要記住當前消費到哪裏了,也就是返回的消息ID。下次繼續調用xread時,將上次返回的最後一個消息ID做爲參數傳遞進去,就能夠繼續消費後續的消息。

block 0表示永遠阻塞,直到消息到來,block 1000表示阻塞1s,若是1s內沒有任何消息到來,就返回nil

127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)

建立消費組

Stream經過xgroup create指令建立消費組(Consumer Group),須要傳遞起始消息ID參數用來初始化last_delivered_id變量。

127.0.0.1:6379> xgroup create codehole cg1 0-0  #  表示從頭開始消費
OK
# $表示從尾部開始消費,只接受新消息,當前Stream消息會所有忽略
127.0.0.1:6379> xgroup create codehole cg2 $
OK
127.0.0.1:6379> xinfo codehole  # 獲取Stream信息
 1) length
 2) (integer) 3  # 共3個消息
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2  # 兩個消費組
 9) first-entry  # 第一個消息
10) 1) 1527851486781-0
    2) 1) "name"
       2) "laoqian"
       3) "age"
       4) "30"
11) last-entry  # 最後一個消息
12) 1) 1527851498956-0
    2) 1) "name"
       2) "xiaoqian"
       3) "age"
       4) "1"
127.0.0.1:6379> xinfo groups codehole  # 獲取Stream的消費組信息
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 0  # 該消費組尚未消費者
   5) pending
   6) (integer) 0  # 該消費組沒有正在處理的消息
2) 1) name
   2) "cg2"
   3) consumers  # 該消費組尚未消費者
   4) (integer) 0
   5) pending
   6) (integer) 0  # 該消費組沒有正在處理的消息

消費

Stream提供了xreadgroup指令能夠進行消費組的組內消費,須要提供消費組名稱、消費者名稱和起始消息ID。它同xread同樣,也能夠阻塞等待新消息。讀到新消息後,對應的消息ID就會進入消費者的PEL(正在處理的消息)結構裏,客戶端處理完畢後使用xack指令通知服務器,本條消息已經處理完畢,該消息ID就會從PEL中移除。

# >號表示從當前消費組的last_delivered_id後面開始讀
# 每當消費者讀取一條消息,last_delivered_id變量就會前進
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851498956-0
         2) 1) "name"
            2) "xiaoqian"
            3) "age"
            4) "1"
      2) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
# 再繼續讀取,就沒有新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# 那就阻塞等待吧
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# 開啓另外一個窗口,往裏塞消息
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# 回到前一個窗口,發現阻塞解除,收到新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527854062442-0
         2) 1) "name"
            2) "lanying"
            3) "age"
            4) "61"
(36.54s)
127.0.0.1:6379> xinfo groups codehole  # 觀察消費組信息
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 1  # 一個消費者
   5) pending
   6) (integer) 5  # 共5條正在處理的信息還有沒有ack
2) 1) name
   2) "cg2"
   3) consumers
   4) (integer) 0  # 消費組cg2沒有任何變化,由於前面咱們一直在操縱cg1
   5) pending
   6) (integer) 0
# 若是同一個消費組有多個消費者,咱們能夠經過xinfo consumers指令觀察每一個消費者的狀態
127.0.0.1:6379> xinfo consumers codehole cg1  # 目前還有1個消費者
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 5  # 共5條待處理消息
   5) idle
   6) (integer) 418715  # 空閒了多長時間ms沒有讀取消息了
# 接下來咱們ack一條消息
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 4  # 變成了5條
   5) idle
   6) (integer) 668504
# 下面ack全部消息
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 0  # pel空了
   5) idle
   6) (integer) 745505

Stream消息太多怎麼辦

讀者很容易想到,要是消息積累太多,Stream的鏈表豈不是很長,內容會不會爆掉就是個問題了。xdel指令又不會刪除消息,它只是給消息作了個標誌位。

Redis天然考慮到了這一點,因此它提供了一個定長Stream功能。在xadd的指令提供一個定長長度maxlen,就能夠將老的消息幹掉,確保最多不超過指定長度。

127.0.0.1:6379> xlen codehole
(integer) 5
127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
1527855160273-0
127.0.0.1:6379> xlen codehole
(integer) 3

咱們看到Stream的長度被砍掉了。

消息若是忘記ACK會怎樣

Stream在每一個消費者結構中保存了正在處理中的消息ID列表PEL,若是消費者收到了消息處理完了可是沒有回覆ack,就會致使PEL列表不斷增加,若是有不少消費組的話,那麼這個PEL佔用的內存就會放大。

PEL如何避免消息丟失

在客戶端消費者讀取Stream消息時,Redis服務器將消息回覆給客戶端的過程當中,客戶端忽然斷開了鏈接,消息就丟失了。可是PEL裏已經保存了發出去的消息ID。待客戶端從新連上以後,能夠再次收到PEL中的消息ID列表。不過此時xreadgroup的起始消息ID不能爲參數>,而必須是任意有效的消息ID,通常將參數設爲0-0,表示讀取全部的PEL消息以及自last_delivered_id以後的新消息。

結論

Stream的消費模型借鑑了kafka的消費分組的概念,它彌補了Redis Pub/Sub不能持久化消息的缺陷。可是它又不一樣於kafka,kafka的消息能夠分partition,而Stream不行。若是非要分parition的話,得在客戶端作,提供不一樣的Stream名稱,對消息進行hash取模來選擇往哪一個Stream裏塞。若是讀者稍微研究過Redis做者的另外一個開源項目Disque的話,這很可能是做者意識到Disque項目的活躍程度不夠,因此將Disque的內容移植到了Redis裏面。這只是本人的猜想,未必是做者的初衷。若是讀者有什麼不一樣的想法,能夠在評論區一塊兒參與討論。

閱讀更多高級文章,關注公衆號「碼洞」

相關文章
相關標籤/搜索