生產者發送消息有三種方式java
個人理解是分爲三層:數組
一、Producer實例層:該層用於建立Topic、消息請求的建立、執行消息發送時的先後置處理、發送不一樣方式的消息,有幾個關鍵參數在DefaultMQProducer中。從類圖中看出,DefaultMQProducerImpl是具體的實現者,DefaultMQProducer實際是提供一些參數的。緩存
參數 | 默認值 | 解釋 |
defaultTopicQueueNums | 4 | 一個主題建立4個消息隊列 |
sendMsgTimeout | 3000ms | 發送消息超時時間 |
compressMsgBodyOverHowmuch | 4K | 消息體(Msg Body)長度超過4K,須要壓縮 |
retryTimesWhenSendFailed | 2 | 同步模式下消息發送失敗重試次數 |
retryTimesWhenSendAsyncFailed | 2 | 異步模式下消息發送失敗重試次數 |
maxMessageSize | 4M | 消息最大長度 |
二、實例管理&服務層:該層主要有MQClientInstance實現,生產者和消費者都會調用這個類,該類具體包含如下功能:安全
fetchNameServerAddr | 間隔2分鐘利用Http獲取NameServer的地址 |
updateTopicRouteInfoFromNameServer | 間隔30s請求NameServer,獲取最新的Topic路由關係 |
cleanOfflineBroker | 間隔30s清理不在線的Broker |
sendHeartbeatToAllBrokerWithLock | 間隔30s向全部Broker發送心跳 |
persistAllConsumerOffset | 間隔5s把每一個隊列消費到的位置保存到本地文件或者Broker |
adjustThreadPool | 間隔1分鐘調整線程池,只針對於PushConsumer,調整策略是若是一個消費者未消費消息總和超過100000,增長線程CoreSize,小於80000,減少線程CoreSize,實際RocketMQ未實現該功能 |
三、網絡通訊層網絡
RocketMQ網絡通訊依賴netty,所以RemotingClient接口的實際實現是NettyRemotingClient,不管是消息的消費仍是生產,最終都會由它執行,具體負責消息發送拉取、接受Broker回傳結果並異步處理(執行回調)、消息處理器管理(不一樣的請求類型使用不一樣的處理器,思想很好,實現時做者仍是使用了同一個Processor)。負載均衡
【MQClientAPIImpl】 this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
在這一層,NettyRemotingClient不會感知到上層是發送消息仍是拉取消息,對它來講,看到的都是RemotingCommand(具體封裝了請求類型),只有同步、異步、單向執行,因此這三個方式和生產者的那三個(同步發送、異步發送、單向發送)仍是有所不一樣的。異步
介紹完Producer發送消息所經歷的層次,分析具體的發送流程。函數
一、查詢本地緩存是否存儲了TopicPublishInfo,不然從NameServer獲取。
二、根據選擇策略獲取待發送隊列。
三、獲取消息隊列對應的broker實際IP。
四、設置消息Unique ID,zip壓縮消息。
五、檢查信息合法性,調用NettyClient發送消息fetch
TopicPublishInfo包含隊列優先級、消息隊列列表、路由信息以及一個線程安全的index座標。this
public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; }
隊列選擇是經過MQFaultStrategy的selectOneMessageQueue方法完成。MQFaultStrategy重要的屬性包含
//延遲容錯對象,維護延遲Brokers的信息 LatencyFaultTolerance<String /*brokerName*/> latencyFaultTolerance = new LatencyFaultToleranceImpl(); //延遲容錯開關 boolean sendLatencyFaultEnable = false; //延遲級別數組 long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L }; //不可用時長數組 long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
MQFaultStrategy中最重要的屬性是latencyFaultTolerance,它維護了那些消息發送延遲較高的brokers的信息,同時延遲的時間長短對應了延遲級別latencyMax 和時長notAvailableDuration ,sendLatencyFaultEnable 控制了是否開啓發送消息延遲功能。LatencyFaultToleranceImpl負責判斷隊列是否可用、更新Broker的延遲時間。
public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } //若是隊列中沒找到,說明沒有延遲記錄 return true; } //計算Broker被禁用時間是否到了 public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { //計算不可用時間持續多久 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
最後再來看看selectOneMessageQueue究竟作了什麼?
一、獲取上一次使用以後的隊列,從這個隊列開始判斷該隊列所在的Broker是否可用
二、若是該Broker可用,則返回該隊列
三、若是發現都不符合要求,則至少須要選擇一個相對好的broker,並返回對應的隊列
消息的發送先寫到這裏。下一篇會提到如何消費消息。