RocketMQ一個新的消費組初次啓動時從何處開始消費呢?

一、拋出問題

一個新的消費組訂閱一個已存在的Topic主題時,消費組是從該Topic的哪條消息開始消費呢?java

首先翻閱DefaultMQPushConsumer的API時,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼簾,從字面意思來看是設置消費者從哪裏開始消費,正是解開該問題的」鑰匙「。ConsumeFromWhere枚舉類圖以下: 緩存

在這裏插入圖片描述

  • CONSUME_FROM_MAX_OFFSET 從消費隊列最大的偏移量開始消費。
  • CONSUME_FROM_FIRST_OFFSET 從消費隊列最小偏移量開始消費。
  • CONSUME_FROM_TIMESTAMP 從指定的時間戳開始消費,默認爲消費者啓動以前的30分鐘處開始消費。能夠經過DefaultMQPushConsumer#setConsumeTimestamp。

是否是點小激動,還不快試試。服務器

需求:新的消費組啓動時,從隊列最後開始消費,即只消費啓動後發送到消息服務器後的最新消息。ide

1.1 環境準備

本示例所用到的Topic路由信息以下: 測試

在這裏插入圖片描述

Broker的配置以下(broker.conf)ui

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
mapedFileSizeCommitLog=10240
mapedFileSizeConsumeQueue=2000
複製代碼

其中重點修改了以下兩個參數:this

  • mapedFileSizeCommitLog 單個commitlog文件的大小,這裏使用10M,方便測試用。
  • mapedFileSizeConsumeQueue 單個consumequeue隊列長度,這裏使用1000,表示一個consumequeue文件中包含1000個條目。

1.2 消息發送者代碼

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 300; i++) {
        try {
            Message msg = new Message("TopicTest" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}
複製代碼

經過上述,往TopicTest發送300條消息,發送完畢後,RocketMQ Broker存儲結構以下: spa

在這裏插入圖片描述

1.3 消費端驗證代碼

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_01");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
複製代碼

執行上述代碼後,按照指望,應該是不會消費任何消息,只有等生產者再發送消息後,纔會對消息進行消費,事實是這樣嗎?執行效果如圖所示: 3d

在這裏插入圖片描述
使人意外的是,居然從隊列的最小偏移量開始消費了,這就「尷尬」了。難不成是RocketMQ的Bug。帶着這個疑問,從源碼的角度嘗試來解讀該問題,並指導咱們實踐。

二、探究CONSUME_FROM_MAX_OFFSET實現原理

對於一個新的消費組,不管是集羣模式仍是廣播模式都不會存儲該消費組的消費進度,能夠理解爲-1,此時就須要根據DefaultMQPushConsumer#consumeFromWhere屬性來決定其從何處開始消費,首先咱們須要找到其對應的處理入口。咱們知道,消息消費者從Broker服務器拉取消息時,須要進行消費隊列的負載,即RebalanceImpl。code

舒適提示:本文不會詳細介紹RocketMQ消息隊列負載、消息拉取、消息消費邏輯,只會展現出通往該問題的簡短流程,如想詳細瞭解消息消費具體細節,建議購買筆者出版的《RocketMQ技術內幕》書籍。

RebalancePushImpl#computePullFromWhere

public long computePullFromWhere(MessageQueue mq) {
        long result = -1;                                                                                                                                                                                                                  // @1
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();    
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
            case CONSUME_FROM_LAST_OFFSET: {                                                                                                                                                                // @2
               // 省略部分代碼
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {                                                                                                                                                              // @3
                // 省略部分代碼
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {                                                                                                                                                                  //@4
                // 省略部分代碼
                break;
            }
            default:
                break;
        }
        return result;                                                                                                                                                                                                                  // @5
    }
複製代碼

代碼@1:先解釋幾個局部變量。

  • result 最終的返回結果,默認爲-1。
  • consumeFromWhere 消息消費者開始消費的策略,即CONSUME_FROM_LAST_OFFSET等。
  • offsetStore offset存儲器,消費組消息偏移量存儲實現器。

代碼@2:CONSUME_FROM_LAST_OFFSET(從隊列的最大偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@3:CONSUME_FROM_FIRST_OFFSET(從隊列最小偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@4:CONSUME_FROM_TIMESTAMP(從指定時間戳開始消費)的處理邏輯,下文會詳細介紹。

代碼@5:返回最後計算的偏移量,從該偏移量出開始消費。

2.1 CONSUME_FROM_LAST_OFFSET計算邏輯

case CONSUME_FROM_LAST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                             // @2
        result = lastOffset;
    }
    // First start,no offset
    else if (-1 == lastOffset) {                                                                                                  // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {               
            result = 0L;
        } else {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);                     
            } catch (MQClientException e) {                                                                              // @4
                result = -1;
            }
        }
    } else {
        result = -1;    
    }
	break;
}
複製代碼

代碼@1:使用offsetStore從消息消費進度文件中讀取消費消費進度,本文將以集羣模式爲例展開。稍後詳細分析。

代碼@2:若是返回的偏移量大於等於0,則直接使用該offset,這個也能理解,大於等於0,表示查詢到有效的消息消費進度,從該有效進度開始消費,但咱們要特別留意lastOffset爲0是什麼場景,由於返回0,並不會執行CONSUME_FROM_LAST_OFFSET(語義)。

代碼@3:若是lastOffset爲-1,表示當前並未存儲其有效偏移量,能夠理解爲第一次消費,若是是消費組重試主題,從重試隊列偏移量爲0開始消費;若是是普通主題,則從隊列當前的最大的有效偏移量開始消費,即CONSUME_FROM_LAST_OFFSET語義的實現。

代碼@4:若是從遠程服務拉取最大偏移量拉取異常或其餘狀況,則使用-1做爲第一次拉取偏移量。

分析,上述執行的現象,雖然設置的是CONSUME_FROM_LAST_OFFSET,但現象是從隊列的第一條消息開始消費,根據上述源碼的分析,只有從消費組消費進度存儲文件中取到的消息偏移量爲0時,纔會從第一條消息開始消費,故接下來重點分析消息消費進度存儲器(OffsetStore)在什麼狀況下會返回0。

接下來咱們將以集羣模式來查看一下消息消費進度的查詢邏輯,集羣模式的消息進度存儲管理器實現爲: RemoteBrokerOffsetStore,最終Broker端的命令處理類爲:ConsumerManageProcessor。

ConsumerManageProcessor#queryConsumerOffset
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());    // @1

    if (offset >= 0) {                                                                                                                                          // @2
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {                                                                                                                                                       // @3
        long minOffset =
            this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                requestHeader.getQueueId());                                                                                                     // @4
        if (minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(                                // @5
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {                                                                                                                                                 // @6
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
    return response;
}
複製代碼

代碼@1:從消費消息進度文件中查詢消息消費進度。

代碼@2:若是消息消費進度文件中存儲該隊列的消息進度,其返回的offset必然會大於等於0,則直接返回該偏移量該客戶端,客戶端從該偏移量開始消費。

代碼@3:若是未從消息消費進度文件中查詢到其進度,offset爲-1。則首先獲取該主題、消息隊列當前在Broker服務器中的最小偏移量(@4)。若是小於等於0(返回0則表示該隊列的文件還不曾刪除過)而且其最小偏移量對應的消息存儲在內存中而不是存在磁盤中,則返回偏移量0,這就意味着ConsumeFromWhere中定義的三種枚舉類型都不會生效,直接從0開始消費,到這裏就能解開其謎團了(@5)。

代碼@6:若是偏移量小於等於0,但其消息已經存儲在磁盤中,此時返回未找到,最終RebalancePushImpl#computePullFromWhere中獲得的偏移量爲-1。

看到這裏,你們應該能回答文章開頭處提到的問題了吧?

看到這裏,你們應該明白了,爲何設置的CONSUME_FROM_LAST_OFFSET,但消費組是從消息隊列的開始處消費了吧,緣由就是消息消費進度文件中並無找到其消息消費進度,而且該隊列在Broker端的最小偏移量爲0,說的更直白點,consumequeue/topicName/queueNum的第一個消息消費隊列文件爲00000000000000000000,而且消息其對應的消息緩存在Broker端的內存中(pageCache),其返回給消費端的偏移量爲0,故會從0開始消費,而不是從隊列的最大偏移量處開始消費。

爲了知識體系的完備性,咱們順便來看一下其餘兩種策略的計算邏輯。

2.2 CONSUME_FROM_FIRST_OFFSET

case CONSUME_FROM_FIRST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {    // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {  // @3
        result = 0L;
    } else {                                  
        result = -1;                    // @4
    }
    break;
}
複製代碼

從隊列的開始偏移量開始消費,其計算邏輯以下: 代碼@1:首先經過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:若是大於等於0,則從當前該偏移量開始消費。

代碼@3:若是遠程返回-1,表示並無存儲該隊列的消息消費進度,從0開始。

代碼@4:不然從-1開始消費。

2.4 CONSUME_FROM_TIMESTAMP

從指定時戳後的消息開始消費。

case CONSUME_FROM_TIMESTAMP: {
    ong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                            // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {                                                                                                 // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
            } catch (MQClientException e) {
                result = -1;
            }
        } else {
            try {
                long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                    UtilAll.YYYYMMDDHHMMSS).getTime();
                result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
            } catch (MQClientException e) {
                result = -1;
            }
        }
    } else {
        result = -1;
    }
    break;
}
複製代碼

其基本套路與CONSUME_FROM_LAST_OFFSET同樣: 代碼@1:首先經過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:若是大於等於0,則從當前該偏移量開始消費。

代碼@3:若是遠程返回-1,表示並無存儲該隊列的消息消費進度,若是是重試主題,則從當前隊列的最大偏移量開始消費,若是是普通主題,則根據時間戳去Broker端查詢,根據查詢到的偏移量開始消費。

原理就介紹到這裏,下面根據上述理論對其進行驗證。

三、猜測與驗證

根據上述理論分析咱們得知設置CONSUME_FROM_LAST_OFFSET但並非從消息隊列的最大偏移量開始消費的「罪魁禍首」是由於消息消費隊列的最小偏移量爲0,若是不爲0,則就會符合預期,咱們來驗證一下這個猜測。 首先咱們刪除commitlog目錄下的文件,如圖所示:

在這裏插入圖片描述
其消費隊列截圖以下:
在這裏插入圖片描述
消費端的驗證代碼以下:

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_02");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
複製代碼

運行結果以下:

在這裏插入圖片描述
並無消息存在的消息,符合預期。

四、解決方案

若是在生產環境下,一個新的消費組訂閱一個已經存在比較久的topic,設置CONSUME_FROM_MAX_OFFSET是符合預期的,即該主題的consumequeue/{queueNum}/fileName,fileName一般不會是00000000000000000000,如是是上面文件名,想要實現從隊列的最後開始消費,該如何作呢?那就走自動建立消費組的路子,執行以下命令:

./mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g my_consumer_05

//克隆一個訂閱了該topic的消費組消費進度
./mqadmin cloneGroupOffset -n 127.0.0.1:9876 -s my_consumer_01 -d my_consumer_05 -t TopicTest

//重置消費進度到當前隊列的最大值
./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g my_consumer_05 -t TopicTest -s -1
複製代碼

按照上上述命令後,便可實現其目的。

您都看到這裏了,麻煩幫忙點個贊,謝謝您的承認與鼓勵。


做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。

在這裏插入圖片描述
相關文章
相關標籤/搜索