EQueue - 一個純C#寫的分佈式消息隊列介紹2

一年前,當我第一次開發完EQueue後,寫過一篇文章介紹了其總體架構,作這個框架的背景,以及架構中的全部基本概念。經過那篇文章,你們能夠對EQueue有一個基本的瞭解。通過了1年多的完善,EQueue不管是功能上仍是成熟性上都完善了很多。因此,但願再寫一篇文章,介紹一下EQueue的總體架構和關鍵特性。html

EQueue架構

EQueue是一個分佈式的、輕量級、高性能、具備必定可靠性,純C#編寫的消息隊列,支持消費者集羣消費模式。算法

主要包括三個部分:producer, broker, consumer。producer就是消息發送者;broker就是消息隊列服務器,負責接收producer發送過來的消息,以及持久化消息;consumer就是消息消費者,consumer從broker採用拉模式到broker拉取消息進行消費,具體採用的是long polling(長輪訓)的方式。這種方式的最大好處是可讓broker很是簡單,不須要主動去推消息給consumer,而是隻要負責持久化消息便可,這樣就減輕了broker server的負擔。同時,consumer因爲是本身主動去拉取消息,因此消費速度能夠本身控制,不會出現broker給consumer消息推的太快致使consumer來不及消費而掛掉的狀況。在消息實時性方面,因爲是長輪訓的方式,因此消息消費的實時性也能夠保證,實時性和推模型基本至關。sql

EQueue是面向topic的架構,和傳統的MSMQ這種面向queue的方式不一樣。使用EQueue,咱們不須要關心queue。producer發送消息時,指定的是消息的topic,而不須要指定具體發送到哪一個queue。一樣,consumer發送消息也是同樣,訂閱的是topic,不須要關心本身想從哪一個queue接收消息。而後,producer客戶端框架內部,會根據當前的topic獲取全部可用的queue,而後經過某種queue select strategy選擇一個queue,而後把消息發送到該queue;一樣,consumer端,也會根據當前訂閱的topic,獲取其下面的全部的queue,以及當前全部訂閱這個topic的consumer,按照平均的方式計算出當前consumer應該分配到哪些queue。這個分配的過程就是消費者負載均衡。數據庫

Broker的主要職責是:服務器

發送消息時:負責接收producer的消息,而後持久化消息,而後創建消息索引信息(把消息的全局offset和其在queue中的offset簡歷映射關係),而後返回結果給producer;網絡

消費消息時:負責根據consumer的pull message request,查詢一批消息(默認是一次pull request拉取最多32個消息),而後返回給consumer;架構

各位看官若是對EQueue中的一些基本概念還不太清楚,能夠看一下我去年寫的介紹1,寫的很詳細。下面,我想介紹一下EQueue的一些有特點的地方。負載均衡

EQueue關鍵特性

高性能與可靠性設計

網絡通訊模型,採用.NET自帶的SocketAsyncEventArgs,內部基於Windows IOCP網絡模型。發送消息支持async, sync, oneway三種模式,不管是哪一種模式,內部都是異步模式。當同步發送消息時,就是框架幫咱們在異步發送消息後,同步等待消息發送結果,等到結果返回後,才返回給消息發送者;若是必定時間還不返回,則報超時異常。在異步發送消息時,採用從EventStore開源項目中學習到的優秀的socket消息發送設計,目前測試下來,性能高效、穩定。經過幾個案例運行很長時間,沒有出現通訊層方面的問題。框架

broker消息持久化的設計。採用WAL(Write-Ahead Log)技術,以及異步批量持久化到SQL Server的方式確保消息高效持久化且不會丟。消息到達broker後,先寫入本地日誌文件,這種設計在db, nosql等數據庫中很常見,都是爲了確保消息或請求不丟失。而後,再異步批量持久化消息到SQL Server,採用.NET自帶的SqlBulkCopy技術。這種方式,咱們能夠確保消息持久化的實時性和很高的吞吐量,由於一條消息只要寫入本地日誌文件,而後放入內存的一個dict便可。異步

當broker意外宕機,可能會有一些消息還沒持久化到SQL Server;因此,咱們在重啓broker時,咱們除了先從SQL Server恢復全部未消費的消息到內存外,同時記錄當前SQL Server中的最後一條消息的offset,而後咱們從本地日誌文件掃描offset+1開始的全部消息,所有恢復到SQL Server以及內存。

須要簡單提一下的是,咱們在把消息寫入到本地日誌文件時,不可能所有寫入到一個文件,因此要拆文件。目前是根據log4net來寫消息日誌,每100MB一個日誌文件。爲何是100MB?是由於,咱們的這個消息日誌文件的用途主要是用來在Broker重啓時,恢復SQL Server中最後還沒來得及持久化的那些消息的。正常狀況下,這些消息量應該不會不少。因此,咱們但願,當掃描本地日誌文件時,儘可能能快速的掃描文件。一般100MB的消息日誌文件,已經能夠存儲很多的消息量,而SQL Server中未持久化的消息一般不會超過這個量,除非當機前,出現長時間消息沒法持久化的狀況,這種狀況,應該會被咱們監控到並及時發現,並採起措施。固然,每一個消息日誌文件的大小,能夠支持配置。另一點,就是從日誌文件恢復的時候,仍是須要有一個算法的,由於未被持久化的消息,有可能不僅在最近的一個消息日誌文件裏,有可能在多個日誌文件裏,由於就像前面所說,會出現大量消息沒有持久化到SQL Server的狀況。

但總之,在保證高性能的前提下,消息不丟(可靠性)是徹底能夠保證的。

消費消息方面,採用批量拉取消息進行消費的方式。默認consumer一個pull message request會最多拉取32個消息(只要存在這麼多未消費消息的話);而後consumer會並行消費這些消費,除了並行消費外,也能夠配置爲單線程線性消費。broker在查詢消息時,通常狀況未消費消息老是在內存的,只有有一種狀況不在內存,這個下面詳細分析。因此,查詢消息應該說很是快。

不過上面提到的消息可靠性,只能儘可能保證單機不丟消息。因爲消息是放在DB,以及本地日誌。因此,若是DB服務器硬盤壞了,或者broker的硬盤壞了,那就會有丟消息的可能性。要解決這個問題,就須要作replication了。EQueue下一步會支持broker的集羣和故障轉移(failover)。目前,我開發了一個守護進程服務,會監控broker進程是否掛掉,若是掛掉,則自動重啓,必定程度上也會提升broker的可用性。

我以爲,作事情,越簡單越好,不要一開始就搞的太複雜。複雜的東西,每每難以維護和駕馭,雖然理論很美好,但老是會出現各類問題,呵呵。就像去中心化的架構雖然理論好像很美好,但實際使用中,發現仍是中心化的架構更好,更具有實戰性。

支持消費者負載均衡

消費者負載均衡是指某個topic的全部消費者,能夠平均消費這個topic下的全部queue。咱們使用消息隊列,我認爲這個特性很是重要。設想,某一天,咱們的網站搞了一個活動,而後producer產生的消息猛增。此時,若是咱們的consumer服務器若是仍是隻有原來的數量,那極可能會來不及處理這麼多的消息,致使broker上的消息大量堆積。最終會影響用戶請求的響應時間,由於不少消息沒法及時被處理。

因此,遇到這種狀況,咱們但願分佈式消息隊列能夠方便的容許咱們動態添加消費者機器,提升消費能力。EQueue支持這樣的動態擴展能力。假如某個topic,默認有4個queue,而後每一個queue對應一臺consumer機器進行消費。而後,咱們但願增長一倍的consumer時,只要在EQueue Web控制檯上,爲這個topic增長4個queue,而後咱們再新增4臺consumer機器便可。這樣EQueue客戶端會支持自動負載均衡,幾秒鐘後,8個consumer就能夠各自消費對應的queue了。而後,當活動事後,消息量又會回退到正常水平,那麼咱們就能夠再減小queue,並下線多餘的consumer機器。

另外,EQueue還充分考慮到了下線queue時的平滑性,能夠支持先凍結某個queue,這樣能夠確保不會有新的消息發送到該queue。而後咱們等到這個queue的消息都消費完後,就能夠下線consumer機器和刪除該queue了。這點,應該說,阿里的rocketmq也沒有作到,呵呵。

broker支持大量消息堆積

這個特性,我以前專門寫過一篇文章,詳細介紹設計思路,這裏也簡單介紹一下。MQ的一個很重要的做用就是削峯,就是在遇到一瞬間大量消息產生而消費者來不及一會兒消費時,消息隊列能夠起到一個緩衝的做用,從而能夠確保消息消費者服務器不會垮掉,這個就是削峯。若是使用RPC的方式,那最後全部的請求,都會壓倒DB,DB就會承受不了這麼多的請求而掛掉。

因此,咱們但願MQ支持消息堆積的能力,不能由於爲了快,而只能支持把消息放入服務器內存。由於服務器內存的大小是有限的,假設咱們的消息服務器內存大小是128G,每一個消息大小爲1KB,那差很少最多隻能堆積1.3億個消息。不過通常來講1.3億也夠了,呵呵。但這個畢竟要求大內存做爲前提的。但有時咱們可能沒有那麼大的服務器內存,但也須要堆積這麼多的消息的能力。那就須要咱們的MQ在設計上也提供支持。EQueue能夠容許咱們在啓動時配置broker服務器上容許在內存裏存放的消息數以及消息隊列裏消息的全局offset和queueOffset的映射關係(我稱之爲消息索引信息)的數量。咱們能夠根據咱們的服務器內存的大小進行配置。而後,broker上會有定時的掃描線程,定時掃描是否有多出來的消息和消息索引,若是有,則移除多出來的部分。經過這個設計,能夠確保服務器內存必定不會用完。可是否要移除也有一個前提,就是必需要求這個消息已經持久化到SQL Server了。不然就不能移除。這個應該一般能夠保證,由於通常不會出現1億個消息都還沒持久化到DB,若是出現這個狀況,說明DB必定出了什麼嚴重的問題,或者broker沒法與db創建鏈接了。這種狀況下,咱們應該早就已經發現了,EQueue Web監控控制檯上隨時能夠查看消息的最大全局offset,已經持久化的最大全局offset。

上面這個設計帶來的一個問題是,假如如今consumer要拉取的消息不在內存了怎麼辦?一種辦法是從DB把這個消息拉取到內存,但一條條拉,確定太慢了。因此,咱們能夠作一個優化,就是發現當前消息不在內存時,由於極可能下一條消息也不在內存,因此咱們能夠一次性從Sql Server DB拉取10000個消息(可配置),這樣後續的10000個消息就必定在內存了,咱們須要再訪問DB。這個設計實際上是在內存使用和拉取消息性能之間的一個權衡後的設計。Linux的pagecache的目的也是這個。

另一點,就是咱們broker重啓時,不能所有把全部消息都恢復到內存,而是要判斷是否已經到達內存能夠承受的最大消息數了。若是已經到達,那就不能再放入內存了;同理,消息索引信息的恢復也是同樣。不然,在消息堆積過多的時候,就會致使broker重啓時,內存爆掉了。

消息消費進度更新的設計

EQueue的消息消費進度的設計,和kafka, rocketmq是一個思路。就是定時保存每一個queue的消費進度(queue consumed offset),一個long值。這樣設計的好處是,咱們不用每次消費完一個消息後,就當即發送一個ack回覆消息到broker。若是是這樣,對broker的壓力是很大的。而若是隻是定時發送一個消費進度,那對broker的壓力很小。那這個消費進度怎麼來?就是採用滑動門技術。就是consumer端,在拉取到一批消息後,先放入本地內存的一個SortedDictionary裏。而後繼續去拉下一批消息。而後會啓動task去並行消費這些剛剛拉取到的消息。因此,這個本地的SortedDictionary就是存放了全部已經拉取到本地但尚未被消費掉的消息。而後當某個task thread消費掉一個消息後,會把它從SortedDictionary中移除。而後,我上面所說的滑動門技術,就是指,在每次移除一個消息後,獲取當前SortedDictionary裏key最小的那個消息的queue offset。隨着消息的不斷消費,這個queue offset也會不斷增大,從宏觀的角度看來,就像是一扇門在不停的往前移動。

但這個設計有個問題,就是假如這個Dict裏,有一個offset=100的消息一直沒被消費掉,那就算後面的消息都被消費了,最後這個滑動門仍是不會前進。由於這個dict裏的最後的那個queue offset老是100。這個應該好理解的吧。因此這種狀況下,當consumer重啓後,下次消費的位置仍是會從100開始,後面的也會從新消費一遍。因此,咱們的消費者內部,須要都支持冪等處理消息。

支持消息回溯

由於broker上的消息,不是消息消費掉了就當即刪除,而是定時刪除,好比每2天刪除一次(能夠配置)。因此,當咱們哪天但願從新消費1天前的消息的時候,EQueue也是徹底支持的。只要在consumer啓動前,修改消費進度到之前的某個特定的值便可。

Web管理控制檯

EQueue有一個完善的Web管理控制檯,咱們能夠經過該控制檯管理topic,管理queue,查看消息,查看消息消費進度,查看消息堆積狀況等信息。可是目前還不支持報警,之後會慢慢增長報警功能。

經過這個控制檯,咱們使用EQueue就會方便不少,能夠實時瞭解消息隊列服務器的健康情況。貼一個管理控制檯的UI界面,讓你們有個印象:

 

EQueue將來的計劃

  1. broker支持集羣,master-slave模式,使其具有更高的可用性和擴展性;
  2. Web管理控制檯支持報警;
  3. 出一份性能測試報告,目前我主要是沒有實際服務器,沒辦法實際測試;
  4. 考慮支持非DBC持久化的支持,好比本地key/value存儲支持,或者徹底的本地文件持久化消息(難度很大);
  5. 其餘小功能完善和代碼局部調整;

我相信:沒有作很差,只有沒耐心。

相關文章
相關標籤/搜索