Redis中的Stream數據類型做爲消息隊列的嘗試

Redis的List數據類型做爲消息隊列,已經比較合適了,但存在一些不足,好比只能獨立消費,訂閱發佈又沒法支持數據的持久化,相對前二者,Redis Stream做爲消息隊列的使用更爲有優點。
 
相信球迷小夥伴們對文字直播這個東西都不陌生,時常在想,這個功能是怎麼實現的?
具體說就是用什麼技術實現最爲合適?如何面對數以百萬計的讀壓力?廣告消息是如何插播進來的?最後的歷史消息如何歸檔,如何持久化存儲?
文字直播其實就是解說員做爲生產者,生產消息(文字信息),各類客戶端做爲消費者,消費信息(刷新文字內容)。
典型的消息隊列實現,能夠用隊列或者相似隊列的功能實現,這裏只是簡單想象一下,結合redis中的stream數據類型,來學習stream做爲消息隊列的功能實現。
 
 
1,生成者:生產者隊列的建立,與消息的增刪改
1.1 建立並寫入消息
  語法: xadd queue_name Id filed value(filed value)
     1,每一組消息須要一個惟一的Id,*號表示服務器自動生成ID,後面順序跟着一組或者多組消息(filed value
     2,消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的消息在毫米時間戳1527846880572時產生,而且是該毫秒內產生的第5條消息。
           消息ID能夠由服務器自動生成,也能夠由客戶端本身指定,可是形式必須是整數-整數,並且必須是後面加入的消息的ID要大於前面的消息ID。
     3,消息元素的的結構爲key-value,必須成對出現,若是key或者value元素中有空格,必須用"abc  def"或者'abc  def'括起來
 
1.2 生產者寫入消息
  語法:xadd  queue_name *|Id filed value    
 
1.3 xlen 當前stream的長度:xlen stream_name
   xlen "NBA_Match_001" ,也就是上面寫入的10條消息
      
1.4 限制某一個stream的最大長度,maxlen 
  依據先進先出的原則,自動刪除超出最長長度的消息
   xadd "NBA_Match_001" maxlen 50000 * "2019-07-13 08:26:39" "反擊哈騰,一條龍上籃得分"
  
1.5 查詢消息(查詢是生產者查詢本身生產的消息,跟消費者的消費是兩碼事)

正向查詢
xrange "NBA_Match_001"              # 查詢全部消息
xrange "NBA_Match_001" - +              # -表示最小值, +表示最大值
xrange "NBA_Match_001" 1562980142175-0 +    # 指定最小消息ID的列表
xrange "NBA_Match_001"- 1562980142175-0      # 指定最大消息ID的列表
反向查詢
xrevrange "NBA_Match_001"
xrevrange "NBA_Match_001" + -
xrevrange "NBA_Match_001" + 1562980142175-0
xrevrange "NBA_Match_001" 1562980142175-0 -redis

1.6 刪除消息
  xdel stream_name id,刪除消息並非真正的物理刪除,隊列的長度不變,指示標記當前消息被刪除服務器

1.7 查看stream屬性xinfo stream stream_name 
1.8 del stream_name
刪除 stream :del NBA_Match_001
刪除本質上本Redis中的其餘數據類型一致,stream自己就是一個key值,del key值就刪除了整個消息的所有信息。

 

 
 
2 xread:獨立消費
相似於List,生產者往list中寫數據,消費者從list中讀數據,只能有一個消費者
 
2. 1,從頭部讀取消息,從某個streams中讀取n條消息,0-0只從頭開始,或者指定從streams的Id開始
  xread count 1 streams "NBA_Match_001" 0-0
  xread count 1 streams "NBA_Match_001" 1562980142175-0
  
2.2,從尾部讀取最新的一條消息
xread count 1 streams "NBA_Match_001" $
此時默認不返回任何消息
xread  block 0 count 1 streams "NBA_Match_001" $
以阻塞的方式讀取尾部最新的一條消息,直到新的消息的到來
  
 
3 多消費者xgroup :消費組,每一個組中的消費者獨立消費stream中的消息
典型的好比文字直播的安卓App客戶端,蘋果App客戶端,網頁客戶端等等。多個終端,均可以獨立地消費隊列裏面的

3.1 建立消費組學習

對消息隊列"NBA_Match_001"建立了兩個消費組,一個是cg1,一個是cg2,好比網頁客戶端與App客戶端 spa

1,xgroup create "NBA_Match_001" cg1 0-0  #  表示從頭開始消費
建立消費組cg1,消費組必須綁定一個steam(NBA_Match_001),從頭0-0 )開始消費"NBA_Match_001"中的消息
2, xgroup create "NBA_Match_001" cg2 0-0  #  表示從頭開始消費
3,2 從消費組中建立消費者
xreadgroup指令能夠進行消費組的組內消費
xreadgroup GROUP cg1 c1 count 1 streams "NBA_Match_001" >
>號表示從當前消費組的last_delivered_id後面開始讀 , 每當消費者讀取一條消息,last_delivered_id變量就會前進 
當一個組的消費則消費徹底部消息以後,就沒有新的消息了
 

每一個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每一個消費組都消費到。
同一個消費組(Consumer Group)能夠掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了消息都會使遊標last_delivered_id往前移動。
每一個消費者者有一個組內惟一名稱。code

 

關於消費組,可能不太好理解,舉個例子就比較清楚
假設有2個消費組cg1,cg2,對於cg1,其組內共有3個消費者c1,、c二、c3。一個消息隊列中共有5條消息a,b,c,d,e,那麼一種可能的消費方式以下
a -> c1
b -> c2
c -> c3
d -> c1
e -> c2
也就是說3個消費者,對於消息的消費是互斥的,消費的消息是沒有交集的
而對於cg2,一樣能夠消費a,b,c,d,e這5條消息,不依賴於cg1消費組以及消費狀況,同理,具體怎麼消費,取決於其組內的消費者數量
就比如體育直播的客戶端,正常狀況下,網頁客戶端能夠收到全部的直播消息,手機App客戶端也能夠收到全部的直播消息同樣,不一樣消費組間對消息的消費互不干擾。htm

 

 

4 多個生產者和多個消費者blog

  這種狀況相似以上,不用的是增長了多個消費者,在上面的基礎上作了擴展。
  其實不難想象,文字直播插播的廣告消息,多是相似以下結構,是另一個獨立的生產者,與文字直播員同樣生成寫入消息到隊列,而後客戶端看到的就是夾雜了廣告的直播。隊列

 

 

目前就我的認識而言,stream數據類型實現消息隊列並不完美,最大的問題就是單點壓力問題:這裏是說單點壓力,而不是單點故障,stream類型數據,其實從邏輯上看,是一個key值(stream_name),跟着一系列value(消息),這些消息只能存儲在一個Redis實例中,如何緩解多個消費者對單個Key值中的消息消費壓力?說來講去,不就是想說kafka的partition麼……get

 

 

參考:kafka

http://database.51cto.com/art/201812/588189.htm

https://www.zhihu.com/question/279540635

相關文章
相關標籤/搜索