生產者發送消息,須要等到消息服務器返回結果java
生產者發送消息, 不須要等到消息服務器返回結果,生產者線程不阻塞,只需註冊監聽回調函數便可。apache
生產者只管發,無論成功與否。緩存
org.apache.rocketmq.common.message.Message服務器
private String topic; private int flag; // 屬性 private Map<String, String> properties; // 消息體 private byte[] body; // 事務 ID private String transactionId;
properties 存放的擴展屬性主要有:網絡
用於過濾消息異步
Message 索引鍵,多個用空格隔開,RocketMQ 可根據這些 key 快速檢索消息函數
消息發送時,是否等到消息存儲完,再返回this
消息延遲級別,用於定時消息或消息重試線程
// 生成者組,消息服務器在回查事務狀態時,會隨機選擇該組中的任何一個生成者發起事務回查請求 private String producerGroup; // 默認的 topic key private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; // TBW102 // 默認的主題隊列數量 private volatile int defaultTopicQueueNums = 4; // 默認發送超時時間 private int sendMsgTimeout = 3000; // 消息體超過 4kb 時,壓縮消息 private int compressMsgBodyOverHowmuch = 1024 * 4; // 同步發送失敗,最大嘗試次數 private int retryTimesWhenSendFailed = 2; // 異步發送失敗,最大嘗試次數 private int retryTimesWhenSendAsyncFailed = 2; // 消息重試時選擇另一個 broker 時,是否不等待存儲結果就返回。默認 false private boolean retryAnotherBrokerWhenNotStoreOK = false; // 容許發送的最大消息長度 private int maxMessageSize = 1024 * 1024 * 4; // 4M
流程代碼位於:DefaultMQProducerImpl#startcode
流程代碼位於:DefaultMQProducerImpl#sendDefaultImpl
重要的類:TopicPublishInfo
// 是不是順序消息 private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; // 主題的消息隊列 private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // 用於選擇消息隊列。每次選擇一個隊列,會自增1,若是等於 Integer.MAX_VALUE,重置爲0 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; // TopicRouteData private String orderTopicConf; private List<QueueData> queueDatas; private List<BrokerData> brokerDatas; private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
代碼位於:DefaultMQProducerImpl#tryToFindTopicPublishInfo(final String topic)
sendLatencyFaultEnable = false 默認不啓用 broker 故障延遲機制。
RocketMQ 會避開上一次發送失敗的 broker
代碼入口:TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 避開上一次發送失敗的 broker if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
代碼入口:MQFaultStrategy#selectOneMessageQueue
代碼入口:DefaultMQProducerImpl#sendKernelImpl
若是發送失敗,則繼續重複 選擇消息隊列,而後發送
重試的調用入口是在收到服務端響應包時進行的。所以網絡異常、網絡超時,將不會觸發重試。
異步發送,不須要回調,沒有重試機制
單機環境確定沒救了,集羣環境下沒有影響的。
rocketMQ 每次發送消息時,會選擇消息隊列。第一次發送失敗後,會進行失敗的嘗試。此時會將上一次發送失敗的 broker 排除。從新選擇一個 broker。
若是是當前保持鏈接的 namesrv 保存的 broker 信息,都掛掉了,那麼必定是會失敗的。(若是是這個緣由,那麼我以爲,全部的 broker 應該掛掉了)
若是是此 namesrv 沒有 topic 對應的 broker ,那麼 RocketMQ 會選擇其餘的 namesrv 保持鏈接,因此,不會發送失敗。
相關代碼:NettyRemotingClient#getAndCreateNameserverChannel()
若是隻是 namesrv 中的某個 broker 掛掉了,可是由於 Producer 有消息重試機制,會選擇其餘的 broker。所以,消息不會發送失敗。除非全部的 broker 都失敗了。
結論:除非全部 broker 掛掉,否則我以爲不會形成消息發送失敗。
答案是不會。
緣由是,每次再向 namesrv 發送消息時,須要判斷與 namesrv 的 channel 是否有效。若是無效,則會嘗試從剩下的 namesrv 查找一個有效的,並與之保持鏈接。
相關代碼位於: NettyRemotingClient#getAndCreateNameserverChannel()