微信公衆號「後端進階」,專一後端技術分享:Java、Golang、WEB框架、分佈式中間件、服務治理等等。
老司機傾囊相授,帶你一路進階,來不及解釋了快上車!
我還記得第一次使用rocketmq的時候,須要去控制檯預先建立topic,我當時就想爲何要這麼設計,因而我決定擼一波源碼,帶你們從根源上吃透rocketmq topic的建立機制。java
topic在rocketmq的設計思想裏,是做爲同一個業務邏輯消息的組織形式,它僅僅是一個邏輯上的概念,而在一個topic下又包含若干個邏輯隊列,即消息隊列,消息內容實際是存放在隊列中,而隊列又存儲在broker中,下面我用一張圖來講明topic的存儲模型:git
其實rocketmq中存在兩種不一樣的topic建立方式,一種是我剛剛說的預先建立,另外一種是自動建立,下面我開車帶你們從源碼的角度來詳細地解讀這兩種建立機制。github
默認狀況下,topic不用手動建立,當producer進行消息發送時,會從nameserver拉取topic的路由信息,若是topic的路由信息不存在,那麼會默認拉取broker啓動時默認建立好名爲「TBW102」的Topic:算法
org.apache.rocketmq.common.MixAll:apache
// Will be created at broker when isAutoCreateTopicEnable public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
自動建立的開關配置在BrokerConfig中,經過autoCreateTopicEnable字段進行控制,後端
org.apache.rocketmq.common.BrokerConfig:緩存
@ImportantField private boolean autoCreateTopicEnable = true;
在broker啓動時,會調用TopicConfigManager的構造方法,autoCreateTopicEnable打開後,會將「TBW102」保存到topicConfigTable中:bash
org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:微信
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); }
broker會經過發送心跳包將topicConfigTable的topic信息發送給nameserver,nameserver將topic信息註冊到RouteInfoManager中。負載均衡
繼續看消息發送時是如何從nameserver獲取topic的路由信息:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 生產者第一次發送消息,topic在nameserver中並不存在 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 第二次請求會將isDefault=true,開啓默認「TBW102」從namerserver獲取路由信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
如上方法,topic首次發送消息,此時並不能從namserver獲取topic的路由信息,那麼接下來會進行第二次請求namserver,這時會將isDefault=true,開啓默認「TBW102」從namerserver獲取路由信息,此時的「TBW102」topic已經被broker默認註冊到nameserver了:
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
if (isDefault && defaultMQProducer != null) { // 使用默認的「TBW102」topic獲取路由信息 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } }
若是isDefault=true而且defaultMQProducer不爲空,從nameserver中獲取默認路由信息,此時會獲取全部已開啓自動建立開關的broker的默認「TBW102」topic路由信息,並保存默認的topic消息隊列數量。
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); }
從本地緩存中取出topic的路由信息,因爲topic是第一次發送消息,這時本地並無該topic的路由信息,因此對比該topic路由信息對比「TBW102」時changed爲true,即有變化,進入如下邏輯:
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
// Update sub info { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } }
將「TBW102」topic路由信息構建TopicPublishInfo,並將用topic爲key,TopicPublishInfo爲value更新本地緩存,到這裏就明白了,原來broker們千辛萬苦建立「TBW102」topic並將其路由信息註冊到nameserver,被新來的topic獲取後當即用「TBW102」topic的路由信息構建出一個TopicPublishInfo而且據爲己有,因爲TopicPublishInfo的路由信息時默認「TBW102」topic,所以真正要發送消息的topic也會被負載發送到「TBW102」topic所在的broker中,這裏咱們能夠將其稱之爲偷樑換柱的作法。
當broker接收到消息後,會在msgCheck方法中調用createTopicInSendMessageMethod方法,將topic的信息塞進topicConfigTable緩存中,而且broker會定時發送心跳將topicConfigTable發送給nameserver進行註冊。
自動建立與消息發送時獲取topic信息的時序圖:
其實這個叫預先建立彷佛更加適合,即預先在broker中建立好topic的相關信息並註冊到nameserver中,而後client端發送消息時直接從nameserver中獲取topic的路由信息,可是手動建立從動做上來將更加形象通俗易懂,直接告訴你,你的topic信息須要在控制檯上本身手動建立。
預先建立須要經過mqadmin提供的topic相關命令進行建立,執行:
./mqadmin updateTopic
官方給出的各項參數以下:
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t <arg> [-u <arg>] [-w <arg>] -b,--brokerAddr <arg> create topic to which broker -c,--clusterName <arg> create topic to which cluster -h,--help Print help -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 -o,--order <arg> set topic's order(true|false -p,--perm <arg> set topic's permission(2|4|6), intro[2:W 4:R; 6:RW] -r,--readQueueNums <arg> set read queue nums -s,--hasUnitSub <arg> has unit sub (true|false -t,--topic <arg> topic name -u,--unit <arg> is unit topic (true|false -w,--writeQueueNums <arg> set write queue nums
咱們直接定位到其實現類執行命令的方法:
經過broker模式建立:
org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:
// -b,--brokerAddr <arg> create topic to which broker if (commandLine.hasOption('b')) { String addr = commandLine.getOptionValue('b').trim(); defaultMQAdminExt.start(); defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); return; }
從commandLine命令行工具獲取運行時-b參數重的broker的地址,defaultMQAdminExt是默認的rocketmq控制檯執行的API,此時調用start方法,該方法建立了一個mqClientInstance,它封裝了netty通訊的細節,接着就是最重要的一步,調用createAndUpdateTopicConfig將topic配置信息發送到指定的broker上,完成topic的建立。
經過集羣模式建立:
org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:
// -c,--clusterName <arg> create topic to which cluster else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim(); defaultMQAdminExt.start(); Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); System.out.printf("create topic to %s success.%n", addr); } return; }
經過集羣模式建立與經過broker模式建立的邏輯大體相同,多了根據集羣從nameserver獲取集羣下全部broker的master地址這個步驟,而後在循環發送topic信息到集羣中的每一個broker中,這個邏輯跟指定單個broker是一致的。
這也說明了當用集羣模式去建立topic時,集羣裏面每一個broker的queue的數量相同,當用單個broker模式去建立topic時,每一個broker的queue數量能夠不一致。
預先建立時序圖:
建議線下開啓,線上關閉,不是我說的,是官方給出的建議:
rocketmq爲何要這麼設計呢?通過一波源碼深度解析後,我獲得了我想要的答案:
根據上面的源碼分析,咱們得出,rocketmq在發送消息時,會先去獲取topic的路由信息,若是topic是第一次發送消息,因爲nameserver沒有topic的路由信息,因此會再次以「TBW102」這個默認topic獲取路由信息,假設broker都開啓了自動建立開關,那麼此時會獲取全部broker的路由信息,消息的發送會根據負載算法選擇其中一臺Broker發送消息,消息到達broker後,發現本地沒有該topic,會在建立該topic的信息塞進本地緩存中,同時會將topic路由信息註冊到nameserver中,那麼這樣就會形成一個後果:之後全部該topic的消息,都將發送到這臺broker上,若是該topic消息量很是大,會形成某個broker上負載過大,這樣消息的存儲就達不到負載均衡的目的了。
掃面下方二維碼,關注「Java科表明」,開車帶你臨摹各類源碼,來不及解釋了快上車!