相較於Redis4.0,Redis5.0增長了不少新的特性,而streams是其中最重要的特性之一。streams是redis 的一種基本數據結構,它是一個新的強大的支持多播的可持久化的消息隊列,在設計上借鑑了kafaka。streams的數據類型自己很是簡單,有點相似於hash結構,可是它的額外特性異常強大且複雜:node
pub/sub
機制和list
消息被消費後就會被刪除,streams消費過的數據會被持久化的保存在歷史中。pub/sub
有些相似。streams 提供了默認的id模式用來惟一標識streams中的每一條數據,由兩部分組成:<millisecondsTime>-<sequenceNumber>
millisecondsTime是redis服務所在機器的時間,sequenceNumber用於同一毫秒建立的數據。須要注意的一點是streams的id老是單調增加的,即便redis服務所在的服務器時間異常。若是當前的毫秒數小於之前的毫秒數,就會使用歷史記錄中最大的毫秒數,而後序列號遞增。而這樣作的緣由是由於streams的機制容許根據時間區間或者某一個時間節點或者某一id查找數據。redis
streams 的基礎寫命令爲XADD
,其語法爲XADD key ID field value [field value ...]
數據庫
127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"
127.0.0.1:6379>
複製代碼
上面的例子使用XADD
向名爲mystream
的streams中添加了一條數據,ID使用*表示使用streams使用默認的ID,在本例中redis返回的1574925508730-0
就是redis爲咱們插入的數據生成的ID。安全
另外streams 查看streams長度的命令爲XLEN
ruby
127.0.0.1:6379> XLEN mystream
(integer) 3
127.0.0.1:6379>
複製代碼
從streams中讀取數據會比寫數據複雜不少,用日誌文件進行對比,咱們能夠查看歷史日誌,能夠根據範圍查詢日誌,咱們能夠經過unix的命令tail -f
來監聽日誌,能夠多個用戶查看到同一份日誌,也能夠多個用戶只能查看到本身有權限查看的那一部分日誌。性能優化
首先來介紹一下 根據範圍查詢,這兩種操做都比較簡單,以XRANGE
爲例,它的語法格式爲XRANGE key start end [COUNT count]
, 咱們只須要提供兩個id,start
和end
,返回的將是一個包含start
和end
的閉區間。兩個特殊的ID-
和+
分別表示可能的最小ID和最大ID。bash
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
2) 1) "name"
2) "bob"
3) "age"
4) "23"
2) 1) "1574925508730-0"
2) 1) "name"
2) "dwj"
3) "age"
4) "18"
127.0.0.1:6379>
複製代碼
咱們前邊提到過數據id中包含了建立數據的時間信息,這意味着咱們能夠根據時間範圍查詢數據,爲了根據時間範圍查詢,咱們省略掉ID的序列號部分,若是省略,對於start ID會使用0做爲默認的序列號,對於end ID會使用最大序列號做爲默認值,這樣的話咱們使用兩個unix時間戳去查詢數據就能夠獲得那個時間區間內全部的數據。服務器
1) 1) "1574835253335-0"
2) 1) "name"
2) "bob"
3) "age"
4) "23"
127.0.0.1:6379>
複製代碼
可能還會有同窗注意到語法的最後邊還有count
參數,這個參數容許咱們一次只返回固定數量的數據,而後根據返回數據的last_id,做爲下一次查詢的start,這樣就容許咱們在一個量很是大的streams裏批量返回數據。
XREVRANGE命令與XRANGE相同,可是以相反的順序返回元素,就不重複介紹了。數據結構
XREAD容許咱們從某一結點開始從streams中讀取數據,它的語法爲XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
,咱們在這裏主要將的是經過XREAD
來訂閱到達streams新的數據。這種操做可能跟REDIS中原有的pub/sub
機制或者阻塞隊列
的概念有些相似,都是等待一個key而後獲取到新的數據,可是跟這兩種有着本質的差異:app
pub/sub
和阻塞隊列
容許多個客戶端一塊兒等待數據,默認狀況下,streams會把消息推送給全部等待streams數據的客戶端,這個能力跟pub/sub
有點相似,可是streams也容許把消息經過競爭機制推送給其中的一個客戶端(這種模式須要用到消費者組的概念,會在後邊講到)。pub/sub
的消息是fire and forget而且從不存儲,你只能夠訂閱到在你訂閱時間以後產生的消息,而且消息只會推送給客戶端一次,不能查看歷史記錄。以及使用阻塞隊列
時,當客戶端收到消息時,這個元素會從隊列中彈出,換句話說,不能查看某個消費者消費消息的歷史。而在streams中全部的消息會被無限期的加入到streams中(消息能夠被顯式的刪除而且存在淘汰機制),客戶端須要記住收到的最後一條消息,用於獲取到節點以後的新消息。127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1574835253335-0"
2) 1) "name"
2) "bob"
3) "age"
4) "23"
2) 1) "1574925508730-0"
2) 1) "name"
2) "dwj"
3) "age"
4) "18"
127.0.0.1:6379>
複製代碼
同list結構同樣,streams也提供了阻塞讀取的命令XREAD BLOCK 0 STREAMS mystream
複製代碼
在上邊的命令中指定了BLOCK選項,超時時間爲0毫秒(意味着永不會過時)。此外,這個地方使用了特殊的id $
,這個特殊的id表明着當前streams中最大的id,這就意味着你只會讀取streams中在你監聽時間之後的消息。有點相似於Unix的tail -f
。另外XREAD能夠同時監聽多個流中的數據。若是咱們想要的不是多個客戶端處理相同的消息,而是多個客戶端從streams中獲取到不一樣的消息進行處理。也就是咱們經常使用的生產者-消費者模型。假如想象咱們具備兩個生產者p1,p2,三個消費者c1,c2,c3以及7個商品。咱們想按照下面的效果進行處理
p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1
複製代碼
爲了解決這種場景,redis使用了一個名爲消費者的概念,有點相似於kafka,但只是表現上。消費者組就像是一個僞消費者,它從流內讀取數據,而後分發給組內的消費者,並記錄該消費者組消費了哪些數據,處理了那些數據,並提供了一系列功能。
它的模型相似於以下
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
複製代碼
從上邊的模型中咱們能夠看出消費者組記錄處理的最後一條消息,將消息分發給不一樣的消費者,每一個消費者只能看到本身的消息。若是把消費者組看作streams的輔助數據結構,咱們能夠看出一個streams能夠擁有多個消費者組,一個消費者組內能夠擁有多個消費者。實際上,一個streams容許客戶端使用XREAD讀取的同時另外一個客戶端經過消費者羣組讀取數據。
咱們首先建立一個包含了一些數據的streams
127.0.0.1:6379> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:6379> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:6379> XADD fruit * message pomelo
"1574935323628-0"
複製代碼
而後建立一個消費者組
127.0.0.1:6379> XGROUP CREATE fruit mygroup $
OK
複製代碼
注意咱們須要指定一個id,這裏咱們使用的是特殊id$
,咱們也可使用0或者一個unix時間戳,這樣,消費者組只會讀取這個節點以後的消息。
如今消費者組建立好了,咱們可使用XREADGROUP命令當即開始嘗試經過消費者組讀取消息。XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
,與XREAD
相似,提供了BLOCK選項。假設指定消費者分別是Alice和Bob,來看看系統會怎樣返回不一樣消息給Alice和Bob。
127.0.0.1:6379> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS fruit >
1) 1) "fruit"
2) 1) 1) "1574936034258-0"
2) 1) "message"
2) "apple"
127.0.0.1:6379>
複製代碼
上邊命令表明的信息是:我要經過mygroup
讀取streams fruit
中的數據,我在羣組中的身份是Alice
,請給我一條數據。 >
操做符只在消費者組的上線文中有效,表明消息到目前爲止沒有交給其它消費者處理過。
咱們也可使用一個有效的id,在這種狀況下,消費者組會告訴咱們的歷史待處理消息,而不會告訴咱們新的消息。這個特性也是頗有用的,當消費者由於某些緣由從新啓動後,咱們能夠查看本身的歷史待處理消息,處理完待處理消息後再去處理新的消息。
咱們能夠經過XACK
命令告訴消費者組某條消息已經被正確處理,不要顯示在個人歷史待處理消息列表中。XACK
的語法爲XACK key group ID [ID ...]
127.0.0.1:6379> XACK fruit mygroup 1574936034258-0
(integer) 1
複製代碼
有幾件事須要記住:
在一個消費者羣組中可能存在多個消費者消費消息,可是也可能會存在某一個消費者永久退出消費者羣組的狀況,這樣咱們就須要一種機制,把該消費者的待處理消息分配給消費者羣組的另外一個消費者。這就須要咱們具備查看待處理消息的能力以及把某個消息分配給指定消費者的能力。前者是經過一個叫XPENDING
的命令,它的語法爲XPENDING key group [start end count] [consumer]
127.0.0.1:6379> XPENDING fruit mygroup
1) (integer) 1
2) "1574936042937-0"
3) "1574936042937-0"
4) 1) 1) "Alice"
2) "1"
複製代碼
上述返回結果表明的是消費者羣組有1條待處理命令,待處理消息的起始id爲1574936042937-0
,結束id爲1574936042937-0
,名爲Alice
的消費者有一個待處理命令,可能有人會好奇咱們在前邊往fruit
放入了3個水果,使用XACK
處理了一個水果,消費者待處理列表中應該有兩個水果,而事實上消費者羣組的待處理列表爲該羣組下消費者待處理消息的合集,當有消費者經過羣組獲取消息的時候會改變消費者羣組的狀態,這也是前邊提到的爲何XREADGROUP
必須在master節點進行調用。
咱們可使用start end count 參數來查看某個範圍內消息的狀態
127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice
1) 1) "1574936042937-0"
2) "Alice"
3) (integer) 903655
4) (integer) 1
2) 1) "1574936052018-0"
2) "Alice"
3) (integer) 491035
4) (integer) 1
複製代碼
這樣咱們就看到了一條消息的詳細信息,id爲1574936042937-0
的消息的消費者爲Alice
,它的pending時間爲903655
,這個消息被分配了1次。
咱們會發現第一條消息的處理時間有點長,咱們懷疑Alice
已經不能處理這條消息了,因而咱們想把這條消息分配給Bob
,這種場景下就須要用到了XCLAIM
命令,它的語法爲XCLAIM ...
,其中min-idle-time爲消息的最小空閒時間,只有消息的空閒時間大於這個值消息纔會被分配,由於消息被分配的時候會重置消息的空閒時間,若是有同時把一條消息分配給兩個客戶端,只會第一條命令生效,由於當消息分配給第一個客戶端的時候重置空閒時間,第二條命令則會失效。
咱們也可使用一個獨立的進程來不斷尋找超時的消息,並把它分配給活躍的消費者,不過須要注意的是,若是消息的分配次數達到某個闕值,不該該把消息再分配出去,而是應該放到別的地方。
streams具備不錯的可觀察性,前邊的XPENDING
命令容許咱們查看streams在某個消費者羣組內待處理消息的狀態。可是咱們想看的更多,好比在這個streams下有多少個group, 在這個group下有多少消費者。這就要用到XINFO
命令:
查看streams
信息:
127.0.0.1:6379> XINFO STREAM mystream
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1574925508730-0"
11) "first-entry"
12) 1) "1574835253335-0"
2) 1) "name"
2) "bob"
3) "age"
4) "23"
13) "last-entry"
14) 1) "1574925508730-0"
2) 1) "name"
2) "dwj"
3) "age"
4) "18"
複製代碼
輸出中會告訴咱們streams的長度,羣組數量,第一條和最後一條信息的詳情。下面看一下streams下羣組的信息:
127.0.0.1:6379> XINFO GROUPS fruit
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1574936052018-0"
2) 1) "name"
2) "mygroup-1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
複製代碼
咱們能夠從輸出中看到fruit
下有兩個羣組,羣組的名稱以及待處理消息的數量,處理的最後一條消息。咱們能夠在詳細的查看下消費者羣組內消費者的狀態。
127.0.0.1:6379> XINFO CONSUMERS fruit mygroup
1) 1) "name"
2) "Alice"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 1990242
2) 1) "name"
2) "Bob"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 9178
複製代碼
從輸出中能夠看到消費者待處理消息的數量以及消費者的閒置時間。
若是從streams能夠查看到歷史記錄,咱們可能會有疑惑,若是streams無限期的加入內存會不會夠用,一旦消息數量達到上限,將消息永久刪除或者持久化到數據庫都是有必要的,redis也提供了諸如此類場景的支持。
一種方法是咱們使用XADD
的時候指定streams的最大長度,XADD mystream MAXLEN ~ 1000
其中的數值前能夠加上~
標識不須要精確的將長度保持在1000,比1000多一些也能夠接受。若是不使用該標識,性能會差一些。另外一種方法是使用XTRIM
,該命令也是使用MAXLEN
選項,> XTRIM mystream MAXLEN ~ 10
前面提到了在streams API裏邊存在一些特殊的id。
首先是-
和+
,這兩個ID在XRANGE
命令中使用,分別表明最小的id和最大的id。-
表明0-1
,+
表明18446744073709551615-18446744073709551615
,從使用上方便了不少。在XPENDING
等範圍查詢中均可以使用。$
表明streams中當前存在的最大的id,在XREAD
和XGROUP
中表明只獲取新到的消息。須要注意的是$
跟+
的含義並不一致。
還有一個特殊的id是>
,這個id只可以在XREADGROUP
命令中使用,意味着在這個消費者羣組中,歷來沒有分配給其餘的消費者,因此老是使用>
做爲羣組中的last delivered ID
。
與redis的其它數據結構同樣,streams會異步複製到從節點,並持久化到AOF和RDB文件中,而且消費者羣組的狀態也會按照此機制進行持久化。
須要注意的幾點是:
appendfsync always
這樣會嚴重下降Redis的速度)WAIT
命令能夠用於強制將更改傳輸到一組從節點上。雖然這使得數據不太可能會丟失,可是redis的Sentinel和cluster在進行故障轉移的時候不必定會使用具備最新數據的從節點,在一些特殊故障下,反而會使用缺乏一些數據的從節點。刪除streams中的數據使用XDEL
命令,其語法爲XDEL key ID [ID ...]
,須要注意的是在當前的實現中,在宏節點徹底爲空以前,內存並無真正回收,因此你不該該濫用這個特性。
streams的不阻塞命令,好比XRANGE
或者不使用BLOCK選項的XREAD
和XREADGROUP
跟redis普通命令一致,因此沒有必要討論。若是有興趣的話能夠在redis的文檔中查看到對應命令的時間複雜度。streams命令的速度在必定範圍內跟set
是一致的,XADD
命令的速度很是快,在一個普通的機器上,一秒鐘能夠插入50w~100w條數據。
咱們感興趣的是在消費者羣組的阻塞場景下,從經過XADD
命令向streams中插入一條數據,到消費者經過羣組讀取到這條消息的性能。
爲了測試消息從產生到消費間的延遲,咱們使用ruby程序進行測試,將消息的產生時間做爲消息的一個字段,而後把消息推送到streams中,客戶端收到消息後使用當前時間跟生產時間進行對比,從而計算出消息的延遲時間。這個程序未進行性能優化,運行在一個雙核的機器上,同時redis也運行在這臺機器上,以此來模擬不是理想條件下的場景。消息每秒鐘產生1w條,羣組內有10個消費者消費數據。測試結果以下:
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
複製代碼
99.9%的請求的延遲小於等於2毫秒,並且異常值很是接近平均值。另外須要注意的兩點:
原文連接: redis.io/topics/stre…
本文譯者:Worktile工程師 杜文傑
文章來源:Worktile技術博客
歡迎訪問交流更多關於技術及協做的問題。
文章轉載請註明出處。