本文主要研究一下rocketmq的adjustThreadPoolNumsThresholdjava
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.javagit
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { private final InternalLogger log = ClientLogger.getLog(); //...... /** * Threshold for dynamic adjustment of the number of thread pool */ private long adjustThreadPoolNumsThreshold = 100000; public long getAdjustThreadPoolNumsThreshold() { return adjustThreadPoolNumsThreshold; } public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) { this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold; } //...... }
ocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.javagithub
public class DefaultMQPushConsumerImpl implements MQConsumerInner { //...... public void adjustThreadPool() { long computeAccTotal = this.computeAccumulationTotal(); long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold(); long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0); long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8); if (computeAccTotal >= incThreshold) { this.consumeMessageService.incCorePoolSize(); } if (computeAccTotal < decThreshold) { this.consumeMessageService.decCorePoolSize(); } } private long computeAccumulationTotal() { long msgAccTotal = 0; ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable(); Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); ProcessQueue value = next.getValue(); msgAccTotal += value.getMsgAccCnt(); } return msgAccTotal; } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.javaapache
public class MQClientInstance { //...... public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } } private void startScheduledTask() { //...... this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); } public void adjustThreadPool() { Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { try { if (impl instanceof DefaultMQPushConsumerImpl) { DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl; dmq.adjustThreadPool(); } } catch (Exception e) { } } } } //...... }
DefaultMQPushConsumer定義了adjustThreadPoolNumsThreshold屬性,默認爲100000;MQClientInstance的start方法對於CREATE_JUST狀態會執行startScheduledTask()方法,後者會註冊一個定時任務,每隔1分鐘執行一次adjustThreadPool方法;adjustThreadPool方法則遍歷consumerTable的MQConsumerInner,對於DefaultMQPushConsumerImpl類型的MQConsumerInner執行adjustThreadPool方法ide