RocketMQ客戶端加載流程

 這節介紹RocketMQ客戶端的啓動流程,即Consumer和Producer的啓動流程。express

1. 客戶端demo

 首先先看下客戶端的demobootstrap

Producer:api

public class SyncProducer {

    public static void main (String[] args) throws Exception {
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer ("GroupTest");
        // 設置NameServer的地址
        producer.setNamesrvAddr ("localhost:9876");
        // 啓動Producer實例
        producer.start ();
        for (int i = 0; i < 100; i++) {
            // 建立消息,並指定Topic,Tag和消息體
            Message msg = new Message ("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 發送消息到一個Broker
            SendResult sendResult = producer.send (msg);
            // 經過sendResult返回消息是否成功送達
            System.out.printf ("%s%n", sendResult);
        }
        // 若是再也不發送消息,關閉Producer實例。
        producer.shutdown ();
    }
}

Consumer:緩存

public class Consumer {

    public static void main (String[] args) throws InterruptedException, MQClientException {

        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");

        // 設置NameServer的地址
        consumer.setNamesrvAddr ("localhost:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾須要消費的消息
        consumer.subscribe ("TopicTest", "*");
        // 註冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啓動消費者實例
        consumer.start ();
        System.out.printf ("Consumer Started.%n");
    }
}

Producer和Consumer的啓動相似,在初始化而後進行必要設置(主要是客戶端所屬的Group和NameServer地址)後,執行start方法啓動後臺監聽服務,事實上Producer和Consumer都是調用同一個類MQClientInstance的start方法,下圖爲繼承關係:微信

file

DefaultMQproducer和DefaultMQPushConsumer都繼承自ClientConfig,顧名思義ClientConfig表示客戶端的配置,包括NameServer地址、客戶端地址、客戶端實例名等。因爲Producer和Consumer都須要同Broker和NameServer交互,因此配置上有不少相同,這兩個將主要功能的實現都委託給了對應的Impl(DefaultMQProducerImpl和DefaultMQPushConsumerImpl)。Impl內部調用了MQClientInstance來完成客戶端同遠程交互的主要功能,而Producer和Consumer則封裝本身相關的行爲,MQClientInstance內部又委託忒了MQClientAPIImpl。app

2. Producer的啓動

 DefaultMQProducer的啓動以下:dom

file

DefaultMQProducer將start委託給了DefaultMQProducerImpl來完成,主要過程爲:tcp

  • DefaultMQProducerImpl先標記客戶端當前狀態爲START_FAILED(初始狀態爲CREATE_JUST)
  • 調用MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每一個客戶端實例都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關係,key爲clientId(格式爲ip@instName,instName爲pid),value爲MQClientInstance實例。當key不存在時則會初始化一個實例,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。
  • 調用MQClientInstance的registerProducer方法,註冊當前客戶端自身。實現上是客戶端放入client實例緩存中,定時器定時上報,後面會說。
  • 調用MQClientInstance的start方法,啓動客戶端的後臺任務,該方法是重點,後面會介紹。
  • 標記客戶端當前狀態爲RUNNING
  • 調用MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向全部Broker上報心跳

3. Consumer的啓動

 DefaultMQPushConsumer的啓動以下:ide

file

DefaultMQPushConsumer一樣將start委託給了DefaultMQPushConsumerImpl來完成,流程上也類似。但相比DefaultMQProducer多了不少其餘組件來輔助消費過程,如rebalance、offset管理等,主要過程爲:fetch

  • DefaultMQPushConsumerImpl先標記客戶端當前狀態爲START_FAILED(初始狀態爲CREATE_JUST)
  • 同步設置RebalanceImpl的topic(Map</*topic*/String,/*sub expression*/String>)信息
  • 同DefaultMQProducer一致,調用MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每一個客戶端實例都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關係,key爲clientId(格式爲ip@instName),value爲MQClientInstance實例。當key不存在時則會初始化一個實例,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。這裏須要說明的是,RocketMQ中Consumer的消費模式分爲CLUSTERING和BROADCASTING,即集羣消費和廣播消費。區別在於集羣消費時,一條消息只會被一個實例消費,即各實例會平分全部的消息;而廣播消費時全部實例都會收到同一條消息。體如今clientId的是,集羣模式下instName爲pid,而廣播模式instName爲DEFAULT。
  • 設置RebalanceImpl屬性,包括所在Group、消費模式、消息分配策略(平均分配q的策略)
  • 初始化PlullAPIWrapper,設置消息過濾器鉤子列表
  • 初始化OffsetStore,設置offset的存儲模式,廣播模式使用本地存儲;集羣模式使用遠程存儲
  • 初始化ConsumeMessageService,根據監聽器類型設定消息消費模式(順序消費/並行消費),pull模式須要本身指定offset,push不須要設定。
  • 啓動ConsumeMessageService
  • 同DefaultMQProducer一致,調用MQClientInstance的registerProducer方法,註冊當前客戶端自身。實現上是客戶端放入client實例緩存中,定時器定時上報,後面會說。
  • 調用MQClientInstance的start方法,啓動客戶端的後臺任務,該方法是重點,後面會介紹。
  • 標記客戶端當前狀態爲RUNNING
  • 判斷監聽信息是否發生改變,從namesrv更新topic的路由信息
  • 調用MQClientInstance的checkClientInBroker方法,確認該實例已經在broker註冊成功,不然拋異常
  • 調用MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向全部Broker上報心跳
  • 調用MQClientInstance的rebalanceImmediately方法,觸發一次rebalance

 DefaultMQPushConsumer爲推模式,RocketMQ還提供了拉模式來消費消息,實現類爲DefaultMQPullConsumer,啓動過程相似,推模式是用拉模式來實現的,重點實現都在MQClientInstace中。

4. MQClientInstance

 MQClientInstance爲一個門戶類,組合了各功能,以下,包括Rebalance、消費數據統計、生產消息、消費消息等,這些都有對應的實現。

file

 上面說過,Producer和Consumer在啓動的時候,都會在內部先初始化一個MQClientInstance對象,而後調用其start方法啓動對應的後臺程序,以下:

file

MQClientInstance的start方法除了調用自身進行準備工做外,也調用了其餘組件的start方法開始它們的準備工做,主要流程爲:

  1. 先標記客戶端當前狀態爲START_FAILED(初始狀態爲CREATE_JUST)
  • 若沒有指定nameserver地址,則調用MQClientAPIImpl同步獲取一次(經過設置的Http endpoint同步)
  • 調用MQClientAPIImpl的start方法,主要是初始化Netty客戶端,啓動netty client初始化任務,鏈接的創建發生在第一次請求時
  • 開啓MQClientInstance的定時任務,包括:
    1. 若是沒有指定nameserver地址,每兩分鐘從配置的endpoint處同步nameserver地址
    • 定時從namesrv同步topic路由信息
    • 定時清除下線的broker信息;發送心跳
    • 定時持久化消費者消費的offset信息
    • 每1分鐘調整線程池的大小
  • 調用PullMessageService的start方法,啓動拉取消息線程
  • 調用RebalanceService的start方法,啓動rebalance線程
  • 調用內部Producer(CLIENT_INNER_PRODUCER)的start方法
  • 標記客戶端當前狀態爲RUNNING

下面詳細介紹下各個過程。

4.2. MQClientAPIImpl.fetchNameServerAddr

 該方法用於更新NameServer地址,該方法會從http://xxx:port/rocketmq/yyy,默認8080端口(若是xxx中沒有:,即不帶端口時)中獲取NameServer地址(xxx爲域名,由系統配置項rocketmq.namesrv.domain控制,默認爲jmenv.tbsite.net;yyy爲訪問路徑,由系統配置項rocketmq.namesrv.domain.subgroup控制,默認爲nsaddr)。該地址要求返回結果爲一個ip列表,以;隔開,若是獲取回來的地址跟現有的地址不一致則會更新緩存的NameServer地址列表。解析出來的地址列表用於根據NettyRemotingClient內部持有的變量:

private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
4.3. MQClientAPIImpl.start

 該方法在內部調用了NettyRemotingClient的start方法,用於初始化Netty客戶端。NettyRemotingClient是基於Netty實現的tcp協議客戶端,主要流程爲:

  • 初始化客戶端bootstrap鏈接池
  • 設置處理鏈:編碼、解碼、空閒處理、鏈接管理(服務端)、請求分發
  • 每3秒清除超時的請求(netty主線程不處理邏輯)
  • 啓動客戶端的事件處理器,處理IDLE、CLOSE、CONNECT、EXCEPTION事件

關於NettyRemotingClient後面會專門進行講解,這裏只介紹在客戶端啓動時其作了哪些動做。

4.4.2. MQClientInstance.updateTopicRouteInfoFromNameServer

 該方法用於根據客戶端實例關注的全部topic的路由信息,包括客戶端監聽的topic以及producer生產的topic。首先會遍歷從MQClientInstance內部的consumerTable和consumerTable的客戶端實例,拿到全部的topic信息,而後挨個更新topic的路由。

 同步topic路由時,會經過NettyRemotingClient選擇一個NameServer獲取topic路由信息,而後判斷topic信息是否發生了更改,主要比較topic所對應的Queue和Broker是否發生了更改。若路由信息發生了更改則會同步topic所在的broker地址列表,即內部的brokerAddrTable屬性;接着同步produer關注的topic路由信息,即producerTable屬性;接着同步consumer訂閱的topic路由信息,即consumerTable屬性;最後更新本地topic信息,即topicRouteTable屬性。

4.4.3. MQClientInstance.sendHeartbeatToAllBrokerWithLock

 該方法會遍歷MQClient所持有的各個producer和consumer,將客戶端信息構造爲HeartbeatData對象,而後調用MQClientAPIImpl的sendHearbeat方法,向全部的broker上報心跳數據。心跳內容包括:

  • Consumer:全部Consumer的Group、消費類型、消息模式、消費起始offset、訂閱消息的篩選類型等
  • Producer:全部Producer的group
4.4.4. MQClientInstance.persistAllConsumerOffset

 該方法會遍歷consumerTable裏的全部MQConsumer對象,獲取每一個隊列處理的MessageQueue,而後調用OffsetStore持久化全部的MessageQueue。OffsetStore後面會專門進行講解。

4.4.5. MQClientInstance.adjustThreadPool

 該方法主要是動態調整DefaultMQPushConsumerImpl(推模式)客戶端消費線程池的大小。前面說過推模式是經過包裝拉模式來實現的,內部都依賴PullAPIWrapper。實現上推模式多了一個ConsumeMessageService定時使用拉模式消費消息,該實現須要一個線程池,adjustThreadPool就是動態調整該線程池的大小。關於客戶端消費消息的過程,後面也會專門進行講解。

4.5. PullMessageService.start

 PullMessageService用於封裝拉模式以實現推模式。它會循環從內部的LinkedBlockingQueue<PullRequest>中拿出PullRequest對象(消費q消息封裝的對象),選取一個可用的客戶端實例DefaultMQPushConsumerImpl,調用其pullMessage方法.該方法會判斷消費進度,決定是當即消費仍是延遲消費,若是是延遲消費則再放回LinkedBlockingQueue中等待消費;若是是直接消費,則調用PullMessageService(拉模式)的executePullRequestImmediately消費消息.

 PullMessageService的基礎關係以下:

file

PullMessageService.start內部主要是啓動線程,該線程會循環執行執行任務,具體實現會在後續介紹消息消費的時候說起。

4.6. RebalanceService.start

 該方法用於啓動rebalance任務。RebalanceService同PullMessageService相同,都繼承自ServiceThread類,,並實現了run方法。RebalanceService在run方法中等待必定時間(默認20S,能夠經過rocketmq.client.rebalance.waitInterval配置具體時間)後會調用MQClientInstance.doRebalance執行具體的動做。具體實現會在後續介紹rebalance實現的時候說起。

4.7. DefaultMQPushConsumerImpl.start

 在上面2.時有說起該流程,這裏的DefaultMQPushConsumerImpl對象是Group爲CLIENT_INNER_PRODUCER的內部對象。

 客戶端的啓動過程就如上面介紹,下面附上該部分當時源碼閱讀過程作的筆記簡圖,該圖描述了客戶端啓動過程的大體過程:
file

更多原創內容請搜索微信公衆號:啊駝(doubaotaizi)

相關文章
相關標籤/搜索