【RocketMQ】消息的發送

Producer

發送方式

生產者發送消息有三種方式java

  • 同步(Sync):發送方線程發送後同步堵塞等待SendResult,若failed則重試下一個broker
  • 異步(Async):發送方線程發送後無需等待返回結果(返回的是null),當Broker返回結果後,生產者使用回調函數異步處理,超時可拋出Timeout異常
  • 單向(OneWay):發送方不等待broker響應也沒有回調函數觸發,速度快但可靠性弱

生產者類圖設計

個人理解是分爲三層:數組

一、Producer實例層:該層用於建立Topic、消息請求的建立、執行消息發送時的先後置處理、發送不一樣方式的消息,有幾個關鍵參數在DefaultMQProducer中。從類圖中看出,DefaultMQProducerImpl是具體的實現者,DefaultMQProducer實際是提供一些參數的。緩存

參數 默認值 解釋
defaultTopicQueueNums 4 一個主題建立4個消息隊列
sendMsgTimeout 3000ms 發送消息超時時間
compressMsgBodyOverHowmuch 4K 消息體(Msg Body)長度超過4K,須要壓縮
retryTimesWhenSendFailed 2 同步模式下消息發送失敗重試次數
retryTimesWhenSendAsyncFailed 2 異步模式下消息發送失敗重試次數
maxMessageSize 4M 消息最大長度

二、實例管理&服務層:該層主要有MQClientInstance實現,生產者和消費者都會調用這個類,該類具體包含如下功能:安全

  • 註冊&取消生產者(消費者)
  • 尋找&維護Broker路由
  • 發送&查詢消息
  • 基礎服務:包含向全部Broker發送心跳、定時任務執行(見表格)、拉取消息、負載均衡、啓動生產者。
fetchNameServerAddr 間隔2分鐘利用Http獲取NameServer的地址
updateTopicRouteInfoFromNameServer 間隔30s請求NameServer,獲取最新的Topic路由關係
cleanOfflineBroker 間隔30s清理不在線的Broker
sendHeartbeatToAllBrokerWithLock 間隔30s向全部Broker發送心跳
persistAllConsumerOffset 間隔5s把每一個隊列消費到的位置保存到本地文件或者Broker
adjustThreadPool 間隔1分鐘調整線程池,只針對於PushConsumer,調整策略是若是一個消費者未消費消息總和超過100000,增長線程CoreSize,小於80000,減少線程CoreSize,實際RocketMQ未實現該功能

三、網絡通訊層網絡

RocketMQ網絡通訊依賴netty,所以RemotingClient接口的實際實現是NettyRemotingClient,不管是消息的消費仍是生產,最終都會由它執行,具體負責消息發送拉取、接受Broker回傳結果並異步處理(執行回調)、消息處理器管理(不一樣的請求類型使用不一樣的處理器,思想很好,實現時做者仍是使用了同一個Processor)。負載均衡

【MQClientAPIImpl】
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);

在這一層,NettyRemotingClient不會感知到上層是發送消息仍是拉取消息,對它來講,看到的都是RemotingCommand(具體封裝了請求類型),只有同步、異步、單向執行,因此這三個方式和生產者的那三個(同步發送、異步發送、單向發送)仍是有所不一樣的。異步

發送流程

介紹完Producer發送消息所經歷的層次,分析具體的發送流程。函數

一、查詢本地緩存是否存儲了TopicPublishInfo,不然從NameServer獲取。
二、根據選擇策略獲取待發送隊列。
三、獲取消息隊列對應的broker實際IP。
四、設置消息Unique ID,zip壓縮消息。
五、檢查信息合法性,調用NettyClient發送消息fetch

TopicPublishInfo包含隊列優先級、消息隊列列表、路由信息以及一個線程安全的index座標。this

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

隊列選擇

隊列選擇是經過MQFaultStrategy的selectOneMessageQueue方法完成。MQFaultStrategy重要的屬性包含

//延遲容錯對象,維護延遲Brokers的信息
LatencyFaultTolerance<String /*brokerName*/> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//延遲容錯開關
boolean sendLatencyFaultEnable = false;
//延遲級別數組
long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L };
//不可用時長數組
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

MQFaultStrategy中最重要的屬性是latencyFaultTolerance,它維護了那些消息發送延遲較高的brokers的信息,同時延遲的時間長短對應了延遲級別latencyMax 和時長notAvailableDuration ,sendLatencyFaultEnable 控制了是否開啓發送消息延遲功能。LatencyFaultToleranceImpl負責判斷隊列是否可用、更新Broker的延遲時間。

public boolean isAvailable(final String name) {
     final FaultItem faultItem = this.faultItemTable.get(name);
     if (faultItem != null) {
         return faultItem.isAvailable();
     }
     //若是隊列中沒找到,說明沒有延遲記錄
     return true;
}

//計算Broker被禁用時間是否到了
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

 

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            //計算不可用時間持續多久
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
}

private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
}
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
}

最後再來看看selectOneMessageQueue究竟作了什麼?

一、獲取上一次使用以後的隊列,從這個隊列開始判斷該隊列所在的Broker是否可用

二、若是該Broker可用,則返回該隊列

三、若是發現都不符合要求,則至少須要選擇一個相對好的broker,並返回對應的隊列

消息的發送先寫到這裏。下一篇會提到如何消費消息。

相關文章
相關標籤/搜索