技術乾貨 | 源碼解析 Github 上 14.1k Star 的 RocketMQ

前言

Apache RocketMQ 做爲廣爲人知的開源消息中間件,誕生於阿里巴巴,於 2016 年捐贈給了 Apache。從 RocketMQ 4.0 到現在最新的 v4.7.1,不管是在阿里巴巴內部仍是外部社區,都贏得了普遍的關注和好評。 java

本文將站在發送方視角,經過閱讀 RocketMQ Producer 源碼,來分析在事務消息發送中 RocketMQ 是如何工做的。git

須要說明的是,本文所貼代碼,均來自 4.7.1 版本的 RocketMQ 源碼。本文中所討論的發送,僅指從 Producer 發送到 Broker 的過程,並不包含 Broker 將消息投遞到 Consumer 的過程。github

宏觀概覽

RocketMQ 事務消息發送流程: apache

3.png

結合源碼來看,RocketMQ 的事務消息 TransactionMQProducer 的 sendMessageInTransaction 方法,實際調用了 DefaultMQProducerImpl 的 sendMessageInTransaction 方法。咱們進入 sendMessageInTransaction 方法,整個事務消息的發送流程清晰可見。網絡

首先,作發送前檢查,並填入必要參數,包括設 prepare 事務消息。負載均衡

源碼清單-1dom

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener(); 
        if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

進入發送處理流程: 異步

源碼清單-2this

try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

根據 broker 返回的處理結果決策本地事務是否執行,半消息發送成功則開始本地事務執行: spa

源碼清單-3

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) { 
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) { 
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg); 
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:  // 當備broker狀態不可用時,半消息要回滾,不執行本地事務
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

本地事務執行結束,根據本地事務狀態進行二階段處理:

源碼清單-4

try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // 組裝發送結果
    // ...
    return transactionSendResult;
}

接下來,咱們深刻每一個階段代碼分析。

深扒內幕

Ⅰ階段發送

重點分析 send 方法。進入 send 方法後,咱們發現,RocketMQ 的事務消息的一階段,使用了 SYNC 同步模式:

源碼清單-5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

這一點很容易理解,畢竟事務消息是要根據一階段發送結果來決定要不要執行本地事務的,因此必定要阻塞等待 broker 的 ack。

咱們進入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 方法的實現,經過讀這個方法的代碼,來嘗試瞭解在事務消息的一階段發送過程當中 producer 的行爲。

值得注意的是,這個方法並不是爲事務消息定製,甚至不是爲 SYNC 同步模式定製的,所以讀懂了這段代碼,基本能夠對 RocketMQ 的消息發送機制有了一個較爲全面的認識。

這段代碼邏輯很是通暢,不忍切片。爲了節省篇幅,將代碼中較爲繁雜但信息量不大的部分以註釋代替,儘量保留流程的完整性。我的認爲較爲重要或是容易被忽略的部分,以註釋標出,後文還有部分細節的詳細解讀。

源碼清單-6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // 1、消息有效性校驗。見後文
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;

    // 獲取當前topic的發送路由信息,主要是要broker,若是沒找到則從namesrv獲取
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 2、發送重試機制。見後文
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // 第一次發送是mq == null, 以後都是有broker信息的
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 3、rocketmq發送消息時如何選擇隊列?——broker異常規避機制 
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 發送核心代碼
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // rocketmq 選擇 broker 時的規避機制,開啓 sendLatencyFaultEnable == true 才生效
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

                    switch (communicationMode) {
                    // 4、RocketMQ的三種CommunicationMode。見後文
                        case ASYNC: // 異步模式
                            return null;
                        case ONEWAY: // 單向模式
                            return null;
                        case SYNC: // 同步模式
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // 自動重試
                } catch (MQClientException e) {
                    // ...
                    // 自動重試
                } catch (MQBrokerException e) {
                   // ...
                    // 僅返回碼==NOT_IN_CURRENT_UNIT==205 時自動重試
                    // 其餘狀況不重試,拋異常
                } catch (InterruptedException e) {
                   // ...
                    // 不重試,拋異常
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        // 組裝返回的info信息,最後以MQClientException拋出
        // ... ...

        // 超時場景拋RemotingTooMuchRequestException
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        // 填充MQClientException異常信息
        // ...
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

1、消息有效性校驗

源碼清單-7

Validators.checkMessage(msg, this.defaultMQProducer);

在此方法中校驗消息的有效性,包括對 topic 和消息體的校驗。topic 的命名必須符合規範,且避免使用內置的系統消息 TOPIC。消息體長度 > 0 && 消息體長度 <= 102410244 = 4M 。

源碼清單-8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2、發送重試機制

Producer 在消息發送不成功時,會自動重試,最多發送次數 = retryTimesWhenSendFailed + 1 = 3 次 。

值得注意的是,並不是全部異常狀況都會重試,從以上源碼中能夠提取到的信息告訴咱們,在如下三種狀況下,會自動重試:

  • 發生 RemotingException,MQClientException 兩種異常之一時
  • 發生 MQBrokerException 異常,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 時
  • SYNC 模式下,未發生異常且發送結果狀態非 SEND_OK

在每次發送消息以前,會先檢查是否在前面這兩步就已經耗時超長(超時時長默認 3000ms),如果,則再也不繼續發送而且直接返回超時,再也不重試。這裏說明了 2 個問題:

  • producer 內部自動重試對業務應用而言是無感知的,應用看到的發送耗時是包含全部重試的耗時在內的;
  • 一旦超時意味着本次消息發送已經以失敗了結,緣由是超時。這個信息最後會以 RemotingTooMuchRequestException 的形式拋出。

這裏須要指出的是,在 RocketMQ 官方文檔中指出,發送超時時長是 10s,即 10000ms,網上許多人對 rocketMQ 的超時時間解讀也認爲是 10s。然而代碼中卻明明白白寫着 3000ms,最終我 debug 以後確認,默認超時時間確實是 3000ms。

3、broker 的異常規避機制

源碼清單-9

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

這行代碼是發送前選擇 queue 的過程。

這裏涉及 RocketMQ 消息發送高可用的的一個核心機制,latencyFaultTolerance。這個機制是 Producer 負載均衡的一部分,經過 sendLatencyFaultEnable 的值來控制,默認是 false 關閉狀態,不啓動 broker 故障延遲機制,值爲 true 時啓用 broker 故障延遲機制,可由 Producer 主動打開。

選擇隊列時,開啓異常規避機制,則根據 broker 的工做狀態避免選擇當前狀態不佳的 broker 代理,不健康的 broker 會在一段時間內被規避,不開啓異常規避機制時,則按順序選取下一個隊列,但在重試場景下會盡可能選擇不一樣於上次發送 broker 的 queue。每次消息發送都會經過 updateFaultItem 方法來維護 broker 的狀態信息。

源碼清單-10

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // 計算延遲多久,isolation表示是否須要隔離該broker,如果,則從30s往前找第一個比30s小的延遲值,再按下標判斷規避的週期,若30s,則是10min規避;
        // 不然,按上一次發送耗時來決定規避時長;
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

深刻到 selectOneMessageQueue 方法內部一探究竟:

源碼清單-11

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // 開啓異常規避
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // 按順序取下一個message queue做爲發送的queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 當前queue所在的broker可用,且與上一個queue的broker相同,
                // 或者第一次發送,則使用這個queue
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    // 不開啓異常規避,則隨機自增選擇Queue
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

4、RocketMQ 的三種 CommunicationMode

源碼清單-12

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

以上三種模式指的都是消息從發送方到達 broker 的階段,不包含 broker 將消息投遞給訂閱方的過程。三種模式的發送方式的差別:

單向模式:ONEWAY。消息發送方只管發送,並不關心 broker 處理的結果如何。這種模式下,因爲處理流程少,發送耗時很是小,吞吐量大,但不能保證消息可靠不丟,經常使用於流量巨大但不重要的消息場景,例如心跳發送等。

異步模式:ASYNC。消息發送方發送消息到 broker 後,無需等待 broker 處理,拿到的是 null 的返回值,而由一個異步的線程來作消息處理,處理完成後以回調的形式告訴發送方發送結果。異步處理時若有異常,返回發送方失敗結果以前,會通過內部重試(默認 3 次,發送方不感知)。這種模式下,發送方等待時長較小,吞吐量較大,消息可靠,用於流量大但重要的消息場景。

同步模式:SYNC。消息發送方需等待 broker 處理完成並明確返回成功或失敗,在消息發送方拿到消息發送失敗的結果以前,也會經歷過內部重試(默認 3 次,發送方不感知)這種模式下,發送方會阻塞等待消息處理結果,等待時長較長,消息可靠,用於流量不大但重要的消息場景。須要強調的是,事務消息的一階段半事務消息的處理是同步模式。

在 sendKernelImpl 方法中也能夠看到具體的實現差別。ONEWAY 模式最爲簡單,不作任何處理。負責發送的 sendMessage 方法參數中,相比同步模式,異步模式多了回調方法、包含 topic 發送路由元信息的 topicPublishInfo、包含發送 broker 信息的 instance、包含發送隊列信息的 producer、重試次數。另外,異步模式下,會對有壓縮的消息先作 copy。

源碼清單-13

switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

官方文檔中有這樣一張圖,十分清晰的描述了異步通訊的詳細過程:

4.png

Ⅱ 階段發送

源碼清單-3 體現了本地事務的執行,localTransactionState 將本地事務執行結果與事務消息二階段的發送關聯起來。

值得注意的是,若是一階段的發送結果是 SLAVENOTAVAILABLE,即使 broker 不可用時,也會將 localTransactionState 置爲 Rollback,此時將不會執行本地事務。以後由 endTransaction 方法負責二階段提交,見源碼清單-4。具體到 endTransaction 的實現:

源碼清單-14

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 採用oneway的方式發送二階段消息
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

在二階段發送時,之因此用 oneway 的方式發送,我的理解這正是由於事務消息有一個特殊的可靠機制——回查。

消息回查

當 Broker 通過了一個特定的時間,發現依然沒有獲得事務消息的二階段是否要提交或者回滾的確切信息,Broker 不知道 Producer 發生了什麼狀況(可能 producer 掛了,也可能 producer 發了 commit 但網絡抖動丟了,也可能……因而主動發起回查。

事務消息的回查機制,更多的是在 broker 端的體現。RocketMQ 的 broker 以 Half 消息、Op 消息、真實消息三個不一樣的 topic 來將不一樣發送階段的事務消息進行了隔離,使得 Consumer 只能看到最終確認 commit 須要投遞出去的消息。其中詳細的實現邏輯在本文中暫很少贅述,後續可另開一篇專門來從 Broker 視角來解讀。

回到 Producer 的視角,當收到了 Broker 的回查請求,Producer 將根據消息檢查本地事務狀態,根據結果決定提交或回滾,這就要求 Producer 必須指定回查實現,以備不時之需。固然,正常狀況下,並不推薦主動發送 UNKNOW 狀態,這個狀態毫無疑問會給 broker 帶來額外回查開銷,只在出現不可預知的異常狀況時才啓動回查機制,是一種比較合理的選擇。

另外,4.7.1 版本的事務回查並不是無限回查,而是最多回查 15 次:

源碼清單-15

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

附錄

官方給出 Producer 的默認參數以下(其中超時時長的參數,在前文中也已經提到,debug 的結果是默認 3000ms,並不是 10000ms):

5.png


做者介紹(源淇).png

動態-logo.gif

底部banner.png

點擊「瞭解更多」,瞭解「mPaaS」更多資訊。

相關文章
相關標籤/搜索