Stream是Redis 5.0引入的一種新數據類型,容許消費者等待生產者發送的新數據,還引入了消費者組概念,組之間數據是相同的(前提是設置的偏移量同樣),組內的消費者不會拿到相同數據。這種概念和kafka很雷同。redis
在某些特定場景可使用redis的stream代替kafka等消息隊列,減小系統複雜性,加強系統的穩定性數組
1.若是使用xrange和xrevrange命令,則Stream和list功能類同oop
2.若是使用xread命令,則有其很是獨特的地方spa
2.1與redis的pub/sub不一樣,pub/sub多個客戶端是收到相同的數據,而stream的多個客戶端是競爭關係,每一個客戶端收到的數據是不相同的命令行
2.2pub/sub中一旦觸發數據獲取,不會記錄下上一次拿的位置,意味着客戶端沒法重複去拿之前的數據,而blpop方式一旦pop,數據就會永久的刪除,也沒法重複去拿之前的數據。而Stream會永久的存放數據,而且客戶端會保留上一次拿的id,甚至經過修改id能夠拿回之前的數據。和kafka的機制相似。排序
2.3.Stream提供了消費者組(kafka也有),不一樣組接收到的數據徹底同樣(前提是條件同樣),可是組內的消費者則是競爭關係(仍是和kafka同樣)。隊列
2.4.能夠設置爲阻塞與非阻塞模式hadoop
2.5.多客戶端時,遵循FIFO特性kafka
先對基本命令作一些熟練操做,後面再研究高級特性。消息隊列
注意:命令是不區分大小寫的,我的比較喜歡小寫的方式
命令格式:XADD stream_name id key-value [key-value ...]
stream_name:給流指定一個名字
id:entry在流中的標識,entry能夠理解爲添加到流中數據的封裝。id通常來講都使用自增的序列,而不須要本身手動指定,有兩部分組成:<millisecondsTime>-<sequenceNumber>。在命令行中執行XADD命令後,會將自動生成的ID返回並輸出。若是兩個命令在時間上很是接近,那麼millisecondsTime相同,則sequenceNumber就自動增加。
由於redis的stream是支持範圍查詢,因此ID組成部分使用了millisecondsTime。由於millisecondsTime就是不斷有序自增的
若是要自定義id,則必定要保證全局惟一,避免出現意料不到問題
1.命令行單條執行
XADD mytopic * acctid 012 age 9
輸出結果:
127.0.0.1:6379> XADD mytopic * acctid 012 age 9
1527837352024-0
2.使用文件批量執行
在文件中編寫命令:
XADD mytopic * acctid 123 age 10
XADD mytopic * acctid 234 age 11
XADD mytopic * acctid 345 age 12
XADD mytopic * acctid 456 age 13
執行命令:
cat 1.txt | redis-cli -a runoob
輸出結果:
[hadoop@hadoop00 /home/hadoop/proc/redis-5.0-rc1]$ cat 1.txt | redis-cli -a runoob
Warning: Using a password with '-a' option on the command line interface may not be safe.
1527837440631-0
1527837440632-0
1527837440632-1
1527837440632-2
命令:
xlen mytopic
輸出:
127.0.0.1:6379> xlen mytopic
(integer) 5
127.0.0.1:6379>
1.xrange
xrange mytopic - +
符號"-":表示最小值
符號"+":表示最大值
xrange mytopic 0 +
命令xrange mytopic 0 +的效果和上面同樣,由於排序時字符0是字符1小的,而上面全部自動生成的millisecondsTime確定是大於0的
xrange mytopic 1527837440632 1527837440632
自定義查詢範圍,指定特殊的值,上面的查詢結果爲同一個毫秒向Stream中添加的數據,包含以下三個entry
1527837440632-0
1527837440632-1
1527837440632-2
xrange mytopic 1527837440632 + count 2
該命令的意思爲:查詢ID以1527837440632開始,以無限大爲結束的entry,但只取出查詢結果集(升序排列)中的前兩個entry,輸出結果包含以下兩個Id
1527837440632-0
1527837440632-1
2.xrevrange
xrevrange mytopic + 1527837440632 count 3
該命令的意思爲:反向查詢ID以無限大爲開始,以1527837440632爲結束的entry,但只取出查詢結果集(降序排列)中的前三個entry,輸出結果包含以下三個id
1527839429360-0
1527837440632-2
1527837440632-1
1.非阻塞
xread count 4 streams mytopic 0
從stream 中拿ID比0大的4個Entry,按升序排列
count 4:count參數的值爲4
streams:該參數必須是xread命令的最後一個參數
xread count 10 streams mytopic mystream 0 0
一次訪問多個stream,可分別指定最大ID
2.阻塞
xread block 0 streams mystream $
監聽name爲mystream的stream,從stream中拿比ID比"$"(特殊ID:該stream中此刻最大ID)還大的Entry,其實只要你將"$"設置爲任何一個比當前ID還大的值,同樣能夠實現阻塞等待,若是比當前ID小,那麼立馬返回符合條件的entry
block 0:block表示命令要阻塞,0表示阻塞時間爲無限大,不超時,若是設置爲>0的整數,即爲阻塞超時時間
監聽生效後,拿到數據監聽就失效,與zk的watcher雷同。意思是該命令執行後,只能拿到一條ID比設置ID更大的entry,要想繼續拿,必須執行xread命令,官方推薦下一次拿entry使用上一次獲得的ID。注意千萬別亂設置很大的ID ,不然你可能永遠拿不到entry。
xread block 0 streams mystream mytopic $ $
收到任何一個stream的消息,本次監聽就失效,只能拿到一條數據,後面還須要拿數據,能夠將各自stream拿到的ID做爲最大ID,從新執行命令
redis5引入了消費者組的概念,一個stream的數據每個消費者組都發一份,消費者組裏面的消費者競爭同一份數據,亦即在同一個消費者組內,一個消息是不可能發給多個消費者的
消費者組提供了以下5點保障:
組內消費者消費的消息不重複
組內消費者名稱必須惟一
消費者拿到的消息確定是沒有被組內其餘消費者消費過的消息
消費者成功消費消息以後要求發送ACK,而後這條消息纔會從消費者組中移除,也就是說消息至少被消費一次,和kafka同樣
消費者組會跟蹤全部待處理的消息
命令:
1.建立消費者組
xgroup create mytopic mygroup $
該命令的意思是:使用xgroup命令建立了一個mygroup消費者組,該消費者組與mytopic stream進行了關聯,之後mygroup消費者組中的消費者就會mytopic stream中拿數據
符號"$"和上面同樣,表明mytopic stream中目前最大的ID,消費者拿到的entry的id必定會大於此刻$表明的最大ID。你也能夠指定這個最大的ID,好比0
2.從消費者組讀數據
xreadgroup group mygroup consumer_a count 1 streams mytopic >
該命令的意思是:使用xreadgroup命令讓消費者consumer_a從mygroup消費者組的mytopic stream中拿最新的,而且沒有被髮送給其餘消費者處理的entry。
group:該參數是必選項
">":該符號只有在消費者組命令xreadgroup中有效,意思爲mytopic stream中最新且沒有被其餘消費者處理的ID,千萬記住不要與上面"$"最大ID搞混了,不然拿出來的數據與你的指望值不符,若是使用的是任何數組ID,那麼該消費者就沒法拿到任何新的消息,只是從它的已經處理過的消息中拿,而且不會有ACK機制
若是想一個消費者組關聯多個stream能夠這樣作:
xgroup create mystream mygroup $
xgroup create mytopic mygroup $
xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >
讀消息的參數多了一個block 0,就是說讀數據須要阻塞。
3.發送ACK
將指定ID對應的entry從consumer的已處理消息列表中刪除
XACK mystream mygroup 1527864992409-0