聊聊rocketmq的sendOrderly

本文主要研究一下rocketmq的sendOrderlyjava

sendOrderly

rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.javagit

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {

    //......

    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
            long costTime = System.currentTimeMillis() - now;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            }
            return sendResult;
        } catch (Exception e) {
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
        long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
        } catch (Exception e) {
            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    //......
}
  • syncSendOrderly方法最後調用的是producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);asyncSendOrderly方法最後調用的是producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout),比syncSendOrderly多了一個sendCallback

DefaultMQProducer

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.javagithub

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    //......

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
    }

    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
    }

    //......
}
  • DefaultMQProducer的send方法最後調用的是defaultMQProducerImpl.send(msg, selector, arg, timeout)或者defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout)方法

DefaultMQProducerImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.javaspring

public class DefaultMQProducerImpl implements MQProducerInner {

    //......

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
    }

    private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                List<MessageQueue> messageQueueList =
                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
                Message userMessage = MessageAccessor.cloneMessage(msg);
                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);

                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }

            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }

        validateNameServerSetting();
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }

    //......
}
  • DefaultMQProducerImpl的send方法調用的是sendSelectImpl方法,該方法在Validators.checkMessage以後會經過tryToFindTopicPublishInfo(msg.getTopic())查找topicPublishInfo,找不到則拋出MQClientException
  • 找到topicPublishInfo的話則經過mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList())獲取messageQueueList
  • 以後經過mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg))方法獲取mq,最後在mq不爲null的狀況下經過sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime)發送

小結

  • DefaultMQProducerImpl的send方法調用的是sendSelectImpl方法,該方法在Validators.checkMessage以後會經過tryToFindTopicPublishInfo(msg.getTopic())查找topicPublishInfo,找不到則拋出MQClientException
  • 找到topicPublishInfo的話則經過mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList())獲取messageQueueList
  • 以後經過mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg))方法獲取mq,最後在mq不爲null的狀況下經過sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime)發送
這裏沒有使用sendDefaultImpl方法,該方法會經過selectOneMessageQueue(topicPublishInfo, lastBrokerName)來選擇MessageQueue來發送;而sendOrderly方法是經過MessageQueueSelector選擇MessageQueue來發送

doc

相關文章
相關標籤/搜索