聊聊rocketmq的maxReconsumeTimes

本文主要研究一下rocketmq的maxReconsumeTimesjava

maxReconsumeTimes

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.javagit

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Max re-consume times. -1 means 16 times.
     * </p>
     *
     * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
     * queue waiting.
     */
    private int maxReconsumeTimes = -1;

    //......

    public int getMaxReconsumeTimes() {
        return maxReconsumeTimes;
    }

    public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
        this.maxReconsumeTimes = maxReconsumeTimes;
    }

    //......
}
  • DefaultMQPushConsumer定義了maxReconsumeTimes屬性,默認爲-1

sendMessageBack

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.javagithub

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    //......

    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        } finally {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }

    private int getMaxReconsumeTimes() {
        // default reconsume times: 16
        if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
            return 16;
        } else {
            return this.defaultMQPushConsumer.getMaxReconsumeTimes();
        }
    }

    //......
}
  • DefaultMQPushConsumerImpl的sendMessageBack方法會對mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack進行異常捕獲,出現異常時會使用MessageAccessor.setReconsumeTime更新newMsg的reconsumeTime,以及調用getMaxReconsumeTimes方法設置newMsg的maxReconsumeTimes,最後使用mQClientFactory.getDefaultMQProducer().send(newMsg)發送消息

handleRetryAndDLQ

rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.javaapache

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    //......

    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
                                      RemotingCommand request,
                                      MessageExt msg, TopicConfig topicConfig) {
        String newTopic = requestHeader.getTopic();
        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(
                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return false;
            }

            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
            }
            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
            if (reconsumeTimes >= maxReconsumeTimes) {
                newTopic = MixAll.getDLQTopic(groupName);
                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0
                );
                msg.setTopic(newTopic);
                msg.setQueueId(queueIdInt);
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return false;
                }
            }
        }
        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }
        msg.setSysFlag(sysFlag);
        return true;
    }

    //......
}
  • SendMessageProcessor的handleRetryAndDLQ方法會判斷若是topic是RETRY_GROUP_TOPIC_PREFIX(%RETRY%)開頭的,會先從subscriptionGroupConfig.getRetryMaxTimes()獲取maxReconsumeTimes,對於mq版本大於等於MQVersion.Version.V3_4_9.ordinal()的則會從request的header中讀取maxReconsumeTimes;以後從request的header讀取reconsumeTimes,若是該值大於等於maxReconsumeTimes則更新newTopic爲MixAll.getDLQTopic(groupName)

小結

DefaultMQPushConsumer定義了maxReconsumeTimes屬性,默認爲-1;DefaultMQPushConsumerImpl的sendMessageBack方法會對mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack進行異常捕獲,出現異常時會使用MessageAccessor.setReconsumeTime更新newMsg的reconsumeTime,以及調用getMaxReconsumeTimes方法設置newMsg的maxReconsumeTimes,最後使用mQClientFactory.getDefaultMQProducer().send(newMsg)發送消息;broker端會判斷reconsumeTimes若是大於等於maxReconsumeTimes則會將其topic改成MixAll.getDLQTopic(groupName)dom

doc

相關文章
相關標籤/搜索