redis的發佈與訂閱機制

Redis 發佈/訂閱機制原理分析

Redis 經過 PUBLISH 、 SUBSCRIBE 和 PSUBSCRIBE 等命令實現發佈和訂閱功能。html

  這些命令被普遍用於構建即時通訊應用,好比網絡聊天室(chatroom)和實時廣播、實時提醒等。redis

  本文經過分析 Redis 源碼裏的 pubsub.c 文件,瞭解發佈和訂閱機制的底層實現,籍此加深對 Redis 的理解。緩存

  訂閱、發佈和退訂服務器

  在開始研究源碼以前,不妨先來回顧一下幾個相關命令的使用方式。網絡

  PUBLISH 命令用於向給定的頻道發送信息,返回值爲接收到信息的訂閱者數量:異步

redis> PUBLISH treehole "top secret here ..." 
  (integer) 0 
  redis> PUBLISH chatroom "hi?" 
  (integer) 1

  SUBSCRIBE 命令訂閱給定的一個或多個頻道:函數

redis> SUBSCRIBE chatroom 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱反饋 
  2) "chatroom" # 訂閱的頻道 
  3) (integer) 1 # 目前客戶端已訂閱頻道/模式的數量 
  1) "message" # 信息 
  2) "chatroom" # 發送信息的頻道 
  3) "hi?" # 信息內容

   SUBSCRIBE 的返回值當中, 1) 爲 subscribe 的是訂閱的反饋信息,而 1) 爲 message 的則是訂閱的頻道所發送的信息。post

  SUBSCRIBE 還能夠訂閱多個頻道,這樣一來它接收到的信息就可能來自多個頻道:ui

redis> SUBSCRIBE chatroom talk-to-jack 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱 chatroom 的反饋 
  2) "chatroom" 
  3) (integer) 1 
  1) "subscribe" # 訂閱 talk-to-jack 的反饋 
  2) "talk-to-jack" 
  3) (integer) 2 
  1) "message" # 來自 chatroom 的消息 
  2) "chatroom" 
  3) "yahoo" 
  1) "message" # 來自 talk-to-peter 的消息 
  2) "talk-to-jack" 
  3) "Goodmorning, peter."

  PSUBSCRIBE 提供了一種訂閱符合給定模式的全部頻道的方法,好比說,使用 it.* 爲輸入,就能夠訂閱全部以 it. 開頭的頻道,好比 it.news 、 it.blog 、 it.tweets ,諸如此類:url

redis> PSUBSCRIBE it.* 
  Reading messages... (press Ctrl-C to quit) 
  1) "psubscribe" 
  2) "it.*" 
  3) (integer) 1 
  1) "pmessage" 
  2) "it.*" # 匹配的模式 
  3) "it.news" # 消息的來源頻道 
  4) "Redis 2.6rc5 release" # 消息內容 
  1) "pmessage" 
  2) "it.*" 
  3) "it.blog" 
  4) "Why NoSQL matters" 
  1) "pmessage" 
  2) "it.*" 
  3) "it.tweet" 
  4) "@redis: when will the 2.6 stable release?"

  固然, PSUBSCRIBE 也能夠接受多個參數,從而匹配多種模式。

  最後, UNSUBSCRIBE 命令和 PUNSUBSCRIBE 負責退訂給定的頻道或模式。

  發佈和訂閱機制

  當一個客戶端經過 PUBLISH 命令向訂閱者發送信息的時候,咱們稱這個客戶端爲發佈者(publisher)。

  而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的時候,咱們稱這個客戶端爲訂閱者(subscriber)。

  爲了解耦發佈者(publisher)和訂閱者(subscriber)之間的關係,Redis 使用了 channel (頻道)做爲二者的中介 —— 發佈者將信息直接發佈給 channel ,而 channel 負責將信息發送給適當的訂閱者,發佈者和訂閱者之間沒有相互關係,也不知道對方的存在:

  

 

  知道了發佈和訂閱的機制以後,接下來就能夠開始研究具體的實現了,咱們從 Redis 的訂閱命令開始提及。

  SUBSCRIBE 命令的實現

  前面說到,Redis 將全部接受和發送信息的任務交給 channel 來進行,而全部 channel 的信息就儲存在 redisServer 這個結構中:

 

struct redisServer { 
  // 省略 ... 
  dict *pubsub_channels; // Map channels to list of subscribed clients 
  // 省略 ... 
  };

 

  pubsub_channels 是一個字典,字典的鍵就是一個個 channel ,而字典的值則是一個鏈表,鏈表中保存了全部訂閱這個 channel 的客戶端。

  舉個例子,若是在一個 redisServer 實例中,有一個叫作 news 的頻道,這個頻道同時被client_123 和 client_456 兩個客戶端訂閱,那麼這個 redisServer 結構看起來應該是這樣子:

  

  能夠看出,實現 SUBSCRIBE 命令的關鍵,就是將客戶端添加到給定 channel 的訂閱鏈表中。

  函數 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底層實現,它完成了將客戶端添加到訂閱鏈表中的工做:

 

// 訂閱指定頻道 
  // 訂閱成功返回 1 ,若是已經訂閱過,返回 0 
  int pubsubSubscribeChannel(redisClient *c, robj *channel) { 
  struct dictEntry *de; 
  list *clients = NULL; 
  int retval = 0; 
  /* Add the channel to the client -> channels hash table */ 
  // dictAdd 在添加新元素成功時返回 DICT_OK 
  // 所以這個判斷句表示,若是新訂閱 channel 成功,那麼 。。。 
  if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { 
  retval = 1; 
  incrRefCount(channel); 
  /* Add the client to the channel -> list of clients hash table */ 
  // 將 client 添加到訂閱給定 channel 的鏈表中 
  // 這個鏈表是一個哈希表的值,哈希表的鍵是給定 channel 
  // 這個哈希表保存在 server.pubsub_channels 裏 
  de = dictFind(server.pubsub_channels,channel); 
  if (de == NULL) { 
  // 若是 de 等於 NULL 
  // 表示這個客戶端是首個訂閱這個 channel 的客戶端 
  // 那麼建立一個新的列表, 並將它加入到哈希表中 
  clients = listCreate(); 
  dictAdd(server.pubsub_channels,channel,clients); 
  incrRefCount(channel); 
  } else { 
  // 若是 de 不爲空,就取出這個 clients 鏈表 
  clients = dictGetVal(de); 
  } 
  // 將客戶端加入到鏈表中 
  listAddNodeTail(clients,c); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.subscribebulk); 
  // 返回訂閱的頻道 
  addReplyBulk(c,channel); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

 

  PSUBSCRIBE 命令的實現

  除了直接訂閱給定 channel 外,還可使用 PSUBSCRIBE 訂閱一個模式(pattern),訂閱一個模式等同於訂閱全部匹配這個模式的 channel 。

  和 redisServer.pubsub_channels 屬性相似, redisServer.pubsub_patterns 屬性用於保存全部被訂閱的模式,和 pubsub_channels 不一樣的是, pubsub_patterns 是一個鏈表(而不是字典):

 

struct redisServer { 
  // 省略 ... 
  list *pubsub_patterns; // A list of pubsub_patterns 
  // 省略 ... 
  };

  pubsub_patterns 的每個節點都是一個 pubsubPattern 結構的實例,它保存了被訂閱的模式,以及訂閱這個模式的客戶客戶端:

typedef struct pubsubPattern { 
  redisClient *client; 
  robj *pattern; 
  } pubsubPattern;

  舉個例子,假設在一個 redisServer 實例中,有一個叫作 news.* 的模式同時被客戶端client_789 和 client_999 訂閱,那麼這個 redisServer 結構看起來應該是這樣子:

  

  如今能夠知道,實現 PSUBSCRIBE 命令的關鍵,就是將客戶端和訂閱的模式添加到redisServer.pubsub_patterns 當中。

  pubsubSubscribePattern 是 PSUBSCRIBE 的底層實現,它將客戶端和所訂閱的模式添加到redisServer.pubsub_patterns 當中:

// 訂閱指定模式 
  // 訂閱成功返回 1 ,若是已經訂閱過,返回 0 
  int pubsubSubscribePattern(redisClient *c, robj *pattern) { 
  int retval = 0; 
  // 向 c->pubsub_patterns 中查找指定 pattern 
  // 若是返回值爲 NULL ,說明這個 pattern 還沒被這個客戶端訂閱過 
  if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { 
  retval = 1; 
  // 添加 pattern 到客戶端 pubsub_patterns 
  listAddNodeTail(c->pubsub_patterns,pattern); 
  incrRefCount(pattern); 
  // 將 pattern 添加到服務器 
  pubsubPattern *pat; 
  pat = zmalloc(sizeof(*pat)); 
  pat->pattern = getDecodedObject(pattern); 
  pat->client = c; 
  listAddNodeTail(server.pubsub_patterns,pat); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.psubscribebulk); 
  // 返回被訂閱的模式 
  addReplyBulk(c,pattern); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

   PUBLISH 命令的實現

  使用 PUBLISH 命令向訂閱者發送消息,須要執行如下兩個步驟:

  1) 使用給定的頻道做爲鍵,在 redisServer.pubsub_channels 字典中查找記錄了訂閱這個頻道的全部客戶端的鏈表,遍歷這個鏈表,將消息發佈給全部訂閱者。

  2) 遍歷 redisServer.pubsub_patterns 鏈表,將鏈表中的模式和給定的頻道進行匹配,若是匹配成功,那麼將消息發佈到相應模式的客戶端當中。

  舉個例子,假設有兩個客戶端分別訂閱 it.news 頻道和 it.* 模式,當執行命令PUBLISH it.news "hello moto" 的時候, it.news 頻道的訂閱者會在步驟 1 收到信息,而當PUBLISH 進行到步驟 2 的時候, it.* 模式的訂閱者也會收到信息。

  PUBLISH 命令的實際實現由 pubsubPublishMessage 函數完成,它的完整定義以下:

// 發送消息 
  int pubsubPublishMessage(robj *channel, robj *message) { 
  int receivers = 0; 
  struct dictEntry *de; 
  listNode *ln; 
  listIter li; 
  /* Send to clients listening for that channel */ 
  // 向全部頻道的訂閱者發送消息 
  de = dictFind(server.pubsub_channels,channel); 
  if (de) { 
  list *list = dictGetVal(de); // 取出全部訂閱者 
  listNode *ln; 
  listIter li; 
  // 遍歷全部訂閱者, 向它們發送消息 
  listRewind(list,&li); 
  while ((ln = listNext(&li)) != NULL) { 
  redisClient *c = ln->value; 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.messagebulk); 
  addReplyBulk(c,channel); // 打印頻道名 
  addReplyBulk(c,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  /* Send to clients listening to matching channels */ 
  // 向全部被匹配模式的訂閱者發送消息 
  if (listLength(server.pubsub_patterns)) { 
  listRewind(server.pubsub_patterns,&li); // 取出全部模式 
  channel = getDecodedObject(channel); 
  while ((ln = listNext(&li)) != NULL) { 
  pubsubPattern *pat = ln->value; // 取出模式 
  // 若是模式和 channel 匹配的話 
  // 向這個模式的訂閱者發送消息 
  if (stringmatchlen((char*)pat->pattern->ptr, 
  sdslen(pat->pattern->ptr), 
  (char*)channel->ptr, 
  sdslen(channel->ptr),0)) { 
  addReply(pat->client,shared.mbulkhdr[4]); 
  addReply(pat->client,shared.pmessagebulk); 
  addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式 
  addReplyBulk(pat->client,channel); // 打印頻道名 
  addReplyBulk(pat->client,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  decrRefCount(channel); // 釋放用過的 channel 
  } 
  return receivers; // 返回接收者數量 
  }

   UNSUBSCRIBE 和 PUNSUBSCRIBE 的實現

  UNSUBSCRIBE 和 PUNSUBSCRIBE 分別是 SUBSCRIBE 和 PSUBSCRIBE 的反操做,若是明白了SUBSCRIBE 和 PSUBSCRIBE 的工做機制的話,應該不難理解這兩個反操做的原理,因此這裏就省略詳細的分析了,有興趣的能夠直接看代碼。

redis訂閱與發佈的使用場景

明確了Redis發佈訂閱的原理和基本流程後,咱們來看一下Redis的發佈訂閱到底具體能作什麼。

一、異步消息通知

好比渠道在調支付平臺的時候,咱們能夠用回調的方式給支付平臺一個咱們的回調接口來通知咱們支付狀態,還能夠利用Redis的發佈訂閱來實現。好比咱們發起支付的同時訂閱頻道`pay_notice_` + `wk` (假如咱們的渠道標識是wk,不能讓其餘渠道也訂閱這個頻道),當支付平臺處理完成後,支付平臺往該頻道發佈消息,告訴頻道的訂閱者該訂單的支付信息及狀態。收到消息後,根據消息內容更新訂單信息及後續操做。 

當不少人都調用支付平臺時,支付時都去訂閱同一個頻道會有問題。好比用戶A支付完訂閱頻道`pay_notice_wk`,在支付平臺未處理完時,用戶B支付完也訂閱了`pay_notice_wk`,當A收到通知後,接着B的支付通知也發佈了,這時渠道收不到第二次消息發佈。由於同一個頻道收到消息後,訂閱自動取消,也就是訂閱是一次性的。

因此咱們訂閱的訂單支付狀態的頻道就得惟一,一個訂單一個頻道,咱們能夠在頻道上加上訂單號`pay_notice_wk`+orderNo保證頻道惟一。這樣咱們能夠把頻道號在支付時當作參數一併傳過去,支付平臺處理完就能夠用此頻道發佈消息給咱們了。(實際大多接口用回調通知,由於用Redis發佈訂閱限制條件苛刻,系統間必須共用一套Redis)

二、任務通知

好比經過跑批系統通知應用系統作一些事(跑批系統沒法拿到用戶數據,且應用系統又不能作定時任務的狀況下)。

如天天凌晨3點提早加載一些用戶的用戶數據到Redis,應用系統不能作定時任務,能夠經過系統公共的Redis來由跑批系統發佈任務給應用系統,應用系統收到指令,去作相應的操做。

這裏須要注意的是在線上集羣部署的狀況下,全部服務實例都會收到通知,都要作一樣的操做嗎?徹底不必。能夠用Redis實現鎖機制,其中一臺實例拿到鎖後執行任務。另外若是任務比較耗時,能夠不用鎖,能夠考慮一下任務分片執行。固然這不在本文的討論範疇,這裏不在贅述。

 

三、參數刷新加載

 

衆所周知,咱們用Redis無非就是將系統中不怎麼變的、查詢又比較頻繁的數據緩存起來,例如咱們系統首頁的輪播圖啊,頁面的動態連接啊,一些系統參數啊,公共數據啊都加載到Redis,而後有個後臺管理系統去配置修改這些數據。

 

打個比方咱們首頁的輪播圖要再增長一個圖,那咱們就在後管系統加上,加上就完事了嗎?固然沒有,由於Redis裏仍是老數據。那你會說不是有過時時間嗎?是的,但有的過時時間設置的較長如24小時而且咱們想當即生效怎麼辦?這時候咱們就能夠利用Redis的發佈訂閱機制來實現數據的實時刷新。當咱們修改完數據後,點擊刷新按鈕,經過發佈訂閱機制,訂閱者接收到消息後調用從新加載的方法便可。

 

ZHUAN:https://my.oschina.net/u/779531/blog/904622

相關文章
相關標籤/搜索