在RocketMQ中,使用Producer相關類來生產消息,第一次使用的時,會調用producer.start()方法來進行初始化。這裏咱們來探索一下Producer的start作了些什麼。ide
時序圖以下:fetch
producer.start()實際上是調用DefaultMQProducerImpl.start(),重點看看裏面2句代碼:this
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
說明:把DefaultMQProducer對象添加到MQClientInstance的producerTable屬性中:線程
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } MQProducerInner prev = this.producerTable.putIfAbsent(group, producer); if (prev != null) { log.warn("the producer group[{}] exist already.", group); return false; } return true; }
producerTable對象裏面存儲producerGroupName和DefaultMQProducer的映射。key-value:<producerGroupName,DefaultMQProducer>netty
if (startFactory) { mQClientFactory.start(); }
mQClientFactory.start()調用MQClientInstance(負責啓動通訊服務和定時任務)的start:server
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr()); } // Start request-response channel 啓動MQClientAPIImpl this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service 用於將消費失敗的消息發回broker this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
在Producer啓動的時候,serviceState值爲CREATE_JUST。由於在以上的方法中就是調用各類的start方法:對象
this.mQClientAPIImpl.start();
說明:調用:MQClientAPIImpl.start()--->NettyRemotingClient.start():MQClientAPIImpl(客戶端與遠程交互的封裝,其內部使用了RemotingClient來實現與遠程的交互),NettyRemotingClient.start()的方法裏面啓動了netty的通訊客戶端(這裏就不對netty作介紹了,由於這個是個大工程)。繼承
this.startScheduledTask();
說明:啓動各類的定時任務:隊列
/** * 啓動各類定時任務 */ private void startScheduledTask() { //每兩分鐘執行一次尋址服務(NameServer地址) if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } //每30秒更新一次全部的topic的路由信息(topicRouteTable) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS); //每30秒移除離線的broker //每30秒發送一次心跳給全部的master broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); //更新offset每5秒提交一次消費的offset,broker端爲ConsumerOffsetManager負責記錄,此offset是邏輯偏移量,好比說,consumerA@consumerAGroup 在broker_a的queue 0的消費隊列共有10000條消息,目前消費到888,那麼offset就是888. //由於producer和consumer內部都持有MQClientInstance實例,故MQClientInstance既有生產者邏輯,又有消費者邏輯。 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); //每1分鐘調整一次線程池,這也是針對消費者來講的,具體爲若是消息堆積超過10W條,則調大線程池,最多64個線程;若是消息堆積少於8W條,則調小線程池,最少20的線程。 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); }
查看方法上面的註釋。ci
this.pullMessageService.start();
說明:consumer的拉取消息線程實現方式:PullMessageService繼承ServiceThread(對拉取消息請求進行了封裝,使其隊列化),start拉取消息線程啓動,在run方法裏面實現了:不斷的從pullRequestQueue中取出請求,並調用消息拉取。
this.rebalanceService.start();
說明:再平衡線程啓動
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
說明:暫時沒有搞明白
總結:在Producer啓動的時候,作了幾件事情:
啓動執行尋址服務的任務(NameServer地址)。
啓動更新全部的topic的路由信息(topicRouteTable)的任務。
啓動移除離線的broker和發送心跳給全部的master broker的任務。
啓動提交消費的offset(邏輯偏移量)到broker(broker端爲ConsumerOffsetManager負責記錄) 的任務。
啓動調整線程池的任務(針對消費)。