本文主要研究一下rocketmq的pullThresholdForTopicjava
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.javagit
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { //...... /** * Flow control threshold on topic level, default value is -1(Unlimited) * <p> * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on * {@code pullThresholdForTopic} if it is't unlimited * <p> * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, * then pullThresholdForQueue will be set to 100 */ private int pullThresholdForTopic = -1; /** * Limit the cached message size on topic level, default value is -1 MiB(Unlimited) * <p> * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on * {@code pullThresholdSizeForTopic} if it is't unlimited * <p> * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB */ private int pullThresholdSizeForTopic = -1; //...... public int getPullThresholdForTopic() { return pullThresholdForTopic; } public void setPullThresholdForTopic(final int pullThresholdForTopic) { this.pullThresholdForTopic = pullThresholdForTopic; } public int getPullThresholdSizeForTopic() { return pullThresholdSizeForTopic; } public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) { this.pullThresholdSizeForTopic = pullThresholdSizeForTopic; } //...... }
默認值-1
)、pullThresholdSizeForTopic(默認值-1
)屬性rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.javagithub
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup()); //...... // pullThresholdForTopic if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) { if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) { throw new MQClientException( "pullThresholdForTopic Out of range [1, 6553500]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } } if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) { // pullThresholdSizeForTopic if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) { throw new MQClientException( "pullThresholdSizeForTopic Out of range [1, 102400]" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } } //...... } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.javaapache
public class RebalancePushImpl extends RebalanceImpl { private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000")); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { this(null, null, null, null, defaultMQPushConsumerImpl); } public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; } @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { /** * When rebalance result changed, should update subscription's version to notify broker. * Fix: inconsistency subscription may lead to consumer miss messages. */ SubscriptionData subscriptionData = this.subscriptionInner.get(topic); long newVersion = System.currentTimeMillis(); log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); subscriptionData.setSubVersion(newVersion); int currentQueueCount = this.processQueueTable.size(); if (currentQueueCount != 0) { int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); if (pullThresholdForTopic != -1) { int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount); log.info("The pullThresholdForQueue is changed from {} to {}", this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal); this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal); } int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic(); if (pullThresholdSizeForTopic != -1) { int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount); log.info("The pullThresholdSizeForQueue is changed from {} to {}", this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal); this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal); } } // notify broker this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); } //...... }
默認值-1
)、pullThresholdSizeForTopic(默認值-1
)屬性