DefaultMQPushConsumerexpress
rocketmq客戶端消費者實現,從名字上已經能夠看出其消息獲取方式爲broker往消費端推送數據,其內部實現了流控,消費位置上報等等。緩存
- 重要字段
1 String consumerGroup 消費者組名,必須設置
須要注意的是,多個消費者若是具備一樣的組名,那麼這些消費者必須只消費同一個topic,具體緣由參見rocketmq問題彙總-一個consumerGroup只對應一個topic
2 MessageModel messageModel
消費的方式,分爲兩種:
2.1 BROADCASTING 廣播模式,即全部的消費者能夠消費一樣的消息
2.2 CLUSTERING 集羣模式,即全部的消費者平均來消費一組消息
3 ConsumeFromWhere consumeFromWhere
消費者從那個位置消費,分別爲:
3.1 CONSUME_FROM_LAST_OFFSET:第一次啓動從隊列最後位置消費,後續再啓動接着上次消費的進度開始消費
3.2 CONSUME_FROM_FIRST_OFFSET:第一次啓動從隊列初始位置消費,後續再啓動接着上次消費的進度開始消費
3.3 CONSUME_FROM_TIMESTAMP:第一次啓動從指定時間點位置消費,後續再啓動接着上次消費的進度開始消費
以上所說的第一次啓動是指歷來沒有消費過的消費者,若是該消費者消費過,那麼會在broker端記錄該消費者的消費位置,若是該消費者掛了再啓動,那麼自動從上次消費的進度開始,見RemoteBrokerOffsetStore
4 AllocateMessageQueueStrategy allocateMessageQueueStrategy
消息分配策略,用於集羣模式下,消息平均分配給全部客戶端
默認實現爲AllocateMessageQueueAveragely
5Map<topic, sub expression>
subscription // topic對應的訂閱tag
6 MessageListener messageListener //客戶端消費消息的實現類
7 OffsetStore offsetStore //offset存儲實現,分爲本地存儲或遠程存儲
8 int consumeThreadMin = 20 //線程池自動調整,參見MQClientInstance調整客戶端消費線程池
9 int consumeThreadMax = 64//線程池自動調整,參見MQClientInstance調整客戶端消費線程池
10 long adjustThreadPoolNumsThreshold = 100000
11 int consumeConcurrentlyMaxSpan = 2000//單隊列並行消費最大跨度,用於流控
12 int pullThresholdForQueue = 1000 // 一個queue最大消費的消息個數,用於流控
13 long pullInterval = 0 //消息拉取時間間隔,默認爲0,即拉完一次立馬拉第二次,單位毫秒
14 consumeMessageBatchMaxSize = 1//併發消費時,一次消費消息的數量,默認爲1,假如修改成50,此時如有100條消息,那麼會建立兩個線程,每一個線程分配50條消息。
15 pullBatchSize = 32 //消息拉取一次的數量
16 boolean postSubscriptionWhenPull = false
17 boolean unitMode = false
18 DefaultMQPushConsumerImpl defaultMQPushConsumerImpl
消費者實現類,全部的功能都委託給DefaultMQPushConsumerImpl來實現- 重要方法
1 subscribe(String topic, String subExpression)
訂閱某個topic,subExpression傳*爲訂閱該topic全部消息
2 registerMessageListener(MessageListenerConcurrently messageListener)
註冊消息回調,若是須要順序消費,須要註冊MessageListenerOrderly的實現
3 start
啓動消息消費,參照DefaultMQPushConsumerImpl.start
DefaultMQPushConsumerImpl併發
消費者具體實現類app
- 重要字段
1 RebalancePushImpl rebalanceImpl負載均衡實現類
2 MQClientInstance mQClientFactory 因爲在介紹DefaultMQProducer時已經介紹過MQClientInstance的做用,故在此再也不解釋,若有須要能夠點擊連接跳過去查看
3 PullAPIWrapper pullAPIWrapper
拉取消息的封裝- 重要方法
1 start
該方法實現了consumer的啓動,並開始消費消息,主要包括如下幾個方面:
1.1 校驗設置參數是否合法
1.2 將訂閱關係(即topic<->*)複製到RebalancePushImpl,包括%RETRY% topic
1.3 設置instanceName爲jvm pid
1.4 初始化MQClientInstance實例(這個實例和rocketmq3.26研究之四DefaultMQProducer中的MQClientInstance是同樣的),也就是說rocketmq的producer和consumer都持有MQClientInstance的實例,來實現其功能。
1.5 爲RebalancePushImpl設置參數,包括消費者組名,消費模式(集羣or廣播)
1.6 初始化PullAPIWrapper
1.7 初始化OffsetStore,集羣消費默認爲RemoteBrokerOffsetStore
1.8 初始化ConsumeMessageService並啓動,根據順序消費和併發消費初始化不一樣的實例,集羣消費爲ConsumeMessageConcurrentlyService,順序消費爲ConsumeMessageOrderlyService(這個再也不介紹)
1.9 註冊本身,保證一個consumer group只對應一個DefaultMQPushConsumerImpl的實例
1.10 啓動MQClientInstance參見start方法
1.11 updateTopicSubscribeInfoWhenSubscriptionChanged
循環全部topic,調用MQClientInstance的updateTopicRouteInfoFromNameServer,來更新topic的路由信息和訂閱信息
1.12 給全部的broker發送心跳
1.13 進行rebalance
2 pullMessage(final PullRequest pullRequest)
消息拉取方法,此方法是最關鍵的方法,其執行步驟以下:
2.1 獲取ProcessQueue對象
2.2 判斷這個隊列是否已經中止了,參見updateProcessQueueTableInRebalance
2.3 設置ProcessQueue的最近拉取時間
2.4 從ProcessQueue獲取其內含有的消息個數,默認大於1000,則進行流控,所謂的流控即延時執行消息拉取,調用executePullRequestLater實現。
2.5 如果集羣消費模式,從ProcessQueue獲取offset跨度,默認大於2000,則進行流控。這個限制貌似和2.4同樣,由於這裏的offset跨度也就是消費的位置,其實也就是消息個數,可是這裏是指並行消費,便可能存在多個消費者消費同一個queue?(這塊還不太清楚),那麼此時就會offset跨度>消息數了,那個這個限制實際是限制了並行消費一個queue時的最大消息數,也就是跨度。
2.6 從RebalanceImpl獲取訂閱關係,參考subscriptionInner
2.7 構造PullCallback的匿名內部實現類,進行回調,請參考DefaultMQPushConsumerImpl.PullCallback
2.8 調用PullAPIWrapper.pullKernelImpl拉取數據
AllocateMessageQueueAveragely負載均衡
集羣消費時用到該類,該類爲消費者分配queue,其是AllocateMessageQueueStrategy的默認實現jvm
- 重要方法
List<
MessageQueue>
allocate(
String consumerGroup, //消費者屬組,此方法中只用於輸出日誌用
String currentCID, //cid,默認爲ip@pid,該參數的做用參考rocketmq問題彙總-instanceName參數什麼時候該設置?
List<
MessageQueue>
mqAll,//topic<->broker name<->queue id關係對象
List cidAll)// 同一consumer group下的全部的cid列表
該方法實現了爲消費者平均分配queue。其首先須要根據全部的消費者數和全部queue的量計算出平均每一個消費者須要消費多少個queue,再根據當前消費者在消費者組的位置(即currentCID在cidAll的位置),分配相應的queue。
RebalancePushImplpost
該類實現了負載均衡,如下屬性或方法來自於它本身或其父類RebalanceImplfetch
- 重要屬性
1ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner
此map保存了topic和該topic對應的訂閱tag,默認爲*,即訂閱topic下全部的消息。
2ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable
此map保存了topic<->brokerName<->queueId的對應關係
3ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable
此map保存了MessageQueue的處理隊列,具體來講就是某一個消費者,消費某個boker的某個queue的處理隊列。- 重要方法
1 doRebalance
遍歷全部訂閱的topic,進行rebalanceByTopic
2 rebalanceByTopic
根據廣播或集羣模式進行rebalance,如下對集羣消費模式說一下:
2.1 首先根據topic獲取到Set<
MessageQueue>
2.2 其次根據topic和consumer group獲取到全部的consumer的cid列表(調用MQClientInstance.findConsumerIdList實現)
2.3 調用AllocateMessageQueueAveragely.allocate來從新獲取相應的MessageQueue列表
2.4 根據上步返回的結果,調用updateProcessQueueTableInRebalance
3 updateProcessQueueTableInRebalance
這個方法功能以下:
3.1更新processQueueTable
具體的爲查找processQueueTable中無用的ProcessQueue,標記爲中止,移除,查找上步中的MessageQueue列表新的對應關係,建立新的ProcessQueue並添加進去,並生成新的PullRequest,此時有個關鍵的字段,即PullRequest.nextOffset,從computePullFromWhere計算得出,這個字段關係着這個請求下次拉取數據的位置。
3.2 針對新的PullRequest調用PullMessageService.executePullRequestImmediately進行消息拉取
4 computePullFromWhere(MessageQueue mq)
計算一個queue該從那個位置開始拉取數據,具體實現以下(如下分析基於集羣消費模式):
4.1 首先須要肯定消費的位置,即ConsumeFromWhere
4.2 獲取其OffsetStore,集羣模式默認爲RemoteBrokerOffsetStore
4.3 若是消費位置爲CONSUME_FROM_LAST_OFFSET,那麼首先須要調用RemoteBrokerOffsetStore.fetchConsumeOffsetFromBroker獲取消費位置(該消費位置在broker端的存儲請參考ConsumerOffsetManager.queryOffset),若是獲取不到,則獲取此隊列的最大消費位置。
4.4 若是消費位置爲CONSUME_FROM_FIRST_OFFSET,那麼首先須要調用RemoteBrokerOffsetStore.fetchConsumeOffsetFromBroker獲取消費位置,若是獲取不到,則返回0。
4.5 若是消費位置爲CONSUME_FROM_TIMESTAMP,那麼首先須要調用RemoteBrokerOffsetStore.fetchConsumeOffsetFromBroker獲取消費位置,若是獲取不到,則調用MQAdminImpl.searchOffset獲取位置。
MessageQueue.net
該類存儲了topic的路由信息,好比testTopic對應broker-a的queue-0等線程
- 重要字段
1 String topic
2 String brokerName
3 int queueId
RemoteBrokerOffsetStore
該類實現了偏移量的存儲和查詢
- 重要方法
fetchConsumeOffsetFromBroker(MessageQueue mq)
根據broker name獲取broker地址,以後根據topic,group name和queue id獲取消費的位置
PullMessageService
該類從名字上看是拉取消息服務,其實它只是對拉取請求進行了封裝,使其隊列化
- 重要字段
LinkedBlockingQueue<PullRequest> pullRequestQueue
拉取請求隊列- 重要方法
1 run
不斷的從pullRequestQueue中取出請求,並調用消息拉取
2 pullMessage(PullRequest pullRequest)
消息拉取方法,根據consumer group查找DefaultMQPushConsumerImpl而後調用其pullMessage方法進行消息拉取
3 executePullRequestLater(PullRequest pullRequest, long timeDelay內部爲一個定時任務,timeDelay毫秒後將pullRequest放到pullRequestQueue中
PullRequest
拉取請求
- 重要字段
1 String consumerGroup
消費者組名
2 MessageQueue messageQueue
消息隊列對應關係,主要包括topic<->broker name<->queue id
3 ProcessQueue processQueue
隊列消費處理對象
long nextOffset
下一次請求的偏移量
表明了一個消費的queue,假設consumer C,消費topic T,T的路由爲broker-0,broker-1,那麼共有兩個ProcessQueue對應consumer C。
- 重要字段
TreeMap<Long, MessageExt> msgTreeMap
用於緩存從broker拉取的消息。
PullAPIWrapper
消息拉取的封裝
- 重要字段
1ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable
存儲了mq對應的brokerid,即從broker master拉取消息,仍是從broker slave拉取消息。- 重要方法
1 PullResult pullKernelImpl(
MessageQueue mq,// 1 mq
String subExpression,// 2 訂閱的tag,默認爲*
final long subVersion,// 3 訂閱的時間
final long offset,// 4 拉取消息的位置,參照更新processQueueTable
final int maxNums,// 5 拉取消息的數量,默認爲32
final int sysFlag,// 6 一些標誌
final long commitOffset,// 7 提交的offset
final long brokerSuspendMaxTimeMillis,// 8 broker無響應最大時間,默認15秒
final long timeoutMillis,// 9 客戶端超時時間,默認20秒
final CommunicationMode communicationMode,// 10 默認爲CommunicationMode.ASYNC
final PullCallback pullCallback// 參見PullCallback
1.1 查找brokerid,調用recalculatePullFromWhichNode,爲了看是否應該從slave拉取消息,主要爲了防止master壓力過大或掛掉的狀況
1.2 從MQClientInstance.brokerAddrTable獲取broker地址
1.3 調用MQClientAPIImpl.pullMessage,進行與broker交互,因爲在介紹DefaultMQProducer已經大概說過了,在此就不囉嗦了,須要囉嗦一下的是返回結果後,會回調PullCallback
2 recalculatePullFromWhichNode(MessageQueue mq)
從pullFromWhichNodeTable獲取brokerid,不存在返回0,即master
3 processPullResult
處理pullKernelImpl返回的結果,主要進行消息的解碼和更新pullFromWhichNodeTable
ConsumeMessageConcurrentlyService
- 重要方法
1 submitConsumeRequest(List msgs, …)
根據消費者設置的consumeMessageBatchMaxSize來決定提交多少個任務來消費,若是msgs.size<=consumeMessageBatchMaxSize,那麼提交一個任務就夠了,若是msgs.size>consumeMessageBatchMaxSize,那麼須要提交msgs.size%consumeMessageBatchMaxSize個任務,這裏的任務實現類爲ConsumeRequest
2 processConsumeResult
根據返回的狀態,將集羣模式消費失敗的消息,從新發送到broker,注意:須要消費者實現MessageListenerConcurrently.consumeMessage(final List msgs, ConsumeConcurrentlyContext context)方法時,假設一共100個消息,消費了10個,消費第11個時發生異常,須要設置context.setAckIndex(11),而後返回,這樣第11至100個消息會被從新發送至broker,等待下一次消費。
而後從ProcessQueue中移除消費過的消息。
最後更新最新的offset至RemoteBrokerOffsetStore。另外,關於客戶端將offset同步至broker的實如今MQClientInstance更新客戶端offset中介紹了,在此再也不贅述。
ConsumeMessageConcurrentlyService.ConsumeRequest
併發消費實現
- 重要字段
List msgs;
ProcessQueue processQueue;
MessageQueue messageQueue;- 重要方法
run
此方法爲消費消息的地方,分爲如下幾個步驟:
1 消費前執行預設的鉤子方法,若是有的話。
2 調用MessageListenerConcurrently的實現類來消費消息,該類須要消費者本身實現。
3 消費後執行預設的鉤子方法,若是有的話。
4 調用processConsumeResult處理返回結果
最後,就不附圖了,參考rocketmq3.26研究之DefaultMQPushConsumer消費流程來看相應的流程圖吧