本文主要研究一下rocketmq的SequenceProducerImpljava
io/openmessaging/rocketmq/producer/SequenceProducerImpl.javagit
public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer { private BlockingQueue<Message> msgCacheQueue; public SequenceProducerImpl(final KeyValue properties) { super(properties); this.msgCacheQueue = new LinkedBlockingQueue<>(); } @Override public KeyValue properties() { return properties; } @Override public void send(final Message message) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message); try { Validators.checkMessage(rmqMessage, this.rocketmqProducer); } catch (MQClientException e) { throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); } msgCacheQueue.add(message); } @Override public void send(final Message message, final KeyValue properties) { send(message); } @Override public synchronized void commit() { List<Message> messages = new ArrayList<>(); msgCacheQueue.drainTo(messages); List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>(); for (Message message : messages) { rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); } if (rmqMessages.size() == 0) { return; } try { SendResult sendResult = this.rocketmqProducer.send(rmqMessages); String[] msgIdArray = sendResult.getMsgId().split(","); for (int i = 0; i < messages.size(); i++) { Message message = messages.get(i); message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); } } catch (Exception e) { throw checkProducerException("", "", e); } } @Override public synchronized void rollback() { msgCacheQueue.clear(); } }
rocketmq的SequenceProducerImpl在send方法的時候不是真正方法,而是添加到隊列,只有在commit的時候才批量發送,rollback的時候清空隊列。這裏的send方法語義不是太好,能夠改成pending之類的名稱。github