這節介紹RocketMQ客戶端的啓動流程,即Consumer和Producer的啓動流程。express
首先先看下客戶端的demobootstrap
Producer:api
public class SyncProducer { public static void main (String[] args) throws Exception { // 實例化消息生產者Producer DefaultMQProducer producer = new DefaultMQProducer ("GroupTest"); // 設置NameServer的地址 producer.setNamesrvAddr ("localhost:9876"); // 啓動Producer實例 producer.start (); for (int i = 0; i < 100; i++) { // 建立消息,並指定Topic,Tag和消息體 Message msg = new Message ("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 發送消息到一個Broker SendResult sendResult = producer.send (msg); // 經過sendResult返回消息是否成功送達 System.out.printf ("%s%n", sendResult); } // 若是再也不發送消息,關閉Producer實例。 producer.shutdown (); } }
Consumer:緩存
public class Consumer { public static void main (String[] args) throws InterruptedException, MQClientException { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest"); // 設置NameServer的地址 consumer.setNamesrvAddr ("localhost:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾須要消費的消息 consumer.subscribe ("TopicTest", "*"); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener (new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs); // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啓動消費者實例 consumer.start (); System.out.printf ("Consumer Started.%n"); } }
Producer和Consumer的啓動相似,在初始化而後進行必要設置(主要是客戶端所屬的Group和NameServer地址)後,執行start方法啓動後臺監聽服務,事實上Producer和Consumer都是調用同一個類MQClientInstance的start方法,下圖爲繼承關係:微信
DefaultMQproducer和DefaultMQPushConsumer都繼承自ClientConfig,顧名思義ClientConfig表示客戶端的配置,包括NameServer地址、客戶端地址、客戶端實例名等。因爲Producer和Consumer都須要同Broker和NameServer交互,因此配置上有不少相同,這兩個將主要功能的實現都委託給了對應的Impl(DefaultMQProducerImpl和DefaultMQPushConsumerImpl)。Impl內部調用了MQClientInstance來完成客戶端同遠程交互的主要功能,而Producer和Consumer則封裝本身相關的行爲,MQClientInstance內部又委託忒了MQClientAPIImpl。app
DefaultMQProducer的啓動以下:dom
DefaultMQProducer將start委託給了DefaultMQProducerImpl來完成,主要過程爲:tcp
DefaultMQPushConsumer的啓動以下:ide
DefaultMQPushConsumer一樣將start委託給了DefaultMQPushConsumerImpl來完成,流程上也類似。但相比DefaultMQProducer多了不少其餘組件來輔助消費過程,如rebalance、offset管理等,主要過程爲:fetch
DefaultMQPushConsumer爲推模式,RocketMQ還提供了拉模式來消費消息,實現類爲DefaultMQPullConsumer,啓動過程相似,推模式是用拉模式來實現的,重點實現都在MQClientInstace中。
MQClientInstance爲一個門戶類,組合了各功能,以下,包括Rebalance、消費數據統計、生產消息、消費消息等,這些都有對應的實現。
上面說過,Producer和Consumer在啓動的時候,都會在內部先初始化一個MQClientInstance對象,而後調用其start方法啓動對應的後臺程序,以下:
MQClientInstance的start方法除了調用自身進行準備工做外,也調用了其餘組件的start方法開始它們的準備工做,主要流程爲:
下面詳細介紹下各個過程。
該方法用於更新NameServer地址,該方法會從http://xxx:port/rocketmq/yyy
,默認8080端口(若是xxx中沒有:,即不帶端口時)中獲取NameServer地址(xxx爲域名,由系統配置項rocketmq.namesrv.domain控制,默認爲jmenv.tbsite.net;yyy爲訪問路徑,由系統配置項rocketmq.namesrv.domain.subgroup控制,默認爲nsaddr)。該地址要求返回結果爲一個ip列表,以;隔開,若是獲取回來的地址跟現有的地址不一致則會更新緩存的NameServer地址列表。解析出來的地址列表用於根據NettyRemotingClient內部持有的變量:
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
該方法在內部調用了NettyRemotingClient的start方法,用於初始化Netty客戶端。NettyRemotingClient是基於Netty實現的tcp協議客戶端,主要流程爲:
關於NettyRemotingClient後面會專門進行講解,這裏只介紹在客戶端啓動時其作了哪些動做。
該方法用於根據客戶端實例關注的全部topic的路由信息,包括客戶端監聽的topic以及producer生產的topic。首先會遍歷從MQClientInstance內部的consumerTable和consumerTable的客戶端實例,拿到全部的topic信息,而後挨個更新topic的路由。
同步topic路由時,會經過NettyRemotingClient選擇一個NameServer獲取topic路由信息,而後判斷topic信息是否發生了更改,主要比較topic所對應的Queue和Broker是否發生了更改。若路由信息發生了更改則會同步topic所在的broker地址列表,即內部的brokerAddrTable屬性;接着同步produer關注的topic路由信息,即producerTable屬性;接着同步consumer訂閱的topic路由信息,即consumerTable屬性;最後更新本地topic信息,即topicRouteTable屬性。
該方法會遍歷MQClient所持有的各個producer和consumer,將客戶端信息構造爲HeartbeatData對象,而後調用MQClientAPIImpl的sendHearbeat方法,向全部的broker上報心跳數據。心跳內容包括:
該方法會遍歷consumerTable裏的全部MQConsumer對象,獲取每一個隊列處理的MessageQueue,而後調用OffsetStore持久化全部的MessageQueue。OffsetStore後面會專門進行講解。
該方法主要是動態調整DefaultMQPushConsumerImpl(推模式)客戶端消費線程池的大小。前面說過推模式是經過包裝拉模式來實現的,內部都依賴PullAPIWrapper。實現上推模式多了一個ConsumeMessageService定時使用拉模式消費消息,該實現須要一個線程池,adjustThreadPool就是動態調整該線程池的大小。關於客戶端消費消息的過程,後面也會專門進行講解。
PullMessageService用於封裝拉模式以實現推模式。它會循環從內部的LinkedBlockingQueue<PullRequest>中拿出PullRequest對象(消費q消息封裝的對象),選取一個可用的客戶端實例DefaultMQPushConsumerImpl,調用其pullMessage方法.該方法會判斷消費進度,決定是當即消費仍是延遲消費,若是是延遲消費則再放回LinkedBlockingQueue中等待消費;若是是直接消費,則調用PullMessageService(拉模式)的executePullRequestImmediately消費消息.
PullMessageService的基礎關係以下:
PullMessageService.start內部主要是啓動線程,該線程會循環執行執行任務,具體實現會在後續介紹消息消費的時候說起。
該方法用於啓動rebalance任務。RebalanceService同PullMessageService相同,都繼承自ServiceThread類,,並實現了run方法。RebalanceService在run方法中等待必定時間(默認20S,能夠經過rocketmq.client.rebalance.waitInterval配置具體時間)後會調用MQClientInstance.doRebalance執行具體的動做。具體實現會在後續介紹rebalance實現的時候說起。
在上面2.
時有說起該流程,這裏的DefaultMQPushConsumerImpl對象是Group爲CLIENT_INNER_PRODUCER
的內部對象。
客戶端的啓動過程就如上面介紹,下面附上該部分當時源碼閱讀過程作的筆記簡圖,該圖描述了客戶端啓動過程的大體過程:
更多原創內容請搜索微信公衆號:啊駝(doubaotaizi)