This article takes a look at RocketMQ's suspendCurrentQueueTimeMillis.
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    //......

    /**
     * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    //......
}
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
public class ConsumeMessageOrderlyService implements ConsumeMessageService {

    //......

    private void submitConsumeRequestLater(
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final long suspendTimeMillis
    ) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
        }

        if (timeMillis < 10) {
            timeMillis = 10;
        } else if (timeMillis > 30000) {
            timeMillis = 30000;
        }

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }

    //......
}
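DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property, with a default value of 1000. When the suspendTimeMillis argument is -1, the submitConsumeRequestLater method of ConsumeMessageOrderlyService falls back to the value of defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(). It then clamps the resulting timeMillis: a value below 10 is reset to 10, and a value above 30000 is reset to 30000. Finally, it uses scheduledExecutorService to run submitConsumeRequest after a delay of timeMillis milliseconds.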
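The fallback-and-clamp behaviour described above can be tried out in isolation. The following is a minimal standalone sketch, not RocketMQ code: the class name SuspendTimeSketch, the method resolveSuspendTime and the two constants are hypothetical names introduced here for illustration, and the printed message merely stands in for the real submitConsumeRequest call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; it only mirrors the logic shown in submitConsumeRequestLater.
final class SuspendTimeSketch {

    // Bounds taken from the source shown above (10 ms and 30000 ms).
    static final long MIN_SUSPEND_MILLIS = 10;
    static final long MAX_SUSPEND_MILLIS = 30000;

    // -1 means "use the consumer-level default"; the result is then clamped to [10, 30000].
    static long resolveSuspendTime(long requestedMillis, long consumerDefaultMillis) {
        long timeMillis = requestedMillis == -1 ? consumerDefaultMillis : requestedMillis;
        return Math.max(MIN_SUSPEND_MILLIS, Math.min(MAX_SUSPEND_MILLIS, timeMillis));
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // 1000 is the default of suspendCurrentQueueTimeMillis in DefaultMQPushConsumer.
        long delay = resolveSuspendTime(-1, 1000);

        // Stand-in for the delayed submitConsumeRequest call.
        scheduler.schedule(
            () -> System.out.println("resubmitting consume request after " + delay + " ms"),
            delay,
            TimeUnit.MILLISECONDS);

        // Already-scheduled delayed tasks still run after shutdown() by default.
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With these bounds, a requested value of 5 resolves to 10, a requested value of 60000 resolves to 30000, and -1 with a consumer default of 1000 resolves to 1000. Note that the clamp also applies to the consumer-level default, so setting suspendCurrentQueueTimeMillis outside the 10 to 30000 range has no further effect on this code path.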