使用了以下版本的MQ客戶端版本:ide
<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.6.2.Final</version> </dependency>
MQ Consumer 端對消息成功消費後,若是重啓 Consumer,會對消息再次消費。但注意到每次的 reconsumeTimes=0 。this
即 consumer 從 broker 中成功消費消息後,並無將結果 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 返回。spa
如下爲 consumer 配置信息。線程
@PostConstruct public void init() throws MQClientException { log.info("構建 MQ Consumer..."); this.consumer = new DefaultMQPushConsumer(mqGroupName); consumer.setNamesrvAddr(nameServer); consumer.setInstanceName(instance); //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)所有消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時之前 //若是非第一次啓動,按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(topic, TAG); //消費線程數量 consumer.setConsumeThreadMax(2); consumer.setConsumeThreadMin(2); consumer.setPersistConsumerOffsetInterval(1000); log.info("MQ Consume From Where is {}", consumer.getConsumeFromWhere()); } /** * 註冊監聽,並啓動MQ * * @param listener 消費監聽 */ public void consumeConcurrently(MessageListenerConcurrently listener) { try { consumer.registerMessageListener(listener); consumer.start(); } catch (MQClientException e) { log.error(e.getMessage(), e); } }
@Override public void run(String... args) throws Exception { log.info("Alarm Report MQ Consumer Start ..."); mqConsumer.consumeConcurrently((messageExtList, context) -> { try { ······ 略 } catch (Exception e) { //異常捕獲,確保不會由於程序異常致使的消息重複消費 log.error(e.getMessage(), e); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); }
Mq Consuemr 選擇 ConsumeMessageConcurrentlyService 對 broker 消息進行消費。指針
ConsumeMessageConcurrentlyService類,經過提交內部線程類對 broker 消息進行監聽消費code
在內部類 ConsumeRequest的 run 方法中,有以下邏輯:隊列
class : com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); ······略 } ······略 try { ······略,對消息監聽的執行 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { ······略 } ······略 //當前行,空指針異常······ consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE,returnType.name()); if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", // ConsumeMessageConcurrentlyService.this.consumerGroup, // msgs, // messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; }
consumer 在初始化時,向其默認的實現類註冊 Hookget
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHook() { @Override public String hookName() { return null; } @Override public void consumeMessageBefore(ConsumeMessageContext context) { } @Override public void consumeMessageAfter(ConsumeMessageContext context) { } });
不須要註冊 Hook,又不想註冊 Hook 的,能夠選擇其它客戶端版本,如:it
<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.5.8</version> </dependency>
這個Final版本,略坑!io