RocketMQ Producer發送消息流程

 這節介紹Producer發送消息的流程。算法

 接上一節開頭的Demo,發送消息的寫法以下:數組

public class SyncProducer {

    public static void main (String[] args) throws Exception {
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer ("GroupTest");
        // 設置NameServer的地址
        producer.setNamesrvAddr ("localhost:9876");
        // 啓動Producer實例
        producer.start ();
        for (int i = 0; i < 100; i++) {
            // 建立消息,並指定Topic,Tag和消息體
            Message msg = new Message ("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 發送消息到一個Broker
            SendResult sendResult = producer.send (msg);
            // 經過sendResult返回消息是否成功送達
            System.out.printf ("%s%n", sendResult);
        }
        // 若是再也不發送消息,關閉Producer實例。
        producer.shutdown ();
    }
}

發送消息的方法爲:緩存

SendResult sendResult = producer.send (msg);

其send方法內容以下:微信

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}

主要調用了DefaulMQProducerImpl,委託給了DefaultMQProducerImpl的send方法。而DefaultMQProducerImpl又調用了自身的sendDefaultImpl,該方法完成了發送的主要動做。sendDefaultImpl的定義以下:異步

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

參數包括:this

  • Message:消息內容
  • CommunicationMode:通信模式,包括同步、異步、單步
  • SendCallback:異步模式下的回調接口,包括成功和異常通知
  • timeout:超時時間

以下爲SendDefaultImpl的主要執行過程:線程

file

(1) 確保客戶端已經初始化成功netty

 主要確保DefaultMQProducerImpl的狀態爲RUNNINGcode

(2) 查詢topic的發佈信息server

 從內部維護的ConcurrentMap<String/* topic */, TopicPublishInfo>中獲取topic對應的發佈信息,上一節介紹過,該信息會經過後臺線程定時更新,若是當前沒有topic對應的信息,則會當即調用updateTopicRouteInfoFromNameServer方法實時同步。

 TopicPlushInfo用於表示Topic的路由信息,第一節介紹RocketMQ時說過,Topic的數據分佈在多個Broker上,同時在一個broker上還會分爲若干個Queue以增長並行度。

file

上圖的關係圖中,TopicPlushInfo持有一個MessageQueue列表和一個TopicRouteData。MessageQueue表示了各個Queue的映射信息,即上面提到的各個Broker上的Queue。而TopicRouteData則用於描述Broker的位置信息和Queue的配置信息。

(3) 判斷是否有路由信息

 若是上一步沒有查詢到topic對應的發佈信息,則拋出異常結束,不然轉到(4)

(4) 計算重試次數

 根據通信模式計算重試次數

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

即若是是同步模式,則在失敗時會再重試配置的次數(默認爲2次),其餘狀況不進行重試。

(5) 判斷當前是否須要重試

 即判斷當前執行次數是否已經超太重試次數,若是已經超過,則說明重試次數用完,無法繼續嘗試,判斷當前是否有結果,若是有結果則返回,不然拋出異常結束。若是重試次數沒用完,則轉到(6)

(6) 選取一個延遲較短的broker

 選取一個延遲較短的broker,該功能由MQFaultStrategy提供。這裏先介紹MQFaultStrategy,其提供了可選的故障延遲機制,對於請求響應較慢的broker,能夠在一段時間內將其狀態置爲不可用。以下圖:

file

能夠經過MQFaultStrategy的sendLatencyFaultEnable屬性控制是否打開故障延遲機制開關,默認爲false不打開。在打開該開關時,則每次選取topic下對應的queue時,會基於以前執行的耗時,在有存在符合條件的broker的前提下,優選選取一個延遲較短的broker,不然再考慮隨機選取。

 LatencyFaultTolerance用於維護有「故障」broker的「可用」狀態。對於每個被定義爲「故障」的broker,LatencyFaultTolerance內部都會有一個對應的FaultItem來表示,其主要內容以下:

class FaultItem implements Comparable<FaultItem> {
    private final String name;//brokername
    private volatile long currentLatency;//最近發生延遲的時間點
    private volatile long startTimestamp;//下一次開始可用的時間點

    public FaultItem(final String name) {
        this.name = name;
    }
    
    public boolean isAvailable() {//判斷當前時間是否已通過了設置的開始可用時間點
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
    
}

即當一個broker發送故障時,會記錄其最近發生延遲的時間點和下一次開始可用的時間點,而一個broker「可用」的意思是指:該broker不存在LatencyFaultTolerance維護的faultItemTable屬性中,或者當前時間已經大於該broker下一次開始可用的時間點。

 LatencyFaultTolerance經過updateFaultItem方法更新「故障」broker的可用狀態,該方法會直接更新faultItemTable中broker對應FaultItem的最近延遲時間和最近開始可用時間點。該方法須要給定指定broker的不可用時間。而判斷一個broker是否有故障以及不可用時間的方法,則在MQFaultStrategy的computNotAvaliableDuration方法中,以下:

private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}

其中延遲級別數組latencyMax和不可用時長數組notAvailableDuration的定義以下:

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

即若是當前請求的「延遲時間」超過latencyMax的某個級別,則認爲該broker已是「故障」狀態,會從notAvailableDuration中選擇該broke對應的不可用時間。從這兩組數組的定義也能夠看出來,當延遲時間小於50ms則認爲該broker狀態正常,不用進行故障延遲處理。而「延遲時間」則是指:調用remotingclient發送netty請求這一步的耗時,會在後面進行說明。

 如今回過頭來看當前步驟的動做,MQFaultStrategy的selectOneMessageQueue方法用於選取指定topic下的queue,其內容以下:

file

上面selectOneMessageQueue方法已經添加了註釋,在開啓了故障延遲機制時,該方法首先從topic下可選的queue列表中輪訓選擇一個broker「可用」的,而且brokerName=lastBrokerName(lastBrokerName表示上次使用過的broker,能夠爲空,表示第一次選擇)上的queue。若是沒有符合要求的broker,則會選擇一個「相對好」的broker上的queue,最後纔會從該topic下可選的列表中隨機選擇一個queue。若是沒有開啓故障延遲機制,則會直接從該topic下最近使用過的broker上的可選queue列表中隨機選擇一個queue。

(7)調用remotingclient發送netty請求

 使用MQClientAPIImpl發送消息(內部使用RemotingClient),發送模式包括單步、異步、同步。步驟爲:

  1. 從緩存中獲取broker的ip和端口,若是緩存中沒有該broker的信息則從nameserver同步一次到緩存,再從緩存中獲取信息
  • 包裝一個發送的上下文對象SendMessageContext
  • 若是不是批量消息,則設置惟一的ID,ID值爲:應用啓動到當前的時間差+當前消息計數
  • 嘗試壓縮消息體,目前,批量消息不壓縮;單條消息,超過配置的長度會進行壓縮,使用Deflater算法。由於上層會在該步驟失敗時進行重試,於是改不在最後會使用finally將壓縮後的消息體從新設置爲未壓縮前的內容
  • 若是存在CheckForbiddenHook,則執行
  • 若是存在sendHook,則執行發送前回調動做executeSendMessageHookBefore
  • 包裝請求頭SendMessageRequestHeader
  • 使用MQClientAPIImpl的sendMessage方法發送消息,獲得SendResult結果
  • 若是存在sendHook,則執行發送後回調動做executeSendMessageHookAfter,當拋出異常也會執行發送後回調動做executeSendMessageHookAfter
  • 發送完後將消息體設置回原值(還原Message被壓縮過的body值),爲重試作準備
  • 返回SendResult

(8) 更新broker的故障狀態

 (7)調用remotingclient執行完請求後,能夠獲得該步的耗時,再根據(6)指出的,若是打開了故障延遲開關,會根據該耗時來肯定該broker是否有故障,而後讓其「不可用」一段時間,具體可看(6)的內容。

(9) 判斷結果是否有有效

 判斷該次請求是否有效,有效包括:執行過程無異常,(7)正確返回結果,若是無效,則會執行(8)更新broker的故障狀態,而後跳到(5)進行重試

 以上就是Producer發送消息的主要流程,其中涉及到不少緩存數據不少都是上一節客戶端啓動過程當中講過的後臺任務同步的.下面附上該部分當時源碼閱讀過程作的筆記簡圖,該圖描述了Producer發送消息的大體過程:

file

更多原創內容請搜索微信公衆號:啊駝(doubaotaizi)

相關文章
相關標籤/搜索