A look at RocketMQ's suspendCurrentQueueTimeMillis

This article takes a look at RocketMQ's suspendCurrentQueueTimeMillis.

suspendCurrentQueueTimeMillis

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    //......

    /**
     * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    //......
}
  • DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property, with a default value of 1000 (milliseconds), along with its getter and setter

submitConsumeRequestLater

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

public class ConsumeMessageOrderlyService implements ConsumeMessageService {

    //......

    private void submitConsumeRequestLater(
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final long suspendTimeMillis
    ) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
        }

        if (timeMillis < 10) {
            timeMillis = 10;
        } else if (timeMillis > 30000) {
            timeMillis = 30000;
        }

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }

    //......
}
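  • When suspendTimeMillis is -1, the submitConsumeRequestLater method falls back to the value of defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(). The resulting timeMillis is then clamped: a value below 10 is reset to 10, and a value above 30000 is reset to 30000. Finally, scheduledExecutorService is used to run submitConsumeRequest after a delay of timeMillis milliseconds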
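
The delay resolution described above can be tried out in isolation. The following is a minimal sketch, not RocketMQ code: the class SuspendDelaySketch, the method resolveDelayMillis, and the two constants are hypothetical names introduced only for illustration, and the printed message stands in for the real submitConsumeRequest call. It assumes nothing beyond the JDK.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; these names are not part of rocketmq-client.
final class SuspendDelaySketch {

    // Bounds taken from the submitConsumeRequestLater excerpt above.
    static final long MIN_DELAY_MILLIS = 10;
    static final long MAX_DELAY_MILLIS = 30000;

    // -1 means "use the configured default"; the result is clamped to [10, 30000].
    static long resolveDelayMillis(long suspendTimeMillis, long configuredDefaultMillis) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = configuredDefaultMillis;
        }
        return Math.max(MIN_DELAY_MILLIS, Math.min(MAX_DELAY_MILLIS, timeMillis));
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // 1000 plays the role of the default suspendCurrentQueueTimeMillis.
        long delay = resolveDelayMillis(-1, 1000);

        // Stand-in for the delayed submitConsumeRequest call.
        scheduler.schedule(
            () -> System.out.println("resubmitting consume request after " + delay + " ms"),
            delay,
            TimeUnit.MILLISECONDS);

        // Already-scheduled delayed tasks still run after shutdown() by default.
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Note that in this sketch, as in the excerpt, the clamp applies to an explicitly passed suspendTimeMillis as well as to the configured default, so configuring suspendCurrentQueueTimeMillis above 30000 has no additional effect on this code path.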

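Summary

DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property, with a default value of 1000. When suspendTimeMillis is -1, the submitConsumeRequestLater method of ConsumeMessageOrderlyService falls back to the value of defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(); the resulting timeMillis is reset to 10 if it is below 10 and to 30000 if it is above 30000, and scheduledExecutorService is then used to run submitConsumeRequest after a delay of timeMillis milliseconds.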
