本文想介紹一下前段時間在寫enode時,順便實現的一個分佈式消息隊列equeue。這個消息隊列的思想不是我想出來的,而是經過學習阿里的rocketmq後,本身用c#實現了一個輕量級的簡單版本。一方面能夠經過寫這個隊列讓本身更深刻的掌握消息隊列的一些常見問題;另外一方面也能夠用來和enode集成,爲enode中的command和domain event的消息傳遞提供支持。目前在.net平臺,比較好用的消息隊列,最多見的是微軟的MSMQ了吧,還有像rabbitmq也有.net的client端。這些消息隊列都很強大和成熟。但當我學習了kafka以及阿里的rocketmq(早期版本叫metaq,自metaq 3.0後更名爲rocketmq)後,以爲rocketmq的設計思想深深吸引了我,由於我不只能理解其思想,還有其完整的源代碼能夠學習。可是rocketmq是java寫的,且目前尚未.net的client端,因此不能直接使用(有興趣的朋友能夠爲其寫一個.net的client端),因此在學習了rocketmq的設計文檔以及大部分代碼後,決定本身用c#寫一個出來。 html
項目開源地址:https://github.com/tangxuehua/equeue,項目中包含了隊列的所有源代碼以及如何使用的示例。也能夠在enode項目中看到如何使用。 java
一個topic就是一個主題。一個系統中,咱們能夠對消息劃分爲一些topic,這樣咱們就能經過topic,將消息發送到不一樣的queue。 node
一個topic下,咱們能夠設置多個queue,每一個queue就是咱們平時所說的消息隊列;由於queue是徹底從屬於某個特定的topic的,因此當咱們要發送消息時,老是要指定該消息所屬的topic是什麼。而後equeue就能知道該topic下有幾個queue了。可是到底發送到哪一個queue呢?好比一個topic下有4個queue,那對於這個topic下的消息,發送時,到底該發送到哪一個queue呢?那一定有個消息被路由的過程。目前equeue的作法是在發送一個消息時,須要用戶指定這個消息對應的topic以及一個用來路由的一個object類型的參數。equeue會根據topic獲得全部的queue,而後根據該object參數經過hash code而後取模queue的個數最後獲得要發送的queue的編號,從而知道該發送到哪一個queue。這個路由消息的過程是在發送消息的這一方作的,也就是下面要說的producer。之因此不在消息服務器上作是由於這樣可讓用戶本身決定該如何路由消息,具備更大的靈活性。 git
就是消息隊列的生產者。咱們知道,消息隊列的本質就是實現了publish-subscribe的模式,即生產者-消費者模式。生產者生產消息,消費者消費消息。因此這裏的Producer就是用來生產和發送消息的。 github
就是消息隊列的消費者,一個消息能夠有多個消費者。 算法
消費者分組,這可能對你們來講是一個新概念。之因此要搞出一個消費者分組,是爲了實現下面要說的集羣消費。一個消費者分組中包含了一些消費者,若是這些消費者是要集羣消費,那這些消費者會平均消費該分組中的消息。 sql
equeue中的broker負責消息的中轉,即接收producer發送過來的消息,而後持久化消息到磁盤,而後接收consumer發送過來的拉取消息的請求,而後根據請求拉取相應的消息給consumer。因此,broker能夠理解爲消息隊列服務器,提供消息的接收、存儲、拉取服務。可見,broker對於equeue來講是核心,它絕對不能掛,一旦掛了,那producer,consumer就沒法實現publish-subscribe了。 c#
集羣消費是指,一個consumer group下的consumer,平均消費topic下的queue。具體如何平都可以看一下下面的架構圖,這裏先用文字簡單描述一下。假如一個topic下有4個queue,而後當前有一個consumer group,該分組下有4個consumer,那每一個consumer就被分配到該topic下的一個queue,這樣就達到了平均消費topic下的queue的目的。若是consumer group下只有兩個consumer,那每一個consumer就消費2個queue。若是有3個consumer,則第一個消費2個queue,後面兩個每一個消費一個queue,從而達到儘可能平均消費。因此,能夠看出,咱們應該儘可能讓consumer group下的consumer的數目和topic的queue的數目一致或成倍數關係。這樣每一個consumer消費的queue的數量老是同樣的,這樣每一個consumer服務器的壓力纔會差很少。當前前提是這個topic下的每一個queue裏的消息的數量老是差很少多的。這點咱們能夠對消息根據某個用戶本身定義的key來進行hash路由來保證。 緩存
廣播消費是指一個consumer只要訂閱了某個topic的消息,那它就會收到該topic下的全部queue裏的消息,而無論這個consumer的group是什麼。因此對於廣播消費來講,consumer group沒什麼實際意義。consumer能夠在實例化時,咱們能夠指定是集羣消費仍是廣播消費。 安全
消費進度是指,當一個consumer group裏的consumer在消費某個queue裏的消息時,equeue是經過記錄消費位置(offset)來知道當前消費到哪裏了。以便該consumer重啓後繼續從該位置開始消費。好比一個topic有4個queue,一個consumer group有4個consumer,則每一個consumer分配到一個queue,而後每一個consumer分別消費本身的queue裏的消息。equeue會分別記錄每一個consumer對其queue的消費進度,從而保證每一個consumer重啓後知道下次從哪裏開始繼續消費。實際上,也許下次重啓後不是由該consumer消費該queue了,而是由group裏的其餘consumer消費了,這樣也不要緊,由於咱們已經記錄了這個queue的消費位置了。因此能夠看出,消費位置和consumer其實無關,消費位置徹底是queue的一個屬性,用來記錄當前被消費到哪裏了。另一點很重要的是,一個topic能夠被多個consumer group裏的consumer訂閱。不一樣consumer group裏的consumer即使是消費同一個topic下的同一個queue,那消費進度也是分開存儲的。也就是說,不一樣的consumer group內的consumer的消費徹底隔離,彼此不受影響。還有一點就是,對於集羣消費和廣播消費,消費進度持久化的地方是不一樣的,集羣消費的消費進度是放在broker,也就是消息隊列服務器上的,而廣播消費的消費進度是存儲在consumer本地磁盤上的。之因此這樣設計是由於,對於集羣消費,因爲一個queue的消費者可能會更換,由於consumer group下的consumer數量可能會增長或減小,而後就會從新計算每一個consumer該消費的queue是哪些,這個能理解的把?因此,當出現一個queue的consumer變更的時候,新的consumer如何知道該從哪裏開始消費這個queue呢?若是這個queue的消費進度是存儲在前一個consumer服務器上的,那就很難拿到這個消費進度了,由於有可能那個服務器已經掛了,或者下架了,都有可能。而由於broker對於全部的consumer老是在服務的,因此,在集羣消費的狀況下,被訂閱的topic的queue的消費位置是存儲在broker上的,存儲的時候按照不一樣的consumer group作隔離,以確保不一樣的consumer group下的consumer的消費進度互補影響。而後,對於廣播消費,因爲不會出現一個queue的consumer會變更的狀況,因此咱們不必讓broker來保存消費位置,因此是保存在consumer本身的服務器上。
經過上圖,咱們能直觀的理解equeue。這個圖是從rocketmq的設計文檔中拿來的,呵呵。因爲equeue的設計思想徹底和rocketmq一致,因此我就拿過來用了。每一個producer能夠向某個topic發消息,發送的時候根據某種路由策略(producer可自定義)將消息發送到某個特定的queue。而後consumer能夠消費特定topic下的queue裏的消息。上圖中,TOPIC_A有兩個消費者,這兩個消費者是在一個group裏,因此應該平均消費TOPIC_A下的queue但因爲有三個queue,因此第一個consumer分到了2個queue,第二個consumer分到了1個。對於TOPIC_B,因爲只有一個消費者,那TOPIC_B下的全部queue都由它消費。全部的topic信息、queue信息、還有消息自己,都存儲在broker服務器上。這點上圖中沒有體現出來。上圖主要關注producer,consumer,topic,queue這四個東西之間的關係,並不關注物理服務器的部署架構。
因爲是用c#實現,且由於通常是在局域網內部署,爲了實現高性能通訊,咱們能夠利用異步socket來通訊。.net自己提供了很好的異步socket通訊的支持;咱們也能夠用zeromq來實現高性能的socket通訊。原本想直接使用zeromq來實現通訊模塊就行了,但後來本身學習了一下.net自帶的socket通訊相關知識,發現也不難,因此就本身實現了一個,呵呵。本身實現的好處是我能夠本身定義消息的協議,目前這部分實現代碼在ecommon基礎類庫中,是一個獨立的可服用的與業務場景無關的基礎類庫。有興趣的能夠去下載下來看看代碼。通過了本身的一些性能測試,發現通訊模塊的性能仍是不錯的。一臺broker,四臺producer同時向這個broker發送消息,每秒能發送的消息4W沒有問題,更多的producer還沒測試。
消息持久化方面主要考慮的是性能問題,還有就是消息如何快速的讀取。
1. 首先,一臺broker上的消息不須要一直保存在該broker服務器上,由於這些消息總會被消費掉。根據阿里rocketmq的設計,默認會1天刪除一次已經被消費過的消息。因此,咱們能夠理解,broker上的消息應該不會無限制增加,由於會被按期刪除。因此沒必要考慮一臺broker上消息放不下的問題。
2. 如何快速的持久化消息?通常來講,我以爲有兩種方式:1)順序寫磁盤文件;2)用現成的key,value的nosql產品來存儲;rocketmq目前用的是本身寫文件的方式,這種方式的難點是寫文件比較複雜,由於全部消息都是順序append到文件末尾,雖然性能很是高,但複雜度也很高;好比全部消息不能全寫在一個文件裏,一個文件到達必定大小後須要拆分,一旦拆分就會產生不少問題,呵呵。拆分後如何讀取也是比較複雜的問題。還有因爲是順序寫入文件的,那咱們還須要把每個消息在文件中的起始位置和長度須要記錄下來,這樣consumer在消費消息時,才能根據offset從文件中拿到該消息。總之須要考慮的問題不少。若是是用nosql來持久化消息,那能夠省去咱們寫文件時遇到的各類問題,咱們只須要關心如何把消息的key和該消息在queue中的offset對應起來便可。另一點疑問是,queue裏的信息要持久化嗎?先要想清楚queue裏放的是什麼東西。當broker接收到一個消息後,首先確定是要先持久化,完成後須要把消息放入queue裏。但因爲內存頗有限,咱們不可能把這個消息直接放入queue裏,咱們其實要放的只須要時該消息在nosql裏的key便可,或者若是是用文件來持久化,那放的是該消息在文件中的偏移量offset,即存儲在文件的那個位置(好比是哪一個行號)。因此,實際上,queue只是一個消息的索引。那有必要持久化queue嗎?能夠持久化,這樣畢竟在broker重啓的時候,恢復queue的時間能夠縮短。那須要和持久化消息同步持久化嗎?顯然不須要,咱們能夠異步定時持久化每一個queue,而後恢復queue的時候,能夠先從持久化的部分恢復,而後再把剩下的部分經過持久化的消息來補充以達到queue由於異步持久化而慢的部分能夠追平。因此,通過上面的分析,消息自己都是放在nosql中,queue所有在內存中。
那消息如何持久化呢?我以爲最好的辦法是讓每一個消息有一個全局的順序號,一旦消息被寫入nosql後,該消息的全局順序號就肯定了,而後咱們在更新對應的queue的信息時,把該消息的全局順序號傳給queue,這樣queue就能把queue本身對該消息的本地順序號和該消息的全局順序號創建映射關係。相關代碼以下:
public MessageStoreResult StoreMessage(Message message, int queueId) { var queues = GetQueues(message.Topic); var queueCount = queues.Count; if (queueId >= queueCount || queueId < 0) { throw new InvalidQueueIdException(message.Topic, queueCount, queueId); } var queue = queues[queueId]; var queueOffset = queue.IncrementCurrentOffset(); var storeResult = _messageStore.StoreMessage(message, queue.QueueId, queueOffset); queue.SetMessageOffset(queueOffset, storeResult.MessageOffset); return storeResult; }
沒什麼比代碼更能說明問題了,呵呵。上的代碼的思路是,接收一個消息對象和一個queueId,queueId表示當前消息要放到第幾個queue裏。而後內部邏輯是,先獲取該消息的topic的全部queue,因爲queue和topic都在內存,因此這裏沒性能問題。而後檢查一下當前傳遞進來的queueId是否合法。若是合法,那就定位到該queue,而後經過IncrementCurrentOffset方法,將queue的內部序號加1並返回,而後持久化消息,持久化的時候把queueId以及queueOffset一塊兒持久化,完成後返回一個消息的全局序列號。因爲messageStore內部會把消息內容、queueId、queueOffset,以及消息的全局順序號一塊兒做爲一個總體保存到nosql中,key就是消息的全局序列號,value就是前面說的總體(被序列化爲二進制)。而後,在調用queue的SetMessageOffset方法,把queueOffset和message的全局offset創建映射關係便可。最後返回一個結果。messageStore.StoreMessage的內存實現大體以下:
public MessageStoreResult StoreMessage(Message message, int queueId, long queueOffset) { var offset = GetNextOffset(); _queueCurrentOffsetDict[offset] = new QueueMessage(message.Topic, message.Body, offset, queueId, queueOffset, DateTime.Now); return new MessageStoreResult(offset, queueId, queueOffset); }
GetNextOffset就是獲取下一個全局的消息序列號,QueueMessage就是上面所說的「總體」,由於是內存實現,因此就用了一個ConcurrentDictionary來保存一下queueMessage對象。若是是用nosql來實現messageStore,則這裏須要寫入nosql,key就是消息的全局序列號,value就是queueMessage的二進制序列化數據。經過上面的分析咱們能夠知道咱們會將消息的全局序列號+queueId+queueOffset一塊兒總體做爲一條記錄持久化起來。這樣作有兩個很是好的特性:1)實現了消息持久化和消息在queue中的位置的持久化的原子事務;2)咱們老是能夠根據這些持久化的queueMessage還原出全部的queue的信息,由於queueMessage裏包含了消息和消息在queue的中的位置信息;
基於這樣的消息存儲,當某個consumer要消費某個位置的消息時,咱們能夠經過先經過queueId找到queue,而後經過消息在queueOffset(由consumer傳遞過來的)獲取消息的全局offset,而後根據該全局的offset做爲key從nosql拿到消息。實際上如今的equeue是批量拉取消息的,也就是一次socket請求不是拉一個消息,而是拉一批,默認是32個消息。這樣consumer能夠用更少的網絡請求拿到更多的消息,能夠加快消息消費的速度。
producer在發送消息時,如何知道當前topic下有多少個queue呢?每次發送消息時都要去broker上查一下嗎?顯然不行,這樣發送消息的性能就上不去了。那怎麼辦呢?就是異步,呵呵。producer能夠定時向broker發送請求,獲取topic下的queue數量,而後保存起來。這樣每次producer在發送消息時,就只要從本地緩存裏拿便可。由於broker上topic的queue的數量通常不會變化,因此這樣的緩存頗有意義。那還有一個問題,當前producer第一次對某個topic發送消息時,queue哪裏來呢?由於定時線程不知道要向broker拿哪一個topic下的queue數量,由於此時producer端尚未一個topic呢,由於一個消息都還沒發送過。那就是須要判斷一下,若是當前topic沒有queue的count信息,則直接從broker上獲取queue的count信息。而後再緩存起來,在發送當前消息。而後第二次發送時,由於緩存裏已經有了該消息,因此就沒必要再從broker拿了,且後續定時線程也會自動去更新該topic下的queue的count了。好,producer有了topic的queue的count,那用戶在發送消息時,框架就能把這個topic的queueCount傳遞給用戶,而後用戶就能根據本身的須要將消息路由到第幾個queue了。
consumer負載均衡的意思是指,在消費者集羣消費的狀況下,如何讓同一個consumer group裏的消費者平均消費同一個topic下的queue。因此這個負載均衡本質上是一個將queue平均分配給consumer的過程。那麼怎麼實現呢?經過上面負載均衡的定義,咱們只要,要作負載均衡,必需要肯定consumer group和topic;而後拿到consumer group下的全部consumer,以及topic下的全部queue;而後對於當前的consumer,就能計算出來當前consumer應該被分配到哪些queue了。咱們能夠經過以下的函數來獲得當前的consumer應該被分配到哪幾個queue。
public class AverageAllocateMessageQueueStrategy : IAllocateMessageQueueStrategy { public IEnumerable<MessageQueue> Allocate(string currentConsumerId, IList<MessageQueue> totalMessageQueues, IList<string> totalConsumerIds) { var result = new List<MessageQueue>(); if (!totalConsumerIds.Contains(currentConsumerId)) { return result; } var index = totalConsumerIds.IndexOf(currentConsumerId); var totalMessageQueueCount = totalMessageQueues.Count; var totalConsumerCount = totalConsumerIds.Count; var mod = totalMessageQueues.Count() % totalConsumerCount; var size = mod > 0 && index < mod ? totalMessageQueueCount / totalConsumerCount + 1 : totalMessageQueueCount / totalConsumerCount; var averageSize = totalMessageQueueCount <= totalConsumerCount ? 1 : size; var startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; var range = Math.Min(averageSize, totalMessageQueueCount - startIndex); for (var i = 0; i < range; i++) { result.Add(totalMessageQueues[(startIndex + i) % totalMessageQueueCount]); } return result; } }
函數裏的實現就很少分析了。這個函數的目的就是根據給定的輸入,返回當前consumer該分配到的queue。分配的原則就是平均分配。好了,有了這個函數,咱們就能很方便的實現負載均衡了。咱們能夠對每個正在運行的consumer內部開一個定時job,該job每隔一段時間進行一次負載均衡,也就是執行一次上面的函數,獲得當前consumer該綁定的最新queue。由於每一個consumer都有一個groupName屬性,用於表示當前consumer屬於哪一個group。因此,咱們就能夠在負載均衡時到broker獲取當前group下的全部consumer;另外一方面,由於每一個consumer都知道它本身訂閱了哪些topic,因此有了topic信息,就能獲取topic下的全部queue的信息了,有了這兩樣信息,每一個consumer就能本身作負載均衡了。先看一下下面的代碼:
_scheduleService.ScheduleTask(Rebalance, Setting.RebalanceInterval, Setting.RebalanceInterval); _scheduleService.ScheduleTask(UpdateAllTopicQueues, Setting.UpdateTopicQueueCountInterval, Setting.UpdateTopicQueueCountInterval); _scheduleService.ScheduleTask(SendHeartbeat, Setting.HeartbeatBrokerInterval, Setting.HeartbeatBrokerInterval);
每一個consumer內部都會啓動三個定時的task,第一個task表示要定時作一次負載均衡;第二個task表示要定時更新當前consumer訂閱的全部topic的queueCount信息,並把最新的queueCount信息都保存在本地;第三個task表示當前consumer會向broker定時發送心跳,這樣broker就能經過心跳知道某個consumer是否還活着,broker上維護了全部的consumer信息。一旦有新增或者發現沒有及時發送心跳過來的consumer,就會認爲有新增或者死掉的consumer。由於broker上維護了全部的consumer信息,因此他就能提供查詢服務,好比根據某個consumer group查詢該group下的consumer。
經過這三個定時任務,就能完成消費者的負載均衡了。先看一下Rebalance方法:
private void Rebalance() { foreach (var subscriptionTopic in _subscriptionTopics) { try { RebalanceClustering(subscriptionTopic); } catch (Exception ex) { _logger.Error(string.Format("[{0}]: rebalanceClustering for topic [{1}] has exception", Id, subscriptionTopic), ex); } } }
代碼很簡單,就是對每一個訂閱的topic作負載均衡處理。再看一下RebalanceClustering方法:
上面的代碼很少分析了,就是先根據consumer group和topic獲取全部的consumer,而後對consumer作排序處理。之因此要作排序處理是爲了確保負載均衡時對已有的分配狀況儘可能不發生改變。接下來就是從本地獲取topic下的全部queue,一樣根據queueId作一下排序。而後就是調用上面的分配算法計算出當前consumer應該分配到哪些queue。最後調用UpdatePullRequestDict方法,用來對新增或刪除的queue作處理。對於新增的queue,要建立一個獨立的worker線程,開始從broker拉取消息;對於刪除的queue,要中止其對應的work,中止拉取消息。
經過上面的介紹和分析,咱們你們知道了equeue是如何實現消費者的負載均衡的。咱們能夠看出,由於每一個topic下的queue的更新是異步的定時的,且負載均衡自己也是定時的,且broker上維護的consumer的信息也不是事實的,由於每一個consumer發送心跳到broker不是實時發送的,而是好比每隔5s發送一次。全部這些由於都是異步的設計,因此可能會致使在負載均衡的過程當中,同一個queue可能會被兩個消費者同時消費。這個就是所謂的,咱們只能作到一個消息至少被消費一次,但equeue層面作不到一個消息只會被消費一次。實際上像rocketmq這種也是這樣的思路,放棄一個消息只會被消費一次的實現(由於代價太大,且過於複雜,實際上對於分佈式的環境,不太可能作到一個消息只會被消費一次),而是採用確保一個消息至少會被消費一次(即at least once).因此使用equeue,應用方要本身作好對每一個消息的冪等處理。
消息的實時推送,通常有兩種作法:推模式(push)和拉模式(pull)。push的方式是指broker主動對全部訂閱了該topic的消費者推送消息;pull的方式是指消費者主動到broker上拉取消息;對於推模式,最大的好處就是實時,由於一有新的消息,就會當即推送給消費者。可是有一個缺點就是若是消費者來不及消費,它也會給消費者推消息,這樣就會致使消費者端的消息會堵塞。而經過拉的方式,有兩種實現:1)輪訓的方式拉,好比每隔5s輪訓一下是否有新消息,這種方式的缺點是消息不實時,可是消費進度徹底由消費者本身把控了;2)開長鏈接的方式來拉,就是不輪訓,消費者和broker之間一直保持的鏈接通道,而後broker一有新消息,就會利用這個通道把消息發送給消費者。
equeue中目前採用的是經過長鏈接拉取消息的方式。長鏈接經過socket長鏈接實現。可是雖然叫長鏈接,也不是一直不斷開,而是也會設計一個超時的限制,好比一個長鏈接最大不超過15s,超過15s,則broker發送回覆給consumer,告訴consumer當前沒有新消息;而後consumer接受到這個回覆後,就知道要繼續發起下一個長鏈接來拉取。而後假如在這15s中以內,broker上有新消息了,則broker就能當即主動利用這個長鏈接通知相應的消費者,把消息傳給消費者。因此,能夠看出,broker上在處理消費者的拉取消息的請求時,若是當前沒有新消息,則會hold住這個socket鏈接,最多hold 15s,超過15s,則發送返回信息,告訴消費者當前無消息,而後消費者再次發送pull message request過來。經過這樣的基於長鏈接的拉取模式,咱們能夠實現兩個好處:1)消息實時推送;2)由消費者控制消息消費進度;
另外,equeue裏還實現了消費者自身的自動限流功能。就是假如當前broker上消息不少,即生產者生產消息的速度大於消費者消費消息的速度,那broker上就會有消息被堆積。那此時消費者在拉取消息時,老是會有新消息拉取到,可是消費者又來不及處理這麼多消息。因此equeue框架內置了一個限流(流控,流量控制)的設計,就是能夠容許用於配製一個消費者端堆積的消息的上限,好比3000,超過這個數目(可配置),則equeue會讓消費者以慢一點的頻率拉取消息。好比延遲個多少毫秒(延遲時間可配置)再拉取。這樣就簡單的實現了流控的目的。
做爲一個消息隊列,消費者老是可能會在消費消息時拋出異常,在equeue中這種狀況就是消息消費失敗的狀況。經過上面的消費進度的介紹,你們知道了每一個queue對某個特定的consumer group,都有一個惟一的消費進度。實際上,消息被拉取到consumer本地後,可能會被以兩種方式消費,一種是並行消費,一種是線性消費。
並行消費的意思是,假如當前一次性拉取過來32個消息,那equeue會經過啓動task(即開多線程)的方式並行消費每一個消息;
線性消費的意思是,消息是在一個獨立的單線程中順序消費,消費順序和拉取過來的順序相同。
對於線性消費,假如前一個消息消費的時候失敗了,也就是拋異常了,那該怎麼辦呢?可能想到的辦法是重試個3次,可是要是重試後仍是失敗呢?總不能由於這個消息而致使後面的消息沒法把消費吧?呵呵!對於這種狀況,先說一下rocketmq裏的處理方式吧:它的作法是,當遇到消費失敗的狀況,沒有立馬重試,而是直接把這個消息發送到broker上的某個重試隊列,發送成功後,就能夠往下消費下一個消息了。由於一旦發送到重試隊列,那意味着這個消息就最後老是會被消費了,由於該消息不會丟了。可是要是發送到broker的重試隊列也不成功呢?這個?!其實這種狀況不大應該出現,若是出現,那基本就是broker掛了,呵呵。
rocketmq中,對於這種狀況,那會把這個失敗的消息放入本地內存隊列,慢慢消費它。而後繼續日後消費後面的消息。如今你必定很關心queue的offset是如何更新的?這裏涉及到一個滑動門的概念。當一批消息從broker拉取到消費者本地後,並非立刻消費的,而是先放入一個本地的SortedDictionary,key就是消息在queue裏的位置,value就是消息自己。由於是一個排序的dictionary,因此key最小的消息意味着是最前面的消息,最大的消息就是最後面的消息。而後無論是並行消費仍是線性消費,只要某個消息被消費了,那就從這個SortedDictionary裏移除掉。每次被移除一個消息時,老是會返回當前這個SortedDictionary裏的最小的key,而後咱們就能判斷這個key是否和上次比是否前移了,若是是,則更新queue的這個最新的offset。由於每次移除一個消息的時候,老是返回當前SortedDictionary裏的最小的key,因此,假如當前offset是3,而後offset爲4的這個消息一直消費失敗,因此不會被移除,可是offset爲5,6,7,8的這些消息雖然都消費成功了,可是隻要offset爲4的這個消息沒有被移除,那最小的key就不會往前移動。這個就是所謂的滑動門的概念了。就比如是在鐵軌上一輛在跑的動車,offset的往前移動就比如是動車在不斷往前移動。由於咱們但願offset老是會不斷往前移動,因此不但願前面的某個消費失敗的消息讓這個滑動門中止移動(即咱們老是但願這個最小的key能不斷變大),因此咱們會千方百計讓消費失敗的消息能不阻礙滑動門的往前移動。因此才把消費失敗的消息放入重試隊列。
另一點須要注意一下:並非每次成功消費完一個消息,就會立馬告訴broker更新offset,由於這樣那性能確定很低,broker也會忙死,更好的辦法是先只是在本地內存更新queue的offset,而後定時好比5s一次,將最新的offset更新到broker。因此,由於這個異步的存在,一樣也會致使某個消息被重複消費的可能性,由於broker上的offset確定比實際的消費進度要慢,有5s的時間差。因此,再次強調,應用方必需要處理好對消息的冪等處理!好比enode框架中,對每一個command消息,框架內部都作了command的冪等處理。因此使用enode框架的應用,自身無需對command作冪等處理方面的考慮。
上面提到了並行消費和線性消費,其實對於offset的更新來講是同樣的,由於並行消費無非是多線程同時從SortedDictionary中移除消費成功的消息,而單線程只是單個線程去移除SortedDictionary中的消息。因此咱們要經過鎖的機制,保證對SortedDictionary的操做是線程安全的。目前用了ReaderWriterLockSlim來實現對方法調用的線層安全。有興趣的朋友能夠去看一下代碼。
最後,也是重點,呵呵。equeue目前尚未實現將失敗的消息發回到broker的重試隊列。這個功能之後會考慮加進去。
這個問題比較複雜,目前equeue不支持broker的master-salve或master-master,而是單點的。我以爲一個成熟的消息隊列,爲了確保在一個broker掛了的時候,要儘可能能確保有其餘broker能夠接替它,這樣才能讓消息隊列服務器的可靠性。可是這個問題實在太複雜。rocketmq目前實現的也只是master-slave的方式。也就是隻要主的master掛了,那producer就沒法向broker發送消息了,由於slave的broker是隻讀的,不能直接接受新消息,slave的broker只能容許被consumer拉取消息。
這個問題,要討論清楚,須要不少分佈式方面的知識。因爲篇幅的緣由,這裏就不作討論了,實際上我本身也搞不清楚到底該如何設計。但願大牛們多多指點,如何實現broker的高可用哈!