上上週在團隊內部作了一個關於 RocketMQ 的分享,本文記錄一下分享的大部份內容git
這是公衆號 Young_Blog 的第 41 篇文章github
爲何沒有選擇 Kafka
而是 RocketMQ
呢,沒有什麼特別的緣由,單純是我以前就看過一點 RocketMQ
的源碼,可是後來由於各類緣由沒能看完,所以想着趁此次機會系統地回顧一遍。算法
另外就是,以前個人工做集中在客戶端或者服務端,不多端到端地去設計、開發某項功能,所以在架構設計方面積累的經驗比較少。這點劣勢在年初接手開發新項目的時候尤其明顯,新項目涉及註冊發現、客戶端、服務端、監控、網絡編程等一系列模塊,由於以前沒有接觸過這麼複雜的項目,也不瞭解業務成熟的大型項目的設計哲學,所以在輪到本身作架構設計的時候提出的方案老是存在一些缺陷,後續致使了返工的現象。編程
那時起我就琢磨着找一個大型開源項目研究一下,學習一下它在架構設計上的考量,RocketMQ
不論從規模仍是複雜度上講都很合適,所以就選了它。json
最近騰訊開源了 TubeMQ,感興趣的能夠去看看 TubeMQ緩存
首先,明確下消息隊列的設計目標,一般有如下幾點:服務器
貌似要求越寫越多,那設想一下,若是從零開始設計一個消息隊列,知足以上五個需求,須要哪些組成部分。網絡
第一個很容易想到的就是生產者(Producer
)和消費者(Consumer
),這是消息隊列兩端最重要的組成部分。一般狀況下,生產者和消費者都不會是單臺機器,而是一個集羣,所以用生產組(Producer Group
)和消費組(Consumer Group
)來描述。架構
緊接着爲了容許消息暫存和消息積壓,咱們一般須要一個消息代理服務器(Broker
),生產者和消費者都僅鏈接 Broker
進行消息的發送和消費,兩者之間徹底解耦。負載均衡
消息發送一般須要一個管道進行投遞,並且不一樣類型的消息最好發往不一樣的管道,管道的另外一端鏈接消費者,經過增長管道數目和消費者數目,咱們就達到了橫向擴展消費能力的目的,這裏咱們將管道成爲 Queue
,將消息類型成爲 Topic
。
很好,目前咱們搭建了 生產者 -> Queue -> Broker -> Queue -> 消費者 的完整流程,一條消息已經能夠端到端地發送了。可是思考下,若是 Topic
和 Queue
的數目不少,某些生產者和消費者只關注其中一些,那麼咱們還須要爲這種訂閱/發佈關係提供一個註冊平臺,稱之爲 NameService
(命名服務)來統一管理消息訂閱的拓撲關係。
上面廢了這麼多口舌,目的就在於讓不瞭解消息隊列的同窗在進入下一節以前,不至於一臉懵逼。技術的架構設計都是一步一步來的,消息隊列也不是一開始就演變成 Kafka
或者 RocketMQ
這種架構的,它們都經歷過爲了支持某些業務需求而不得不作的架構演進,最後變成了如今目前業界比較成熟的模型,而咱們上述的假設其實已經走了不少捷徑。
這節咱們來看看 RocketMQ
的具體架構,結合這張結構圖咱們會介紹裏面每一個組成部分的功能和設計考量。
位於最頂端的是 RocketMQ
的命名服務,稱之爲 NameServer
,它是用來管理 Topic
的訂閱發佈關係、消息發送和消費拓撲的,得讓生產者知道 「我這個 Topic
的信息發往哪些 Broker
」,得讓消費者知道 「我得去哪些 Broker
上消費這個 Topic
的消息」。NameServer
能夠多機部署變成一個 NameServer
集羣保證高可用,但這些機器間彼此並不通訊,也就是說三者的元數據捨棄了強一致性。
這些元數據是怎麼來的呢,首先 Broker
啓動時會向所有的 NameServer
機器註冊心跳,心跳裏包含本身機器上 Topic
的拓撲信息,以後每隔 30s
更新一次,而後生產者和消費者啓動的時候任選一臺 NameServer
機器拉取所需的 Topic
的路由信息緩存在本地內存中,以後每隔 30s
定時從遠端拉取更新本地緩存。
NameServer
機器中定時掃描 Broker
的心跳,一旦失聯超出 2min
,即關閉這個 Broker
的鏈接,但不主動通知生產組和消費組,所以兩者最長鬚要 30s
才能感知到某個 Broker
故障。
架構圖兩端的就是生產組和消費組,都是多機的集羣,由若干個生產者和消費者實例組成,消費者消費消息時有兩種模式,一種是廣播模式一種是集羣消費模式,前者表示一條消息會被消費組下的全部消費者實例消費,後者表示一條消息只會被消費組下的一個實例消費到,考慮到集羣消費模式是目前使用主流,所以本文主要談論後者。
Topic
咱們以前講過了,表明某種消息類型,爲了達到消費性能可橫向擴展的需求,RocketMQ
引入了 MessageQueue
這個邏輯概念,將一個 Topic
劃分爲多個 MessageQueue
,默認是四個。而 MessageQueue
和消費者實例是一對一的關係,消費者實例和 MessageQueue
是一對多的關係。
例如架構圖中,Topic
下分爲四個 MessageQueue
,分佈在兩個 Broker
機器上,生產者組將消息平均發往四個 MessageQueue
,而因爲消費組中僅有兩個消費者實例,所以每一個消費者實例平均消費兩個 MessageQueue
。
一旦性能不足,能夠擴容消費組增長消費者實例至四個,那麼每一個消費者實例消費一個 MessageQueue
,從而達到消費能力的橫向擴展。
Broker
做爲消息代理服務器,最重要的職責是存儲消息和管理消費進度(集羣消費模式下專有)。單個 Topic
下的多個 MessageQueue
通常來講會分散在多個 Broker
上面達到容災的目的。
Topic
經過打散 MessageQueue
達到容災目的,那麼 Broker
機器維度又是怎麼容災的呢,RocketMQ
容許設置主備 Broker
,兩者間經過異步拉取複製的方式進行消息同步,一旦主 Broker
宕機,備機能夠提供消息消費,但不提供消息寫入,也就是說其實主備之間並無 Failover
功能,這保證了寫入主的消息不會丟失,可是會影響系統的可用性。
滴滴內部作過針對性地作過二次開發,簡單來講實現的方式是 NameServer
集羣經過 ZK
選舉出一個 Leader
,來完成 Failover
的決策。
爲了簡潔,本文圖例中沒有說明的狀況下,均不畫出 Slave Broker
仍是看這張結構圖,生產者發送消息,默認採用輪詢的方式達到負載均衡,每一個生產者實例內存中都知道 Topic
下 MessageQueue
的分佈拓撲信息,所以經過輪詢就能夠將消息平均發送到這些管道里。
咱們以前提到過,Broker
會向 NameServer
集羣全部機器發送心跳,NameServer
集羣裏的機器各自按期掃描失聯的 Broker
,關閉鏈接,但不會主動通知生產者組,須要等待生產者主動來拉取。所以存在元數據不一致的窗口,此窗口最長爲 30s
。
因爲上述緣由,消息生產者不可避免的會將消息發往已經故障的 Broker
機器,例如上圖,Producer-01
先將消息發往 Broker-A
上的 MessageQueue-01
,發現失敗了,因爲輪詢發送機制它繼續發往 MessageQueue-02
,因爲仍是位於 Broker-A
機器上,所以依舊失敗了,默認狀況下同步發送消息重試三次,所以極可能這條消息因爲沒有規避 Broker-A
致使發送失敗,實際上 Broker-B
仍是存活的,徹底能夠規避掉故障的 Broker-A
機器提早選擇 Broker-B
發送消息。
RocketMQ
中將生產者端剔除故障機器的機制稱之爲 Broker
的故障延遲機制,一旦發現發送到某個 Broker
機器失敗,則暫時將其剔除,優先選擇其餘 Broker
重試。
看完了消息發送部分,本節咱們進入消息的消費。消息的消費相較於消息的發送會複雜一些。咱們想一下,假設你某個生產者實例宕機了,那最多就是少了個消息的發送者,而絕大多數狀況下消息的生產者都是無狀態的,流量能夠任意打到某個生產者,若是其一宕機那麼我經過一些措施摘掉這臺機器的流量就能夠。可是消費者沒有這麼簡單,由於它們並非無狀態的,它們是固定在消費某一些 Topic
的 MessageQueue
,所以宕機任意一臺消費者都涉及到消費拓撲的從新變動,這帶來了更多的複雜度。
MessageQueue
存在的意義前面已經談過再也不復述,本節講一下若是將特定數量的 MessageQueue
分配給消費者組下的消費者實例,注意!這實際上是個技術活。
消費者組下的消費者實例,怎麼知道本身須要消費某個 Topic
下的哪些 MessageQueue
呢?例如架構圖中,只有兩個消費者實例,可是總共有四個 MessageQueue
,他們如何知道各自消費兩個,並且尚未衝突的。
爲了簡單,假設咱們的系統是新搭建的,兩臺 Consumer 都是第一次啓動,所以這裏不涉及 Rebalance 機制
分配方案是在消費者實例啓動的時候去執行的,消費者實例啓動的時候回從 NameServer
上獲取本身訂閱的 Topic
的拓撲信息,包括該 Topic 下總共有幾個 MessageQueue
,分佈在哪些 Broker
機器上等等。而後向其中全部 Broker
機器發送心跳。最後選取任意一臺 Broker
,從上面獲取消費組下總共有幾個實例。
如此一來,消費者實例就知道了 MessageQueue
信息(mqSet
)和消費組下的實例個數(consumerIdSet
)信息。在本地內存中經過簡單的分配算法,就能夠知道本身應該負責消費哪些 MessageQueue
了。
須要注意的是,每一個客戶端獲取到 mqSet
和 consumerIdSet
以後都須要首先進行排序!目的是爲了在執行分配算法時,每一個客戶端的視圖都是一致的。
RocketMQ
針對 MessageQueue
提供了多種可選的分配策略,例如平均分配、輪詢分配、固定分配等,在實際生產環境中可能還須要根據機房進行就近路由分配、粘滯分配(使得 MessageQueue
變更次數最小)等。
順序消費是應用場景對消息隊列中間件提出的需求,例如某個 ID = 100
的支付業務,在其生命週期內會發送三條消息:
由於訂單 ID
同爲 100
屬於一個訂單,所以要求消費組在消費這三條消息時保證先消費第一條,而後才能消費第二條,最後纔是第三條。若是此時還有 ID = 300
的訂單,那麼兩者之間能夠交叉,可是這三個過程必須保證升序。
保證消息局部順序消費的重點在於:
ID
的訂單消息發往同一個 MessageQueue
Broker
和客戶端的兩把鎖,保證對該 MessageQueue
內消息的順序消費發往同一個 MessageQueue
保證了該 MessageQueue
內消息是局部有序的,可是沒法保證全局有序,想要全局有序?那這個 Topic
只能配一個 MessageQueue
,而後所有消息都發到這一個 MessageQueue
中。通常來講,局部有序已經能夠知足絕大部分應用場景了。
生產端的保證達到了,下面就是消費端,依靠的是兩把鎖,分別位於 Broker
側和消費者實例客戶端側。Broker
側的鎖是 MessageQueue
粒度的,保證同一時間至多隻有一個消費者實例消費該 MessageQueue
。
你可能疑惑,原本不就是一對一的關係麼?緣由是在消費者組進行 Rebalance
的時候可能會形成某個時間窗口內單個 MessageQueue
被多個消費者實例同時消費,這裏經過加鎖限制了這種狀況。一旦啓動時加鎖失敗,意味着該 MessageQueue
還在被其餘消費者實例鎖定,所以不建立相應的消息拉取任務,等到鎖被釋放或者超時(默認 60s
)。加鎖成功後消費者實例還會每隔 20s 定時鎖定該 MessageQueue
一次。
消費者實例側因爲可能同時負責消費多個 MessageQueue
,所以採用了線程池消費消息,須要在客戶端提供加鎖的方式保證單個 MessageQueue
內的消息同一時間僅被一個線程消費。
在廣播消費模式下,消費進度僅存儲在消費者實例本地,而在集羣消費模式下,消費進度存儲在 Broker
上。經過 Topic + 消費者組名稱
做爲 key
,value
中分別記錄每一個 MessageQueue
對應該消費者組的消費偏移量,所以消費進度是消費者組之間互相隔離的。
早期版本
Kafka
將offset
保存在ZK
上,Path
爲 consumers/{consume-group}/offsets/{topic}/{partition},其實和RocketMQ
的保存方式是一致的
利用 offset
記錄消費進度本質上是一種批量 ACK
的方法,它的優勢在於 Broker
的消費進度管理粒度從單條消息變爲單個 MessageQueue
,簡化了 Broker
的複雜度。那麼下一個問題,消費者和 Broker
都是在什麼時候提交和持久化各自的 offset 的呢?
首先,消費者側會記錄本身的消費進度到內存中的 OffsetTable
,經過每五秒一次的定時任務提交到 Broker
側,Broker
接收到以後保存在內存中,並定時刷到磁盤上的 json
文件裏。
這裏須要注意的是,因爲一批消息的消費次序不肯定,可能下標大的消息先被消費結束,下標小的因爲延時還沒有被消費,此時消費者向 Broker
提交的 offset
應該是已被消費的最小下標,從而保證消息不被遺漏,但缺點在於可能重複消費消息。
消息隊列系統中,常常會出現 Broker
實例的增刪、Topic
的增減、Topic
下 MessageQueue
數目的增減、消費組實例數目的增減等狀況,它們都會觸發消費關係的從新分配,這個過程稱之爲 Rebalance
。
RocketMQ
的 Rebalance
機制有主動和被動之分,主動意爲消費者實例每隔 20s
會定時計算本身的消費拓撲並和內存中的對比,一旦發現部分 MessageQueue
再也不是本身負責消費,則中止對它的消息拉取任務;若是有新的 MessageQueue
變爲本身負責,則建立對它的消息拉取任務。
被動意爲,Broker
能夠主動通知某個消費組下的全部實例,要求它們當即開始一次 Rebalance
,經常使用於新的消費者實例加入、或者 Broker 檢測到有消費者實例心跳失聯等狀況,下面是一個消費者實例新加入的場景。
RocketMQ
的 Rebalance
因爲部分時刻的視圖可能存在不一致,所以單次 Rebalance
並不能徹底保證必定達到最終效果,可是因爲它是一種週期性的任務,因此最終系統裏的 MessageQueue
會被分配徹底。
RocketMQ
的 Rebalance
機制依靠客戶端各自單獨計算獲得,Kafka
新版本中則依靠 Consumer Leader
單點計算後再上傳至 Group Coordinator
,由它下發至每一個消費者實例進行更新。
這兩種方式各有優缺點,一般來講,單點計算能夠最大程度減少視圖不一致致使的頻繁 Rebalance
現象(但也不能杜絕),可是缺點在於邏輯複雜,消費者組和 Broker
中都須要選取單點,一個負責計算一個負責下發通知;客戶端計算實現上更簡單,彼此獨立,經過週期性任務最終也能完成從新分配的任務,可是因爲客戶端彼此獲取的視圖不作校驗,所以可能存在因爲視圖不一致致使的重複消費和頻繁 Rebalance
。
硬核內容不少,並且文件存儲我接觸的也很少更不敢瞎寫了,這塊後續會視個人學習狀況看看是否單獨再開一個坑。
若是有同窗很想了解這部份內容的話,我貼幾篇在資料蒐集過程當中看到的比較好的博文:
若是你把 RocketMQ
和 Kafka
對比起來看,其實消息隊列的設計哲學有不少類似之處,但在文件存儲粒度、分區容災、負載均衡等方面,兩者又有本身的設計考量,採用了不一樣的實現思路,結合 Kafka
的 ISR
同步、Rebalance
、Partition Failover
等機制一塊兒學習的話,這種感覺會更強烈一些,但願這篇文章對你們有所啓發。