RocketMQ探索-Producer的start

在RocketMQ中,使用Producer相關類來生產消息,第一次使用的時,會調用producer.start()方法來進行初始化。這裏咱們來探索一下Producer的start作了些什麼。ide

時序圖以下:fetch

producer.start()實際上是調用DefaultMQProducerImpl.start(),重點看看裏面2句代碼:this

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

說明:把DefaultMQProducer對象添加到MQClientInstance的producerTable屬性中:線程

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }

    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }

    return true;
}

producerTable對象裏面存儲producerGroupName和DefaultMQProducer的映射。key-value:<producerGroupName,DefaultMQProducer>netty

if (startFactory) {
    mQClientFactory.start();
}

mQClientFactory.start()調用MQClientInstance(負責啓動通訊服務和定時任務)的start:server

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                }
                // Start request-response channel 啓動MQClientAPIImpl
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service 用於將消費失敗的消息發回broker
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

在Producer啓動的時候,serviceState值爲CREATE_JUST。由於在以上的方法中就是調用各類的start方法:對象

  • this.mQClientAPIImpl.start();

        說明:調用:MQClientAPIImpl.start()--->NettyRemotingClient.start():MQClientAPIImpl(客戶端與遠程交互的封裝,其內部使用了RemotingClient來實現與遠程的交互),NettyRemotingClient.start()的方法裏面啓動了netty的通訊客戶端(這裏就不對netty作介紹了,由於這個是個大工程)。繼承

  • this.startScheduledTask();

         說明:啓動各類的定時任務:隊列

  • /**
     * 啓動各類定時任務
     */
    private void startScheduledTask() {
        //每兩分鐘執行一次尋址服務(NameServer地址)
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
    
        //每30秒更新一次全部的topic的路由信息(topicRouteTable)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
    
        //每30秒移除離線的broker
        //每30秒發送一次心跳給全部的master broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    
        //更新offset每5秒提交一次消費的offset,broker端爲ConsumerOffsetManager負責記錄,此offset是邏輯偏移量,好比說,consumerA@consumerAGroup 在broker_a的queue 0的消費隊列共有10000條消息,目前消費到888,那麼offset就是888.
        //由於producer和consumer內部都持有MQClientInstance實例,故MQClientInstance既有生產者邏輯,又有消費者邏輯。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
        //每1分鐘調整一次線程池,這也是針對消費者來講的,具體爲若是消息堆積超過10W條,則調大線程池,最多64個線程;若是消息堆積少於8W條,則調小線程池,最少20的線程。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

         查看方法上面的註釋。ci

  • this.pullMessageService.start();

         說明:consumer的拉取消息線程實現方式:PullMessageService繼承ServiceThread(對拉取消息請求進行了封裝,使其隊列化),start拉取消息線程啓動,在run方法裏面實現了:不斷的從pullRequestQueue中取出請求,並調用消息拉取。

  • this.rebalanceService.start();

         說明:再平衡線程啓動

  • this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

         說明:暫時沒有搞明白

總結:在Producer啓動的時候,作了幾件事情:

  1. 把本身保持到producerTable屬性中<groupName,Producer>
  2. 啓動各類定時任務(對MQClientInstance說明一下,由於生產和消費者都會持有MQClientInstance因此在啓動任務的時候會啓動生產和消費相關的任務線程)

         啓動執行尋址服務的任務(NameServer地址)。

         啓動更新全部的topic的路由信息(topicRouteTable)的任務。

         啓動移除離線的broker和發送心跳給全部的master broker的任務。

         啓動提交消費的offset(邏輯偏移量)到broker(broker端爲ConsumerOffsetManager負責記錄)            的任務。

         啓動調整線程池的任務(針對消費)。

相關文章
相關標籤/搜索