Redis 5.0-Stream 操做詳解

簡介

​     Stream是Redis 5.0引入的一種新數據類型,容許消費者等待生產者發送的新數據,還引入了消費者組概念,組之間數據是相同的(前提是設置的偏移量同樣),組內的消費者不會拿到相同數據。這種概念和kafka很雷同。redis

    在某些特定場景可使用redis的stream代替kafka等消息隊列,減小系統複雜性,加強系統的穩定性數組

特色

​ 1.若是使用xrange和xrevrange命令,則Stream和list功能類同oop

​ 2.若是使用xread命令,則有其很是獨特的地方spa

​ 2.1與redis的pub/sub不一樣,pub/sub多個客戶端是收到相同的數據,而stream的多個客戶端是競爭關係,每一個客戶端收到的數據是不相同的命令行

​ 2.2pub/sub中一旦觸發數據獲取,不會記錄下上一次拿的位置,意味着客戶端沒法重複去拿之前的數據,而blpop方式一旦pop,數據就會永久的刪除,也沒法重複去拿之前的數據。而Stream會永久的存放數據,而且客戶端會保留上一次拿的id,甚至經過修改id能夠拿回之前的數據。和kafka的機制相似。排序

​ 2.3.Stream提供了消費者組(kafka也有),不一樣組接收到的數據徹底同樣(前提是條件同樣),可是組內的消費者則是競爭關係(仍是和kafka同樣)。隊列

​ 2.4.能夠設置爲阻塞與非阻塞模式hadoop

2.5.多客戶端時,遵循FIFO特性kafka

命令操做詳解

先對基本命令作一些熟練操做,後面再研究高級特性。消息隊列

注意:命令是不區分大小寫的,我的比較喜歡小寫的方式

添加

命令格式:XADD stream_name id key-value [key-value ...]

​ stream_name:給流指定一個名字

​ id:entry在流中的標識,entry能夠理解爲添加到流中數據的封裝。id通常來講都使用自增的序列,而不須要本身手動指定,有兩部分組成:<millisecondsTime>-<sequenceNumber>。在命令行中執行XADD命令後,會將自動生成的ID返回並輸出。若是兩個命令在時間上很是接近,那麼millisecondsTime相同,則sequenceNumber就自動增加。

​ 由於redis的stream是支持範圍查詢,因此ID組成部分使用了millisecondsTime。由於millisecondsTime就是不斷有序自增的

​ 若是要自定義id,則必定要保證全局惟一,避免出現意料不到問題

1.命令行單條執行

XADD mytopic * acctid 012 age 9

​ 輸出結果:

127.0.0.1:6379> XADD mytopic * acctid 012 age 9
1527837352024-0

2.使用文件批量執行

在文件中編寫命令:

XADD mytopic * acctid 123 age 10
XADD mytopic * acctid 234 age 11
XADD mytopic * acctid 345 age 12
XADD mytopic * acctid 456 age 13

執行命令:

cat 1.txt | redis-cli -a runoob

輸出結果:

[hadoop@hadoop00 /home/hadoop/proc/redis-5.0-rc1]$ cat 1.txt | redis-cli -a runoob
Warning: Using a password with '-a' option on the command line interface may not be safe.
1527837440631-0
1527837440632-0
1527837440632-1
1527837440632-2

查看隊列長度

命令:

xlen mytopic

輸出:

127.0.0.1:6379> xlen mytopic
(integer) 5
127.0.0.1:6379> 

獲取數據:xrange xrevrange

1.xrange

xrange mytopic - +

​ 符號"-":表示最小值

​ 符號"+":表示最大值

xrange mytopic 0 +

​ 命令xrange mytopic 0 +的效果和上面同樣,由於排序時字符0是字符1小的,而上面全部自動生成的millisecondsTime確定是大於0的

xrange mytopic 1527837440632 1527837440632

​ 自定義查詢範圍,指定特殊的值,上面的查詢結果爲同一個毫秒向Stream中添加的數據,包含以下三個entry

1527837440632-0
1527837440632-1
1527837440632-2
xrange mytopic 1527837440632 + count 2

​ 該命令的意思爲:查詢ID以1527837440632開始,以無限大爲結束的entry,但只取出查詢結果集(升序排列)中的前兩個entry,輸出結果包含以下兩個Id

1527837440632-0
1527837440632-1

 

2.xrevrange

xrevrange mytopic + 1527837440632 count 3

​ 該命令的意思爲:反向查詢ID以無限大爲開始,以1527837440632爲結束的entry,但只取出查詢結果集(降序排列)中的前三個entry,輸出結果包含以下三個id

1527839429360-0
1527837440632-2
1527837440632-1

獲取數據:xread

1.非阻塞

xread count 4 streams mytopic 0

​ 從stream 中拿ID比0大的4個Entry,按升序排列

count 4:count參數的值爲4

streams:該參數必須是xread命令的最後一個參數

xread count 10 streams mytopic mystream 0 0

一次訪問多個stream,可分別指定最大ID

2.阻塞

xread block 0 streams mystream $ 

監聽name爲mystream的stream,從stream中拿比ID比"$"(特殊ID:該stream中此刻最大ID)還大的Entry,其實只要你將"$"設置爲任何一個比當前ID還大的值,同樣能夠實現阻塞等待,若是比當前ID小,那麼立馬返回符合條件的entry

block 0:block表示命令要阻塞,0表示阻塞時間爲無限大,不超時,若是設置爲>0的整數,即爲阻塞超時時間

監聽生效後,拿到數據監聽就失效,與zk的watcher雷同。意思是該命令執行後,只能拿到一條ID比設置ID更大的entry,要想繼續拿,必須執行xread命令,官方推薦下一次拿entry使用上一次獲得的ID。注意千萬別亂設置很大的ID ,不然你可能永遠拿不到entry。

xread block 0 streams mystream mytopic $ $

收到任何一個stream的消息,本次監聽就失效,只能拿到一條數據,後面還須要拿數據,能夠將各自stream拿到的ID做爲最大ID,從新執行命令

消費者組-Consumer groups

redis5引入了消費者組的概念,一個stream的數據每個消費者組都發一份,消費者組裏面的消費者競爭同一份數據,亦即在同一個消費者組內,一個消息是不可能發給多個消費者的

消費者組提供了以下5點保障:

  • 組內消費者消費的消息不重複

  • 組內消費者名稱必須惟一

  • 消費者拿到的消息確定是沒有被組內其餘消費者消費過的消息

  • 消費者成功消費消息以後要求發送ACK,而後這條消息纔會從消費者組中移除,也就是說消息至少被消費一次,和kafka同樣

  • 消費者組會跟蹤全部待處理的消息

命令:

1.建立消費者組

xgroup create mytopic mygroup $

該命令的意思是:使用xgroup命令建立了一個mygroup消費者組,該消費者組與mytopic stream進行了關聯,之後mygroup消費者組中的消費者就會mytopic stream中拿數據

符號"$"和上面同樣,表明mytopic stream中目前最大的ID,消費者拿到的entry的id必定會大於此刻$表明的最大ID。你也能夠指定這個最大的ID,好比0

2.從消費者組讀數據

xreadgroup group mygroup consumer_a count 1 streams mytopic >

該命令的意思是:使用xreadgroup命令讓消費者consumer_a從mygroup消費者組的mytopic stream中拿最新的,而且沒有被髮送給其餘消費者處理的entry。

group:該參數是必選項

">":該符號只有在消費者組命令xreadgroup中有效,意思爲mytopic stream中最新且沒有被其餘消費者處理的ID,千萬記住不要與上面"$"最大ID搞混了,不然拿出來的數據與你的指望值不符,若是使用的是任何數組ID,那麼該消費者就沒法拿到任何新的消息,只是從它的已經處理過的消息中拿,而且不會有ACK機制

若是想一個消費者組關聯多個stream能夠這樣作:

xgroup create mystream mygroup $
xgroup create mytopic mygroup $
xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >

讀消息的參數多了一個block 0,就是說讀數據須要阻塞。

3.發送ACK

將指定ID對應的entry從consumer的已處理消息列表中刪除

XACK mystream mygroup 1527864992409-0
相關文章
相關標籤/搜索