Streams:深刻理解Redis5.0新特性

概述

相較於Redis4.0,Redis5.0增長了不少新的特性,而streams是其中最重要的特性之一。streams是redis 的一種基本數據結構,它是一個新的強大的支持多播的可持久化的消息隊列,在設計上借鑑了kafaka。streams的數據類型自己很是簡單,有點相似於hash結構,可是它的額外特性異常強大且複雜:node

  • 支持持久化。streams能持久化存儲數據,不一樣於pub/sub機制和list 消息被消費後就會被刪除,streams消費過的數據會被持久化的保存在歷史中。
  • 支持多播。 這一點跟 pub/sub有些相似。
  • 支持消費者組。streams 容許同一消費組內的消費者競爭消息,並提供了一系列機制容許消費者查看本身的歷史消費消息。並容許監控streams的消費者組信息,消費者組內消費者信息,也能夠監控streams內消息的狀態。

基礎內容

數據 ID

streams 提供了默認的id模式用來惟一標識streams中的每一條數據,由兩部分組成:
<millisecondsTime>-<sequenceNumber>
millisecondsTime是redis服務所在機器的時間,sequenceNumber用於同一毫秒建立的數據。須要注意的一點是streams的id老是單調增加的,即便redis服務所在的服務器時間異常。若是當前的毫秒數小於之前的毫秒數,就會使用歷史記錄中最大的毫秒數,而後序列號遞增。而這樣作的緣由是由於streams的機制容許根據時間區間或者某一個時間節點或者某一id查找數據。redis

向streams插入數據

streams 的基礎寫命令爲XADD,其語法爲XADD key ID field value [field value ...]數據庫

127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"
127.0.0.1:6379>
複製代碼

上面的例子使用XADD向名爲mystream的streams中添加了一條數據,ID使用*表示使用streams使用默認的ID,在本例中redis返回的1574925508730-0就是redis爲咱們插入的數據生成的ID。安全

另外streams 查看streams長度的命令爲XLENruby

127.0.0.1:6379> XLEN mystream
(integer) 3
127.0.0.1:6379>
複製代碼

從streams中讀取數據

從streams中讀取數據會比寫數據複雜不少,用日誌文件進行對比,咱們能夠查看歷史日誌,能夠根據範圍查詢日誌,咱們能夠經過unix的命令tail -f來監聽日誌,能夠多個用戶查看到同一份日誌,也能夠多個用戶只能查看到本身有權限查看的那一部分日誌。性能優化

按範圍查詢: XRANGE 和 XREVRANGE

首先來介紹一下 根據範圍查詢,這兩種操做都比較簡單,以XRANGE爲例,它的語法格式爲XRANGE key start end [COUNT count], 咱們只須要提供兩個id,startend,返回的將是一個包含startend的閉區間。兩個特殊的ID-+分別表示可能的最小ID和最大ID。bash

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
2) 1) "1574925508730-0"
   2) 1) "name"
      2) "dwj"
      3) "age"
      4) "18"
127.0.0.1:6379>
複製代碼

咱們前邊提到過數據id中包含了建立數據的時間信息,這意味着咱們能夠根據時間範圍查詢數據,爲了根據時間範圍查詢,咱們省略掉ID的序列號部分,若是省略,對於start ID會使用0做爲默認的序列號,對於end ID會使用最大序列號做爲默認值,這樣的話咱們使用兩個unix時間戳去查詢數據就能夠獲得那個時間區間內全部的數據。服務器

1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
127.0.0.1:6379>
複製代碼

可能還會有同窗注意到語法的最後邊還有count參數,這個參數容許咱們一次只返回固定數量的數據,而後根據返回數據的last_id,做爲下一次查詢的start,這樣就容許咱們在一個量很是大的streams裏批量返回數據。
XREVRANGE命令與XRANGE相同,可是以相反的順序返回元素,就不重複介紹了。數據結構

經過XREAD讀取數據

XREAD容許咱們從某一結點開始從streams中讀取數據,它的語法爲XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],咱們在這裏主要將的是經過XREAD來訂閱到達streams新的數據。這種操做可能跟REDIS中原有的pub/sub機制或者阻塞隊列的概念有些相似,都是等待一個key而後獲取到新的數據,可是跟這兩種有着本質的差異:app

  • streams跟pub/sub阻塞隊列容許多個客戶端一塊兒等待數據,默認狀況下,streams會把消息推送給全部等待streams數據的客戶端,這個能力跟pub/sub有點相似,可是streams也容許把消息經過競爭機制推送給其中的一個客戶端(這種模式須要用到消費者組的概念,會在後邊講到)。
  • pub/sub的消息是fire and forget而且從不存儲,你只能夠訂閱到在你訂閱時間以後產生的消息,而且消息只會推送給客戶端一次,不能查看歷史記錄。以及使用阻塞隊列時,當客戶端收到消息時,這個元素會從隊列中彈出,換句話說,不能查看某個消費者消費消息的歷史。而在streams中全部的消息會被無限期的加入到streams中(消息能夠被顯式的刪除而且存在淘汰機制),客戶端須要記住收到的最後一條消息,用於獲取到節點以後的新消息。
  • Streams 消費者組提供了一種Pub/Sub或者阻塞列表都不能實現的控制級別,同一個Stream不一樣的羣組,顯式地確認已經處理的項目,檢查待處理的項目的能力,申明未處理的消息,以及每一個消費者擁有連貫歷史可見性,單個客戶端只能查看本身過去的消息歷史記錄。
    從streams中讀取數據
    127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
    1) 1) "mystream"
     2) 1) 1) "1574835253335-0"
           2) 1) "name"
              2) "bob"
              3) "age"
              4) "23"
        2) 1) "1574925508730-0"
           2) 1) "name"
              2) "dwj"
              3) "age"
              4) "18"
    127.0.0.1:6379>
    複製代碼
    同list結構同樣,streams也提供了阻塞讀取的命令
    XREAD BLOCK 0 STREAMS mystream
    複製代碼
    在上邊的命令中指定了BLOCK選項,超時時間爲0毫秒(意味着永不會過時)。此外,這個地方使用了特殊的id $,這個特殊的id表明着當前streams中最大的id,這就意味着你只會讀取streams中在你監聽時間之後的消息。有點相似於Unix的tail -f。另外XREAD能夠同時監聽多個流中的數據。

消費者組

若是咱們想要的不是多個客戶端處理相同的消息,而是多個客戶端從streams中獲取到不一樣的消息進行處理。也就是咱們經常使用的生產者-消費者模型。假如想象咱們具備兩個生產者p1,p2,三個消費者c1,c2,c3以及7個商品。咱們想按照下面的效果進行處理

p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1
複製代碼

爲了解決這種場景,redis使用了一個名爲消費者的概念,有點相似於kafka,但只是表現上。消費者組就像是一個僞消費者,它從流內讀取數據,而後分發給組內的消費者,並記錄該消費者組消費了哪些數據,處理了那些數據,並提供了一系列功能。

  1. 每條消息都提供給不一樣的消費者,所以不可能將相同的消息傳遞給多個消費者。
  2. 消費者在消費者組中經過名稱來識別,該名稱是實施消費者的客戶必須選擇的區分大小寫的字符串。這意味着即使斷開鏈接事後,消費者組仍然保留了全部的狀態,由於客戶端會從新申請成爲相同的消費者。 然而,這也意味着由客戶端提供惟一的標識符。
  3. 每個消費者組都有一個第一個ID永遠不會被消費的概念,這樣一來,當消費者請求新消息時,它能提供之前從未傳遞過的消息。
  4. 消費消息須要使用特定的命令進行顯式確認,表示:這條消息已經被正確處理了,因此能夠從消費者組中逐出。
  5. 消費者組跟蹤全部當前全部待處理的消息,也就是,消息被傳遞到消費者組的一些消費者,可是尚未被確認爲已處理。因爲這個特性,當訪問一個Stream的歷史消息的時候,每一個消費者將只能看到傳遞給它的消息。

它的模型相似於以下

| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
複製代碼

從上邊的模型中咱們能夠看出消費者組記錄處理的最後一條消息,將消息分發給不一樣的消費者,每一個消費者只能看到本身的消息。若是把消費者組看作streams的輔助數據結構,咱們能夠看出一個streams能夠擁有多個消費者組,一個消費者組內能夠擁有多個消費者。實際上,一個streams容許客戶端使用XREAD讀取的同時另外一個客戶端經過消費者羣組讀取數據。

建立一個消費者羣組

咱們首先建立一個包含了一些數據的streams

127.0.0.1:6379> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:6379> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:6379> XADD fruit * message pomelo
"1574935323628-0"
複製代碼

而後建立一個消費者組

127.0.0.1:6379> XGROUP CREATE fruit mygroup $
OK
複製代碼

注意咱們須要指定一個id,這裏咱們使用的是特殊id$,咱們也可使用0或者一個unix時間戳,這樣,消費者組只會讀取這個節點以後的消息。

如今消費者組建立好了,咱們可使用XREADGROUP命令當即開始嘗試經過消費者組讀取消息。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...],與XREAD相似,提供了BLOCK選項。假設指定消費者分別是Alice和Bob,來看看系統會怎樣返回不一樣消息給Alice和Bob。

127.0.0.1:6379> XREADGROUP GROUP  mygroup Alice COUNT 1 STREAMS fruit >
1) 1) "fruit"
   2) 1) 1) "1574936034258-0"
         2) 1) "message"
            2) "apple"
127.0.0.1:6379>
複製代碼

上邊命令表明的信息是:我要經過mygroup讀取streams fruit中的數據,我在羣組中的身份是Alice,請給我一條數據。 >操做符只在消費者組的上線文中有效,表明消息到目前爲止沒有交給其它消費者處理過。
咱們也可使用一個有效的id,在這種狀況下,消費者組會告訴咱們的歷史待處理消息,而不會告訴咱們新的消息。這個特性也是頗有用的,當消費者由於某些緣由從新啓動後,咱們能夠查看本身的歷史待處理消息,處理完待處理消息後再去處理新的消息。
咱們能夠經過XACK命令告訴消費者組某條消息已經被正確處理,不要顯示在個人歷史待處理消息列表中。XACK的語法爲XACK key group ID [ID ...]

127.0.0.1:6379> XACK fruit mygroup 1574936034258-0
(integer) 1
複製代碼

有幾件事須要記住:

  1. 消費者是在他們第一次被說起的時候自動建立的,不須要顯式建立。
  2. 即便使用XREADGROUP,你也能夠同時從多個key中讀取,可是要讓其工做,你須要給每個Stream建立一個名稱相同的消費者組。這並非一個常見的需求,可是須要說明的是,這個功能在技術上是能夠實現的。
  3. XREADGROUP命令是一個寫命令,由於當它從Stream中讀取消息時,消費者組被修改了,因此這個命令只能在master節點調用。

從永久失敗中恢復

在一個消費者羣組中可能存在多個消費者消費消息,可是也可能會存在某一個消費者永久退出消費者羣組的狀況,這樣咱們就須要一種機制,把該消費者的待處理消息分配給消費者羣組的另外一個消費者。這就須要咱們具備查看待處理消息的能力以及把某個消息分配給指定消費者的能力。前者是經過一個叫XPENDING的命令,它的語法爲XPENDING key group [start end count] [consumer]

127.0.0.1:6379> XPENDING fruit mygroup
1) (integer) 1
2) "1574936042937-0"
3) "1574936042937-0"
4) 1) 1) "Alice"
      2) "1"
複製代碼

上述返回結果表明的是消費者羣組有1條待處理命令,待處理消息的起始id爲1574936042937-0,結束id爲1574936042937-0,名爲Alice的消費者有一個待處理命令,可能有人會好奇咱們在前邊往fruit放入了3個水果,使用XACK處理了一個水果,消費者待處理列表中應該有兩個水果,而事實上消費者羣組的待處理列表爲該羣組下消費者待處理消息的合集,當有消費者經過羣組獲取消息的時候會改變消費者羣組的狀態,這也是前邊提到的爲何XREADGROUP必須在master節點進行調用。
咱們可使用start end count 參數來查看某個範圍內消息的狀態

127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice
1) 1) "1574936042937-0"
   2) "Alice"
   3) (integer) 903655
   4) (integer) 1
2) 1) "1574936052018-0"
   2) "Alice"
   3) (integer) 491035
   4) (integer) 1
複製代碼

這樣咱們就看到了一條消息的詳細信息,id爲1574936042937-0的消息的消費者爲Alice,它的pending時間爲903655,這個消息被分配了1次。
咱們會發現第一條消息的處理時間有點長,咱們懷疑Alice已經不能處理這條消息了,因而咱們想把這條消息分配給Bob,這種場景下就須要用到了XCLAIM命令,它的語法爲XCLAIM ...,其中min-idle-time爲消息的最小空閒時間,只有消息的空閒時間大於這個值消息纔會被分配,由於消息被分配的時候會重置消息的空閒時間,若是有同時把一條消息分配給兩個客戶端,只會第一條命令生效,由於當消息分配給第一個客戶端的時候重置空閒時間,第二條命令則會失效。
咱們也可使用一個獨立的進程來不斷尋找超時的消息,並把它分配給活躍的消費者,不過須要注意的是,若是消息的分配次數達到某個闕值,不該該把消息再分配出去,而是應該放到別的地方。

streams的可觀察性

streams具備不錯的可觀察性,前邊的XPENDING命令容許咱們查看streams在某個消費者羣組內待處理消息的狀態。可是咱們想看的更多,好比在這個streams下有多少個group, 在這個group下有多少消費者。這就要用到XINFO命令:
查看streams信息:

127.0.0.1:6379> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1574925508730-0"
11) "first-entry"
12) 1) "1574835253335-0"
    2) 1) "name"
       2) "bob"
       3) "age"
       4) "23"
13) "last-entry"
14) 1) "1574925508730-0"
    2) 1) "name"
       2) "dwj"
       3) "age"
       4) "18"
複製代碼

輸出中會告訴咱們streams的長度,羣組數量,第一條和最後一條信息的詳情。下面看一下streams下羣組的信息:

127.0.0.1:6379> XINFO GROUPS fruit
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1574936052018-0"
2) 1) "name"
   2) "mygroup-1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"
複製代碼

咱們能夠從輸出中看到fruit下有兩個羣組,羣組的名稱以及待處理消息的數量,處理的最後一條消息。咱們能夠在詳細的查看下消費者羣組內消費者的狀態。

127.0.0.1:6379> XINFO CONSUMERS fruit mygroup
1) 1) "name"
   2) "Alice"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 1990242
2) 1) "name"
   2) "Bob"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 9178
複製代碼

從輸出中能夠看到消費者待處理消息的數量以及消費者的閒置時間。

設置streams上限

若是從streams能夠查看到歷史記錄,咱們可能會有疑惑,若是streams無限期的加入內存會不會夠用,一旦消息數量達到上限,將消息永久刪除或者持久化到數據庫都是有必要的,redis也提供了諸如此類場景的支持。
一種方法是咱們使用XADD的時候指定streams的最大長度,XADD mystream MAXLEN ~ 1000其中的數值前能夠加上~標識不須要精確的將長度保持在1000,比1000多一些也能夠接受。若是不使用該標識,性能會差一些。另外一種方法是使用XTRIM,該命令也是使用MAXLEN選項,> XTRIM mystream MAXLEN ~ 10

一些特殊的id

前面提到了在streams API裏邊存在一些特殊的id。
首先是-+,這兩個ID在XRANGE命令中使用,分別表明最小的id和最大的id。-表明0-1+表明18446744073709551615-18446744073709551615,從使用上方便了不少。在XPENDING等範圍查詢中均可以使用。
$表明streams中當前存在的最大的id,在XREADXGROUP中表明只獲取新到的消息。須要注意的是$+的含義並不一致。
還有一個特殊的id是>,這個id只可以在XREADGROUP命令中使用,意味着在這個消費者羣組中,歷來沒有分配給其餘的消費者,因此老是使用>做爲羣組中的last delivered ID

持久化,複製和消息安全性

與redis的其它數據結構同樣,streams會異步複製到從節點,並持久化到AOF和RDB文件中,而且消費者羣組的狀態也會按照此機制進行持久化。
須要注意的幾點是:

  • 若是消息的持久化以及狀態很重要,則AOF必須使用強fsync配合(AOF記錄每一條更改redis數據的命令,有不少種持久化機制,在這個地方要用到的是appendfsync always 這樣會嚴重下降Redis的速度)
  • 默認狀況下,異步複製不能保證從節點的數據與主節點保持一致,在故障轉移之後可能會丟失一些內容,這跟從節點從主節點接受數據的能力有關。
  • WAIT命令能夠用於強制將更改傳輸到一組從節點上。雖然這使得數據不太可能會丟失,可是redis的Sentinel和cluster在進行故障轉移的時候不必定會使用具備最新數據的從節點,在一些特殊故障下,反而會使用缺乏一些數據的從節點。
    所以在使用redis streams和消費者羣組在設計程序的時候,確保瞭解你的應用程序在故障期間的應對策略,並進行相應地配置,評估它對你的程序是否足夠安全。

從streams中刪除數據

刪除streams中的數據使用XDEL命令,其語法爲XDEL key ID [ID ...],須要注意的是在當前的實現中,在宏節點徹底爲空以前,內存並無真正回收,因此你不該該濫用這個特性。

streams的性能

streams的不阻塞命令,好比XRANGE或者不使用BLOCK選項的XREADXREADGROUP跟redis普通命令一致,因此沒有必要討論。若是有興趣的話能夠在redis的文檔中查看到對應命令的時間複雜度。streams命令的速度在必定範圍內跟set是一致的,XADD命令的速度很是快,在一個普通的機器上,一秒鐘能夠插入50w~100w條數據。
咱們感興趣的是在消費者羣組的阻塞場景下,從經過XADD命令向streams中插入一條數據,到消費者經過羣組讀取到這條消息的性能。
爲了測試消息從產生到消費間的延遲,咱們使用ruby程序進行測試,將消息的產生時間做爲消息的一個字段,而後把消息推送到streams中,客戶端收到消息後使用當前時間跟生產時間進行對比,從而計算出消息的延遲時間。這個程序未進行性能優化,運行在一個雙核的機器上,同時redis也運行在這臺機器上,以此來模擬不是理想條件下的場景。消息每秒鐘產生1w條,羣組內有10個消費者消費數據。測試結果以下:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
複製代碼

99.9%的請求的延遲小於等於2毫秒,並且異常值很是接近平均值。另外須要注意的兩點:

  1. 消費者每次處理1w條消息,這樣增長了一些延遲,這樣作是爲了消費速度較慢的消費者可以保持保持消息流。
  2. 用來作測試的系統相比於如今的系統很是慢。


原文連接: redis.io/topics/stre…

本文譯者:Worktile工程師 杜文傑

文章來源:Worktile技術博客

歡迎訪問交流更多關於技術及協做的問題。

文章轉載請註明出處。

相關文章
相關標籤/搜索