不少網友會問,爲何明明集羣中有多臺Broker服務器,autoCreateTopicEnable設置爲true,表示開啓Topic自動建立,但新建立的Topic的路由信息只包含在其中一臺Broker服務器上,這是爲何呢?json
指望值:爲了消息發送的高可用,但願新建立的Topic在集羣中的每臺Broker上建立對應的隊列,避免Broker的單節點故障。緩存
現象截圖以下:
正如上圖所示,自動建立的topicTest5的路由信息:服務器
咱們再來看一下RocketMQ默認topic的路由信息截圖以下:
從圖中能夠默認Topic的路由信息爲broker-a、broker-b上各8個隊列。3d
默認Topic的路由信息是如何建立的?server
回到本文的主題:autoCreateTopicEnable,開啓自動建立主題,試想一下,若是生產者向一個不存在的主題發送消息時,上面的任何一個步驟都沒法獲取一個不存在的主題的路由信息,那該如何處理這種狀況呢?中間件
在RocketMQ中,若是autoCreateTopicEnable設置爲true,消息發送者向NameServer查詢主題的路由消息返回空時,會嘗試用一個系統默認的主題名稱(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此時消息發送者獲得的路由信息爲:
但問題就來了,默認Topic在集羣的每一臺Broker上建立8個隊列,那問題來了,爲啥新建立的Topic只在一個Broker上建立4個隊列?對象
舒適提示:本文不會詳細跟蹤整個建立過程,只會點出源碼的關鍵入口點,如想詳細瞭解NameServer路由消息、消息發送高可用的實現原理,建議查閱筆者的書籍《RocketMQ技術內幕》第2、三章。blog
Step1:在Broker啓動流程中,會構建TopicConfigManager對象,其構造方法中首先會判斷是否開啓了容許自動建立主題,若是啓用了自動建立主題,則向topicConfigTable中添加默認主題的路由信息。
TopicConfigManager構造方法
隊列
備註:該topicConfigTable中全部的路由信息,會隨着Broker向Nameserver發送心跳包中,Nameserver收到這些信息後,更新對應Topic的路由信息表。圖片
BrokerConfig的defaultTopicQueueNum默認爲8。兩臺Broker服務器都會運行上面的過程,故最終Nameserver中關於默認主題的路由信息中,會包含兩個Broker分別各8個隊列信息。
Step2:生產者尋找路由信息
生產者首先向NameServer查詢路由信息,因爲是一個不存在的主題,故此時返回的路由信息爲空,RocketMQ會使用默認的主題再次尋找,因爲開啓了自動建立路由信息,NameServer會向生產者返回默認主題的路由信息。而後從返回的路由信息中選擇一個隊列(默認輪詢)。消息發送者從Nameserver獲取到默認的Topic的隊列信息後,隊列的個數會改變嗎?答案是會的,其代碼以下:
MQClientInstance#updateTopicRouteInfoFromNameServer
舒適提示:消息發送者在到默認路由信息時,其隊列數量,會選擇DefaultMQProducer#defaultTopicQueueNums與Nameserver返回的的隊列數取最小值,DefaultMQProducer#defaultTopicQueueNums默認值爲4,故自動建立的主題,其隊列數量默認爲4。
Step3:發送消息
DefaultMQProducerImpl#sendKernelImpl
在消息發送時的請求報文中,設置默認topic名稱,消息發送topic名稱,使用的隊列數量爲DefaultMQProducer#defaultTopicQueueNums,即默認爲4。
Step4:Broker端收到消息後的處理流程
服務端收到消息發送的處理器爲:SendMessageProcessor,在處理消息發送時,會調用super.msgCheck方法:
AbstractSendMessageProcessor#msgCheck
在Broker端,首先會使用TopicConfigManager根據topic查詢路由信息,若是Broker端不存在該主題的路由配置(路由信息),此時若是Broker中存在默認主題的路由配置信息,則根據消息發送請求中的隊列數量,在Broker建立新Topic的路由信息。這樣Broker服務端就會存在主題的路由信息。
在Broker端的topic配置管理器中存在的路由信息,一會向Nameserver發送心跳包,彙報到Nameserver,另外一方面會有一個定時任務,定時存儲在broker端,具體路徑爲${ROCKET_HOME}/store/config/topics.json中,這樣在Broker關閉後再重啓,並不會丟失路由信息。
廣大讀者朋友,跟蹤到這一步的時候,你們應該對啓用自動建立主題機制時,新主題是的路由信息是如何建立的,爲了方便理解,給出建立主題序列圖:
通過上面自動建立路由機制的建立流程,咱們能夠比較容易的分析得出以下結論:
由於開啓了自動建立路由信息,消息發送者根據Topic去NameServer沒法獲得路由信息,但接下來根據默認Topic從NameServer是能拿到路由信息(在每一個Broker中,存在8個隊列),由於兩個Broker在啓動時都會向NameServer彙報路由信息。此時消息發送者緩存的路由信息是2個Broker,每一個Broker默認4個隊列(緣由見3.2.1:Step2的分析)。消息發送者而後按照輪詢機制,發送第一條消息選擇(broker-a的messageQueue:0),向Broker發送消息,Broker服務器在處理消息時,首先會查看本身的路由配置管理器(TopicConfigManager)中的路由信息,此時不存在對應的路由信息,而後嘗試查詢是否存在默認Topic的路由信息,若是存在,說明啓用了autoCreateTopicEnable,則在TopicConfigManager中建立新Topic的路由信息,此時存在與Broker服務端的內存中,而後本次消息發送結束。此時,在NameServer中還不存在新建立的Topic的路由信息。
這裏有三個關鍵點:
緣由就分析到這裏了,如今咱們還能夠的大膽假設,開啓autoCreateTopicEnable機制,什麼狀況會在兩個Broker上都建立隊列,其實,咱們只須要連續快速的發送9條消息,就有可能在2個Broker上都建立隊列,驗證代碼以下:
~~~
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 9; i++) {
try {
Message msg = new Message("TopicTest10" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
~~~
驗證結果如圖所示:
本文就分析到這裏了,你們若是喜歡這篇文章,麻煩你們幫忙點點贊,同時你們也能夠給做者留言,告知在使用RocketMQ的過程當中遇到的疑難雜症,與做者互動。
做者簡介:《RocketMQ技術內幕》做者,維護公衆號:中間件興趣圈,主要關注目前主流的開源中間件,例如Netty、Mycat、Dubbo、ElasticSearch、ElasticJob、RocketMQ、Mybatis等。