Redis訂閱發佈

Redis 經過 PUBLISH 、 SUBSCRIBE 和 PSUBSCRIBE 等命令實現發佈和訂閱功能。html

  這些命令被普遍用於構建即時通訊應用,好比網絡聊天室(chatroom)和實時廣播、實時提醒等。git

  本文經過分析 Redis 源碼裏的 pubsub.c 文件,瞭解發佈和訂閱機制的底層實現,籍此加深對 Redis 的理解。github

  訂閱、發佈和退訂redis

  在開始研究源碼以前,不妨先來回顧一下幾個相關命令的使用方式。服務器

  PUBLISH 命令用於向給定的頻道發送信息,返回值爲接收到信息的訂閱者數量:網絡

  redis> PUBLISH treehole "top secret here ..." 
  (integer) 0 
  redis> PUBLISH chatroom "hi?" 
  (integer) 1

  SUBSCRIBE 命令訂閱給定的一個或多個頻道:函數

  redis> SUBSCRIBE chatroom 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱反饋 
  2) "chatroom" # 訂閱的頻道 
  3) (integer) 1 # 目前客戶端已訂閱頻道/模式的數量 
  1) "message" # 信息 
  2) "chatroom" # 發送信息的頻道 
  3) "hi?" # 信息內容

  SUBSCRIBE 的返回值當中, 1) 爲 subscribe 的是訂閱的反饋信息,而 1) 爲 message 的則是訂閱的頻道所發送的信息。ui

  SUBSCRIBE 還能夠訂閱多個頻道,這樣一來它接收到的信息就可能來自多個頻道:spa

  redis> SUBSCRIBE chatroom talk-to-jack 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱 chatroom 的反饋 
  2) "chatroom" 
  3) (integer) 1 
  1) "subscribe" # 訂閱 talk-to-jack 的反饋 
  2) "talk-to-jack" 
  3) (integer) 2 
  1) "message" # 來自 chatroom 的消息 
  2) "chatroom" 
  3) "yahoo" 
  1) "message" # 來自 talk-to-peter 的消息 
  2) "talk-to-jack" 
  3) "Goodmorning, peter."

  PSUBSCRIBE 提供了一種訂閱符合給定模式的全部頻道的方法,好比說,使用 it.* 爲輸入,就能夠訂閱全部以 it. 開頭的頻道,好比 it.news 、 it.blog 、 it.tweets ,諸如此類:.net

  redis> PSUBSCRIBE it.* 
  Reading messages... (press Ctrl-C to quit) 
  1) "psubscribe" 
  2) "it.*" 
  3) (integer) 1 
  1) "pmessage" 
  2) "it.*" # 匹配的模式 
  3) "it.news" # 消息的來源頻道 
  4) "Redis 2.6rc5 release" # 消息內容 
  1) "pmessage" 
  2) "it.*" 
  3) "it.blog" 
  4) "Why NoSQL matters" 
  1) "pmessage" 
  2) "it.*" 
  3) "it.tweet" 
  4) "@redis : when will the 2.6 stable release?"

  固然, PSUBSCRIBE 也能夠接受多個參數,從而匹配多種模式。

  最後, UNSUBSCRIBE 命令和 PUNSUBSCRIBE 負責退訂給定的頻道或模式。

  發佈和訂閱機制

  當一個客戶端經過 PUBLISH 命令向訂閱者發送信息的時候,咱們稱這個客戶端爲發佈者(publisher)。

  而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的時候,咱們稱這個客戶端爲訂閱者(subscriber)。

  爲了解耦發佈者(publisher)和訂閱者(subscriber)之間的關係,Redis 使用了 channel (頻道)做爲二者的中介 —— 發佈者將信息直接發佈給 channel ,而 channel 負責將信息發送給適當的訂閱者,發佈者和訂閱者之間沒有相互關係,也不知道對方的存在:

  

點擊放大

  知道了發佈和訂閱的機制以後,接下來就能夠開始研究具體的實現了,咱們從 Redis 的訂閱命令開始提及。

  SUBSCRIBE 命令的實現

  前面說到,Redis 將全部接受和發送信息的任務交給 channel 來進行,而全部 channel 的信息就儲存在 redisServer 這個結構中:

  struct redisServer { 
  // 省略 ... 
  dict *pubsub_channels; // Map channels to list of subscribed clients 
  // 省略 ... 
  };

  pubsub_channels 是一個字典,字典的鍵就是一個個 channel ,而字典的值則是一個鏈表,鏈表中保存了全部訂閱這個 channel 的客戶端。

  舉個例子,若是在一個 redisServer 實例中,有一個叫作 news 的頻道,這個頻道同時被client_123 和 client_456 兩個客戶端訂閱,那麼這個 redisServer 結構看起來應該是這樣子:

  

點擊放大

  能夠看出,實現 SUBSCRIBE 命令的關鍵,就是將客戶端添加到給定 channel 的訂閱鏈表中。

  函數 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底層實現,它完成了將客戶端添加到訂閱鏈表中的工做:

  // 訂閱指定頻道 
  // 訂閱成功返回 1 ,若是已經訂閱過,返回 0 
  int pubsubSubscribeChannel(redisClient *c, robj *channel) { 
  struct dictEntry *de; 
  list *clients = NULL; 
  int retval = 0; 
  /* Add the channel to the client -> channels hash table */ 
  // dictAdd 在添加新元素成功時返回 DICT_OK 
  // 所以這個判斷句表示,若是新訂閱 channel 成功,那麼 。。。 
  if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { 
  retval = 1; 
  incrRefCount(channel); 
  /* Add the client to the channel -> list of clients hash table */ 
  // 將 client 添加到訂閱給定 channel 的鏈表中 
  // 這個鏈表是一個哈希表的值,哈希表的鍵是給定 channel 
  // 這個哈希表保存在 server.pubsub_channels 裏 
  de = dictFind(server.pubsub_channels,channel); 
  if (de == NULL) { 
  // 若是 de 等於 NULL 
  // 表示這個客戶端是首個訂閱這個 channel 的客戶端 
  // 那麼建立一個新的列表, 並將它加入到哈希表中 
  clients = listCreate(); 
  dictAdd(server.pubsub_channels,channel,clients); 
  incrRefCount(channel); 
  } else { 
  // 若是 de 不爲空,就取出這個 clients 鏈表 
  clients = dictGetVal(de); 
  } 
  // 將客戶端加入到鏈表中 
  listAddNodeTail(clients,c); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.subscribebulk); 
  // 返回訂閱的頻道 
  addReplyBulk(c,channel); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

  PSUBSCRIBE 命令的實現

  除了直接訂閱給定 channel 外,還可使用 PSUBSCRIBE 訂閱一個模式(pattern),訂閱一個模式等同於訂閱全部匹配這個模式的 channel 。

  和 redisServer.pubsub_channels 屬性相似, redisServer.pubsub_patterns 屬性用於保存全部被訂閱的模式,和 pubsub_channels 不一樣的是, pubsub_patterns 是一個鏈表(而不是字典):

  struct redisServer { 
  // 省略 ... 
  list *pubsub_patterns; // A list of pubsub_patterns 
  // 省略 ... 
  };

  pubsub_patterns 的每個節點都是一個 pubsubPattern 結構的實例,它保存了被訂閱的模式,以及訂閱這個模式的客戶客戶端:

  typedef struct pubsubPattern { 
  redisClient *client; 
  robj *pattern; 
  } pubsubPattern;

  舉個例子,假設在一個 redisServer 實例中,有一個叫作 news.* 的模式同時被客戶端client_789 和 client_999 訂閱,那麼這個 redisServer 結構看起來應該是這樣子:

  

點擊放大

  如今能夠知道,實現 PSUBSCRIBE 命令的關鍵,就是將客戶端和訂閱的模式添加到redisServer.pubsub_patterns 當中。

  pubsubSubscribePattern 是 PSUBSCRIBE 的底層實現,它將客戶端和所訂閱的模式添加到redisServer.pubsub_patterns 當中:

  // 訂閱指定模式 
  // 訂閱成功返回 1 ,若是已經訂閱過,返回 0 
  int pubsubSubscribePattern(redisClient *c, robj *pattern) { 
  int retval = 0; 
  // 向 c->pubsub_patterns 中查找指定 pattern 
  // 若是返回值爲 NULL ,說明這個 pattern 還沒被這個客戶端訂閱過 
  if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { 
  retval = 1; 
  // 添加 pattern 到客戶端 pubsub_patterns 
  listAddNodeTail(c->pubsub_patterns,pattern); 
  incrRefCount(pattern); 
  // 將 pattern 添加到服務器 
  pubsubPattern *pat; 
  pat = zmalloc(sizeof(*pat)); 
  pat->pattern = getDecodedObject(pattern); 
  pat->client = c; 
  listAddNodeTail(server.pubsub_patterns,pat); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.psubscribebulk); 
  // 返回被訂閱的模式 
  addReplyBulk(c,pattern); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

  PUBLISH 命令的實現

  使用 PUBLISH 命令向訂閱者發送消息,須要執行如下兩個步驟:

  1) 使用給定的頻道做爲鍵,在 redisServer.pubsub_channels 字典中查找記錄了訂閱這個頻道的全部客戶端的鏈表,遍歷這個鏈表,將消息發佈給全部訂閱者。

  2) 遍歷 redisServer.pubsub_patterns 鏈表,將鏈表中的模式和給定的頻道進行匹配,若是匹配成功,那麼將消息發佈到相應模式的客戶端當中。

  舉個例子,假設有兩個客戶端分別訂閱 it.news 頻道和 it.* 模式,當執行命令PUBLISH it.news "hello moto" 的時候, it.news 頻道的訂閱者會在步驟 1 收到信息,而當PUBLISH 進行到步驟 2 的時候, it.* 模式的訂閱者也會收到信息。

  PUBLISH 命令的實際實現由 pubsubPublishMessage 函數完成,它的完整定義以下:

  // 發送消息 
  int pubsubPublishMessage(robj *channel, robj *message) { 
  int receivers = 0; 
  struct dictEntry *de; 
  listNode *ln; 
  listIter li; 
  /* Send to clients listening for that channel */ 
  // 向全部頻道的訂閱者發送消息 
  de = dictFind(server.pubsub_channels,channel); 
  if (de) { 
  list *list = dictGetVal(de); // 取出全部訂閱者 
  listNode *ln; 
  listIter li; 
  // 遍歷全部訂閱者, 向它們發送消息 
  listRewind(list,&li); 
  while ((ln = listNext(&li)) != NULL) { 
  redisClient *c = ln->value; 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.messagebulk); 
  addReplyBulk(c,channel); // 打印頻道名 
  addReplyBulk(c,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  /* Send to clients listening to matching channels */ 
  // 向全部被匹配模式的訂閱者發送消息 
  if (listLength(server.pubsub_patterns)) { 
  listRewind(server.pubsub_patterns,&li); // 取出全部模式 
  channel = getDecodedObject(channel); 
  while ((ln = listNext(&li)) != NULL) { 
  pubsubPattern *pat = ln->value; // 取出模式 
  // 若是模式和 channel 匹配的話 
  // 向這個模式的訂閱者發送消息 
  if (stringmatchlen((char*)pat->pattern->ptr, 
  sdslen(pat->pattern->ptr), 
  (char*)channel->ptr, 
  sdslen(channel->ptr),0)) { 
  addReply(pat->client,shared.mbulkhdr[4]); 
  addReply(pat->client,shared.pmessagebulk); 
  addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式 
  addReplyBulk(pat->client,channel); // 打印頻道名 
  addReplyBulk(pat->client,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  decrRefCount(channel); // 釋放用過的 channel 
  } 
  return receivers; // 返回接收者數量 
  }

  UNSUBSCRIBE 和 PUNSUBSCRIBE 的實現

  UNSUBSCRIBE 和 PUNSUBSCRIBE 分別是 SUBSCRIBE 和 PSUBSCRIBE 的反操做,若是明白了SUBSCRIBE 和 PSUBSCRIBE 的工做機制的話,應該不難理解這兩個反操做的原理,因此這裏就省略詳細的分析了,有興趣的能夠直接看代碼。

  小節

  Redis 的 pubsub 機制的分析就到此結束了,跟往常同樣,帶有註釋的完整 pubsub.c 文件能夠到個人 GITHUB 上找: https://github.com/huangz1990/reading_redis_source

相關文章
相關標籤/搜索