RocketMQ 消息發送

1.漫談 RocketMQ 消息發送

RocketMQ 發送普通消息有 三 種實現方式:可靠同步發送 、 可靠異步發送 、 單向 (Oneway)發送。算法

  • 同步 : 發送者向 MQ 執行發送消息 API 時,同步等待, 直到消息服務器返回發送結果 。
  • 異步 : 發送者向 MQ 執行發送消息 API 時,指定消息發送成功後的回掉函數,而後調用消息發送 API 後,當即返回,消息發送者線程不阻 塞 ,直到運行結束,消息發送成功或失敗的回調任務在一個新的線程中執行 。
  • 單向:消息發送者向 MQ 執行發送消息 API 時,直接返回,不等待消息服務器的結果, 也不註冊回調函數,簡單地說,就是隻管發,不在意消息是否成功存儲在消息服務器上 。

2.RocketMQ 消息

RocketMQ 消息封裝類是 org.apache.rocketmq.common.message.Message。apache

Message 擴展屬性主要包含下面幾個 。緩存

  • tag:消息 TAG,用於消息過濾 。
  • keys: Message 索引鍵, 多個用空格隔開, RocketMQ 能夠根據這些 key 快速檢索到消息 。
  • waitStoreMsgOK:消息發送時是否等消息存儲完成後再返回 。
  • delayTimeLevel: 消息延遲級別,用於定時消息或消息重試 。 這些擴展屬性存儲在 Message 的 properties 中 。

3.生產者啓動流程

3.1 DefaultMQProducer 消息發送者

org.apache.rocketmq.client.producer.DefaultMQProducer
複製代碼

重要屬性

/**
     * 生產者所屬組,消息服務器在回查事務狀態時回隨機選擇該組中任何一個生產者發起事務回查請求
     */
    private String producerGroup;
    
    /**
     * 默認 topicKey。TBW102
     */
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    
    /**
     * 默認主題在每個Broker 隊列數量
     */
    private volatile int defaultTopicQueueNums = 4;

    /**
     * 消息發送默認超時時間,默認3s
     */
    private int sendMsgTimeout = 3000;

    /**
     * 消息體超過該值則啓用壓縮,默認4k
     */
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    /**
     * 同步方式發送消息重試次數,默認爲2,總共執行3次
     */
    private int retryTimesWhenSendFailed = 2;

    /**
     * 異步方式發送消息重試次數,默認爲2
     */
    private int retryTimesWhenSendAsyncFailed = 2;

    /**
     * 消息重試選擇另一個Broker 時,是否不等待存儲結果就返回,默認爲false
     */
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    /**
     * 容許發送的最大消息長度 默認爲 4M
     */
    private int maxMessageSize = 1024 * 1024 * 4;
複製代碼

重要方法

1.建立主題bash

/**
* key:目前未實際做用,能夠與 newTopic 相同 。
* newTopic: 主題名稱 。
* queueNum:隊列數量 。
* topicSysF!ag:主題系統標籤,默認爲 0。
**/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
複製代碼

2.查找該主題下全部消息隊列服務器

List<MessageQueue> fetchPublishMessageQueues(String topic);
複製代碼

3.同步發送消息,具體發送到主題中的那個消息隊列由負載算法決定。網絡

SendResult send(Message msg)
複製代碼

4.同步發送消息,若是發送超過timeout,則拋出超時異常異步

SendResult send(Message msg,long timeout)
複製代碼

5.異步發送消息,sendCallback 參數是消息發送成功後的回調方法函數

void send(Message msg,SendCallback sendCallback)
複製代碼

6.異步發送消息,若是發送超過timeout,則拋出超時異常fetch

void send(Message msg, SendCallback sendCallback, long timeout)
複製代碼

7.單向消息發送,不在意發送結果,消息發送出去後該方法馬上返回spa

void sendOneway(Message msg) 
複製代碼

8.同步方式發送消息,發送到指定消息隊列

SendResult send(Message msg, MessageQueue mq)
複製代碼

9.異步方式發送消息,發送到指定消息隊列

void send(Message msg, MessageQueue mq, SendCallback sendCallback);
複製代碼

10.單向發送消息,發送到指定消息隊列

void sendOneway(Message msg,MessageQueue mq);
複製代碼

11.根據時間戳從隊列中查找其偏移量

long searchOffset(MessageQueue mq, long timestamp)
複製代碼

12.查找該消息隊列中最大的物理偏移量

long maxOffset(MessageQueue mq)
複製代碼

13.查找該消息隊列中最小的物理偏移量

long minOffset(MessageQueue mq) 
複製代碼

14.根據消息偏移量查找消息

MessageExt viewMessage(String offsetMsgId)
複製代碼

15.根據條件查詢消息

* @param topic message topic        -----消息主題
 * @param key message key index word -----消息索引字段
 * @param maxNum max message number  ----本次最多取出消息條數
 * @param begin from when            -----開始時間
 * @param end to when                -----結束時間
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
複製代碼

16.根據主題與消息ID 查詢

MessageExt viewMessage(String topic,String msgId)
複製代碼

17.同步批量發送消息

SendResult send(Collection<Message> msgs);
複製代碼

3.2 消息生產者啓動流程

step1.

檢查ProducerGroup 是否符合要求;並改變生產者的instanceName 爲進程ID

step2.

建立MQClientInstance實例MQClientlnstance 封裝了 RocketMQ 網絡處理 API,是消息生產者( Producer)、消息消費者 (Consumer)與 NameServer、 Broker打交道的網絡通道。。整個JVM實例只存在一個MQClientManager實例,維護一個MQClientInstance緩存表factoryTable.也就是 同一個 clientld只 會建立一個 MQClientinstanc巳。

ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>
複製代碼

clientld爲客戶端 IP+instance+(unitname可選),若是 instance 爲默認值 DEFAULT 的話, RocketMQ 會自動將 instance 設置爲進程 ID,這樣避免了不一樣進程的相互影響.但同 一 個 NM 中 的不一樣消費 者和不一樣生產者在啓動時獲取到的 MQC!ientlnstane 實例都是同 一個

step3.

向 MQClientlnstance註冊,將當前生產者加入到 MQClientlnstance管理中,方 便後續調用網絡請求、進行心跳檢測等。

step4.

啓動 MQClientlnstance,若是 MQC!ientlnstance 已經啓動 ,則本次啓 動不會真 正執行。

4.消息發送基本流程

消息發送流程主要的步驟:驗證消息、查找路由 、 消息發送 (包含異常處理機制) 。默認消息發送以同 步方式發送,默認超 時時間 爲 3s。

4.1 消息長度驗證

消息發送以前,首先確保生 產 者處於運行狀態,而後驗證消息是否符合相應的規範, 具體的規範要求是主題名稱 、 消息體不能爲空 、 消息長度不能等於 0且默認不能超過容許 發送消息的最大長度 4M (maxMessageSize=l024 *1024 *4)。

4.1 查找主題路由信息

消息發送以前,首先須要獲取主題的路由信息,只有獲取了這些信息咱們才知道消息 要發送到具體的 Broker節點。

若是生產者中緩存了 topic 的路由信息,若是該路由信息中包含了消息隊列,則直接返回該路由信息,若是沒有緩存 或沒有包含消息隊列, 則向 NameServer查詢該 topic 的路由信息。 若是最終未找到路由信 息,則拋出異常 : 沒法找到主題相關路由信息異常 .

5.總結

1 )消息生產者啓動流程

重點理解 MQClientlnstance、消 息生產者之間的關係 。

2 )消息隊列負載機制

消息生產者在發送消息時,若是本地路由表中未緩存 topic 的路由信息,向 Name­ Server 發送獲取路由信息請求,更新本 地路由信息表,而且消息生 產者每隔 30s 從 Name­ Server 更新路由表 。

3 )消息發送異常機制 消息發送高可用主要經過兩個手段 : 重試與 Broker規避。 Brok巳r規避就是在一次消息 發送過程當中發現錯誤,在某一時間段內,消息生產者不會選擇該 Broker(消息服務器)上的 消息隊列,提升發送消息的成功 率 。

4 )批量消息發送

RocketMQ 支持將同一主題下的多條消息一次性發送到消息服務端。

相關文章
相關標籤/搜索