最近花了我幾個月的業餘時間,對EQueue作了一個重大的改造,消息持久化採用本地寫文件的方式。到如今爲止,總算完成了,因此第一時間寫文章分享給你們這段時間我所積累的一些成果。html
昨天,我寫過一篇關於EQueue 2.0性能測試結果的文章,有興趣的能夠看看。git
文章地址:http://www.cnblogs.com/netfocus/p/4926305.html程序員
以前EQueue的消息持久化是採用SQL Server的。一開始我以爲沒什麼問題,採用的是異步定時批量持久化,使用SqlBulkCopy的方法,這個方法測試下來,批量插入消息的性能還不錯,就決定使用了。一開始我並無在使用到EQueue後作集成的性能測試。在功能上確實沒什麼問題了。並且使用DB持久化也有不少好處,好比消息查詢很簡單,DB天生支持各類方式的查詢。刪除消息也很是簡單,一條DELETE語句便可。因此功能實現比較順利。但後來當我對EQueue作性能測試時,發現一些問題。當數據庫服務器和Broker自己部署在不一樣的服務器上時,持久化消息也會走網卡,消耗帶寬,影響消息的發送和消費的TPS。而若是數據庫服務器部署在Broker同一臺服務器上,則由於SQLServer自己也會消耗CPU以及內存,也會影響Broker的消息發送和消費的TPS。另外SqlBulkCopy的速度,再自己機器正在接收大量的發送消息和拉取消息的請求時,會不太穩定。通過一些測試,發現整個EQueue Broker的性能不太理想。而後又想一想,Broker服務器有有一個硬件一直沒有好好利用起來,那就是硬盤。假設咱們的消息是持久化到本地硬盤的,順序寫文件,就應該能解決SQL Server的問題了。因此,開始調研如何實現文件持久化消息的方案了。github
以前消息存儲在SQL Server,若是消費者每次讀取消息時,老是從數據庫去讀取,那對數據庫就是不斷的寫入和讀取,性能不太理想。因此當初的思路是,儘可能把最近可能要被消費的消息緩存在本地內存中。當初的作法是設計了一個很大的ConcurrentDictionary<long, Message>,這個字典就是存放了全部可能會被消費的消息。若是要消費的消息當前不在這個字典裏,就批量從DB拉取一批出來消費。這個設計能夠儘量的避免讀取DB的狀況。可是帶來了另外一個問題。就是咱們對這個字典在高併發不斷的寫入和讀取。且這個字典裏緩存的消息又不少,到到達幾百上千萬時,GC的壓力過大,致使不少線程都會被阻塞。嚴重影響Broker的TPS。算法
因此,基於上面的兩個主要緣由,我想到了兩個思路來解決:1)採用寫文件的方式來持久化消息;2)使用非託管內存來緩存將要被消費的消息;下面咱們來看看這兩個設計的一些關鍵問題的設計思路。數據庫
以前一直沒法駕馭寫文件的設計。由於精細化的將數據寫入文件,並能要精確的讀取想要的數據,實在沒什麼經驗。以前雖然也知道阿里的RocketMQ的消息持久化也是採用順序寫文件的方式的,可是看了代碼,發現設計很複雜,一會兒也比較難懂。嘗試看了屢次也沒法徹底理解。因此一直沒法掌握這種方式。有一天不經意間想到以前看過的EventStore這個開源項目中,也有寫文件的設計。這個項目是CQRS架構之父greg young所主導的開源項目,是一個專門爲ES(Event Sourcing)設計模式中提供保存事件流支持的事件流存儲系統。因而下定決心專研其源碼,看C#代碼確定仍是比Java容易,呵呵。通過一段時間的摸索以後,基本學到了它是如何寫文件以及如何讀文件的。瞭解了不少設計思路。而後,在看懂了EventStore的文件存儲設計以後,再去看RocketMQ的文件持久化的設計,發現驚人的類似。原來看不懂的代碼如今也能看懂了,由於思路差很少的。因此,這給我開始動手提供了很大的信心。通過本身的一些準備(文件讀寫的性能驗證)和設計思路整理後,終於開始動手了。設計模式
其實說出來也很簡單。以前一直覺得寫文件就是一個消息一行唄。這樣當咱們要找哪一個消息時,只須要知道行號便可。確實,理論上這樣也挺好。但上面這兩個開源項目都不是這樣作的,而是都是採用更精細化的直接寫二進制的方式。搞清楚寫入的格式以後,還要考慮一個文件寫不下的時候怎麼辦?由於一個文件老是有大小的,好比1G,那超過1G後,必然要建立新的文件,而後把消息寫入新的文件。因此,咱們就又有了Chunk的概念。一個Chunk就是一個文件,假設咱們如今實現了一個FileMessageStore,表示對文件持久化的封裝,那這個FileMessageStore確定維護了一堆的Chunk。而後咱們也很容易想到一點,就是Chunk有3種狀態:1)New,表示剛建立的Chunk,這種Chunk咱們能夠寫入新消息進去;2)Completed,已寫入完成的Chunk,這種Chunk是隻讀的;3)OnGoing的Chunk,就是當FileMessageStore初始化時,要從磁盤的某個chunk的目錄下加載全部的Chunk文件,那不難理解,最後一個文件以前的Chunk文件應該都是Completed的;最後一個Chunk文件可能寫入了一半,就是以前沒徹底用完的。因此,本質上New和Ongoing的Chunk實際上是同樣的,只是初始化的方式不一樣。數組
至此,咱們知道了寫文件的兩個關鍵思路:1)按二進制寫;2)拆分爲Chunk文件,且每一個Chunk文件有狀態;按二進制寫主要的思路是,假如咱們當前要寫入的消息的二進制數組大小爲100個字節,也就是說消息的長度爲100,那咱們能夠先把消息的長度寫入文件,再接着寫入消息自己。這樣咱們讀取消息時,只要知道了寫入消息長度時的那個Position,就能先讀取到消息的長度,而後就能知道接下來要讀取多少字節爲消息內容。從而能正確讀取消息出來。緩存
另外再分享一點,EventStore中,寫入一個事件到文件中時,還會在寫入消息內容後再寫入這個消息的長度到文件裏。也就是說,寫入一個數據到文件時,會在頭尾都寫入該數據的長度。這樣作的好處是什麼呢?就是當咱們想從後往前讀數據時,也能方便的作到,由於每一個數據的先後都記錄了該數據的長度。這點應該不難理解吧?而EventStore是一個面向流的存儲系統,咱們對事件流確實可能從前日後讀,也多是從後往前讀。另外這個設計還有一個好處,就是起到了校驗數據合法性的目的。當咱們根據長度讀取數據後,再數據以後再讀取一個長度,若是這兩個長度一致,那數據應該就沒問題的。在RocketMQ中,是經過CRC校驗的方式來保證讀取的數據沒有問題。我我的仍是比較喜歡EventStore的作法。因此EQueue裏如今寫入數據就是這樣作的。安全
上面我介紹了一種寫入不定長數據到文件的設計思路,這種設計是爲了解決寫入消息到文件的狀況,由於消息的長度是不定的。在EQueue中,咱們還有一另外一種寫文件的場景。就是隊列信息的持久化。EQueue的架構是一個Topic下有多個Queue,每一個Queue裏有不少消息,消費者負載均衡是經過給消費者分配均勻數量的Queue的方式來達到的。這樣咱們只要確保寫入Queue的消息是均勻的,那每一個Consumer消費到的消息數就是均勻的。那一個Queue裏記錄的是什麼呢?就是一個消息和其在隊列的位置的對應關係。假設消息寫入在文件的物理位置爲10000,而後這個消息在Queue裏的索引是100,那這個隊列就會把這兩個位置對應起來。這樣當咱們要消費這個Queue中索引爲100的消息時,就能找到這個消息在文件中的物理位置爲10000,就能根據這個位置找到消息的內容了。若是是託管內存,咱們只須要弄一個Dictionary,key是消息在隊列中的Offset,value是消息在文件中的物理Offset便可。這樣咱們有了這個dict,就能輕鬆創建起對應關係了。但上面我說過,這種巨大的dict是要佔用內存的,會有GC的問題。因此更好的辦法是,把這個對應關係也寫入文件,那怎麼作呢?這時就又須要更精細化的設計了。想到了其實也很簡單,這個設計我是從RocketMQ中學到的。就是咱們設計一種固定長度的結構體,這個結構體裏就存放一個數據,就是消息在文件的物理位置(爲了後面好表達,我命名爲MessagePosition),一個Long值,一個Long的長度是8個字節。也就是說,這個文件中,每一個寫入的數據的長度都是8個字節。假設咱們一個文件要保存100W個MessagePosition。那這個文件的長度就是100W * 8這麼多字節,大概爲7.8MB。那麼這樣作有什麼好處呢?好處就是,假如咱們如今要消費這個Queue裏的第一個消息,那這個消息的MessagePosition在這個文件中的位置0,第二個消息在這個文件中的位置是8,第三個就是16,以此類推,第N 個消息就是(N-1) * 8。也就是說,咱們無須顯式的把消息在隊列中的位置信息也寫入到文件,而是經過這樣的固定算法,就能精確的算出Queue中某個消息的MessagePosition是寫入在文件的哪一個位置。而後拿到了MessagePosition以後,就能從Message的Chunk文件中讀取到這個消息了。
經過上面的分析,咱們知道了,Producer發送一個消息到Broker時,Broker會寫兩次磁盤。一次是現將消息自己寫入磁盤(Message Chunk裏),另外一次是將消息的寫入位置寫入到磁盤(Queue Chunk裏)。細心的朋友可能會問,假如我第一次寫入成功,但第二次寫入時失敗,好比正好機器斷電或者當前Broker服務器正好出啥問題 了,沒有寫入成功。那怎麼辦呢?這個沒有什麼大的影響。由於首先這種狀況會被認爲是消息發送失敗。因此Producer還會從新發送該消息,而後Broker收到消息後還會再作一次這兩個寫入操做。也就是說,第一次寫入的消息內容永遠也不會用到了,由於那個寫入位置永遠也不會在Queue Chunk裏有記錄。
下面的代碼展現了寫消息到文件的核心代碼:
//消息寫文件須要加鎖,確保順序寫文件 MessageStoreResult result = null; lock (_syncObj) { var queueOffset = queue.NextOffset; var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message); queue.AddMessage(messageRecord.LogPosition, message.Tag); queue.IncrementNextOffset(); result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag); }
StoreMessage方法內部實現:
public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message) { var record = new MessageLogRecord( message.Topic, message.Code, message.Body, queueId, queueOffset, message.CreatedTime, DateTime.Now, message.Tag); _chunkWriter.Write(record); return record; }
queue.AddMessage方法的內部實現:
public void AddMessage(long messagePosition, string messageTag) { _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2())); }
ChunkWriter的內部實現:
public long Write(ILogRecord record) { lock (_lockObj) { if (_isClosed) { throw new ChunkWriteException(_currentChunk.ToString(), "Chunk writer is closed."); } //若是當前文件已經寫完,則須要新建文件 if (_currentChunk.IsCompleted) { _currentChunk = _chunkManager.AddNewChunk(); } //先嚐試寫文件 var result = _currentChunk.TryAppend(record); //若是當前文件已滿 if (!result.Success) { //結束當前文件 _currentChunk.Complete(); //新建新的文件 _currentChunk = _chunkManager.AddNewChunk(); //再嘗試寫入新的文件 result = _currentChunk.TryAppend(record); //若是仍是不成功,則報錯 if (!result.Success) { throw new ChunkWriteException(_currentChunk.ToString(), "Write record to chunk failed."); } } //若是須要同步刷盤,則當即同步刷盤 if (_chunkManager.Config.SyncFlush) { _currentChunk.Flush(); } //返回數據寫入位置 return result.Position; } }
固然,我上面爲了簡化問題的複雜度。因此沒有引入關於如何根據某個全局的MessagePosition找到其在哪一個Message Chunk的問題。這個其實也很好作,咱們首先固定好每一個Message Chunk文件的大小。好比大小爲256MB,而後咱們爲每一個Chunk文件設計一個ChunkHeader,每一個Chunk文件老是先把這個ChunkHeader寫入文件,這個Header裏記錄了這個文件的起始位置和結束位置,以及文件的大小。這樣咱們根據某個MessagePosition計算其在哪一個Chunk文件時,只須要把這個MessagePositon對Chunk的大小作取摸操做便可。根據數據的位置找其在哪一個Chunk的代碼看起來以下面這樣這樣:
public Chunk GetChunkFor(long dataPosition) { var chunkNum = (int)(dataPosition / _config.GetChunkDataSize()); return GetChunk(chunkNum); } public Chunk GetChunk(int chunkNum) { if (_chunks.ContainsKey(chunkNum)) { return _chunks[chunkNum]; } return null; }
代碼很簡單,就很少講了。拿到了Chunk對象後,咱們就能夠把dataPosition傳給Chunk,而後Chunk內部把這個全局的dataPosition轉換爲本地的一個位置,就能準確的定位到這個數據在當前Chunk文件的實際位置了。將全局位置轉換爲本地的位置的算法也很簡單直接:
public int GetLocalDataPosition(long globalDataPosition) { if (globalDataPosition < ChunkDataStartPosition || globalDataPosition > ChunkDataEndPosition) { throw new Exception(string.Format("globalDataPosition {0} is out of chunk data positions [{1}, {2}].", globalDataPosition, ChunkDataStartPosition, ChunkDataEndPosition)); } return (int)(globalDataPosition - ChunkDataStartPosition); }
只須要把這個全局的位置減去當前Chunk的數據開始位置,就能知道這個全局位置相對於當前Chunk的本地位置了。
好了,上面介紹了消息如何寫入的主要思路以及如何讀取數據的思路。
另一點還想提一下,就是關於刷盤的策略。通常咱們寫數據到文件後,是須要調用文件流的Flush方法的,確保數據最終刷入到了磁盤上。不然數據就仍是在緩衝區裏。固然,咱們須要注意到,即使調用了Flush方法,數據可能也還沒真正邏輯到磁盤,而只是在操做系統內部的緩衝區裏。這個咱們就沒法控制了,咱們能作到的是調用了Flush方法便可。那當咱們每次寫入一個數據到文件都要調用Flush方法的話,無疑性能是低下的,因此就有了所謂的異步刷盤的設計。就是咱們寫入消息後不當即調用Flush方法,而是採用一個獨立的線程,定時調用Flush方法來實現刷盤。目前EQueue支持同步刷盤和異步刷盤,開發者能夠本身配置決定採用哪種。異步刷盤的間隔默認是100ms。當咱們在追求高吞吐量時,應該考慮異步刷盤,但要求數據可靠性更高但對吞吐量能夠低一點時,則可使用同步刷盤。若是又要高吞吐又要數據高可靠,那就只有一個辦法了,呵呵。就是多增長一些Broker機器,經過集羣來彌補單臺Broker寫入數據的瓶頸。
假設咱們如今要從一個文件讀取數據,且是多線程併發的讀取,要怎麼設計?一個辦法是,每次讀取時,建立文件流,而後建立StreamReader,而後讀取文件,讀取完成後釋放StreamReader並關閉文件流。但每次要讀取文件的一個數據都要這樣作的話性能不是太好,由於咱們反覆的建立這樣的對象。因此,這裏咱們可使用對象池的概念。就是Chunk內部,預先建立好一些Reader,當須要讀文件時,獲取一個可用的Reader,讀取完成後,再把Reader歸還到對象池裏。基於這個思路,我設計了一個簡單的對象池:
private readonly ConcurrentQueue<ReaderWorkItem> _readerWorkItemQueue = new ConcurrentQueue<ReaderWorkItem>(); private void InitializeReaderWorkItems() { for (var i = 0; i < _chunkConfig.ChunkReaderCount; i++) { _readerWorkItemQueue.Enqueue(CreateReaderWorkItem()); } _isReadersInitialized = true; } private ReaderWorkItem CreateReaderWorkItem() { var stream = default(Stream); if (_isMemoryChunk) { stream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength); } else { stream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None); } return new ReaderWorkItem(stream, new BinaryReader(stream)); } private ReaderWorkItem GetReaderWorkItem() { ReaderWorkItem readerWorkItem; while (!_readerWorkItemQueue.TryDequeue(out readerWorkItem)) { Thread.Sleep(1); } return readerWorkItem; } private void ReturnReaderWorkItem(ReaderWorkItem readerWorkItem) { _readerWorkItemQueue.Enqueue(readerWorkItem); }
當一個Chunk初始化時,咱們預先初始化好固定數量(可配置)的Reader對象,並把這些對象放入一個ConcurrentQueue裏(對象池的做用),而後要讀取數據時,從從ConcurrentQueue裏拿一個可用的Reader便可,若是當前併發過高拿不到怎麼辦,就等待直到拿到爲止,目前我是等待1ms後繼續嘗試拿,直到最後拿到爲止。而後ReturnReaderWorkItem就是數據讀取完以後歸還Reader到對象池。就是否是很簡單哦。這樣的設計,能夠避免不斷的建立文件流和Reader對象,能夠避免GC的反作用。
你們知道,當Broker重啓時,咱們是須要掃描磁盤上Chunk目錄下的全部Chunk文件的。那怎麼掃描呢?上面其實我也簡單提到過。首先,咱們能夠對每一個Chunk文件的文件名的命名定義一個規則,第一個Chunk文件的文件名好比爲:message-chunk-000000000,第二個爲:message-chunk-000000001,以此類推。那咱們掃描時,只要先把全部的文件名獲取到,而後對文件名升序排序。那最後一個文件以前的文件確定都是寫入徹底了的,即上面我說的Completed狀態的,而最後一個文件是尚未寫入完的,還能夠接着寫。因此咱們初始化時,只須要先初始化最後一個以前的全部Chunk文件,最後再初始化最後一個文件便可。這裏我所說的初始化不是要把整個Chunk文件的內容都加載到內存,而是隻是讀取這個文件的ChunkHeader的信息維護在內存便可。有了Header信息,咱們就能夠爲後續的數據讀取提供位置計算了。因此,整個加載過程是很快的,讀取100個Chunk文件的ChunkHeader也不過一兩秒的時間,徹底不影響Broker的啓動時間。對於初始化Completed的Chunk比較簡單,只須要讀取ChunkHeader信息便可。可是初始化最後一個文件比較麻煩,由於咱們還要知道這個文件當前寫入到哪裏了?從而咱們能夠從這個位置的下一個位置接着往下寫。那怎麼知道這個文件當前寫入到哪裏了呢?其實比較複雜。有不少技術,我看到RocketMQ和EventStore這兩個開源項目中都採用了Checkpoint的技術。就是當咱們每次寫入一個數據到文件後,都會更新一下Checkpoint,即表示當前寫入到這個文件的哪裏了。而後這個Checkpoint值咱們也是定時異步保存到某個獨立的小文件裏,這個文件裏只保存了這個Checkpoint。這樣的設計有一個問題,就是假如數據寫入了,但因爲Checkpoint的保存不是實時的,因此理論上會出現Checkpint值會小於實際文件寫入的位置的狀況。通常咱們忽略這種狀況便可,便可能會存在初始化時,下次寫入可能會覆蓋必定的以前已經寫入的數據,由於Checkpoint多是稍微老一點的。
而我在設計時,但願能再嚴謹一點,取消Checkpoint的設計,而是採用在初始化Ongoing狀態的Chunk文件時,從文件的頭開始不斷往下讀,當最後沒法往下讀時,咱們就知道這個文件咱們當前寫入到哪裏了。那怎麼知道沒法往下讀了呢?也就是說怎麼肯定後續的文件內容不是咱們寫入的?也很簡單。對於不固定數據長度的Chunk來講,因爲咱們每次寫入一個數據時都是同時在先後寫入這個數據的長度;因此咱們再初始化讀取這個文件時,能夠藉助這一點來校驗,但出現不符合這個規則的數據時,就認爲後續不是正常的數據了。對於固定長度的Chunk來講,咱們只要保證每次寫入的數據的數據是非0了。而對於EQueue的場景,固定數據的Chunk裏存儲的都是消息在Message Chunk中的全局位置,一個Long值;但這個Long值咱們正常是從0開始的,怎麼辦呢?很簡單,咱們寫入MessagePosition時,老是加1便可。即假如當前的MessagePosition爲0,那咱們實際寫入1,若是爲100,則實際寫入的值是101。這樣咱們就能確保這個固定長度的Chunk文件裏每一個數據都是非0的。而後咱們在初始化這樣的Chunk文件時,只要不斷讀取固定長度(8個字節)的數據,當出現讀取到的數據爲0時,就認爲已經到頭了,即後續的不是咱們寫入的數據了。而後咱們就能知道接下來要從哪裏開始讀取了哦。
上面我介紹瞭如何讀文件的思路。咱們也知道了,咱們是在消費者要消費消息時,從文件讀取消息的。但對從文件讀取消息老是沒有比從內存讀取消息來的快。咱們前面的設計都沒有把內存好好利用起來。因此咱們可否考慮把將來可能要消費的Chunk文件的內容直接緩存在內存呢?這樣咱們就能夠避免對文件的讀取了。確定能夠的。那怎麼作呢?前面我提升多,曾經咱們用託管內存中的ConcurrentDictionary<long, Message>這樣的字典來緩存消息。我也提到這會帶來垃圾回收而影響性能的問題。因此咱們不能直接這樣簡單的設計。通過個人一些嘗試,以及從EventStore中的源碼中學到的,咱們可使用非託管內存來緩存Chunk文件。咱們可使用Marshal.AllocHGlobal來申請一塊完整的非託管內存,而後再須要釋放時,經過Marshal.FreeHGlobal來釋放。而後,咱們能夠經過UnmanagedMemoryStream來訪問這個非託管內存。這個是核心思路。那麼怎樣把一個Chunk文件緩存到非託管內存呢?很簡單了,就是掃描這個文件的全部內容,把內容都寫入內存便可。代碼以下:
private void LoadFileChunkToMemory() { using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.None)) { var cachedLength = (int)fileStream.Length; var cachedData = Marshal.AllocHGlobal(cachedLength); try { using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, cachedLength, cachedLength, FileAccess.ReadWrite)) { fileStream.Seek(0, SeekOrigin.Begin); var buffer = new byte[65536]; int toRead = cachedLength; while (toRead > 0) { int read = fileStream.Read(buffer, 0, Math.Min(toRead, buffer.Length)); if (read == 0) { break; } toRead -= read; unmanagedStream.Write(buffer, 0, read); } } } catch { Marshal.FreeHGlobal(cachedData); throw; } _cachedData = cachedData; _cachedLength = cachedLength; } }
代碼很簡單,不用多解釋了。須要注意的是,上面這個方法針對的是Completed狀態的Chunk,即已經寫入完成的Chunk的。已經寫入徹底的Chunk是隻讀的,不會再發生更改,因此咱們能夠隨便緩存在內存中。
那對於新建立出來的Chunk文件呢?正常狀況下,消費者來得及消費時,咱們老是在不斷的寫入最新的Chunk文件,也在不斷的從這個最新的Chunk文件讀取消息。那咱們怎麼確保消費最新的消息時,也不須要從文件讀取呢?也很簡單,就是在新建一個Chunk文件時,若是內存足夠,也同時建立一個同樣大小的基於非託管內存的Chunk。而後咱們再寫入消息到文件Chunk成功後,再同時寫入這個消息到非託管內存的Chunk。這樣,咱們在消費消息,讀取消息時老是首先判斷當前Chunk是否關聯了一個非託管內存的Chunk,若是有,就優先從內存讀取便可。若是沒有才從文件Chunk讀取。
可是從文件讀取時,可能會遇到一個問題。就是咱們剛寫入到文件的數據可能沒法當即讀取到。由於寫入的數據沒有當即刷盤,因此沒法經過Reader讀取到。因此,咱們不能僅經過判斷當前寫入的位置來判斷當前是否還有數據能夠被讀取,而是考慮當前的最後一次刷盤的位置。理論上只能讀取刷盤以前的數據。但即使這樣設計了,在若是當前硬盤不是SSD的狀況下,好像也會出現讀不到數據的問題。偶爾會報錯,有朋友在測試時已經遇到了這樣的問題。那怎麼辦呢?我想了一個辦法。由於這種狀況歸根接地仍是由於咱們邏輯上認爲已經寫入到文件的數據因爲未及時刷盤或者操做系統自己的內部緩存的問題,致使數據未能及時寫入磁盤。出現這種狀況必定是最近的一些數據。因此咱們若是可以把好比最近寫入的10000(可配置)個數據都緩存在本地託管內存中,而後讀取時先看本地緩存的託管內存中有沒有這個位置的數據,若是有,就不須要讀文件了。這樣就能很好的解決這個問題了。那怎麼確保咱們只緩存了最新的10000個數據且不會超出10000個呢?答案是環形隊列。這個名字聽起來很高大上,其實就是一個數組,數組的長度爲10000,而後咱們在寫入數據時,咱們確定知道這個數據在文件中的位置的,咱們能夠把這個位置(一個long值)對10000取摸,就能知道該把這個數據緩存在這個數組的哪一個位置了。經過這個設計確保緩存的數據不會超過10000個,且確保必定只緩存最新的數據,若是新的數據保存到數組的某個下標時,該下標已經存在之前已經保存過的數據了,就自動覆蓋掉便可。因爲這個數組的長度不是很長,因此每什麼GC的問題。
可是光這樣還不夠,咱們這個數組中的每一個元素至少要記錄這個元素對應的數據在文件中的位置。這個是爲了咱們在從數組中獲取到數據後,進一步校驗這個數據是不是我想要的那個位置的數據。這點你們應該能夠理解的吧。下面這段代碼展現瞭如何從環形數組中讀取想要的數據:
if (_cacheItems != null) { var index = dataPosition % _chunkConfig.ChunkLocalCacheSize; var cacheItem = _cacheItems[index]; if (cacheItem != null && cacheItem.RecordPosition == dataPosition) { var record = readRecordFunc(cacheItem.RecordBuffer); if (record == null) { throw new ChunkReadException( string.Format("Cannot read a record from data position {0}. Something is seriously wrong in chunk {1}.", dataPosition, this)); } if (_chunkConfig.EnableChunkReadStatistic) { _chunkStatisticService.AddCachedReadCount(ChunkHeader.ChunkNumber); } return record; } }
_cacheItems是當前Chunk內的一個環形數組,而後假如當前咱們要讀取的數據的位置是dataPosition,那咱們只須要先對環形數據的長度取摸,獲得一個下標,即上面代碼中的index。而後就能從數組中拿到對應的數據了,而後若是這個數據存在,就進一步判斷這個數據dataPosition是否和要求的dataPosition,若是一致,咱們就能肯定這個數據確實是咱們想要的數據了。就能夠返回了。
因此,經過上面的兩種緩存(非託管內存+託管內存環形數組)的設計,咱們能夠確保幾乎不用再從文件讀取消息了。那何時仍是會從文件讀取呢?就是在1)內存不夠用了;2)當前要讀取的數據不是最近的10000個;這兩個前提下,纔會從文件讀取。通常咱們線上服務器,確定會保證內存是可用的。EQueue如今有兩個內存使用的水位。一個是當物理內存使用到多少百分比(默認值爲40%)時,開始清理已經再也不活躍的Chunk文件的非託管內存Chunk;那什麼是不活躍呢?就是在最近5s內沒有發生過讀寫的Chunk。這個設計我以爲是很是有效的,由於假如一個Chunk有5s沒有發生過讀寫,那通常確定是沒有消費者在消費它了。另外一個水位是指,最多EQueue Broker最多使用物理內存的多少百分比(默認值爲75%),這個應該好理解。這個水位是爲了保證EQueue不會把全部物理內存都吃光,是爲了確保服務器不會由於內存耗盡而宕機或致使服務不可用。
那何時會出現大量使用服務器內存的狀況呢?咱們能夠推導出來的。正常狀況下,消息寫入第一個Chunk,咱們就在讀取第一個Chunk;寫入第二個Chunk咱們也會跟着讀取第二個Chunk;假設當前寫入到了第10個Chunk,那理論上前面的9個Chunk以前緩存的非託管內存均可以釋放了。由於確定超過5s沒有發生讀寫了。可是假如如今消費者有不少,且每一個消費者的消費進度都不一樣,有些很快,有些很慢,當全部的消費者的消費進度正好覆蓋到全部的Chunk文件時,就意味着每一個Chunk文件都在發生讀取。也就是說,每一個Chunk都是活躍的。那此時就沒法釋聽任何一個Chunk的非託管內存了。這樣就會致使佔用大量非託管內存了。但因爲75%的水位的設計,Broker內存的使用是不會超過物理內存75%的。在建立新的Chunk或者嘗試緩存一個Completed的Chunk時,老是會判斷當前使用的物理內存是否已經超過75%,若是已經超過,就不會分配對應的非託管內存了。
刪除消息的設計比較簡單。主要的思路是,當咱們的消息已經被全部的消費者都消費過了,且知足咱們的刪除策略了,就能夠刪除了。RocketMQ刪除消息的策略比較粗暴,沒有考慮消息是否經被消費,而是直接到了必定的時間就刪除了,好比最多隻保留2天。這個是RocketMQ的設計。EQueue中,會確保消息必定是被全部的消費者都消費了纔會考慮刪除。而後目前我設計的刪除策略有兩種:
實際上,應該可能還有一些需求但願能把兩個策略合起來考慮的。這個目前我沒有作,我以爲這兩種應該夠了。若是你們想作,能夠本身擴展的。
另外,上面我說過EQueue中目前有兩種Chunk文件,一種是存儲消息自己的,我叫作Message Chunk;一種是存儲隊列信息的,我叫作Queue Chunk;而Queue Chunk的數據是依賴於Message Chunk的。上面我說的兩種刪除策略是針對Message Chunk而言的。而Queue Chunk,因爲這個依賴性,我以爲比較合理的方式是,只須要判斷當前Queue Chunk中的全部的消息對應的Message Chunk是否已經都刪除了,若是是,難說明這個Queue Chunk也已經沒意義了,就能夠刪除了。但只要這個Queue Chunk中至少還有一個消息的Chunk文件沒刪除,那這個Queue Chunk就不會刪除。
上面這個只是思路哦,真實的代碼確定比這個複雜,呵呵。有興趣的朋友仍是要看代碼的。
以前用SQL Server的方式,因爲DB很容易查消息,因此查詢消息不是大問題。可是如今咱們的消息是放在文件裏的,那要怎麼查詢呢?目前支持根據消息ID來查詢。當Producer發送一個消息到Broker,Broker返回結果裏會包含消息的ID。Producer的正確作法應該是要用日誌或其餘方式記錄這個ID,並最好和本身的當前業務消息的某個業務ID一塊兒記錄,好比CommandId或者EventId,這樣咱們就能根據咱們的業務ID找到消息ID,而後根據消息ID找到消息內容了。那消息ID如今是怎麼設計的呢?也是受到RocketMQ的啓發,消息ID由兩部分組成:1)Broker的IP;2)消息在Broker的文件中的全局位置;這樣,當咱們要根據某個消息ID查詢時,就能夠先定位到這個消息在哪一個Broker上,也同時知道了消息在文件的哪一個位置了,這樣就能最終讀取這個消息的內容了。
爲何要這樣設計呢?若是咱們的Broker沒有集羣,那其實不須要包含Broker的IP;這個設計是爲了將來EQueue Broker會支持集羣的,那個時候,咱們就必需要知道某個消息ID對應的Broker是哪一個了。
EQueue中,每一個Queue,都會有一個對應的Consumer。消費進度就是這個Queue當前被消費到哪裏了,一個Offset值。好比Offset爲100,就表示當前這個Queue已經消費到第99(由於是從0開始的)個位置的消息了。由於一個Broker上有不少的Queue,好比有100個。而咱們如今是使用文件的方式來存儲信息了。因此天然消費進度也是用文件了。但因爲消費進度的信息不多,也不是遞增的形式。因此咱們能夠簡單設計,目前EQueue採用一個文件的方式來保存全部Queue的消費進度,文件內容爲JSON,JSON裏記錄了每一個Queue的消費進度。文件內容看起來像下面這樣:
{"SampleGroup":{"topic1-3":89055,"topic1-2":89599,"topic1-1":89471,"topic1-0":89695}}
上面的JSON標識一個名爲SampleGroup的ConsumerGroup,他消費了一個名爲topic1的topic,而後這個topic下的每一個Queue的消費進度記錄了下來。若是有另外一個ConsumerGroup,也消費了這個topic,那消費進度是隔離的。若是還不清楚ConsumerGroup的同窗,要去看一下我以前寫的EQueue的文章了。
到目前爲止,還有沒有其餘可優化的大的地方呢?有。以前我作EQueue時,老是把消息從數據庫讀取出來,而後構造出消息對象,再把消息對象序列化爲二進制,再返回給Consumer。這裏涉及到從DB拿出來,再序列化爲二進制。學習了RocketMQ的代碼後,咱們能夠作的更聰明一點。由於其實基於文件存儲時,咱們從文件裏拿出來的已是二進制了。因此能夠直接把二進制返回給消費者便可。不須要先轉換爲對象再作序列化了。經過這個設計的改進,咱們如今的消費者消費消息,能夠說無任何瓶頸了,很是快。
在測試寫文件的這個版本時,咱們很但願知道每一個Chunk的讀寫狀況的統計,從而肯定設計是正確的。因此,我給EQueue的Chunk增長了實時統計Chunk讀寫的統計服務。目前咱們在運行EQueue自帶的例子時,Broker會每一個一秒打印出全部Chunk的讀寫狀況,這個特性極大的方便咱們判斷消息的發送和消費是否正常,消費是否有延遲等。
此次我給EQueue的Web後臺管理控制檯也完善了一下隊列的增長和減小的設計。增長隊列(即隊列的擴容)比較簡單,直接新增便可。可是當咱們要刪除一個隊列時,怎樣安全的刪除呢?主要是要確保刪除這個隊列時,已經沒有Producer或Consumer在使用這個隊列了。要怎麼作到呢?個人思路是,爲每一個Queue對象設計兩個屬性,表示對Producer是否可見,對Consumer是否可見。當咱們要刪除某個Queue時,能夠:1)先讓其對Producer不可見,這樣Producer後續就不會再發送新的消息到這個隊列了;而後等待,直到這個隊列裏的消息都被全部的消費者消費掉了;而後再設置爲對Consumer不可見。而後再過幾秒,確保每一個消費者都不會再向這個隊列發出拉取消息的請求了。這樣咱們就能安全的刪除這個隊列了。刪除隊列的邏輯大概如以下:
public void DeleteQueue(string topic, int queueId) { lock (this) { var key = QueueKeyUtil.CreateQueueKey(topic, queueId); Queue queue; if (!_queueDict.TryGetValue(key, out queue)) { return; } //檢查隊列對Producer或Consumer是否可見,若是可見是不容許刪除的 if (queue.Setting.ProducerVisible || queue.Setting.ConsumerVisible) { throw new Exception("Queue is visible to producer or consumer, cannot be delete."); } //檢查是否有未消費完的消息 var minConsumedOffset = _consumeOffsetStore.GetMinConsumedOffset(topic, queueId); var queueCurrentOffset = queue.NextOffset - 1; if (minConsumedOffset < queueCurrentOffset) { throw new Exception(string.Format("Queue is not allowed to delete, there are not consumed messages: {0}", queueCurrentOffset - minConsumedOffset)); } //刪除隊列的消費進度信息 _consumeOffsetStore.DeleteConsumeOffset(queue.Key); //刪除隊列自己,包括全部的文件 queue.Delete(); //最後將隊列從字典中移除 _queueDict.Remove(key); } }
代碼應該很簡單直接,很少解釋了。隊列的動態新增和刪除,能夠方便咱們線上應付在線活動時,隨時爲消費者提供更高的並行消費能力,以及活動結束後去掉多餘的隊列。是很是實用的功能。
這個功能,也是很是實用的。這個版本我加了上去。之前EQueue只有Topic的概念,沒有Tag的概念。Tag是對Topic的二級過濾。好比當某個Producer發送了3個消息,Topic都是topic,而後tag分別是01,02,03。而後Consumer訂閱了這個Topic,可是訂閱這個Topic時同時制定了Tag,好比指定爲02,那這個Consumer就只會收到一個消息。Tag爲01,03的消息是不會收到的。這個就是Tag的功能。我以爲Tag對咱們是很是有用的,它能夠極大的減小咱們定義Topic。原本咱們必需要定義一個新的Topic時,如今可能只須要定義一個Tag便可。關於Tag的實現,我就不展開了。
終於到最後一點了,終於堅持快寫完了,呵呵。EQueue Web後臺管理控制檯如今支持消息堆積的報警了。當EQueue Broker上當前全部未消費的消息數達到必定的閥值時,就會發送郵件進行報警。咱們能夠把咱們的郵件和咱們的手機短信進行綁定,好比移動的139郵箱我記得就有這個功能。這樣咱們就能第一時間知道Broker上是否有大量消息堆積了,可讓咱們第一時間處理問題。
這篇文章感受是我有史以來寫過的最有乾貨的一篇了,呵呵。一鼓作氣,也是對我前面幾個月的全部積累知識經驗的一次性釋放吧。但願能給你們一些幫助。我寫文章比較喜歡寫思路,不太喜歡介紹如何用。我以爲一個程序員,最重要的是要學會如何思考去解決本身想解決的問題。而不是別人直接告訴你如何去解決。經過作EQueue這個分佈式消息隊列,也算是我本身的一個實踐過程。我很是鼓勵你們寫開源項目哦,當你專一於實現某個你感興趣的開源項目時,你就會有目標性的去學習相關的知識,你的學習就不會迷茫,不會爲了學技術而學技術了。我在作EQuque時,要考慮各類東西,好比通訊層的設計、消息持久化、整個架構設計,等等。我以爲是很是鍛鍊人的。
一我的時間短暫,若是能用有限的時間作出好的東西能夠造福後人,那咱們來到這個世上也算沒白來了,你說對嗎?因此,咱們千萬不要放棄咱們的理想,雖然堅持理想很難,但也要堅持。