RocketMQ原理(4)——消息ACK機制及消費進度管理

https://zhuanlan.zhihu.com/p/25140744 中剖析過,consumer的每一個實例是靠隊列分配來決定如何消費消息的。那麼消費進度具體是如何管理的,又是如何保證消息成功消費的(RocketMQ有保證消息確定消費成功的特性(失敗則重試)?java

本文將詳細解析消息具體是如何ack的,又是如何保證消費確定成功的。git

因爲以上工做全部的機制都實如今PushConsumer中,因此本文的原理均只適用於RocketMQ中的PushConsumer即Java客戶端中的DefaultPushConsumer。 若使用了PullConsumer模式,相似的工做如何ack,如何保證消費等均須要使用方本身實現。github

注:廣播消費和集羣消費的處理有部分區別,如下均特指集羣消費(CLSUTER),廣播(BROADCASTING)下部分可能不適用。數據庫

保證消費成功

PushConsumer爲了保證消息確定消費成功,只有使用方明確表示消費成功,RocketMQ纔會認爲消息消費成功。中途斷電,拋出異常等都不會認爲成功——即都會從新投遞。緩存

消費的時候,咱們須要注入一個消費回調,具體sample代碼以下:運維

consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            doMyJob();//執行真正消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ纔會認爲這批消息(默認是1條)是消費完成的。(具體如何ACK見後面章節)ide

若是這時候消息消費失敗,例如數據庫異常,餘額不足扣款失敗等一切業務認爲消息須要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認爲這批消息消費失敗了。函數

爲了保證消息是確定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)後,再次投遞到這個ConsumerGroup。而若是一直這樣重複消費都持續失敗到必定次數(默認16次),就會投遞到DLQ死信隊列。應用能夠監控死信隊列來作人工干預。oop

注:性能

  1. 若是業務的回調沒有處理好而拋出異常,會認爲是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理。
  2. 當使用順序消費的回調MessageListenerOrderly時,因爲順序消費是要前者消費成功才能繼續消費,因此沒有RECONSUME_LATER的這個狀態,只有SUSPEND_CURRENT_QUEUE_A_MOMENT來暫停隊列的其他消費,直到原消息不斷重試成功爲止才能繼續消費。

啓動的時候從哪裏消費

當新實例啓動的時候,PushConsumer會拿到本消費組broker已經記錄好的消費進度(consumer offset),按照這個進度發起本身的第一次Pull請求。

若是這個消費進度在Broker並無存儲起來,證實這個是一個全新的消費組,這時候客戶端有幾個策略能夠選擇:

CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息
CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)所有消費一遍
CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時之前

 

因此,社區中常常有人問:「爲何我設了CONSUME_FROM_LAST_OFFSET,歷史的消息仍是被消費了」? 緣由就在於只有全新的消費組纔會使用到這些策略,老的消費組都是按已經存儲過的消費進度繼續消費。

對於老消費組想跳過歷史消息能夠採用如下兩種方法:

  1. 代碼按照日期判斷,太老的消息直接return CONSUME_SUCCESS過濾。
  2. 代碼判斷消息的offset和MAX_OFFSET相差很遠,認爲是積壓了不少,直接return CONSUME_SUCCESS過濾。
  3. 消費者啓動前,先調整該消費組的消費進度,再開始消費。能夠人工使用命令resetOffsetByTime,或調用內部的運維接口,祥見ResetOffsetByTimeCommand.java

消息ACK機制

RocketMQ是以consumer group+queue爲單位是管理消費進度的,以一個consumer offset標記這個這個消費組在這條queue上的消費進度。

若是某已存在的消費組出現了新消費實例的時候,依靠這個組的消費進度,就能夠判斷第一次是從哪裏開始拉取的。

每次消息成功後,本地的消費進度會被更新,而後由定時器定時同步到broker,以此持久化消費進度。

可是每次記錄消費進度的時候,只會把一批消息中最小的offset值爲消費進度值,以下圖:


 

 

這鐘方式和傳統的一條message單獨ack的方式有本質的區別。性能上提高的同時,會帶來一個潛在的重複問題——因爲消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,後面99條都消費結束了,只有2101消費一直沒有結束的狀況。

在這種狀況下,RocketMQ爲了保證消息確定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度纔會一會兒更新到2200。

在這種設計下,就有消費大量重複的風險。如2101在尚未消費完成的時候消費實例忽然退出(機器斷電,或者被kill)。這條queue的消費進度仍是維持在2101,當queue從新分配給新的實例的時候,新的實例從broker上拿到的消費進度仍是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過仍是會投遞一次。

對於這個場景,3.2.6以前的RocketMQ無能爲力,因此業務必需要保證消息消費的冪等性,這也是RocketMQ官方屢次強調的態度。

實際上,從源碼的角度上看,RocketMQ多是考慮過這個問題的,截止到3.2.6的版本的源碼中,能夠看到爲了緩解這個問題的影響面,DefaultMQPushConsumer中有個配置consumeConcurrentlyMaxSpan

/**
 * Concurrently max span offset.it has no effect on sequential consumption
 */
private int consumeConcurrentlyMaxSpan = 2000;

這個值默認是2000,當RocketMQ發現本地緩存的消息的最大值-最小值差距大於這個值(2000)的時候,會觸發流控——也就是說若是頭尾都卡住了部分消息,達到了這個閾值就再也不拉取消息。

但做用實際頗有限,像剛剛這個例子,2101的消費是死循環,其餘消費很是正常的話,是無能爲力的。一旦退出,在不人工干預的狀況下,2101後全部消息所有重複。

Ack卡進度解決方案

對於這個卡消費進度的問題,最顯而易見的解法是設定一個超時時間,達到超時時間的那個消費看成消費失敗處理。

後來RocketMQ顯然也發現了這個問題,而RocketMQ在3.5.8以後也就是採用這樣的方案去解決這個問題。

  1. 在pushConsumer中 有一個consumeTimeout字段(默認15分鐘),用於設置最大的消費超時時間。消費前會記錄一個消費的開始時間,後面用於比對。
  2. 消費者啓動的時候,會按期掃描全部消費的消息,達到這個timeout的那些消息,就會觸發sendBack並ack的操做。這裏掃描的間隔也是consumeTimeout(單位分鐘)的間隔。

核心源碼以下:

//ConsumeMessageConcurrentlyService.java
public void start() {
    this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }

    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}

//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }

    int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
    for (int i = 0; i < loop; i++) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                    msg = msgTreeMap.firstEntry().getValue();
                } else {

                    break;
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
        }

        try {

            pushConsumer.sendMessageBack(msg, 3);
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            msgTreeMap.remove(msgTreeMap.firstKey());
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}

 

經過源碼看這個方案,其實能夠看出有幾個不太完善的問題:

  1. 消費timeout的時間很是不精確。因爲掃描的間隔是15分鐘,因此實際上觸發的時候,消息是有可能卡住了接近30分鐘(15*2)才被清理。
  2. 因爲定時器一啓動就開始調度了,中途這個consumeTimeout再更新也不會生效。
相關文章
相關標籤/搜索