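This article continues from "RocketMQ Source Code Analysis: Message Pull and Consumption (Part 1)".
It mainly walks through the source code involved in the Consumer's consumption logic.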
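MQ provides two kinds of consumers: PushConsumer and PullConsumer.
Although the name PushConsumer starts with Push, it is actually implemented with Pull: it keeps polling the Broker for messages. When there is no new message, the Broker suspends the request until a new message is produced, then resumes the request and returns the new message. This gets close to the real-time behavior of the Broker actively pushing (with some loss of timeliness, of course). The principle is similar to long polling (Long-Polling).
This article mainly covers PushConsumer, partly covers PullConsumer, and skips orderly consumption.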
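The hold-until-data behavior described above can be illustrated with a minimal sketch. This is not RocketMQ's broker code: the class `LongPollingSketch` and its methods `publish` and `pull` are hypothetical names, and a single in-memory queue stands in for the Broker's message store.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broker that holds pull requests (long polling).
class LongPollingSketch {
    private final LinkedBlockingQueue<String> store = new LinkedBlockingQueue<>();

    // A producer hands a message to the "broker".
    void publish(String message) {
        store.offer(message);
    }

    // Holds the request for up to maxHoldMillis. Returns as soon as a message
    // is available, or null if the hold time elapses without one.
    String pull(long maxHoldMillis) {
        try {
            return store.poll(maxHoldMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LongPollingSketch broker = new LongPollingSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                return;
            }
            broker.publish("hello");
        });
        producer.start();
        // The first pull is held until the producer publishes, well before the 5s limit.
        System.out.println(broker.pull(5_000));
        // The second pull finds nothing and gives up after the hold time.
        System.out.println(broker.pull(100));
    }
}
```

The consumer still initiates every request, but because the request is parked on the broker side, a new message is delivered almost as soon as it is produced.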
First, an overview of the components that PushConsumer contains and how they interact:
- RebalanceService: the message queue balancing service. It assigns the message queues (MessageQueue) that the current Consumer may consume. Whenever a Consumer joins or is removed, the message queues are reassigned.
- PullMessageService: the message pulling service. It keeps pulling messages from the Broker and submits consumption tasks to ConsumeMessageService.
- ConsumeMessageService: the message consumption service. It keeps consuming messages and handles the consumption results.
- RemoteBrokerOffsetStore: manages the Consumer's consumption progress. It fetches the progress from the Broker and synchronizes the progress back to the Broker.
- ProcessQueue: the message processing queue.
- MQClientInstance: wraps the API calls to Namesrv and Broker, for use by Producer and Consumer.

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            // Build the subscription data
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            // Synchronize the Consumer information to the Broker via heartbeat
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
Note: subscribes to a Topic. It first builds the subscription data, then synchronizes the Consumer information to the Broker via heartbeat.

    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
        String subString) throws Exception {
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);
        // Process the subscription expression
        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            subscriptionData.getTagsSet().add(trimString);
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                throw new Exception("subString split error");
            }
        }

        return subscriptionData;
    }
Note: builds the subscription data from the Topic and the subscription expression.
subscriptionData.subVersion = System.currentTimeMillis().
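To make the tag handling concrete, here is a small standalone sketch of the same splitting logic. The class `SubscriptionParseSketch` and its methods are hypothetical, and the literal "*" is assumed to be the subscribe-to-all marker (the role SubscriptionData.SUB_ALL plays in the listing); an empty set stands for "all tags" in this sketch only.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper mirroring how a "TagA || TagB" expression is split.
class SubscriptionParseSketch {

    // Returns the trimmed, non-empty tags of the expression.
    // An empty result means "subscribe to everything" in this sketch.
    static Set<String> parseTags(String subString) {
        Set<String> tags = new LinkedHashSet<>();
        if (subString == null || subString.isEmpty() || subString.equals("*")) {
            return tags;
        }
        for (String tag : subString.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    // Hash codes of the tags, analogous to the codeSet filled in the listing.
    static Set<Integer> tagCodes(Set<String> tags) {
        Set<Integer> codes = new LinkedHashSet<>();
        for (String tag : tags) {
            codes.add(tag.hashCode());
        }
        return codes;
    }

    public static void main(String[] args) {
        Set<String> tags = parseTags("TagA || TagB||  ");
        System.out.println(tags);
        System.out.println(tagCodes(tags));
    }
}
```

Blank segments such as the trailing one in "TagA || TagB||  " are dropped after trimming, which matches the nested length checks in the listing.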
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

Note: registers the message listener.
    1: public class RebalanceService extends ServiceThread {
    2:
    3:     /**
    4:      * Wait interval, in milliseconds
    5:      */
    6:     private static long waitInterval =
    7:         Long.parseLong(System.getProperty(
    8:             "rocketmq.client.rebalance.waitInterval", "20000"));
    9:
    10:     private final Logger log = ClientLogger.getLog();
    11:     /**
    12:      * MQClient instance
    13:      */
    14:     private final MQClientInstance mqClientFactory;
    15:
    16:     public RebalanceService(MQClientInstance mqClientFactory) {
    17:         this.mqClientFactory = mqClientFactory;
    18:     }
    19:
    20:     @Override
    21:     public void run() {
    22:         log.info(this.getServiceName() + " service started");
    23:
    24:         while (!this.isStopped()) {
    25:             this.waitForRunning(waitInterval);
    26:             this.mqClientFactory.doRebalance();
    27:         }
    28:
    29:         log.info(this.getServiceName() + " service end");
    30:     }
    31:
    32:     @Override
    33:     public String getServiceName() {
    34:         return RebalanceService.class.getSimpleName();
    35:     }
    36: }

Note: the message queue balancing service. It assigns the message queues (MessageQueue) that the current Consumer may consume.
Line 26: calls MQClientInstance#doRebalance(...) to allocate message queues. It is currently triggered in three situations:
- The wait on line 25 times out, so it is called once every 20s.
- When PushConsumer starts, it calls rebalanceService#wakeup(...) to trigger it.
- When the Broker notifies that a Consumer has joined or been removed, the Consumer responds to the notification and calls rebalanceService#wakeup(...) to trigger it.
For details, see MQClientInstance#doRebalance(…).

    public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
Note: iterates over the consumerTable (the set of Consumers) held by the current Client and performs message queue allocation for each entry.
Question: from debugging the code so far, consumerTable only contains the Consumer itself. 😈 If anyone can answer this, please do.
It calls MQConsumerInner#doRebalance(...) to allocate queues. DefaultMQPushConsumerImpl and DefaultMQPullConsumerImpl each implement this interface method. For DefaultMQPushConsumerImpl#doRebalance(...), see DefaultMQPushConsumerImpl#doRebalance(…).

    public void doRebalance() {
        if (!this.pause) {
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }
Note: performs message queue allocation.
It calls RebalanceImpl#doRebalance(...) to allocate queues. For details, see RebalancePushImpl#doRebalance(…).

    /**
     * Allocate message queues
     *
     * @param isOrder whether messages are consumed in order
     */
    public void doRebalance(final boolean isOrder) {
        // Allocate the message queues of each topic
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
        // Remove the message queues of topics that are not subscribed
        this.truncateMessageQueueNotMyTopic();
    }

    /**
     * Remove message queues that are not subscribed
     */
    private void truncateMessageQueueNotMyTopic() {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        for (MessageQueue mq : this.processQueueTable.keySet()) {
            if (!subTable.containsKey(mq.getTopic())) {

                ProcessQueue pq = this.processQueueTable.remove(mq);
                if (pq != null) {
                    pq.setDropped(true);
                    log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
                }
            }
        }
    }
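#doRebalance(...)
Note: performs message queue allocation.
- Iterates over the subscribed topic set (subscriptionInner) and allocates the message queues of each Topic.
- Removes the message queues of Topics that are not subscribed.
#truncateMessageQueueNotMyTopic(...)
Note: removes message queues that are not subscribed. When DefaultMQPushConsumer#unsubscribe(topic) is called, only the entry in the subscribed topic set (subscriptionInner) is removed; the corresponding message queues are removed in this method.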
1: private void rebalanceByTopic(final String topic, final boolean isOrder) { 2: switch (messageModel) { 3: case BROADCASTING: { 4: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 5: if (mqSet != null) { 6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); 7: if (changed) { 8: this.messageQueueChanged(topic, mqSet, mqSet); 9: log.info("messageQueueChanged {} {} {} {}", // 10: consumerGroup, // 11: topic, // 12: mqSet, // 13: mqSet); 14: } 15: } else { 16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 17: } 18: break; 19: } 20: case CLUSTERING: { 21: // 獲取 topic 對應的 隊列 和 consumer信息 22: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 23: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 24: if (null == mqSet) { 25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 27: } 28: } 29: 30: if (null == cidAll) { 31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 32: } 33: 34: if (mqSet != null && cidAll != null) { 35: // 排序 消息隊列 和 消費者數組。由於是在Client進行分配隊列,排序後,各Client的順序才能保持一致。 36: List<MessageQueue> mqAll = new ArrayList<>(); 37: mqAll.addAll(mqSet); 38: 39: Collections.sort(mqAll); 40: Collections.sort(cidAll); 41: 42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 43: 44: // 根據 隊列分配策略 分配消息隊列 45: List<MessageQueue> allocateResult; 46: try { 47: allocateResult = strategy.allocate(// 48: this.consumerGroup, // 49: this.mQClientFactory.getClientId(), // 50: mqAll, // 51: cidAll); 52: } catch (Throwable e) { 53: log.error("AllocateMessageQueueStrategy.allocate Exception. 
allocateMessageQueueStrategyName={}", strategy.getName(), 54: e); 55: return; 56: } 57: 58: Set<MessageQueue> allocateResultSet = new HashSet<>(); 59: if (allocateResult != null) { 60: allocateResultSet.addAll(allocateResult); 61: } 62: 63: // 更新消息隊列 64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 65: if (changed) { 66: log.info( 67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 69: allocateResultSet.size(), allocateResultSet); 70: this.messageQueueChanged(topic, mqSet, allocateResultSet); 71: } 72: } 73: break; 74: } 75: default: 76: break; 77: } 78: } 79: 80: /** 81: * 當負載均衡時,更新 消息處理隊列 82: * - 移除 在processQueueTable && 不存在於 mqSet 裏的消息隊列 83: * - 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列 84: * 85: * @param topic Topic 86: * @param mqSet 負載均衡結果後的消息隊列數組 87: * @param isOrder 是否順序 88: * @return 是否變動 89: */ 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { 91: boolean changed = false; 92: 93: // 移除 在processQueueTable && 不存在於 mqSet 裏的消息隊列 94: Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); 95: while (it.hasNext()) { // TODO 待讀: 96: Entry<MessageQueue, ProcessQueue> next = it.next(); 97: MessageQueue mq = next.getKey(); 98: ProcessQueue pq = next.getValue(); 99: 100: if (mq.getTopic().equals(topic)) { 101: if (!mqSet.contains(mq)) { // 不包含的隊列 102: pq.setDropped(true); 103: if (this.removeUnnecessaryMessageQueue(mq, pq)) { 104: it.remove(); 105: changed = true; 106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); 107: } 108: } else if (pq.isPullExpired()) { // 隊列拉取超時,進行清理 109: switch (this.consumeType()) { 110: case CONSUME_ACTIVELY: 
111: break; 112: case CONSUME_PASSIVELY: 113: pq.setDropped(true); 114: if (this.removeUnnecessaryMessageQueue(mq, pq)) { 115: it.remove(); 116: changed = true; 117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", 118: consumerGroup, mq); 119: } 120: break; 121: default: 122: break; 123: } 124: } 125: } 126: } 127: 128: // 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列。 129: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組 130: for (MessageQueue mq : mqSet) { 131: if (!this.processQueueTable.containsKey(mq)) { 132: if (isOrder && !this.lock(mq)) { 133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); 134: continue; 135: } 136: 137: this.removeDirtyOffset(mq); 138: ProcessQueue pq = new ProcessQueue(); 139: long nextOffset = this.computePullFromWhere(mq); 140: if (nextOffset >= 0) { 141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); 142: if (pre != null) { 143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); 144: } else { 145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); 146: PullRequest pullRequest = new PullRequest(); 147: pullRequest.setConsumerGroup(consumerGroup); 148: pullRequest.setNextOffset(nextOffset); 149: pullRequest.setMessageQueue(mq); 150: pullRequest.setProcessQueue(pq); 151: pullRequestList.add(pullRequest); 152: changed = true; 153: } 154: } else { 155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); 156: } 157: } 158: } 159: 160: // 發起消息拉取請求 161: this.dispatchPullRequest(pullRequestList); 162: 163: return changed; 164: }
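#rebalanceByTopic(...)
Note: allocates the message queues of a Topic.
- In broadcasting mode (BROADCASTING), all message queues of the Topic are assigned.
- In clustering mode (CLUSTERING), a subset of the Topic's message queues is assigned:
  - Get the message queues and the consumers for the Topic, and sort both. Each Consumer allocates message queues locally, so only after sorting is the order guaranteed to be the same on every Consumer.
  - Allocate message queues according to the queue allocation strategy (AllocateMessageQueueStrategy). For details, see AllocateMessageQueueStrategy.
  - Update the message queues for the Topic.
#updateProcessQueueTableInRebalance(...)
Note: when queues are allocated, updates the message queues for the Topic and returns whether anything changed.
- Remove the message processing queues (processQueueTable) that are not in the allocated message queues (mqSet).
  - Queue pull timeout: when current time - last pull time > 120s (120s is configurable), this is judged to be a BUG because no pull has happened for too long, and the message queue is removed. After removal, the add-queue logic below can add that message queue back again.
- Add the message queues that are new in the allocated message queues (mqSet).
  - The orderly-consumption part is skipped; for details, see "RocketMQ Source Code Analysis: Message Orderly Sending and Consumption".

    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        // Synchronize the queue's consumption progress, then remove it.
        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        // TODO orderly consumption
        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            try {
                if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                    try {
                        return this.unlockDelay(mq, pq);
                    } finally {
                        pq.getLockConsume().unlock();
                    }
                } else {
                    log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                        mq,
                        pq.getTryUnlockTimes());

                    pq.incTryUnlockTimes();
                }
            } catch (Exception e) {
                log.error("removeUnnecessaryMessageQueue Exception", e);
            }

            return false;
        }
        return true;
    }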
The orderly-consumption part is skipped; for details, see "RocketMQ Source Code Analysis: Message Orderly Sending and Consumption".
[PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(…)

    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
        this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
        return true;
    }

Note: removes the information related to a message queue that is no longer needed and reports the removal as successful. It is basically the same as RebalancePushImpl#removeUnnecessaryMessageQueue(...).
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }

Note: issues the message pull requests. This call is the starting point of PushConsumer's continuous message pulling.

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
    }

Note: submits the pull request. After submission, PullMessageService executes it asynchronously, without blocking. For details, see PullMessageService.
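The submit-then-process-asynchronously pattern can be sketched with plain JDK classes. This is an illustration only: `PullLoopSketch`, `submitNow`, `submitLater` and `next` are hypothetical names, and a String stands in for a PullRequest.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a request queue drained by a worker, with delayed resubmission.
class PullLoopSketch {
    private final LinkedBlockingQueue<String> requests = new LinkedBlockingQueue<>();
    // Daemon thread so a forgotten shutdown does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "pull-scheduler");
            t.setDaemon(true);
            return t;
        });

    // Enqueue immediately; the caller never waits for the pull itself.
    void submitNow(String request) {
        requests.offer(request);
    }

    // Enqueue after a delay, for example when backing off under flow control.
    void submitLater(String request, long delayMillis) {
        scheduler.schedule(() -> submitNow(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    // What the worker loop would call: wait up to timeoutMillis for the next request.
    String next(long timeoutMillis) {
        try {
            return requests.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        PullLoopSketch service = new PullLoopSketch();
        service.submitNow("queue-0");
        service.submitLater("queue-1", 50);
        System.out.println(service.next(1_000));
        System.out.println(service.next(1_000));
        service.shutdown();
    }
}
```

Both the immediate and the delayed path end in the same queue, so the worker only needs one loop, and the rebalance thread that submits requests is never blocked by network I/O.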
    public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
        private final Logger log = ClientLogger.getLog();

        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
            List<String> cidAll) {
            // Validate the arguments
            if (currentCID == null || currentCID.length() < 1) {
                throw new IllegalArgumentException("currentCID is empty");
            }
            if (mqAll == null || mqAll.isEmpty()) {
                throw new IllegalArgumentException("mqAll is null or mqAll empty");
            }
            if (cidAll == null || cidAll.isEmpty()) {
                throw new IllegalArgumentException("cidAll is null or cidAll empty");
            }

            List<MessageQueue> result = new ArrayList<>();
            if (!cidAll.contains(currentCID)) {
                log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                    consumerGroup,
                    currentCID,
                    cidAll);
                return result;
            }
            // Average allocation
            int index = cidAll.indexOf(currentCID); // Position of this consumer.
            int mod = mqAll.size() % cidAll.size(); // Remainder, i.e. how many queues cannot be divided evenly.
            int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                    + 1 : mqAll.size() / cidAll.size());
            // With a remainder, consumers in [0, mod) share it, one extra queue each; from index mod onward, skip the first mod remainder queues.
            int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
            // Number of queues allocated. Math.min() is needed because when mqAll.size() <= cidAll.size(), some consumers get no queue.
            int range = Math.min(averageSize, mqAll.size() - startIndex);
            for (int i = 0; i < range; i++) {
                result.add(mqAll.get((startIndex + i) % mqAll.size()));
            }
            return result;
        }

        @Override
        public String getName() {
            return "AVG";
        }
    }
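Note: the average allocation strategy.
- index: the position of the current Consumer within the consumer cluster. This is why the cidAll argument must be sorted before it is passed in: without sorting, the index each Consumer computes locally would not be consistent, which would affect the result.
- mod: the remainder, i.e. how many message queues cannot be divided evenly.
- averageSize: the code can be simplified to (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()).
  - [ 0, mod ): mqAll.size() / cidAll.size() + 1. The first mod Consumers share the remainder and each gets 1 extra message queue.
  - [ mod, cidAll.size() ): mqAll.size() / cidAll.size().
- startIndex: the position at which the Consumer's allocated message queues start.
- range: the number of queues allocated. Math#min(...) is needed because when mqAll.size() <= cidAll.size(), the last few Consumers may get no message queue.
An example, with the number of message queues fixed at 4: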
| | 2 Consumers (divides evenly) | 3 Consumers (does not divide evenly) | 5 Consumers (not every Consumer gets a queue) |
|---|---|---|---|
| Message queue [0] | Consumer[0] | Consumer[0] | Consumer[0] |
| Message queue [1] | Consumer[0] | Consumer[0] | Consumer[1] |
| Message queue [2] | Consumer[1] | Consumer[1] | Consumer[2] |
| Message queue [3] | Consumer[1] | Consumer[2] | Consumer[3] |
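The table can be reproduced with a standalone sketch that applies the same arithmetic to plain queue indexes. `AverageAllocationDemo` and its `allocate(queueCount, consumerCount, index)` signature are hypothetical simplifications: the real strategy works on sorted MessageQueue and client-id lists, while here the lists are assumed to be already sorted and are replaced by their sizes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical index-only version of the average allocation arithmetic.
class AverageAllocationDemo {

    // Returns the queue indexes assigned to the consumer at position `index`.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        int mod = queueCount % consumerCount;
        int averageSize = queueCount <= consumerCount
            ? 1
            : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // A negative range simply yields no queues for this consumer.
        int range = Math.min(averageSize, queueCount - startIndex);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        int queueCount = 4;
        for (int consumerCount : new int[] {2, 3, 5}) {
            System.out.println(consumerCount + " consumers:");
            for (int index = 0; index < consumerCount; index++) {
                System.out.println("  Consumer[" + index + "] -> " + allocate(queueCount, consumerCount, index));
            }
        }
    }
}
```

With 5 consumers, the consumer at index 4 computes a start index past the end of the queue list, so its range is negative and it receives nothing, which is the "not every Consumer gets a queue" column.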
    public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
        /**
         * Set of brokerName prefixes that this consumer consumes from
         */
        private Set<String> consumeridcs;

        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
            List<String> cidAll) {
            // Validate the arguments
            List<MessageQueue> result = new ArrayList<MessageQueue>();
            int currentIndex = cidAll.indexOf(currentCID);
            if (currentIndex < 0) {
                return result;
            }
            // Collect the message queues that match the configured consumer set ('consumeridcs')
            List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
            for (MessageQueue mq : mqAll) {
                String[] temp = mq.getBrokerName().split("@");
                if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                    premqAll.add(mq);
                }
            }
            // Average allocation
            int mod = premqAll.size() / cidAll.size();
            int rem = premqAll.size() % cidAll.size();
            int startIndex = mod * currentIndex;
            int endIndex = startIndex + mod;
            for (int i = startIndex; i < endIndex; i++) {
                result.add(mqAll.get(i));
            }
            if (rem > currentIndex) {
                result.add(premqAll.get(currentIndex + mod * cidAll.size()));
            }
            return result;
        }

        @Override
        public String getName() {
            return "MACHINE_ROOM";
        }

        public Set<String> getConsumeridcs() {
            return consumeridcs;
        }

        public void setConsumeridcs(Set<String> consumeridcs) {
            this.consumeridcs = consumeridcs;
        }
    }
Note: evenly allocates the message queues of the Brokers that may be consumed.
- Compute the message queues that belong to the consumable Brokers.
- Allocate those message queues evenly. This differs slightly from AllocateMessageQueueAveragely: the leftover queues at the tail are assigned to the first rem Consumers.
- Question: when using this strategy, how should the assignment between Consumer and Broker be configured? 😈 I will think this through when studying the master/slave source code.

    public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
        private final Logger log = ClientLogger.getLog();

        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
            List<String> cidAll) {
            // Validate the arguments
            if (currentCID == null || currentCID.length() < 1) {
                throw new IllegalArgumentException("currentCID is empty");
            }
            if (mqAll == null || mqAll.isEmpty()) {
                throw new IllegalArgumentException("mqAll is null or mqAll empty");
            }
            if (cidAll == null || cidAll.isEmpty()) {
                throw new IllegalArgumentException("cidAll is null or cidAll empty");
            }

            List<MessageQueue> result = new ArrayList<MessageQueue>();
            if (!cidAll.contains(currentCID)) {
                log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                    consumerGroup,
                    currentCID,
                    cidAll);
                return result;
            }

            // Circular allocation
            int index = cidAll.indexOf(currentCID);
            for (int i = index; i < mqAll.size(); i++) {
                if (i % cidAll.size() == index) {
                    result.add(mqAll.get(i));
                }
            }
            return result;
        }

        @Override
        public String getName() {
            return "AVG_BY_CIRCLE";
        }
    }
Note: allocates message queues in a circular (round-robin) fashion.
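A short index-only sketch shows how circular allocation interleaves queues across consumers. `CircleAllocationDemo` and its `allocate(queueCount, consumerCount, index)` signature are hypothetical; as with the real strategy, both lists are assumed to be sorted identically on every client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical index-only version of the circular allocation loop.
class CircleAllocationDemo {

    // The consumer at position `index` takes every queue i with i % consumerCount == index.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        for (int i = index; i < queueCount; i++) {
            if (i % consumerCount == index) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int queueCount = 5;
        int consumerCount = 2;
        for (int index = 0; index < consumerCount; index++) {
            System.out.println("Consumer[" + index + "] -> " + allocate(queueCount, consumerCount, index));
        }
    }
}
```

Compared with the average strategy, each consumer receives the same number of queues (give or take one), but the queues are spread out rather than contiguous.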
    public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
        private List<MessageQueue> messageQueueList;

        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
            List<String> cidAll) {
            return this.messageQueueList;
        }

        @Override
        public String getName() {
            return "CONFIG";
        }

        public List<MessageQueue> getMessageQueueList() {
            return messageQueueList;
        }

        public void setMessageQueueList(List<MessageQueue> messageQueueList) {
            this.messageQueueList = messageQueueList;
        }
    }

Note: allocates the message queues given by configuration.
Question: what is the use case for this allocation strategy?
1: public long computePullFromWhere(MessageQueue mq) { 2: long result = -1; 3: final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); 4: final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); 5: switch (consumeFromWhere) { 6: case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: // 廢棄 7: case CONSUME_FROM_MIN_OFFSET: // 廢棄 8: case CONSUME_FROM_MAX_OFFSET: // 廢棄 9: case CONSUME_FROM_LAST_OFFSET: { 10: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 11: if (lastOffset >= 0) { 12: result = lastOffset; 13: } 14: // First start,no offset 15: else if (-1 == lastOffset) { 16: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 17: result = 0L; 18: } else { 19: try { 20: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); 21: } catch (MQClientException e) { 22: result = -1; 23: } 24: } 25: } else { 26: result = -1; 27: } 28: break; 29: } 30: case CONSUME_FROM_FIRST_OFFSET: { 31: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 32: if (lastOffset >= 0) { 33: result = lastOffset; 34: } else if (-1 == lastOffset) { 35: result = 0L; 36: } else { 37: result = -1; 38: } 39: break; 40: } 41: case CONSUME_FROM_TIMESTAMP: { 42: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); 43: if (lastOffset >= 0) { 44: result = lastOffset; 45: } else if (-1 == lastOffset) { 46: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 47: try { 48: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); 49: } catch (MQClientException e) { 50: result = -1; 51: } 52: } else { 53: try { 54: long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), 55: UtilAll.YYYY_MMDD_HHMMSS).getTime(); 56: result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); 57: } catch (MQClientException e) { 58: result = -1; 
59: } 60: } 61: } else { 62: result = -1; 63: } 64: break; 65: } 66: 67: default: 68: break; 69: } 70: 71: return result; 72: }
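Note: computes the position from which consumption of a message queue starts.
PushConsumer has three options for reading the consumption progress:
- CONSUME_FROM_LAST_OFFSET: lines 6 to 29: when a new consumer cluster starts for the first time, it consumes from the last position of the queue. On later starts it continues from the previously recorded progress.
- CONSUME_FROM_FIRST_OFFSET: lines 30 to 40: when a new consumer cluster starts for the first time, it consumes from the first position of the queue. On later starts it continues from the previously recorded progress.
- CONSUME_FROM_TIMESTAMP: lines 41 to 65: when a new consumer cluster starts for the first time, it consumes from the specified point in time. On later starts it continues from the previously recorded progress.
[PullConsumer] RebalancePullImpl#computePullFromWhere(…): skipped for now.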
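The three options share one shape: use the stored progress if there is one, otherwise fall back to a strategy-specific default. The sketch below condenses that decision into a pure function. `PullStartOffsetSketch`, its `Strategy` enum and the parameters `maxOffset` and `offsetAtTimestamp` are hypothetical stand-ins for the values the listing obtains from the offset store and the admin API; error handling (the listing returns -1 when a lookup throws) is left out.

```java
// Hypothetical condensed version of the start-offset decision.
class PullStartOffsetSketch {
    enum Strategy { LAST_OFFSET, FIRST_OFFSET, TIMESTAMP }

    // storedOffset: progress read from the store, -1 if none exists yet.
    // retryTopic:   whether the queue belongs to a retry topic.
    static long startOffset(Strategy strategy, long storedOffset, boolean retryTopic,
                            long maxOffset, long offsetAtTimestamp) {
        if (storedOffset >= 0) {
            return storedOffset; // later starts resume from the recorded progress
        }
        if (storedOffset != -1) {
            return -1; // any other negative value is treated as a failure
        }
        switch (strategy) {
            case LAST_OFFSET:
                return retryTopic ? 0L : maxOffset;
            case FIRST_OFFSET:
                return 0L;
            case TIMESTAMP:
                return retryTopic ? maxOffset : offsetAtTimestamp;
            default:
                return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset(Strategy.LAST_OFFSET, -1, false, 100, 7));
        System.out.println(startOffset(Strategy.FIRST_OFFSET, -1, false, 100, 7));
        System.out.println(startOffset(Strategy.TIMESTAMP, -1, false, 100, 7));
        System.out.println(startOffset(Strategy.TIMESTAMP, 42, false, 100, 7));
    }
}
```

A negative return value matters to the caller: in the rebalance code, a queue whose computed offset is below 0 is not added to the processing table.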
1: public class PullMessageService extends ServiceThread { 2: private final Logger log = ClientLogger.getLog(); 3: /** 4: * 拉取消息請求隊列 5: */ 6: private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>(); 7: /** 8: * MQClient對象 9: */ 10: private final MQClientInstance mQClientFactory; 11: /** 12: * 定時器。用於延遲提交拉取請求 13: */ 14: private final ScheduledExecutorService scheduledExecutorService = Executors 15: .newSingleThreadScheduledExecutor(new ThreadFactory() { 16: @Override 17: public Thread newThread(Runnable r) { 18: return new Thread(r, "PullMessageServiceScheduledThread"); 19: } 20: }); 21: 22: public PullMessageService(MQClientInstance mQClientFactory) { 23: this.mQClientFactory = mQClientFactory; 24: } 25: 26: /** 27: * 執行延遲拉取消息請求 28: * 29: * @param pullRequest 拉取消息請求 30: * @param timeDelay 延遲時長 31: */ 32: public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { 33: this.scheduledExecutorService.schedule(new Runnable() { 34: 35: @Override 36: public void run() { 37: PullMessageService.this.executePullRequestImmediately(pullRequest); 38: } 39: }, timeDelay, TimeUnit.MILLISECONDS); 40: } 41: 42: /** 43: * 執行當即拉取消息請求 44: * 45: * @param pullRequest 拉取消息請求 46: */ 47: public void executePullRequestImmediately(final PullRequest pullRequest) { 48: try { 49: this.pullRequestQueue.put(pullRequest); 50: } catch (InterruptedException e) { 51: log.error("executePullRequestImmediately pullRequestQueue.put", e); 52: } 53: } 54: 55: /** 56: * 執行延遲任務 57: * 58: * @param r 任務 59: * @param timeDelay 延遲時長 60: */ 61: public void executeTaskLater(final Runnable r, final long timeDelay) { 62: this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); 63: } 64: 65: public ScheduledExecutorService getScheduledExecutorService() { 66: return scheduledExecutorService; 67: } 68: 69: /** 70: * 拉取消息 71: * 72: * @param pullRequest 拉取消息請求 73: */ 74: private void pullMessage(final PullRequest pullRequest) { 75: 
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); 76: if (consumer != null) { 77: DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; 78: impl.pullMessage(pullRequest); 79: } else { 80: log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); 81: } 82: } 83: 84: @Override 85: public void run() { 86: log.info(this.getServiceName() + " service started"); 87: 88: while (!this.isStopped()) { 89: try { 90: PullRequest pullRequest = this.pullRequestQueue.take(); 91: if (pullRequest != null) { 92: this.pullMessage(pullRequest); 93: } 94: } catch (InterruptedException e) { 95: } catch (Exception e) { 96: log.error("Pull Message Service Run Method exception", e); 97: } 98: } 99: 100: log.info(this.getServiceName() + " service end"); 101: } 102: 103: @Override 104: public String getServiceName() { 105: return PullMessageService.class.getSimpleName(); 106: } 107: 108: }
Note: the message pulling service. It keeps pulling messages from the Broker and submits consumption tasks to ConsumeMessageService.
- #executePullRequestLater(...): lines 26 to 40: submits a delayed pull request.
- #executePullRequestImmediately(...): lines 42 to 53: submits an immediate pull request.
- #executeTaskLater(...): lines 55 to 63: submits a delayed task.
- #pullMessage(...): lines 69 to 82: executes the pull logic. For details, see DefaultMQPushConsumerImpl#pullMessage(…).
- #run(...): lines 84 to 101: loops over the pull request queue (pullRequestQueue) and pulls messages.

1: public void pullMessage(final PullRequest pullRequest) { 2: final ProcessQueue processQueue = pullRequest.getProcessQueue(); 3: if (processQueue.isDropped()) { 4: log.info("the pull request[{}] is dropped.", pullRequest.toString()); 5: return; 6: } 7: 8: // Set the queue's last pull timestamp 9: pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); 10: 11: // Check that the consumer is running. If not, pull later. 12: try { 13: this.makeSureStateOK(); 14: } catch (MQClientException e) { 15: log.warn("pullMessage exception, consumer state not ok", e); 16: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); 17: return; 18: } 19: 20: // Check whether the consumer is paused. 21: if (this.isPause()) { 22: log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); 23: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); 24: return; 25: } 26: 27: // Check whether the maximum number of held messages is exceeded. The default maximum is 1000. 28: long size = processQueue.getMsgCount().get(); 29: if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) { 30: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // Submit a delayed pull request. 50ms. 31: if ((flowControlTimes1++ % 1000) == 0) { 32: log.warn( 33: "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", 34: processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1); 35: } 36: return; 37: } 38: 39: if (!this.consumeOrderly) { // Check whether the message span is too large. 40: if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { 41: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // Submit a delayed pull request. 50ms. 42: if ((flowControlTimes2++ % 1000) == 0) { 43: log.warn( 44: "the queue's messages, span too long, so do flow control, minOffset={}, 
maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", 45: processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), 46: pullRequest, flowControlTimes2); 47: } 48: return; 49: } 50: } else { // TODO 順序消費 51: if (processQueue.isLocked()) { 52: if (!pullRequest.isLockedFirst()) { 53: final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); 54: boolean brokerBusy = offset < pullRequest.getNextOffset(); 55: log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", 56: pullRequest, offset, brokerBusy); 57: if (brokerBusy) { 58: log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", 59: pullRequest, offset); 60: } 61: 62: pullRequest.setLockedFirst(true); 63: pullRequest.setNextOffset(offset); 64: } 65: } else { 66: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); 67: log.info("pull message later because not locked in broker, {}", pullRequest); 68: return; 69: } 70: } 71: 72: // 獲取Topic 對應的訂閱信息。若不存在,則延遲拉取消息 73: final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); 74: if (null == subscriptionData) { 75: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); 76: log.warn("find the consumer's subscription failed, {}", pullRequest); 77: return; 78: } 79: 80: final long beginTimestamp = System.currentTimeMillis(); 81: 82: PullCallback pullCallback = new PullCallback() { 83: @Override 84: public void onSuccess(PullResult pullResult) { 85: if (pullResult != null) { 86: pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, 87: subscriptionData); 88: 89: switch (pullResult.getPullStatus()) { 90: case FOUND: 91: // 設置下次拉取消息隊列位置 92: long 
prevRequestOffset = pullRequest.getNextOffset(); 93: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); 94: 95: // 統計 96: long pullRT = System.currentTimeMillis() - beginTimestamp; 97: DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), 98: pullRequest.getMessageQueue().getTopic(), pullRT); 99: 100: long firstMsgOffset = Long.MAX_VALUE; 101: if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { 102: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); 103: } else { 104: firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); 105: 106: // 統計 107: DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), 108: pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); 109: 110: // 提交拉取到的消息到消息處理隊列 111: boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); 112: 113: // 提交消費請求 114: DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// 115: pullResult.getMsgFoundList(), // 116: processQueue, // 117: pullRequest.getMessageQueue(), // 118: dispathToConsume); 119: 120: // 提交下次拉取消息請求 121: if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { 122: DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 123: DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); 124: } else { 125: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); 126: } 127: } 128: 129: // 下次拉取消息隊列位置小於上次拉取消息隊列位置 或者 第一條消息的消息隊列位置小於上次拉取消息隊列位置,則斷定爲BUG,輸出log 130: if (pullResult.getNextBeginOffset() < prevRequestOffset// 131: || firstMsgOffset < prevRequestOffset) { 132: log.warn( 133: "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // 134: pullResult.getNextBeginOffset(), // 135: firstMsgOffset, // 136: prevRequestOffset); 137: } 138: 139: 
break; 140: case NO_NEW_MSG: 141: // 設置下次拉取消息隊列位置 142: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); 143: 144: // 持久化消費進度 145: DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); 146: 147: // 當即提交拉取消息請求 148: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); 149: break; 150: case NO_MATCHED_MSG: 151: // 設置下次拉取消息隊列位置 152: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); 153: 154: // 持久化消費進度 155: DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); 156: 157: // 提交當即拉取消息請求 158: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); 159: break; 160: case OFFSET_ILLEGAL: 161: log.warn("the pull request offset illegal, {} {}", // 162: pullRequest.toString(), pullResult.toString()); 163: // 設置下次拉取消息隊列位置 164: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); 165: 166: // 設置消息處理隊列爲dropped 167: pullRequest.getProcessQueue().setDropped(true); 168: 169: // 提交延遲任務,進行消費處理隊列移除。不當即移除的緣由:可能有地方正在使用,避免受到影響。 170: DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { 171: 172: @Override 173: public void run() { 174: try { 175: // 更新消費進度,同步消費進度到Broker 176: DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), 177: pullRequest.getNextOffset(), false); 178: DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); 179: 180: // 移除消費處理隊列 181: DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); 182: 183: log.warn("fix the pull request offset, {}", pullRequest); 184: } catch (Throwable e) { 185: log.error("executeTaskLater Exception", e); 186: } 187: } 188: }, 10000); 189: break; 190: default: 191: break; 192: } 193: } 194: } 195: 196: @Override 197: public void onException(Throwable e) { 198: if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 199: log.warn("execute the pull request exception", e); 200: } 201: 202: // 提交延遲拉取消息請求 203: 
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); 204: } 205: }; 206: 207: // 集羣消息模型下,計算提交的消費進度。 208: boolean commitOffsetEnable = false; 209: long commitOffsetValue = 0L; 210: if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { 211: commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); 212: if (commitOffsetValue > 0) { 213: commitOffsetEnable = true; 214: } 215: } 216: 217: // 計算請求的 訂閱表達式 和 是否進行filtersrv過濾消息 218: String subExpression = null; 219: boolean classFilter = false; 220: SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); 221: if (sd != null) { 222: if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { 223: subExpression = sd.getSubString(); 224: } 225: 226: classFilter = sd.isClassFilterMode(); 227: } 228: 229: // 計算拉取消息系統標識 230: int sysFlag = PullSysFlag.buildSysFlag(// 231: commitOffsetEnable, // commitOffset 232: true, // suspend 233: subExpression != null, // subscription 234: classFilter // class filter 235: ); 236: 237: // 執行拉取。若是拉取請求發生異常時,提交延遲拉取消息請求。 238: try { 239: this.pullAPIWrapper.pullKernelImpl(// 240: pullRequest.getMessageQueue(), // 1 241: subExpression, // 2 242: subscriptionData.getSubVersion(), // 3 243: pullRequest.getNextOffset(), // 4 244: this.defaultMQPushConsumer.getPullBatchSize(), // 5 245: sysFlag, // 6 246: commitOffsetValue, // 7 247: BROKER_SUSPEND_MAX_TIME_MILLIS, // 8 248: CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9 249: CommunicationMode.ASYNC, // 10 250: pullCallback// 11 251: ); 252: } catch (Exception e) { 253: log.error("pullKernelImpl exception", e); 254: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); 255: } 256: } 257: 258: private void correctTagsOffset(final PullRequest pullRequest) { 259: if (0L == 
pullRequest.getProcessQueue().getMsgCount().get()) { 260: this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true); 261: } 262: }
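#pullMessage(...)
Description: pulls messages for one pull request. Before sending a pull, the method checks several preconditions. When one of them fails, it skips the pull and resubmits the request with a delay:
- The Consumer is not in the running state.
- The Consumer is paused.
- The Consumer consumes concurrently and the message span held by the process queue is too large (span = queue offset of the last held message minus that of the first; default limit: 2000).
- Orderly consumption has its own branch, which is skipped here; see "RocketMQ Source Code Analysis: Ordered Message Sending and Consumption".
- The Topic has no subscription data.
When building the request, the method decides whether to send the Consumer's local subscription data (SubscriptionData) instead of relying on the subscription stored on the Broker; see PullMessageProcessor#processRequest(…), lines 64 to 110. The Broker-side handling of the pull request is also in PullMessageProcessor#processRequest(…).
PullCallback: the callback that handles the pull result, by status:
- Messages found (FOUND): the pulled messages are put into the process queue and a consume request is submitted to ConsumeMessageService (see ConsumeMessageConcurrentlyService). The next pull request is then submitted immediately or with a delay, depending on the pull interval (pullInterval). The default interval is 0 ms, so the next pull is submitted immediately.
- No new messages (NO_NEW_MSG): the next pull offset is set, the consume progress is corrected through #correctTagsOffset(...), and a pull request is submitted immediately.
- New messages exist but none match (NO_MATCHED_MSG): same logic as NO_NEW_MSG.
- Illegal pull offset (OFFSET_ILLEGAL): the next pull offset is set and the process queue is marked dropped. A delayed task (10000 ms in the listing) then updates the consume progress, syncs it to the Broker, and removes the process queue. Removal is delayed because other code may still be using the queue.
- On exception, the pull request is resubmitted with a delay.
#correctTagsOffset(...): corrects the consume progress. When the process queue holds no messages, the progress is updated to the next pull offset of the request.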
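The span-based flow control described above (span = last held offset minus first held offset, default limit 2000) can be sketched in isolation. This is a minimal illustration and not the RocketMQ source: the class `PullFlowControlSketch`, the method names, and the constant `MAX_SPAN` are hypothetical, and the strict `>` comparison is an assumption.

```java
import java.util.TreeMap;

final class PullFlowControlSketch {
    // Default limit taken from the article (2000); the constant name is hypothetical.
    static final long MAX_SPAN = 2000L;

    // Span = queue offset of the last held message minus that of the first one.
    static long span(TreeMap<Long, String> held) {
        return held.isEmpty() ? 0L : held.lastKey() - held.firstKey();
    }

    // True when the pull should be postponed instead of being sent to the Broker.
    // Assumption: the limit is exceeded only when span is strictly greater.
    static boolean shouldDelayPull(TreeMap<Long, String> held) {
        return span(held) > MAX_SPAN;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> held = new TreeMap<>();
        held.put(100L, "m100");
        held.put(2101L, "m2101");
        System.out.println("span=" + span(held) + " delay=" + shouldDelayPull(held));
    }
}
```

A single slow message at the head of the queue keeps the span growing, which is why the check is on offsets rather than on the message count.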
1: /** 2: * 拉取消息核心方法 3: * 4: * @param mq 消息隊列 5: * @param subExpression 訂閱表達式 6: * @param subVersion 訂閱版本號 7: * @param offset 拉取隊列開始位置 8: * @param maxNums 拉取消息數量 9: * @param sysFlag 拉取請求系統標識 10: * @param commitOffset 提交消費進度 11: * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間 12: * @param timeoutMillis 請求broker超時時長 13: * @param communicationMode 通信模式 14: * @param pullCallback 拉取回調 15: * @return 拉取消息結果。只有通信模式爲同步時,才返回結果,不然返回null。 16: * @throws MQClientException 當尋找不到 broker 時,或發生其餘client異常 17: * @throws RemotingException 當遠程調用發生異常時 18: * @throws MQBrokerException 當 broker 發生異常時。只有通信模式爲同步時纔會發生該異常。 19: * @throws InterruptedException 當發生中斷異常時 20: */ 21: protected PullResult pullKernelImpl( 22: final MessageQueue mq, 23: final String subExpression, 24: final long subVersion, 25: final long offset, 26: final int maxNums, 27: final int sysFlag, 28: final long commitOffset, 29: final long brokerSuspendMaxTimeMillis, 30: final long timeoutMillis, 31: final CommunicationMode communicationMode, 32: final PullCallback pullCallback 33: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 34: // 獲取Broker信息 35: FindBrokerResult findBrokerResult = 36: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 37: this.recalculatePullFromWhichNode(mq), false); 38: if (null == findBrokerResult) { 39: this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); 40: findBrokerResult = 41: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 42: this.recalculatePullFromWhichNode(mq), false); 43: } 44: 45: // 請求拉取消息 46: if (findBrokerResult != null) { 47: int sysFlagInner = sysFlag; 48: 49: if (findBrokerResult.isSlave()) { 50: sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); 51: } 52: 53: PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); 54: requestHeader.setConsumerGroup(this.consumerGroup); 55: requestHeader.setTopic(mq.getTopic()); 56: 
requestHeader.setQueueId(mq.getQueueId()); 57: requestHeader.setQueueOffset(offset); 58: requestHeader.setMaxMsgNums(maxNums); 59: requestHeader.setSysFlag(sysFlagInner); 60: requestHeader.setCommitOffset(commitOffset); 61: requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); 62: requestHeader.setSubscription(subExpression); 63: requestHeader.setSubVersion(subVersion); 64: 65: String brokerAddr = findBrokerResult.getBrokerAddr(); 66: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { // TODO filtersrv 67: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); 68: } 69: 70: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( 71: brokerAddr, 72: requestHeader, 73: timeoutMillis, 74: communicationMode, 75: pullCallback); 76: 77: return pullResult; 78: } 79: 80: // Broker信息不存在,則拋出異常 81: throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); 82: }
Description: the core pull method. It takes many parameters; the Javadoc on the method explains each one.
- It looks up the Broker info (Broker address, whether the node is a slave). If the lookup fails, it refreshes the Topic route from the Namesrv and looks up again.
- If the Broker is a slave, the commit-offset flag is cleared from sysFlag before the request is sent.
- If the Broker info still does not exist, it throws an MQClientException.
/**
 * Mapping from message queue to the Broker to pull from.
 * When pulling, the Broker for a pull request is resolved through this mapping.
 */
private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
    new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
/**
 * Whether to use the default Broker.
 */
private volatile boolean connectBrokerByUser = false;
/**
 * Default Broker id.
 */
private volatile long defaultBrokerId = MixAll.MASTER_ID;

/**
 * Computes the id of the Broker to pull from for a message queue.
 *
 * @param mq message queue
 * @return Broker id
 */
public long recalculatePullFromWhichNode(final MessageQueue mq) {
    // If the default-Broker switch is on, return the default Broker id
    if (this.isConnectBrokerByUser()) {
        return this.defaultBrokerId;
    }

    // If a mapping exists for this queue, return the mapped Broker id
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }

    // Otherwise return the master Broker id
    return MixAll.MASTER_ID;
}
Description: computes the id of the Broker from which the message queue should be pulled.
1: /** 2: * Broker名字 和 Broker地址相關 Map 3: */ 4: private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = 5: new ConcurrentHashMap<>(); 6: 7: /** 8: * 得到Broker信息 9: * 10: * @param brokerName broker名字 11: * @param brokerId broker編號 12: * @param onlyThisBroker 是否必須是該broker 13: * @return Broker信息 14: */ 15: public FindBrokerResult findBrokerAddressInSubscribe(// 16: final String brokerName, // 17: final long brokerId, // 18: final boolean onlyThisBroker// 19: ) { 20: String brokerAddr = null; // broker地址 21: boolean slave = false; // 是否爲從節點 22: boolean found = false; // 是否找到 23: 24: // 得到Broker信息 25: HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); 26: if (map != null && !map.isEmpty()) { 27: brokerAddr = map.get(brokerId); 28: slave = brokerId != MixAll.MASTER_ID; 29: found = brokerAddr != null; 30: 31: // 若是不強制得到,選擇一個Broker 32: if (!found && !onlyThisBroker) { 33: Entry<Long, String> entry = map.entrySet().iterator().next(); 34: brokerAddr = entry.getValue(); 35: slave = entry.getKey() != MixAll.MASTER_ID; 36: found = true; 37: } 38: } 39: 40: // 找到broker,則返回信息 41: if (found) { 42: return new FindBrokerResult(brokerAddr, slave); 43: } 44: 45: // 找不到,則返回空 46: return null; 47: }
Description: returns the Broker info (Broker address, whether the node is a slave).
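The fallback behaviour of the lookup can be tried out with a stripped-down version. The sketch below is an illustration under assumptions, not the client's API: `FindBrokerSketch`, `Result`, and `find` are hypothetical names, `MASTER_ID` is assumed to be 0, and the addresses are made up.

```java
import java.util.HashMap;
import java.util.Map;

final class FindBrokerSketch {
    static final long MASTER_ID = 0L; // assumed value of MixAll.MASTER_ID

    static final class Result {
        final String addr;
        final boolean slave;

        Result(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    // Looks up brokerId; if it is absent and the caller does not insist on it,
    // falls back to whichever node the map iterates first.
    static Result find(Map<Long, String> nodes, long brokerId, boolean onlyThisBroker) {
        if (nodes == null || nodes.isEmpty()) {
            return null;
        }
        String addr = nodes.get(brokerId);
        if (addr != null) {
            return new Result(addr, brokerId != MASTER_ID);
        }
        if (onlyThisBroker) {
            return null;
        }
        Map.Entry<Long, String> any = nodes.entrySet().iterator().next();
        return new Result(any.getValue(), any.getKey() != MASTER_ID);
    }

    public static void main(String[] args) {
        Map<Long, String> nodes = new HashMap<>();
        nodes.put(1L, "10.0.0.2:10911"); // only a slave is known (made-up address)
        Result r = find(nodes, MASTER_ID, false);
        System.out.println(r.addr + " slave=" + r.slave);
    }
}
```

With `onlyThisBroker` set to false, a pull aimed at an unknown master still reaches a slave, which matches how `pullKernelImpl` calls the method in the listing.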
1: /** 2: * 處理拉取結果 3: * 1. 更新消息隊列拉取消息Broker編號的映射 4: * 2. 解析消息,並根據訂閱信息消息tagCode匹配合適消息 5: * 6: * @param mq 消息隊列 7: * @param pullResult 拉取結果 8: * @param subscriptionData 訂閱信息 9: * @return 拉取結果 10: */ 11: public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, 12: final SubscriptionData subscriptionData) { 13: PullResultExt pullResultExt = (PullResultExt) pullResult; 14: 15: // 更新消息隊列拉取消息Broker編號的映射 16: this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); 17: 18: // 解析消息,並根據訂閱信息消息tagCode匹配合適消息 19: if (PullStatus.FOUND == pullResult.getPullStatus()) { 20: // 解析消息 21: ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); 22: List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); 23: 24: // 根據訂閱信息消息tagCode匹配合適消息 25: List<MessageExt> msgListFilterAgain = msgList; 26: if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { 27: msgListFilterAgain = new ArrayList<>(msgList.size()); 28: for (MessageExt msg : msgList) { 29: if (msg.getTags() != null) { 30: if (subscriptionData.getTagsSet().contains(msg.getTags())) { 31: msgListFilterAgain.add(msg); 32: } 33: } 34: } 35: } 36: 37: // Hook 38: if (this.hasHook()) { 39: FilterMessageContext filterMessageContext = new FilterMessageContext(); 40: filterMessageContext.setUnitMode(unitMode); 41: filterMessageContext.setMsgList(msgListFilterAgain); 42: this.executeHook(filterMessageContext); 43: } 44: 45: // 設置消息隊列當前最小/最大位置到消息拓展字段 46: for (MessageExt msg : msgListFilterAgain) { 47: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, 48: Long.toString(pullResult.getMinOffset())); 49: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, 50: Long.toString(pullResult.getMaxOffset())); 51: } 52: 53: // 設置消息列表 54: pullResultExt.setMsgFoundList(msgListFilterAgain); 55: } 56: 57: // 清空消息二進制數組 58: pullResultExt.setMessageBinary(null); 59: 60: return pullResult; 61: }
Description: processes the pull result.
1. Updates the mapping from message queue to the Broker id to pull from.
2. Decodes the messages and keeps those that match the subscription's tags.
- Updating the mapping: on the next pull, unless a default Broker id has been configured, the updated Broker id is used.
- Decoding and matching: in the listing, the client-side check compares each message's tag string against the subscription's tagsSet (the article refers to this as tagCode matching). The check is skipped when the tag set is empty or class-filter mode is on.
- Hook: if a filter hook is registered, it runs on the filtered list.
- The queue's current min and max offsets are written into each message's properties, and the binary buffer is cleared.
/**
 * Read-write lock guarding the message map.
 */
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
/**
 * Message map.
 * key: queue offset of the message
 */
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
/**
 * Number of messages.
 */
private final AtomicLong msgCount = new AtomicLong();
/**
 * Largest queue offset among the added messages.
 */
private volatile long queueOffsetMax = 0L;
/**
 * Whether consumption is in progress.
 */
private volatile boolean consuming = false;
/**
 * Number of messages accumulated on the Broker.
 * Formula: queueMaxOffset - newlyAddedMessages[n - 1].queueOffset
 * Acc = accumulation
 * Cnt = count
 */
private volatile long msgAccCnt = 0;

/**
 * Adds messages and returns whether they should be dispatched to the consumer.
 * Returns true when the map is non-empty and no consumption is in progress.
 *
 * @param msgs messages
 * @return whether to dispatch to the consumer
 */
public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // Add the messages
            int validMsgCnt = 0;
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                }
            }
            msgCount.addAndGet(validMsgCnt);

            // Decide whether to dispatch, and mark the queue as consuming
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }

            // Number of messages accumulated on the Broker
            if (!msgs.isEmpty()) {
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                if (property != null) {
                    long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                    if (accTotal > 0) {
                        this.msgAccCnt = accTotal;
                    }
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("putMessage exception", e);
    }

    return dispatchToConsume;
}
Put bluntly, the way a PullConsumer pulls messages boils down to the following pseudocode:
while (true) {
    if (pull conditions are not met) {
        Thread.sleep(interval);
        continue;
    }
    pullMessages();
}
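A runnable version of that pseudocode might look like the sketch below. It is an illustration only: `PullLoopSketch`, `run`, and its parameters are hypothetical names, the loop is bounded by `maxRounds` so that it terminates, and the actual pull is reduced to a `Runnable`.

```java
import java.util.function.BooleanSupplier;

final class PullLoopSketch {
    // Runs at most maxRounds rounds and returns how many pulls were issued.
    static int run(BooleanSupplier canPull, Runnable pull, long sleepMillis, int maxRounds) {
        int pulls = 0;
        for (int round = 0; round < maxRounds; round++) {
            if (!canPull.getAsBoolean()) {
                try {
                    Thread.sleep(sleepMillis); // back off, then re-check the condition
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    break;
                }
                continue;
            }
            pull.run();
            pulls++;
        }
        return pulls;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The condition alternates between satisfied and not satisfied.
        int pulls = run(() -> calls[0]++ % 2 == 0, () -> System.out.println("pull"), 10L, 6);
        System.out.println("pulls issued: " + pulls);
    }
}
```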
1: /** 2: * 消費線程池隊列 3: */ 4: private final BlockingQueue<Runnable> consumeRequestQueue; 5: /** 6: * 消費線程池 7: */ 8: private final ThreadPoolExecutor consumeExecutor; 9: 10: public void submitConsumeRequest(// 11: final List<MessageExt> msgs, // 12: final ProcessQueue processQueue, // 13: final MessageQueue messageQueue, // 14: final boolean dispatchToConsume) { 15: final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); 16: if (msgs.size() <= consumeBatchSize) { // 提交消息小於批量消息數,直接提交消費請求 17: ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); 18: try { 19: this.consumeExecutor.submit(consumeRequest); 20: } catch (RejectedExecutionException e) { 21: this.submitConsumeRequestLater(consumeRequest); 22: } 23: } else { // 提交消息大於批量消息數,進行分拆成多個消費請求 24: for (int total = 0; total < msgs.size(); ) { 25: // 計算當前拆分請求包含的消息 26: List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize); 27: for (int i = 0; i < consumeBatchSize; i++, total++) { 28: if (total < msgs.size()) { 29: msgThis.add(msgs.get(total)); 30: } else { 31: break; 32: } 33: } 34: 35: // 提交拆分消費請求 36: ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); 37: try { 38: this.consumeExecutor.submit(consumeRequest); 39: } catch (RejectedExecutionException e) { 40: // 若是被拒絕,則將當前拆分消息+剩餘消息提交延遲消費請求。 41: for (; total < msgs.size(); total++) { 42: msgThis.add(msgs.get(total)); 43: } 44: this.submitConsumeRequestLater(consumeRequest); 45: } 46: } 47: } 48: }
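Description: submits a consume request for immediate execution. If the list holds more messages than consumeMessageBatchMaxSize, it is split into several requests. A request rejected by the thread pool is resubmitted with a delay.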
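The splitting step on its own can be sketched as follows. This is a simplified illustration, not the listing's exact control flow: `BatchSplitSketch` and `split` are hypothetical names, and rejection handling is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchSplitSketch {
    // Splits msgs into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to))); // copy, so batches are independent
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

Each batch would become one ConsumeRequest, so the listener never receives more than the configured batch size in a single call on this path.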
1: /** 2: * 提交延遲消費請求 3: * 4: * @param msgs 消息列表 5: * @param processQueue 消息處理隊列 6: * @param messageQueue 消息隊列 7: */ 8: private void submitConsumeRequestLater(// 9: final List<MessageExt> msgs, // 10: final ProcessQueue processQueue, // 11: final MessageQueue messageQueue// 12: ) { 13: 14: this.scheduledExecutorService.schedule(new Runnable() { 15: 16: @Override 17: public void run() { 18: ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true); 19: } 20: }, 5000, TimeUnit.MILLISECONDS); 21: } 22: 23: /** 24: * 提交延遲消費請求 25: * @param consumeRequest 消費請求 26: */ 27: private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// 28: ) { 29: 30: this.scheduledExecutorService.schedule(new Runnable() { 31: 32: @Override 33: public void run() { 34: ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); // TODO BUG ? 35: } 36: }, 5000, TimeUnit.MILLISECONDS); 37: }
Description: submits a delayed consume request (5000 ms in the listing).
The ConsumeRequest overload resubmits by calling ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest) directly. Open question: if the request holds more messages than the batch consume limit, is this a bug?
class ConsumeRequest implements Runnable {

    /**
     * Messages to consume.
     */
    private final List<MessageExt> msgs;
    /**
     * Process queue.
     */
    private final ProcessQueue processQueue;
    /**
     * Message queue.
     */
    private final MessageQueue messageQueue;

    public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        this.msgs = msgs;
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        // Do not consume from a dropped queue
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }

        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; // listener
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); // consume context
        ConsumeConcurrentlyStatus status = null; // consume result status

        // Hook
        ConsumeMessageContext consumeMessageContext = null;
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }

        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; // consume return type
        try {
            // For retry messages, reset the Topic to the original Topic
            ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);

            // Record the consume start time
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }

            // Consume
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e), //
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }

        // Resolve the consume return type
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        // Hook
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        // A null status is treated as RECONSUME_LATER
        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        // Hook
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }

        // Statistics
        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

        // Handle the consume result
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }

}
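Description: a consume request. Submitting it to the thread pool runs the consumption.
- A dropped process queue is not consumed.
- Retry messages have their Topic reset to the original Topic. For example, if the original Topic is TopicTest, the Topic during retry is %RETRY%please_rename_unique_group_name_4, and resetRetryTopic sets it back to TopicTest.
- The listener is invoked and the outcome is classified into a ConsumeReturnType. A null status is treated as RECONSUME_LATER.
- Hooks run before consumption and again after the status has been resolved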
。1: public void processConsumeResult(// 2: final ConsumeConcurrentlyStatus status, // 3: final ConsumeConcurrentlyContext context, // 4: final ConsumeRequest consumeRequest// 5: ) { 6: int ackIndex = context.getAckIndex(); 7: 8: // 消息爲空,直接返回 9: if (consumeRequest.getMsgs().isEmpty()) 10: return; 11: 12: // 計算從consumeRequest.msgs[0]到consumeRequest.msgs[ackIndex]的消息消費成功 13: switch (status) { 14: case CONSUME_SUCCESS: 15: if (ackIndex >= consumeRequest.getMsgs().size()) { 16: ackIndex = consumeRequest.getMsgs().size() - 1; 17: } 18: // 統計成功/失敗數量 19: int ok = ackIndex + 1; 20: int failed = consumeRequest.getMsgs().size() - ok; 21: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); 22: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); 23: break; 24: case RECONSUME_LATER: 25: ackIndex = -1; 26: // 統計成功/失敗數量 27: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), 28: consumeRequest.getMsgs().size()); 29: break; 30: default: 31: break; 32: } 33: 34: // 處理消費失敗的消息 35: switch (this.defaultMQPushConsumer.getMessageModel()) { 36: case BROADCASTING: // 廣播模式,不管是否消費失敗,不發回消息到Broker,只打印Log 37: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { 38: MessageExt msg = consumeRequest.getMsgs().get(i); 39: log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); 40: } 41: break; 42: case CLUSTERING: 43: // 發回消息失敗到Broker。 44: List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size()); 45: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { 46: MessageExt msg = consumeRequest.getMsgs().get(i); 47: boolean result = this.sendMessageBack(msg, context); 48: if (!result) { 49: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); 50: msgBackFailed.add(msg); 51: } 52: } 53: 54: // 發回Broker失敗的消息,直接提交延遲從新消費 55: if 
(!msgBackFailed.isEmpty()) { 56: consumeRequest.getMsgs().removeAll(msgBackFailed); 57: 58: this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); 59: } 60: break; 61: default: 62: break; 63: } 64: 65: // 移除消費成功消息,並更新最新消費進度 66: long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); 67: if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { 68: this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); 69: } 70: }
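Description: handles the consume result.
- Computes ackIndex. Messages consumeRequest.msgs[0] through consumeRequest.msgs[ackIndex] are treated as consumed successfully and are acknowledged.
  - CONSUME_SUCCESS: ackIndex = context.getAckIndex(), capped at msgs.size() - 1.
  - RECONSUME_LATER: ackIndex = -1.
- Handles the failed messages, depending on the message model:
  - BROADCASTING: failed messages are not sent back to the Broker; they are only logged.
  - CLUSTERING: failed messages are sent back to the Broker.
    - Sending back is done by DefaultMQPushConsumerImpl#sendMessageBack(…).
    - Messages that could not be sent back are resubmitted locally as a delayed consume request.
    - If the send-back actually succeeded on the Broker but the Consumer saw a failure (for example because of a network error), the message is consumed more than once. Message consumption should therefore be as idempotent as possible.
- Removes the messages that were consumed successfully, plus those that failed but were sent back to the Broker successfully, and updates the consume progress.
  - Why does the removed set include messages that failed but were sent back? See line 56 of processConsumeResult: only the messages whose send-back failed are taken out of the request's list.
/**
 * Removes messages and returns the queue offset of the first remaining message.
 *
 * @param msgs messages
 * @return queue offset
 */
public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1; // +1 because, once msgTreeMap is empty, the next message to fetch is at queueOffsetMax + 1

                // Remove the messages
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                    }
                }
                msgCount.addAndGet(removedCnt);

                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }

    return result;
}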
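The return value of removeMessage is what ends up as the consume progress, so it is worth seeing with concrete numbers. The sketch below mirrors the listing's logic on a plain TreeMap; `CommitOffsetSketch` and `removeAndGetCommitOffset` are hypothetical names, and locking and counters are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

final class CommitOffsetSketch {
    // Removes the consumed offsets from held and returns the offset to commit:
    // the smallest offset still held, or queueOffsetMax + 1 when nothing is left.
    // Returns -1 when held was already empty, as in the listing.
    static long removeAndGetCommitOffset(TreeMap<Long, String> held, List<Long> consumed, long queueOffsetMax) {
        if (held.isEmpty()) {
            return -1L;
        }
        long result = queueOffsetMax + 1;
        for (Long offset : consumed) {
            held.remove(offset);
        }
        if (!held.isEmpty()) {
            result = held.firstKey();
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> held = new TreeMap<>();
        held.put(10L, "m10");
        held.put(11L, "m11");
        held.put(12L, "m12");
        // Offsets 11 and 12 finish first while 10 is still being consumed.
        System.out.println(removeAndGetCommitOffset(held, Arrays.asList(11L, 12L), 12L));
        // Offset 10 finishes; nothing is held any more.
        System.out.println(removeAndGetCommitOffset(held, Arrays.asList(10L), 12L));
    }
}
```

Because the progress never moves past the smallest offset still in flight, a restart re-delivers messages that had already finished out of order, which is another reason consumers need to be idempotent.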
public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }

    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}

/**
 * Cleans up expired messages.
 */
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
        this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}
Description: periodically cleans up expired messages. The period equals the consume timeout; default: 15 min.
1: public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { 2: // 順序消費時,直接返回 3: if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { 4: return; 5: } 6: 7: // 循環移除消息 8: int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; // 每次循環最多移除16條 9: for (int i = 0; i < loop; i++) { 10: // 獲取第一條消息。判斷是否超時,若不超時,則結束循環 11: MessageExt msg = null; 12: try { 13: this.lockTreeMap.readLock().lockInterruptibly(); 14: try { 15: if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { 16: msg = msgTreeMap.firstEntry().getValue(); 17: } else { 18: break; 19: } 20: } finally { 21: this.lockTreeMap.readLock().unlock(); 22: } 23: } catch (InterruptedException e) { 24: log.error("getExpiredMsg exception", e); 25: } 26: 27: try { 28: // 發回超時消息 29: pushConsumer.sendMessageBack(msg, 3); 30: log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); 31: 32: // 判斷此時消息是否依然是第一條,如果,則進行移除 33: try { 34: this.lockTreeMap.writeLock().lockInterruptibly(); 35: try { 36: if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { 37: try { 38: msgTreeMap.remove(msgTreeMap.firstKey()); 39: } catch (Exception e) { 40: log.error("send expired msg exception", e); 41: } 42: } 43: } finally { 44: this.lockTreeMap.writeLock().unlock(); 45: } 46: } catch (InterruptedException e) { 47: log.error("getExpiredMsg exception", e); 48: } 49: } catch (Exception e) { 50: log.error("send expired msg exception", e); 51: } 52: } 53: }
Description: removes expired messages. Each expired message is first sent back to the Broker (delay level 3) and is removed from the process queue only if it is still the first entry. At most 16 messages are handled per run.
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        // The Consumer sends the message back
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) { // TODO question: under what circumstances does an exception occur here?
        // On exception, send the message back with the client's built-in Producer
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
}
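Description: sends a message back.
- The Consumer sends the message back through MQClientAPIImpl#consumerSendMessageBack(…).
- If that call throws, the Consumer's built-in default Producer sends the message to the retry Topic instead.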
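In the fallback path of sendMessageBack, the delay level is computed as 3 + msg.getReconsumeTimes(). To get a feel for what that means in wall-clock time, the sketch below maps a level to a delay. It rests on assumptions that are not in the article: the delay table is the commonly documented Broker default for messageDelayLevel (it is configurable), and clamping out-of-range levels is a choice made by this sketch. `RetryDelaySketch` and its methods are hypothetical names.

```java
final class RetryDelaySketch {
    // Assumed Broker default for messageDelayLevel; level 1 is the first entry.
    static final String[] LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    // Same arithmetic as the fallback branch in the listing.
    static int fallbackDelayLevel(int reconsumeTimes) {
        return 3 + reconsumeTimes;
    }

    // Maps a level to its delay; levels outside the table are clamped (sketch-only choice).
    static String delayFor(int level) {
        int clamped = Math.min(Math.max(level, 1), LEVELS.length);
        return LEVELS[clamped - 1];
    }

    public static void main(String[] args) {
        for (int times = 0; times < 4; times++) {
            int level = fallbackDelayLevel(times);
            System.out.println("reconsumeTimes=" + times + " level=" + level + " delay=" + delayFor(level));
        }
    }
}
```

Under these assumptions a message that has never been retried waits at level 3, and each further failure moves it one level up the table.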
1: /** 2: * Consumer發回消息 3: * @param addr Broker地址 4: * @param msg 消息 5: * @param consumerGroup 消費分組 6: * @param delayLevel 延遲級別 7: * @param timeoutMillis 超時 8: * @param maxConsumeRetryTimes 消費最大重試次數 9: * @throws RemotingException 當遠程調用發生異常時 10: * @throws MQBrokerException 當Broker發生異常時 11: * @throws InterruptedException 當線程中斷時 12: */ 13: public void consumerSendMessageBack( 14: final String addr, 15: final MessageExt msg, 16: final String consumerGroup, 17: final int delayLevel, 18: final long timeoutMillis, 19: final int maxConsumeRetryTimes 20: ) throws RemotingException, MQBrokerException, InterruptedException { 21: ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); 22: RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); 23: 24: requestHeader.setGroup(consumerGroup); 25: requestHeader.setOriginTopic(msg.getTopic()); 26: requestHeader.setOffset(msg.getCommitLogOffset()); 27: requestHeader.setDelayLevel(delayLevel); 28: requestHeader.setOriginMsgId(msg.getMsgId()); 29: requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); 30: 31: RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), 32: request, timeoutMillis); 33: assert response != null; 34: switch (response.getCode()) { 35: case ResponseCode.SUCCESS: { 36: return; 37: } 38: default: 39: break; 40: } 41: 42: throw new MQBrokerException(response.getCode(), response.getRemark()); 43: }
RemoteBrokerOffsetStore: used by the Consumer in clustering mode; the consume progress is stored on the remote Broker.
LocalFileOffsetStore: used by the Consumer in broadcasting mode; the consume progress is stored in a local file.
@Override
public void load() throws MQClientException {
    // Read the consume progress from the local disk
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

        // Log the consume progress of every message queue
        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",
                this.groupName,
                mq,
                offset.get());
        }
    }
}
Description: loads the consume progress from the local file into memory.
OffsetSerializeWrapper
public class OffsetSerializeWrapper extends RemotingSerializable {
    private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<>();

    public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
        return offsetTable;
    }

    public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
        this.offsetTable = offsetTable;
    }
}
Description: the serialization wrapper for locally stored offsets.
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
    "offsetTable":{{
            "brokerName":"broker-a",
            "queueId":3,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":2,
            "topic":"TopicTest"
        }:1471,{
            "brokerName":"broker-a",
            "queueId":1,
            "topic":"TopicTest"
        }:1470,{
            "brokerName":"broker-a",
            "queueId":0,
            "topic":"TopicTest"
        }:1470
    }
}
@Override
public void load() {
}
Description: nothing is loaded up front; the consume progress is fetched from the Broker when it is actually read.
Read types for the consume progress:
- READ_FROM_MEMORY: read from memory.
- READ_FROM_STORE: read from the store (Broker or file).
- MEMORY_FIRST_THEN_STORE: read from memory first; if nothing is found there, read from the store.
For LocalFileOffsetStore, the store is the local file. The version below calls fetchConsumeOffsetFromBroker, so it is the RemoteBrokerOffsetStore implementation:
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
                // MEMORY_FIRST_THEN_STORE with no entry in memory falls through to READ_FROM_STORE
            }
            case READ_FROM_STORE: {
                try {
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                }
                // No offset in broker
                catch (MQBrokerException e) {
                    return -1;
                }
                //Other exceptions
                catch (Exception e) {
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                }
            }
            default:
                break;
        }
    }

    return -1;
}
Description: reads the consume progress from the Broker.
The next method, updateOffset, is implemented identically in RemoteBrokerOffsetStore and LocalFileOffsetStore.
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }

        if (null != offsetOld) {
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}
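When increaseOnly is true, updateOffset delegates to MixAll.compareAndIncreaseOnly, whose body is not shown in this article. A helper with that contract could be written as the compare-and-set loop below. This is a sketch of the idea, not a copy of the RocketMQ method, and `IncreaseOnlySketch` is a hypothetical class name.

```java
import java.util.concurrent.atomic.AtomicLong;

final class IncreaseOnlySketch {
    // Raises target to value only if value is larger; never lowers it.
    // Returns true when this call changed the stored value.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true;
            }
            prev = target.get(); // another thread won the race; re-read and re-check
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicLong offset = new AtomicLong(100L);
        System.out.println(compareAndIncreaseOnly(offset, 90L) + " -> " + offset.get());
        System.out.println(compareAndIncreaseOnly(offset, 120L) + " -> " + offset.get());
    }
}
```

The increase-only rule matters because consume requests finish on different threads, and a late-arriving smaller offset must not roll the progress back.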
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            if (mqs.contains(entry.getKey())) {
                AtomicLong offset = entry.getValue();
                offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
            }
        }

        String jsonString = offsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                log.error("persistAll consumer offset Exception, " + this.storePath, e);
            }
        }
    }
Note: persists the consumption offsets by writing them to a file.
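When a small state file such as the offsets file is rewritten repeatedly, a common precaution is to write to a temporary file first and then move it over the target, so that a crash mid-write does not leave a truncated file. The sketch below shows that general technique with `java.nio.file`. It is an assumption about good practice, not a description of what `MixAll.string2File` does, and `FilePersistSketch` and `writeViaTempFile` are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: write content to a temp file, then move it into place. */
final class FilePersistSketch {

    static void writeViaTempFile(Path target, String content) throws IOException {
        Path parent = target.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);   // make sure the directory exists
        }
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // Replace the previous file only after the new content is fully written.
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("offsets-sketch");
        Path file = dir.resolve("offsets.json");
        writeViaTempFile(file, "{\"offsetTable\":{}}");
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
}
```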
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        // Persist the offsets of the given message queues
        final HashSet<MessageQueue> unusedMQ = new HashSet<>();
        if (!mqs.isEmpty()) {
            for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                AtomicLong offset = entry.getValue();
                if (offset != null) {
                    if (mqs.contains(mq)) {
                        try {
                            this.updateConsumeOffsetToBroker(mq, offset.get());
                            log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                                this.groupName,
                                this.mQClientFactory.getClientId(),
                                mq,
                                offset.get());
                        } catch (Exception e) {
                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                        }
                    } else {
                        unusedMQ.add(mq);
                    }
                }
            }
        }

        // Remove message queues that are no longer in use
        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
                log.info("remove unused mq, {}, {}", mq, this.groupName);
            }
        }
    }
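Note: persists the consumption offsets of the specified set of message queues to the Broker, and removes every message queue outside that set from the offset table.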
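The persist-then-prune behaviour can be tried in isolation. This is a minimal sketch under stated assumptions: queues are plain strings, and `OffsetReporter` is a hypothetical callback standing in for the call that sends an offset to the Broker. It is not the RocketMQ implementation.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: report offsets for assigned queues, drop the rest. */
final class PersistAndPruneSketch {

    /** Stand-in for "send this offset to the Broker". */
    interface OffsetReporter {
        void report(String queue, long offset) throws Exception;
    }

    static void persistAll(ConcurrentMap<String, AtomicLong> table,
                           Set<String> assigned,
                           OffsetReporter reporter) {
        if (assigned == null || assigned.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<String, AtomicLong>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, AtomicLong> entry = it.next();
            if (assigned.contains(entry.getKey())) {
                try {
                    reporter.report(entry.getKey(), entry.getValue().get());
                } catch (Exception e) {
                    // One failed queue must not stop the others from being reported.
                    System.err.println("report failed for " + entry.getKey() + ": " + e);
                }
            } else {
                it.remove(); // queue is no longer assigned to this consumer
            }
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, AtomicLong> table = new ConcurrentHashMap<>();
        table.put("q0", new AtomicLong(1470));
        table.put("q1", new AtomicLong(1471));
        persistAll(table, Set.of("q0"), (q, o) -> System.out.println(q + " -> " + o));
        System.out.println(table.keySet());
    }
}
```

Pruning matters after a rebalance: offsets for queues that moved to another consumer should no longer be reported by this one.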
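    private void startScheduledTask() {
        // Scheduled task (the excerpt shown here registers the heartbeat task)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    }

Note: consumption offsets are persisted on a timer, every 5000 ms by default. The excerpt above is the heartbeat task from the same method, whose period comes from getHeartbeatBrokerInterval(). The persistence task is registered in the same way: it calls persistAllConsumerOffset() with the period taken from getPersistConsumerOffsetInterval().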
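The try/catch inside `run()` is not decoration: `scheduleAtFixedRate` suppresses all later executions once a run throws. The sketch below shows a periodic persistence task guarded in that way. `PeriodicPersistSketch` and the `persist` callback are hypothetical, and the 5000 ms period simply mirrors the default mentioned above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a periodic "persist offsets" task. */
final class PeriodicPersistSketch {

    static ScheduledFuture<?> schedule(ScheduledExecutorService executor,
                                       Runnable persist,
                                       long initialDelayMs,
                                       long periodMs) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                persist.run();
            } catch (Exception e) {
                // Swallow and log: an uncaught exception would cancel future runs.
                System.err.println("persist failed: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        schedule(executor, () -> System.out.println("persisting offsets"), 1000, 5000);
        Thread.sleep(7000);      // let the task fire a couple of times
        executor.shutdownNow();
    }
}
```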