🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: node
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
本文接:《RocketMQ 源碼分析 —— Message 拉取與消費(上)》。web
主要解析 Consumer
在 消費 邏輯涉及到的源碼。json
MQ 提供了兩類消費者:後端
Push
開頭,實際在實現時,使用 Pull
方式實現。經過 Pull
不斷不斷不斷輪詢 Broker
獲取消息。當不存在新消息時,Broker
會掛起請求,直到有新消息產生,取消掛起,返回新消息。這樣,基本和 Broker
主動 Push
作到接近的實時性(固然,仍是有相應的實時性損失)。原理相似 長輪詢( Long-Polling
)。本文主要講解PushConsumer
,部分講解PullConsumer
,跳過順序消費
。
本文主要講解PushConsumer
,部分講解PullConsumer
,跳過順序消費
。
本文主要講解PushConsumer
,部分講解PullConsumer
,跳過順序消費
。 api
先看一張 PushConsumer
包含的組件以及組件之間的交互圖:數組
RebalanceService
:均衡消息隊列服務,負責分配當前 Consumer
可消費的消息隊列( MessageQueue
)。當有新的 Consumer
的加入或移除,都會從新分配消息隊列。PullMessageService
:拉取消息服務,不斷不斷不斷從 Broker
拉取消息,並提交消費任務到 ConsumeMessageService
。ConsumeMessageService
:消費消息服務,不斷不斷不斷消費消息,並處理消費結果。RemoteBrokerOffsetStore
:Consumer
消費進度管理,負責從 Broker
獲取消費進度,同步消費進度到 Broker
。ProcessQueue
:消息處理隊列。MQClientInstance
:封裝對 Namesrv
,Broker
的 API調用,提供給 Producer
、Consumer
使用。1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 建立訂閱數據
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 經過心跳同步Consumer信息到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }複製代碼
Topic
。Consumer
信息到 Broker
。1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 處理訂閱表達式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }複製代碼
Topic
和 訂閱表達式 建立訂閱數據1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }複製代碼
1: public class RebalanceService extends ServiceThread {
2:
3: /** 4: * 等待間隔,單位:毫秒 5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /** 12: * MQClient對象 13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }複製代碼
Consumer
可消費的消息隊列( MessageQueue
)。第 26 行 :調用 MQClientInstance#doRebalance(...)
分配消息隊列。目前有三種狀況狀況下觸發:微信
第 25 行
等待超時,每 20s 調用一次。PushConsumer
啓動時,調用 rebalanceService#wakeup(...)
觸發。Broker
通知 Consumer
加入 或 移除時,Consumer
響應通知,調用 rebalanceService#wakeup(...)
觸發。詳細解析見:MQClientInstance#doRebalance(...)。網絡
1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }複製代碼
Client
包含的 consumerTable
( Consumer
集合 ),執行消息隊列分配。consumerTable
只包含 Consumer
本身。😈有大大對這個疑問有解答的,煩請解答下。MQConsumerInner#doRebalance(...)
進行隊列分配。DefaultMQPushConsumerImpl
、DefaultMQPullConsumerImpl
分別對該接口方法進行了實現。DefaultMQPushConsumerImpl#doRebalance(...)
詳細解析見:DefaultMQPushConsumerImpl#doRebalance(...)。1: public void doRebalance() {
2: if (!this.pause) {
3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
4: }
5: }複製代碼
RebalanceImpl#doRebalance(...)
進行隊列分配。詳細解析見:RebalancePushImpl#doRebalance(...)。1: /** 2: * 執行分配消息隊列 3: * 4: * @param isOrder 是否順序消息 5: */
6: public void doRebalance(final boolean isOrder) {
7: // 分配每一個 topic 的消息隊列
8: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
9: if (subTable != null) {
10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
11: final String topic = entry.getKey();
12: try {
13: this.rebalanceByTopic(topic, isOrder);
14: } catch (Throwable e) {
15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
16: log.warn("rebalanceByTopic Exception", e);
17: }
18: }
19: }
20: }
21: // 移除未訂閱的topic對應的消息隊列
22: this.truncateMessageQueueNotMyTopic();
23: }
24:
25: /** 26: * 移除未訂閱的消息隊列 27: */
28: private void truncateMessageQueueNotMyTopic() {
29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
30: for (MessageQueue mq : this.processQueueTable.keySet()) {
31: if (!subTable.containsKey(mq.getTopic())) {
32:
33: ProcessQueue pq = this.processQueueTable.remove(mq);
34: if (pq != null) {
35: pq.setDropped(true);
36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
37: }
38: }
39: }
40: }複製代碼
#doRebalance(...)
說明 :執行分配消息隊列。
subscriptionInner
),分配每個 Topic
的消息隊列。Topic
的消息隊列。#truncateMessageQueueNotMyTopic(...)
說明 :移除未訂閱的消息隊列。當調用 DefaultMQPushConsumer#unsubscribe(topic)
時,只移除訂閱主題集合( subscriptionInner
),對應消息隊列移除在該方法。1: private void rebalanceByTopic(final String topic, final boolean isOrder) {
2: switch (messageModel) {
3: case BROADCASTING: {
4: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
5: if (mqSet != null) {
6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
7: if (changed) {
8: this.messageQueueChanged(topic, mqSet, mqSet);
9: log.info("messageQueueChanged {} {} {} {}", //
10: consumerGroup, //
11: topic, //
12: mqSet, //
13: mqSet);
14: }
15: } else {
16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17: }
18: break;
19: }
20: case CLUSTERING: {
21: // 獲取 topic 對應的 隊列 和 consumer信息
22: Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
23: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
24: if (null == mqSet) {
25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
27: }
28: }
29:
30: if (null == cidAll) {
31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
32: }
33:
34: if (mqSet != null && cidAll != null) {
35: // 排序 消息隊列 和 消費者數組。由於是在Client進行分配隊列,排序後,各Client的順序才能保持一致。
36: List<MessageQueue> mqAll = new ArrayList<>();
37: mqAll.addAll(mqSet);
38:
39: Collections.sort(mqAll);
40: Collections.sort(cidAll);
41:
42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
43:
44: // 根據 隊列分配策略 分配消息隊列
45: List<MessageQueue> allocateResult;
46: try {
47: allocateResult = strategy.allocate(//
48: this.consumerGroup, //
49: this.mQClientFactory.getClientId(), //
50: mqAll, //
51: cidAll);
52: } catch (Throwable e) {
53: log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
54: e);
55: return;
56: }
57:
58: Set<MessageQueue> allocateResultSet = new HashSet<>();
59: if (allocateResult != null) {
60: allocateResultSet.addAll(allocateResult);
61: }
62:
63: // 更新消息隊列
64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
65: if (changed) {
66: log.info(
67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
69: allocateResultSet.size(), allocateResultSet);
70: this.messageQueueChanged(topic, mqSet, allocateResultSet);
71: }
72: }
73: break;
74: }
75: default:
76: break;
77: }
78: }
79:
80: /** 81: * 當負載均衡時,更新 消息處理隊列 82: * - 移除 在processQueueTable && 不存在於 mqSet 裏的消息隊列 83: * - 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列 84: * 85: * @param topic Topic 86: * @param mqSet 負載均衡結果後的消息隊列數組 87: * @param isOrder 是否順序 88: * @return 是否變動 89: */
90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
91: boolean changed = false;
92:
93: // 移除 在processQueueTable && 不存在於 mqSet 裏的消息隊列
94: Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
95: while (it.hasNext()) { // TODO 待讀:
96: Entry<MessageQueue, ProcessQueue> next = it.next();
97: MessageQueue mq = next.getKey();
98: ProcessQueue pq = next.getValue();
99:
100: if (mq.getTopic().equals(topic)) {
101: if (!mqSet.contains(mq)) { // 不包含的隊列
102: pq.setDropped(true);
103: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
104: it.remove();
105: changed = true;
106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
107: }
108: } else if (pq.isPullExpired()) { // 隊列拉取超時,進行清理
109: switch (this.consumeType()) {
110: case CONSUME_ACTIVELY:
111: break;
112: case CONSUME_PASSIVELY:
113: pq.setDropped(true);
114: if (this.removeUnnecessaryMessageQueue(mq, pq)) {
115: it.remove();
116: changed = true;
117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
118: consumerGroup, mq);
119: }
120: break;
121: default:
122: break;
123: }
124: }
125: }
126: }
127:
128: // 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列。
129: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組
130: for (MessageQueue mq : mqSet) {
131: if (!this.processQueueTable.containsKey(mq)) {
132: if (isOrder && !this.lock(mq)) {
133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
134: continue;
135: }
136:
137: this.removeDirtyOffset(mq);
138: ProcessQueue pq = new ProcessQueue();
139: long nextOffset = this.computePullFromWhere(mq);
140: if (nextOffset >= 0) {
141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
142: if (pre != null) {
143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
144: } else {
145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
146: PullRequest pullRequest = new PullRequest();
147: pullRequest.setConsumerGroup(consumerGroup);
148: pullRequest.setNextOffset(nextOffset);
149: pullRequest.setMessageQueue(mq);
150: pullRequest.setProcessQueue(pq);
151: pullRequestList.add(pullRequest);
152: changed = true;
153: }
154: } else {
155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
156: }
157: }
158: }
159:
160: // 發起消息拉取請求
161: this.dispatchPullRequest(pullRequestList);
162:
163: return changed;
164: }複製代碼
#rebalanceByTopic(...)
說明 :分配 Topic
的消息隊列。
BROADCASTING
) 下,分配 Topic
對應的全部消息隊列。 CLUSTERING
) 下,分配 Topic
對應的部分消息隊列。
Topic
對應的消息隊列和消費者們,並對其進行排序。由於各 Consumer
是在本地分配消息隊列,排序後才能保證各 Consumer
順序一致。AllocateMessageQueueStrategy
) 分配消息隊列。詳細解析見:AllocateMessageQueueStrategy。Topic
對應的消息隊列。#updateProcessQueueTableInRebalance(...)
說明 :當分配隊列時,更新 Topic
對應的消息隊列,並返回是否有變動。
mqSet
) 的 消息處理隊列( processQueueTable
)。
當前時間 - 最後一次拉取消息時間 > 120s
( 120s 可配置),斷定發生 BUG,太久未進行消息拉取,移除消息隊列。移除後,下面#新增隊列邏輯#能夠從新加入新的該消息隊列。mqSet
) 新增的消息隊列。
順序消費
相關跳過,詳細解析見:《RocketMQ 源碼分析 —— Message 順序發送與消費》。1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: // 同步隊列的消費進度,並移除之。
3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
5: // TODO 順序消費
6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
8: try {
9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
10: try {
11: return this.unlockDelay(mq, pq);
12: } finally {
13: pq.getLockConsume().unlock();
14: }
15: } else {
16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
17: mq, //
18: pq.getTryUnlockTimes());
19:
20: pq.incTryUnlockTimes();
21: }
22: } catch (Exception e) {
23: log.error("removeUnnecessaryMessageQueue Exception", e);
24: }
25:
26: return false;
27: }
28: return true;
29: }複製代碼
順序消費
相關跳過,詳細解析見:《RocketMQ 源碼分析 —— Message 順序發送與消費》。[PullConsumer]
RebalancePullImpl#removeUnnecessaryMessageQueue(...)1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
4: return true;
5: }複製代碼
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
基本一致。1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
2: for (PullRequest pullRequest : pullRequestList) {
3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
5: }
6: }複製代碼
PushConsumer
不斷不斷不斷拉取消息的起點。1: public void executePullRequestImmediately(final PullRequest pullRequest) {
2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
3: }複製代碼
PullMessageService
異步執行,非阻塞。詳細解析見:PullMessageService。1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) {
7: // 校驗參數是否正確
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == null || cidAll.isEmpty()) {
15: throw new IllegalArgumentException("cidAll is null or cidAll empty");
16: }
17:
18: List<MessageQueue> result = new ArrayList<>();
19: if (!cidAll.contains(currentCID)) {
20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21: consumerGroup,
22: currentCID,
23: cidAll);
24: return result;
25: }
26: // 平均分配
27: int index = cidAll.indexOf(currentCID); // 第幾個consumer。
28: int mod = mqAll.size() % cidAll.size(); // 餘數,即多少消息隊列沒法平均分配。
29: int averageSize =
30: mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
31: + 1 : mqAll.size() / cidAll.size());
32: int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有餘數的狀況下,[0, mod) 平分餘數,即每consumer多分配一個節點;第index開始,跳過前mod餘數。
33: int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配隊列數量。之因此要Math.min()的緣由是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息隊列。
34: for (int i = 0; i < range; i++) {
35: result.add(mqAll.get((startIndex + i) % mqAll.size()));
36: }
37: return result;
38: }
39:
40: @Override
41: public String getName() {
42: return "AVG";
43: }
44: }複製代碼
index
:當前 Consumer
在消費集羣裏是第幾個。這裏就是爲何須要對傳入的 cidAll
參數必須進行排序的緣由。若是不排序,Consumer
在本地計算出來的 index
沒法一致,影響計算結果。mod
:餘數,即多少消息隊列沒法平均分配。averageSize
:代碼能夠簡化成 (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
。
[ 0, mod )
:mqAll.size() / cidAll.size() + 1
。前面 mod
個 Consumer
平分餘數,多得到 1 個消息隊列。[ mod, cidAll.size() )
:mqAll.size() / cidAll.size()
。startIndex
:Consumer
分配消息隊列開始位置。range
:分配隊列數量。之因此要 Math#min(...)
的緣由:當 mqAll.size() <= cidAll.size()
時,最後幾個 Consumer
分配不到消息隊列。固定消息隊列長度爲4。併發
Consumer 2 能夠整除* | Consumer 3 不可整除* | Consumer 5 沒法都分配* | |
---|---|---|---|
消息隊列[0] | Consumer[0] | Consumer[0] | Consumer[0] |
消息隊列[1] | Consumer[0] | Consumer[0] | Consumer[1] |
消息隊列[2] | Consumer[1] | Consumer[1] | Consumer[2] |
消息隊列[3] | Consumer[1] | Consumer[2] | Consumer[3] |
1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
2: /** 3: * 消費者消費brokerName集合 4: */
5: private Set<String> consumeridcs;
6:
7: @Override
8: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 9: List<String> cidAll) {
10: // 參數校驗
11: List<MessageQueue> result = new ArrayList<MessageQueue>();
12: int currentIndex = cidAll.indexOf(currentCID);
13: if (currentIndex < 0) {
14: return result;
15: }
16: // 計算符合當前配置的消費者數組('consumeridcs')對應的消息隊列
17: List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
18: for (MessageQueue mq : mqAll) {
19: String[] temp = mq.getBrokerName().split("@");
20: if (temp.length == 2 && consumeridcs.contains(temp[0])) {
21: premqAll.add(mq);
22: }
23: }
24: // 平均分配
25: int mod = premqAll.size() / cidAll.size();
26: int rem = premqAll.size() % cidAll.size();
27: int startIndex = mod * currentIndex;
28: int endIndex = startIndex + mod;
29: for (int i = startIndex; i < endIndex; i++) {
30: result.add(mqAll.get(i));
31: }
32: if (rem > currentIndex) {
33: result.add(premqAll.get(currentIndex + mod * cidAll.size()));
34: }
35: return result;
36: }
37:
38: @Override
39: public String getName() {
40: return "MACHINE_ROOM";
41: }
42:
43: public Set<String> getConsumeridcs() {
44: return consumeridcs;
45: }
46:
47: public void setConsumeridcs(Set<String> consumeridcs) {
48: this.consumeridcs = consumeridcs;
49: }
50: }複製代碼
Broker
對應的消息隊列。Broker
對應的消息隊列。AllocateMessageQueueAveragely
略有不一樣,其是將多餘的結尾部分分配給前 rem
個 Consumer
。Consumer
和 Broker
分配須要怎麼配置。😈等研究主從相關源碼時,仔細考慮下。1: public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) {
7: // 校驗參數是否正確
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == null || cidAll.isEmpty()) {
15: throw new IllegalArgumentException("cidAll is null or cidAll empty");
16: }
17:
18: List<MessageQueue> result = new ArrayList<MessageQueue>();
19: if (!cidAll.contains(currentCID)) {
20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21: consumerGroup,
22: currentCID,
23: cidAll);
24: return result;
25: }
26:
27: // 環狀分配
28: int index = cidAll.indexOf(currentCID);
29: for (int i = index; i < mqAll.size(); i++) {
30: if (i % cidAll.size() == index) {
31: result.add(mqAll.get(i));
32: }
33: }
34: return result;
35: }
36:
37: @Override
38: public String getName() {
39: return "AVG_BY_CIRCLE";
40: }
41: }複製代碼
1: public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
2: private List<MessageQueue> messageQueueList;
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 6: List<String> cidAll) {
7: return this.messageQueueList;
8: }
9:
10: @Override
11: public String getName() {
12: return "CONFIG";
13: }
14:
15: public List<MessageQueue> getMessageQueueList() {
16: return messageQueueList;
17: }
18:
19: public void setMessageQueueList(List<MessageQueue> messageQueueList) {
20: this.messageQueueList = messageQueueList;
21: }
22: }複製代碼
1: public long computePullFromWhere(MessageQueue mq) {
2: long result = -1;
3: final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
4: final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
5: switch (consumeFromWhere) {
6: case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: // 廢棄
7: case CONSUME_FROM_MIN_OFFSET: // 廢棄
8: case CONSUME_FROM_MAX_OFFSET: // 廢棄
9: case CONSUME_FROM_LAST_OFFSET: {
10: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
11: if (lastOffset >= 0) {
12: result = lastOffset;
13: }
14: // First start,no offset
15: else if (-1 == lastOffset) {
16: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
17: result = 0L;
18: } else {
19: try {
20: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
21: } catch (MQClientException e) {
22: result = -1;
23: }
24: }
25: } else {
26: result = -1;
27: }
28: break;
29: }
30: case CONSUME_FROM_FIRST_OFFSET: {
31: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
32: if (lastOffset >= 0) {
33: result = lastOffset;
34: } else if (-1 == lastOffset) {
35: result = 0L;
36: } else {
37: result = -1;
38: }
39: break;
40: }
41: case CONSUME_FROM_TIMESTAMP: {
42: long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
43: if (lastOffset >= 0) {
44: result = lastOffset;
45: } else if (-1 == lastOffset) {
46: if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
47: try {
48: result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
49: } catch (MQClientException e) {
50: result = -1;
51: }
52: } else {
53: try {
54: long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
55: UtilAll.YYYY_MMDD_HHMMSS).getTime();
56: result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
57: } catch (MQClientException e) {
58: result = -1;
59: }
60: }
61: } else {
62: result = -1;
63: }
64: break;
65: }
66:
67: default:
68: break;
69: }
70:
71: return result;
72: }複製代碼
PushConsumer
讀取消費進度有三種選項:
CONSUME_FROM_LAST_OFFSET
:第 6 至 29 行 :一個新的消費集羣第一次啓動從隊列的最後位置開始消費。後續再啓動接着上次消費的進度開始消費。CONSUME_FROM_FIRST_OFFSET
:第 30 至 40 行 :一個新的消費集羣第一次啓動從隊列的最前位置開始消費。後續再啓動接着上次消費的進度開始消費。CONSUME_FROM_TIMESTAMP
:第 41 至 65 行 :一個新的消費集羣第一次啓動從指定時間點開始消費。後續再啓動接着上次消費的進度開始消費。[PullConsumer]
RebalancePullImpl#computePullFromWhere(...)暫時跳過。😈app
1: public class PullMessageService extends ServiceThread {
2: private final Logger log = ClientLogger.getLog();
3: /** 4: * 拉取消息請求隊列 5: */
6: private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
7: /** 8: * MQClient對象 9: */
10: private final MQClientInstance mQClientFactory;
11: /** 12: * 定時器。用於延遲提交拉取請求 13: */
14: private final ScheduledExecutorService scheduledExecutorService = Executors
15: .newSingleThreadScheduledExecutor(new ThreadFactory() {
16: @Override
17: public Thread newThread(Runnable r) {
18: return new Thread(r, "PullMessageServiceScheduledThread");
19: }
20: });
21:
22: public PullMessageService(MQClientInstance mQClientFactory) {
23: this.mQClientFactory = mQClientFactory;
24: }
25:
26: /** 27: * 執行延遲拉取消息請求 28: * 29: * @param pullRequest 拉取消息請求 30: * @param timeDelay 延遲時長 31: */
32: public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
33: this.scheduledExecutorService.schedule(new Runnable() {
34:
35: @Override
36: public void run() {
37: PullMessageService.this.executePullRequestImmediately(pullRequest);
38: }
39: }, timeDelay, TimeUnit.MILLISECONDS);
40: }
41:
42: /** 43: * 執行當即拉取消息請求 44: * 45: * @param pullRequest 拉取消息請求 46: */
47: public void executePullRequestImmediately(final PullRequest pullRequest) {
48: try {
49: this.pullRequestQueue.put(pullRequest);
50: } catch (InterruptedException e) {
51: log.error("executePullRequestImmediately pullRequestQueue.put", e);
52: }
53: }
54:
55: /** 56: * 執行延遲任務 57: * 58: * @param r 任務 59: * @param timeDelay 延遲時長 60: */
61: public void executeTaskLater(final Runnable r, final long timeDelay) {
62: this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
63: }
64:
65: public ScheduledExecutorService getScheduledExecutorService() {
66: return scheduledExecutorService;
67: }
68:
69: /** 70: * 拉取消息 71: * 72: * @param pullRequest 拉取消息請求 73: */
74: private void pullMessage(final PullRequest pullRequest) {
75: final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
76: if (consumer != null) {
77: DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
78: impl.pullMessage(pullRequest);
79: } else {
80: log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
81: }
82: }
83:
84: @Override
85: public void run() {
86: log.info(this.getServiceName() + " service started");
87:
88: while (!this.isStopped()) {
89: try {
90: PullRequest pullRequest = this.pullRequestQueue.take();
91: if (pullRequest != null) {
92: this.pullMessage(pullRequest);
93: }
94: } catch (InterruptedException e) {
95: } catch (Exception e) {
96: log.error("Pull Message Service Run Method exception", e);
97: }
98: }
99:
100: log.info(this.getServiceName() + " service end");
101: }
102:
103: @Override
104: public String getServiceName() {
105: return PullMessageService.class.getSimpleName();
106: }
107:
108: }複製代碼
Broker
拉取消息,並提交消費任務到 ConsumeMessageService
。#executePullRequestLater(...)
:第 26 至 40 行 : 提交延遲拉取消息請求。#executePullRequestImmediately(...)
:第 42 至 53 行 :提交當即拉取消息請求。#executeTaskLater(...)
:第 55 至 63 行 :提交延遲任務。#pullMessage(...)
:第 69 至 82 行 :執行拉取消息邏輯。詳細解析見:DefaultMQPushConsumerImpl#pullMessage(...)。#run(...)
:第 84 至 101 行 :循環拉取消息請求隊列( pullRequestQueue
),進行消息拉取。1: public void pullMessage(final PullRequest pullRequest) {
2: final ProcessQueue processQueue = pullRequest.getProcessQueue();
3: if (processQueue.isDropped()) {
4: log.info("the pull request[{}] is dropped.", pullRequest.toString());
5: return;
6: }
7:
8: // 設置隊列最後拉取消息時間
9: pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
10:
11: // 判斷consumer狀態是否運行中。若是不是,則延遲拉取消息。
12: try {
13: this.makeSureStateOK();
14: } catch (MQClientException e) {
15: log.warn("pullMessage exception, consumer state not ok", e);
16: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
17: return;
18: }
19:
20: // 判斷是否暫停中。
21: if (this.isPause()) {
22: log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
23: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
24: return;
25: }
26:
27: // 判斷是否超過最大持有消息數量。默認最大值爲1000。
28: long size = processQueue.getMsgCount().get();
29: if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
30: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延遲消息拉取請求。50ms。
31: if ((flowControlTimes1++ % 1000) == 0) {
32: log.warn(
33: "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
34: processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
35: }
36: return;
37: }
38:
39: if (!this.consumeOrderly) { // 判斷消息跨度是否過大。
40: if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
41: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // 提交延遲消息拉取請求。50ms。
42: if ((flowControlTimes2++ % 1000) == 0) {
43: log.warn(
44: "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
45: processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
46: pullRequest, flowControlTimes2);
47: }
48: return;
49: }
50: } else { // TODO 順序消費
51: if (processQueue.isLocked()) {
52: if (!pullRequest.isLockedFirst()) {
53: final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
54: boolean brokerBusy = offset < pullRequest.getNextOffset();
55: log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
56: pullRequest, offset, brokerBusy);
57: if (brokerBusy) {
58: log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
59: pullRequest, offset);
60: }
61:
62: pullRequest.setLockedFirst(true);
63: pullRequest.setNextOffset(offset);
64: }
65: } else {
66: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
67: log.info("pull message later because not locked in broker, {}", pullRequest);
68: return;
69: }
70: }
71:
72: // 獲取Topic 對應的訂閱信息。若不存在,則延遲拉取消息
73: final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
74: if (null == subscriptionData) {
75: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
76: log.warn("find the consumer's subscription failed, {}", pullRequest);
77: return;
78: }
79:
80: final long beginTimestamp = System.currentTimeMillis();
81:
82: PullCallback pullCallback = new PullCallback() {
83: @Override
84: public void onSuccess(PullResult pullResult) {
85: if (pullResult != null) {
86: pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
87: subscriptionData);
88:
89: switch (pullResult.getPullStatus()) {
90: case FOUND:
91: // 設置下次拉取消息隊列位置
92: long prevRequestOffset = pullRequest.getNextOffset();
93: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
94:
95: // 統計
96: long pullRT = System.currentTimeMillis() - beginTimestamp;
97: DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
98: pullRequest.getMessageQueue().getTopic(), pullRT);
99:
100: long firstMsgOffset = Long.MAX_VALUE;
101: if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
102: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
103: } else {
104: firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
105:
106: // 統計
107: DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
108: pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
109:
110: // 提交拉取到的消息到消息處理隊列
111: boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
112:
113: // 提交消費請求
114: DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
115: pullResult.getMsgFoundList(), //
116: processQueue, //
117: pullRequest.getMessageQueue(), //
118: dispathToConsume);
119:
120: // 提交下次拉取消息請求
121: if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
122: DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
123: DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
124: } else {
125: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
126: }
127: }
128:
129: // 下次拉取消息隊列位置小於上次拉取消息隊列位置 或者 第一條消息的消息隊列位置小於上次拉取消息隊列位置,則斷定爲BUG,輸出log
130: if (pullResult.getNextBeginOffset() < prevRequestOffset//
131: || firstMsgOffset < prevRequestOffset) {
132: log.warn(
133: "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
134: pullResult.getNextBeginOffset(), //
135: firstMsgOffset, //
136: prevRequestOffset);
137: }
138:
139: break;
140: case NO_NEW_MSG:
141: // 設置下次拉取消息隊列位置
142: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
143:
144: // 持久化消費進度
145: DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
146:
147: // 當即提交拉取消息請求
148: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
149: break;
150: case NO_MATCHED_MSG:
151: // 設置下次拉取消息隊列位置
152: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
153:
154: // 持久化消費進度
155: DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
156:
157: // 提交當即拉取消息請求
158: DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
159: break;
160: case OFFSET_ILLEGAL:
161: log.warn("the pull request offset illegal, {} {}", //
162: pullRequest.toString(), pullResult.toString());
163: // 設置下次拉取消息隊列位置
164: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
165:
166: // 設置消息處理隊列爲dropped
167: pullRequest.getProcessQueue().setDropped(true);
168:
169: // 提交延遲任務,進行消費處理隊列移除。不當即移除的緣由:可能有地方正在使用,避免受到影響。
170: DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
171:
172: @Override
173: public void run() {
174: try {
175: // 更新消費進度,同步消費進度到Broker
176: DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
177: pullRequest.getNextOffset(), false);
178: DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
179:
180: // 移除消費處理隊列
181: DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
182:
183: log.warn("fix the pull request offset, {}", pullRequest);
184: } catch (Throwable e) {
185: log.error("executeTaskLater Exception", e);
186: }
187: }
188: }, 10000);
189: break;
190: default:
191: break;
192: }
193: }
194: }
195:
196: @Override
197: public void onException(Throwable e) {
198: if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
199: log.warn("execute the pull request exception", e);
200: }
201:
202: // 提交延遲拉取消息請求
203: DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
204: }
205: };
206:
207: // 集羣消息模型下,計算提交的消費進度。
208: boolean commitOffsetEnable = false;
209: long commitOffsetValue = 0L;
210: if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
211: commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
212: if (commitOffsetValue > 0) {
213: commitOffsetEnable = true;
214: }
215: }
216:
217: // 計算請求的 訂閱表達式 和 是否進行filtersrv過濾消息
218: String subExpression = null;
219: boolean classFilter = false;
220: SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
221: if (sd != null) {
222: if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
223: subExpression = sd.getSubString();
224: }
225:
226: classFilter = sd.isClassFilterMode();
227: }
228:
229: // 計算拉取消息系統標識
230: int sysFlag = PullSysFlag.buildSysFlag(//
231: commitOffsetEnable, // commitOffset
232: true, // suspend
233: subExpression != null, // subscription
234: classFilter // class filter
235: );
236:
237: // 執行拉取。若是拉取請求發生異常時,提交延遲拉取消息請求。
238: try {
239: this.pullAPIWrapper.pullKernelImpl(//
240: pullRequest.getMessageQueue(), // 1
241: subExpression, // 2
242: subscriptionData.getSubVersion(), // 3
243: pullRequest.getNextOffset(), // 4
244: this.defaultMQPushConsumer.getPullBatchSize(), // 5
245: sysFlag, // 6
246: commitOffsetValue, // 7
247: BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
248: CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
249: CommunicationMode.ASYNC, // 10
250: pullCallback// 11
251: );
252: } catch (Exception e) {
253: log.error("pullKernelImpl exception", e);
254: this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
255: }
256: }
257:
258: private void correctTagsOffset(final PullRequest pullRequest) {
259: if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
260: this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
261: }
262: }複製代碼
#pullMessage(...)
說明 :拉取消息。
Consumer
未處於運行中狀態,不進行消息拉取,提交延遲拉取消息請求。Consumer
處於暫停中,不進行消息拉取,提交延遲拉取消息請求。Consumer
爲併發消費 而且 消息隊列持有消息跨度過大(消息跨度 = 持有消息最後一條和第一條的消息位置差,默認:2000),不進行消息拉取,提交延遲拉取消息請求。順序消費
相關跳過,詳細解析見:《RocketMQ 源碼分析 —— Message 順序發送與消費》。Topic
對應的訂閱信息不存在,不進行消息拉取,提交延遲拉取消息請求。Consumer
本地的訂閱信息( SubscriptionData
),而不使用 Broker
裏的訂閱信息。詳細解析見:PullMessageProcessor#processRequest(...) 第 64 至 110 行代碼。Broker
處理拉取消息邏輯見:PullMessageProcessor#processRequest(...)。PullCallback
:拉取消息回調:
FOUND
) :
ConsumeMessageService
。詳細解析見:ConsumeMessageConcurrentlyService。pullInterval
),提交當即或者延遲拉取消息請求。默認拉取頻率爲 0ms ,提交當即拉取消息請求。NO_NEW_MSG
) :#correctTagsOffset(...)
。NO_MATCHED_MSG
)。邏輯同 NO_NEW_MSG
。OFFSET_ILLEGAL
)。dropped
。Broker
。#correctTagsOffset(...)
:更正消費進度。
1: /** 2: * 拉取消息核心方法 3: * 4: * @param mq 消息隊列 5: * @param subExpression 訂閱表達式 6: * @param subVersion 訂閱版本號 7: * @param offset 拉取隊列開始位置 8: * @param maxNums 拉取消息數量 9: * @param sysFlag 拉取請求系統標識 10: * @param commitOffset 提交消費進度 11: * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間 12: * @param timeoutMillis 請求broker超時時長 13: * @param communicationMode 通信模式 14: * @param pullCallback 拉取回調 15: * @return 拉取消息結果。只有通信模式爲同步時,才返回結果,不然返回null。 16: * @throws MQClientException 當尋找不到 broker 時,或發生其餘client異常 17: * @throws RemotingException 當遠程調用發生異常時 18: * @throws MQBrokerException 當 broker 發生異常時。只有通信模式爲同步時纔會發生該異常。 19: * @throws InterruptedException 當發生中斷異常時 20: */
21: protected PullResult pullKernelImpl( 22: final MessageQueue mq, 23: final String subExpression, 24: final long subVersion, 25: final long offset, 26: final int maxNums, 27: final int sysFlag, 28: final long commitOffset, 29: final long brokerSuspendMaxTimeMillis, 30: final long timeoutMillis, 31: final CommunicationMode communicationMode, 32: final PullCallback pullCallback 33: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
34: // 獲取Broker信息
35: FindBrokerResult findBrokerResult =
36: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
37: this.recalculatePullFromWhichNode(mq), false);
38: if (null == findBrokerResult) {
39: this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
40: findBrokerResult =
41: this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
42: this.recalculatePullFromWhichNode(mq), false);
43: }
44:
45: // 請求拉取消息
46: if (findBrokerResult != null) {
47: int sysFlagInner = sysFlag;
48:
49: if (findBrokerResult.isSlave()) {
50: sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
51: }
52:
53: PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
54: requestHeader.setConsumerGroup(this.consumerGroup);
55: requestHeader.setTopic(mq.getTopic());
56: requestHeader.setQueueId(mq.getQueueId());
57: requestHeader.setQueueOffset(offset);
58: requestHeader.setMaxMsgNums(maxNums);
59: requestHeader.setSysFlag(sysFlagInner);
60: requestHeader.setCommitOffset(commitOffset);
61: requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
62: requestHeader.setSubscription(subExpression);
63: requestHeader.setSubVersion(subVersion);
64:
65: String brokerAddr = findBrokerResult.getBrokerAddr();
66: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { // TODO filtersrv
67: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
68: }
69:
70: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
71: brokerAddr,
72: requestHeader,
73: timeoutMillis,
74: communicationMode,
75: pullCallback);
76:
77: return pullResult;
78: }
79:
80: // Broker信息不存在,則拋出異常
81: throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
82: }複製代碼
Broker
信息(Broker
地址、是否爲從節點)。
Broker
信息不存在,則拋出異常。1: /** 2: * 消息隊列 與 拉取Broker 的映射 3: * 當拉取消息時,會經過該映射獲取拉取請求對應的Broker 4: */
5: private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
6: new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
7: /** 8: * 是否使用默認Broker 9: */
10: private volatile boolean connectBrokerByUser = false;
11: /** 12: * 默認Broker編號 13: */
14: private volatile long defaultBrokerId = MixAll.MASTER_ID;
15:
16: /** 17: * 計算消息隊列拉取消息對應的Broker編號 18: * 19: * @param mq 消息隊列 20: * @return Broker編號 21: */
22: public long recalculatePullFromWhichNode(final MessageQueue mq) {
23: // 若開啓默認Broker開關,則返回默認Broker編號
24: if (this.isConnectBrokerByUser()) {
25: return this.defaultBrokerId;
26: }
27:
28: // 若消息隊列映射拉取Broker存在,則返回映射Broker編號
29: AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
30: if (suggest != null) {
31: return suggest.get();
32: }
33:
34: // 返回Broker主節點編號
35: return MixAll.MASTER_ID;
36: }複製代碼
Broker
編號。1: /** 2: * Broker名字 和 Broker地址相關 Map 3: */
4: private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
5: new ConcurrentHashMap<>();
6:
7: /** 8: * 得到Broker信息 9: * 10: * @param brokerName broker名字 11: * @param brokerId broker編號 12: * @param onlyThisBroker 是否必須是該broker 13: * @return Broker信息 14: */
15: public FindBrokerResult findBrokerAddressInSubscribe(// 16: final String brokerName, // 17: final long brokerId, // 18: final boolean onlyThisBroker// 19: ) {
20: String brokerAddr = null; // broker地址
21: boolean slave = false; // 是否爲從節點
22: boolean found = false; // 是否找到
23:
24: // 得到Broker信息
25: HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
26: if (map != null && !map.isEmpty()) {
27: brokerAddr = map.get(brokerId);
28: slave = brokerId != MixAll.MASTER_ID;
29: found = brokerAddr != null;
30:
31: // 若是不強制得到,選擇一個Broker
32: if (!found && !onlyThisBroker) {
33: Entry<Long, String> entry = map.entrySet().iterator().next();
34: brokerAddr = entry.getValue();
35: slave = entry.getKey() != MixAll.MASTER_ID;
36: found = true;
37: }
38: }
39:
40: // 找到broker,則返回信息
41: if (found) {
42: return new FindBrokerResult(brokerAddr, slave);
43: }
44:
45: // 找不到,則返回空
46: return null;
47: }複製代碼
Broker
信息(Broker
地址、是否爲從節點)。1: /** 2: * 處理拉取結果 3: * 1. 更新消息隊列拉取消息Broker編號的映射 4: * 2. 解析消息,並根據訂閱信息消息tagCode匹配合適消息 5: * 6: * @param mq 消息隊列 7: * @param pullResult 拉取結果 8: * @param subscriptionData 訂閱信息 9: * @return 拉取結果 10: */
11: public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, 12: final SubscriptionData subscriptionData) {
13: PullResultExt pullResultExt = (PullResultExt) pullResult;
14:
15: // 更新消息隊列拉取消息Broker編號的映射
16: this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
17:
18: // 解析消息,並根據訂閱信息消息tagCode匹配合適消息
19: if (PullStatus.FOUND == pullResult.getPullStatus()) {
20: // 解析消息
21: ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
22: List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
23:
24: // 根據訂閱信息消息tagCode匹配合適消息
25: List<MessageExt> msgListFilterAgain = msgList;
26: if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
27: msgListFilterAgain = new ArrayList<>(msgList.size());
28: for (MessageExt msg : msgList) {
29: if (msg.getTags() != null) {
30: if (subscriptionData.getTagsSet().contains(msg.getTags())) {
31: msgListFilterAgain.add(msg);
32: }
33: }
34: }
35: }
36:
37: // Hook
38: if (this.hasHook()) {
39: FilterMessageContext filterMessageContext = new FilterMessageContext();
40: filterMessageContext.setUnitMode(unitMode);
41: filterMessageContext.setMsgList(msgListFilterAgain);
42: this.executeHook(filterMessageContext);
43: }
44:
45: // 設置消息隊列當前最小/最大位置到消息拓展字段
46: for (MessageExt msg : msgListFilterAgain) {
47: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
48: Long.toString(pullResult.getMinOffset()));
49: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
50: Long.toString(pullResult.getMaxOffset()));
51: }
52:
53: // 設置消息列表
54: pullResultExt.setMsgFoundList(msgListFilterAgain);
55: }
56:
57: // 清空消息二進制數組
58: pullResultExt.setMessageBinary(null);
59:
60: return pullResult;
61: }複製代碼
Broker
編號的映射。tagCode
匹配合適消息。Broker
編號的映射。下次拉取消息時,若是未設置默認拉取的 Broker
編號,會使用更新後的 Broker
編號。tagCode
匹配合適消息。
tagCode
匹配消息。Hook
。1: /** 2: * 消息映射讀寫鎖 3: */
4: private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
5: /** 6: * 消息映射 7: * key:消息隊列位置 8: */
9: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
10: /** 11: * 消息數 12: */
13: private final AtomicLong msgCount = new AtomicLong();
14: /** 15: * 添加消息最大隊列位置 16: */
17: private volatile long queueOffsetMax = 0L;
18: /** 19: * 是否正在消費 20: */
21: private volatile boolean consuming = false;
22: /** 23: * Broker累計消息數量 24: * 計算公式 = queueMaxOffset - 新添加消息數組[n - 1].queueOffset 25: * Acc = Accumulation 26: * cnt = (猜想)對比度 27: */
28: private volatile long msgAccCnt = 0;
29:
30: /** 31: * 添加消息,並返回是否提交給消費者 32: * 返回true,當有新消息添加成功時, 33: * 34: * @param msgs 消息 35: * @return 是否提交給消費者 36: */
37: public boolean putMessage(final List<MessageExt> msgs) {
38: boolean dispatchToConsume = false;
39: try {
40: this.lockTreeMap.writeLock().lockInterruptibly();
41: try {
42: // 添加消息
43: int validMsgCnt = 0;
44: for (MessageExt msg : msgs) {
45: MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
46: if (null == old) {
47: validMsgCnt++;
48: this.queueOffsetMax = msg.getQueueOffset();
49: }
50: }
51: msgCount.addAndGet(validMsgCnt);
52:
53: // 計算是否正在消費
54: if (!msgTreeMap.isEmpty() && !this.consuming) {
55: dispatchToConsume = true;
56: this.consuming = true;
57: }
58:
59: // Broker累計消息數量
60: if (!msgs.isEmpty()) {
61: MessageExt messageExt = msgs.get(msgs.size() - 1);
62: String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
63: if (property != null) {
64: long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
65: if (accTotal > 0) {
66: this.msgAccCnt = accTotal;
67: }
68: }
69: }
70: } finally {
71: this.lockTreeMap.writeLock().unlock();
72: }
73: } catch (InterruptedException e) {
74: log.error("putMessage exception", e);
75: }
76:
77: return dispatchToConsume;
78: }複製代碼
若是用最簡單粗暴的方式描述 PullConsumer
拉取消息的過程,那就是以下的代碼:
while (true) {
if (不知足拉取消息) {
Thread.sleep(間隔);
continue;
}
主動拉取消息();
}複製代碼
1: /** 2: * 消費線程池隊列 3: */
4: private final BlockingQueue<Runnable> consumeRequestQueue;
5: /** 6: * 消費線程池 7: */
8: private final ThreadPoolExecutor consumeExecutor;
9:
10: public void submitConsumeRequest(// 11: final List<MessageExt> msgs, // 12: final ProcessQueue processQueue, // 13: final MessageQueue messageQueue, // 14: final boolean dispatchToConsume) {
15: final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
16: if (msgs.size() <= consumeBatchSize) { // 提交消息小於批量消息數,直接提交消費請求
17: ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
18: try {
19: this.consumeExecutor.submit(consumeRequest);
20: } catch (RejectedExecutionException e) {
21: this.submitConsumeRequestLater(consumeRequest);
22: }
23: } else { // 提交消息大於批量消息數,進行分拆成多個消費請求
24: for (int total = 0; total < msgs.size(); ) {
25: // 計算當前拆分請求包含的消息
26: List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);
27: for (int i = 0; i < consumeBatchSize; i++, total++) {
28: if (total < msgs.size()) {
29: msgThis.add(msgs.get(total));
30: } else {
31: break;
32: }
33: }
34:
35: // 提交拆分消費請求
36: ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
37: try {
38: this.consumeExecutor.submit(consumeRequest);
39: } catch (RejectedExecutionException e) {
40: // 若是被拒絕,則將當前拆分消息+剩餘消息提交延遲消費請求。
41: for (; total < msgs.size(); total++) {
42: msgThis.add(msgs.get(total));
43: }
44: this.submitConsumeRequestLater(consumeRequest);
45: }
46: }
47: }
48: }複製代碼
1: /** 2: * 提交延遲消費請求 3: * 4: * @param msgs 消息列表 5: * @param processQueue 消息處理隊列 6: * @param messageQueue 消息隊列 7: */
8: private void submitConsumeRequestLater(// 9: final List<MessageExt> msgs, // 10: final ProcessQueue processQueue, // 11: final MessageQueue messageQueue// 12: ) {
13:
14: this.scheduledExecutorService.schedule(new Runnable() {
15:
16: @Override
17: public void run() {
18: ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
19: }
20: }, 5000, TimeUnit.MILLISECONDS);
21: }
22:
23: /** 24: * 提交延遲消費請求 25: * @param consumeRequest 消費請求 26: */
27: private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// 28: ) {
29:
30: this.scheduledExecutorService.schedule(new Runnable() {
31:
32: @Override
33: public void run() {
34: ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); // TODO BUG ?
35: }
36: }, 5000, TimeUnit.MILLISECONDS);
37: }複製代碼
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
。若是消息數超過批量消費上限,會不會是BUG。1: class ConsumeRequest implements Runnable {
2:
3: /** 4: * 消費消息列表 5: */
6: private final List<MessageExt> msgs;
7: /** 8: * 消息處理隊列 9: */
10: private final ProcessQueue processQueue;
11: /** 12: * 消息隊列 13: */
14: private final MessageQueue messageQueue;
15:
16: public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
17: this.msgs = msgs;
18: this.processQueue = processQueue;
19: this.messageQueue = messageQueue;
20: }
21:
22: @Override
23: public void run() {
24: // 廢棄隊列不進行消費
25: if (this.processQueue.isDropped()) {
26: log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
27: return;
28: }
29:
30: MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; // 監聽器
31: ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); // 消費Context
32: ConsumeConcurrentlyStatus status = null; // 消費結果狀態
33:
34: // Hook
35: ConsumeMessageContext consumeMessageContext = null;
36: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
37: consumeMessageContext = new ConsumeMessageContext();
38: consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
39: consumeMessageContext.setProps(new HashMap<String, String>());
40: consumeMessageContext.setMq(messageQueue);
41: consumeMessageContext.setMsgList(msgs);
42: consumeMessageContext.setSuccess(false);
43: ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
44: }
45:
46: long beginTimestamp = System.currentTimeMillis();
47: boolean hasException = false;
48: ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; // 消費返回結果類型
49: try {
50: // 當消息爲重試消息,設置Topic爲原始Topic
51: ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
52:
53: // 設置開始消費時間
54: if (msgs != null && !msgs.isEmpty()) {
55: for (MessageExt msg : msgs) {
56: MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
57: }
58: }
59:
60: // 進行消費
61: status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
62: } catch (Throwable e) {
63: log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
64: RemotingHelper.exceptionSimpleDesc(e), //
65: ConsumeMessageConcurrentlyService.this.consumerGroup,
66: msgs,
67: messageQueue);
68: hasException = true;
69: }
70:
71: // 解析消費返回結果類型
72: long consumeRT = System.currentTimeMillis() - beginTimestamp;
73: if (null == status) {
74: if (hasException) {
75: returnType = ConsumeReturnType.EXCEPTION;
76: } else {
77: returnType = ConsumeReturnType.RETURNNULL;
78: }
79: } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
80: returnType = ConsumeReturnType.TIME_OUT;
81: } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
82: returnType = ConsumeReturnType.FAILED;
83: } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
84: returnType = ConsumeReturnType.SUCCESS;
85: }
86:
87: // Hook
88: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
89: consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
90: }
91:
92: // 消費結果狀態爲空時,則設置爲稍後從新消費
93: if (null == status) {
94: log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
95: ConsumeMessageConcurrentlyService.this.consumerGroup,
96: msgs,
97: messageQueue);
98: status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
99: }
100:
101: // Hook
102: if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
103: consumeMessageContext.setStatus(status.toString());
104: consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
105: ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
106: }
107:
108: // 統計
109: ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
110: .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
111:
112: // 處理消費結果
113: if (!processQueue.isDropped()) {
114: ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
115: } else {
116: log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
117: }
118: }
119:
120: }複製代碼
Topic
爲原始 Topic
。例如:原始 Topic
爲 TopicTest
,重試時 Topic
爲 %RETRY%please_rename_unique_group_name_4
,通過該方法,Topic
設置回 TopicTest
。Hook
。Hook
。1: public void processConsumeResult(// 2: final ConsumeConcurrentlyStatus status, // 3: final ConsumeConcurrentlyContext context, // 4: final ConsumeRequest consumeRequest// 5: ) {
6: int ackIndex = context.getAckIndex();
7:
8: // 消息爲空,直接返回
9: if (consumeRequest.getMsgs().isEmpty())
10: return;
11:
12: // 計算從consumeRequest.msgs[0]到consumeRequest.msgs[ackIndex]的消息消費成功
13: switch (status) {
14: case CONSUME_SUCCESS:
15: if (ackIndex >= consumeRequest.getMsgs().size()) {
16: ackIndex = consumeRequest.getMsgs().size() - 1;
17: }
18: // 統計成功/失敗數量
19: int ok = ackIndex + 1;
20: int failed = consumeRequest.getMsgs().size() - ok;
21: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
22: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
23: break;
24: case RECONSUME_LATER:
25: ackIndex = -1;
26: // 統計成功/失敗數量
27: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
28: consumeRequest.getMsgs().size());
29: break;
30: default:
31: break;
32: }
33:
34: // 處理消費失敗的消息
35: switch (this.defaultMQPushConsumer.getMessageModel()) {
36: case BROADCASTING: // 廣播模式,不管是否消費失敗,不發回消息到Broker,只打印Log
37: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
38: MessageExt msg = consumeRequest.getMsgs().get(i);
39: log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
40: }
41: break;
42: case CLUSTERING:
43: // 發回消息失敗到Broker。
44: List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
45: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
46: MessageExt msg = consumeRequest.getMsgs().get(i);
47: boolean result = this.sendMessageBack(msg, context);
48: if (!result) {
49: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
50: msgBackFailed.add(msg);
51: }
52: }
53:
54: // 發回Broker失敗的消息,直接提交延遲從新消費
55: if (!msgBackFailed.isEmpty()) {
56: consumeRequest.getMsgs().removeAll(msgBackFailed);
57:
58: this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
59: }
60: break;
61: default:
62: break;
63: }
64:
65: // 移除消費成功消息,並更新最新消費進度
66: long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
67: if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
68: this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
69: }
70: }複製代碼
ackIndex
值。consumeRequest.msgs[0 - ackIndex]
爲消費成功,須要進行 ack
確認。
CONSUME_SUCCESS
:ackIndex = context.getAckIndex()
。RECONSUME_LATER
:ackIndex = -1
。BROADCASTING
:廣播模式,不管是否消費失敗,不發回消息到 Broker
,只打印日誌。CLUSTERING
:集羣模式,消費失敗的消息發回到 Broker
。
Broker
。詳細解析見:DefaultMQPushConsumerImpl#sendMessageBack(...)。Broker
失敗的消息,直接提交延遲從新消費。Broker
成功,結果由於例如網絡異常,致使 Consumer
覺得發回失敗,斷定消費發回失敗,會致使消息重複消費,所以,消息消費要盡最大可能性實現冪等性。Broker
成功】的消息,並更新最新消費進度。
Broker
成功】的消息?見第 56 行。1: /** 2: * 移除消息,並返回第一條消息隊列位置 3: * 4: * @param msgs 消息 5: * @return 消息隊列位置 6: */
7: public long removeMessage(final List<MessageExt> msgs) {
8: long result = -1;
9: final long now = System.currentTimeMillis();
10: try {
11: this.lockTreeMap.writeLock().lockInterruptibly();
12: this.lastConsumeTimestamp = now;
13: try {
14: if (!msgTreeMap.isEmpty()) {
15: result = this.queueOffsetMax + 1; // 這裏+1的緣由是:若是msgTreeMap爲空時,下一條得到的消息位置爲queueOffsetMax+1
16:
17: // 移除消息
18: int removedCnt = 0;
19: for (MessageExt msg : msgs) {
20: MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
21: if (prev != null) {
22: removedCnt--;
23: }
24: }
25: msgCount.addAndGet(removedCnt);
26:
27: if (!msgTreeMap.isEmpty()) {
28: result = msgTreeMap.firstKey();
29: }
30: }
31: } finally {
32: this.lockTreeMap.writeLock().unlock();
33: }
34: } catch (Throwable t) {
35: log.error("removeMessage exception", t);
36: }
37:
38: return result;
39: }複製代碼
1: public void start() {
2: this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
3:
4: @Override
5: public void run() {
6: cleanExpireMsg();
7: }
8:
9: }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
10: }
11:
12: /** 13: * 清理過時消息 14: */
15: private void cleanExpireMsg() {
16: Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
17: this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
18: while (it.hasNext()) {
19: Map.Entry<MessageQueue, ProcessQueue> next = it.next();
20: ProcessQueue pq = next.getValue();
21: pq.cleanExpiredMsg(this.defaultMQPushConsumer);
22: }
23: }複製代碼
1: public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
2: // 順序消費時,直接返回
3: if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
4: return;
5: }
6:
7: // 循環移除消息
8: int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; // 每次循環最多移除16條
9: for (int i = 0; i < loop; i++) {
10: // 獲取第一條消息。判斷是否超時,若不超時,則結束循環
11: MessageExt msg = null;
12: try {
13: this.lockTreeMap.readLock().lockInterruptibly();
14: try {
15: if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
16: msg = msgTreeMap.firstEntry().getValue();
17: } else {
18: break;
19: }
20: } finally {
21: this.lockTreeMap.readLock().unlock();
22: }
23: } catch (InterruptedException e) {
24: log.error("getExpiredMsg exception", e);
25: }
26:
27: try {
28: // 發回超時消息
29: pushConsumer.sendMessageBack(msg, 3);
30: log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
31:
32: // 判斷此時消息是否依然是第一條,如果,則進行移除
33: try {
34: this.lockTreeMap.writeLock().lockInterruptibly();
35: try {
36: if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
37: try {
38: msgTreeMap.remove(msgTreeMap.firstKey());
39: } catch (Exception e) {
40: log.error("send expired msg exception", e);
41: }
42: }
43: } finally {
44: this.lockTreeMap.writeLock().unlock();
45: }
46: } catch (InterruptedException e) {
47: log.error("getExpiredMsg exception", e);
48: }
49: } catch (Exception e) {
50: log.error("send expired msg exception", e);
51: }
52: }
53: }複製代碼
Broker
。1: public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) 2: throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
3: try {
4: // Consumer發回消息
5: String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
6: : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
7: this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
8: this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
9: } catch (Exception e) { // TODO 疑問:什麼狀況下會發生異常
10: // 異常時,使用Client內置Producer發回消息
11: log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
12:
13: Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
14:
15: String originMsgId = MessageAccessor.getOriginMessageId(msg);
16: MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
17:
18: newMsg.setFlag(msg.getFlag());
19: MessageAccessor.setProperties(newMsg, msg.getProperties());
20: MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
21: MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
22: MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
23: newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
24:
25: this.mQClientFactory.getDefaultMQProducer().send(newMsg);
26: }
27: }複製代碼
Consumer
發回消息。詳細解析見:MQClientAPIImpl#consumerSendMessageBack(...)。Consumer
內置默認 Producer
發送消息。
1: /** 2: * Consumer發回消息 3: * @param addr Broker地址 4: * @param msg 消息 5: * @param consumerGroup 消費分組 6: * @param delayLevel 延遲級別 7: * @param timeoutMillis 超時 8: * @param maxConsumeRetryTimes 消費最大重試次數 9: * @throws RemotingException 當遠程調用發生異常時 10: * @throws MQBrokerException 當Broker發生異常時 11: * @throws InterruptedException 當線程中斷時 12: */
13: public void consumerSendMessageBack( 14: final String addr, 15: final MessageExt msg, 16: final String consumerGroup, 17: final int delayLevel, 18: final long timeoutMillis, 19: final int maxConsumeRetryTimes 20: ) throws RemotingException, MQBrokerException, InterruptedException {
21: ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
22: RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
23:
24: requestHeader.setGroup(consumerGroup);
25: requestHeader.setOriginTopic(msg.getTopic());
26: requestHeader.setOffset(msg.getCommitLogOffset());
27: requestHeader.setDelayLevel(delayLevel);
28: requestHeader.setOriginMsgId(msg.getMsgId());
29: requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
30:
31: RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
32: request, timeoutMillis);
33: assert response != null;
34: switch (response.getCode()) {
35: case ResponseCode.SUCCESS: {
36: return;
37: }
38: default:
39: break;
40: }
41:
42: throw new MQBrokerException(response.getCode(), response.getRemark());
43: }複製代碼
RemoteBrokerOffsetStore
:Consumer
集羣模式 下,使用遠程 Broker
消費進度。LocalFileOffsetStore
:Consumer
廣播模式下,使用本地 文件
消費進度。1: @Override
2: public void load() throws MQClientException {
3: // 從本地硬盤讀取消費進度
4: OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
5: if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
6: offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
7:
8: // 打印每一個消息隊列的消費進度
9: for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
10: AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
11: log.info("load consumer's offset, {} {} {}",
12: this.groupName,
13: mq,
14: offset.get());
15: }
16: }
17: }複製代碼
1: public class OffsetSerializeWrapper extends RemotingSerializable {
2: private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
3: new ConcurrentHashMap<>();
4:
5: public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
6: return offsetTable;
7: }
8:
9: public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
10: this.offsetTable = offsetTable;
11: }
12: }複製代碼
Offset
存儲序列化。Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
{
"offsetTable":{{
"brokerName":"broker-a",
"queueId":3,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":2,
"topic":"TopicTest"
}:1471,{
"brokerName":"broker-a",
"queueId":1,
"topic":"TopicTest"
}:1470,{
"brokerName":"broker-a",
"queueId":0,
"topic":"TopicTest"
}:1470
}
}複製代碼
1: @Override
2: public void load() {
3: }複製代碼
Broker
獲取。讀取消費進度類型:
READ_FROM_MEMORY
:從內存讀取。READ_FROM_STORE
:從存儲( Broker
或 文件
)讀取。MEMORY_FIRST_THEN_STORE
:優先從內存讀取,讀取不到,從存儲讀取。1: @Override
2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
3: if (mq != null) {
4: switch (type) {
5: case MEMORY_FIRST_THEN_STORE:
6: case READ_FROM_MEMORY: {
7: AtomicLong offset = this.offsetTable.get(mq);
8: if (offset != null) {
9: return offset.get();
10: } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
11: return -1;
12: }
13: }
14: case READ_FROM_STORE: {
15: OffsetSerializeWrapper offsetSerializeWrapper;
16: try {
17: offsetSerializeWrapper = this.readLocalOffset();
18: } catch (MQClientException e) {
19: return -1;
20: }
21: if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
22: AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
23: if (offset != null) {
24: this.updateOffset(mq, offset.get(), false);
25: return offset.get();
26: }
27: }
28: }
29: default:
30: break;
31: }
32: }
33:
34: return -1;
35: }複製代碼
文件
讀取消費進度。1: @Override
2: public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
3: if (mq != null) {
4: switch (type) {
5: case MEMORY_FIRST_THEN_STORE:
6: case READ_FROM_MEMORY: {
7: AtomicLong offset = this.offsetTable.get(mq);
8: if (offset != null) {
9: return offset.get();
10: } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
11: return -1;
12: }
13: }
14: case READ_FROM_STORE: {
15: try {
16: long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
17: AtomicLong offset = new AtomicLong(brokerOffset);
18: this.updateOffset(mq, offset.get(), false);
19: return brokerOffset;
20: }
21: // No offset in broker
22: catch (MQBrokerException e) {
23: return -1;
24: }
25: //Other exceptions
26: catch (Exception e) {
27: log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
28: return -2;
29: }
30: }
31: default:
32: break;
33: }
34: }
35:
36: return -1;
37: }複製代碼
Broker
讀取消費進度。該方法 RemoteBrokerOffsetStore
與 LocalFileOffsetStore
實現相同。
1: @Override
2: public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
3: if (mq != null) {
4: AtomicLong offsetOld = this.offsetTable.get(mq);
5: if (null == offsetOld) {
6: offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
7: }
8:
9: if (null != offsetOld) {
10: if (increaseOnly) {
11: MixAll.compareAndIncreaseOnly(offsetOld, offset);
12: } else {
13: offsetOld.set(offset);
14: }
15: }
16: }
17: }複製代碼
1: @Override
2: public void persistAll(Set<MessageQueue> mqs) {
3: if (null == mqs || mqs.isEmpty())
4: return;
5:
6: OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
7: for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
8: if (mqs.contains(entry.getKey())) {
9: AtomicLong offset = entry.getValue();
10: offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
11: }
12: }
13:
14: String jsonString = offsetSerializeWrapper.toJson(true);
15: if (jsonString != null) {
16: try {
17: MixAll.string2File(jsonString, this.storePath);
18: } catch (IOException e) {
19: log.error("persistAll consumer offset Exception, " + this.storePath, e);
20: }
21: }
22: }複製代碼
1: @Override
2: public void persistAll(Set<MessageQueue> mqs) {
3: if (null == mqs || mqs.isEmpty())
4: return;
5:
6: // 持久化消息隊列
7: final HashSet<MessageQueue> unusedMQ = new HashSet<>();
8: if (!mqs.isEmpty()) {
9: for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
10: MessageQueue mq = entry.getKey();
11: AtomicLong offset = entry.getValue();
12: if (offset != null) {
13: if (mqs.contains(mq)) {
14: try {
15: this.updateConsumeOffsetToBroker(mq, offset.get());
16: log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
17: this.groupName,
18: this.mQClientFactory.getClientId(),
19: mq,
20: offset.get());
21: } catch (Exception e) {
22: log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
23: }
24: } else {
25: unusedMQ.add(mq);
26: }
27: }
28: }
29: }
30:
31: // 移除不適用的消息隊列
32: if (!unusedMQ.isEmpty()) {
33: for (MessageQueue mq : unusedMQ) {
34: this.offsetTable.remove(mq);
35: log.info("remove unused mq, {}, {}", mq, this.groupName);
36: }
37: }
38: }複製代碼
Broker
,並移除非指定消息隊列。1: private void startScheduledTask() {
2: // 定時同步消費進度
3: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
4:
5: @Override
6: public void run() {
7: try {
8: MQClientInstance.this.cleanOfflineBroker();
9: MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
10: } catch (Exception e) {
11: log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
12: }
13: }
14: }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
15: }複製代碼
😈多是本系列最長的一篇文章,若有表達錯誤和不清晰,請多多見諒。感謝對本系列的閱讀、收藏、點贊、分享,特別是翻到結尾。😜真的有丟丟長。