我大學的時候英語6級沒過,所以但凡懂點英語的同窗,若是你進到此頁面,儘可能去閱讀原文,連接在下方原文地址.最次也要對照着原文閱讀,以避免我出了什麼差錯(這是不可避免的),坑了別的小夥伴.node
若是您發現任何翻譯的有歧義的地方,歡迎評論或者發郵件至huyanshi2580@gmail.com
程序員
本文翻譯自Reids官網對Stream的介紹.redis
最近工做須要,須要學一下Redis的新數據結構Stream
.因爲算是比較新一些的技術,中文資料比較少.就找到了Redis官網上做者對Stream的介紹.讀完受益不淺.數組
同時,爲了記錄以及加深理解,決定將原文翻譯過來記錄在博客裏.安全
如下內容爲原文,標題《Introduction to Redis Streams》ruby
Stream是Redis 5.0引入的一種新數據類型,它以更抽象的方式模擬日誌數據結構,然而日誌的本質仍然無缺無損:像日誌文件同樣,一般實現爲僅追加模式打開的文件.Redis stream主要是僅追加的數據結構。至少在概念上是這樣,由於Redis Streams是一種在內存中的抽象數據類型,因此它實現了更強大的操做,以克服日誌文件自己的限制。bash
讓Redis Streams變得很是複雜的是,儘管Stream數據結構自己很是簡單,可是它實現了額外的非強制性功能:容許消費者等待生產者添加到流中的新數據的一組阻塞操做,此外還有一個名爲Consumer Groups(消費者組)的概念。服務器
消費者組最初由Kafka(TM)(一個很受歡迎的的消息系統)引入。Redis以徹底不一樣的方式從新實現了相似的想法,但目標是相同的:容許一組客戶端合做消費同一消息流的不一樣部分。數據結構
爲了理解Redis Streams是什麼以及如何使用它們,咱們將忽略全部高級功能,而是根據用於操做和訪問它的命令來關注數據結構自己。這基本上是大多數其餘Redis數據類型共有的部分,如列表,集合,排序集等。可是,請注意,列表還有一個可選的更復雜的阻塞API,相似於BLPOP
等。所以,Streams 在這方面與列表沒有太大的不一樣,只是附加的API更復雜,更強大。app
因爲Stream是僅追加的數據結構,所以基本寫入命令(稱爲XADD)會將新條目附加到指定的流中。Stream的條目不只僅是一個字符串,而是由一個或多個列-值
對組成。這樣,Stream的每一個條目都已經結構化,就像僅以CSV格式追加式寫入的文件,每行中存在多個分離的字段。
XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
複製代碼
上面對XADD
命令的調用,在鍵爲mystream
的Stream中添加了值爲sensor-id: 123, temperature: 19.8
的一個條目,它使用的條目ID爲1518951480106-0
,是自動生成且由XADD
命令返回的.它將鍵名mystream
做爲第一個參數,第二個參數是標識Stream中每一個條目的條目ID。然而,在上面的例子中,咱們使用了*
,由於咱們但願服務器爲咱們生成新的ID。每一個新的ID都會單調遞增,更簡單地說,添加的每一個新條目都會有比過去的全部條目更高的ID。服務器自動生成ID幾乎老是您想要的,而且明確指定ID的緣由很是少見。咱們稍後會詳細討論這個問題。就像日誌文件擁有行號或者文件內的字節偏移量同樣,每一個條目擁有ID是Stream與日誌文件類似的另外一個特徵.回到咱們的XADD示例,在鍵名和ID以後,下一個參數是組成咱們Stream條目的列-值對。
只需使用XLEN命令就能夠獲取Stream中的項目數:
> XLEN mystream
(integer) 1
複製代碼
條目ID由XADD
命令返回,在給定的Stream中明確地標識每個條目.它由兩部分組成.
<millisecondsTime>-<sequenceNumber> | 毫秒時間-序列號
毫秒時間部分是正在生成 Stream ID的Redis節點的本地時間,然而,若是當前毫秒時間剛好小於前一個條目時間,則使用前一個條目時間,所以若是時鐘回撥,ID的單調遞增屬性仍然存在。序列號用於在相同毫秒內建立的條目。因爲序列號是64位的,因此在相同的毫秒內能夠生成的條目數是沒有限制的。
這些ID的格式最初看起來可能很奇怪,善意的讀者可能想知道爲何時間是ID的一部分。緣由是Redis Stream支持根據ID進行範圍查詢。因爲ID與生成條目的時間相關,這使得根據時間範圍進行查詢基本上是無消耗的.==原文中爲free==。咱們即將在使用XRANGE
命令時瞭解到這一點,
若是因爲某種緣由,用戶須要與時間無關但實際上與另外一個外部系統ID關聯的增量ID,如前所述,XADD
命令能夠採用明確的ID而不是使用*
通配符來觸發自動生成ID,就像下面的例子這樣:
XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2
複製代碼
請注意,在這種狀況下,最小ID爲0-1,而且命令將不接受等於或小於前一個ID的ID:
> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
複製代碼
如今咱們終於能夠經過XADD
在咱們的Stream中添加條目了。然而,將數據附加到Stream中很是明確,可是爲了提取數據而查詢Stream的方式並不像這樣明確。若是咱們繼續類比日誌文件,一種顯而易見的方法是模仿咱們一般使用Unix命令tail -f
作的事情,也就是說,咱們可能會開始監聽以獲取附加到Stream的新消息。注意,與Redis 列表的阻塞操做不一樣.在列表中,對於給定的元素,BLPOP
等流行風格的操做會阻塞其到達單個客戶端,而在Stream中,咱們但願多個消費者能夠看到追加到Stream的新消息,就像多個tail -f
進程能夠查看添加到日誌的內容那樣。使用傳統術語,咱們但願Stream可以將消息扇==fan out==出到多個客戶端。
可是,這只是一種潛在的訪問模式。咱們還能夠以徹底不一樣的方式看待Stream:不是做爲消息傳遞系統,而是做爲時間序列存儲。在這種狀況下,獲取新追加的信息也頗有用,但另外一種天然查詢模式是按時間範圍獲取消息,或者使用遊標遍歷消息以逐步檢查全部歷史記錄。這絕對是另外一種有用的訪問模式。
最後,若是咱們從消費者的角度看Stream,咱們可能但願以另外一種方式訪問流,即,做爲一個能夠將多個消費者分隔開來處理這些消息的消息流.以便於消費者組只能看到到達流的信息的一個子集.經過這種方式,能夠跨不一樣的消費者進行消息處理,而不須要單個消費者處理全部消息:每一個消費者只須要處理不一樣的消息。這基本上是Kafka(TM)中的消費者羣體。經過消費者組閱讀消息是另外一種從Redis Stream中讀取的有趣模式。
Redis Stream經過不一樣的命令支持上述三種查詢模式。接下來的部分將展現它們,從最簡單直接的使用開始:範圍查詢。
範圍查詢:XRANGE
和 XREVRANGE
.
要按範圍查詢Stream,咱們只須要指定兩個ID,即開始和結束。返回的範圍將包括開始和結束ID的元素,所以範圍是包含首項與末項的。這兩種特殊ID-
和+
分別意味着可能的最小和最大的ID。
> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
複製代碼
返回的每一個條目都是兩個項目的數組:ID和列-值對的列表。咱們已經說過條目ID與時間有關,由於-
左邊的部分是建立Stream條目的本地節點的Unix時間(以毫秒爲單位)(但請注意使用徹底指定的XADD命令複製Stream,所以從屬服務器將具備與主服務器相同的ID)。這意味着我可使用XRANGE
查詢一個範圍內的時間。可是,爲了作到這一點,我可能想要省略ID的序列部分:若是省略,則將範圍的最小值假設爲0,最大值將被假定爲最大值可用序列號。這樣,僅使用兩個Unix毫秒時間查詢,咱們以就能夠得到在該時間範圍內生成的全部條目。例如,我可能想查詢:
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
複製代碼
我在這個時間範圍內只有一個條目,可是在實際數據集中,我能夠查詢小時數範圍,或者在兩毫秒內可能有不少項目,因此返回的結果可能很大。所以,XRANGE
最後支持可選的COUNT選項。經過指定數量,我能夠僅得到前N個項目。若是我想要更多,我能夠得到最後一個ID,序列號增長一,而後再次查詢。讓咱們在下面的例子中瞭解這一點,咱們開始用XADD
添加10個項目(我沒有列出這個,假設 Stream mystream中已經填充了10個項目)。要開始個人遍歷,每一個命令得到2個項目,我從全範圍開始開始查找,但指定數量爲2。
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
複製代碼
爲了繼續遍歷接下來的兩個項目,我必須拿到返回的最後一個ID,即1519073279157-0並將其序列號部分加1。注意,序列號數字爲64位,所以無需檢查溢出。生成的Id,1519073279157-1如今能夠用做下一個XRANGE
調用的新起始參數:
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"
複製代碼
就像上面這樣。因爲XRANGE
查找的時間複雜度爲O(log(N)),而後使用O(M)的時間返回M個元素,因此此命令具備對數時間複雜度,這意味着遍歷的每一步都很快。所以XRANGE
也是實際上的流迭代器== the de facto 不會翻譯==,不須要XSCAN
命令。
命令XREVRANGE
與XRANGE
類似,只是以反轉順序返回元素,所以XREVRANGE
的實際用途是檢查Stream中的最後一項是什麼:
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"
複製代碼
注意,XREVRANGE
命令以相反的順序獲取start和stop參數。
SREAD
監聽新項目當咱們不想按範圍範文Stream中的項目時,一般咱們想要的是訂閱到達Stream的新項目。這個概念可能出如今與Redis 發佈/訂閱有關的地方,你訂閱一個頻道,或者一個Reids的阻塞列表,而後等待某個key,已得到到達的最新元素.可是這與您消費一個Stream有根本上的不一樣:
Stream能夠有多個客戶端(消費者)等待數據。默認狀況下,每一個新項目都將傳遞給等待指定Stream中的數據的每一個消費者。這個行爲與阻止列表不一樣,其中每一個消費者將得到不一樣的元素。可是,扇出到多個消費者的能力相似於發佈/訂閱。
在發佈/訂閱中消息是自主引導而且永遠不會存儲的,在阻塞列表中,當客戶端收到消息時,它會從列表中彈出(有效刪除),Stream以徹底不一樣的方式工做.全部消息都無限期地追加在Stream中(除非用戶明確要求刪除條目):不一樣的消費者經過記住收到的最後一條消息的ID,來判斷什麼是新消息。
Streams Consumer Groups(==Stream的消費者組==)提供發佈/訂閱或阻塞列表沒法實現的控制級別,同一Stream中的不一樣組,已處理項目的明確確認,檢查待處理項目的能力,未處理消息的聲明以及單個客戶端的連貫歷史可見性,只能查看其私人的歷史消息消費記錄。
提供監聽到達Stream的新消息的能力的命令稱爲XREAD
。它比XRANGE
複雜一點,因此咱們將開始展現簡單的形式,稍後將提供整個命令佈局。
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
複製代碼
以上是XREAD
的非阻塞形式。注意,COUNT
選項不是必需的,實際上該命令的惟一強制選項是STREAMS
選項,它指定一個鍵列表以及消費者已經看過指定Stream的最大ID,因此命令將僅向客戶端提供ID大於咱們指定ID的消息。
在上述命令中,咱們編寫了STREAMS mystream 0
,咱們但願得到名爲mystream的Stream中的全部ID大於的0-0的消息。正如您在上面的示例中所看到的,該命令返回鍵名,由於實際上可使用多個鍵調用此命令以同時從不一樣的Stream中讀取。我能夠寫,STREAMS mystream otherstream 0 0
.注意在STREAMS選項以後咱們須要提供key,以及以後的ID。所以,STREAMS
選項必須始終是最後一個。
除了XREAD
能夠同時訪問多個流,以及咱們可以指定咱們擁有的最後一個ID以獲取更新的消息以外,在這個簡單的形式中,沒有作與XRANGE
不一樣的一些事情。可是,有趣的部分是咱們能夠經過指定BLOCK
參數輕鬆地在阻塞命令中使用XREAD
:
> XREAD BLOCK 0 STREAMS mystream $
複製代碼
注意,在上面的示例中,除了刪除COUN選項以外,我指定了新的BLOCK選項,其超時時間爲0毫秒(這意味着永不超時)。並且,mystream我沒有使用Stream的普通ID,而是使用了特殊ID$
。這種特殊的ID意味着XREAD
應該使用Stream mystream中已經存儲的最大的ID.,因此咱們從開始監聽開始,咱們將只收到新的消息。這在某種程度上相似於Unix命令tail -f
。
注意,使用BLOCK選項時,咱們沒必要使用特殊ID $
。咱們可使用任何有效的ID。若是命令可以當即服務咱們的請求而不會阻塞,它將執行此操做,不然它將阻塞。一般,若是咱們想要重新條目開始消費Stream,咱們從ID$
開始,以後咱們繼續使用收到的最後一條消息的ID來進行下一次調用,依此類推。
XREAD
的阻塞形式也能夠經過指定多個鍵名來監聽多個Streams。若是請求能夠同步提供,由於至少有一個Stream擁有比咱們指定的ID更大的元素,則返回結果。不然,該命令將阻塞並將返回第一個獲取到新數據的Stream的元素(根據指定的ID)。
與阻塞列表操做相似,從等待讀取數據的客戶端的角度來看,阻塞式的Stream是公正的.由於策略是FIFO。給定Stream的第一個阻塞的客戶端也是第一個獲取到新元素的客戶端.
XREAD
沒有除COUNT和BLOCK以外的其餘選項,所以它是一個很是基礎的命令,具備將消費者鏈接到一個或多個Stream的特殊功能.消費Stream的更增強大的功能是使用消費者組API。可是使用消費者組來讀取信息,要使用另外一個不一樣的命令,XREADGROUP
.本指南的下一部分將對此進行介紹。
當手頭的任務是使用不一樣客戶端來消費同一個Stream時,XREAD
已經提供了扇出到N個客戶端的方法,還使用從屬服務器以提供更強的讀取擴展性。然而,有一個明確的問題,咱們想要作的不是向許多客戶端提供相同的消息Stream,而是從同一Stream向許多客戶端提供不一樣的消息子集。一個明顯的例子就是處理消息的速度很慢:可以讓N個不一樣的工做人員接收流的不一樣部分,經過將不一樣的消息路由到能夠作更多工做的(==處理能力強或者當前空閒==)不一樣工做人員來擴展消息處理工做。
實際上,若是咱們想象有三個消費者C1,C2,C3,以及包含消息1,2,3,4,5,6,7的Stream,那麼咱們想要的是以下圖所示的消息服務:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
複製代碼
爲了實現這種效果,Redis使用了一個名爲消費者組的概念。瞭解Redis消費組與Kafka(TM)消費者組的實現方法無關,這一點很是重要,僅從實現的概念來看,它們只是類似.因此我決定與最初普及這種想法的軟件產品相比較,不要改變術語。
消費者組就像一個僞消費者,從Stream中獲取數據,實際上爲多個消費者提供服務,提供這些保證:
在某種程度上,消費者組能夠被想象爲關於流的一些狀態:
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
複製代碼
若是從這個角度看到這一點,就能夠很是簡單地理解消費者組能夠作什麼,如何向消費者提供他們的未決歷史記錄,以及如何僅處理消費者對新消息的請求,僅當消息ID大於last_delivered_id。同時,若是將消費者組視爲Redis Stream的輔助數據結構,很明顯單個流能夠擁有多個消費者組,擁有消費者的不一樣集合。實際上,同一個Stream甚至可讓客戶端經過XREAD
讀取沒有消費者組的客戶端,以及客戶端經過XREADGROUP
來從不一樣的消費者組讀取.
如今是時候詳細的查看使用消費者組的基本命令,以下所示:
XGROUP
用於建立,銷燬和管理消費者組。XREADGROUP
用於經過消費者組組從Stream中讀取。XACK
是容許消費者將待處理消息標記爲正確處理的命令。假設我已經有一個名爲mystream的Stream,爲了建立一個消費者組,我須要執行如下操做:
> XGROUP CREATE mystream mygroup $
OK
複製代碼
注意:目前沒法爲不存在的Stream建立消費者組,可是在短時間內咱們可能會在XGROUP
命令中添加一個選項,以便在這種狀況下建立一個空的Stream。
正如您上面的命令中看到的,在建立消費者組時,咱們必須指定一個ID,在示例中是$
。這是必需的,由於消費者組在其餘狀態中必須知道在鏈接後處理哪些消息,即剛剛建立該組時的最後消息ID是什麼?若是按照咱們提供的$
,那麼只有從如今開始到達Stream的新消息纔會提供給該組中的消費者。若是咱們指定0
,消費者組將消費全部Stream歷史中的消息記錄。固然,您能夠指定任何其餘有效ID。您所知道的是,消費者組將開始消費ID大於您指定的ID的消息。由於$
表示Stream中當前最大的ID,因此指定$
將僅消費新消息。
如今建立了消費者組,咱們可使用XREADGROUP
命令當即開始嘗試經過消費者組讀取消息。咱們將從消費者那裏讀到,消費者名爲Alice和Bob,看看系統將如何向Alice和Bob返回不一樣的消息。
XREADGROUP
很是相似於XREAD
,也提供相同的BLOCK選項,不然它是一個同步指令。可是,必須始終指定一個強制選項GROUP
,它擁有兩個參數:消費者組的名稱以及嘗試讀取的消費者的名稱。還支持選項COUNT
,它與XREAD
中相同。
在從Stream中讀取信息以前,讓咱們在裏面放一些消息:
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
複製代碼
注意: 在這裏,message是列的名稱,水果是值.記住,Stream的項目是一個小字典.
如今是時候嘗試使用消費者組讀取一些東西了.
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
複製代碼
XREADGROUP
的回覆就像XREAD
回覆同樣。可是請注意上面提供的GROUP
中的 <group-name> <consumer-name>
,它代表我想使用消費者組從mystream中讀取消息而且我是消費者Alice。每次消費者使用消費者組執行操做時,它必須指定其名稱,惟一地標識該組內的此使用者。
在上面的命令中還有另外一個很是重要的細節,在強制選項STREAMS
以後的,請求的ID是一個特殊ID>
。此特殊ID僅在消費者組的上下文中有效,它意味着:到目前爲止,消息從未傳遞給其餘消費者。
這幾乎老是你想要的,可是也能夠指定一個真實的ID,例如0
或任何其餘有效的ID.可是在這個案例中,咱們要求XREADGROUP
向咱們提供未決消息的歷史記錄,永遠不會在組中看到新消息。因此基本上XREADGROUP
基於咱們指定的ID具備如下行爲:
>
,那麼該命令將僅返回到目前爲止從未傳遞給其餘消費者的新消息,而且將更新消費者組的最後一個消息ID。XACK
確認過。咱們能夠當即測試此行爲,指定ID爲0
,沒有任何COUNT選項:咱們只會看到惟一的待處理消息,即關於apple的消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
複製代碼
然而,若是咱們確認處理過的消息,它將不會被分到未決消息歷史記錄中,因此係統將不會報告任何東西了.
> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
複製代碼
別爲你還不知道XACK
怎怎麼工做而擔憂,這個概念只是已處理的消息再也不是咱們能夠訪問的歷史記錄中的一部分.
如今輪到Bob讀取一些信息了.
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
2) 1) 1526569506935-0
2) 1) "message"
2) "strawberry"
複製代碼
Bob要求最多兩條消息,而且正在經過同一個組,mygroup閱讀。那麼,Redis將只報告新的消息。正如您所看到的那樣,apple消息未被傳遞,由於它已經傳遞給Alice,所以Bob得到了橘子和草莓等等。
這樣,Alice,Bob和該組中的任何其餘消費者可以從相同的Stream中讀取不一樣的消息,讀取他們還沒有處理消息的歷史,或者將消息標記爲已處理。這容許建立不一樣的拓撲和語義來消費Stream的消息。
有幾點須要注意:
XREADGROUP
,您也能夠同時讀取多個鍵,可是要使其工做,您須要在每一個Stream中建立一個具備相同名稱的消費者組。這不是常見的需求,但值得一提的是該功能在技術上可用。XREADGROUP
是一個寫命令,由於即便它從Stream中讀取,他的反作用也會修改消費者組,所以只能在主實例中調用它。使用Ruby語言編寫的使用消費者組的消費者實現示例以下。Ruby代碼的編寫方式幾乎可讓任何有經驗而不瞭解Ruby的程序員閱讀:
require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name"
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
# Pick the ID based on the iteration: the first time we want to
# read our pending messages, in case we crashed and are recovering.
# Once we consumer our history, we can start getting new messages.
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!"
next
end
# If we receive an empty reply, it means we were consuming our history
# and that the history is now empty. Let's start to consume new messages.
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# Process the message
process_message(id,fields)
# Acknowledge the message as processed
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end
複製代碼
正如您所看到的,這裏的想法是開始消費歷史記錄,即咱們的待處理消息列表。這頗有用由於消費者以前可能已經崩潰,因此在從新啓動的狀況下,咱們但願再次讀取在發送給咱們可是沒有獲得確認的消息。經過這種方式,咱們能夠屢次或一次處理消息(在消費者失敗的狀況下,但Redis也有持久性和複製的限制,請參閱有關此主題的特定部分)。
消費完歷史記錄後,咱們會獲得一個空的消息列表,咱們能夠切換到使用特殊ID>
來消費新消息。
上面的示例容許咱們編寫參與同一個消費者組的消費者,處理消息的每一個子集,並從故障中恢復。然而,在現實世界中,消費者可能永遠失敗並永遠沒法恢復.因爲任何緣由中止且沒法恢復後,消費者的待處理消息會發生什麼樣呢?
Redis消費者組提供了一種在這種狀況下正好使用的功能,聲明給定消費者的未處理消息,以便此類消息更改全部權並從新分配給其餘消費者。該功能很是明確,消費者必須檢查待處理消息列表,而且必須使用特殊命令聲明特定消息,不然服務器將把待處理的消息永久分配給舊消費者,這樣不一樣的應用程序就能夠選擇是否使用這樣的功能,以及使用它的方式。
此過程的第一步是提供消費者組中待處理條目的可觀察性的命令,稱爲XPENDING
。這只是一個只讀命令,它始終能夠安全地調用,不會更改任何消息的全部權。在最簡單的形式中,只使用兩個參數調用該命令,這兩個參數是Stream的名稱和消費者者組的名稱。
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"
複製代碼
以這種方式調用時,命令只輸出消費者組中的待處理消息總數,在當前案例下只有兩個消息,待處理消息中的較低和較高消息ID,最後是消費者列表和他們的待處理消息數。咱們只有Bob有兩個待處理的消息,由於Alice請求的惟一消息是使用XACK
確認的。
咱們能夠經過給XPENDING
提供更多參數來詢問更多信息,由於完整的命令簽名以下:
XPENDING <key> <groupname> [<start-id> <end-id> <count> [<conusmer-name>]]
複製代碼
經過提供一個開始和結束ID(也能夠像在XRANGE
中同樣只是-
與+
)和數量控制的命令返回的信息量,咱們可以更多地瞭解未處理消息。若是咱們想要將輸出限制爲僅針對給定消費者組的待處理消息,則使用可選的最終參數(消費者組名稱),但咱們不會在下面的示例中使用此功能。
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
複製代碼
如今咱們有每條消息的詳細信息:ID,消費者名稱,以毫秒爲單位的空閒時間(即自上次將消息傳遞給某個消費者以來通過了多少毫秒),最後是給定消息的被髮送過的次數。咱們有來自Bob的兩條消息,它們閒置74170458毫秒,大約20小時。
注意,沒有人阻止咱們檢查第一個消息內容是什麼,使用XRANGE
就能夠。
> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
複製代碼
咱們只須要在參數中重複兩次相同的ID。如今咱們已經有了一些想法,Alice可能會決定在20小時不處理消息後,Bob可能沒法及時恢復,而且是時候聲明這些消息並繼續代替Bob處理。爲此,咱們使用XCLAIM
命令。
這個命令的完整選項的形式很是複雜,由於它用於複製消費者組的更改,但咱們將只使用咱們一般須要的參數。在這種狀況下,就像下面同樣簡單的調用他:
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
複製代碼
基本上,對於這個給定的鍵和組,我但願更改指定的ID的消息的全部權,並將其分配給指定的名稱爲<consumer>
的消費者。可是,咱們還提供了最小空閒時間,所以只有在上述消息的空閒時間大於指定的空閒時間時,操做纔會起做用。這頗有用,由於可能有兩個客戶端正在重試同時認領一條消息:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
複製代碼
然而聲稱一條消息,反作用將重置其空閒時間!而且會增長其接受消息數量的計數器,所以第二個客戶端將沒法聲明它。經過這種方式,咱們能夠避免對消息進行不須要的從新處理(即便在通常狀況下,您沒法得到一次處理)。
這是命令執行的結果:
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
複製代碼
Alice成功認領了該消息,如今能夠處理消息並確認消息,而且即便原始消費者沒有恢復,也能夠向前移動。
從上面的示例中能夠清楚地看出,做爲成功認領給定消息的反作用,XCLAIM
命令也會返回它。但這不是強制性的。JUSTID選項用於返回認領成功的消息。這個選項頗有用,若是要減小客戶端和服務器之間使用的帶寬,以及提升命令的性能,而且您對該消息不感興趣,由於稍後您的消費者的實現方式將從新掃描待處理的歷史記錄消息。
認領也能夠經過一個單獨的進程來實現:一個只檢查待處理消息列表,並將空閒消息分配給看似活躍的消費者。可使用Redis Stream的一個可觀察性功能得到活躍的消費者。這是下一節的主題。
您在XPENDING
命令的輸出中觀察到的計數器是每條消息的交付數量。這個計數器在兩種狀況下遞增:當經過XCLAIM
成功認領消息時,或者當使用XREADGROUP
調用來訪問未處理消息的歷史時。
當出現故障時,屢次傳遞消息是正常的,但最終它們一般會獲得處理。可是,處理給定的消息有時會出現問題,由於它會以觸發處理代碼中的錯誤的方式被破壞或製做(==感受不太OK==)。在這種狀況下,會發生的是消費者將連續失敗的處理此特定消息。由於咱們有交付嘗試的計數器,因此咱們可使用該計數器來檢測沒法處理消息的緣由。所以,一旦發送計數器達到您選擇的數字,將這些消息放入另外一個Stream並將發送通知給系統管理員可能更明智。這基本上是Redis流實現死掉的信息概念的方式。
缺少可觀察性的消息系統很難處理。不知道誰在消費消息,哪些消息正在等待,在給定Stream中有哪些活躍的消費者組使得一切都不透明。出於這個緣由,Redis Stream和消費者組有不一樣的方式來觀察正在發生的事情。咱們已經介紹了XPENDING
,它容許咱們檢查在給定時刻正在被處理的消息列表,以及它們的空閒時間和交付數量。
可是,咱們可能但願作更多的事情,XINFO
命令是一個可觀察性接口,能夠與子命令一塊兒使用,以獲取有關Stream或消費者組的信息。
此命令使用子命令以顯示有關Stream及其消費者組的狀態的不一樣信息。例如使用XINFO STREAM
報告有關Stream自己的信息。
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1524494395530-0
2) 1) "a"
2) "1"
3) "b"
4) "2"
11) last-entry
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"
複製代碼
輸出顯示有關如何在Stream內部編碼的信息,還顯示Stream中的第一條和最後一條消息。另外一個可用信息是與該Stream相關聯的消費者組的數量。咱們能夠進一步挖掘有關消費者組的更多信息
> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 2
5) pending
6) (integer) 2
2) 1) name
2) "some-other-group"
3) consumers
4) (integer) 1
5) pending
6) (integer) 0
複製代碼
正如您在此輸出和上一個輸出中所看到的,XINFO
命令輸出一系列列-值項。由於它是一個可觀察性命令,因此它容許人類用戶當即瞭解報告的信息,並容許命令經過添加更多字段來報告更多信息,而不會破壞與舊客戶端的兼容性。其餘必須提升帶寬效率的命令,如XPENDING
,只報告沒有字段名稱的信息。
上面示例的輸出(使用GROUPS
子命令)應該能夠清楚地觀察字段名稱。咱們能夠經過檢查在該組中註冊的消費者來更詳細地檢查特定消費者組的狀態。
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
複製代碼
若是你不記得命令的語法,只須要調用命令自己的幫助:
> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.
3) GROUPS <key> -- Show the stream consumer groups.
4) STREAM <key> -- Show information about the stream.
5) HELP -- Print this help.
複製代碼
Redis Stream中的消費者組可能在某種程度上相似於Kafka(TM)基於分區的消費者組,但請注意Redis Stream實際上很是不一樣。分區只是邏輯分區,消息只是放在一個Redis鍵中,所以不一樣客戶端的服務方式取決於誰能夠處理新消息,而不是從哪一個分區客戶端讀取。例如,若是消費者C3在某個時刻永久失效了,Redis將繼續服務C1和C2,全部新消息會就像如今只有兩個邏輯分區同樣到達。
相似地,若是給定的某個消費者在處理消息方面比其餘消費者快得多,則該消費者將相應地在相同的時間單位中接收更多消息。這是可能的,由於Redis明確跟蹤全部未確認的消息,並記住誰收到了哪條消息以及從未傳遞給任何消費者的第一條消息的ID。
可是,這也意味着在Redis中,若是您確實要將有關同一Stream的消息分區爲多個Redis實例,則必須使用多個鍵和一些分片系統(如Redis Cluster或其餘特定於某些應用程序的分片系統)。單個Redis Stream不會自動分區到多個實例。
咱們能夠說下面的圖表是真的:
所以,基本上Kafka分區更相似於使用N個不一樣的Redis 鍵。Redis消費者組是一個從給定Stream負載均衡到N個不一樣消費者消息系統。
許多應用程序不但願永遠將數據收集到Stream中。有時在Stream中最多具備給定數量的項是有用的,有時一旦達到給定的大小,將數據從Redis移動到不在內存中且不是那麼快但適合儲存歷史消息的存儲介質是有用的。Redis Stream對此有一些支持。一個是XADD
命令的MAXLEN選項。這個選項使用起來很是簡單.
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
複製代碼
使用MAXLEN在達到指定長度時,將自動逐出舊條目,以便Stream有一個恆定的大小。目前沒有選項能夠告訴Stream只保留不超過給定數量的項目,由於爲了一致地運行,這樣的命令必須在很長一段時間內阻塞以驅逐項目。想象一下,例如,若是存在插入尖峯,而後是長暫停,以及另外一次插入,則他們都具備相同的最大時間。Stream將阻塞以驅逐暫停期間變得太舊的數據。所以,用戶須要進行一些規劃並瞭解所需的最大Stream長度。此外,雖然Stream的長度與使用的內存成比例,可是按時間修剪不太容易控制和預測:它取決於插入速率,這是一個常常隨時間變化的變量(當他沒有變化是,那麼只是按照大小進行調整是微不足道的).
然而,使用MAXLEN
進行修整是花銷很大的:Stream由宏節點表示爲基數樹,以便很是節省內存。改變由幾十個元素組成的單個宏節點不是最佳的。所以可使用如下特殊形式提供命令:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
複製代碼
在MAXLEN選項個實際技術之間的~
參數意味着:我並不真的須要這剛好1000個項目,它能夠是1000或1010或1030,只需確保至少保存1000個項目。使用此參數,僅在咱們能夠刪除整個節點時執行修剪。這使它更有效率,一般是你想要的。
還有可用的XTRIM
命令,它執行與上面的MAXLEN
選項很是類似的操做,可是此命令不須要添加任何內容,能夠以獨立方式對任何Stream運行。
> XTRIM mystream MAXLEN 10
複製代碼
或者,使用XADD
:
> XTRIM mystream MAXLEN ~ 10
複製代碼
可是,即便目前只實現了MAXLEN
,XTRIM
被設計爲能夠接受不一樣的修剪策略。鑑於這是一個明確的命令,未來有可能容許指定時間修剪,由於用戶以獨立的方式調用此命令時應該知道她或他在作什麼。
XTRIM
應該具有的一個有用的驅逐策略多是經過一個ID範圍刪除的能力。目前這是不可能的,但未來可能會實施,以便更輕鬆地將XRANGE
和XTRIM
一塊兒用於將數據從Redis移動到其餘存儲系統(若是須要)。
您可能已經注意到Redis API中可使用多個特殊ID。這是一個簡短的回顧,以便他未來能更加有意義.
前兩個特殊ID是-
和+
,在XRANGE
命令的範圍查詢中使用。這兩個ID分別表示可能的最小ID(基本上是0-1)和可能的最大ID(即18446744073709551615-18446744073709551615)。正如你所看到的那樣,-
和+
寫起來更清晰,而不是那些數字。
而後是咱們想要說的API,即Stream中具備最大ID的項的ID。這就是`,僅使用該羣組向消費者提供新的內容。
正如您所看到的$
並不意味着+
,它們是兩個不一樣的東西,+
是在每一個可能的Stream中可能的最大的ID ,而$
是在給定Stream中已經包含的最大ID。另外的API一般只認識+
或$
,由於它頗有用,能夠避免以多個含義加載一個給定的符號。
另外一個特殊ID是>
,僅在消費者組的上下文中且僅使用XREADGROUP
命令時才具備特殊含義。這種特殊ID意味着咱們只想要到目前爲止從未提供給其餘消費者的條目。因此基本上>
是消費者組的最後交付ID。
最後是特殊ID*
,只能與XADD
命令一塊兒使用,意味着爲咱們要建立的新條目自動選擇ID。
所以,咱們有-
,+
,$
,>
和*
,他們擁有不一樣的含義,大多數時候,只能在不一樣的環境中使用。
與其餘Redis數據結構同樣,Stream被異步複製到從屬並持久存儲到AOF和RDB文件中。然而,可能不那麼明顯的是,消費者組的完整狀態也傳播到AOF,RDB和從屬中,所以若是主服務器中的消息未處理,則從服務器也將具備相同的信息。一樣,重啓後,AOF將恢復消費者者組的狀態。
可是請注意,Redis Stream和消費者組使用Redis默認複製進行持久化和複製,所以:
XADD
命令形成的消費者組狀態更改:在故障轉移以後,可能會丟失某些內容,具體取決於從服務器從主服務器接收數據的能力。WAIT
命令能夠強行讓這些變化傳播至一系列叢書服務器上。但請注意,雖然這使得數據不太可能丟失,但由Sentinel或Redis Cluster操做的Redis故障轉移過程僅執行盡力檢查以轉移到最新的從站,而且在某些特定故障下可能會使從站丟失一些數據。所以,在使用Redis Stream和消費者者組設計應用程序時,請確保瞭解應用程序在故障期間應具備的語義屬性,並相應地配置,評估它是否足夠安全用於您的案例。
Streams還有一個特殊命令,能夠經過ID從流中間刪除項目。一般,對於僅附加數據結構,這可能看起來像一個奇怪的特徵,但它實際上對涉及例如隱私法規的應用程序有用。該命令名爲XDE
L,只須要獲取Stream的名稱,以及要刪除的ID:
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"
複製代碼
可是在當前實現中,在宏節點徹底爲空以前,內存不會被回收,所以您不該濫用此功能。
流和其餘Redis數據結構的一個區別在於,當其餘數據結構再也不具備元素時,刪除元素的命令也會將鍵自己刪除。例如,當對ZREM
的調用將刪除有序集合中的最後一個元素時,將徹底刪除有序集合。Stream容許保留零元素,當使用MAXLEN
選項且數量爲爲零(XADD
和XTRIM
命令),或者由於調用了XDEL
.
存在這種不對稱的緣由是由於Streams可能具備關聯的消費者組,而且咱們不但願由於Stream中沒有元素就丟失消費者組定義的狀態.目前,即便沒有關聯的消費者組,也不會刪除該Stream,但這可能在未來發生變化。
沒有BLOCK選項的非阻塞Stream命令(如XRANGE
和XREAD
或XREADGROUP
)與任何其餘Redis命令同樣是同步提供服務,所以討論此類命令的延遲是沒有意義的:更有趣的是檢查Redis文檔中命令的時間複雜度。能夠說,在提取範圍時,Stream的XADD
命令很是快,而且若是使用流水線操做,則能夠在普通機器中輕鬆地每秒插入50萬到100萬個項目。
然而,若是咱們想要理解處理消息的延遲,在阻塞消費者組中的消費者的上下文中,從經過XADD
生成消息的那一刻起,到消費者得到消息的那一刻,延遲就變成了一個有趣的參數。由於XREADGROUP
返回這些信息。
在提供執行測試的結果以前,有必要了解Redis使用什麼模型來路由Stream消息(其實是如何管理等待數據的任何阻塞操做)。
阻塞的客戶端在哈希表中被引用,該哈希表將至少有一個阻塞消費者的鍵映射到等待這個鍵的消費者列表。這樣,給定一個接收數據的key,咱們就能夠解析全部等待這些數據的客戶端。
當發生寫入時,在這種狀況下,當調用XADD命
令時,它會調用signalKeyAsReady()
函數。這個函數會將鍵放入須要處理的鍵列表中,由於這些鍵可能會爲阻止的消費者提供新數據。請注意,稍後將處理此類就緒鍵,所以在相同的事件循環週期中,鍵可能會接收其餘寫入。
最後,在事件循環結束以前,處理就緒鍵。對於每一個鍵,運行等待數據的客戶端列表,若是適用,這些客戶端將接收到達的新數據。在Stream中,數據是消費者請求的適用範圍內的消息。
正如您所看到的,基本上,在返回事件循環以前,全部調用XADD
的客戶端阻塞地等待消費消息,所以XADD
的調用者應該同時收到Redis的回覆,消費者將收到新的消息。
此模型基於推送,將數據添加到使用者緩衝區將直接經過調用XADD
的操做執行,所以延遲每每是可預測的。
爲了檢查這種延遲特性,咱們使用多個Ruby程序實例進行測試,推送電腦時間做爲附加消息的做信息,Ruby程序讀取消費者組的消息並處理它們。消息處理步驟包括將當前計算機時間與消息時間戳進行比較,以便理解總延遲。
此類程序未通過優化,而且運行在小型兩核的Redis實例中,以便嘗試提供在非最佳條件下可能出現的延遲數字。消息以每秒10k的速率生成,同時有10個消費者消費並確認來自同一Redis Stream和消費者組的消息。
得到的結果:
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
複製代碼
99.9%的請求的延遲小於等於2毫秒,異常值仍然很是接近平均值。
在Stream中添加數百萬條未確認的消息不會改變基準測試的要點,大多數查詢仍然以很是短的延遲進行處理。
幾條評論:
XREADGROUP
的count參數設置爲10000.這增長了不少延遲,可是爲了讓慢速消費者可以與消息流保持一致,這是必需的。因此你能夠期待一個更小的真實世界延遲。完。
以上皆爲我的所思所得,若有錯誤歡迎評論區指正。
歡迎轉載,煩請署名並保留原文連接。
聯繫郵箱:huyanshi2580@gmail.com
更多學習筆記見我的博客------>呼延十