記一次因Rocket MQ客戶端版本引發的重複消費

記一次因Rocket MQ客戶端版本引發的重複消費

現象

使用了以下版本的MQ客戶端版本:ide

<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.6.2.Final</version>
</dependency>

MQ Consumer 端對消息成功消費後,若是重啓 Consumer,會對消息再次消費。但注意到每次的 reconsumeTimes=0 。this

即 consumer 從 broker 中成功消費消息後,並無將結果 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 返回。spa

Consumer 實現

Consumer 類

如下爲 consumer 配置信息。線程

@PostConstruct
    public void init() throws MQClientException {
        log.info("構建 MQ Consumer...");
        this.consumer = new DefaultMQPushConsumer(mqGroupName);
        consumer.setNamesrvAddr(nameServer);
        consumer.setInstanceName(instance);
        //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
        //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)所有消費一遍
        //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時之前
        //若是非第一次啓動,按照上次消費的位置繼續消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(topic, TAG);
        //消費線程數量
        consumer.setConsumeThreadMax(2);
        consumer.setConsumeThreadMin(2);
        consumer.setPersistConsumerOffsetInterval(1000);
        log.info("MQ Consume From Where is {}", consumer.getConsumeFromWhere());
    }

    /**
     * 註冊監聽,並啓動MQ
     *
     * @param listener 消費監聽
     */
    public void consumeConcurrently(MessageListenerConcurrently listener) {
        try {
            consumer.registerMessageListener(listener);
            consumer.start();
        } catch (MQClientException e) {
            log.error(e.getMessage(), e);
        }
    }

消息監聽

@Override
    public void run(String... args) throws Exception {
        log.info("Alarm Report MQ Consumer Start ...");
        mqConsumer.consumeConcurrently((messageExtList, context) -> {
            try {
                ······ 略
            } catch (Exception e) {
                //異常捕獲,確保不會由於程序異常致使的消息重複消費
                log.error(e.getMessage(), e);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

    }

異常緣由:異常線程空指針

Mq Consuemr 選擇 ConsumeMessageConcurrentlyService 對 broker 消息進行消費。指針

ConsumeMessageConcurrentlyService類,經過提交內部線程類對 broker 消息進行監聽消費code

在內部類 ConsumeRequest的 run 方法中,有以下邏輯:隊列

  • ConsumeMessageContext 經過判斷 Hook 是否存在進行初始化
  • 當實現類 defaultMQPushConsumerImpl hasHook == false 時,則不會對 ConsumeMessageContext 進行初始化,此時 ConsumeMessageContext=null;
  • 而代碼往下執行後,則必對 ConsumeMessageContext 調用,且沒有判空操做
  • 從而,當 Hook 爲空時,程序必出空指針異常
  • 且沒有異常捕獲,異常並不會打印或向上拋出 =.= 。
class : com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run

            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                ······略
            }

            ······略
            try {
               ······略,對消息監聽的執行
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                ······略
            }
            ······略
            //當前行,空指針異常······
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE,returnType.name());
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", //
                        ConsumeMessageConcurrentlyService.this.consumerGroup, //
                        msgs, //
                        messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

解決

解決方法一:傳入 Hook

consumer 在初始化時,向其默認的實現類註冊 Hookget

consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHook() {
            @Override
            public String hookName() {
                return null;
            }

            @Override
            public void consumeMessageBefore(ConsumeMessageContext context) {

            }

            @Override
            public void consumeMessageAfter(ConsumeMessageContext context) {

            }
        });

解決方法二:其它版本的MQ客戶端

不須要註冊 Hook,又不想註冊 Hook 的,能夠選擇其它客戶端版本,如:it

<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.5.8</version>
        </dependency>

這個Final版本,略坑!io

相關文章
相關標籤/搜索