rocketmq之producer解析

先來看下producer核心的類設計,以下圖:java

一、核心發佈消息的類DefaultMQProducer,繼承自MQProducer接口,此接口定義了一系列發送消息的方法,如普通消息,順序消息,延時消息等,最終進行網絡通訊會交給MQClientAPIImpl處理。spring

二、rocketmq從4.1.3版本開始又支持了事務消息,由TransactionMQProducer類提供(以後會有專門的文章進行詳細解讀事務消息)shell

producer之配置

咱們看到DefaultMQProducer繼承了一個客戶端的公共配置類ClientConfig(與consumer公用),其實就是一個普通的javaBean,既能夠代碼中設置屬性,也能夠集成spring來配置服務器

參數名 默認值 說明
namesrvAddr nameserver的地址列表,用分號隔開
clientIP 本機ip地址 客戶端ip地址,有時候沒法識別,須要手動配置
instanceName DEFAULT 客戶端實例名稱,客戶端建立的多個 Producer、Consumer 實際是共用一個內部實例(這個實例包含網絡鏈接、線程資源等)
clientCallbackExecutorThreads cpu核數 通訊層客戶端處理請求的線程數
pollNameServerInterval 30000 輪詢nameserver的時間間隔,單位ms
heartbeatBrokerInterval 30000 向broker發送心跳的時間間隔,單位ms
persistConsumerOffsetInterval 5000 持久化 Consumer 消費進度間隔時間,單位ms

producer獨有的配置:網絡

參數名 默認值 說明
producerGroup DEFAULT_PRODUCER Producer組名,相同分組的producer應該有相同的發送消息邏輯
createTopicKey AUTO_CREATE_TOPIC_KEY 自動建立topic時,以此默認topic爲模板建立指定topic
defaultTopicQueueNums 4 自動建立topic隊列數量
sendMsgTimeout 3000 發送消息的超時時間,單位ms
compressMsgBodyOverHowmuch 4098 消息體超過多大會進行壓縮,單位字節
retryTimesWhenSendFailed 2 同步發送消息,發送失敗重試次數
retryTimesWhenSendAsyncFailed 2 異步發送消息,發送失敗的重試次數
retryAnotherBrokerWhenNotStoreOK false 同步發送消息,消息存儲失敗是否重試其餘broker
maxMessageSize 4194304 客戶端限制消息的大小,默認4M
TransactionListener 事務消息時,必須設置的回查監聽器

producer之group概念

咱們在建立producer時必需要指定一個group,這裏有兩個做用:負載均衡

  • 生產者通常會是集羣部署的,group用來標識一類生產者,相同group的生產者通常要有相同的發送邏輯。
  • 在發送事務消息時,當事務消息異常,broker端來回查事務狀態時,須要知道是由哪類生產者發送的事務消息,生產端會根據group名稱來查找對應的producer來執行相應的回查邏輯。

producer的啓動流程

簡單說明下整個啓動流程:dom

一、首先在DefaultMQProducerImpl中會作一些參數校驗,如group是否合法;而後會建立MQClientInstance實例,此實例包含網絡鏈接、線程資源等,相同的clientId會共享此實例,因此經過MQClientManager來管理。異步

二、核心的啓動流程在MQClientInstance類中,若是nameserver地址沒有配置的話,會先經過靜態的http服務器地址去抓取nameserver的地址;再則啓動netty客戶端。工具

三、啓動一些定時任務,跟producer有關的以下幾個:this

  • 若是producer沒有配置nameserver地址,啓動定時抓取nameserver的地址的定時任務,任務延時10s開始,每隔2分支執行一次。
  • 輪詢nameserver定時任務,主要是定時更新topic的路由信息,任務延時10ms開始,每隔30s執行一次。
  • 清除下線的broker和向broker發送心跳,任務延時1s執行,每隔30s執行一次

Producer如何尋址

RocketMQ 有多種配置方式能夠令客戶端找到 NameServer, 而後經過 NameServer 再找到 Broker,分別以下,
優先級由高到低,高優優先級會覆蓋低優先級

一、代碼中指定 Name Server 地址

producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

二、啓動參數指定

-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876

三、環境變量指定 Name Server 地址

export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876

四、HTTP 靜態服務器尋址(默認)

若是以上三種都沒有設置name server的地址,客戶端啓動後先會訪問一個靜態http服務器獲取name server的地址,而後會啓動一個定時任務訪問這個靜態 HTTP 服務器,地址以下:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

這是默認的地址,固然你也能夠更改,作以下設置:

代碼:

System.setProperty("rocketmq.namesrv.domain","localhost");
System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")

或者啓動參數指定:

-Drocketmq.namesrv.domain=localhost
-Drocketmq.namesrv.domain.subgroup=nameServer

以上設置後http服務器地址就變成:

http://localhsot:8080/rocketmq/nameServer

這個 URL 的返回內容格式以下:

192.168.0.1:9876;192.168.0.2:9876

客戶端每隔 2 分鐘訪問一次這個 HTTP 服務器,並更新本地的 Name Server 地址。

推薦使用 HTTP 靜態服務器尋址方式,好處是客戶端部署簡單,且 Name Server 集羣能夠熱升級。

發送消息時如何獲取路由信息

一、broker在啓動的時候經過參數autoCreateTopicEnable設置是否自動建立topic,默認爲true,此時會建立一個名爲TBW102(4.3版本已經更名爲AUTO_CREATE_TOPIC_KEY)的topic(參見類TopicConfigManager),broker在向namesrv註冊時會把默認的topic註冊上去。若是設置false,則不會註冊。

二、producer在發送消息時會在本地獲取路由信息,第一次發送的話本地確定沒有,就會去namesrv獲取,若是此時namesrv也沒有,則會獲取TBW102的topic信息(參見DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此爲模板建立topic,而後選擇topic下的一臺broker發,broker建立後,會經過心跳註冊到namesrv上。

三、若是autoCreateTopicEnable設置false的話,producer發送消息會報找不到路由的異常,此時必須手動建立topic。

  • 建議autoCreateTopicEnable設置false,基於以上第二步,自動建立topic後,之後全部該TOPIC的消息,都將發送到剛纔選擇的這臺broke上,達不到負載均衡的目的。因此基於目前RocketMQ的設計,建議關閉自動建立TOPIC的功能,而後根據消息量的大小,手動建立TOPIC。
  • 能夠經過管理工具mqadmin來手動建立topic

    sh mqadmin updateTopic -c [集羣名稱] -n [nameserver地址] -t [topic名稱] -w [寫隊列數] -r [讀隊列數]
  • 手動建立了Topic後,producer就能夠輪詢的發送到不一樣的broker了。

topic的隊列數

這裏講一下自動建立的topic的隊列數如何設置,首先broker建立的模板topic=AUTO_CREATE_TOPIC_KEY的隊列是8,參見類TopicConfigManager:

public TopicConfigManager(BrokerController brokerController) { 
    //省略無關代碼
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                                     .getDefaultTopicQueueNums());
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                                      .getDefaultTopicQueueNums());
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
     //省略無關代碼
}

BrokerConfig:

private int defaultTopicQueueNums = 8;

DefaultMQProducer端默認知道要建立的topic的隊列數是4

private volatile int defaultTopicQueueNums = 4;

MQClientInstance類的方法updateTopicRouteInfoFromNameServer中有這樣一段邏輯:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    //省略無關代碼
    for (QueueData data : topicRouteData.getQueueDatas()) {
        int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
        data.setReadQueueNums(queueNums);
        data.setWriteQueueNums(queueNums);
    }
    //省略無關代碼
 }

建立隊列是取二者最小的一個,也就是4,因此要設置topic的隊列數量,很明顯了設置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就能夠了。這是自動建立Topic時隊列數的設置方法,上面也提到生成環境通常不會開啓自動建立Topic的功能,能夠經過上面的手動建立Topic的指令來設置讀寫隊列數。你可能注意到了Topic下有讀寫隊兩個隊列數,分別表明上面意思呢?讀寫隊列實際上是個邏輯概念,一個broker下topic的總隊列數是以寫隊列爲準,而讀隊列意思是容許多少隊列能夠被消費者消費,也就是說讀多寫少的狀況下,沒有問題,隊列均可以被消費掉,若是寫多讀少的話,那麼就會存在隊列不會被消費的狀況。

消息發送

前面咱們講到了如何獲取topic的路由信息,如何建立topic的隊列數,一個topic下有多個隊列,又能夠分佈在不一樣的broker上面,因此topic的總隊列數應該是全部broker上的topic下隊列數的總和。

備註:若是手動在每一個broker上分別建立topic的話,相同topic在不一樣broker上的隊列數能夠不同。

那麼問題來了,在發送消息時根據怎麼樣的策略來選擇一個隊列發送呢?rocketmq提供了一個MQFaultStrategy策略類來負責選擇隊列,這裏會有一個參數sendLatencyFaultEnable是否開啓延遲故障,

  • 該值默認爲false,在不開啓的狀況下,相同線程發送消息是輪詢topic下的全部隊列,不一樣線程發送是隨機的,核心代碼以下:

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            //省略沒必要要的代碼......
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    //以上代碼邏輯參見類MQFaultStrategy.selectOneMessageQueue
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //省略沒必要要的代碼......
        }
    }
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
    //以上代碼邏輯參見類TopicPublishInfo
    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();//ThreadLocal中獲取
        if (null == index) {//爲空,隨機生成一個
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }
        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;
        this.threadLocalIndex.set(index);
        return index;
    }
    //以上代碼參見類ThreadLocalIndex

    每次獲取index的時候都是從本地線程變量ThreadLocal中獲取,沒有的狀況下就是隨機生成一個,加1取絕對值後返回,再對隊列列表的長度取模,因此在同一線程中,會輪訓的從隊列列表獲取隊列。而若是是不一樣線程的話,index是隨機生成的,因此就是隨機從隊列列表中獲取。以下圖所示:

    能夠看到選擇隊列方法的入參有一個lastBrokerName的入參,此參數的目的是在發送消息失敗的狀況下,producer會重試再次發送,而再次發送選擇的隊列須要另選一個broker,lastBrokerName就是要過濾掉失敗的broker,選擇下一個broker的隊列進行發送消息。

  • 開啓延遲故障,每當發送完一次消息,無論成功仍是失敗,都會把這次存儲消息的broker給保存下來,記錄故障狀況下此broker須要延長多長時間才能再次發送,目前看到在代碼裏面寫死了,故障下30s以內是不能再向此broker發送消息了。

消息重試

producer的send方法自己支持內部重試,重試邏輯以下:

一、最大重試次數默認2次,能夠經過參數retryTimesWhenSendFailed設置

二、發送失敗,則輪詢到下一個broker,若是此時只有一個broker在線呢?那就會輪訓這個broker下的其餘隊列。

三、這個方法的總耗時時間不超過 sendMsgTimeout 設置的值,默認爲3s。

若是發送消息,broker返回結果超時,這種超時不會進行重試了;若是是方法自己耗時超過sendMsgTimeout ,還將來得及調用發送消息,此時的超時也不會重試。

以上策略其實也很難保證同步發送消息必定成功,若是應用要保證消息不丟失,最好先把消息存儲到db,後臺啓線程定時重試,確保消息必定存儲到broker。

相關文章
相關標籤/搜索