開源地址:https://github.com/tangxuehua/enodehtml
上一篇文章,簡單介紹了enode框架內部的總體實現思路,用到了staged event-driven architecture的思想。經過前一篇文章,咱們知道了enode內部有兩種隊列:command queue、event queue;用戶發送的command會進入command queue排隊,domain model產生的domain event會進入event queue,而後等待被dispatch到全部的event handlers。本文介紹一下enode框架中這兩種消息隊列究竟是如何設計的。java
先貼一下enode框架的內部實現架構圖,這樣對你們理解後面的分析有幫助。node
enode的設計初衷是在單個進程內提供基於DDD+CQRS+EDA的應用開發。若是咱們的業務須要和其餘系統交互,那也能夠,就是經過在event handler中與其餘外部系統交互,好比廣播消息出去或者調用遠程接口,均可以。也許未來,enode也會內置支持遠程消息通訊的功能。可是不支持遠程通訊並不表示enode只能開發單機應用了。enode框架須要存儲的數據主要有三種:git
好,經過上面的分析,咱們知道enode框架運行時的全部數據,就存儲在mongodb和redis這兩個地方。而這兩種存儲都是部署在獨立的服務器上,與web服務器無關。因此運行enode框架的每臺web服務器上是無狀態的。因此,咱們就能方便的對web服務器進行集羣,咱們能夠隨時當用戶訪問量的增長時增長新的web服務器,以提升系統的響應能力;固然,當你發現隨着web服務器的增長,致使單臺mongodb服務器或單臺redis服務器處理不過來成爲瓶頸時,也能夠對mongodb和redis作集羣,或者對數據作sharding(固然這兩種作法不是很好作,須要對mongodb,redis很熟悉才行),這樣就能夠提升mongodb,redis的吞吐量了。github
好了,上面的分析主要是爲了說明enode框架的使用範圍,討論清楚這一點對咱們分析須要什麼樣的消息隊列有很大幫助。web
如今咱們知道,咱們徹底不須要分佈式的消息隊列了,好比不須要MSMQ、RabbitMQ,等重量級成熟的支持遠程消息傳遞的消息隊列了。咱們須要的消息隊列的特徵是:redis
內存隊列,特色是快。可是咱們不光是須要快,還要能支持併發的入隊和出對。那麼看起來ConcurrentQueue<T>彷佛能知足咱們的要求了,一方面性能還能夠,另外一方面內置支持了併發操做。可是有一點沒知足,那就是咱們但願當隊列裏沒有消息的時候,隊列的消費者不能讓CPU空轉,CPU空轉會直接致使CPU佔用100%,致使機器沒法工做。幸運的是,.net中也有一個支持這種功能的集合,那就是:BlockingCollection<T>,這種集合能提供在隊列內無元素的時候block當前線程的功能。咱們能夠用如下的方式來實例化一個隊列:算法
private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>());
併發入隊的時候,咱們只要寫下面的代碼便可:mongodb
_queue.Add(message);
併發出隊的時候,只要:緩存
_queue.Take();
咱們不難看出,ConcurrentQueue<T>是提供了隊列加併發訪問的支持,而BlockingCollection<T>是在此基礎上再增長blocking線程的功能。
是否是很是簡單,通過個人測試,BlockingCollection<T>的性能已經很是好,每秒10萬次入隊出對確定沒問題,因此沒必要擔憂成爲瓶頸。
關於Disruptor的調研:
瞭解過LMAX架構的朋友應該據說過Disruptor,LMAX架構能支持每秒處理600W訂單,並且是單線程。這個速度是否是很驚人?你們有興趣的能夠去了解下。LMAX架構是徹底in memory的架構,全部的業務邏輯基於純內存實現,粗粒度的架構圖以下:
LMAX架構之因此能這麼快,除了徹底基於in memory的架構外,還歸功於延遲率在納秒級別的disruptor隊列組件。下面是disruptor與java中的Array Blocking Queue的延遲率對比圖:
ns是納秒,咱們能夠從數據上看到,Disruptor的延遲時間比Array Blocking Queue快的不是一個數量級。因此,當初LMAX架構出來時,一時很是轟動。我曾經也對這個架構很好奇,但由於有些細節問題沒想清楚,就不敢貿然實踐。
經過上面的分析,咱們知道,Disruptor也是一種隊列,而且也徹底能夠替代BlockingCollection,可是由於咱們的BlockingCollection目前已經知足咱們的須要,且暫時不會成爲瓶頸,因此,我暫時沒有采用Disruptor來實現咱們的內存隊列。關於LMAX架構,你們還能夠看一下這篇我之前寫的文章。
咱們不光須要一個高性能且支持併發的內存隊列,還要支持隊列消息的持久化功能,這樣咱們才能保證消息不會丟失,從而才能談消息至少被處理一次。
那消息何時持久化?
當咱們發送一個消息給隊列,一旦發生成功,咱們確定認爲消息已經不會丟了。因此,很明顯,消息隊列內部確定是要在接收到入隊的消息時先持久化該消息,而後才能返回。
那麼如何高效的持久化呢?
第一個想法:
基於txt文本文件的順序寫。原理是:當消息入隊時,將消息序列化爲文本,而後append到一個txt1文件;當消息被處理完以後,再把該消息append到另外一個txt2文件;而後,若是當前機器沒重啓,那內存隊列裏當前存在的消息就是還未被處理的消息;若是機器重啓了,那如何知道哪些消息還沒被處理?很簡單,就是對比txt1,txt2這兩個文本文件,而後只要是txt1中存在,可是txt2中不存在的消息,就認爲是沒被處理過,那須要在enode框架啓動時讀取txt1中這些沒被處理的消息文本,反序列化爲消息對象,而後從新放入內存隊列,而後開始處理。這個思路其實挺好,關鍵的一點,這種作法性能很是高。由於咱們知道順序寫文本文件是很是快的,通過個人測試,每秒200W行普通消息的文本不在話下。這意味着咱們每秒能夠持久化200W個消息,固然實際上咱們確定達不到這個高的速度,由於消息的序列化性能達不到這個速度,因此瓶頸是在序列化上面。可是,經過這種持久化消息的思路,也會有不少細節問題比較難解決,好比txt文件愈來愈大,怎麼辦?txt文件很差管理和維護,萬一不當心被人刪除了呢?還有,如何比較這兩個txt文件?按行比較嗎?不行,由於消息入隊的順序和處理的順序不必定相同,好比command就是如此,當用戶發送一個command到隊列,可是處理的時候發現第一次因爲併發衝突,致使command執行沒成功,因此會重試command,若是重試成功了,而後持久化該command,可是咱們知道,此時持久化的時候,它的順序也許已經在後面的command的後面了。因此,咱們不能按行比較;那麼就要按消息的ID比較了?就算能作到,那這個比較過程也是很耗時的,假設txt1有100W個消息;txt2中有80W個消息,那若是按照ID來比較txt1中哪20W個消息還沒被處理,有什麼算法能高效比較出來嗎?因此,咱們發現,這個思路仍是有不少細節問題須要考慮。
第二個想法:
採用NoSQL來存儲消息,經過一些思考和比較後,以爲仍是MongoDB比較合適。一方面MongoDB實際上全部的存取操做優先使用內存,也就是說不會立刻持久化到磁盤。因此性能很快。另外一方面,mongodb支持可靠的持久化功能,能夠放心的用來持久化消息。性能方面,雖然沒有寫txt那麼快,但也基本能接受了。由於咱們畢竟不是整個網站的全部用戶請求的command都是放在一個隊列,若是咱們的網站用戶量很大,那確定會用web服務器集羣,且每一個集羣機器上都會有不止一個command queue,因此,單個command queue裏的消息咱們能夠控制爲不會太多,並且,單個command queue裏的消息都是放在不一樣的mongodb collection中存儲;固然持久化瓶頸永遠是IO,因此真的要快,那隻能一個獨立的mongodb server上設計一個collection,該collection存放一個command queue裏的消息;其餘的command queue的消息就也採用這樣的作法放在另外的mongodb server上;這樣就能作到IO的並行,從而根本上提升持久化速度。可是這樣作代價很大的,可能須要好多機器呢,整個系統有多少個queue,那就須要多少臺機器,呵呵。總而言之,持久化方面,咱們仍是有一些辦法能夠去嘗試,還有優化的餘地。
再回過頭來簡單說一下,採用mongodb來持久化消息的實現思路:入隊的時候持久化消息,出隊的時候刪除該消息;這樣當機器重啓時,要查看某個隊列有多少消息,只要經過一個簡單的查詢返回mongodb collection中當前存在的消息便可。這種作法設計簡單,穩定,性能方面目前應該還能夠接受。因此,目前enode就是採用這種方法來持久化全部enode用到的內存隊列的消息。
代碼示意,有興趣的能夠看看:
public abstract class QueueBase<T> : IQueue<T> where T : class, IMessage { #region Private Variables private IMessageStore _messageStore; private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>()); private ReaderWriterLockSlim _enqueueLocker = new ReaderWriterLockSlim(); private ReaderWriterLockSlim _dequeueLocker = new ReaderWriterLockSlim(); #endregion public string Name { get; private set; } protected ILogger Logger { get; private set; } public QueueBase(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentNullException("name"); } Name = name; _messageStore = ObjectContainer.Resolve<IMessageStore>(); Logger = ObjectContainer.Resolve<ILoggerFactory>().Create(GetType().Name); } public void Initialize() { _messageStore.Initialize(Name); var messages = _messageStore.GetMessages<T>(Name); foreach (var message in messages) { _queue.Add(message); } OnInitialized(messages); } protected virtual void OnInitialized(IEnumerable<T> initialQueueMessages) { } public void Enqueue(T message) { _enqueueLocker.AtomWrite(() => { _messageStore.AddMessage(Name, message); _queue.Add(message); }); } public T Dequeue() { return _queue.Take(); } public void Complete(T message) { _dequeueLocker.AtomWrite(() => { _messageStore.RemoveMessage(Name, message); }); } }
思路應該很容易想到,就是先把消息從內存隊列dequeue出來,而後交給消費者處理,而後由消費者告訴咱們當前消息是否被處理了,若是沒被處理好,那須要嘗試重試處理,若是重試幾回後仍是不行,那也不能把消息丟棄了,但也不能無休止的一直只處理這個消息,因此須要把該消息丟到另外一個專門用於處理須要重試的本地純內存隊列。若是消息被處理成功了,那就把該消息從持久化設備中刪除便可。看一下代碼比較清晰吧:
private void ProcessMessage(TMessageExecutor messageExecutor) { var message = _bindingQueue.Dequeue(); if (message != null) { ProcessMessageRecursively(messageExecutor, message, 0, 3); } } private void ProcessMessageRecursively(TMessageExecutor messageExecutor, TMessage message, int retriedCount, int maxRetryCount) { var result = ExecuteMessage(messageExecutor, message); //這裏表示在消費(即處理)消息 //若是處理成功了,就通知隊列從持久化設備刪除該消息,經過調用Complete方法實現 if (result == MessageExecuteResult.Executed) { _bindingQueue.Complete(message); } //若是處理失敗了,就重試幾回,目前是3次,若是仍是失敗,那就丟到一個重試隊列,進行永久的定時重試 else if (result == MessageExecuteResult.Failed) { if (retriedCount < maxRetryCount) { _logger.InfoFormat("Retring to handle message:{0} for {1} times.", message.ToString(), retriedCount + 1); ProcessMessageRecursively(messageExecutor, message, retriedCount + 1, maxRetryCount); } else { //這裏是丟到一個重試隊列,進行永久的定時重試,目前是每隔5秒重試一下,_retryQueue是一個簡單的內存隊列,也是一個BlockingCollection<T> _retryQueue.Add(message); } } }
代碼應該很清楚了,我就很少作解釋了。
本文主要介紹了enode框架中消息隊列的設計思路,由於enode中有command queue和event queue,兩種queue,因此邏輯是相似的;因此原本還想討論一下如何抽象和設計這些queue,已去掉重複代碼。但時間不早了,下次再詳細講吧。