RocketMQ實戰:生產環境中,autoCreateTopicEnable爲何不能設置爲true

微信公衆號「中間件興趣圈」主要關注RocketMQ、Dubbo、Netty、Elasticsearch、ElasticJob、Mycat、Mybatis等主流開源中間件。java

一、現象

不少網友會問,爲何明明集羣中有多臺Broker服務器,autoCreateTopicEnable設置爲true,表示開啓Topic自動建立,但新建立的Topic的路由信息只包含在其中一臺Broker服務器上,這是爲何呢?json

指望值:爲了消息發送的高可用,但願新建立的Topic在集羣中的每臺Broker上建立對應的隊列,避免Broker的單節點故障。緩存

現象截圖以下:服務器

Broker集羣信息 在這裏插入圖片描述

正如上圖所示,自動建立的topicTest5的路由信息:微信

  • topicTest5只在broker-a服務器上建立了隊列,並無在broker-b服務器建立隊列,不符合指望。
  • 默認讀寫隊列的個數爲4。

咱們再來看一下RocketMQ默認topic的路由信息截圖以下: 在這裏插入圖片描述code

從圖中能夠默認Topic的路由信息爲broker-a、broker-b上各8個隊列。server

二、思考

默認Topic的路由信息是如何建立的?中間件

  1. Topic的路由信息是存儲在哪裏?Nameserver?broker?
  2. RocketMQ Topic默認隊列個數是多少呢?

三、原理

3.1 RocketMQ基本路由規則

在這裏插入圖片描述

  1. Broker在啓動時向Nameserver註冊存儲在該服務器上的路由信息,並每隔30s向Nameserver發送心跳包,並更新路由信息。
  2. Nameserver每隔10s掃描路由表,若是檢測到Broker服務宕機,則移除對應的路由信息。
  3. 消息生產者每隔30s會從Nameserver從新拉取Topic的路由信息並更新本地路由表;在消息發送以前,若是本地路由表中不存在對應主題的路由消息時,會主動向Nameserver拉取該主題的消息。

回到本文的主題:autoCreateTopicEnable,開啓自動建立主題,試想一下,若是生產者向一個不存在的主題發送消息時,上面的任何一個步驟都沒法獲取一個不存在的主題的路由信息,那該如何處理這種狀況呢?對象

在RocketMQ中,若是autoCreateTopicEnable設置爲true,消息發送者向NameServer查詢主題的路由消息返回空時,會嘗試用一個系統默認的主題名稱(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此時消息發送者獲得的路由信息爲:blog

在這裏插入圖片描述

但問題就來了,默認Topic在集羣的每一臺Broker上建立8個隊列,那問題來了,爲啥新建立的Topic只在一個Broker上建立4個隊列?

3.2 探究autoCreateTopicEnable機制

3.2.1 默認Topic路由建立時機

舒適提示:本文不會詳細跟蹤整個建立過程,只會點出源碼的關鍵入口點,如想詳細瞭解NameServer路由消息、消息發送高可用的實現原理,建議查閱筆者的書籍《RocketMQ技術內幕》第2、三章。

Step1:在Broker啓動流程中,會構建TopicConfigManager對象,其構造方法中首先會判斷是否開啓了容許自動建立主題,若是啓用了自動建立主題,則向topicConfigTable中添加默認主題的路由信息。 TopicConfigManager構造方法 在這裏插入圖片描述

備註:該topicConfigTable中全部的路由信息,會隨着Broker向Nameserver發送心跳包中,Nameserver收到這些信息後,更新對應Topic的路由信息表。

BrokerConfig的defaultTopicQueueNum默認爲8。兩臺Broker服務器都會運行上面的過程,故最終Nameserver中關於默認主題的路由信息中,會包含兩個Broker分別各8個隊列信息。

Step2:生產者尋找路由信息 生產者首先向NameServer查詢路由信息,因爲是一個不存在的主題,故此時返回的路由信息爲空,RocketMQ會使用默認的主題再次尋找,因爲開啓了自動建立路由信息,NameServer會向生產者返回默認主題的路由信息。而後從返回的路由信息中選擇一個隊列(默認輪詢)。消息發送者從Nameserver獲取到默認的Topic的隊列信息後,隊列的個數會改變嗎?答案是會的,其代碼以下:

MQClientInstance#updateTopicRouteInfoFromNameServer 在這裏插入圖片描述

舒適提示:消息發送者在到默認路由信息時,其隊列數量,會選擇DefaultMQProducer#defaultTopicQueueNums與Nameserver返回的的隊列數取最小值,DefaultMQProducer#defaultTopicQueueNums默認值爲4,故自動建立的主題,其隊列數量默認爲4。

Step3:發送消息

DefaultMQProducerImpl#sendKernelImpl 在這裏插入圖片描述

在消息發送時的請求報文中,設置默認topic名稱,消息發送topic名稱,使用的隊列數量爲DefaultMQProducer#defaultTopicQueueNums,即默認爲4。

Step4:Broker端收到消息後的處理流程 服務端收到消息發送的處理器爲:SendMessageProcessor,在處理消息發送時,會調用super.msgCheck方法: AbstractSendMessageProcessor#msgCheck 在這裏插入圖片描述

在Broker端,首先會使用TopicConfigManager根據topic查詢路由信息,若是Broker端不存在該主題的路由配置(路由信息),此時若是Broker中存在默認主題的路由配置信息,則根據消息發送請求中的隊列數量,在Broker建立新Topic的路由信息。這樣Broker服務端就會存在主題的路由信息。

在Broker端的topic配置管理器中存在的路由信息,一會向Nameserver發送心跳包,彙報到Nameserver,另外一方面會有一個定時任務,定時存儲在broker端,具體路徑爲${ROCKET_HOME}/store/config/topics.json中,這樣在Broker關閉後再重啓,並不會丟失路由信息。

廣大讀者朋友,跟蹤到這一步的時候,你們應該對啓用自動建立主題機制時,新主題是的路由信息是如何建立的,爲了方便理解,給出建立主題序列圖:

在這裏插入圖片描述

3.2.2 現象分析

通過上面自動建立路由機制的建立流程,咱們能夠比較容易的分析得出以下結論: 由於開啓了自動建立路由信息,消息發送者根據Topic去NameServer沒法獲得路由信息,但接下來根據默認Topic從NameServer是能拿到路由信息(在每一個Broker中,存在8個隊列),由於兩個Broker在啓動時都會向NameServer彙報路由信息。此時消息發送者緩存的路由信息是2個Broker,每一個Broker默認4個隊列(緣由見3.2.1:Step2的分析)。消息發送者而後按照輪詢機制,發送第一條消息選擇(broker-a的messageQueue:0),向Broker發送消息,Broker服務器在處理消息時,首先會查看本身的路由配置管理器(TopicConfigManager)中的路由信息,此時不存在對應的路由信息,而後嘗試查詢是否存在默認Topic的路由信息,若是存在,說明啓用了autoCreateTopicEnable,則在TopicConfigManager中建立新Topic的路由信息,此時存在與Broker服務端的內存中,而後本次消息發送結束。此時,在NameServer中還不存在新建立的Topic的路由信息。

這裏有三個關鍵點:

  1. 啓用autoCreateTopicEnable建立主題時,在Broker端建立主題的時機爲,消息生產者往Broker端發送消息時纔會建立。
  2. 而後Broker端會在一個心跳包週期內,將新建立的路由信息發送到NameServer,於此同時,Broker端還會有一個定時任務,定時將內存中的路由信息,持久化到Broker端的磁盤上。
  3. 消息發送者會每隔30s向NameServer更新路由信息,若是消息發送端一段時間內未發送消息,就不會有消息發送集羣內的第二臺Broker,那麼NameServer中新建立的Topic的路由信息只會包含Broker-a,而後消息發送者會向NameServer拉取最新的路由信息,此時就會消息發送者本來緩存了2個broker的路由信息,將會變爲一個Broker的路由信息,則該Topic的消息永遠不會發送到另一個Broker,就出現了上述現象。

緣由就分析到這裏了,如今咱們還能夠的大膽假設,開啓autoCreateTopicEnable機制,什麼狀況會在兩個Broker上都建立隊列,其實,咱們只須要連續快速的發送9條消息,就有可能在2個Broker上都建立隊列,驗證代碼以下:

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 9; i++) {
        try {
            Message msg = new Message("TopicTest10" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

驗證結果如圖所示:

在這裏插入圖片描述


做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。

在這裏插入圖片描述

相關文章
相關標籤/搜索