本文主要研究一下rocketmq的pullThresholdForQueue
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { //...... /** * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit */ private int pullThresholdForQueue = 1000; /** * Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default, * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit * * <p> * The size of a message only measured by message body, so it's not accurate */ private int pullThresholdSizeForQueue = 100; public int getPullThresholdForQueue() { return pullThresholdForQueue; } public void setPullThresholdForQueue(int pullThresholdForQueue) { this.pullThresholdForQueue = pullThresholdForQueue; } public int getPullThresholdSizeForQueue() { return pullThresholdSizeForQueue; } public void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue) { this.pullThresholdSizeForQueue = pullThresholdSizeForQueue; } //...... }
DefaultMQPushConsumer定義了pullThresholdForQueue(默認值1000)、pullThresholdSizeForQueue(默認值100,單位為MiB)屬性
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup()); //...... // pullThresholdForQueue if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) { throw new MQClientException( "pullThresholdForQueue Out of range [1, 65535]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // pullThresholdSizeForQueue if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) { throw new MQClientException( "pullThresholdSizeForQueue Out of range [1, 1024]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } //...... } //...... }
checkConfig方法要求pullThresholdForQueue的值大於等於1且小於等於65535;要求pullThresholdSizeForQueue的值大於等於1且小於等於1024
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... /** * Flow control interval */ private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; //...... public void pullMessage(final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); return; } if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached 
message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } //...... } private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); } //...... }
DefaultMQPushConsumerImpl的pullMessage方法會首先判斷cachedMessageCount(processQueue.getMsgCount())是否大於defaultMQPushConsumer.getPullThresholdForQueue(),大於的話則執行executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL)然後提前返回;之後會判斷cachedMessageSizeInMiB(processQueue.getMsgSize().get() / (1024 * 1024))是否大於defaultMQPushConsumer.getPullThresholdSizeForQueue(),大於的話也會執行executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL)然後提前返回
DefaultMQPushConsumer定義了pullThresholdForQueue(對應cachedMessageCount,默認值1000)、pullThresholdSizeForQueue(對應cachedMessageSizeInMiB,默認值100)屬性