RocketMQ(4.6.1) 系列教程 -- Producer 篇

RocketMQ 消息發送

三種發送方式

  • 同步

生產者發送消息,須要等到消息服務器返回結果java

  • 異步

生產者發送消息, 不須要等到消息服務器返回結果,生產者線程不阻塞,只需註冊監聽回調函數便可。apache

  • 單向

生產者只管發,無論成功與否。緩存

消息結構

org.apache.rocketmq.common.message.Message服務器

private String topic;
private int flag;
// 屬性
private Map<String, String> properties;
// 消息體
private byte[] body;
// 事務 ID
private String transactionId;

properties 存放的擴展屬性主要有:網絡

  • tags

用於過濾消息異步

  • keys

Message 索引鍵,多個用空格隔開,RocketMQ 可根據這些 key 快速檢索消息函數

  • waitStoreMsgOk

消息發送時,是否等到消息存儲完,再返回this

  • delayTimeLevel

消息延遲級別,用於定時消息或消息重試線程

生產者

DefaultMQProducer 核心屬性

// 生成者組,消息服務器在回查事務狀態時,會隨機選擇該組中的任何一個生成者發起事務回查請求
private String producerGroup;

// 默認的 topic key
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; // TBW102

// 默認的主題隊列數量
private volatile int defaultTopicQueueNums = 4;

// 默認發送超時時間
private int sendMsgTimeout = 3000;

// 消息體超過 4kb 時,壓縮消息
private int compressMsgBodyOverHowmuch = 1024 * 4;

// 同步發送失敗,最大嘗試次數
private int retryTimesWhenSendFailed = 2;

// 異步發送失敗,最大嘗試次數
private int retryTimesWhenSendAsyncFailed = 2;

// 消息重試時選擇另一個 broker 時,是否不等待存儲結果就返回。默認 false
private boolean retryAnotherBrokerWhenNotStoreOK = false;

// 容許發送的最大消息長度
private int maxMessageSize = 1024 * 1024 * 4; // 4M

生產者啓動

流程代碼位於:DefaultMQProducerImpl#startcode

  1. 檢查 productGroup,默認爲 CLIENT_INNER_PRODUCER。
  2. 將 instanceName 修改成 PID。即 clientId 格式爲 ip@pid
  3. 建立 MQClientInstance 實例,給交給 MQClientManager 管理
  4. 建立默認的 Topic(TBW102),並放到 topicPublishInfoTable 中。

消息發送流程

流程代碼位於:DefaultMQProducerImpl#sendDefaultImpl

驗證消息
查找路由
  1. Producer 優先從緩存中查找,存在而且正常直接返回
  2. 若是沒有,則向 namesrv 發起請求, namesrv 返回 TopicRouteData。
  3. 判斷返回的 TopicRouteData 與舊的是否被改變過;若是是,則TopicPublishInfo 更新 queueDatas,brokerDatas 信息。

重要的類:TopicPublishInfo

// 是不是順序消息
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;

// 主題的消息隊列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 用於選擇消息隊列。每次選擇一個隊列,會自增1,若是等於 Integer.MAX_VALUE,重置爲0
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;

// TopicRouteData
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

代碼位於:DefaultMQProducerImpl#tryToFindTopicPublishInfo(final String topic)

選擇消息隊列
不啓用故障延遲機制

sendLatencyFaultEnable = false 默認不啓用 broker 故障延遲機制。

RocketMQ 會避開上一次發送失敗的 broker

代碼入口:TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  if (lastBrokerName == null) {
    return selectOneMessageQueue();
  } else {
    int index = this.sendWhichQueue.getAndIncrement();
    for (int i = 0; i < this.messageQueueList.size(); i++) {
      int pos = Math.abs(index++) % this.messageQueueList.size();
      if (pos < 0)
        pos = 0;
      MessageQueue mq = this.messageQueueList.get(pos);
      // 避開上一次發送失敗的 broker
      if (!mq.getBrokerName().equals(lastBrokerName)) {
        return mq;
      }
    }
    return selectOneMessageQueue();
  }
}
啓用故障延遲機制

代碼入口:MQFaultStrategy#selectOneMessageQueue

消息發送

代碼入口:DefaultMQProducerImpl#sendKernelImpl

  1. 找到所要發送的 broker 地址
  2. 爲消息分配全局惟一ID,消息體超過4kb,則採用ZIP壓縮。
  3. 若是註冊了<font color="red">消息發送鉤子函數</font>,則執行消息發送以前的加強邏輯。經過 <font color="red">DefaultMQProducerImpl#registerSendMessageHook() </font>註冊鉤子處理類,且能夠註冊多個
  4. 構造消息發送請求包
  5. 根據消息發送方式,進行發送(同步、異步、單向)
  6. 若是註冊了消息發送鉤子函數,執行after 邏輯

若是發送失敗,則繼續重複 選擇消息隊列,而後發送

同步發送
  1. 檢查消息發送是否合理
  2. 消息重試次數超過最大重試次數,消息將進入 DLD 延遲隊列。主題爲:%DLD%
  3. 消息存儲
異步方式

重試的調用入口是在收到服務端響應包時進行的。所以網絡異常、網絡超時,將不會觸發重試。

單向發送

異步發送,不須要回調,沒有重試機制

上篇疑問解答

Q: 路由發現與刪除機制存在一個問題,Namesrv 須要等 Broker 失效 至少 120s 才能將 Broker 從路由表中刪除。那麼若是,Producer 獲取到的 broker 是已經宕機的信息,那麼是否會形成消息發送失敗

單機環境確定沒救了,集羣環境下沒有影響的。

rocketMQ 每次發送消息時,會選擇消息隊列。第一次發送失敗後,會進行失敗的嘗試。此時會將上一次發送失敗的 broker 排除。從新選擇一個 broker。

Q: namesrv 之間數據不共享,那麼會形成消息的發送失敗嗎?

若是是當前保持鏈接的 namesrv 保存的 broker 信息,都掛掉了,那麼必定是會失敗的。(若是是這個緣由,那麼我以爲,全部的 broker 應該掛掉了)

若是是此 namesrv 沒有 topic 對應的 broker ,那麼 RocketMQ 會選擇其餘的 namesrv 保持鏈接,因此,不會發送失敗。

相關代碼:NettyRemotingClient#getAndCreateNameserverChannel()

若是隻是 namesrv 中的某個 broker 掛掉了,可是由於 Producer 有消息重試機制,會選擇其餘的 broker。所以,消息不會發送失敗。除非全部的 broker 都失敗了。

結論:除非全部 broker 掛掉,否則我以爲不會形成消息發送失敗。

Q: 集羣中的 namesrv 掛掉了一個,是否會形成鏈接它的 broker 發送消息失敗?

答案是不會。

緣由是,每次再向 namesrv 發送消息時,須要判斷與 namesrv 的 channel 是否有效。若是無效,則會嘗試從剩下的 namesrv 查找一個有效的,並與之保持鏈接。

相關代碼位於: NettyRemotingClient#getAndCreateNameserverChannel()

相關文章
相關標籤/搜索