RocketMq進階源碼學習之生產者啓動流程分析java
這裏找個example,單純簡單的發送一條消息,從生產者的start方法開始入手.生產者的啓動流程比較簡單,本文篇幅較短,只分析了主流程,函數
public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send() producer.shutdown(); }
這裏是start方法,在裏面的start方法下,有幾行代碼時表示是否開啓消息軌跡追蹤的,這裏對traceDispatcher進行了null判斷,這裏不明白traceDispatcher是在哪定義的,就一路追蹤了下,發現traceDispatcher這個對象是在Producer的構造函數的中進行初始化的,DefaultMqProducer有一個構造函數裏有一個參數是enableMsgTrace,若是傳入爲true,就會初始化traceDispatcher對象,那麼在start方法這裏判斷不爲空就會開啓消息軌跡追蹤了學習
好比這樣的構造函數便可開啓消息軌跡追蹤fetch
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
public void start() throws MQClientException { //設置生產者組名 this.setProducerGroup(withNamespace(this.producerGroup)); this.defaultMQProducerImpl.start(); //traceDispatcher是爲初始化,已初始化則表示開啓消息追蹤 if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
而後再通過一些配置的校驗以後,開始啓動,主要是作一些定時任務與支線服務線程(如消息重平衡服務,拉取服務等等)this
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } //開啓netty客戶端,NettyRemotingClient this.mQClientAPIImpl.start(); //開啓多個定時任務線程池,如發送心跳,持久化消息消費的offset等 this.startScheduledTask(); //開啓拉取消息服務 this.pullMessageService.start(); //開啓重均衡消息服務 this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } }
生產者啓動基本就到此爲止了,就是作一些校驗,看看是否須要開啓消息軌跡追蹤,再啓動Netty客戶端,而後在啓動一些輔助服務就啓動完畢了spa