RocketMq進階源碼學習之生產者啓動流程分析

RocketMq進階源碼學習之生產者啓動流程分析java

這裏找個example,單純簡單的發送一條消息,從生產者的start方法開始入手.生產者的啓動流程比較簡單,本文篇幅較短,只分析了主流程,函數

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();
    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    producer.send()
    producer.shutdown();
}

這裏是start方法,在裏面的start方法下,有幾行代碼時表示是否開啓消息軌跡追蹤的,這裏對traceDispatcher進行了null判斷,這裏不明白traceDispatcher是在哪定義的,就一路追蹤了下,發現traceDispatcher這個對象是在Producer的構造函數的中進行初始化的,DefaultMqProducer有一個構造函數裏有一個參數是enableMsgTrace,若是傳入爲true,就會初始化traceDispatcher對象,那麼在start方法這裏判斷不爲空就會開啓消息軌跡追蹤了學習

好比這樣的構造函數便可開啓消息軌跡追蹤fetch

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
public void start() throws MQClientException {
    //設置生產者組名
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    //traceDispatcher是爲初始化,已初始化則表示開啓消息追蹤
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

而後再通過一些配置的校驗以後,開始啓動,主要是作一些定時任務與支線服務線程(如消息重平衡服務,拉取服務等等)this

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                //開啓netty客戶端,NettyRemotingClient
                this.mQClientAPIImpl.start();
                //開啓多個定時任務線程池,如發送心跳,持久化消息消費的offset等
                this.startScheduledTask();
                //開啓拉取消息服務
                this.pullMessageService.start();
                //開啓重均衡消息服務
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }

生產者啓動基本就到此爲止了,就是作一些校驗,看看是否須要開啓消息軌跡追蹤,再啓動Netty客戶端,而後在啓動一些輔助服務就啓動完畢了spa

相關文章
相關標籤/搜索