RocketMQ 發送普通消息有 三 種實現方式:可靠同步發送 、 可靠異步發送 、 單向 (Oneway)發送。算法
RocketMQ 消息封裝類是 org.apache.rocketmq.common.message.Message。apache
Message 擴展屬性主要包含下面幾個 。緩存
org.apache.rocketmq.client.producer.DefaultMQProducer
複製代碼
/**
* 生產者所屬組,消息服務器在回查事務狀態時回隨機選擇該組中任何一個生產者發起事務回查請求
*/
private String producerGroup;
/**
* 默認 topicKey。TBW102
*/
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
/**
* 默認主題在每個Broker 隊列數量
*/
private volatile int defaultTopicQueueNums = 4;
/**
* 消息發送默認超時時間,默認3s
*/
private int sendMsgTimeout = 3000;
/**
* 消息體超過該值則啓用壓縮,默認4k
*/
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
* 同步方式發送消息重試次數,默認爲2,總共執行3次
*/
private int retryTimesWhenSendFailed = 2;
/**
* 異步方式發送消息重試次數,默認爲2
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* 消息重試選擇另一個Broker 時,是否不等待存儲結果就返回,默認爲false
*/
private boolean retryAnotherBrokerWhenNotStoreOK = false;
/**
* 容許發送的最大消息長度 默認爲 4M
*/
private int maxMessageSize = 1024 * 1024 * 4;
複製代碼
1.建立主題bash
/**
* key:目前未實際做用,能夠與 newTopic 相同 。
* newTopic: 主題名稱 。
* queueNum:隊列數量 。
* topicSysF!ag:主題系統標籤,默認爲 0。
**/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
複製代碼
2.查找該主題下全部消息隊列服務器
List<MessageQueue> fetchPublishMessageQueues(String topic);
複製代碼
3.同步發送消息,具體發送到主題中的那個消息隊列由負載算法決定。網絡
SendResult send(Message msg)
複製代碼
4.同步發送消息,若是發送超過timeout,則拋出超時異常異步
SendResult send(Message msg,long timeout)
複製代碼
5.異步發送消息,sendCallback 參數是消息發送成功後的回調方法函數
void send(Message msg,SendCallback sendCallback)
複製代碼
6.異步發送消息,若是發送超過timeout,則拋出超時異常fetch
void send(Message msg, SendCallback sendCallback, long timeout)
複製代碼
7.單向消息發送,不在意發送結果,消息發送出去後該方法馬上返回spa
void sendOneway(Message msg)
複製代碼
8.同步方式發送消息,發送到指定消息隊列
SendResult send(Message msg, MessageQueue mq)
複製代碼
9.異步方式發送消息,發送到指定消息隊列
void send(Message msg, MessageQueue mq, SendCallback sendCallback);
複製代碼
10.單向發送消息,發送到指定消息隊列
void sendOneway(Message msg,MessageQueue mq);
複製代碼
11.根據時間戳從隊列中查找其偏移量
long searchOffset(MessageQueue mq, long timestamp)
複製代碼
12.查找該消息隊列中最大的物理偏移量
long maxOffset(MessageQueue mq)
複製代碼
13.查找該消息隊列中最小的物理偏移量
long minOffset(MessageQueue mq)
複製代碼
14.根據消息偏移量查找消息
MessageExt viewMessage(String offsetMsgId)
複製代碼
15.根據條件查詢消息
* @param topic message topic -----消息主題
* @param key message key index word -----消息索引字段
* @param maxNum max message number ----本次最多取出消息條數
* @param begin from when -----開始時間
* @param end to when -----結束時間
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
複製代碼
16.根據主題與消息ID 查詢
MessageExt viewMessage(String topic,String msgId)
複製代碼
17.同步批量發送消息
SendResult send(Collection<Message> msgs);
複製代碼
檢查ProducerGroup 是否符合要求;並改變生產者的instanceName 爲進程ID
建立MQClientInstance實例MQClientlnstance 封裝了 RocketMQ 網絡處理 API,是消息生產者( Producer)、消息消費者 (Consumer)與 NameServer、 Broker打交道的網絡通道。
。整個JVM實例只存在一個MQClientManager實例,維護一個MQClientInstance緩存表factoryTable.也就是 同一個 clientld只 會建立一個 MQClientinstanc巳。
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>
複製代碼
向 MQClientlnstance註冊,將當前生產者加入到 MQClientlnstance管理中,方 便後續調用網絡請求、進行心跳檢測等。
啓動 MQClientlnstance,若是 MQC!ientlnstance 已經啓動 ,則本次啓 動不會真 正執行。
消息發送流程主要的步驟:驗證消息、查找路由 、 消息發送 (包含異常處理機制) 。默認消息發送以同 步方式發送,默認超 時時間 爲 3s。
消息發送以前,首先確保生 產 者處於運行狀態,而後驗證消息是否符合相應的規範, 具體的規範要求是主題名稱 、 消息體不能爲空 、 消息長度不能等於 0且默認不能超過容許 發送消息的最大長度 4M (maxMessageSize=l024 *1024 *4)。
消息發送以前,首先須要獲取主題的路由信息,只有獲取了這些信息咱們才知道消息 要發送到具體的 Broker節點。
1 )消息生產者啓動流程
重點理解 MQClientlnstance、消 息生產者之間的關係 。
2 )消息隊列負載機制
消息生產者在發送消息時,若是本地路由表中未緩存 topic 的路由信息,向 Name Server 發送獲取路由信息請求,更新本 地路由信息表,而且消息生 產者每隔 30s 從 Name Server 更新路由表 。
3 )消息發送異常機制 消息發送高可用主要經過兩個手段 : 重試與 Broker規避。 Brok巳r規避就是在一次消息 發送過程當中發現錯誤,在某一時間段內,消息生產者不會選擇該 Broker(消息服務器)上的 消息隊列,提升發送消息的成功 率 。
4 )批量消息發送
RocketMQ 支持將同一主題下的多條消息一次性發送到消息服務端。