🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: java
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
建議前置閱讀內容:數據庫
固然對 Message
發送與消費已經有必定了解的同窗,能夠選擇跳過。apache
RocketMQ
提供了兩種順序級別:後端
Producer
將相關聯的消息發送到相同的消息隊列。普通順序消息
的基礎上,Consumer
嚴格順序消費。絕大部分場景下只須要用到普通順序消息。
例如說:給用戶發送短信消息 + 發送推送消息,將兩條消息發送到不一樣的消息隊列,若其中一條消息隊列消費較慢形成堵塞,用戶可能會收到兩條消息會存在必定的時間差,帶來的體驗會相對較差。固然相似這種場景,即便有必定的時間差,不會產生系統邏輯上BUG。另外,普通順序消息
性能能更加好。
那麼何時使用使用徹底嚴格順序?以下是來自官方文檔的說明:數組
目前已知的應用只有數據庫
binlog
同步強依賴嚴格順序消息,其餘應用絕大部分均可以容忍短暫亂序,推薦使用普通的順序消息微信
😈上代碼!!!併發
Producer
順序發送官方發送順序消息的例子:分佈式
1: package org.apache.rocketmq.example.ordermessage;
2:
3: import java.io.UnsupportedEncodingException;
4: import java.util.List;
5: import org.apache.rocketmq.client.exception.MQBrokerException;
6: import org.apache.rocketmq.client.exception.MQClientException;
7: import org.apache.rocketmq.client.producer.DefaultMQProducer;
8: import org.apache.rocketmq.client.producer.MQProducer;
9: import org.apache.rocketmq.client.producer.MessageQueueSelector;
10: import org.apache.rocketmq.client.producer.SendResult;
11: import org.apache.rocketmq.common.message.Message;
12: import org.apache.rocketmq.common.message.MessageQueue;
13: import org.apache.rocketmq.remoting.common.RemotingHelper;
14: import org.apache.rocketmq.remoting.exception.RemotingException;
15:
16: public class Producer {
17: public static void main(String[] args) throws UnsupportedEncodingException {
18: try {
19: MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
20: producer.start();
21:
22: String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
23: for (int i = 0; i < 100; i++) {
24: int orderId = i % 10;
25: Message msg =
26: new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
27: ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
28: SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
29: @Override
30: public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
31: Integer id = (Integer) arg;
32: int index = id % mqs.size();
33: return mqs.get(index);
34: }
35: }, orderId);
36:
37: System.out.printf("%s%n", sendResult);
38: }
39:
40: producer.shutdown();
41: } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
42: e.printStackTrace();
43: }
44: }
45: }複製代碼
id % mqs.size()
來進行消息隊列的選擇。當前例子,咱們傳遞 orderId
做爲參數,那麼相同的 orderId
可以進入相同的消息隊列。MessageQueueSelector
接口的源碼:ide
1: public interface MessageQueueSelector {
2:
3: /** 4: * 選擇消息隊列 5: * 6: * @param mqs 消息隊列 7: * @param msg 消息 8: * @param arg 參數 9: * @return 消息隊列 10: */
11: MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
12: }複製代碼
Producer
選擇隊列發送消息方法的源碼:源碼分析
16: private SendResult sendSelectImpl(// 17: Message msg, // 18: MessageQueueSelector selector, // 19: Object arg, // 20: final CommunicationMode communicationMode, // 21: final SendCallback sendCallback, final long timeout// 22: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
23: this.makeSureStateOK();
24: Validators.checkMessage(msg, this.defaultMQProducer);
25:
26: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
27: if (topicPublishInfo != null && topicPublishInfo.ok()) {
28: MessageQueue mq = null;
29: try {
30: mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
31: } catch (Throwable e) {
32: throw new MQClientException("select message queue throwed exception.", e);
33: }
34:
35: if (mq != null) {
36: return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
37: } else {
38: throw new MQClientException("select message queue return null.", null);
39: }
40: }
41:
42: throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
43: }複製代碼
Consumer
嚴格順序消費Consumer
在嚴格順序消費時,經過 三 把鎖保證嚴格順序消費。
Broker
消息隊列鎖(分佈式鎖) :
Consumer
從 Broker
得到該鎖後,才能進行消息拉取、消費。Consumer
無需該鎖。Consumer
消息隊列鎖(本地鎖) :Consumer
得到該鎖才能操做消息隊列。Consumer
消息處理隊列消費鎖(本地鎖) :Consumer
得到該鎖才能消費消息隊列。可能同窗有疑問,爲何有 Consumer
消息隊列鎖還須要有 Consumer
消息隊列消費鎖呢?😈讓咱們帶着疑問繼續往下看。
集羣模式下,Consumer
更新屬於本身的消息隊列時,會向 Broker
鎖定該消息隊列(廣播模式下不須要)。若是鎖定失敗,則更新失敗,即該消息隊列不屬於本身,不能進行消費。核心代碼以下:
1: // ⬇️⬇️⬇️【RebalanceImpl.java】
2: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
3: // ..... 此處省略部分代碼
4: // 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列。
5: List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組
6: for (MessageQueue mq : mqSet) {
7: if (!this.processQueueTable.containsKey(mq)) {
8: if (isOrder && !this.lock(mq)) { // 順序消息鎖定消息隊列
9: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
10: continue;
11: }
12:
13: this.removeDirtyOffset(mq);
14: ProcessQueue pq = new ProcessQueue();
15: long nextOffset = this.computePullFromWhere(mq);
16: if (nextOffset >= 0) {
17: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
18: if (pre != null) {
19: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
20: } else {
21: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
22: PullRequest pullRequest = new PullRequest();
23: pullRequest.setConsumerGroup(consumerGroup);
24: pullRequest.setNextOffset(nextOffset);
25: pullRequest.setMessageQueue(mq);
26: pullRequest.setProcessQueue(pq);
27: pullRequestList.add(pullRequest);
28: changed = true;
29: }
30: } else {
31: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
32: }
33: }
34: }
35:
36: // ..... 此處省略部分代碼
37: }
38:
39: // ⬇️⬇️⬇️【RebalanceImpl.java】
40: /** 41: * 請求Broker得到指定消息隊列的分佈式鎖 42: * 43: * @param mq 隊列 44: * @return 是否成功 45: */
46: public boolean lock(final MessageQueue mq) {
47: FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
48: if (findBrokerResult != null) {
49: LockBatchRequestBody requestBody = new LockBatchRequestBody();
50: requestBody.setConsumerGroup(this.consumerGroup);
51: requestBody.setClientId(this.mQClientFactory.getClientId());
52: requestBody.getMqSet().add(mq);
53:
54: try {
55: // 請求Broker得到指定消息隊列的分佈式鎖
56: Set<MessageQueue> lockedMq =
57: this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
58:
59: // 設置消息處理隊列鎖定成功。鎖定消息隊列成功,可能本地沒有消息處理隊列,設置鎖定成功會在lockAll()方法。
60: for (MessageQueue mmqq : lockedMq) {
61: ProcessQueue processQueue = this.processQueueTable.get(mmqq);
62: if (processQueue != null) {
63: processQueue.setLocked(true);
64: processQueue.setLastLockTimestamp(System.currentTimeMillis());
65: }
66: }
67:
68: boolean lockOK = lockedMq.contains(mq);
69: log.info("the message queue lock {}, {} {}",
70: lockOK ? "OK" : "Failed",
71: this.consumerGroup,
72: mq);
73: return lockOK;
74: } catch (Exception e) {
75: log.error("lockBatchMQ exception, " + mq, e);
76: }
77: }
78:
79: return false;
80: }複製代碼
Broker
消息隊列鎖會過時,默認配置 30s。所以,Consumer
須要不斷向 Broker
刷新該鎖過時時間,默認配置 20s 刷新一次。核心代碼以下:
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: public void start() {
3: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
4: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
5: @Override
6: public void run() {
7: ConsumeMessageOrderlyService.this.lockMQPeriodically();
8: }
9: }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
10: }
11: }複製代碼
集羣模式下,Consumer
移除本身的消息隊列時,會向 Broker
解鎖該消息隊列(廣播模式下不須要)。核心代碼以下:
1: // ⬇️⬇️⬇️【RebalancePushImpl.java】
2: /** 3: * 移除不須要的隊列相關的信息 4: * 1. 持久化消費進度,並移除之 5: * 2. 順序消費&集羣模式,解鎖對該隊列的鎖定 6: * 7: * @param mq 消息隊列 8: * @param pq 消息處理隊列 9: * @return 是否移除成功 10: */
11: @Override
12: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
13: // 同步隊列的消費進度,並移除之。
14: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
15: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
16: // 集羣模式下,順序消費移除時,解鎖對隊列的鎖定
17: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
18: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
19: try {
20: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
21: try {
22: return this.unlockDelay(mq, pq);
23: } finally {
24: pq.getLockConsume().unlock();
25: }
26: } else {
27: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
28: mq, //
29: pq.getTryUnlockTimes());
30:
31: pq.incTryUnlockTimes();
32: }
33: } catch (Exception e) {
34: log.error("removeUnnecessaryMessageQueue Exception", e);
35: }
36:
37: return false;
38: }
39: return true;
40: }
41:
42: // ⬇️⬇️⬇️【RebalancePushImpl.java】
43: /** 44: * 延遲解鎖 Broker 消息隊列鎖 45: * 當消息處理隊列不存在消息,則直接解鎖 46: * 47: * @param mq 消息隊列 48: * @param pq 消息處理隊列 49: * @return 是否解鎖成功 50: */
51: private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
52: if (pq.hasTempMessage()) { // TODO 疑問:爲何要延遲移除
53: log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
54: this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
55: @Override
56: public void run() {
57: log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
58: RebalancePushImpl.this.unlock(mq, true);
59: }
60: }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
61: } else {
62: this.unlock(mq, true);
63: }
64: return true;
65: }複製代碼
Consumer
和當前 Consumer
同時消費該消息隊列,致使消息沒法嚴格順序消費。Broker
消息隊列鎖。若是消息處理隊列存在剩餘消息,則延遲解鎖 Broker
消息隊列鎖。❓爲何消息處理隊列存在剩餘消息不能直接解鎖呢?😈我也不知道,百思不得其解。若是有知道的同窗麻煩教育下俺。😏本節會類比併發消費消費隊列,建議對照 PushConsumer併發消費消息 一塊兒理解。
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: class ConsumeRequest implements Runnable {
3:
4: /** 5: * 消息處理隊列 6: */
7: private final ProcessQueue processQueue;
8: /** 9: * 消息隊列 10: */
11: private final MessageQueue messageQueue;
12:
13: @Override
14: public void run() {
15: if (this.processQueue.isDropped()) {
16: log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
17: return;
18: }
19:
20: // 得到 Consumer 消息隊列鎖
21: final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
22: synchronized (objLock) {
23: // (廣播模式) 或者 (集羣模式 && Broker消息隊列鎖有效)
24: if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
25: || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
26: final long beginTime = System.currentTimeMillis();
27: // 循環
28: for (boolean continueConsume = true; continueConsume; ) {
29: if (this.processQueue.isDropped()) {
30: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
31: break;
32: }
33:
34: // 消息隊列分佈式鎖未鎖定,提交延遲得到鎖並消費請求
35: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
36: && !this.processQueue.isLocked()) {
37: log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
38: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
39: break;
40: }
41: // 消息隊列分佈式鎖已通過期,提交延遲得到鎖並消費請求
42: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
43: && this.processQueue.isLockExpired()) {
44: log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
45: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
46: break;
47: }
48:
49: // 當前週期消費時間超過連續時長,默認:60s,提交延遲消費請求。默認狀況下,每消費1分鐘休息10ms。
50: long interval = System.currentTimeMillis() - beginTime;
51: if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
52: ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
53: break;
54: }
55:
56: // 獲取消費消息。此處和併發消息請求不一樣,併發消息請求已經帶了消費哪些消息。
57: final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
58: List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
59: if (!msgs.isEmpty()) {
60: final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
61:
62: ConsumeOrderlyStatus status = null;
63:
64: // ....省略代碼:Hook:before
65:
66: // 執行消費
67: long beginTimestamp = System.currentTimeMillis();
68: ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
69: boolean hasException = false;
70: try {
71: this.processQueue.getLockConsume().lock(); // 鎖定隊列消費鎖
72:
73: if (this.processQueue.isDropped()) {
74: log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
75: this.messageQueue);
76: break;
77: }
78:
79: status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
80: } catch (Throwable e) {
81: log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
82: RemotingHelper.exceptionSimpleDesc(e), //
83: ConsumeMessageOrderlyService.this.consumerGroup, //
84: msgs, //
85: messageQueue);
86: hasException = true;
87: } finally {
88: this.processQueue.getLockConsume().unlock(); // 鎖定隊列消費鎖
89: }
90:
91: // ....省略代碼:解析消費結果狀態
92:
93: // ....省略代碼:Hook:after
94:
95: ConsumeMessageOrderlyService.this.getConsumerStatsManager()
96: .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
97:
98: // 處理消費結果
99: continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
100: } else {
101: continueConsume = false;
102: }
103: }
104: } else {
105: if (this.processQueue.isDropped()) {
106: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
107: return;
108: }
109:
110: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
111: }
112: }
113: }
114:
115: }複製代碼
Consumer
消息隊列鎖。Consumer
消息處理隊列消費鎖。相比【Consumer
消息隊列鎖】,其粒度較小。這就是上文提到的❓爲何有Consumer
消息隊列鎖還須要有 Consumer 消息隊列消費鎖呢的緣由。順序消費消息結果 (ConsumeOrderlyStatus
) 有四種狀況:
SUCCESS
:消費成功但不提交。ROLLBACK
:消費失敗,消費回滾。COMMIT
:消費成功提交而且提交。SUSPEND_CURRENT_QUEUE_A_MOMENT
:消費失敗,掛起消費隊列一會會,稍後繼續消費。考慮到 ROLLBACK
、COMMIT
暫時只使用在 MySQL binlog
場景,官方將這兩狀態標記爲 @Deprecated
。固然,相應的實現邏輯依然保留。
在併發消費場景時,若是消費失敗,Consumer
會將消費失敗消息發回到 Broker
重試隊列,跳過當前消息,等待下次拉取該消息再進行消費。
可是在徹底嚴格順序消費消費時,這樣作顯然不行。也所以,消費失敗的消息,會掛起隊列一會會,稍後繼續消費。
不過消費失敗的消息一直失敗,也不可能一直消費。當超過消費重試上限時,Consumer
會將消費失敗超過上限的消息發回到 Broker
死信隊列。
讓咱們來看看代碼:
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: /** 3: * 處理消費結果,並返回是否繼續消費 4: * 5: * @param msgs 消息 6: * @param status 消費結果狀態 7: * @param context 消費Context 8: * @param consumeRequest 消費請求 9: * @return 是否繼續消費 10: */
11: public boolean processConsumeResult(// 12: final List<MessageExt> msgs, // 13: final ConsumeOrderlyStatus status, // 14: final ConsumeOrderlyContext context, // 15: final ConsumeRequest consumeRequest// 16: ) {
17: boolean continueConsume = true;
18: long commitOffset = -1L;
19: if (context.isAutoCommit()) {
20: switch (status) {
21: case COMMIT:
22: case ROLLBACK:
23: log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());
24: case SUCCESS:
25: // 提交消息已消費成功到消息處理隊列
26: commitOffset = consumeRequest.getProcessQueue().commit();
27: // 統計
28: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
29: break;
30: case SUSPEND_CURRENT_QUEUE_A_MOMENT:
31: // 統計
32: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
33: if (checkReconsumeTimes(msgs)) { // 計算是否暫時掛起(暫停)消費N毫秒,默認:10ms
34: // 設置消息從新消費
35: consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
36: // 提交延遲消費請求
37: this.submitConsumeRequestLater(//
38: consumeRequest.getProcessQueue(), //
39: consumeRequest.getMessageQueue(), //
40: context.getSuspendCurrentQueueTimeMillis());
41: continueConsume = false;
42: } else {
43: commitOffset = consumeRequest.getProcessQueue().commit();
44: }
45: break;
46: default:
47: break;
48: }
49: } else {
50: switch (status) {
51: case SUCCESS:
52: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
53: break;
54: case COMMIT:
55: // 提交消息已消費成功到消息處理隊列
56: commitOffset = consumeRequest.getProcessQueue().commit();
57: break;
58: case ROLLBACK:
59: // 設置消息從新消費
60: consumeRequest.getProcessQueue().rollback();
61: this.submitConsumeRequestLater(//
62: consumeRequest.getProcessQueue(), //
63: consumeRequest.getMessageQueue(), //
64: context.getSuspendCurrentQueueTimeMillis());
65: continueConsume = false;
66: break;
67: case SUSPEND_CURRENT_QUEUE_A_MOMENT: // 計算是否暫時掛起(暫停)消費N毫秒,默認:10ms
68: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
69: if (checkReconsumeTimes(msgs)) {
70: // 設置消息從新消費
71: consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
72: // 提交延遲消費請求
73: this.submitConsumeRequestLater(//
74: consumeRequest.getProcessQueue(), //
75: consumeRequest.getMessageQueue(), //
76: context.getSuspendCurrentQueueTimeMillis());
77: continueConsume = false;
78: }
79: break;
80: default:
81: break;
82: }
83: }
84:
85: // 消息處理隊列未dropped,提交有效消費進度
86: if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
87: this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
88: }
89:
90: return continueConsume;
91: }
92:
93: private int getMaxReconsumeTimes() {
94: // default reconsume times: Integer.MAX_VALUE
95: if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
96: return Integer.MAX_VALUE;
97: } else {
98: return this.defaultMQPushConsumer.getMaxReconsumeTimes();
99: }
100: }
101:
102: /** 103: * 計算是否要暫停消費 104: * 不暫停條件:存在消息都超過最大消費次數而且都發回broker成功 105: * 106: * @param msgs 消息 107: * @return 是否要暫停 108: */
109: private boolean checkReconsumeTimes(List<MessageExt> msgs) {
110: boolean suspend = false;
111: if (msgs != null && !msgs.isEmpty()) {
112: for (MessageExt msg : msgs) {
113: if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
114: MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
115: if (!sendMessageBack(msg)) { // 發回失敗,中斷
116: suspend = true;
117: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
118: }
119: } else {
120: suspend = true;
121: msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
122: }
123: }
124: }
125: return suspend;
126: }
127:
128: /** 129: * 發回消息。 130: * 消息發回broker後,對應的消息隊列是死信隊列。 131: * 132: * @param msg 消息 133: * @return 是否發送成功 134: */
135: public boolean sendMessageBack(final MessageExt msg) {
136: try {
137: // max reconsume times exceeded then send to dead letter queue.
138: Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
139: String originMsgId = MessageAccessor.getOriginMessageId(msg);
140: MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
141: newMsg.setFlag(msg.getFlag());
142: MessageAccessor.setProperties(newMsg, msg.getProperties());
143: MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
144: MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
145: MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
146: newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
147:
148: this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
149: return true;
150: } catch (Exception e) {
151: log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
152: }
153:
154: return false;
155: }複製代碼
AutoCommit
)的狀況下,COMMIT
、ROLLBACK
、SUCCESS
邏輯已經統一。Broker
死信隊列,跳過這些消息。此時,消息隊列無需掛起,繼續消費後面的消息。😈涉及到的四個核心方法的源碼:
1: // ⬇️⬇️⬇️【ProcessQueue.java】
2: /** 3: * 消息映射 4: * key:消息隊列位置 5: */
6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>(); /** 7: * 消息映射臨時存儲(消費中的消息) 8: */
9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();
10:
11: /** 12: * 回滾消費中的消息 13: * 邏輯相似於{@link #makeMessageToCosumeAgain(List)} 14: */
15: public void rollback() {
16: try {
17: this.lockTreeMap.writeLock().lockInterruptibly();
18: try {
19: this.msgTreeMap.putAll(this.msgTreeMapTemp);
20: this.msgTreeMapTemp.clear();
21: } finally {
22: this.lockTreeMap.writeLock().unlock();
23: }
24: } catch (InterruptedException e) {
25: log.error("rollback exception", e);
26: }
27: }
28:
29: /** 30: * 提交消費中的消息已消費成功,返回消費進度 31: * 32: * @return 消費進度 33: */
34: public long commit() {
35: try {
36: this.lockTreeMap.writeLock().lockInterruptibly();
37: try {
38: // 消費進度
39: Long offset = this.msgTreeMapTemp.lastKey();
40:
41: //
42: msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
43:
44: //
45: this.msgTreeMapTemp.clear();
46:
47: // 返回消費進度
48: if (offset != null) {
49: return offset + 1;
50: }
51: } finally {
52: this.lockTreeMap.writeLock().unlock();
53: }
54: } catch (InterruptedException e) {
55: log.error("commit exception", e);
56: }
57:
58: return -1;
59: }
60:
61: /** 62: * 指定消息從新消費 63: * 邏輯相似於{@link #rollback()} 64: * 65: * @param msgs 消息 66: */
67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
68: try {
69: this.lockTreeMap.writeLock().lockInterruptibly();
70: try {
71: for (MessageExt msg : msgs) {
72: this.msgTreeMapTemp.remove(msg.getQueueOffset());
73: this.msgTreeMap.put(msg.getQueueOffset(), msg);
74: }
75: } finally {
76: this.lockTreeMap.writeLock().unlock();
77: }
78: } catch (InterruptedException e) {
79: log.error("makeMessageToCosumeAgain exception", e);
80: }
81: }
82:
83: /** 84: * 得到持有消息前N條 85: * 86: * @param batchSize 條數 87: * @return 消息 88: */
89: public List<MessageExt> takeMessags(final int batchSize) {
90: List<MessageExt> result = new ArrayList<>(batchSize);
91: final long now = System.currentTimeMillis();
92: try {
93: this.lockTreeMap.writeLock().lockInterruptibly();
94: this.lastConsumeTimestamp = now;
95: try {
96: if (!this.msgTreeMap.isEmpty()) {
97: for (int i = 0; i < batchSize; i++) {
98: Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
99: if (entry != null) {
100: result.add(entry.getValue());
101: msgTreeMapTemp.put(entry.getKey(), entry.getValue());
102: } else {
103: break;
104: }
105: }
106: }
107:
108: if (result.isEmpty()) {
109: consuming = false;
110: }
111: } finally {
112: this.lockTreeMap.writeLock().unlock();
113: }
114: } catch (InterruptedException e) {
115: log.error("take Messages exception", e);
116: }
117:
118: return result;
119: }複製代碼