RocketMQ源碼解析:Message順序發送與消費

🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: java

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右

1. 概述

建議前置閱讀內容:數據庫

固然對 Message 發送與消費已經有必定了解的同窗,能夠選擇跳過。apache


RocketMQ 提供了兩種順序級別:後端

  • 普通順序消息 :Producer 將相關聯的消息發送到相同的消息隊列。
  • 徹底嚴格順序 :在 普通順序消息 的基礎上,Consumer 嚴格順序消費。

絕大部分場景下只須要用到普通順序消息
例如說:給用戶發送短信消息 + 發送推送消息,將兩條消息發送到不一樣的消息隊列,若其中一條消息隊列消費較慢形成堵塞,用戶可能會收到兩條消息會存在必定的時間差,帶來的體驗會相對較差。固然相似這種場景,即便有必定的時間差,不會產生系統邏輯上BUG。另外,普通順序消息性能能更加好。
那麼何時使用使用徹底嚴格順序?以下是來自官方文檔的說明:數組

目前已知的應用只有數據庫 binlog 同步強依賴嚴格順序消息,其餘應用絕大部分均可以容忍短暫亂序,推薦使用普通的順序消息微信


😈上代碼!!!併發

2. Producer 順序發送

官方發送順序消息的例子分佈式

1: package org.apache.rocketmq.example.ordermessage;
  2: 
  3: import java.io.UnsupportedEncodingException;
  4: import java.util.List;
  5: import org.apache.rocketmq.client.exception.MQBrokerException;
  6: import org.apache.rocketmq.client.exception.MQClientException;
  7: import org.apache.rocketmq.client.producer.DefaultMQProducer;
  8: import org.apache.rocketmq.client.producer.MQProducer;
  9: import org.apache.rocketmq.client.producer.MessageQueueSelector;
 10: import org.apache.rocketmq.client.producer.SendResult;
 11: import org.apache.rocketmq.common.message.Message;
 12: import org.apache.rocketmq.common.message.MessageQueue;
 13: import org.apache.rocketmq.remoting.common.RemotingHelper;
 14: import org.apache.rocketmq.remoting.exception.RemotingException;
 15: 
 16: public class Producer {
 17:     public static void main(String[] args) throws UnsupportedEncodingException {
 18:         try {
 19:             MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
 20:             producer.start();
 21: 
 22:             String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
 23:             for (int i = 0; i < 100; i++) {
 24:                 int orderId = i % 10;
 25:                 Message msg =
 26:                     new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
 27:                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
 28:                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
 29:                     @Override
 30:                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
 31:                         Integer id = (Integer) arg;
 32:                         int index = id % mqs.size();
 33:                         return mqs.get(index);
 34:                     }
 35:                 }, orderId);
 36: 
 37:                 System.out.printf("%s%n", sendResult);
 38:             }
 39: 
 40:             producer.shutdown();
 41:         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
 42:             e.printStackTrace();
 43:         }
 44:     }
 45: }複製代碼
  • 第 28 至 35 行 :實現了根據 id % mqs.size() 來進行消息隊列的選擇。當前例子,咱們傳遞 orderId 做爲參數,那麼相同的 orderId 可以進入相同的消息隊列

MessageQueueSelector 接口的源碼ide

1: public interface MessageQueueSelector {
  2: 
  3:     /** 4: * 選擇消息隊列 5: * 6: * @param mqs 消息隊列 7: * @param msg 消息 8: * @param arg 參數 9: * @return 消息隊列 10: */
 11:     MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
 12: }複製代碼

Producer 選擇隊列發送消息方法的源碼源碼分析

16: private SendResult sendSelectImpl(// 17: Message msg, // 18: MessageQueueSelector selector, // 19: Object arg, // 20: final CommunicationMode communicationMode, // 21: final SendCallback sendCallback, final long timeout// 22: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 23:     this.makeSureStateOK();
 24:     Validators.checkMessage(msg, this.defaultMQProducer);
 25: 
 26:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 27:     if (topicPublishInfo != null && topicPublishInfo.ok()) {
 28:         MessageQueue mq = null;
 29:         try {
 30:             mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
 31:         } catch (Throwable e) {
 32:             throw new MQClientException("select message queue throwed exception.", e);
 33:         }
 34: 
 35:         if (mq != null) {
 36:             return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
 37:         } else {
 38:             throw new MQClientException("select message queue return null.", null);
 39:         }
 40:     }
 41: 
 42:     throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
 43: }複製代碼
  • 第 30 行 :選擇消息隊列。
  • 第 36 行 :發送消息。

3. Consumer 嚴格順序消費

Consumer 在嚴格順序消費時,經過 把鎖保證嚴格順序消費。

  • Broker 消息隊列鎖(分佈式鎖) :
    • 集羣模式下,ConsumerBroker 得到該鎖後,才能進行消息拉取、消費。
    • 廣播模式下,Consumer 無需該鎖。
  • Consumer 消息隊列鎖(本地鎖) :Consumer 得到該鎖才能操做消息隊列。
  • Consumer 消息處理隊列消費鎖(本地鎖) :Consumer 得到該鎖才能消費消息隊列。

可能同窗有疑問,爲何有 Consumer 消息隊列鎖還須要有 Consumer 消息隊列消費鎖呢?😈讓咱們帶着疑問繼續往下看。


3.1 得到(鎖定)消息隊列

集羣模式下,Consumer 更新屬於本身的消息隊列時,會向 Broker 鎖定該消息隊列(廣播模式下不須要)。若是鎖定失敗,則更新失敗,即該消息隊列不屬於本身,不能進行消費。核心代碼以下:

1: // ⬇️⬇️⬇️【RebalanceImpl.java】
  2: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
  3: // ..... 此處省略部分代碼 
  4:     // 增長 不在processQueueTable && 存在於mqSet 裏的消息隊列。
  5:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組
  6:     for (MessageQueue mq : mqSet) {
  7:         if (!this.processQueueTable.containsKey(mq)) {
  8:             if (isOrder && !this.lock(mq)) { // 順序消息鎖定消息隊列
  9:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
 10:                 continue;
 11:             }
 12: 
 13:             this.removeDirtyOffset(mq);
 14:             ProcessQueue pq = new ProcessQueue();
 15:             long nextOffset = this.computePullFromWhere(mq);
 16:             if (nextOffset >= 0) {
 17:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
 18:                 if (pre != null) {
 19:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
 20:                 } else {
 21:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
 22:                     PullRequest pullRequest = new PullRequest();
 23:                     pullRequest.setConsumerGroup(consumerGroup);
 24:                     pullRequest.setNextOffset(nextOffset);
 25:                     pullRequest.setMessageQueue(mq);
 26:                     pullRequest.setProcessQueue(pq);
 27:                     pullRequestList.add(pullRequest);
 28:                     changed = true;
 29:                 }
 30:             } else {
 31:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
 32:             }
 33:         }
 34:     }
 35: 
 36: // ..... 此處省略部分代碼 
 37: }
 38: 
 39: // ⬇️⬇️⬇️【RebalanceImpl.java】
 40: /** 41: * 請求Broker得到指定消息隊列的分佈式鎖 42: * 43: * @param mq 隊列 44: * @return 是否成功 45: */
 46: public boolean lock(final MessageQueue mq) {
 47:     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
 48:     if (findBrokerResult != null) {
 49:         LockBatchRequestBody requestBody = new LockBatchRequestBody();
 50:         requestBody.setConsumerGroup(this.consumerGroup);
 51:         requestBody.setClientId(this.mQClientFactory.getClientId());
 52:         requestBody.getMqSet().add(mq);
 53: 
 54:         try {
 55:             // 請求Broker得到指定消息隊列的分佈式鎖
 56:             Set<MessageQueue> lockedMq =
 57:                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
 58: 
 59:             // 設置消息處理隊列鎖定成功。鎖定消息隊列成功,可能本地沒有消息處理隊列,設置鎖定成功會在lockAll()方法。
 60:             for (MessageQueue mmqq : lockedMq) {
 61:                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);
 62:                 if (processQueue != null) {
 63:                     processQueue.setLocked(true);
 64:                     processQueue.setLastLockTimestamp(System.currentTimeMillis());
 65:                 }
 66:             }
 67: 
 68:             boolean lockOK = lockedMq.contains(mq);
 69:             log.info("the message queue lock {}, {} {}",
 70:                 lockOK ? "OK" : "Failed",
 71:                 this.consumerGroup,
 72:                 mq);
 73:             return lockOK;
 74:         } catch (Exception e) {
 75:             log.error("lockBatchMQ exception, " + mq, e);
 76:         }
 77:     }
 78: 
 79:     return false;
 80: }複製代碼
  • ⬆️⬆️⬆️
  • 第 8 至 11 行 :順序消費時,鎖定消息隊列。若是鎖定失敗,新增消息處理隊列失敗。

Broker 消息隊列鎖會過時,默認配置 30s。所以,Consumer 須要不斷向 Broker 刷新該鎖過時時間,默認配置 20s 刷新一次。核心代碼以下:

1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
  2: public void start() {
  3:     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
  4:         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  5:             @Override
  6:             public void run() {
  7:                 ConsumeMessageOrderlyService.this.lockMQPeriodically();
  8:             }
  9:         }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
 10:     }
 11: }複製代碼

3.2 移除消息隊列

集羣模式下,Consumer 移除本身的消息隊列時,會向 Broker 解鎖該消息隊列(廣播模式下不須要)。核心代碼以下:

1: // ⬇️⬇️⬇️【RebalancePushImpl.java】
  2: /** 3: * 移除不須要的隊列相關的信息 4: * 1. 持久化消費進度,並移除之 5: * 2. 順序消費&集羣模式,解鎖對該隊列的鎖定 6: * 7: * @param mq 消息隊列 8: * @param pq 消息處理隊列 9: * @return 是否移除成功 10: */
 11: @Override
 12: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
 13:     // 同步隊列的消費進度,並移除之。
 14:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
 15:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
 16:     // 集羣模式下,順序消費移除時,解鎖對隊列的鎖定
 17:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
 18:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
 19:         try {
 20:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
 21:                 try {
 22:                     return this.unlockDelay(mq, pq);
 23:                 } finally {
 24:                     pq.getLockConsume().unlock();
 25:                 }
 26:             } else {
 27:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
 28:                     mq, //
 29:                     pq.getTryUnlockTimes());
 30: 
 31:                 pq.incTryUnlockTimes();
 32:             }
 33:         } catch (Exception e) {
 34:             log.error("removeUnnecessaryMessageQueue Exception", e);
 35:         }
 36: 
 37:         return false;
 38:     }
 39:     return true;
 40: }
 41: 
 42: // ⬇️⬇️⬇️【RebalancePushImpl.java】
 43: /** 44: * 延遲解鎖 Broker 消息隊列鎖 45: * 當消息處理隊列不存在消息,則直接解鎖 46: * 47: * @param mq 消息隊列 48: * @param pq 消息處理隊列 49: * @return 是否解鎖成功 50: */
 51: private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
 52:     if (pq.hasTempMessage()) { // TODO 疑問:爲何要延遲移除
 53:         log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
 54:         this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
 55:             @Override
 56:             public void run() {
 57:                 log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
 58:                 RebalancePushImpl.this.unlock(mq, true);
 59:             }
 60:         }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
 61:     } else {
 62:         this.unlock(mq, true);
 63:     }
 64:     return true;
 65: }複製代碼
  • ⬆️⬆️⬆️
  • 第 20 至 32 行 :獲取消息隊列消費鎖,避免和消息隊列消費衝突。若是獲取鎖失敗,則移除消息隊列失敗,等待下次從新分配消費隊列時,再進行移除。若是未得到鎖而進行移除,則可能出現另外的 Consumer 和當前 Consumer 同時消費該消息隊列,致使消息沒法嚴格順序消費。
  • 第 51 至 64 行 :解鎖 Broker 消息隊列鎖。若是消息處理隊列存在剩餘消息,則延遲解鎖 Broker 消息隊列鎖。❓爲何消息處理隊列存在剩餘消息不能直接解鎖呢?😈我也不知道,百思不得其解。若是有知道的同窗麻煩教育下俺。

3.3 消費消息隊列

😏本節會類比併發消費消費隊列,建議對照 PushConsumer併發消費消息 一塊兒理解。

3.1.1 消費消息

順序消費活動圖-消費消息
順序消費活動圖-消費消息

1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
  2: class ConsumeRequest implements Runnable {
  3: 
  4:     /** 5: * 消息處理隊列 6: */
  7:     private final ProcessQueue processQueue;
  8:     /** 9: * 消息隊列 10: */
 11:     private final MessageQueue messageQueue;
 12: 
 13:     @Override
 14:     public void run() {
 15:         if (this.processQueue.isDropped()) {
 16:             log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
 17:             return;
 18:         }
 19: 
 20:         // 得到 Consumer 消息隊列鎖
 21:         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
 22:         synchronized (objLock) {
 23:             // (廣播模式) 或者 (集羣模式 && Broker消息隊列鎖有效)
 24:             if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 25:                 || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
 26:                 final long beginTime = System.currentTimeMillis();
 27:                 // 循環
 28:                 for (boolean continueConsume = true; continueConsume; ) {
 29:                     if (this.processQueue.isDropped()) {
 30:                         log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
 31:                         break;
 32:                     }
 33: 
 34:                     // 消息隊列分佈式鎖未鎖定,提交延遲得到鎖並消費請求
 35:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 36:                         && !this.processQueue.isLocked()) {
 37:                         log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
 38:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
 39:                         break;
 40:                     }
 41:                     // 消息隊列分佈式鎖已通過期,提交延遲得到鎖並消費請求
 42:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 43:                         && this.processQueue.isLockExpired()) {
 44:                         log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
 45:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
 46:                         break;
 47:                     }
 48: 
 49:                     // 當前週期消費時間超過連續時長,默認:60s,提交延遲消費請求。默認狀況下,每消費1分鐘休息10ms。
 50:                     long interval = System.currentTimeMillis() - beginTime;
 51:                     if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
 52:                         ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
 53:                         break;
 54:                     }
 55: 
 56:                     // 獲取消費消息。此處和併發消息請求不一樣,併發消息請求已經帶了消費哪些消息。
 57:                     final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
 58:                     List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
 59:                     if (!msgs.isEmpty()) {
 60:                         final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
 61: 
 62:                         ConsumeOrderlyStatus status = null;
 63: 
 64:                         // ....省略代碼:Hook:before
 65: 
 66:                         // 執行消費
 67:                         long beginTimestamp = System.currentTimeMillis();
 68:                         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
 69:                         boolean hasException = false;
 70:                         try {
 71:                             this.processQueue.getLockConsume().lock(); // 鎖定隊列消費鎖
 72: 
 73:                             if (this.processQueue.isDropped()) {
 74:                                 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
 75:                                     this.messageQueue);
 76:                                 break;
 77:                             }
 78: 
 79:                             status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
 80:                         } catch (Throwable e) {
 81:                             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
 82:                                 RemotingHelper.exceptionSimpleDesc(e), //
 83:                                 ConsumeMessageOrderlyService.this.consumerGroup, //
 84:                                 msgs, //
 85:                                 messageQueue);
 86:                             hasException = true;
 87:                         } finally {
 88:                             this.processQueue.getLockConsume().unlock(); // 鎖定隊列消費鎖
 89:                         }
 90: 
 91:                         // ....省略代碼:解析消費結果狀態
 92: 
 93:                         // ....省略代碼:Hook:after
 94: 
 95:                         ConsumeMessageOrderlyService.this.getConsumerStatsManager()
 96:                             .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
 97: 
 98:                         // 處理消費結果
 99:                         continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
100:                     } else {
101:                         continueConsume = false;
102:                     }
103:                 }
104:             } else {
105:                 if (this.processQueue.isDropped()) {
106:                     log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
107:                     return;
108:                 }
109: 
110:                 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
111:             }
112:         }
113:     }
114: 
115: }複製代碼
  • ⬆️⬆️⬆️
  • 第 20 行 :得到 Consumer 消息隊列鎖。
  • 第 58 行 :從消息處理隊列順序得到消息。和併發消費得到消息不一樣。併發消費請求在請求建立時,已經設置好消費哪些消息。
  • 第 71 行 :得到 Consumer 消息處理隊列消費鎖。相比【Consumer消息隊列鎖】,其粒度較小。這就是上文提到的❓爲何有Consumer消息隊列鎖還須要有 Consumer 消息隊列消費鎖呢的緣由。
  • 第 79 行 :執行消費
  • 第 99 行 :處理消費結果。

3.1.2 處理消費結果

順序消費消息結果 (ConsumeOrderlyStatus) 有四種狀況:

  • SUCCESS :消費成功但不提交
  • ROLLBACK :消費失敗,消費回滾。
  • COMMIT :消費成功提交而且提交。
  • SUSPEND_CURRENT_QUEUE_A_MOMENT :消費失敗,掛起消費隊列一會會,稍後繼續消費。

考慮到 ROLLBACKCOMMIT 暫時只使用在 MySQL binlog 場景,官方將這兩狀態標記爲 @Deprecated。固然,相應的實現邏輯依然保留。

併發消費場景時,若是消費失敗,Consumer 會將消費失敗消息發回到 Broker 重試隊列,跳過當前消息,等待下次拉取該消息再進行消費。

可是在徹底嚴格順序消費消費時,這樣作顯然不行。也所以,消費失敗的消息,會掛起隊列一會會,稍後繼續消費。

不過消費失敗的消息一直失敗,也不可能一直消費。當超過消費重試上限時,Consumer 會將消費失敗超過上限的消息發回到 Broker 死信隊列。

讓咱們來看看代碼:

1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
  2: /** 3: * 處理消費結果,並返回是否繼續消費 4: * 5: * @param msgs 消息 6: * @param status 消費結果狀態 7: * @param context 消費Context 8: * @param consumeRequest 消費請求 9: * @return 是否繼續消費 10: */
 11: public boolean processConsumeResult(// 12: final List<MessageExt> msgs, // 13: final ConsumeOrderlyStatus status, // 14: final ConsumeOrderlyContext context, // 15: final ConsumeRequest consumeRequest// 16: ) {
 17:     boolean continueConsume = true;
 18:     long commitOffset = -1L;
 19:     if (context.isAutoCommit()) {
 20:         switch (status) {
 21:             case COMMIT:
 22:             case ROLLBACK:
 23:                 log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());
 24:             case SUCCESS:
 25:                 // 提交消息已消費成功到消息處理隊列
 26:                 commitOffset = consumeRequest.getProcessQueue().commit();
 27:                 // 統計
 28:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
 29:                 break;
 30:             case SUSPEND_CURRENT_QUEUE_A_MOMENT:
 31:                 // 統計
 32:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
 33:                 if (checkReconsumeTimes(msgs)) { // 計算是否暫時掛起(暫停)消費N毫秒,默認:10ms
 34:                     // 設置消息從新消費
 35:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
 36:                     // 提交延遲消費請求
 37:                     this.submitConsumeRequestLater(//
 38:                         consumeRequest.getProcessQueue(), //
 39:                         consumeRequest.getMessageQueue(), //
 40:                         context.getSuspendCurrentQueueTimeMillis());
 41:                     continueConsume = false;
 42:                 } else {
 43:                     commitOffset = consumeRequest.getProcessQueue().commit();
 44:                 }
 45:                 break;
 46:             default:
 47:                 break;
 48:         }
 49:     } else {
 50:         switch (status) {
 51:             case SUCCESS:
 52:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
 53:                 break;
 54:             case COMMIT:
 55:                 // 提交消息已消費成功到消息處理隊列
 56:                 commitOffset = consumeRequest.getProcessQueue().commit();
 57:                 break;
 58:             case ROLLBACK:
 59:                 // 設置消息從新消費
 60:                 consumeRequest.getProcessQueue().rollback();
 61:                 this.submitConsumeRequestLater(//
 62:                     consumeRequest.getProcessQueue(), //
 63:                     consumeRequest.getMessageQueue(), //
 64:                     context.getSuspendCurrentQueueTimeMillis());
 65:                 continueConsume = false;
 66:                 break;
 67:             case SUSPEND_CURRENT_QUEUE_A_MOMENT: // 計算是否暫時掛起(暫停)消費N毫秒,默認:10ms
 68:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
 69:                 if (checkReconsumeTimes(msgs)) {
 70:                     // 設置消息從新消費
 71:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
 72:                     // 提交延遲消費請求
 73:                     this.submitConsumeRequestLater(//
 74:                         consumeRequest.getProcessQueue(), //
 75:                         consumeRequest.getMessageQueue(), //
 76:                         context.getSuspendCurrentQueueTimeMillis());
 77:                     continueConsume = false;
 78:                 }
 79:                 break;
 80:             default:
 81:                 break;
 82:         }
 83:     }
 84: 
 85:     // 消息處理隊列未dropped,提交有效消費進度
 86:     if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
 87:         this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
 88:     }
 89: 
 90:     return continueConsume;
 91: }
 92: 
 93: private int getMaxReconsumeTimes() {
 94:     // default reconsume times: Integer.MAX_VALUE
 95:     if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
 96:         return Integer.MAX_VALUE;
 97:     } else {
 98:         return this.defaultMQPushConsumer.getMaxReconsumeTimes();
 99:     }
100: }
101: 
102: /** 103: * 計算是否要暫停消費 104: * 不暫停條件:存在消息都超過最大消費次數而且都發回broker成功 105: * 106: * @param msgs 消息 107: * @return 是否要暫停 108: */
109: private boolean checkReconsumeTimes(List<MessageExt> msgs) {
110:     boolean suspend = false;
111:     if (msgs != null && !msgs.isEmpty()) {
112:         for (MessageExt msg : msgs) {
113:             if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
114:                 MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
115:                 if (!sendMessageBack(msg)) { // 發回失敗,中斷
116:                     suspend = true;
117:                     msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
118:                 }
119:             } else {
120:                 suspend = true;
121:                 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
122:             }
123:         }
124:     }
125:     return suspend;
126: }
127: 
128: /** 129: * 發回消息。 130: * 消息發回broker後,對應的消息隊列是死信隊列。 131: * 132: * @param msg 消息 133: * @return 是否發送成功 134: */
135: public boolean sendMessageBack(final MessageExt msg) {
136:     try {
137:         // max reconsume times exceeded then send to dead letter queue.
138:         Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
139:         String originMsgId = MessageAccessor.getOriginMessageId(msg);
140:         MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
141:         newMsg.setFlag(msg.getFlag());
142:         MessageAccessor.setProperties(newMsg, msg.getProperties());
143:         MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
144:         MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
145:         MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
146:         newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
147: 
148:         this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
149:         return true;
150:     } catch (Exception e) {
151:         log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
152:     }
153: 
154:     return false;
155: }複製代碼
  • ⬆️⬆️⬆️
  • 第 21 至 29 行 :消費成功。在自動提交進度( AutoCommit )的狀況下,COMMITROLLBACKSUCCESS 邏輯已經統一
  • 第 30 至 45 行 :消費失敗。當消息重試次數超過上限(默認 :16次)時,將消息發送到 Broker 死信隊列,跳過這些消息。此時,消息隊列無需掛起,繼續消費後面的消息。
  • 第 85 至 88 行 :提交消費進度。

3.13 消息處理隊列核心方法

😈涉及到的四個核心方法的源碼:

1: // ⬇️⬇️⬇️【ProcessQueue.java】
  2: /** 3: * 消息映射 4: * key:消息隊列位置 5: */
  6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();    /** 7: * 消息映射臨時存儲(消費中的消息) 8: */
  9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();
 10: 
 11: /** 12: * 回滾消費中的消息 13: * 邏輯相似於{@link #makeMessageToCosumeAgain(List)} 14: */
 15: public void rollback() {
 16:     try {
 17:         this.lockTreeMap.writeLock().lockInterruptibly();
 18:         try {
 19:             this.msgTreeMap.putAll(this.msgTreeMapTemp);
 20:             this.msgTreeMapTemp.clear();
 21:         } finally {
 22:             this.lockTreeMap.writeLock().unlock();
 23:         }
 24:     } catch (InterruptedException e) {
 25:         log.error("rollback exception", e);
 26:     }
 27: }
 28: 
 29: /** 30: * 提交消費中的消息已消費成功,返回消費進度 31: * 32: * @return 消費進度 33: */
 34: public long commit() {
 35:     try {
 36:         this.lockTreeMap.writeLock().lockInterruptibly();
 37:         try {
 38:             // 消費進度
 39:             Long offset = this.msgTreeMapTemp.lastKey();
 40: 
 41:             //
 42:             msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
 43: 
 44:             //
 45:             this.msgTreeMapTemp.clear();
 46: 
 47:             // 返回消費進度
 48:             if (offset != null) {
 49:                 return offset + 1;
 50:             }
 51:         } finally {
 52:             this.lockTreeMap.writeLock().unlock();
 53:         }
 54:     } catch (InterruptedException e) {
 55:         log.error("commit exception", e);
 56:     }
 57: 
 58:     return -1;
 59: }
 60: 
 61: /** 62: * 指定消息從新消費 63: * 邏輯相似於{@link #rollback()} 64: * 65: * @param msgs 消息 66: */
 67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
 68:     try {
 69:         this.lockTreeMap.writeLock().lockInterruptibly();
 70:         try {
 71:             for (MessageExt msg : msgs) {
 72:                 this.msgTreeMapTemp.remove(msg.getQueueOffset());
 73:                 this.msgTreeMap.put(msg.getQueueOffset(), msg);
 74:             }
 75:         } finally {
 76:             this.lockTreeMap.writeLock().unlock();
 77:         }
 78:     } catch (InterruptedException e) {
 79:         log.error("makeMessageToCosumeAgain exception", e);
 80:     }
 81: }
 82: 
 83: /** 84: * 得到持有消息前N條 85: * 86: * @param batchSize 條數 87: * @return 消息 88: */
 89: public List<MessageExt> takeMessags(final int batchSize) {
 90:     List<MessageExt> result = new ArrayList<>(batchSize);
 91:     final long now = System.currentTimeMillis();
 92:     try {
 93:         this.lockTreeMap.writeLock().lockInterruptibly();
 94:         this.lastConsumeTimestamp = now;
 95:         try {
 96:             if (!this.msgTreeMap.isEmpty()) {
 97:                 for (int i = 0; i < batchSize; i++) {
 98:                     Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
 99:                     if (entry != null) {
100:                         result.add(entry.getValue());
101:                         msgTreeMapTemp.put(entry.getKey(), entry.getValue());
102:                     } else {
103:                         break;
104:                     }
105:                 }
106:             }
107: 
108:             if (result.isEmpty()) {
109:                 consuming = false;
110:             }
111:         } finally {
112:             this.lockTreeMap.writeLock().unlock();
113:         }
114:     } catch (InterruptedException e) {
115:         log.error("take Messages exception", e);
116:     }
117: 
118:     return result;
119: }複製代碼
相關文章
相關標籤/搜索