Stream
是Redis 5.0
引入的一種新數據類型,容許消費者等待生產者發送的新數據,數組
每一個Stream都有惟一的名稱,它就是Redis的key
,在咱們首次使用xadd
指令追加消息時自動建立。bash
每一個Stream均可以掛多個消費組,每一個消費組會有個遊標last_delivered_id
在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。每一個消費組都有一個Stream內惟一的名稱,消費組不會自動建立,它須要單獨的指令xgroup create
進行建立,須要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id
變量。服務器
每一個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每一個消費組都消費到。學習
同一個消費組(Consumer Group)能夠掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了消息都會使遊標last_delivered_id
往前移動。每一個消費者者有一個組內惟一名稱。ui
消費者(Consumer)內部會有個狀態變量pending_ids
,它記錄了當前已經被客戶端讀取的消息,可是尚未ack
。若是客戶端沒有ack,這個變量裏面的消息ID會愈來愈多,一旦某個消息被ack
,它就開始減小。這個pending_ids
變量在Redis官方被稱之爲PEL
,也就是Pending Entries List
,它用來確保客戶端至少消費了消息一次spa
1,xadd
追加消息code
2,xdel
刪除消息cdn
3,xrange
獲取消息列表,會自動過濾已經刪除的消息blog
4,xlen
消息長度string
5,del
刪除Stream
# xadd key ID field string [field string ...] *號表示服務器自動生成ID
127.0.0.1:6379> xadd mystream * name zfh age 25
"1578845024424-0"
127.0.0.1:6379> xadd mystream * name tom age 10
"1578845052908-0"
127.0.0.1:6379> xadd mystream * name jerry age 9
"1578845085696-0"
127.0.0.1:6379>
複製代碼
# xlen key 消息長度
127.0.0.1:6379> xlen mystream
(integer) 3
# 消息列表 -表示最小值, +表示最大值
# xrange key start end [COUNT count]
127.0.0.1:6379> xrange mystream - +
1) 1) "1578845024424-0"
2) 1) "name"
2) "zfh"
3) "age"
4) "25"
2) 1) "1578845052908-0"
2) 1) "name"
2) "tom"
3) "age"
4) "10"
3) 1) "1578845085696-0"
2) 1) "name"
2) "jerry"
3) "age"
4) "9"
127.0.0.1:6379> xrange mystream 1578845052908-0 + # 指定最小消息ID的列表
1) 1) "1578845052908-0"
2) 1) "name"
2) "tom"
3) "age"
4) "10"
2) 1) "1578845085696-0"
2) 1) "name"
2) "jerry"
3) "age"
4) "9"
127.0.0.1:6379> xrange mystream - 1578845052908-0 # 指定最大消息ID的列表
1) 1) "1578845024424-0"
2) 1) "name"
2) "zfh"
3) "age"
4) "25"
2) 1) "1578845052908-0"
2) 1) "name"
2) "tom"
3) "age"
4) "10"
複製代碼
# xdel key ID [ID ...]
127.0.0.1:6379> xdel mystream 1578845052908-0
(integer) 1
127.0.0.1:6379> xlen mystream
(integer) 2
127.0.0.1:6379> xrange mystream - +
1) 1) "1578845024424-0"
2) 1) "name"
2) "zfh"
3) "age"
4) "25"
2) 1) "1578845085696-0"
2) 1) "name"
2) "jerry"
3) "age"
4) "9"
127.0.0.1:6379> del mystream # 刪除整個Stream
(integer) 1
127.0.0.1:6379> xrange mystream - +
(empty list or set)
複製代碼
消息ID的形式是timestampInMillis-sequence
,例如1578845052908-0
,它表示當前的消息在毫米時間戳1578845052908
時產生,而且是該毫秒內產生的第0
條消息。消息ID能夠由服務器自動生成,也能夠由客戶端本身指定,可是形式必須是整數-整數,並且必須是後面加入的消息的ID要大於前面的消息ID。
未完待續~下一篇學習消息的用法