每一個時代,都不會虧待會學習的人。java
你們好,我是 yes。web
繼上一篇 頭條終面:寫個消息中間件 ,我提到實現消息中間件的一些關鍵點,今天就和你們一塊兒深刻生產級別消息中間件 - RocketMQ 的內核實現,來看看真正落地能支撐萬億級消息容量、低延遲的消息隊列究竟是如何設計的。面試
這篇文章我會先介紹總體的架構設計,而後再深刻各核心模塊的詳細設計、核心流程的剖析。緩存
還會說起使用的一些注意點和最佳實踐。服務器
對於消息隊列的用處和一些概念不太清楚的同窗強烈建議先看消息隊列面試連環問,這篇文章介紹了消息隊列的使用場景、基本概念和常見面試題。微信
話很少說,上車。網絡
RocketMQ 總體架構設計
總體的架構設計主要分爲四大部分,分別是:Producer、Consumer、Broker、NameServer。多線程
![](http://static.javashuo.com/static/loading.gif)
爲了更貼合實際,我畫的都是集羣部署,像 Broker 我還畫了主從。架構
-
Producer:就是消息生產者,能夠集羣部署。它會先和 NameServer 集羣中的隨機一臺創建長鏈接,得知當前要發送的 Topic 存在哪臺 Broker Master上,而後再與其創建長鏈接,支持多種負載平衡模式發送消息。併發
-
Consumer:消息消費者,也能夠集羣部署。它也會先和 NameServer 集羣中的隨機一臺創建長鏈接,得知當前要消息的 Topic 存在哪臺 Broker Master、Slave上,而後它們創建長鏈接,支持集羣消費和廣播消費消息。
-
Broker:主要負責消息的存儲、查詢消費,支持主從部署,一個 Master 能夠對應多個 Slave,Master 支持讀寫,Slave 只支持讀。Broker 會向集羣中的每一臺 NameServer 註冊本身的路由信息。
-
NameServer:是一個很簡單的 Topic 路由註冊中心,支持 Broker 的動態註冊和發現,保存 Topic 和 Borker 之間的關係。一般也是集羣部署,可是各 NameServer 之間不會互相通訊, 各 NameServer 都有完整的路由信息,即無狀態。
我再用一段話來歸納它們之間的交互:
![](http://static.javashuo.com/static/loading.gif)
先啓動 NameServer 集羣,各 NameServer 之間無任何數據交互,Broker 啓動以後會向全部 NameServer 按期(每 30s)發送心跳包,包括:IP、Port、TopicInfo,NameServer 會按期掃描 Broker 存活列表,若是超過 120s 沒有心跳則移除此 Broker 相關信息,表明下線。
這樣每一個 NameServer 就知道集羣全部 Broker 的相關信息,此時 Producer 上線從 NameServer 就能夠得知它要發送的某 Topic 消息在哪一個 Broker 上,和對應的 Broker (Master 角色的)創建長鏈接,發送消息。
Consumer 上線也能夠從 NameServer 得知它所要接收的 Topic 是哪一個 Broker ,和對應的 Master、Slave 創建鏈接,接收消息。
簡單的工做流程如上所述,相信你們對總體數據流轉已經有點印象了,咱們再來看看每一個部分的詳細狀況。
NameServer
它的特色就是輕量級,無狀態。角色相似於 Zookeeper 的狀況,從上面描述知道其主要的兩個功能就是:Broker 管理、路由信息管理。
整體而言比較簡單,我再貼一些字段,讓你們有更直觀的印象知道它存儲了些什麼。
![](http://static.javashuo.com/static/loading.gif)
Producer
Producer 無非就是消息生產者,那首先它得知道消息要發往哪一個 Broker ,因而每 30s 會從某臺 NameServer 獲取 Topic 和 Broker 的映射關係存在本地內存中,若是發現新的 Broker 就會和其創建長鏈接,每 30s 會發送心跳至 Broker 維護鏈接。
而且會輪詢當前能夠發送的 Broker 來發送消息,達到負載均衡的目的,在同步發送狀況下若是發送失敗會默認重投兩次(retryTimesWhenSendFailed = 2),而且不會選擇上次失敗的 broker,會向其餘 broker 投遞。
在異步發送失敗的狀況下也會重試,默認也是兩次 (retryTimesWhenSendAsyncFailed = 2),可是僅在同一個 Broker 上重試。
Producer 啓動流程
而後咱們再來看看 Producer 的啓動流程看看都幹了些啥。
![](http://static.javashuo.com/static/loading.gif)
大體啓動流程圖中已經代表的很清晰的,可是有些細節可能還不清楚,好比重平衡啊,TBW102 啥玩意啊,有哪些定時任務啊,別急都會提到的。
有人可能會問這生產者爲何要啓拉取服務、重平衡?
由於 Producer 和 Consumer 都須要用 MQClientInstance,而同一個 clientId 是共用一個 MQClientInstance 的, clientId 是經過本機 IP 和 instanceName(默認值 default)拼起來的,因此多個 Producer 、Consumer 實際用的是一個MQClientInstance。
至於有哪些定時任務,請看下圖:
![](http://static.javashuo.com/static/loading.gif)
Producer 發消息流程
咱們再來看看發消息的流程,大體也不是很複雜,無非就是找到要發送消息的 Topic 在哪一個 Broker 上,而後發送消息。
![](http://static.javashuo.com/static/loading.gif)
如今就知道 TBW102 是啥用的,就是接受自動建立主題的 Broker 啓動會把這個默認主題登記到 NameServer,這樣當 Producer 發送新 Topic 的消息時候就得知哪一個 Broker 能夠自動建立主題,而後發往那個 Broker。
而 Broker 接受到這個消息的時候發現沒找到對應的主題,可是它接受建立新主題,這樣就會建立對應的 Topic 路由信息。
自動建立主題的弊端
自動建立主題那麼有可能該主題的消息都只會發往一臺 Broker,起不到負載均衡的做用。
由於建立新 Topic 的請求到達 Broker 以後,Broker 建立對應的路由信息,可是心跳是每 30s 發送一次,因此說 NameServer 最長鬚要 30s 才能得知這個新 Topic 的路由信息。
假設此時發送方還在連續快速的發送消息,那 NameServer 上其實尚未關於這個 Topic 的路由信息,因此有機會讓別的容許自動建立的 Broker 也建立對應的 Topic 路由信息,這樣集羣裏的 Broker 就能接受這個 Topic 的信息,達到負載均衡的目的,但也有個別 Broker 可能,沒收到。
若是發送方這一次發了以後 30s 內一個都不發,以前的那個 Broker 隨着心跳把這個路由信息更新到 NameServer 了,那麼以後發送該 Topic 消息的 Producer 從 NameServer 只能得知該 Topic 消息只能發往以前的那臺 Broker ,這就不均衡了,若是這個新主題消息不少,那臺 Broker 負載就很高了。
因此不建議線上開啓容許自動建立主題,即 autoCreateTopicEnable 參數。
發送消息故障延遲機制
有一個參數是 sendLatencyFaultEnable,默認不開啓。這個參數的做用是對於以前發送超時的 Broker 進行一段時間的退避。
發送消息會記錄此時發送消息的時間,若是超過必定時間,那麼此 Broker 就在一段時間內不容許發送。
![](http://static.javashuo.com/static/loading.gif)
好比發送時間超過 15000ms 則在 600000 ms 內沒法向該 Broker 發送消息。
這個機制其實很關鍵,發送超時大機率代表此 Broker 負載高,因此先避讓一下子,讓它緩一緩,這也是實現消息發送高可用的關鍵。
小結一下
Producer 每 30s 會向 NameSrv 拉取路由信息更新本地路由表,有新的 Broker 就和其創建長鏈接,每隔 30s 發送心跳給 Broker 。
不要在生產環境開啓 autoCreateTopicEnable。
Producer 會經過重試和延遲機制提高消息發送的高可用。
Broker
Broker 就比較複雜一些了,可是很是重要。大體分爲如下五大模塊,咱們來看一下官網的圖。
![](http://static.javashuo.com/static/loading.gif)
-
Remoting 遠程模塊,處理客戶請求。 -
Client Manager 管理客戶端,維護訂閱的主題。 -
Store Service 提供消息存儲查詢服務。 -
HA Serivce,主從同步高可用。 -
Index Serivce,經過指定key 創建索引,便於查詢。
有幾個模塊沒啥可說的就不分析了,先看看存儲的。
Broker 的存儲
RocketMQ 存儲用的是本地文件存儲系統,效率高也可靠。
主要涉及到三種類型的文件,分別是 CommitLog、ConsumeQueue、IndexFile。
CommitLog
RocketMQ 的全部主題的消息都存在 CommitLog 中,單個 CommitLog 默認 1G,而且文件名以起始偏移量命名,固定 20 位,不足則前面補 0,好比 00000000000000000000 表明了第一個文件,第二個文件名就是 00000000001073741824,代表起始偏移量爲 1073741824,以這樣的方式命名用偏移量就能找到對應的文件。
全部消息都是順序寫入的,超過文件大小則開啓下一個文件。
ConsumeQueue
ConsumeQueue 消息消費隊列,能夠認爲是 CommitLog 中消息的索引,由於 CommitLog 是糅合了全部主題的消息,因此經過索引才能更加高效的查找消息。
ConsumeQueue 存儲的條目是固定大小,只會存儲 8 字節的 commitlog 物理偏移量,4 字節的消息長度和 8 字節 Tag 的哈希值,固定 20 字節。
在實際存儲中,ConsumeQueue 對應的是一個Topic 下的某個 Queue,每一個文件約 5.72M,由 30w 條數據組成。
消費者是先從 ConsumeQueue 來獲得消息真實的物理地址,而後再去 CommitLog 獲取消息。
IndexFile
IndexFile 就是索引文件,是額外提供查找消息的手段,不影響主流程。
經過 Key 或者時間區間來查詢對應的消息,文件名以建立時間戳命名,固定的單個 IndexFile 文件大小約爲400M,一個 IndexFile 存儲 2000W個索引。
咱們再來看看以上三種文件的內容是如何生成的:
![](http://static.javashuo.com/static/loading.gif)
消息到了先存儲到 Commitlog,而後會有一個 ReputMessageService 線程接近實時地將消息轉發給消息消費隊列文件與索引文件,也就是說是異步生成的。
消息刷盤機制
RocketMQ 提供消息同步刷盤和異步刷盤兩個選擇,關於刷盤咱們都知道效率比較低,單純存入內存中的話效率是最高的,可是可靠性不高,影響消息可靠性的狀況大體有如下幾種:
-
Broker 被暴力關閉,好比 kill -9 -
Broker 掛了 -
操做系統掛了 -
機器斷電 -
機器壞了,開不了機 -
磁盤壞了
若是都是 1-4 的狀況,同步刷盤確定沒問題,異步的話就有可能丟失部分消息,5 和 6就得依靠副本機制了,若是同步雙寫確定是穩的,可是性能太差,若是異步則有可能丟失部分消息。
因此須要看場景來使用同步、異步刷盤和副本雙寫機制。
頁緩存與內存映射
Commitlog 是混合存儲的,因此全部消息的寫入就是順序寫入,對文件的順序寫入和內存的寫入速度基本上沒什麼差異。
而且 RocketMQ 的文件都利用了內存映射即 Mmap,將程序虛擬頁面直接映射到頁緩存上,無需有內核態再往用戶態的拷貝,來看一下我以前文章畫的圖。
![](http://static.javashuo.com/static/loading.gif)
頁緩存其實就是操做系統對文件的緩存,用來加速文件的讀寫,也就是說對文件的寫入先寫到頁緩存中,操做系統會不按期刷盤(時間不可控),對文件的讀會先加載到頁緩存中,而且根據局部性原理還會預讀臨近塊的內容。
其實也是由於使用內存映射機制,因此 RocketMQ 的文件存儲都使用定長結構來存儲,方便一次將整個文件映射至內存中。
文件預分配和文件預熱
而內存映射也只是作了映射,只有當真正讀取頁面的時候產生缺頁中斷,纔會將數據真正加載到內存中,因此 RocketMQ 作了一些優化,防止運行時的性能抖動。
文件預分配
CommitLog 的大小默認是1G,當超過大小限制的時候須要準備新的文件,而 RocketMQ 就起了一個後臺線程 AllocateMappedFileService,不斷的處理 AllocateRequest,AllocateRequest 其實就是預分配的請求,會提早準備好下一個文件的分配,防止在消息寫入的過程當中分配文件,產生抖動。
文件預熱
有一個 warmMappedFile 方法,它會把當前映射的文件,每一頁遍歷多去,寫入一個0字節,而後再調用mlock 和 madvise(MADV_WILLNEED)。
mlock:能夠將進程使用的部分或者所有的地址空間鎖定在物理內存中,防止其被交換到 swap 空間。
madvise:給操做系統建議,說這文件在不久的未來要訪問的,所以,提早讀幾頁多是個好主意。
小結一下
CommitLog 採用混合型存儲,也就是全部 Topic 都存在一塊兒,順序追加寫入,文件名用起始偏移量命名。
消息先寫入 CommitLog 再經過後臺線程分發到 ConsumerQueue 和 IndexFile 中。
消費者先讀取 ConsumerQueue 獲得真正消息的物理地址,而後訪問 CommitLog 獲得真正的消息。
利用了 mmap 機制減小一次拷貝,利用文件預分配和文件預熱提升性能。
提供同步和異步刷盤,根據場景選擇合適的機制。
Broker 的 HA
從 Broker 會和主 Broker 創建長鏈接,而後獲取主 Broker commitlog 最大偏移量,開始向主 Broker 拉取消息,主 Broker 會返回必定數量的消息,循環進行,達到主從數據同步。
消費者消費消息會先請求主 Broker ,若是主 Broker 以爲如今壓力有點大,則會返回從 Broker 拉取消息的建議,而後消費者就去從服務器拉取消息。
Consumer
消費有兩種模式,分別是廣播模式和集羣模式。
廣播模式:一個分組下的每一個消費者都會消費完整的Topic 消息。
集羣模式:一個分組下的消費者瓜分消費Topic 消息。
通常咱們用的都是集羣模式。
而消費者消費消息又分爲推和拉模式,詳細看我這篇文章消息隊列推拉模式,分別從源碼級別分析了 RokcetMQ 和 Kafka 的消息推拉,以及推拉模式的優缺點。
Consumer 端的負載均衡機制
Consumer 會按期的獲取 Topic 下的隊列數,而後再去查找訂閱了該 Topic 的同一消費組的全部消費者信息,默認的分配策略是相似分頁排序分配。
將隊列排好序,而後消費者排好序,好比隊列有 9 個,消費者有 3 個,那消費者-1 消費隊列 0、一、2 的消息,消費者-2 消費隊列 三、四、5,以此類推。
因此若是負載太大,那麼就加隊列,加消費者,經過負載均衡機制就能夠感知到重平衡,均勻負載。
Consumer 消息消費的重試
不免會遇到消息消費失敗的狀況,因此須要提供消費失敗的重試,而通常的消費失敗要麼就是消息結構有誤,要麼就是一些暫時沒法處理的狀態,因此當即重試不太合適。
RocketMQ 會給每一個消費組都設置一個重試隊列,Topic 是 %RETRY%+consumerGroup
,而且設定了不少重試級別來延遲重試的時間。
爲了利用 RocketMQ 的延時隊列功能,重試的消息會先保存在 Topic 名稱爲「SCHEDULE_TOPIC_XXXX」的延遲隊列,在消息的擴展字段裏面會存儲原來所屬的 Topic 信息。
delay 一段時間後再恢復到重試隊列中,而後 Consumer 就會消費這個重試隊列主題,獲得以前的消息。
若是超過必定的重試次數都消費失敗,則會移入到死信隊列,即 Topic %DLQ%" + ConsumerGroup
中,存儲死信隊列即認爲消費成功,由於實在沒轍了,暫時放過。
而後咱們能夠經過人工來處理死信隊列的這些消息。
消息的全局順序和局部順序
全局順序就是消除一切併發,一個 Topic 一個隊列,Producer 和 Consuemr 的併發都爲一。
局部順序其實就是指某個隊列順序,多隊列之間仍是能並行的。
能夠經過 MessageQueueSelector 指定 Producer 某個業務只發這一個隊列,而後 Comsuer 經過MessageListenerOrderly 接受消息,其實就是加鎖消費。
在 Broker 會有一個 mqLockTable ,順序消息在建立拉取消息任務的時候須要在 Broker 鎖定該消息隊列,以後加鎖成功的才能消費。
而嚴格的順序消息其實很難,假設如今都好好的,若是有個 Broker 宕機了,而後發生了重平衡,隊列對應的消費者實例就變了,就會有可能會出現亂序的狀況,若是要保持嚴格順序,那此時就只能讓整個集羣不可用了。
一些注意點
一、訂閱消息是以 ConsumerGroup 爲單位存儲的,因此ConsumerGroup 中的每一個 Consumer 須要有相同的訂閱。
由於訂閱消息是隨着心跳上傳的,若是一個 ConsumerGroup 中 Consumer 訂閱信息不同,那麼就會出現互相覆蓋的狀況。
好比消費者 A 訂閱 Topic a,消費者 B 訂閱 Topic b,此時消費者 A 去 Broker 拿消息,而後 B 的心跳包發出了,Broker 更新了,而後接到 A 的請求,一臉懵逼,沒這訂閱關係啊。
二、RocketMQ 主從讀寫分離
從只能讀,不能寫,而且只有當前客戶端讀的 offset 和 當前 Broker 已接受的最大 offset 超過限制的物理內存大小時候纔會去從讀,因此正常狀況下從分擔不了流量
三、單單加機器提高不了消費速度,隊列的數量也須要跟上。
四、以前提到的,不要容許自動建立主題
RocketMQ 的最佳實踐
這些最佳實踐部分參考自官網。
Tags的使用
建議一個應用一個 Topic,利用 tages 來標記不一樣業務,由於 tages 設置比較靈活,且一個應用一個 Topic 很清晰,能直觀的辨別。
Keys的使用
若是有消息業務上的惟一標識,請填寫到 keys 字段中,方便往後的定位查找。
提升 Consumer 的消費能力
一、提升消費並行度:增長隊列數和消費者數量,提升單個消費者的並行消費線程,參數 consumeThreadMax。
二、批處理消費,設置 consumeMessageBatchMaxSize 參數,這樣一次能拿到多條消息,而後好比一個 update語句以前要執行十次,如今一次就執行完。
三、跳過非核心的消息,當負載很重的時候,爲了保住那些核心的消息,設置那些非核心的消息,例如此時消息堆積 1W 條了以後,就直接返回消費成功,跳過非核心消息。
NameServer 的尋址
請使用 HTTP 靜態服務器尋址(默認),這樣 NameServer 就能動態發現。
JVM選項
如下抄自官網:
若是不關心 RocketMQ Broker的啓動時間,經過「預觸摸」 Java 堆以確保在 JVM 初始化期間每一個頁面都將被分配。
那些不關心啓動時間的人能夠啓用它:-XX:+AlwaysPreTouch 禁用偏置鎖定可能會減小JVM暫停, -XX:-UseBiasedLocking 至於垃圾回收,建議使用帶JDK 1.8的G1收集器。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
另外不要把-XX:MaxGCPauseMillis的值設置過小,不然JVM將使用一個小的年輕代來實現這個目標,這將致使很是頻繁的minor GC,因此建議使用rolling GC日誌文件:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
Linux內核參數
如下抄自官網:
-
vm.extra_free_kbytes,告訴VM在後臺回收(kswapd)啓動的閾值與直接回收(經過分配進程)的閾值之間保留額外的可用內存。RocketMQ使用此參數來避免內存分配中的長延遲。(與具體內核版本相關) -
vm.min_free_kbytes,若是將其設置爲低於1024KB,將會巧妙的將系統破壞,而且系統在高負載下容易出現死鎖。 -
vm.max_map_count,限制一個進程可能具備的最大內存映射區域數。RocketMQ將使用mmap加載CommitLog和ConsumeQueue,所以建議將爲此參數設置較大的值。(agressiveness --> aggressiveness) -
vm.swappiness,定義內核交換內存頁面的積極程度。較高的值會增長攻擊性,較低的值會減小交換量。建議將值設置爲10來避免交換延遲。 -
File descriptor limits,RocketMQ須要爲文件(CommitLog和ConsumeQueue)和網絡鏈接打開文件描述符。咱們建議設置文件描述符的值爲655350。 -
Disk scheduler,RocketMQ建議使用I/O截止時間調度器,它試圖爲請求提供有保證的延遲。
最後
其實還有不少沒講,好比流量控制、消息的過濾、定時消息的實現,包括底層通訊 1+N+M1+M2 的 Reactor 多線程設計等等。
主要是內容太多了,並且也不太影響主流程,因此仍是剝離出來以後寫吧,大體的一些實現仍是講了的。
包括元信息的交互、消息的發送、存儲、消費等等。
關於事務消息的那一塊我以前文章也分析過了,因此這個就再也不貼了。
能夠看到要實現一個生產級別的消息隊列仍是有不少不少東西須要考慮的,不過大體的架構和涉及到的模塊差很少就這些了。
至於具體的細節深刻,仍是得靠你們自行研究了,我就起個拋磚引玉的做用。
最後我的能力有限,若是哪裏有紕漏請抓緊聯繫鞭撻我!還有我搞了個羣若是想進羣就備註下進羣,我拉你。
![](http://static.javashuo.com/static/loading.gif)
我是 yes,從一點點到億點點,咱們下篇見。
本文分享自微信公衆號 - yes的練級攻略(yes_java)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。