一個新的消費組訂閱一個已存在的Topic主題時,消費組是從該Topic的哪條消息開始消費呢?java
首先翻閱DefaultMQPushConsumer的API時,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼簾,從字面意思來看是設置消費者從哪裏開始消費,正是解開該問題的」鑰匙「。ConsumeFromWhere枚舉類圖以下:緩存
是否是點小激動,還不快試試。服務器
需求:新的消費組啓動時,從隊列最後開始消費,即只消費啓動後發送到消息服務器後的最新消息。ide
本示例所用到的Topic路由信息以下: 測試
Broker的配置以下(broker.conf)this
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store/commitlog namesrvAddr=127.0.0.1:9876 autoCreateTopicEnable=false mapedFileSizeCommitLog=10240 mapedFileSizeConsumeQueue=2000
其中重點修改了以下兩個參數:.net
public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 300; i++) { try { Message msg = new Message("TopicTest" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); }
經過上述,往TopicTest發送300條消息,發送完畢後,RocketMQ Broker存儲結構以下: code
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_01"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { [@Override](https://my.oschina.net/u/1162528) public ConsumeConcurrentlyStatus consumeMessage(List<messageext> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }
執行上述代碼後,按照指望,應該是不會消費任何消息,只有等生產者再發送消息後,纔會對消息進行消費,事實是這樣嗎?執行效果如圖所示: 中間件
使人意外的是,居然從隊列的最小偏移量開始消費了,這就「尷尬」了。難不成是RocketMQ的Bug。帶着這個疑問,從源碼的角度嘗試來解讀該問題,並指導咱們實踐。blog
對於一個新的消費組,不管是集羣模式仍是廣播模式都不會存儲該消費組的消費進度,能夠理解爲-1,此時就須要根據DefaultMQPushConsumer#consumeFromWhere屬性來決定其從何處開始消費,首先咱們須要找到其對應的處理入口。咱們知道,消息消費者從Broker服務器拉取消息時,須要進行消費隊列的負載,即RebalanceImpl。
> 舒適提示:本文不會詳細介紹RocketMQ消息隊列負載、消息拉取、消息消費邏輯,只會展現出通往該問題的簡短流程,如想詳細瞭解消息消費具體細節,建議購買筆者出版的《RocketMQ技術內幕》書籍。
RebalancePushImpl#computePullFromWhere
public long computePullFromWhere(MessageQueue mq) { long result = -1; // [@1](https://my.oschina.net/u/1198) final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { // @2 // 省略部分代碼 break; } case CONSUME_FROM_FIRST_OFFSET: { // [@3](https://my.oschina.net/u/2648711) // 省略部分代碼 break; } case CONSUME_FROM_TIMESTAMP: { //@4 // 省略部分代碼 break; } default: break; } return result; // @5 }
代碼@1:先解釋幾個局部變量。
代碼@2:CONSUME_FROM_LAST_OFFSET(從隊列的最大偏移量開始消費)的處理邏輯,下文會詳細介紹。
代碼@3:CONSUME_FROM_FIRST_OFFSET(從隊列最小偏移量開始消費)的處理邏輯,下文會詳細介紹。
代碼@4:CONSUME_FROM_TIMESTAMP(從指定時間戳開始消費)的處理邏輯,下文會詳細介紹。
代碼@5:返回最後計算的偏移量,從該偏移量出開始消費。
case CONSUME_FROM_LAST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1 if (lastOffset >= 0) { // @2 result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { // @3 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { // @4 result = -1; } } } else { result = -1; } break; }
代碼@1:使用offsetStore從消息消費進度文件中讀取消費消費進度,本文將以集羣模式爲例展開。稍後詳細分析。
代碼@2:若是返回的偏移量大於等於0,則直接使用該offset,這個也能理解,大於等於0,表示查詢到有效的消息消費進度,從該有效進度開始消費,但咱們要特別留意lastOffset爲0是什麼場景,由於返回0,並不會執行CONSUME_FROM_LAST_OFFSET(語義)。
代碼@3:若是lastOffset爲-1,表示當前並未存儲其有效偏移量,能夠理解爲第一次消費,若是是消費組重試主題,從重試隊列偏移量爲0開始消費;若是是普通主題,則從隊列當前的最大的有效偏移量開始消費,即CONSUME_FROM_LAST_OFFSET語義的實現。
代碼@4:若是從遠程服務拉取最大偏移量拉取異常或其餘狀況,則使用-1做爲第一次拉取偏移量。
分析,上述執行的現象,雖然設置的是CONSUME_FROM_LAST_OFFSET,但現象是從隊列的第一條消息開始消費,根據上述源碼的分析,只有從消費組消費進度存儲文件中取到的消息偏移量爲0時,纔會從第一條消息開始消費,故接下來重點分析消息消費進度存儲器(OffsetStore)在什麼狀況下會返回0。
接下來咱們將以集羣模式來查看一下消息消費進度的查詢邏輯,集羣模式的消息進度存儲管理器實現爲: RemoteBrokerOffsetStore,最終Broker端的命令處理類爲:ConsumerManageProcessor。
ConsumerManageProcessor#queryConsumerOffset private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); final QueryConsumerOffsetRequestHeader requestHeader = (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); long offset = this.brokerController.getConsumerOffsetManager().queryOffset( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); // @1 if (offset >= 0) { // @2 responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { // @3 long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); // @4 if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( // @5 requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { // @6 response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"); } } return response; }
代碼@1:從消費消息進度文件中查詢消息消費進度。
代碼@2:若是消息消費進度文件中存儲該隊列的消息進度,其返回的offset必然會大於等於0,則直接返回該偏移量該客戶端,客戶端從該偏移量開始消費。
代碼@3:若是未從消息消費進度文件中查詢到其進度,offset爲-1。則首先獲取該主題、消息隊列當前在Broker服務器中的最小偏移量(@4)。若是小於等於0(返回0則表示該隊列的文件還不曾刪除過)而且其最小偏移量對應的消息存儲在內存中而不是存在磁盤中,則返回偏移量0,這就意味着ConsumeFromWhere中定義的三種枚舉類型都不會生效,直接從0開始消費,到這裏就能解開其謎團了(@5)。
代碼@6:若是偏移量小於等於0,但其消息已經存儲在磁盤中,此時返回未找到,最終RebalancePushImpl#computePullFromWhere中獲得的偏移量爲-1。
看到這裏,你們應該能回答文章開頭處提到的問題了吧?
看到這裏,你們應該明白了,爲何設置的CONSUME_FROM_LAST_OFFSET,但消費組是從消息隊列的開始處消費了吧,緣由就是消息消費進度文件中並無找到其消息消費進度,而且該隊列在Broker端的最小偏移量爲0,說的更直白點,consumequeue/topicName/queueNum的第一個消息消費隊列文件爲00000000000000000000,而且消息其對應的消息緩存在Broker端的內存中(pageCache),其返回給消費端的偏移量爲0,故會從0開始消費,而不是從隊列的最大偏移量處開始消費。
爲了知識體系的完備性,咱們順便來看一下其餘兩種策略的計算邏輯。
case CONSUME_FROM_FIRST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1 if (lastOffset >= 0) { // @2 result = lastOffset; } else if (-1 == lastOffset) { // @3 result = 0L; } else { result = -1; // @4 } break; }
從隊列的開始偏移量開始消費,其計算邏輯以下: 代碼@1:首先經過偏移量存儲器查詢消費隊列的消費進度。
代碼@2:若是大於等於0,則從當前該偏移量開始消費。
代碼@3:若是遠程返回-1,表示並無存儲該隊列的消息消費進度,從0開始。
代碼@4:不然從-1開始消費。
從指定時戳後的消息開始消費。
case CONSUME_FROM_TIMESTAMP: { ong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); // @1 if (lastOffset >= 0) { // @2 result = lastOffset; } else if (-1 == lastOffset) { // @3 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; }
其基本套路與CONSUME_FROM_LAST_OFFSET同樣: 代碼@1:首先經過偏移量存儲器查詢消費隊列的消費進度。
代碼@2:若是大於等於0,則從當前該偏移量開始消費。
代碼@3:若是遠程返回-1,表示並無存儲該隊列的消息消費進度,若是是重試主題,則從當前隊列的最大偏移量開始消費,若是是普通主題,則根據時間戳去Broker端查詢,根據查詢到的偏移量開始消費。
原理就介紹到這裏,下面根據上述理論對其進行驗證。
根據上述理論分析咱們得知設置CONSUME_FROM_LAST_OFFSET但並非從消息隊列的最大偏移量開始消費的「罪魁禍首」是由於消息消費隊列的最小偏移量爲0,若是不爲0,則就會符合預期,咱們來驗證一下這個猜測。 首先咱們刪除commitlog目錄下的文件,如圖所示:
其消費隊列截圖以下:
消費端的驗證代碼以下:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_02"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<messageext> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }
運行結果以下:
並無消息存在的消息,符合預期。
若是在生產環境下,一個新的消費組訂閱一個已經存在比較久的topic,設置CONSUME_FROM_MAX_OFFSET是符合預期的,即該主題的consumequeue/{queueNum}/fileName,fileName一般不會是00000000000000000000,如是是上面文件名,想要實現從隊列的最後開始消費,該如何作呢?那就走自動建立消費組的路子,執行以下命令:
./mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g my_consumer_05 //克隆一個訂閱了該topic的消費組消費進度 ./mqadmin cloneGroupOffset -n 127.0.0.1:9876 -s my_consumer_01 -d my_consumer_05 -t TopicTest //重置消費進度到當前隊列的最大值 ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g my_consumer_05 -t TopicTest -s -1
按照上上述命令後,便可實現其目的。
> 做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。
</messageext></messageext>