This article takes a look at batch in the RocketMQ producer.
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer {

    //......

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        MessageBatch msgBatch;
        try {
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

    //......
}
rocketmq-common-4.6.0-sources.jar!/org/apache/rocketmq/common/message/MessageBatch.java
public class MessageBatch extends Message implements Iterable<Message> {

    private static final long serialVersionUID = 621335151046335557L;
    private final List<Message> messages;

    private MessageBatch(List<Message> messages) {
        this.messages = messages;
    }

    public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }

    public Iterator<Message> iterator() {
        return messages.iterator();
    }

    public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;
        for (Message message : messages) {
            if (message.getDelayTimeLevel() > 0) {
                throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
            }
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }
            messageList.add(message);
        }
        MessageBatch messageBatch = new MessageBatch(messageList);

        messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }
}
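The checks in generateFromList can be restated as a small, self-contained sketch. It does not use the RocketMQ classes: `Msg`, `BatchRules`, `checkBatch` and `isBatchable` are hypothetical names invented for illustration, the retry prefix is hard-coded on the assumption that `MixAll.RETRY_GROUP_TOPIC_PREFIX` is `%RETRY%`, and an empty input is rejected with an exception where the original uses `assert`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class BatchRules {
    // Assumed value of MixAll.RETRY_GROUP_TOPIC_PREFIX.
    static final String RETRY_PREFIX = "%RETRY%";

    // Returns a copy of the messages if they may share one batch, otherwise throws.
    static List<Msg> checkBatch(Collection<Msg> msgs) {
        if (msgs == null || msgs.isEmpty()) {
            // The original uses assert here; an exception keeps the sketch strict without -ea.
            throw new IllegalArgumentException("a batch needs at least one message");
        }
        List<Msg> accepted = new ArrayList<>(msgs.size());
        Msg first = null;
        for (Msg m : msgs) {
            if (m.delayTimeLevel > 0) {
                throw new UnsupportedOperationException("delayed messages cannot be batched");
            }
            if (m.topic.startsWith(RETRY_PREFIX)) {
                throw new UnsupportedOperationException("retry-group topics cannot be batched");
            }
            if (first == null) {
                first = m; // later messages are compared against the first one
            } else if (!first.topic.equals(m.topic)) {
                throw new UnsupportedOperationException("all messages must share one topic");
            } else if (first.waitStoreMsgOK != m.waitStoreMsgOK) {
                throw new UnsupportedOperationException("all messages must share waitStoreMsgOK");
            }
            accepted.add(m);
        }
        return accepted;
    }

    // Convenience wrapper that turns a rejection into false.
    static boolean isBatchable(Collection<Msg> msgs) {
        try {
            checkBatch(msgs);
            return true;
        } catch (UnsupportedOperationException | IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Msg a1 = new Msg("TopicA", 0, true);
        Msg a2 = new Msg("TopicA", 0, true);
        Msg b1 = new Msg("TopicB", 0, true);
        Msg delayed = new Msg("TopicA", 3, true);
        System.out.println(isBatchable(Arrays.asList(a1, a2)));      // true
        System.out.println(isBatchable(Arrays.asList(a1, b1)));      // false: mixed topics
        System.out.println(isBatchable(Arrays.asList(a1, delayed))); // false: delay level set
    }
}

// Hypothetical stand-in for org.apache.rocketmq.common.message.Message;
// only the fields that generateFromList inspects are modeled.
class Msg {
    final String topic;
    final int delayTimeLevel;
    final boolean waitStoreMsgOK;

    Msg(String topic, int delayTimeLevel, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.delayTimeLevel = delayTimeLevel;
        this.waitStoreMsgOK = waitStoreMsgOK;
    }
}
```

In practice this means a caller holding messages for several topics has to split them into one collection per topic before batching.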
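The batch method of DefaultMQProducer takes a collection of Message objects. It builds a MessageBatch with MessageBatch.generateFromList, then iterates over the batch to validate each message, set its unique ID, and apply the namespace to its topic. It then sets the batch body to the encoded messages, applies the namespace to the batch topic, and returns the MessageBatch.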
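The order of the steps in batch can be traced with a simplified, self-contained sketch. Everything in it is a hypothetical stand-in rather than RocketMQ code: `SketchMessage` and `SketchBatch` replace Message and MessageBatch, a random UUID replaces the ID generated by MessageClientIDSetter, the `namespace%topic` format in `withNamespace` is an assumption, and the length-prefixed `encode` is only a placeholder for MessageDecoder.encodeMessages. Validation is left out to keep the focus on the ordering.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

class BatchFlowSketch {
    // Stand-in for withNamespace; the "namespace%topic" format is an assumption.
    static String withNamespace(String namespace, String topic) {
        if (namespace == null || namespace.isEmpty() || topic.startsWith(namespace + "%")) {
            return topic;
        }
        return namespace + "%" + topic;
    }

    // Placeholder encoding: 4-byte length followed by the body, per message.
    static byte[] encode(List<SketchMessage> messages) {
        int size = 0;
        for (SketchMessage m : messages) {
            size += 4 + m.body.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (SketchMessage m : messages) {
            buf.putInt(m.body.length);
            buf.put(m.body);
        }
        return buf.array();
    }

    static SketchBatch batch(String namespace, List<SketchMessage> msgs) {
        if (msgs == null || msgs.isEmpty()) {
            throw new IllegalArgumentException("a batch needs at least one message");
        }
        // Step 1: build the batch; it takes its topic from the first message.
        SketchBatch batch = new SketchBatch(new ArrayList<>(msgs), msgs.get(0).topic);
        // Step 2: per message, assign a unique ID (if missing) and namespace the topic.
        for (SketchMessage m : batch.messages) {
            if (m.uniqId == null) {
                m.uniqId = UUID.randomUUID().toString();
            }
            m.topic = withNamespace(namespace, m.topic);
        }
        // Step 3: the batch body is the encoded form of all contained messages.
        batch.body = encode(batch.messages);
        // Step 4: the batch topic is namespaced last, after encoding.
        batch.topic = withNamespace(namespace, batch.topic);
        return batch;
    }

    public static void main(String[] args) {
        List<SketchMessage> msgs = Arrays.asList(
                new SketchMessage("TopicA", "a"),
                new SketchMessage("TopicA", "bc"));
        SketchBatch batch = batch("ns", msgs);
        System.out.println(batch.topic);
        System.out.println(batch.body.length);
    }
}

// Hypothetical stand-in for Message.
class SketchMessage {
    String topic;
    String uniqId;
    final byte[] body;

    SketchMessage(String topic, String body) {
        this.topic = topic;
        this.body = body.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for MessageBatch.
class SketchBatch {
    final List<SketchMessage> messages;
    String topic;
    byte[] body;

    SketchBatch(List<SketchMessage> messages, String topic) {
        this.messages = messages;
        this.topic = topic;
    }
}
```

Note that the per-message changes happen before encoding, so the encoded body already reflects the unique IDs and namespaced topics in the real implementation.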