本文主要研究一下rocketmq的maxReconsumeTimes
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { private final InternalLogger log = ClientLogger.getLog(); //...... /** * Max re-consume times. -1 means 16 times. * </p> * * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion * queue waiting. */ private int maxReconsumeTimes = -1; //...... public int getMaxReconsumeTimes() { return maxReconsumeTimes; } public void setMaxReconsumeTimes(final int maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); } finally { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); } } private int getMaxReconsumeTimes() { // default reconsume times: 16 if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { return 16; } else { return this.defaultMQPushConsumer.getMaxReconsumeTimes(); } } //...... }
rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { //...... private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) { String newTopic = requestHeader.getTopic(); if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return false; } int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); msg.setTopic(newTopic); msg.setQueueId(queueIdInt); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return false; } } } int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; } msg.setSysFlag(sysFlag); return true; } //...... }
SendMessageProcessor的handleRetryAndDLQ方法對於topic是以MixAll.RETRY_GROUP_TOPIC_PREFIX(%RETRY%)開頭的,會先從subscriptionGroupConfig.getRetryMaxTimes()獲取maxReconsumeTimes,對於mq版本大於等於MQVersion.Version.V3_4_9.ordinal()的則會從request的header中讀取maxReconsumeTimes;之後從request的header讀取reconsumeTimes,若該值大於等於maxReconsumeTimes則更新newTopic爲MixAll.getDLQTopic(groupName)。
DefaultMQPushConsumer定義了maxReconsumeTimes屬性,默認爲-1;DefaultMQPushConsumerImpl的sendMessageBack方法會對mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack進行異常捕獲,出現異常時會使用MessageAccessor.setReconsumeTime更新newMsg的reconsumeTime,以及調用getMaxReconsumeTimes方法設置newMsg的maxReconsumeTimes,最後使用mQClientFactory.getDefaultMQProducer().send(newMsg)發送消息;broker端會判斷reconsumeTimes若大於等於maxReconsumeTimes則會將其topic改成MixAll.getDLQTopic(groupName)。