本文主要研究一下rocketmq的sendOrderly
rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean { //...... public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("syncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); } return sendResult; } catch (Exception e) { log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback, long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("asyncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); } catch (Exception e) { log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } //...... }
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer { //...... public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); } public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } //...... }
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public class DefaultMQProducerImpl implements MQProducerInner { //...... public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); } private SendResult sendSelectImpl( Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeout < costTime) { throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); } if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); } else { throw new MQClientException("select message queue return null.", null); } } validateNameServerSetting(); throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); } //...... }
這裏(sendSelectImpl)沒有使用sendDefaultImpl方法,sendDefaultImpl會通過selectOneMessageQueue(topicPublishInfo, lastBrokerName)來選擇MessageQueue進行發送;而sendOrderly方法則是通過MessageQueueSelector來選擇MessageQueue進行發送