rocketmq3.26研究之五DefaultMQPushConsumer

  1. DefaultMQPushConsumerexpress

    rocketmq客戶端消費者實現,從名字上已經能夠看出其消息獲取方式爲broker往消費端推送數據,其內部實現了流控,消費位置上報等等。緩存

    • 重要字段 
      1 String consumerGroup 消費者組名,必須設置 
      須要注意的是,多個消費者若是具備一樣的組名,那麼這些消費者必須只消費同一個topic,具體緣由參見rocketmq問題彙總-一個consumerGroup只對應一個topic 
      2 MessageModel messageModel 
      消費的方式,分爲兩種: 
      2.1 BROADCASTING 廣播模式,即全部的消費者能夠消費一樣的消息 
      2.2 CLUSTERING 集羣模式,即全部的消費者平均來消費一組消息 
      3 ConsumeFromWhere consumeFromWhere 
      消費者從那個位置消費,分別爲: 
      3.1 CONSUME_FROM_LAST_OFFSET:第一次啓動從隊列最後位置消費,後續再啓動接着上次消費的進度開始消費 
      3.2 CONSUME_FROM_FIRST_OFFSET:第一次啓動從隊列初始位置消費,後續再啓動接着上次消費的進度開始消費 
      3.3 CONSUME_FROM_TIMESTAMP:第一次啓動從指定時間點位置消費,後續再啓動接着上次消費的進度開始消費 
      以上所說的第一次啓動是指歷來沒有消費過的消費者,若是該消費者消費過,那麼會在broker端記錄該消費者的消費位置,若是該消費者掛了再啓動,那麼自動從上次消費的進度開始,見RemoteBrokerOffsetStore 
      4 AllocateMessageQueueStrategy allocateMessageQueueStrategy 
      消息分配策略,用於集羣模式下,消息平均分配給全部客戶端 
      默認實現爲AllocateMessageQueueAveragely 
      Map<topic, sub expression> subscription // topic對應的訂閱tag 
      6 MessageListener messageListener //客戶端消費消息的實現類 
      7 OffsetStore offsetStore //offset存儲實現,分爲本地存儲或遠程存儲 
      8 int consumeThreadMin = 20 //線程池自動調整,參見MQClientInstance調整客戶端消費線程池 
      9 int consumeThreadMax = 64//線程池自動調整,參見MQClientInstance調整客戶端消費線程池 
      10 long adjustThreadPoolNumsThreshold = 100000 
      11 int consumeConcurrentlyMaxSpan = 2000//單隊列並行消費最大跨度,用於流控 
      12 int pullThresholdForQueue = 1000 // 一個queue最大消費的消息個數,用於流控 
      13 long pullInterval = 0 //消息拉取時間間隔,默認爲0,即拉完一次立馬拉第二次,單位毫秒 
      14 consumeMessageBatchMaxSize = 1//併發消費時,一次消費消息的數量,默認爲1,假如修改成50,此時如有100條消息,那麼會建立兩個線程,每一個線程分配50條消息。 
      15 pullBatchSize = 32 //消息拉取一次的數量 
      16 boolean postSubscriptionWhenPull = false 
      17 boolean unitMode = false 
      18 DefaultMQPushConsumerImpl defaultMQPushConsumerImpl 
      消費者實現類,全部的功能都委託給DefaultMQPushConsumerImpl來實現
    • 重要方法 
      1 subscribe(String topic, String subExpression) 
      訂閱某個topic,subExpression傳*爲訂閱該topic全部消息 
      2 registerMessageListener(MessageListenerConcurrently messageListener) 
      註冊消息回調,若是須要順序消費,須要註冊MessageListenerOrderly的實現 
      3 start 
      啓動消息消費,參照DefaultMQPushConsumerImpl.start
  2. DefaultMQPushConsumerImpl併發

    消費者具體實現類app

    • 重要字段 
      RebalancePushImpl rebalanceImpl負載均衡實現類 
      MQClientInstance mQClientFactory 因爲在介紹DefaultMQProducer時已經介紹過MQClientInstance的做用,故在此再也不解釋,若有須要能夠點擊連接跳過去查看 
      PullAPIWrapper pullAPIWrapper 
      拉取消息的封裝
    • 重要方法 
      1 start 
      該方法實現了consumer的啓動,並開始消費消息,主要包括如下幾個方面: 
      1.1 校驗設置參數是否合法 
      1.2 將訂閱關係(即topic<->*)複製到RebalancePushImpl,包括%RETRY% topic 
      1.3 設置instanceName爲jvm pid 
      1.4 初始化MQClientInstance實例(這個實例和rocketmq3.26研究之四DefaultMQProducer中的MQClientInstance是同樣的),也就是說rocketmq的producer和consumer都持有MQClientInstance的實例,來實現其功能。 
      1.5 爲RebalancePushImpl設置參數,包括消費者組名,消費模式(集羣or廣播) 
      1.6 初始化PullAPIWrapper 
      1.7 初始化OffsetStore,集羣消費默認爲RemoteBrokerOffsetStore 
      1.8 初始化ConsumeMessageService並啓動,根據順序消費和併發消費初始化不一樣的實例,集羣消費爲ConsumeMessageConcurrentlyService,順序消費爲ConsumeMessageOrderlyService(這個再也不介紹) 
      1.9 註冊本身,保證一個consumer group只對應一個DefaultMQPushConsumerImpl的實例 
      1.10 啓動MQClientInstance參見start方法 
      1.11 updateTopicSubscribeInfoWhenSubscriptionChanged 
      循環全部topic,調用MQClientInstance的updateTopicRouteInfoFromNameServer,來更新topic的路由信息和訂閱信息 
      1.12 給全部的broker發送心跳 
      1.13 進行rebalance 
      2 pullMessage(final PullRequest pullRequest) 
      消息拉取方法,此方法是最關鍵的方法,其執行步驟以下: 
      2.1 獲取ProcessQueue對象 
      2.2 判斷這個隊列是否已經中止了,參見updateProcessQueueTableInRebalance 
      2.3 設置ProcessQueue的最近拉取時間 
      2.4 從ProcessQueue獲取其內含有的消息個數,默認大於1000,則進行流控,所謂的流控即延時執行消息拉取,調用executePullRequestLater實現。 
      2.5 如果集羣消費模式,從ProcessQueue獲取offset跨度,默認大於2000,則進行流控。這個限制貌似和2.4同樣,由於這裏的offset跨度也就是消費的位置,其實也就是消息個數,可是這裏是指並行消費,便可能存在多個消費者消費同一個queue?(這塊還不太清楚),那麼此時就會offset跨度>消息數了,那個這個限制實際是限制了並行消費一個queue時的最大消息數,也就是跨度。 
      2.6 從RebalanceImpl獲取訂閱關係,參考subscriptionInner 
      2.7 構造PullCallback的匿名內部實現類,進行回調,請參考DefaultMQPushConsumerImpl.PullCallback 
      2.8 調用PullAPIWrapper.pullKernelImpl拉取數據
  3. AllocateMessageQueueAveragely負載均衡

    集羣消費時用到該類,該類爲消費者分配queue,其是AllocateMessageQueueStrategy的默認實現jvm

    • 重要方法 
      List<MessageQueue> allocate( 
      String consumerGroup, //消費者屬組,此方法中只用於輸出日誌用 
      String currentCID, //cid,默認爲ip@pid,該參數的做用參考rocketmq問題彙總-instanceName參數什麼時候該設置? 
      List<MessageQueue> mqAll,//topic<->broker name<->queue id關係對象 
      List cidAll)// 同一consumer group下的全部的cid列表 
      該方法實現了爲消費者平均分配queue。其首先須要根據全部的消費者數和全部queue的量計算出平均每一個消費者須要消費多少個queue,再根據當前消費者在消費者組的位置(即currentCID在cidAll的位置),分配相應的queue。
  4. RebalancePushImplpost

    該類實現了負載均衡,如下屬性或方法來自於它本身或其父類RebalanceImplfetch

    • 重要屬性 
      ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner 
      此map保存了topic和該topic對應的訂閱tag,默認爲*,即訂閱topic下全部的消息。 
      ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable 
      此map保存了topic<->brokerName<->queueId的對應關係 
      ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable 
      此map保存了MessageQueue的處理隊列,具體來講就是某一個消費者,消費某個boker的某個queue的處理隊列。
    • 重要方法 
      1 doRebalance 
      遍歷全部訂閱的topic,進行rebalanceByTopic 
      2 rebalanceByTopic 
      根據廣播或集羣模式進行rebalance,如下對集羣消費模式說一下: 
      2.1 首先根據topic獲取到Set<MessageQueue> 
      2.2 其次根據topic和consumer group獲取到全部的consumer的cid列表(調用MQClientInstance.findConsumerIdList實現) 
      2.3 調用AllocateMessageQueueAveragely.allocate來從新獲取相應的MessageQueue列表 
      2.4 根據上步返回的結果,調用updateProcessQueueTableInRebalance 
      3 updateProcessQueueTableInRebalance 
      這個方法功能以下: 
      3.1更新processQueueTable 
      具體的爲查找processQueueTable中無用的ProcessQueue,標記爲中止,移除,查找上步中的MessageQueue列表新的對應關係,建立新的ProcessQueue並添加進去,並生成新的PullRequest,此時有個關鍵的字段,即PullRequest.nextOffset,從computePullFromWhere計算得出,這個字段關係着這個請求下次拉取數據的位置。 
      3.2 針對新的PullRequest調用PullMessageService.executePullRequestImmediately進行消息拉取 
      4 computePullFromWhere(MessageQueue mq) 
      計算一個queue該從那個位置開始拉取數據,具體實現以下(如下分析基於集羣消費模式): 
      4.1 首先須要肯定消費的位置,即ConsumeFromWhere 
      4.2 獲取其OffsetStore,集羣模式默認爲RemoteBrokerOffsetStore 
      4.3 若是消費位置爲CONSUME_FROM_LAST_OFFSET,那麼首先須要調用RemoteBrokerOffsetStore.fetchConsumeOffsetFromBroker獲取消費位置(該消費位置在broker端的存儲請參考ConsumerOffsetManager.queryOffset),若是獲取不到,則獲取此隊列的最大消費位置。 
      4.4 若是消費位置爲CONSUME_FROM_FIRST_OFFSET,那麼首先須要調用RemoteBrokerOffsetStore.fetchConsumeOffsetFromBroker獲取消費位置,若是獲取不到,則返回0。 
      4.5 若是消費位置爲CONSUME_FROM_TIMESTAMP,那麼首先須要調用RemoteBrokerOffsetStore.fetchConsumeOffsetFromBroker獲取消費位置,若是獲取不到,則調用MQAdminImpl.searchOffset獲取位置。
  5. MessageQueue.net

    該類存儲了topic的路由信息,好比testTopic對應broker-a的queue-0等線程

    • 重要字段 
      1 String topic 
      2 String brokerName 
      3 int queueId
  6. RemoteBrokerOffsetStore

    該類實現了偏移量的存儲和查詢

    • 重要方法 
      fetchConsumeOffsetFromBroker(MessageQueue mq) 
      根據broker name獲取broker地址,以後根據topic,group name和queue id獲取消費的位置
  7. PullMessageService

    該類從名字上看是拉取消息服務,其實它只是對拉取請求進行了封裝,使其隊列化

    • 重要字段 
      LinkedBlockingQueue<PullRequest> pullRequestQueue 
      拉取請求隊列
    • 重要方法 
      1 run 
      不斷的從pullRequestQueue中取出請求,並調用消息拉取 
      2 pullMessage(PullRequest pullRequest) 
      消息拉取方法,根據consumer group查找DefaultMQPushConsumerImpl而後調用其pullMessage方法進行消息拉取 
      3 executePullRequestLater(PullRequest pullRequest, long timeDelay內部爲一個定時任務,timeDelay毫秒後將pullRequest放到pullRequestQueue中
  8. PullRequest

    拉取請求

    • 重要字段 
      1 String consumerGroup 
      消費者組名 
      2 MessageQueue messageQueue 
      消息隊列對應關係,主要包括topic<->broker name<->queue id 
      ProcessQueue processQueue 
      隊列消費處理對象 
      long nextOffset 
      下一次請求的偏移量
  9. ProcessQueue 

    表明了一個消費的queue,假設consumer C,消費topic T,T的路由爲broker-0,broker-1,那麼共有兩個ProcessQueue對應consumer C。 
    - 重要字段 
    TreeMap<Long, MessageExt> msgTreeMap用於緩存從broker拉取的消息。

  10. DefaultMQPushConsumerImpl.PullCallback 
    其內部有兩個重要方法 
    1 onException 
    發生異常時延時重試,即調用executePullRequestLater 
    2 onSuccess 
    成功返回結果後,處理步驟以下: 
    2.1 調用processPullResult進行解碼 
    2.2 針對的消息的狀態NO_NEW_MSG,NO_MATCHED_MSG,OFFSET_ILLEGAL,FOUND,只介紹正常狀況即FOUND,其他狀況不作分析。 
    針對FOUND,首先將消息列表存入ProcessQueue,以後調用ConsumeMessageService.submitConsumeRequest進行消費。 
    2.3 將PullRequest存入PullMessageService的queue預備下一次的消息拉取,注意,此時,便造成了一個循環,循環的拉取消息 
  11. PullAPIWrapper

    消息拉取的封裝

    • 重要字段 
      ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable 
      存儲了mq對應的brokerid,即從broker master拉取消息,仍是從broker slave拉取消息。
    • 重要方法 
      1 PullResult pullKernelImpl( 
      MessageQueue mq,// 1 mq 
      String subExpression,// 2 訂閱的tag,默認爲* 
      final long subVersion,// 3 訂閱的時間 
      final long offset,// 4 拉取消息的位置,參照更新processQueueTable 
      final int maxNums,// 5 拉取消息的數量,默認爲32 
      final int sysFlag,// 6 一些標誌 
      final long commitOffset,// 7 提交的offset 
      final long brokerSuspendMaxTimeMillis,// 8 broker無響應最大時間,默認15秒 
      final long timeoutMillis,// 9 客戶端超時時間,默認20秒 
      final CommunicationMode communicationMode,// 10 默認爲CommunicationMode.ASYNC 
      final PullCallback pullCallback// 參見PullCallback 
      1.1 查找brokerid,調用recalculatePullFromWhichNode,爲了看是否應該從slave拉取消息,主要爲了防止master壓力過大或掛掉的狀況 
      1.2 從MQClientInstance.brokerAddrTable獲取broker地址 
      1.3 調用MQClientAPIImpl.pullMessage,進行與broker交互,因爲在介紹DefaultMQProducer已經大概說過了,在此就不囉嗦了,須要囉嗦一下的是返回結果後,會回調PullCallback 
      2 recalculatePullFromWhichNode(MessageQueue mq) 
      從pullFromWhichNodeTable獲取brokerid,不存在返回0,即master 
      3 processPullResult 
      處理pullKernelImpl返回的結果,主要進行消息的解碼和更新pullFromWhichNodeTable
  12. ConsumeMessageConcurrentlyService

    • 重要方法 
      1 submitConsumeRequest(List msgs, …) 
      根據消費者設置的consumeMessageBatchMaxSize來決定提交多少個任務來消費,若是msgs.size<=consumeMessageBatchMaxSize,那麼提交一個任務就夠了,若是msgs.size>consumeMessageBatchMaxSize,那麼須要提交msgs.size%consumeMessageBatchMaxSize個任務,這裏的任務實現類爲ConsumeRequest 
      2 processConsumeResult 
      根據返回的狀態,將集羣模式消費失敗的消息,從新發送到broker,注意:須要消費者實現MessageListenerConcurrently.consumeMessage(final List msgs, ConsumeConcurrentlyContext context)方法時,假設一共100個消息,消費了10個,消費第11個時發生異常,須要設置context.setAckIndex(11),而後返回,這樣第11至100個消息會被從新發送至broker,等待下一次消費。 
      而後從ProcessQueue中移除消費過的消息。 
      最後更新最新的offset至RemoteBrokerOffsetStore。另外,關於客戶端將offset同步至broker的實如今MQClientInstance更新客戶端offset中介紹了,在此再也不贅述。
  13. ConsumeMessageConcurrentlyService.ConsumeRequest

    併發消費實現

    • 重要字段 
      List msgs; 
      ProcessQueue processQueue; 
      MessageQueue messageQueue;
    • 重要方法 
      run 
      此方法爲消費消息的地方,分爲如下幾個步驟: 
      1 消費前執行預設的鉤子方法,若是有的話。 
      2 調用MessageListenerConcurrently的實現類來消費消息,該類須要消費者本身實現。 
      3 消費後執行預設的鉤子方法,若是有的話。 
      4 調用processConsumeResult處理返回結果
  14. 最後,就不附圖了,參考rocketmq3.26研究之DefaultMQPushConsumer消費流程來看相應的流程圖吧

相關文章
相關標籤/搜索