出現了重複消費的問題,同一個消息被重複消費了屢次,致使了用戶端收到了多條重複的消息,最終排查發現,是由於消費者在處理消息的方法onMessage中有異常沒有捕獲到,致使異常上拋,被consumeMessage捕獲並斷定爲消費失敗,從而放到了重試隊列當中進行重試,下面咱們就來看看RocketMq中會引發消息重試的兩種狀況,內部異常和消費超時。java
在Consumer中處理消息時,會在消費完消息後判斷消費的總時長,若是比超時時間要長則返回TIME_OUT,注意這裏的超時是在consumeMessage內部邏輯處理完畢以後在進行判斷的,若是內部邏輯處理成功,但耗時較長,那麼也會被判斷爲超時。ide
在DefaultMQPushConsumer.java中定義了消費的超時時間爲15分鐘。this
consumeMessage方法中會有兩種返回狀態,正常的狀態消費成功CONSUME_SUCCESS和出現異常時的重試狀態RECONSUME_LATER。spa
若是消費時長超過超時時間那麼即使consumeMessage方法處理成功,返回狀態也是TIME_OUT。3d
class ConsumeRequest implements Runnable { private final List<MessageExt> msgs; private final ProcessQueue processQueue; private final MessageQueue messageQueue; public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) { this.msgs = msgs; this.processQueue = processQueue; this.messageQueue = messageQueue; } public List<MessageExt> getMsgs() { return msgs; } public ProcessQueue getProcessQueue() { return processQueue; } @Override public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap<String, String>()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } } public MessageQueue getMessageQueue() { return messageQueue; } }