本文主要研究一下rocketmq的consumeMessageBatchMaxSizejava
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.javagit
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { private final InternalLogger log = ClientLogger.getLog(); //...... /** * Batch consumption size */ private int consumeMessageBatchMaxSize = 1; public int getConsumeMessageBatchMaxSize() { return consumeMessageBatchMaxSize; } public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.javagithub
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup()); //...... // consumeMessageBatchMaxSize if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1 || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) { throw new MQClientException( "consumeMessageBatchMaxSize Out of range [1, 1024]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } //...... } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.javaapache
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { //...... public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } } //...... }
DefaultMQPushConsumer定義了consumeMessageBatchMaxSize屬性,默認值爲1;DefaultMQPushConsumerImpl的checkConfig方法會校驗defaultMQPushConsumer.getConsumeMessageBatchMaxSize(),要求其值必須大於等於且小於等於1024;ConsumeMessageConcurrentlyService的submitConsumeRequest方法在msgs.size()小於等於consumeBatchSize時會建立ConsumeRequest,而後提交到consumeExecutor執行,若出現RejectedExecutionException則執行submitConsumeRequestLater;對於msgs.size()大於consumeBatchSize的,則按consumeBatchSize分批建立ConsumeRequest提交給consumeExecutor執行,若出現RejectedExecutionException則將剩餘的msg添加到msgThis,而後執行submitConsumeRequestLaterthis