本文主要研究一下rocketmq的RECONSUME_LATERjava
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.javagit
public enum ConsumeConcurrentlyStatus { /** * Success consumption */ CONSUME_SUCCESS, /** * Failure consumption,later try to consume */ RECONSUME_LATER; }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.javagithub
public interface MessageListenerConcurrently extends MessageListener { /** * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if * consumption failure * * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here * @return The consume status */ ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeConcurrentlyContext context); }
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.javaspring
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); rocketMQListener.onMessage(doConvertMessage(messageExt)); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.javaapache
class ConsumeRequest implements Runnable { private final List<MessageExt> msgs; private final ProcessQueue processQueue; private final MessageQueue messageQueue; public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) { this.msgs = msgs; this.processQueue = processQueue; this.messageQueue = messageQueue; } public List<MessageExt> getMsgs() { return msgs; } public ProcessQueue getProcessQueue() { return processQueue; } @Override public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap<String, String>()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } } public MessageQueue getMessageQueue() { return messageQueue; } }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.javaide
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final InternalLogger log = ClientLogger.getLog(); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final DefaultMQPushConsumer defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final BlockingQueue<Runnable> consumeRequestQueue; private final ThreadPoolExecutor consumeExecutor; private final String consumerGroup; private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService cleanExpireMsgExecutors; //...... public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.javaspring-boot
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { //...... public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) { int delayLevel = context.getDelayLevelWhenNextConsume(); // Wrap topic with namespace before sending back message. msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic())); try { this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName()); return true; } catch (Exception e) { log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); } return false; } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.javathis
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { //...... private void submitConsumeRequestLater( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue ) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true); } }, 5000, TimeUnit.MILLISECONDS); } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.javaspa
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { //...... public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.javadebug
public class DefaultMQPushConsumerImpl implements MQConsumerInner { /** * Delay some time when exception occur */ private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; /** * Flow control interval */ private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; /** * Delay some time when suspend pull service */ private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000; private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; //...... public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); } finally { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public class MQClientAPIImpl { private final static InternalLogger log = ClientLogger.getLog(); private static boolean sendSmartMsg = Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); static { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); } private final RemotingClient remotingClient; private final TopAddressing topAddressing; private final ClientRemotingProcessor clientRemotingProcessor; private String nameSrvAddr = null; private ClientConfig clientConfig; //...... public void consumerSendMessageBack( final String addr, final MessageExt msg, final String consumerGroup, final int delayLevel, final long timeoutMillis, final int maxConsumeRetryTimes ) throws RemotingException, MQBrokerException, InterruptedException { ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); requestHeader.setGroup(consumerGroup); requestHeader.setOriginTopic(msg.getTopic()); requestHeader.setOffset(msg.getCommitLogOffset()); requestHeader.setDelayLevel(delayLevel); requestHeader.setOriginMsgId(msg.getMsgId()); requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); } //...... }
DefaultRocketMQListenerContainer實現了MessageListenerConcurrently方法,它會循環調用rocketMQListener.onMessage,出現異常會設置delayLevelWhenNextConsume,而後當即返回ConsumeConcurrentlyStatus.RECONSUME_LATER