[RocketMQ]消息中間件—RocketMQ消息消費(三)(消息消費重試)

摘要:若是Consumer端消費消息失敗,那麼RocketMQ是如何對失敗的異常狀況進行處理?
前面兩篇RocketMQ消息消費(一)/(二)篇,主要從Push/Pull兩種消費模式的簡要流程、長輪詢機制和Consumer端負載均衡這幾點內容出發,介紹了RocketMQ消息消費的正常流程和細節內容,本篇內容將主要介紹Consumer端消費失敗的異常流程。
這裏先回顧往期RocketMQ技術分享的篇幅:
(1)消息中間件—RocketMQ的RPC通訊(一)
(2)消息中間件—RocketMQ的RPC通訊(二)
(3)消息中間件—RocketMQ消息發送
(4)消息中間件—RocketMQ消息消費(一)
(5)消息中間件—RocketMQ消息消費(二)(push模式實現)網絡

1、其餘MQ中間件消費端可靠性的保障

在業務開發中,你們必定都遇到過業務工程由於各種異常(多是業務工程自己的異常、JVM內存異常或者系統所在的虛擬機宕機等),而致使MQ中間件發送過來的業務消息消費失敗而沒法再次消費該消息的狀況。目前,不少MQ消息中間件都有相應的機制和方法來保證Consumer端消費消息的可靠性。下面先來看看RabbitMQ和Kafka這兩款MQ消息中間件是如何來保證消費者端消息處理的可靠性的呢?負載均衡

1.1 簡談RabbitMQ的手動消息確認ACK機制

RabbitMQ提供了消息確認機制。消費者在訂閱隊列時,能夠在代碼中手動設置autoAck參數爲false,這時RabbitMQ會等待消費者顯式地回覆確認信號(即爲顯式地調用channel.basicAck(envelope.getDeliveryTag(), false)方法)後才從集羣中的內存(或磁盤)節點上移除消息,從而保證了這條消息不會由於消費失敗而致使丟失。異步

1.2 簡析Kafka消息消費的手動提交

在Kafka中,也能夠採用上面那種的消費後的確認機制,經過在Consumer端設置「enable.auto.commit」屬性爲false後,待業務工程正常處理完消費後,在代碼中手動調用KafkaConsumer實例的commitSync()方法提交(ps:這裏指的是同步阻塞commit消費的偏移量,等待Broker端的返回響應,須要注意Broker端在對commit請求作出響應以前,消費端會處於阻塞狀態,從而限制消息的處理性能和總體吞吐量),以確保消息可以正常被消費。若是在消費過程當中,消費端忽然Crash,這時候消費偏移量沒有commit,等正常恢復後依然還會處理剛剛未commit的消息。性能

2、RocketMQ消費失敗後的消費重試機制

對比了另外兩款MQ中間件後,接下來進入正題,主要來講說RocketMQ在消費失敗後的是如何來保證消息消費的可靠性?ui

2.1 重試隊列與死信隊列的概念

在介紹RocketMQ的消費重試機制以前,須要先來講下「重試隊列」和「死信隊列」兩個概念。
(1)重試隊列:若是Consumer端由於各類類型異常致使本次消費失敗,爲防止該消息丟失而須要將其從新回發給Broker端保存,保存這種由於異常沒法正常消費而回發給MQ的消息隊列稱之爲重試隊列。RocketMQ會爲每一個消費組都設置一個Topic名稱爲「%RETRY%+consumerGroup」的重試隊列(這裏須要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每一個Topic設置的),用於暫時保存由於各類異常而致使Consumer端沒法消費的消息。考慮到異常恢復起來須要一些時間,會爲重試隊列設置多個重試級別,每一個重試級別都有與之對應的從新投遞延時,重試次數越多投遞延時就越大。RocketMQ對於重試消息的處理是先保存至Topic名稱爲「SCHEDULE_TOPIC_XXXX」的延遲隊列中,後臺定時任務按照對應的時間進行Delay後從新保存至「%RETRY%+consumerGroup」的重試隊列中(具體細節後面會詳細闡述)。
(2)死信隊列:因爲有些緣由致使Consumer端長時間的沒法正常消費從Broker端Pull過來的業務消息,爲了確保消息不會被無端的丟棄,那麼超過配置的「最大重試消費次數」後就會移入到這個死信隊列中。在RocketMQ中,SubscriptionGroupConfig配置常量默認地設置了兩個參數,一個是retryQueueNums爲1(重試隊列數量爲1個),另一個是retryMaxTimes爲16(最大重試消費的次數爲16次)。Broker端經過校驗判斷,若是超過了最大重試消費次數則會將消息移至這裏所說的死信隊列。這裏,RocketMQ會爲每一個消費組都設置一個Topic命名爲「%DLQ%+consumerGroup"的死信隊列。通常在實際應用中,移入至死信隊列的消息,須要人工干預處理;this

2.1 Consumer端回發消息至Broker端

在業務工程中的Consumer端(Push消費模式下),若是消息可以正常消費須要在註冊的消息監聽回調方法中返回CONSUME_SUCCESS的消費狀態,不然由於各種異常消費失敗則返回RECONSUME_LATER的消費狀態。消費狀態的枚舉類型以下所示:spa

public enum ConsumeConcurrentlyStatus {
    //業務方消費成功
    CONSUME_SUCCESS,
    //業務方消費失敗,以後進行從新嘗試消費
    RECONSUME_LATER;
}

若是業務工程對消息消費失敗了,那麼則會拋出異常而且返回這裏的RECONSUME_LATER狀態。這裏,在消費消息的服務線程—consumeMessageService中,將封裝好的消息消費任務ConsumeRequest提交至線程池—consumeExecutor異步執行。從消息消費任務ConsumeRequest的run()方法中會執行業務工程中註冊的消息監聽回調方法,並在processConsumeResult方法中根據業務工程返回的狀態(CONSUME_SUCCESS或者RECONSUME_LATER)進行判斷和作對應的處理(下面講的都是在消費通訊模式爲集羣模型下的,廣播模型下的比較簡單就再也不分析了)。
(1)業務方正常消費(CONSUME_SUCCESS):正常狀況下,設置ackIndex的值爲consumeRequest.getMsgs().size() - 1,所以後面的遍歷consumeRequest.getMsgs()消息集合條件不成立,不會調用回發消費失敗消息至Broker端的方法—sendMessageBack(msg, context)。最後,更新消費的偏移量;
(2)業務方消費失敗(RECONSUME_LATER):異常狀況下,設置ackIndex的值爲-1,這時就會進入到遍歷consumeRequest.getMsgs()消息集合的for循環中,執行回發消息的方法—sendMessageBack(msg, context)。這裏,首先會根據brokerName獲得Broker端的地址信息,而後經過網絡通訊的Remoting模塊發送RPC請求到指定的Broker上,若是上述過程失敗,則建立一條新的消息從新發送給Broker,此時新消息的Topic爲「%RETRY%+ConsumeGroupName」—重試隊列的主題。其中,在MQClientAPIImpl實例的consumerSendMessageBack()方法中封裝了ConsumerSendMsgBackRequestHeader的請求體,隨後完成回發消費失敗消息的RPC通訊請求(業務請求碼爲:CONSUMER_SEND_MSG_BACK)。假若上面的回發消息流程失敗,則會延遲5S後從新在Consumer端進行從新消費。與正常消費的狀況同樣,在最後更新消費的偏移量;線程

2.3 Broker端對於回發消息處理的主要流程

Broker端收到這條Consumer端回發過來的消息後,經過業務請求碼(CONSUMER_SEND_MSG_BACK)匹配業務處理器—SendMessageProcessor來處理。在完成一系列的前置校驗(這裏主要是「消費分組是否存在」、「檢查Broker是否有寫入權限」、「檢查重試隊列數是否大於0」等)後,嘗試獲取重試隊列的TopicConfig對象(若是是第一次沒法獲取到,則調用createTopicInSendMessageBackMethod()方法進行建立)。根據回發過來的消息偏移量嘗試從commitlog日誌文件中查詢消息內容,若不存在則返回異常錯誤。
而後,設置重試隊列的Topic—「%RETRY%+consumerGroup」至MessageExt的擴展屬性「RETRY_TOPIC」中,並對根據延遲級別delayLevel和最大重試消費次數maxReconsumeTimes進行判斷,若是超過最大重試消費次數(默認16次),則會建立死信隊列的TopicConfig對象(用於後面將回發過來的消息移入死信隊列)。在構建完成須要落盤的MessageExtBrokerInner對象後,調用「commitLog.putMessage(msg)」方法作消息持久化。這裏,須要注意的是,在putMessage(msg)的方法裏會使用「SCHEDULE_TOPIC_XXXX」和對應的延遲級別隊列Id分別替換MessageExtBrokerInner對象的Topic和QueueId屬性值,並將原來設置的重試隊列主題(「%RETRY%+consumerGroup」)的Topic和QueueId屬性值作一個備份分別存入擴展屬性properties的「REAL_TOPIC」和「REAL_QID」屬性中。看到這裏也就大體明白了,回發給Broker端的消費失敗的消息並不是直接保存至重試隊列中,而是會先存至Topic爲「SCHEDULE_TOPIC_XXXX」的定時延遲隊列中。debug

疑問:上面說了RocketMQ的重試隊列的Topic是「%RETRY%+consumerGroup」,爲啥這裏要保存至Topic是「SCHEDULE_TOPIC_XXXX」的這個延遲隊列中呢?日誌

在源碼中搜索下關鍵字—「SCHEDULE_TOPIC_XXXX」,會發現Broker端還存在着一個後臺服務線程—ScheduleMessageService(經過消息存儲服務—DefaultMessageStore啓動),經過查看源碼能夠知道其中有一個DeliverDelayedMessageTimerTask定時任務線程會根據Topic(「SCHEDULE_TOPIC_XXXX」)與QueueId,先查到邏輯消費隊列ConsumeQueue,而後根據偏移量,找到ConsumeQueue中的內存映射對象,從commitlog日誌中找到消息對象MessageExt,並作一個消息體的轉換(messageTimeup()方法,由定時延遲隊列消息轉化爲重試隊列的消息),再次作持久化落盤,這時候纔會真正的保存至重試隊列中。看到這裏就能夠解釋上面的疑問了,定時延遲隊列只是爲了用於暫存的,而後延遲一段時間再將消息移入至重試隊列中。RocketMQ設定不一樣的延時級別delayLevel,而且與定時延遲隊列相對應,具體源碼以下:

//省略
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    /**
     * 定時延時消息主題的隊列與延遲等級對應關係
     * @param delayLevel
     * @return
     */
    public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }

2.4 Consumer端消費重試機制

每一個Consumer實例在啓動的時候就默認訂閱了該消費組的重試隊列主題,DefaultMQPushConsumerImpl的copySubscription()方法中的相關代碼以下:

private void copySubscription() throws MQClientException {
            //省略其餘代碼...
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING://若是消息消費模式爲集羣模式,還須要爲該消費組對應一個重試主題
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
            //省略其餘代碼...
      }

所以,這裏也就清楚了,Consumer端會一直訂閱該重試隊列主題的消息,向Broker端發送以下的拉取消息的PullRequest請求,以嘗試從新再次消費重試隊列中積壓的消息。

PullRequest [consumerGroup=CID_JODIE_1, messageQueue=MessageQueue [topic=%RETRY%CID_JODIE_1, brokerName=HQSKCJJIDRRD6KC, queueId=0], nextOffset=51]

最後,給出一張RocketMQ消息重試機制的框圖(ps:這裏只是描述了消息消費失敗後重試拉取的部分重要過程):

 

RocketMQ消息重試機制.jpg

3、總結

RocketMQ的消息消費(三)(消息消費重試)篇幅就先分析到這裏了。關於RocketMQ消息消費的內容比較多也比較複雜,須要讀者結合源碼並屢次debug(能夠經過分別在Consumer端和Broker端的部分重要方法中打印重要對象中的各個屬性值的方式,來仔細研究下其中的過程),才能夠對其有一個較爲深入的理解。限於筆者的才疏學淺,對本文內容可能還有理解不到位的地方,若有闡述不合理之處還望留言一塊兒探討。

做者:癲狂俠 連接:https://www.jianshu.com/p/5843cdcd02aa 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

相關文章
相關標籤/搜索