RocketMQ源碼解析:Message拉取&消費(下)

🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: node

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右


一、概述

本文接:《RocketMQ 源碼分析 —— Message 拉取與消費(上)》web

主要解析 Consumer消費 邏輯涉及到的源碼。json

二、Consumer

MQ 提供了兩類消費者:後端

  • PushConsumer:
    • 在大多數場景下使用。
    • 名字雖然是 Push 開頭,實際在實現時,使用 Pull 方式實現。經過 Pull 不斷不斷不斷輪詢 Broker 獲取消息。當不存在新消息時,Broker掛起請求,直到有新消息產生,取消掛起,返回新消息。這樣,基本和 Broker 主動 Push 作到接近的實時性(固然,仍是有相應的實時性損失)。原理相似 長輪詢( Long-Polling )
  • PullConsumer

本文主要講解PushConsumer,部分講解PullConsumer,跳過順序消費
本文主要講解PushConsumer,部分講解PullConsumer,跳過順序消費
本文主要講解PushConsumer,部分講解PullConsumer,跳過順序消費 api

三、PushConsumer 一覽

先看一張 PushConsumer 包含的組件以及組件之間的交互圖:數組

PushConsumer手繪圖.png
PushConsumer手繪圖.png

  • RebalanceService:均衡消息隊列服務,負責分配當前 Consumer 可消費的消息隊列( MessageQueue )。當有新的 Consumer 的加入或移除,都會從新分配消息隊列。
  • PullMessageService:拉取消息服務,不斷不斷不斷Broker 拉取消息,並提交消費任務到 ConsumeMessageService
  • ConsumeMessageService:消費消息服務,不斷不斷不斷消費消息,並處理消費結果。
  • RemoteBrokerOffsetStoreConsumer 消費進度管理,負責從 Broker 獲取消費進度,同步消費進度到 Broker
  • ProcessQueue :消息處理隊列。
  • MQClientInstance :封裝對 NamesrvBroker 的 API調用,提供給 ProducerConsumer 使用。

四、PushConsumer 訂閱

DefaultMQPushConsumerImpl#subscribe(...)

1: public void subscribe(String topic, String subExpression) throws MQClientException {
  2:     try {
  3:         // 建立訂閱數據
  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
  5:             topic, subExpression);
  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  7:         // 經過心跳同步Consumer信息到Broker
  8:         if (this.mQClientFactory != null) {
  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 10:         }
 11:     } catch (Exception e) {
 12:         throw new MQClientException("subscription exception", e);
 13:     }
 14: }複製代碼
  • 說明 :訂閱 Topic
  • 第 3 至 6 行 :建立訂閱數據。詳細解析見:FilterAPI.buildSubscriptionData(...)
  • 第 7 至 10 行 :經過心跳同步 Consumer 信息到 Broker

FilterAPI.buildSubscriptionData(...)

1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2: String subString) throws Exception {
  3:     SubscriptionData subscriptionData = new SubscriptionData();
  4:     subscriptionData.setTopic(topic);
  5:     subscriptionData.setSubString(subString);
  6:     // 處理訂閱表達式
  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);
  9:     } else {
 10:         String[] tags = subString.split("\\|\\|");
 11:         if (tags.length > 0) {
 12:             for (String tag : tags) {
 13:                 if (tag.length() > 0) {
 14:                     String trimString = tag.trim();
 15:                     if (trimString.length() > 0) {
 16:                         subscriptionData.getTagsSet().add(trimString);
 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());
 18:                     }
 19:                 }
 20:             }
 21:         } else {
 22:             throw new Exception("subString split error");
 23:         }
 24:     }
 25: 
 26:     return subscriptionData;
 27: }複製代碼
  • 說明 :根據 Topic 和 訂閱表達式 建立訂閱數據
  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(...)

1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
  2:     this.messageListener = messageListener;
  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
  4: }複製代碼
  • 說明 :註冊消息監聽器。

五、PushConsumer 消息隊列分配

RebalanceService&PushConsumer分配隊列
RebalanceService&PushConsumer分配隊列

RebalanceService

1: public class RebalanceService extends ServiceThread {
  2: 
  3:     /** 4: * 等待間隔,單位:毫秒 5: */
  6:     private static long waitInterval =
  7:         Long.parseLong(System.getProperty(
  8:             "rocketmq.client.rebalance.waitInterval", "20000"));
  9: 
 10:     private final Logger log = ClientLogger.getLog();
 11:     /** 12: * MQClient對象 13: */
 14:     private final MQClientInstance mqClientFactory;
 15: 
 16:     public RebalanceService(MQClientInstance mqClientFactory) {
 17:         this.mqClientFactory = mqClientFactory;
 18:     }
 19: 
 20:     @Override
 21:     public void run() {
 22:         log.info(this.getServiceName() + " service started");
 23: 
 24:         while (!this.isStopped()) {
 25:             this.waitForRunning(waitInterval);
 26:             this.mqClientFactory.doRebalance();
 27:         }
 28: 
 29:         log.info(this.getServiceName() + " service end");
 30:     }
 31: 
 32:     @Override
 33:     public String getServiceName() {
 34:         return RebalanceService.class.getSimpleName();
 35:     }
 36: }複製代碼
  • 說明 :均衡消息隊列服務,負責分配當前 Consumer 可消費的消息隊列( MessageQueue )。
  • 第 26 行 :調用 MQClientInstance#doRebalance(...) 分配消息隊列。目前有三種狀況狀況下觸發:微信

    • 第 25 行 等待超時,每 20s 調用一次。
    • PushConsumer 啓動時,調用 rebalanceService#wakeup(...) 觸發。
    • Broker 通知 Consumer 加入 或 移除時,Consumer 響應通知,調用 rebalanceService#wakeup(...) 觸發。

    詳細解析見:MQClientInstance#doRebalance(...)網絡

MQClientInstance#doRebalance(...)

1: public void doRebalance() {
  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  3:         MQConsumerInner impl = entry.getValue();
  4:         if (impl != null) {
  5:             try {
  6:                 impl.doRebalance();
  7:             } catch (Throwable e) {
  8:                 log.error("doRebalance exception", e);
  9:             }
 10:         }
 11:     }
 12: }複製代碼
  • 說明 :遍歷當前 Client 包含的 consumerTable( Consumer集合 ),執行消息隊列分配。
  • 疑問:目前代碼調試下來,consumerTable 只包含 Consumer 本身。😈有大大對這個疑問有解答的,煩請解答下。
  • 第 6 行 :調用 MQConsumerInner#doRebalance(...) 進行隊列分配。DefaultMQPushConsumerImplDefaultMQPullConsumerImpl 分別對該接口方法進行了實現。DefaultMQPushConsumerImpl#doRebalance(...) 詳細解析見:DefaultMQPushConsumerImpl#doRebalance(...)

DefaultMQPushConsumerImpl#doRebalance(...)

1: public void doRebalance() {
  2:     if (!this.pause) {
  3:         this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
  4:     }
  5: }複製代碼

RebalanceImpl#doRebalance(...)

1: /** 2: * 執行分配消息隊列 3: * 4: * @param isOrder 是否順序消息 5: */
  6: public void doRebalance(final boolean isOrder) {
  7:     // 分配每一個 topic 的消息隊列
  8:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  9:     if (subTable != null) {
 10:         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 11:             final String topic = entry.getKey();
 12:             try {
 13:                 this.rebalanceByTopic(topic, isOrder);
 14:             } catch (Throwable e) {
 15:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 16:                     log.warn("rebalanceByTopic Exception", e);
 17:                 }
 18:             }
 19:         }
 20:     }
 21:     // 移除未訂閱的topic對應的消息隊列
 22:     this.truncateMessageQueueNotMyTopic();
 23: }
 24: 
 25: /** 26: * 移除未訂閱的消息隊列 27: */
 28: private void truncateMessageQueueNotMyTopic() {
 29:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 30:     for (MessageQueue mq : this.processQueueTable.keySet()) {
 31:         if (!subTable.containsKey(mq.getTopic())) {
 32: 
 33:             ProcessQueue pq = this.processQueueTable.remove(mq);
 34:             if (pq != null) {
 35:                 pq.setDropped(true);
 36:                 log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
 37:             }
 38:         }
 39:     }
 40: }複製代碼
  • #doRebalance(...) 說明 :執行分配消息隊列。
    • 第 7 至 20 行 :循環訂閱主題集合( subscriptionInner ),分配每個 Topic 的消息隊列。
    • 第 22 行 :移除未訂閱的 Topic 的消息隊列。
  • #truncateMessageQueueNotMyTopic(...) 說明 :移除未訂閱的消息隊列。當調用 DefaultMQPushConsumer#unsubscribe(topic) 時,只移除訂閱主題集合( subscriptionInner ),對應消息隊列移除在該方法。

RebalanceImpl#rebalanceByTopic(...)

1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
  2:     switch (messageModel) {
  3:         case BROADCASTING: {
  4:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  5:             if (mqSet != null) {
  6:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  7:                 if (changed) {
  8:                     this.messageQueueChanged(topic, mqSet, mqSet);
  9:                     log.info("messageQueueChanged {} {} {} {}", //
 10:                         consumerGroup, //
 11:                         topic, //
 12:                         mqSet, //
 13:                         mqSet);
 14:                 }
 15:             } else {
 16:                 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 17:             }
 18:             break;
 19:         }
 20:         case CLUSTERING: {
 21:             // 獲取 topic 對應的 隊列 和 consumer信息
 22:             Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 23:             List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
 24:             if (null == mqSet) {
 25:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 26:                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
 27:                 }
 28:             }
 29: 
 30:             if (null == cidAll) {
 31:                 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
 32:             }
 33: 
 34:             if (mqSet != null && cidAll != null) {
 35:                 // 排序 消息隊列 和 消費者數組。由於是在Client進行分配隊列,排序後,各Client的順序才能保持一致。
 36:                 List<MessageQueue> mqAll = new ArrayList<>();
 37:                 mqAll.addAll(mqSet);
 38: 
 39:                 Collections.sort(mqAll);
 40:                 Collections.sort(cidAll);
 41: 
 42:                 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 43: 
 44:                 // 根據 隊列分配策略 分配消息隊列
 45:                 List<MessageQueue> allocateResult;
 46:                 try {
 47:                     allocateResult = strategy.allocate(//
 48:                         this.consumerGroup, //
 49:                         this.mQClientFactory.getClientId(), //
 50:                         mqAll, //
 51:                         cidAll);
 52:                 } catch (Throwable e) {
 53:                     log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
 54:                         e);
 55:                     return;
 56:                 }
 57: 
 58:                 Set<MessageQueue> allocateResultSet = new HashSet<>();
 59:                 if (allocateResult != null) {
 60:                     allocateResultSet.addAll(allocateResult);
 61:                 }
 62: 
 63:                 // 更新消息隊列
 64:                 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
 65:                 if (changed) {
 66:                     log.info(
 67:                         "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
 68:                         strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
 69:                         allocateResultSet.size(), allocateResultSet);
 70:                     this.messageQueueChanged(topic, mqSet, allocateResultSet);
 71:                 }
 72:             }
 73:             break;
 74:         }
 75:         default:
 76:             break;
 77:     }
 78: }
 79: 
 80: /** 81: * 當負載均衡時,更新 消息處理隊列 82: * - 移除 在processQueueTable && 不存在於 mqSet 裏的消息隊列 83: * - 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列 84: * 85: * @param topic Topic 86: * @param mqSet 負載均衡結果後的消息隊列數組 87: * @param isOrder 是否順序 88: * @return 是否變動 89: */
 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
 91:     boolean changed = false;
 92: 
 93:     // 移除 在processQueueTable && 不存在於 mqSet 裏的消息隊列
 94:     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
 95:     while (it.hasNext()) { // TODO 待讀:
 96:         Entry<MessageQueue, ProcessQueue> next = it.next();
 97:         MessageQueue mq = next.getKey();
 98:         ProcessQueue pq = next.getValue();
 99: 
100:         if (mq.getTopic().equals(topic)) {
101:             if (!mqSet.contains(mq)) { // 不包含的隊列
102:                 pq.setDropped(true);
103:                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104:                     it.remove();
105:                     changed = true;
106:                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107:                 }
108:             } else if (pq.isPullExpired()) { // 隊列拉取超時,進行清理
109:                 switch (this.consumeType()) {
110:                     case CONSUME_ACTIVELY:
111:                         break;
112:                     case CONSUME_PASSIVELY:
113:                         pq.setDropped(true);
114:                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115:                             it.remove();
116:                             changed = true;
117:                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118:                                 consumerGroup, mq);
119:                         }
120:                         break;
121:                     default:
122:                         break;
123:                 }
124:             }
125:         }
126:     }
127: 
128:     // 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列。
129:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組
130:     for (MessageQueue mq : mqSet) {
131:         if (!this.processQueueTable.containsKey(mq)) {
132:             if (isOrder && !this.lock(mq)) {
133:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134:                 continue;
135:             }
136: 
137:             this.removeDirtyOffset(mq);
138:             ProcessQueue pq = new ProcessQueue();
139:             long nextOffset = this.computePullFromWhere(mq);
140:             if (nextOffset >= 0) {
141:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142:                 if (pre != null) {
143:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144:                 } else {
145:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146:                     PullRequest pullRequest = new PullRequest();
147:                     pullRequest.setConsumerGroup(consumerGroup);
148:                     pullRequest.setNextOffset(nextOffset);
149:                     pullRequest.setMessageQueue(mq);
150:                     pullRequest.setProcessQueue(pq);
151:                     pullRequestList.add(pullRequest);
152:                     changed = true;
153:                 }
154:             } else {
155:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156:             }
157:         }
158:     }
159: 
160:     // 發起消息拉取請求
161:     this.dispatchPullRequest(pullRequestList);
162: 
163:     return changed;
164: }複製代碼
  • #rebalanceByTopic(...) 說明 :分配 Topic 的消息隊列。
    • 第 3 至 19 行 :廣播模式( BROADCASTING ) 下,分配 Topic 對應的全部消息隊列。
    • 第 20 至 74 行 :集羣模式( CLUSTERING ) 下,分配 Topic 對應的部分消息隊列。
      • 第 21 至 40 行 :獲取 Topic 對應的消息隊列和消費者們,並對其進行排序。由於各 Consumer 是在本地分配消息隊列,排序後才能保證各 Consumer 順序一致。
      • 第 42 至 61 行 :根據 隊列分配策略( AllocateMessageQueueStrategy ) 分配消息隊列。詳細解析見:AllocateMessageQueueStrategy
      • 第 63 至 72 行 :更新 Topic 對應的消息隊列。
  • #updateProcessQueueTableInRebalance(...) 說明 :當分配隊列時,更新 Topic 對應的消息隊列,並返回是否有變動。
    • 第 93 至 126 行 :移除不存在於分配的消息隊列( mqSet ) 的 消息處理隊列( processQueueTable )。
      • 第 103 行 :移除不須要的消息隊列。詳細解析見:RebalancePushImpl#removeUnnecessaryMessageQueue(...)
      • 第 108 至 120 行 :隊列拉取超時,即 當前時間 - 最後一次拉取消息時間 > 120s ( 120s 可配置),斷定發生 BUG,太久未進行消息拉取,移除消息隊列。移除後,下面#新增隊列邏輯#能夠從新加入新的該消息隊列。
    • 第 128 至 158 行 :增長 分配的消息隊列( mqSet ) 新增的消息隊列。
    • 第 161 行 :發起新增的消息隊列消息拉取請求。詳細解析見:RebalancePushImpl#dispatchPullRequest(...)

RebalanceImpl#removeUnnecessaryMessageQueue(...)

RebalancePushImpl#removeUnnecessaryMessageQueue(...)

1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     // 同步隊列的消費進度,並移除之。
  3:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
  4:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
  5:     // TODO 順序消費
  6:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
  7:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
  8:         try {
  9:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
 10:                 try {
 11:                     return this.unlockDelay(mq, pq);
 12:                 } finally {
 13:                     pq.getLockConsume().unlock();
 14:                 }
 15:             } else {
 16:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
 17:                     mq, //
 18:                     pq.getTryUnlockTimes());
 19: 
 20:                 pq.incTryUnlockTimes();
 21:             }
 22:         } catch (Exception e) {
 23:             log.error("removeUnnecessaryMessageQueue Exception", e);
 24:         }
 25: 
 26:         return false;
 27:     }
 28:     return true;
 29: }複製代碼

[PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(...)

1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
  2:     this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
  3:     this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
  4:     return true;
  5: }複製代碼
  • 說明 :移除不須要的消息隊列相關的信息,並返回移除成功。RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。

RebalancePushImpl#dispatchPullRequest(...)

1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
  2:     for (PullRequest pullRequest : pullRequestList) {
  3:         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
  4:         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
  5:     }
  6: }複製代碼
  • 說明 :發起消息拉取請求。該調用是PushConsumer不斷不斷不斷拉取消息的起點

DefaultMQPushConsumerImpl#executePullRequestImmediately(...)

1: public void executePullRequestImmediately(final PullRequest pullRequest) {
  2:     this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
  3: }複製代碼
  • 說明 :提交拉取請求。提交後,PullMessageService 異步執行非阻塞。詳細解析見:PullMessageService

AllocateMessageQueueStrategy

AllocateMessageQueueStrategy類圖
AllocateMessageQueueStrategy類圖

AllocateMessageQueueAveragely

1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
  2:     private final Logger log = ClientLogger.getLog();
  3: 
  4:     @Override
  5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) {
  7:         // 校驗參數是否正確
  8:         if (currentCID == null || currentCID.length() < 1) {
  9:             throw new IllegalArgumentException("currentCID is empty");
 10:         }
 11:         if (mqAll == null || mqAll.isEmpty()) {
 12:             throw new IllegalArgumentException("mqAll is null or mqAll empty");
 13:         }
 14:         if (cidAll == null || cidAll.isEmpty()) {
 15:             throw new IllegalArgumentException("cidAll is null or cidAll empty");
 16:         }
 17: 
 18:         List<MessageQueue> result = new ArrayList<>();
 19:         if (!cidAll.contains(currentCID)) {
 20:             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
 21:                 consumerGroup,
 22:                 currentCID,
 23:                 cidAll);
 24:             return result;
 25:         }
 26:         // 平均分配
 27:         int index = cidAll.indexOf(currentCID); // 第幾個consumer。
 28:         int mod = mqAll.size() % cidAll.size(); // 餘數,即多少消息隊列沒法平均分配。
 29:         int averageSize =
 30:             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
 31:                 + 1 : mqAll.size() / cidAll.size());
 32:         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有餘數的狀況下,[0, mod) 平分餘數,即每consumer多分配一個節點;第index開始,跳過前mod餘數。
 33:         int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配隊列數量。之因此要Math.min()的緣由是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息隊列。
 34:         for (int i = 0; i < range; i++) {
 35:             result.add(mqAll.get((startIndex + i) % mqAll.size()));
 36:         }
 37:         return result;
 38:     }
 39: 
 40:     @Override
 41:     public String getName() {
 42:         return "AVG";
 43:     }
 44: }複製代碼
  • 說明 :平均分配隊列策略。
  • 第 7 至 25 行 :參數校驗。
  • 第 26 至 36 行 :平均分配消息隊列。
    • 第 27 行 :index :當前 Consumer 在消費集羣裏是第幾個。這裏就是爲何須要對傳入的 cidAll 參數必須進行排序的緣由。若是不排序,Consumer 在本地計算出來的 index 沒法一致,影響計算結果。
    • 第 28 行 :mod :餘數,即多少消息隊列沒法平均分配。
    • 第 29 至 31 行 :averageSize :代碼能夠簡化成 (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
      • [ 0, mod )mqAll.size() / cidAll.size() + 1。前面 modConsumer 平分餘數,多得到 1 個消息隊列。
      • [ mod, cidAll.size() )mqAll.size() / cidAll.size()
    • 第 32 行 :startIndexConsumer 分配消息隊列開始位置。
    • 第 33 行 :range :分配隊列數量。之因此要 Math#min(...) 的緣由:當 mqAll.size() <= cidAll.size() 時,最後幾個 Consumer 分配不到消息隊列。
    • 第 34 至 36 行 :生成分配消息隊列結果。
  • 舉個例子:

固定消息隊列長度爲4併發

Consumer 2 能夠整除* Consumer 3 不可整除* Consumer 5 沒法都分配*
消息隊列[0] Consumer[0] Consumer[0] Consumer[0]
消息隊列[1] Consumer[0] Consumer[0] Consumer[1]
消息隊列[2] Consumer[1] Consumer[1] Consumer[2]
消息隊列[3] Consumer[1] Consumer[2] Consumer[3]

AllocateMessageQueueByMachineRoom

1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
  2:     /** 3: * 消費者消費brokerName集合 4: */
  5:     private Set<String> consumeridcs;
  6: 
  7:     @Override
  8:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 9: List<String> cidAll) {
 10:         // 參數校驗
 11:         List<MessageQueue> result = new ArrayList<MessageQueue>();
 12:         int currentIndex = cidAll.indexOf(currentCID);
 13:         if (currentIndex < 0) {
 14:             return result;
 15:         }
 16:         // 計算符合當前配置的消費者數組('consumeridcs')對應的消息隊列
 17:         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
 18:         for (MessageQueue mq : mqAll) {
 19:             String[] temp = mq.getBrokerName().split("@");
 20:             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
 21:                 premqAll.add(mq);
 22:             }
 23:         }
 24:         // 平均分配
 25:         int mod = premqAll.size() / cidAll.size();
 26:         int rem = premqAll.size() % cidAll.size();
 27:         int startIndex = mod * currentIndex;
 28:         int endIndex = startIndex + mod;
 29:         for (int i = startIndex; i < endIndex; i++) {
 30:             result.add(mqAll.get(i));
 31:         }
 32:         if (rem > currentIndex) {
 33:             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
 34:         }
 35:         return result;
 36:     }
 37: 
 38:     @Override
 39:     public String getName() {
 40:         return "MACHINE_ROOM";
 41:     }
 42: 
 43:     public Set<String> getConsumeridcs() {
 44:         return consumeridcs;
 45:     }
 46: 
 47:     public void setConsumeridcs(Set<String> consumeridcs) {
 48:         this.consumeridcs = consumeridcs;
 49:     }
 50: }複製代碼
  • 說明 :平均分配可消費的 Broker 對應的消息隊列。
  • 第 7 至 15 行 :參數校驗。
  • 第 16 至 23 行 :計算可消費的 Broker 對應的消息隊列。
  • 第 25 至 34 行 :平均分配消息隊列。該平均分配方式和 AllocateMessageQueueAveragely 略有不一樣,其是將多餘的結尾部分分配給前 remConsumer
  • 疑問:使用該分配策略時,ConsumerBroker 分配須要怎麼配置。😈等研究主從相關源碼時,仔細考慮下。

AllocateMessageQueueAveragelyByCircle

1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
  2:     private final Logger log = ClientLogger.getLog();
  3: 
  4:     @Override
  5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) {
  7:         // 校驗參數是否正確
  8:         if (currentCID == null || currentCID.length() < 1) {
  9:             throw new IllegalArgumentException("currentCID is empty");
 10:         }
 11:         if (mqAll == null || mqAll.isEmpty()) {
 12:             throw new IllegalArgumentException("mqAll is null or mqAll empty");
 13:         }
 14:         if (cidAll == null || cidAll.isEmpty()) {
 15:             throw new IllegalArgumentException("cidAll is null or cidAll empty");
 16:         }
 17: 
 18:         List<MessageQueue> result = new ArrayList<MessageQueue>();
 19:         if (!cidAll.contains(currentCID)) {
 20:             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
 21:                 consumerGroup,
 22:                 currentCID,
 23:                 cidAll);
 24:             return result;
 25:         }
 26: 
 27:         // 環狀分配
 28:         int index = cidAll.indexOf(currentCID);
 29:         for (int i = index; i < mqAll.size(); i++) {
 30:             if (i % cidAll.size() == index) {
 31:                 result.add(mqAll.get(i));
 32:             }
 33:         }
 34:         return result;
 35:     }
 36: 
 37:     @Override
 38:     public String getName() {
 39:         return "AVG_BY_CIRCLE";
 40:     }
 41: }複製代碼
  • 說明 :環狀分配消息隊列。

AllocateMessageQueueByConfig

1: public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
  2:     private List<MessageQueue> messageQueueList;
  3: 
  4:     @Override
  5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) {
  7:         return this.messageQueueList;
  8:     }
  9: 
 10:     @Override
 11:     public String getName() {
 12:         return "CONFIG";
 13:     }
 14: 
 15:     public List<MessageQueue> getMessageQueueList() {
 16:         return messageQueueList;
 17:     }
 18: 
 19:     public void setMessageQueueList(List<MessageQueue> messageQueueList) {
 20:         this.messageQueueList = messageQueueList;
 21:     }
 22: }複製代碼
  • 說明 :分配配置的消息隊列。
  • 疑問 :該分配策略的使用場景。

五、PushConsumer 消費進度讀取

RebalancePushImpl#computePullFromWhere(...)

1: public long computePullFromWhere(MessageQueue mq) {
  2:     long result = -1;
  3:     final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
  4:     final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
  5:     switch (consumeFromWhere) {
  6:         case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: // 廢棄
  7:         case CONSUME_FROM_MIN_OFFSET: // 廢棄
  8:         case CONSUME_FROM_MAX_OFFSET: // 廢棄
  9:         case CONSUME_FROM_LAST_OFFSET: {
 10:             long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
 11:             if (lastOffset >= 0) {
 12:                 result = lastOffset;
 13:             }
 14:             // First start,no offset
 15:             else if (-1 == lastOffset) {
 16:                 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 17:                     result = 0L;
 18:                 } else {
 19:                     try {
 20:                         result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
 21:                     } catch (MQClientException e) {
 22:                         result = -1;
 23:                     }
 24:                 }
 25:             } else {
 26:                 result = -1;
 27:             }
 28:             break;
 29:         }
 30:         case CONSUME_FROM_FIRST_OFFSET: {
 31:             long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
 32:             if (lastOffset >= 0) {
 33:                 result = lastOffset;
 34:             } else if (-1 == lastOffset) {
 35:                 result = 0L;
 36:             } else {
 37:                 result = -1;
 38:             }
 39:             break;
 40:         }
 41:         case CONSUME_FROM_TIMESTAMP: {
 42:             long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
 43:             if (lastOffset >= 0) {
 44:                 result = lastOffset;
 45:             } else if (-1 == lastOffset) {
 46:                 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 47:                     try {
 48:                         result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
 49:                     } catch (MQClientException e) {
 50:                         result = -1;
 51:                     }
 52:                 } else {
 53:                     try {
 54:                         long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
 55:                             UtilAll.YYYY_MMDD_HHMMSS).getTime();
 56:                         result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
 57:                     } catch (MQClientException e) {
 58:                         result = -1;
 59:                     }
 60:                 }
 61:             } else {
 62:                 result = -1;
 63:             }
 64:             break;
 65:         }
 66: 
 67:         default:
 68:             break;
 69:     }
 70: 
 71:     return result;
 72: }複製代碼
  • 說明 :計算消息隊列開始消費位置。
  • PushConsumer 讀取消費進度有三種選項:
    • CONSUME_FROM_LAST_OFFSET :第 6 至 29 行 :一個新的消費集羣第一次啓動從隊列的最後位置開始消費。後續再啓動接着上次消費的進度開始消費
    • CONSUME_FROM_FIRST_OFFSET :第 30 至 40 行 :一個新的消費集羣第一次啓動從隊列的最前位置開始消費。後續再啓動接着上次消費的進度開始消費
    • CONSUME_FROM_TIMESTAMP :第 41 至 65 行 :一個新的消費集羣第一次啓動從指定時間點開始消費。後續再啓動接着上次消費的進度開始消費

[PullConsumer] RebalancePullImpl#computePullFromWhere(...)

暫時跳過。😈app

六、PushConsumer 拉取消息

DefaultMQPushConsumerImpl拉取消息
DefaultMQPushConsumerImpl拉取消息

PullMessageService

1: public class PullMessageService extends ServiceThread {
  2:     private final Logger log = ClientLogger.getLog();
  3:     /** 4: * 拉取消息請求隊列 5: */
  6:     private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
  7:     /** 8: * MQClient對象 9: */
 10:     private final MQClientInstance mQClientFactory;
 11:     /** 12: * 定時器。用於延遲提交拉取請求 13: */
 14:     private final ScheduledExecutorService scheduledExecutorService = Executors
 15:         .newSingleThreadScheduledExecutor(new ThreadFactory() {
 16:             @Override
 17:             public Thread newThread(Runnable r) {
 18:                 return new Thread(r, "PullMessageServiceScheduledThread");
 19:             }
 20:         });
 21: 
 22:     public PullMessageService(MQClientInstance mQClientFactory) {
 23:         this.mQClientFactory = mQClientFactory;
 24:     }
 25: 
 26:     /** 27: * 執行延遲拉取消息請求 28: * 29: * @param pullRequest 拉取消息請求 30: * @param timeDelay 延遲時長 31: */
 32:     public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
 33:         this.scheduledExecutorService.schedule(new Runnable() {
 34: 
 35:             @Override
 36:             public void run() {
 37:                 PullMessageService.this.executePullRequestImmediately(pullRequest);
 38:             }
 39:         }, timeDelay, TimeUnit.MILLISECONDS);
 40:     }
 41: 
 42:     /** 43: * 執行當即拉取消息請求 44: * 45: * @param pullRequest 拉取消息請求 46: */
 47:     public void executePullRequestImmediately(final PullRequest pullRequest) {
 48:         try {
 49:             this.pullRequestQueue.put(pullRequest);
 50:         } catch (InterruptedException e) {
 51:             log.error("executePullRequestImmediately pullRequestQueue.put", e);
 52:         }
 53:     }
 54: 
 55:     /** 56: * 執行延遲任務 57: * 58: * @param r 任務 59: * @param timeDelay 延遲時長 60: */
 61:     public void executeTaskLater(final Runnable r, final long timeDelay) {
 62:         this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
 63:     }
 64: 
 65:     public ScheduledExecutorService getScheduledExecutorService() {
 66:         return scheduledExecutorService;
 67:     }
 68: 
 69:     /** 70: * 拉取消息 71: * 72: * @param pullRequest 拉取消息請求 73: */
 74:     private void pullMessage(final PullRequest pullRequest) {
 75:         final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
 76:         if (consumer != null) {
 77:             DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
 78:             impl.pullMessage(pullRequest);
 79:         } else {
 80:             log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
 81:         }
 82:     }
 83: 
 84:     @Override
 85:     public void run() {
 86:         log.info(this.getServiceName() + " service started");
 87: 
 88:         while (!this.isStopped()) {
 89:             try {
 90:                 PullRequest pullRequest = this.pullRequestQueue.take();
 91:                 if (pullRequest != null) {
 92:                     this.pullMessage(pullRequest);
 93:                 }
 94:             } catch (InterruptedException e) {
 95:             } catch (Exception e) {
 96:                 log.error("Pull Message Service Run Method exception", e);
 97:             }
 98:         }
 99: 
100:         log.info(this.getServiceName() + " service end");
101:     }
102: 
103:     @Override
104:     public String getServiceName() {
105:         return PullMessageService.class.getSimpleName();
106:     }
107: 
108: }複製代碼
  • 說明 :拉取消息服務,不斷不斷不斷從 Broker 拉取消息,並提交消費任務到 ConsumeMessageService
  • #executePullRequestLater(...) :第 26 至 40 行 : 提交延遲拉取消息請求。
  • #executePullRequestImmediately(...) :第 42 至 53 行 :提交當即拉取消息請求。
  • #executeTaskLater(...) :第 55 至 63 行 :提交延遲任務
  • #pullMessage(...) :第 69 至 82 行 :執行拉取消息邏輯。詳細解析見:DefaultMQPushConsumerImpl#pullMessage(...)
  • #run(...) :第 84 至 101 行 :循環拉取消息請求隊列( pullRequestQueue ),進行消息拉取。

DefaultMQPushConsumerImpl#pullMessage(...)

1: public void pullMessage(final PullRequest pullRequest) {
  2:     final ProcessQueue processQueue = pullRequest.getProcessQueue();
  3:     if (processQueue.isDropped()) {
  4:         log.info("the pull request[{}] is dropped.", pullRequest.toString());
  5:         return;
  6:     }
  7: 
  8:     // 設置隊列最後拉取消息時間
  9:     pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
 10: 
 11:     // 判斷consumer狀態是否運行中。若是不是,則延遲拉取消息。
 12:     try {
 13:         this.makeSureStateOK();
 14:     } catch (MQClientException e) {
 15:         log.warn("pullMessage exception, consumer state not ok", e);
 16:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
 17:         return;
 18:     }
 19: 
 20:     // 判斷是否暫停中。
 21:     if (this.isPause()) {
 22:         log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
 23:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
 24:         return;
 25:     }
 26: 
 27:     // 判斷是否超過最大持有消息數量。默認最大值爲1000。
 28:     long size = processQueue.getMsgCount().get();
 29:     if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
 30:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延遲消息拉取請求。50ms。
 31:         if ((flowControlTimes1++ % 1000) == 0) {
 32:             log.warn(
 33:                 "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
 34:                 processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
 35:         }
 36:         return;
 37:     }
 38: 
 39:     if (!this.consumeOrderly) { // 判斷消息跨度是否過大。
 40:         if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
 41:             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延遲消息拉取請求。50ms。
 42:             if ((flowControlTimes2++ % 1000) == 0) {
 43:                 log.warn(
 44:                     "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
 45:                     processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
 46:                     pullRequest, flowControlTimes2);
 47:             }
 48:             return;
 49:         }
 50:     } else { // TODO 順序消費
 51:         if (processQueue.isLocked()) {
 52:             if (!pullRequest.isLockedFirst()) {
 53:                 final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
 54:                 boolean brokerBusy = offset < pullRequest.getNextOffset();
 55:                 log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
 56:                     pullRequest, offset, brokerBusy);
 57:                 if (brokerBusy) {
 58:                     log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
 59:                         pullRequest, offset);
 60:                 }
 61: 
 62:                 pullRequest.setLockedFirst(true);
 63:                 pullRequest.setNextOffset(offset);
 64:             }
 65:         } else {
 66:             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
 67:             log.info("pull message later because not locked in broker, {}", pullRequest);
 68:             return;
 69:         }
 70:     }
 71: 
 72:     // 獲取Topic 對應的訂閱信息。若不存在,則延遲拉取消息
 73:     final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
 74:     if (null == subscriptionData) {
 75:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
 76:         log.warn("find the consumer's subscription failed, {}", pullRequest);
 77:         return;
 78:     }
 79: 
 80:     final long beginTimestamp = System.currentTimeMillis();
 81: 
 82:     PullCallback pullCallback = new PullCallback() {
 83:         @Override
 84:         public void onSuccess(PullResult pullResult) {
 85:             if (pullResult != null) {
 86:                 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
 87:                     subscriptionData);
 88: 
 89:                 switch (pullResult.getPullStatus()) {
 90:                     case FOUND:
 91:                         // 設置下次拉取消息隊列位置
 92:                         long prevRequestOffset = pullRequest.getNextOffset();
 93:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 94: 
 95:                         // 統計
 96:                         long pullRT = System.currentTimeMillis() - beginTimestamp;
 97:                         DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
 98:                             pullRequest.getMessageQueue().getTopic(), pullRT);
 99: 
100:                         long firstMsgOffset = Long.MAX_VALUE;
101:                         if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
102:                             DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
103:                         } else {
104:                             firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
105: 
106:                             // 統計
107:                             DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
108:                                 pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
109: 
110:                             // 提交拉取到的消息到消息處理隊列
111:                             boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
112: 
113:                             // 提交消費請求
114:                             DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
115:                                 pullResult.getMsgFoundList(), //
116:                                 processQueue, //
117:                                 pullRequest.getMessageQueue(), //
118:                                 dispathToConsume);
119: 
120:                             // 提交下次拉取消息請求
121:                             if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
122:                                 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
123:                                     DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
124:                             } else {
125:                                 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
126:                             }
127:                         }
128: 
129:                         // 下次拉取消息隊列位置小於上次拉取消息隊列位置 或者 第一條消息的消息隊列位置小於上次拉取消息隊列位置,則斷定爲BUG,輸出log
130:                         if (pullResult.getNextBeginOffset() < prevRequestOffset//
131:                             || firstMsgOffset < prevRequestOffset) {
132:                             log.warn(
133:                                 "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
134:                                 pullResult.getNextBeginOffset(), //
135:                                 firstMsgOffset, //
136:                                 prevRequestOffset);
137:                         }
138: 
139:                         break;
140:                     case NO_NEW_MSG:
141:                         // 設置下次拉取消息隊列位置
142:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
143: 
144:                         // 持久化消費進度
145:                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
146: 
147:                         // 當即提交拉取消息請求
148:                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
149:                         break;
150:                     case NO_MATCHED_MSG:
151:                         // 設置下次拉取消息隊列位置
152:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
153: 
154:                         // 持久化消費進度
155:                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
156: 
157:                         // 提交當即拉取消息請求
158:                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
159:                         break;
160:                     case OFFSET_ILLEGAL:
161:                         log.warn("the pull request offset illegal, {} {}", //
162:                             pullRequest.toString(), pullResult.toString());
163:                         // 設置下次拉取消息隊列位置
164:                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
165: 
166:                         // 設置消息處理隊列爲dropped
167:                         pullRequest.getProcessQueue().setDropped(true);
168: 
169:                         // 提交延遲任務,進行消費處理隊列移除。不當即移除的緣由:可能有地方正在使用,避免受到影響。
170:                         DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
171: 
172:                             @Override
173:                             public void run() {
174:                                 try {
175:                                     // 更新消費進度,同步消費進度到Broker
176:                                     DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
177:                                         pullRequest.getNextOffset(), false);
178:                                     DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
179: 
180:                                     // 移除消費處理隊列
181:                                     DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
182: 
183:                                     log.warn("fix the pull request offset, {}", pullRequest);
184:                                 } catch (Throwable e) {
185:                                     log.error("executeTaskLater Exception", e);
186:                                 }
187:                             }
188:                         }, 10000);
189:                         break;
190:                     default:
191:                         break;
192:                 }
193:             }
194:         }
195: 
196:         @Override
197:         public void onException(Throwable e) {
198:             if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
199:                 log.warn("execute the pull request exception", e);
200:             }
201: 
202:             // 提交延遲拉取消息請求
203:             DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
204:         }
205:     };
206: 
207:     // 集羣消息模型下,計算提交的消費進度。
208:     boolean commitOffsetEnable = false;
209:     long commitOffsetValue = 0L;
210:     if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
211:         commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
212:         if (commitOffsetValue > 0) {
213:             commitOffsetEnable = true;
214:         }
215:     }
216: 
217:     // 計算請求的 訂閱表達式 和 是否進行filtersrv過濾消息
218:     String subExpression = null;
219:     boolean classFilter = false;
220:     SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
221:     if (sd != null) {
222:         if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
223:             subExpression = sd.getSubString();
224:         }
225: 
226:         classFilter = sd.isClassFilterMode();
227:     }
228: 
229:     // 計算拉取消息系統標識
230:     int sysFlag = PullSysFlag.buildSysFlag(//
231:         commitOffsetEnable, // commitOffset
232:         true, // suspend
233:         subExpression != null, // subscription
234:         classFilter // class filter
235:     );
236: 
237:     // 執行拉取。若是拉取請求發生異常時,提交延遲拉取消息請求。
238:     try {
239:         this.pullAPIWrapper.pullKernelImpl(//
240:             pullRequest.getMessageQueue(), // 1
241:             subExpression, // 2
242:             subscriptionData.getSubVersion(), // 3
243:             pullRequest.getNextOffset(), // 4
244:             this.defaultMQPushConsumer.getPullBatchSize(), // 5
245:             sysFlag, // 6
246:             commitOffsetValue, // 7
247:             BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
248:             CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
249:             CommunicationMode.ASYNC, // 10
250:             pullCallback// 11
251:         );
252:     } catch (Exception e) {
253:         log.error("pullKernelImpl exception", e);
254:         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
255:     }
256: }
257: 
258: private void correctTagsOffset(final PullRequest pullRequest) {
259:     if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
260:         this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
261:     }
262: }複製代碼
  • #pullMessage(...) 說明 :拉取消息。
    • 第 3 至 6 行 :消息處理隊列已經終止,不進行消息拉取。
    • 第 9 行 :設置消息處理隊列最後拉取消息時間。
    • 第 11 至 18 行 :Consumer 未處於運行中狀態,不進行消息拉取,提交延遲拉取消息請求。
    • 第 20 至 25 行 : Consumer 處於暫停中,不進行消息拉取,提交延遲拉取消息請求。
    • 第 27 至 37 行 :消息處理隊列持有消息超過最大容許值(默認:1000條),不進行消息拉取,提交延遲拉取消息請求。
    • 第 39 至 49 行 :Consumer併發消費 而且 消息隊列持有消息跨度過大(消息跨度 = 持有消息最後一條和第一條的消息位置差,默認:2000),不進行消息拉取,提交延遲拉取消息請求。
    • 第 50 至 70 行 :順序消費 相關跳過,詳細解析見:《RocketMQ 源碼分析 —— Message 順序發送與消費》
    • 第 72 至 78 行 :Topic 對應的訂閱信息不存在,不進行消息拉取,提交延遲拉取消息請求。
    • 第 222 至 224 行 :判斷請求是否使用 Consumer 本地的訂閱信息( SubscriptionData ),而不使用 Broker 裏的訂閱信息。詳細解析見:PullMessageProcessor#processRequest(...) 第 64 至 110 行代碼
    • 第 226 行 :是否開啓過濾類過濾模式。詳細解析見:《RocketMQ 源碼分析 —— Filtersrv》
    • 第 229 至 235 行 :計算拉取消息請求系統標識。詳細解析見:PullMessageRequestHeader.sysFlag
    • 第 237 至 255 行 :
  • PullCallback :拉取消息回調:
    • 第 86 行 :處理拉取結果。詳細邏輯見:PullAPIWrapper#processPullResult(...)
    • 第 89 至 192 行 :處理拉取狀態結果:
      • 第 90 至 139 行 :拉取到消息( FOUND ) :
        • 第 91 至 93 行 :設置下次拉取消息隊列位置。
        • 第 95 至 97 行 :統計。
        • 第 101 至 102 行 :拉取到消息的消息列表爲空,提交當即拉取消息請求。爲何會存在拉取到消息,可是消息結果未空呢?緣由見:PullAPIWrapper#processPullResult(...)
        • 第 106 至 108 行 :統計。
        • 第 111 行 :提交拉取到的消息到消息處理隊列。詳細解析見:ProcessQueue#putMessage(...)
        • 第 113 至 118 行 :提交消費請求到 ConsumeMessageService。詳細解析見:ConsumeMessageConcurrentlyService
        • 第 120 至 126 行 :根據拉取頻率( pullInterval ),提交當即或者延遲拉取消息請求。默認拉取頻率爲 0ms ,提交當即拉取消息請求。
        • 第 129 至 137 行 :下次拉取消息隊列位置小於上次拉取消息隊列位置 或者 第一條消息的消息隊列位置小於上次拉取消息隊列位置,則斷定爲BUG,輸出警告日誌。
          • 第 140 至 149 行 :沒有新消息( NO_NEW_MSG ) :
        • 第 142 行 : 設置下次拉取消息隊列位置。
        • 第 145 行 :更正消費進度。詳細解析見:#correctTagsOffset(...)
        • 第 148 行 :提交當即拉取消息請求。
          • 第 150 至 159 行 :有新消息可是不匹配( NO_MATCHED_MSG )。邏輯同 NO_NEW_MSG
          • 第 160 至 189 行 :拉取請求的消息隊列位置不合法 (OFFSET_ILLEGAL)。
        • 第 164 行 :設置下次拉取消息隊列位置。
        • 第 167 行 :設置消息處理隊列爲 dropped
        • 第 169 至 188 行 :提交延遲任務,進行隊列移除。
          • 第 175 至 178 行 :更新消費進度,同步消費進度到 Broker
          • 第 181 行 :移除消費處理隊列。
            • 疑問:爲何不當即移除???
              • 第 196 至 204 行 :發生異常,提交延遲拉取消息請求。
  • #correctTagsOffset(...) :更正消費進度。
    • 第 258 至 261 行 : 當消費處理隊列持有消息數量爲 0 時,更新消費進度爲拉取請求的拉取消息隊列位置。

PullAPIWrapper#pullKernelImpl(...)

1: /** 2: * 拉取消息核心方法 3: * 4: * @param mq 消息隊列 5: * @param subExpression 訂閱表達式 6: * @param subVersion 訂閱版本號 7: * @param offset 拉取隊列開始位置 8: * @param maxNums 拉取消息數量 9: * @param sysFlag 拉取請求系統標識 10: * @param commitOffset 提交消費進度 11: * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間 12: * @param timeoutMillis 請求broker超時時長 13: * @param communicationMode 通信模式 14: * @param pullCallback 拉取回調 15: * @return 拉取消息結果。只有通信模式爲同步時,才返回結果,不然返回null。 16: * @throws MQClientException 當尋找不到 broker 時,或發生其餘client異常 17: * @throws RemotingException 當遠程調用發生異常時 18: * @throws MQBrokerException 當 broker 發生異常時。只有通信模式爲同步時纔會發生該異常。 19: * @throws InterruptedException 當發生中斷異常時 20: */
 21: protected PullResult pullKernelImpl( 22: final MessageQueue mq, 23: final String subExpression, 24: final long subVersion, 25: final long offset, 26: final int maxNums, 27: final int sysFlag, 28: final long commitOffset, 29: final long brokerSuspendMaxTimeMillis, 30: final long timeoutMillis, 31: final CommunicationMode communicationMode, 32: final PullCallback pullCallback 33: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 34:     // 獲取Broker信息
 35:     FindBrokerResult findBrokerResult =
 36:         this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
 37:             this.recalculatePullFromWhichNode(mq), false);
 38:     if (null == findBrokerResult) {
 39:         this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
 40:         findBrokerResult =
 41:             this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
 42:                 this.recalculatePullFromWhichNode(mq), false);
 43:     }
 44: 
 45:     // 請求拉取消息
 46:     if (findBrokerResult != null) {
 47:         int sysFlagInner = sysFlag;
 48: 
 49:         if (findBrokerResult.isSlave()) {
 50:             sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
 51:         }
 52: 
 53:         PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
 54:         requestHeader.setConsumerGroup(this.consumerGroup);
 55:         requestHeader.setTopic(mq.getTopic());
 56:         requestHeader.setQueueId(mq.getQueueId());
 57:         requestHeader.setQueueOffset(offset);
 58:         requestHeader.setMaxMsgNums(maxNums);
 59:         requestHeader.setSysFlag(sysFlagInner);
 60:         requestHeader.setCommitOffset(commitOffset);
 61:         requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
 62:         requestHeader.setSubscription(subExpression);
 63:         requestHeader.setSubVersion(subVersion);
 64: 
 65:         String brokerAddr = findBrokerResult.getBrokerAddr();
 66:         if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { // TODO filtersrv
 67:             brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
 68:         }
 69: 
 70:         PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
 71:             brokerAddr,
 72:             requestHeader,
 73:             timeoutMillis,
 74:             communicationMode,
 75:             pullCallback);
 76: 
 77:         return pullResult;
 78:     }
 79: 
 80:     // Broker信息不存在,則拋出異常
 81:     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
 82: }複製代碼

PullAPIWrapper#recalculatePullFromWhichNode(...)

1: /** 2: * 消息隊列 與 拉取Broker 的映射 3: * 當拉取消息時,會經過該映射獲取拉取請求對應的Broker 4: */
  5: private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
  6:     new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
  7: /** 8: * 是否使用默認Broker 9: */
 10: private volatile boolean connectBrokerByUser = false;
 11: /** 12: * 默認Broker編號 13: */
 14: private volatile long defaultBrokerId = MixAll.MASTER_ID;
 15: 
 16: /** 17: * 計算消息隊列拉取消息對應的Broker編號 18: * 19: * @param mq 消息隊列 20: * @return Broker編號 21: */
 22: public long recalculatePullFromWhichNode(final MessageQueue mq) {
 23:     // 若開啓默認Broker開關,則返回默認Broker編號
 24:     if (this.isConnectBrokerByUser()) {
 25:         return this.defaultBrokerId;
 26:     }
 27: 
 28:     // 若消息隊列映射拉取Broker存在,則返回映射Broker編號
 29:     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
 30:     if (suggest != null) {
 31:         return suggest.get();
 32:     }
 33: 
 34:     // 返回Broker主節點編號
 35:     return MixAll.MASTER_ID;
 36: }複製代碼
  • 說明 :計算消息隊列拉取消息對應的 Broker 編號。

MQClientInstance#findBrokerAddressInSubscribe(...)

1: /** 2: * Broker名字 和 Broker地址相關 Map 3: */
  4: private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
  5:         new ConcurrentHashMap<>();
  6: 
  7: /** 8: * 得到Broker信息 9: * 10: * @param brokerName broker名字 11: * @param brokerId broker編號 12: * @param onlyThisBroker 是否必須是該broker 13: * @return Broker信息 14: */
 15: public FindBrokerResult findBrokerAddressInSubscribe(// 16: final String brokerName, // 17: final long brokerId, // 18: final boolean onlyThisBroker// 19: ) {
 20:     String brokerAddr = null; // broker地址
 21:     boolean slave = false; // 是否爲從節點
 22:     boolean found = false; // 是否找到
 23: 
 24:     // 得到Broker信息
 25:     HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
 26:     if (map != null && !map.isEmpty()) {
 27:         brokerAddr = map.get(brokerId);
 28:         slave = brokerId != MixAll.MASTER_ID;
 29:         found = brokerAddr != null;
 30: 
 31:         // 若是不強制得到,選擇一個Broker
 32:         if (!found && !onlyThisBroker) {
 33:             Entry<Long, String> entry = map.entrySet().iterator().next();
 34:             brokerAddr = entry.getValue();
 35:             slave = entry.getKey() != MixAll.MASTER_ID;
 36:             found = true;
 37:         }
 38:     }
 39: 
 40:     // 找到broker,則返回信息
 41:     if (found) {
 42:         return new FindBrokerResult(brokerAddr, slave);
 43:     }
 44: 
 45:     // 找不到,則返回空
 46:     return null;
 47: }複製代碼
  • 說明 :獲取 Broker 信息(Broker 地址、是否爲從節點)。

PullAPIWrapper#processPullResult(...)

1: /** 2: * 處理拉取結果 3: * 1. 更新消息隊列拉取消息Broker編號的映射 4: * 2. 解析消息,並根據訂閱信息消息tagCode匹配合適消息 5: * 6: * @param mq 消息隊列 7: * @param pullResult 拉取結果 8: * @param subscriptionData 訂閱信息 9: * @return 拉取結果 10: */
 11: public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, 12: final SubscriptionData subscriptionData) {
 13:     PullResultExt pullResultExt = (PullResultExt) pullResult;
 14: 
 15:     // 更新消息隊列拉取消息Broker編號的映射
 16:     this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
 17: 
 18:     // 解析消息,並根據訂閱信息消息tagCode匹配合適消息
 19:     if (PullStatus.FOUND == pullResult.getPullStatus()) {
 20:         // 解析消息
 21:         ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
 22:         List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
 23: 
 24:         // 根據訂閱信息消息tagCode匹配合適消息
 25:         List<MessageExt> msgListFilterAgain = msgList;
 26:         if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
 27:             msgListFilterAgain = new ArrayList<>(msgList.size());
 28:             for (MessageExt msg : msgList) {
 29:                 if (msg.getTags() != null) {
 30:                     if (subscriptionData.getTagsSet().contains(msg.getTags())) {
 31:                         msgListFilterAgain.add(msg);
 32:                     }
 33:                 }
 34:             }
 35:         }
 36: 
 37:         // Hook
 38:         if (this.hasHook()) {
 39:             FilterMessageContext filterMessageContext = new FilterMessageContext();
 40:             filterMessageContext.setUnitMode(unitMode);
 41:             filterMessageContext.setMsgList(msgListFilterAgain);
 42:             this.executeHook(filterMessageContext);
 43:         }
 44: 
 45:         // 設置消息隊列當前最小/最大位置到消息拓展字段
 46:         for (MessageExt msg : msgListFilterAgain) {
 47:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
 48:                 Long.toString(pullResult.getMinOffset()));
 49:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
 50:                 Long.toString(pullResult.getMaxOffset()));
 51:         }
 52: 
 53:         // 設置消息列表
 54:         pullResultExt.setMsgFoundList(msgListFilterAgain);
 55:     }
 56: 
 57:     // 清空消息二進制數組
 58:     pullResultExt.setMessageBinary(null);
 59: 
 60:     return pullResult;
 61: }複製代碼
  • 說明 :處理拉取結果。
    • 更新消息隊列拉取消息 Broker 編號的映射。
    • 解析消息,並根據訂閱信息消息 tagCode匹配合適消息。
  • 第 16 行 :更新消息隊列拉取消息 Broker 編號的映射。下次拉取消息時,若是未設置默認拉取的 Broker 編號,會使用更新後的 Broker 編號。
  • 第 18 至 55 行 :解析消息,並根據訂閱信息消息 tagCode 匹配合適消息。
    • 第 20 至 22 行 :解析消息。詳細解析見:《RocketMQ 源碼分析 —— Message基礎》
    • 第 24 至 35 行 :根據訂閱信息tagCode 匹配消息。
    • 第 37 至 43 行 :Hook
    • 第 45 至 51 行 :設置消息隊列當前最小/最大位置到消息拓展字段。
    • 第 54 行 :設置消息隊列。
  • 第 58 行 :清空消息二進制數組。

ProcessQueue#putMessage(...)

1:  /** 2: * 消息映射讀寫鎖 3: */
  4: private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
  5: /** 6: * 消息映射 7: * key:消息隊列位置 8: */
  9: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
 10: /** 11: * 消息數 12: */
 13: private final AtomicLong msgCount = new AtomicLong();
 14: /** 15: * 添加消息最大隊列位置 16: */
 17: private volatile long queueOffsetMax = 0L;
 18: /** 19: * 是否正在消費 20: */
 21: private volatile boolean consuming = false;
 22: /** 23: * Broker累計消息數量 24: * 計算公式 = queueMaxOffset - 新添加消息數組[n - 1].queueOffset 25: * Acc = Accumulation 26: * cnt = (猜想)對比度 27: */
 28: private volatile long msgAccCnt = 0;
 29: 
 30: /** 31: * 添加消息,並返回是否提交給消費者 32: * 返回true,當有新消息添加成功時, 33: * 34: * @param msgs 消息 35: * @return 是否提交給消費者 36: */
 37: public boolean putMessage(final List<MessageExt> msgs) {
 38:     boolean dispatchToConsume = false;
 39:     try {
 40:         this.lockTreeMap.writeLock().lockInterruptibly();
 41:         try {
 42:             // 添加消息
 43:             int validMsgCnt = 0;
 44:             for (MessageExt msg : msgs) {
 45:                 MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
 46:                 if (null == old) {
 47:                     validMsgCnt++;
 48:                     this.queueOffsetMax = msg.getQueueOffset();
 49:                 }
 50:             }
 51:             msgCount.addAndGet(validMsgCnt);
 52: 
 53:             // 計算是否正在消費
 54:             if (!msgTreeMap.isEmpty() && !this.consuming) {
 55:                 dispatchToConsume = true;
 56:                 this.consuming = true;
 57:             }
 58: 
 59:             // Broker累計消息數量
 60:             if (!msgs.isEmpty()) {
 61:                 MessageExt messageExt = msgs.get(msgs.size() - 1);
 62:                 String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
 63:                 if (property != null) {
 64:                     long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
 65:                     if (accTotal > 0) {
 66:                         this.msgAccCnt = accTotal;
 67:                     }
 68:                 }
 69:             }
 70:         } finally {
 71:             this.lockTreeMap.writeLock().unlock();
 72:         }
 73:     } catch (InterruptedException e) {
 74:         log.error("putMessage exception", e);
 75:     }
 76: 
 77:     return dispatchToConsume;
 78: }複製代碼

總結

若是用最簡單粗暴的方式描述 PullConsumer 拉取消息的過程,那就是以下的代碼:

while (true) {
    if (不知足拉取消息) {
        Thread.sleep(間隔);
        continue;
    }
    主動拉取消息();
}複製代碼

六、PushConsumer 消費消息

DefaultMQPushConsumerImpl消費消息
DefaultMQPushConsumerImpl消費消息

ConsumeMessageConcurrentlyService 提交消費請求

ConsumeMessageConcurrentlyService#submitConsumeRequest(...)

1: /** 2: * 消費線程池隊列 3: */
  4: private final BlockingQueue<Runnable> consumeRequestQueue;
  5: /** 6: * 消費線程池 7: */
  8: private final ThreadPoolExecutor consumeExecutor;
  9: 
 10: public void submitConsumeRequest(// 11: final List<MessageExt> msgs, // 12: final ProcessQueue processQueue, // 13: final MessageQueue messageQueue, // 14: final boolean dispatchToConsume) {
 15:     final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
 16:     if (msgs.size() <= consumeBatchSize) { // 提交消息小於批量消息數,直接提交消費請求
 17:         ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
 18:         try {
 19:             this.consumeExecutor.submit(consumeRequest);
 20:         } catch (RejectedExecutionException e) {
 21:             this.submitConsumeRequestLater(consumeRequest);
 22:         }
 23:     } else { // 提交消息大於批量消息數,進行分拆成多個消費請求
 24:         for (int total = 0; total < msgs.size(); ) {
 25:             // 計算當前拆分請求包含的消息
 26:             List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);
 27:             for (int i = 0; i < consumeBatchSize; i++, total++) {
 28:                 if (total < msgs.size()) {
 29:                     msgThis.add(msgs.get(total));
 30:                 } else {
 31:                     break;
 32:                 }
 33:             }
 34: 
 35:             // 提交拆分消費請求
 36:             ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
 37:             try {
 38:                 this.consumeExecutor.submit(consumeRequest);
 39:             } catch (RejectedExecutionException e) {
 40:                 // 若是被拒絕,則將當前拆分消息+剩餘消息提交延遲消費請求。
 41:                 for (; total < msgs.size(); total++) {
 42:                     msgThis.add(msgs.get(total));
 43:                 }
 44:                 this.submitConsumeRequestLater(consumeRequest);
 45:             }
 46:         }
 47:     }
 48: }複製代碼
  • 說明 :提交當即消費請求。
  • 第 16 至 22 行 :提交消息小於等於批量消費數,直接提交消費請求。
  • 第 23 至 47 行 :當提交消息大於批量消費數,進行分拆成多個請求。
    • 第 25 至 33 行 :計算當前拆分請求包含的消息。
    • 第 35 至 38 行 :提交拆分消費請求。
    • 第 39 至 44 行 :提交請求被拒絕,則將當前拆分消息 + 剩餘消息提交延遲消費請求,結束拆分循環。

ConsumeMessageConcurrentlyService#submitConsumeRequestLater

1: /** 2: * 提交延遲消費請求 3: * 4: * @param msgs 消息列表 5: * @param processQueue 消息處理隊列 6: * @param messageQueue 消息隊列 7: */
  8: private void submitConsumeRequestLater(// 9: final List<MessageExt> msgs, // 10: final ProcessQueue processQueue, // 11: final MessageQueue messageQueue// 12: ) {
 13: 
 14:     this.scheduledExecutorService.schedule(new Runnable() {
 15: 
 16:         @Override
 17:         public void run() {
 18:             ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
 19:         }
 20:     }, 5000, TimeUnit.MILLISECONDS);
 21: }
 22: 
 23: /** 24: * 提交延遲消費請求 25: * @param consumeRequest 消費請求 26: */
 27: private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// 28: ) {
 29: 
 30:     this.scheduledExecutorService.schedule(new Runnable() {
 31: 
 32:         @Override
 33:         public void run() {
 34:             ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); // TODO BUG ?
 35:         }
 36:     }, 5000, TimeUnit.MILLISECONDS);
 37: }複製代碼
  • 說明 :提交延遲消費請求。
  • 第 34 行 :直接調用 ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);。若是消息數超過批量消費上限,會不會是BUG

ConsumeRequest

1: class ConsumeRequest implements Runnable {
  2: 
  3:     /** 4: * 消費消息列表 5: */
  6:     private final List<MessageExt> msgs;
  7:     /** 8: * 消息處理隊列 9: */
 10:     private final ProcessQueue processQueue;
 11:     /** 12: * 消息隊列 13: */
 14:     private final MessageQueue messageQueue;
 15: 
 16:     public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
 17:         this.msgs = msgs;
 18:         this.processQueue = processQueue;
 19:         this.messageQueue = messageQueue;
 20:     }
 21: 
 22:     @Override
 23:     public void run() {
 24:         // 廢棄隊列不進行消費
 25:         if (this.processQueue.isDropped()) {
 26:             log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
 27:             return;
 28:         }
 29: 
 30:         MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; // 監聽器
 31:         ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); // 消費Context
 32:         ConsumeConcurrentlyStatus status = null; // 消費結果狀態
 33: 
 34:         // Hook
 35:         ConsumeMessageContext consumeMessageContext = null;
 36:         if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
 37:             consumeMessageContext = new ConsumeMessageContext();
 38:             consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
 39:             consumeMessageContext.setProps(new HashMap<String, String>());
 40:             consumeMessageContext.setMq(messageQueue);
 41:             consumeMessageContext.setMsgList(msgs);
 42:             consumeMessageContext.setSuccess(false);
 43:             ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
 44:         }
 45: 
 46:         long beginTimestamp = System.currentTimeMillis();
 47:         boolean hasException = false;
 48:         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; // 消費返回結果類型
 49:         try {
 50:             // 當消息爲重試消息,設置Topic爲原始Topic
 51:             ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
 52: 
 53:             // 設置開始消費時間
 54:             if (msgs != null && !msgs.isEmpty()) {
 55:                 for (MessageExt msg : msgs) {
 56:                     MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
 57:                 }
 58:             }
 59: 
 60:             // 進行消費
 61:             status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 62:         } catch (Throwable e) {
 63:             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
 64:                 RemotingHelper.exceptionSimpleDesc(e), //
 65:                 ConsumeMessageConcurrentlyService.this.consumerGroup,
 66:                 msgs,
 67:                 messageQueue);
 68:             hasException = true;
 69:         }
 70: 
 71:         // 解析消費返回結果類型
 72:         long consumeRT = System.currentTimeMillis() - beginTimestamp;
 73:         if (null == status) {
 74:             if (hasException) {
 75:                 returnType = ConsumeReturnType.EXCEPTION;
 76:             } else {
 77:                 returnType = ConsumeReturnType.RETURNNULL;
 78:             }
 79:         } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
 80:             returnType = ConsumeReturnType.TIME_OUT;
 81:         } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
 82:             returnType = ConsumeReturnType.FAILED;
 83:         } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
 84:             returnType = ConsumeReturnType.SUCCESS;
 85:         }
 86: 
 87:         // Hook
 88:         if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
 89:             consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
 90:         }
 91: 
 92:         // 消費結果狀態爲空時,則設置爲稍後從新消費
 93:         if (null == status) {
 94:             log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
 95:                 ConsumeMessageConcurrentlyService.this.consumerGroup,
 96:                 msgs,
 97:                 messageQueue);
 98:             status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
 99:         }
100: 
101:         // Hook
102:         if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
103:             consumeMessageContext.setStatus(status.toString());
104:             consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
105:             ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
106:         }
107: 
108:         // 統計
109:         ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
110:             .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
111: 
112:         // 處理消費結果
113:         if (!processQueue.isDropped()) {
114:             ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
115:         } else {
116:             log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
117:         }
118:     }
119: 
120: }複製代碼
  • 說明 :消費請求。提交請求執行消費。
  • 第 24 至 28 行 :廢棄處理隊列不進行消費。
  • 第 34 至 44 行 :Hook。
  • 第 51 行 :當消息爲重試消息,設置 Topic爲原始 Topic。例如:原始 TopicTopicTest,重試時 Topic%RETRY%please_rename_unique_group_name_4,通過該方法,Topic 設置回 TopicTest
  • 第 53 至 58 行 :設置開始消費時間。
  • 第 61 行 :進行消費
  • 第 71 至 85 行 :解析消費返回結果類型
  • 第 87 至 90 行 :Hook
  • 第 92 至 99 行 :消費結果狀態未空時,則設置消費結果狀態爲稍後消費。
  • 第 101 至 106 行 :Hook
  • 第 108 至 110 行 :統計。
  • 第 112 至 117 行 :處理消費結果。若是消費處理隊列被移除,剛好消息被消費,則可能致使消息重複消費,所以,消息消費要盡最大可能性實現冪等性。詳細解析見:ConsumeMessageConcurrentlyService#processConsumeResult(...)

ConsumeMessageConcurrentlyService#processConsumeResult(...)

1: public void processConsumeResult(// 2: final ConsumeConcurrentlyStatus status, // 3: final ConsumeConcurrentlyContext context, // 4: final ConsumeRequest consumeRequest// 5: ) {
  6:     int ackIndex = context.getAckIndex();
  7: 
  8:     // 消息爲空,直接返回
  9:     if (consumeRequest.getMsgs().isEmpty())
 10:         return;
 11: 
 12:     // 計算從consumeRequest.msgs[0]到consumeRequest.msgs[ackIndex]的消息消費成功
 13:     switch (status) {
 14:         case CONSUME_SUCCESS:
 15:             if (ackIndex >= consumeRequest.getMsgs().size()) {
 16:                 ackIndex = consumeRequest.getMsgs().size() - 1;
 17:             }
 18:             // 統計成功/失敗數量
 19:             int ok = ackIndex + 1;
 20:             int failed = consumeRequest.getMsgs().size() - ok;
 21:             this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
 22:             this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
 23:             break;
 24:         case RECONSUME_LATER:
 25:             ackIndex = -1;
 26:             // 統計成功/失敗數量
 27:             this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
 28:                 consumeRequest.getMsgs().size());
 29:             break;
 30:         default:
 31:             break;
 32:     }
 33: 
 34:     // 處理消費失敗的消息
 35:     switch (this.defaultMQPushConsumer.getMessageModel()) {
 36:         case BROADCASTING: // 廣播模式,不管是否消費失敗,不發回消息到Broker,只打印Log
 37:             for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 38:                 MessageExt msg = consumeRequest.getMsgs().get(i);
 39:                 log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
 40:             }
 41:             break;
 42:         case CLUSTERING:
 43:             // 發回消息失敗到Broker。
 44:             List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
 45:             for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 46:                 MessageExt msg = consumeRequest.getMsgs().get(i);
 47:                 boolean result = this.sendMessageBack(msg, context);
 48:                 if (!result) {
 49:                     msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 50:                     msgBackFailed.add(msg);
 51:                 }
 52:             }
 53: 
 54:             // 發回Broker失敗的消息,直接提交延遲從新消費
 55:             if (!msgBackFailed.isEmpty()) {
 56:                 consumeRequest.getMsgs().removeAll(msgBackFailed);
 57: 
 58:                 this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
 59:             }
 60:             break;
 61:         default:
 62:             break;
 63:     }
 64: 
 65:     // 移除消費成功消息,並更新最新消費進度
 66:     long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 67:     if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
 68:         this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 69:     }
 70: }複製代碼
  • 說明 :處理消費結果。
  • 第 8 至 10 行 :消費請求消息未空時,直接返回。
  • 第 12 至 32 行 :計算 ackIndex 值。consumeRequest.msgs[0 - ackIndex]爲消費成功,須要進行 ack 確認。
    • 第 14 至 23 行 :CONSUME_SUCCESSackIndex = context.getAckIndex()
    • 第 24 至 29 行 :RECONSUME_LATERackIndex = -1
  • 第34 至 63 行 :處理消費失敗的消息。
    • 第 36 至 41 行 :BROADCASTING :廣播模式,不管是否消費失敗,不發回消息到 Broker,只打印日誌。
    • 第 42 至 60 行 :CLUSTERING :集羣模式,消費失敗的消息發回到 Broker
      • 第 43 至 52 行 :發回消費失敗的消息到 Broker。詳細解析見:DefaultMQPushConsumerImpl#sendMessageBack(...)
      • 第 54 至 59 行 :發回 Broker 失敗的消息,直接提交延遲從新消費。
      • 若是發回 Broker 成功,結果由於例如網絡異常,致使 Consumer覺得發回失敗,斷定消費發回失敗,會致使消息重複消費,所以,消息消費要盡最大可能性實現冪等性。
  • 第 65 至 69 行 :移除【消費成功】【消費失敗但發回Broker成功】的消息,並更新最新消費進度。

ProcessQueue#removeMessage(...)

1: /** 2: * 移除消息,並返回第一條消息隊列位置 3: * 4: * @param msgs 消息 5: * @return 消息隊列位置 6: */
  7: public long removeMessage(final List<MessageExt> msgs) {
  8:     long result = -1;
  9:     final long now = System.currentTimeMillis();
 10:     try {
 11:         this.lockTreeMap.writeLock().lockInterruptibly();
 12:         this.lastConsumeTimestamp = now;
 13:         try {
 14:             if (!msgTreeMap.isEmpty()) {
 15:                 result = this.queueOffsetMax + 1; // 這裏+1的緣由是:若是msgTreeMap爲空時,下一條得到的消息位置爲queueOffsetMax+1
 16: 
 17:                 // 移除消息
 18:                 int removedCnt = 0;
 19:                 for (MessageExt msg : msgs) {
 20:                     MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
 21:                     if (prev != null) {
 22:                         removedCnt--;
 23:                     }
 24:                 }
 25:                 msgCount.addAndGet(removedCnt);
 26: 
 27:                 if (!msgTreeMap.isEmpty()) {
 28:                     result = msgTreeMap.firstKey();
 29:                 }
 30:             }
 31:         } finally {
 32:             this.lockTreeMap.writeLock().unlock();
 33:         }
 34:     } catch (Throwable t) {
 35:         log.error("removeMessage exception", t);
 36:     }
 37: 
 38:     return result;
 39: }複製代碼

ConsumeMessageConcurrentlyService#cleanExpireMsg(...)

1: public void start() {
  2:     this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
  3: 
  4:         @Override
  5:         public void run() {
  6:             cleanExpireMsg();
  7:         }
  8: 
  9:     }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
 10: }
 11: 
 12: /** 13: * 清理過時消息 14: */
 15: private void cleanExpireMsg() {
 16:     Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
 17:         this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
 18:     while (it.hasNext()) {
 19:         Map.Entry<MessageQueue, ProcessQueue> next = it.next();
 20:         ProcessQueue pq = next.getValue();
 21:         pq.cleanExpiredMsg(this.defaultMQPushConsumer);
 22:     }
 23: }複製代碼
  • 說明 :定時清理過時消息,默認週期:15min。

ProcessQueue#cleanExpiredMsg(...)

1: public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
  2:     // 順序消費時,直接返回
  3:     if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
  4:         return;
  5:     }
  6: 
  7:     // 循環移除消息
  8:     int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; // 每次循環最多移除16條
  9:     for (int i = 0; i < loop; i++) {
 10:         // 獲取第一條消息。判斷是否超時,若不超時,則結束循環
 11:         MessageExt msg = null;
 12:         try {
 13:             this.lockTreeMap.readLock().lockInterruptibly();
 14:             try {
 15:                 if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
 16:                     msg = msgTreeMap.firstEntry().getValue();
 17:                 } else {
 18:                     break;
 19:                 }
 20:             } finally {
 21:                 this.lockTreeMap.readLock().unlock();
 22:             }
 23:         } catch (InterruptedException e) {
 24:             log.error("getExpiredMsg exception", e);
 25:         }
 26: 
 27:         try {
 28:             // 發回超時消息
 29:             pushConsumer.sendMessageBack(msg, 3);
 30:             log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
 31: 
 32:             // 判斷此時消息是否依然是第一條,如果,則進行移除
 33:             try {
 34:                 this.lockTreeMap.writeLock().lockInterruptibly();
 35:                 try {
 36:                     if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
 37:                         try {
 38:                             msgTreeMap.remove(msgTreeMap.firstKey());
 39:                         } catch (Exception e) {
 40:                             log.error("send expired msg exception", e);
 41:                         }
 42:                     }
 43:                 } finally {
 44:                     this.lockTreeMap.writeLock().unlock();
 45:                 }
 46:             } catch (InterruptedException e) {
 47:                 log.error("getExpiredMsg exception", e);
 48:             }
 49:         } catch (Exception e) {
 50:             log.error("send expired msg exception", e);
 51:         }
 52:     }
 53: }複製代碼
  • 說明 :移除過時消息。
  • 第 2 至 5 行 :順序消費時,直接返回。
  • 第 7 至 9 行 :循環移除消息。默認最大循環次數:16次。
  • 第 10 至 25 行 :獲取第一條消息。判斷是否超時,若不超時,則結束循環。
  • 第 29 行 :發回超時消息到Broker
  • 第 32 至 48 行 :判斷此時消息是否依然是第一條,如果,則進行移除。

七、PushConsumer 發回消費失敗消息

DefaultMQPushConsumerImpl#sendMessageBack(...)

1: public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) 2: throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
  3:     try {
  4:         // Consumer發回消息
  5:         String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
  6:             : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
  7:         this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
  8:             this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
  9:     } catch (Exception e) { // TODO 疑問:什麼狀況下會發生異常
 10:         // 異常時,使用Client內置Producer發回消息
 11:         log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
 12: 
 13:         Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
 14: 
 15:         String originMsgId = MessageAccessor.getOriginMessageId(msg);
 16:         MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
 17: 
 18:         newMsg.setFlag(msg.getFlag());
 19:         MessageAccessor.setProperties(newMsg, msg.getProperties());
 20:         MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
 21:         MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
 22:         MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
 23:         newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 24: 
 25:         this.mQClientFactory.getDefaultMQProducer().send(newMsg);
 26:     }
 27: }複製代碼
  • 說明 :發回消息。
  • 第 4 至 8 行 :Consumer 發回消息。詳細解析見:MQClientAPIImpl#consumerSendMessageBack(...)
  • 第 10 至 25 行 :發生異常時,Consumer 內置默認 Producer 發送消息。
    • 😈疑問:什麼樣的狀況下會發生異常呢?

MQClientAPIImpl#consumerSendMessageBack(...)

1: /** 2: * Consumer發回消息 3: * @param addr Broker地址 4: * @param msg 消息 5: * @param consumerGroup 消費分組 6: * @param delayLevel 延遲級別 7: * @param timeoutMillis 超時 8: * @param maxConsumeRetryTimes 消費最大重試次數 9: * @throws RemotingException 當遠程調用發生異常時 10: * @throws MQBrokerException 當Broker發生異常時 11: * @throws InterruptedException 當線程中斷時 12: */
 13: public void consumerSendMessageBack( 14: final String addr, 15: final MessageExt msg, 16: final String consumerGroup, 17: final int delayLevel, 18: final long timeoutMillis, 19: final int maxConsumeRetryTimes 20: ) throws RemotingException, MQBrokerException, InterruptedException {
 21:     ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
 22:     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
 23: 
 24:     requestHeader.setGroup(consumerGroup);
 25:     requestHeader.setOriginTopic(msg.getTopic());
 26:     requestHeader.setOffset(msg.getCommitLogOffset());
 27:     requestHeader.setDelayLevel(delayLevel);
 28:     requestHeader.setOriginMsgId(msg.getMsgId());
 29:     requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
 30: 
 31:     RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
 32:         request, timeoutMillis);
 33:     assert response != null;
 34:     switch (response.getCode()) {
 35:         case ResponseCode.SUCCESS: {
 36:             return;
 37:         }
 38:         default:
 39:             break;
 40:     }
 41: 
 42:     throw new MQBrokerException(response.getCode(), response.getRemark());
 43: }複製代碼

八、Consumer 消費進度

OffsetStore

OffsetStore類圖.png
OffsetStore類圖.png

  • RemoteBrokerOffsetStoreConsumer 集羣模式 下,使用遠程 Broker 消費進度。
  • LocalFileOffsetStoreConsumer 廣播模式下,使用本地 文件 消費進度。

OffsetStore#load(...)

LocalFileOffsetStore#load(...)

1: @Override
  2: public void load() throws MQClientException {
  3:     // 從本地硬盤讀取消費進度
  4:     OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
  5:     if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
  6:         offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
  7: 
  8:         // 打印每一個消息隊列的消費進度
  9:         for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
 10:             AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
 11:             log.info("load consumer's offset, {} {} {}",
 12:                 this.groupName,
 13:                 mq,
 14:                 offset.get());
 15:         }
 16:     }
 17: }複製代碼
  • 說明 :從本地文件加載消費進度到內存。
OffsetSerializeWrapper
1: public class OffsetSerializeWrapper extends RemotingSerializable {
  2:     private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
  3:             new ConcurrentHashMap<>();
  4: 
  5:     public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
  6:         return offsetTable;
  7:     }
  8: 
  9:     public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
 10:         this.offsetTable = offsetTable;
 11:     }
 12: }複製代碼
  • 說明 :本地 Offset 存儲序列化。
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
    "offsetTable":{{
            "brokerName":"broker-a",
            "queueId":3,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":2,
            "topic":"TopicTest"
        }:1471,{
            "brokerName":"broker-a",
            "queueId":1,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":0,
            "topic":"TopicTest"
        }:1470
    }
}複製代碼

RemoteBrokerOffsetStore#load(...)

1: @Override
  2: public void load() {
  3: }複製代碼
  • 說明 :不進行加載,實際讀取消費進度時,從 Broker 獲取。

OffsetStore#readOffset(...)

讀取消費進度類型:

  • READ_FROM_MEMORY :從內存讀取。
  • READ_FROM_STORE :從存儲( Broker文件 )讀取。
  • MEMORY_FIRST_THEN_STORE :優先從內存讀取,讀取不到,從存儲讀取。

LocalFileOffsetStore#readOffset(...)

1: @Override
  2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
  3:     if (mq != null) {
  4:         switch (type) {
  5:             case MEMORY_FIRST_THEN_STORE:
  6:             case READ_FROM_MEMORY: {
  7:                 AtomicLong offset = this.offsetTable.get(mq);
  8:                 if (offset != null) {
  9:                     return offset.get();
 10:                 } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
 11:                     return -1;
 12:                 }
 13:             }
 14:             case READ_FROM_STORE: {
 15:                 OffsetSerializeWrapper offsetSerializeWrapper;
 16:                 try {
 17:                     offsetSerializeWrapper = this.readLocalOffset();
 18:                 } catch (MQClientException e) {
 19:                     return -1;
 20:                 }
 21:                 if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
 22:                     AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
 23:                     if (offset != null) {
 24:                         this.updateOffset(mq, offset.get(), false);
 25:                         return offset.get();
 26:                     }
 27:                 }
 28:             }
 29:             default:
 30:                 break;
 31:         }
 32:     }
 33: 
 34:     return -1;
 35: }複製代碼
  • 第 16 行 :從 文件 讀取消費進度。

RemoteBrokerOffsetStore#readOffset(...)

1: @Override
  2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
  3:     if (mq != null) {
  4:         switch (type) {
  5:             case MEMORY_FIRST_THEN_STORE:
  6:             case READ_FROM_MEMORY: {
  7:                 AtomicLong offset = this.offsetTable.get(mq);
  8:                 if (offset != null) {
  9:                     return offset.get();
 10:                 } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
 11:                     return -1;
 12:                 }
 13:             }
 14:             case READ_FROM_STORE: {
 15:                 try {
 16:                     long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
 17:                     AtomicLong offset = new AtomicLong(brokerOffset);
 18:                     this.updateOffset(mq, offset.get(), false);
 19:                     return brokerOffset;
 20:                 }
 21:                 // No offset in broker
 22:                 catch (MQBrokerException e) {
 23:                     return -1;
 24:                 }
 25:                 //Other exceptions
 26:                 catch (Exception e) {
 27:                     log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
 28:                     return -2;
 29:                 }
 30:             }
 31:             default:
 32:                 break;
 33:         }
 34:     }
 35: 
 36:     return -1;
 37: }複製代碼
  • 第 16 行 :從 Broker 讀取消費進度。

OffsetStore#updateOffset(...)

該方法 RemoteBrokerOffsetStoreLocalFileOffsetStore 實現相同。

1: @Override
  2: public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
  3:     if (mq != null) {
  4:         AtomicLong offsetOld = this.offsetTable.get(mq);
  5:         if (null == offsetOld) {
  6:             offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
  7:         }
  8: 
  9:         if (null != offsetOld) {
 10:             if (increaseOnly) {
 11:                 MixAll.compareAndIncreaseOnly(offsetOld, offset);
 12:             } else {
 13:                 offsetOld.set(offset);
 14:             }
 15:         }
 16:     }
 17: }複製代碼

OffsetStore#persistAll(...)

LocalFileOffsetStore#persistAll(...)

1: @Override
  2: public void persistAll(Set<MessageQueue> mqs) {
  3:     if (null == mqs || mqs.isEmpty())
  4:         return;
  5: 
  6:     OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
  7:     for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
  8:         if (mqs.contains(entry.getKey())) {
  9:             AtomicLong offset = entry.getValue();
 10:             offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
 11:         }
 12:     }
 13: 
 14:     String jsonString = offsetSerializeWrapper.toJson(true);
 15:     if (jsonString != null) {
 16:         try {
 17:             MixAll.string2File(jsonString, this.storePath);
 18:         } catch (IOException e) {
 19:             log.error("persistAll consumer offset Exception, " + this.storePath, e);
 20:         }
 21:     }
 22: }複製代碼
  • 說明 :持久化消費進度。將消費進度寫入文件

RemoteBrokerOffsetStore#persistAll(...)

1: @Override
  2: public void persistAll(Set<MessageQueue> mqs) {
  3:     if (null == mqs || mqs.isEmpty())
  4:         return;
  5: 
  6:     // 持久化消息隊列
  7:     final HashSet<MessageQueue> unusedMQ = new HashSet<>();
  8:     if (!mqs.isEmpty()) {
  9:         for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
 10:             MessageQueue mq = entry.getKey();
 11:             AtomicLong offset = entry.getValue();
 12:             if (offset != null) {
 13:                 if (mqs.contains(mq)) {
 14:                     try {
 15:                         this.updateConsumeOffsetToBroker(mq, offset.get());
 16:                         log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
 17:                             this.groupName,
 18:                             this.mQClientFactory.getClientId(),
 19:                             mq,
 20:                             offset.get());
 21:                     } catch (Exception e) {
 22:                         log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
 23:                     }
 24:                 } else {
 25:                     unusedMQ.add(mq);
 26:                 }
 27:             }
 28:         }
 29:     }
 30: 
 31:     // 移除不適用的消息隊列
 32:     if (!unusedMQ.isEmpty()) {
 33:         for (MessageQueue mq : unusedMQ) {
 34:             this.offsetTable.remove(mq);
 35:             log.info("remove unused mq, {}, {}", mq, this.groupName);
 36:         }
 37:     }
 38: }複製代碼
  • 說明 :持久化指定消息隊列數組的消費進度到 Broker,並移除非指定消息隊列。

MQClientInstance#persistAllConsumerOffset(...)

1: private void startScheduledTask() {
  2:     // 定時同步消費進度
  3:     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  4: 
  5:         @Override
  6:         public void run() {
  7:             try {
  8:                 MQClientInstance.this.cleanOfflineBroker();
  9:                 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
 10:             } catch (Exception e) {
 11:                 log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
 12:             }
 13:         }
 14:     }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
 15: }複製代碼
  • 說明 :定時進行持久化,默認週期:5000ms。
  • 重要說明 :
    • 消費進度持久化不只僅只有定時持久化,拉取消息、分配消息隊列等等操做,都會進行消費進度持久化。
    • 消費進度持久化不只僅只有定時持久化,拉取消息、分配消息隊列等等操做,都會進行消費進度持久化。
    • 消費進度持久化不只僅只有定時持久化,拉取消息、分配消息隊列等等操做,都會進行消費進度持久化。

九、結尾

😈多是本系列最長的一篇文章,若有表達錯誤和不清晰,請多多見諒。感謝對本系列的閱讀、收藏、點贊、分享,特別是翻到結尾。😜真的有丟丟長。

相關文章
相關標籤/搜索