Redis 經過 PUBLISH 、 SUBSCRIBE 和 PSUBSCRIBE 等命令實現發佈和訂閱功能。html
這些命令被普遍用於構建即時通訊應用,好比網絡聊天室(chatroom)和實時廣播、實時提醒等。git
本文經過分析 Redis 源碼裏的 pubsub.c 文件,瞭解發佈和訂閱機制的底層實現,籍此加深對 Redis 的理解。github
訂閱、發佈和退訂redis
在開始研究源碼以前,不妨先來回顧一下幾個相關命令的使用方式。服務器
PUBLISH 命令用於向給定的頻道發送信息,返回值爲接收到信息的訂閱者數量:網絡
redis> PUBLISH treehole "top secret here ..." (integer) 0 redis> PUBLISH chatroom "hi?" (integer) 1 |
SUBSCRIBE 命令訂閱給定的一個或多個頻道:函數
redis> SUBSCRIBE chatroom Reading messages... (press Ctrl-C to quit) 1) "subscribe" # 訂閱反饋 2) "chatroom" # 訂閱的頻道 3) (integer) 1 # 目前客戶端已訂閱頻道/模式的數量 1) "message" # 信息 2) "chatroom" # 發送信息的頻道 3) "hi?" # 信息內容 |
SUBSCRIBE 的返回值當中, 1) 爲 subscribe 的是訂閱的反饋信息,而 1) 爲 message 的則是訂閱的頻道所發送的信息。ui
SUBSCRIBE 還能夠訂閱多個頻道,這樣一來它接收到的信息就可能來自多個頻道:spa
redis> SUBSCRIBE chatroom talk-to-jack Reading messages... (press Ctrl-C to quit) 1) "subscribe" # 訂閱 chatroom 的反饋 2) "chatroom" 3) (integer) 1 1) "subscribe" # 訂閱 talk-to-jack 的反饋 2) "talk-to-jack" 3) (integer) 2 1) "message" # 來自 chatroom 的消息 2) "chatroom" 3) "yahoo" 1) "message" # 來自 talk-to-peter 的消息 2) "talk-to-jack" 3) "Goodmorning, peter." |
PSUBSCRIBE 提供了一種訂閱符合給定模式的全部頻道的方法,好比說,使用 it.* 爲輸入,就能夠訂閱全部以 it. 開頭的頻道,好比 it.news 、 it.blog 、 it.tweets ,諸如此類:.net
redis> PSUBSCRIBE it.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "it.*" 3) (integer) 1 1) "pmessage" 2) "it.*" # 匹配的模式 3) "it.news" # 消息的來源頻道 4) "Redis 2.6rc5 release" # 消息內容 1) "pmessage" 2) "it.*" 3) "it.blog" 4) "Why NoSQL matters" 1) "pmessage" 2) "it.*" 3) "it.tweet" 4) "@redis : when will the 2.6 stable release?" |
固然, PSUBSCRIBE 也能夠接受多個參數,從而匹配多種模式。
最後, UNSUBSCRIBE 命令和 PUNSUBSCRIBE 負責退訂給定的頻道或模式。
發佈和訂閱機制
當一個客戶端經過 PUBLISH 命令向訂閱者發送信息的時候,咱們稱這個客戶端爲發佈者(publisher)。
而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的時候,咱們稱這個客戶端爲訂閱者(subscriber)。
爲了解耦發佈者(publisher)和訂閱者(subscriber)之間的關係,Redis 使用了 channel (頻道)做爲二者的中介 —— 發佈者將信息直接發佈給 channel ,而 channel 負責將信息發送給適當的訂閱者,發佈者和訂閱者之間沒有相互關係,也不知道對方的存在:
點擊放大
知道了發佈和訂閱的機制以後,接下來就能夠開始研究具體的實現了,咱們從 Redis 的訂閱命令開始提及。
SUBSCRIBE 命令的實現
前面說到,Redis 將全部接受和發送信息的任務交給 channel 來進行,而全部 channel 的信息就儲存在 redisServer 這個結構中:
struct redisServer { // 省略 ... dict *pubsub_channels; // Map channels to list of subscribed clients // 省略 ... }; |
pubsub_channels 是一個字典,字典的鍵就是一個個 channel ,而字典的值則是一個鏈表,鏈表中保存了全部訂閱這個 channel 的客戶端。
舉個例子,若是在一個 redisServer 實例中,有一個叫作 news 的頻道,這個頻道同時被client_123 和 client_456 兩個客戶端訂閱,那麼這個 redisServer 結構看起來應該是這樣子:
點擊放大
能夠看出,實現 SUBSCRIBE 命令的關鍵,就是將客戶端添加到給定 channel 的訂閱鏈表中。
函數 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底層實現,它完成了將客戶端添加到訂閱鏈表中的工做:
// 訂閱指定頻道 // 訂閱成功返回 1 ,若是已經訂閱過,返回 0 int pubsubSubscribeChannel(redisClient *c, robj *channel) { struct dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ // dictAdd 在添加新元素成功時返回 DICT_OK // 所以這個判斷句表示,若是新訂閱 channel 成功,那麼 。。。 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ // 將 client 添加到訂閱給定 channel 的鏈表中 // 這個鏈表是一個哈希表的值,哈希表的鍵是給定 channel // 這個哈希表保存在 server.pubsub_channels 裏 de = dictFind(server.pubsub_channels,channel); if (de == NULL) { // 若是 de 等於 NULL // 表示這個客戶端是首個訂閱這個 channel 的客戶端 // 那麼建立一個新的列表, 並將它加入到哈希表中 clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { // 若是 de 不爲空,就取出這個 clients 鏈表 clients = dictGetVal(de); } // 將客戶端加入到鏈表中 listAddNodeTail(clients,c); } /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); // 返回訂閱的頻道 addReplyBulk(c,channel); // 返回客戶端當前已訂閱的頻道和模式數量的總和 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); return retval; } |
PSUBSCRIBE 命令的實現
除了直接訂閱給定 channel 外,還可使用 PSUBSCRIBE 訂閱一個模式(pattern),訂閱一個模式等同於訂閱全部匹配這個模式的 channel 。
和 redisServer.pubsub_channels 屬性相似, redisServer.pubsub_patterns 屬性用於保存全部被訂閱的模式,和 pubsub_channels 不一樣的是, pubsub_patterns 是一個鏈表(而不是字典):
struct redisServer { // 省略 ... list *pubsub_patterns; // A list of pubsub_patterns // 省略 ... }; |
pubsub_patterns 的每個節點都是一個 pubsubPattern 結構的實例,它保存了被訂閱的模式,以及訂閱這個模式的客戶客戶端:
typedef struct pubsubPattern { redisClient *client; robj *pattern; } pubsubPattern; |
舉個例子,假設在一個 redisServer 實例中,有一個叫作 news.* 的模式同時被客戶端client_789 和 client_999 訂閱,那麼這個 redisServer 結構看起來應該是這樣子:
點擊放大
如今能夠知道,實現 PSUBSCRIBE 命令的關鍵,就是將客戶端和訂閱的模式添加到redisServer.pubsub_patterns 當中。
pubsubSubscribePattern 是 PSUBSCRIBE 的底層實現,它將客戶端和所訂閱的模式添加到redisServer.pubsub_patterns 當中:
// 訂閱指定模式 // 訂閱成功返回 1 ,若是已經訂閱過,返回 0 int pubsubSubscribePattern(redisClient *c, robj *pattern) { int retval = 0; // 向 c->pubsub_patterns 中查找指定 pattern // 若是返回值爲 NULL ,說明這個 pattern 還沒被這個客戶端訂閱過 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; // 添加 pattern 到客戶端 pubsub_patterns listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); // 將 pattern 添加到服務器 pubsubPattern *pat; pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); } /* Notify the client */ addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); // 返回被訂閱的模式 addReplyBulk(c,pattern); // 返回客戶端當前已訂閱的頻道和模式數量的總和 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); return retval; } |
PUBLISH 命令的實現
使用 PUBLISH 命令向訂閱者發送消息,須要執行如下兩個步驟:
1) 使用給定的頻道做爲鍵,在 redisServer.pubsub_channels 字典中查找記錄了訂閱這個頻道的全部客戶端的鏈表,遍歷這個鏈表,將消息發佈給全部訂閱者。
2) 遍歷 redisServer.pubsub_patterns 鏈表,將鏈表中的模式和給定的頻道進行匹配,若是匹配成功,那麼將消息發佈到相應模式的客戶端當中。
舉個例子,假設有兩個客戶端分別訂閱 it.news 頻道和 it.* 模式,當執行命令PUBLISH it.news "hello moto" 的時候, it.news 頻道的訂閱者會在步驟 1 收到信息,而當PUBLISH 進行到步驟 2 的時候, it.* 模式的訂閱者也會收到信息。
PUBLISH 命令的實際實現由 pubsubPublishMessage 函數完成,它的完整定義以下:
// 發送消息 int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; struct dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ // 向全部頻道的訂閱者發送消息 de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetVal(de); // 取出全部訂閱者 listNode *ln; listIter li; // 遍歷全部訂閱者, 向它們發送消息 listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { redisClient *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); // 打印頻道名 addReplyBulk(c,message); // 打印消息 receivers++; // 更新接收者數量 } } /* Send to clients listening to matching channels */ // 向全部被匹配模式的訂閱者發送消息 if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); // 取出全部模式 channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; // 取出模式 // 若是模式和 channel 匹配的話 // 向這個模式的訂閱者發送消息 if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式 addReplyBulk(pat->client,channel); // 打印頻道名 addReplyBulk(pat->client,message); // 打印消息 receivers++; // 更新接收者數量 } } decrRefCount(channel); // 釋放用過的 channel } return receivers; // 返回接收者數量 } |
UNSUBSCRIBE 和 PUNSUBSCRIBE 的實現
UNSUBSCRIBE 和 PUNSUBSCRIBE 分別是 SUBSCRIBE 和 PSUBSCRIBE 的反操做,若是明白了SUBSCRIBE 和 PSUBSCRIBE 的工做機制的話,應該不難理解這兩個反操做的原理,因此這裏就省略詳細的分析了,有興趣的能夠直接看代碼。
小節
Redis 的 pubsub 機制的分析就到此結束了,跟往常同樣,帶有註釋的完整 pubsub.c 文件能夠到個人 GITHUB 上找: https://github.com/huangz1990/reading_redis_source