RocketMQ源碼解析:定時消息與消息重試

🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: java

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右

1. 概述

建議前置閱讀內容:json

😈 爲何把定時消息消息重試放在一塊兒?你猜。
👻 你猜我猜不猜。後端

2. 定時消息

定時消息是指消息發到 Broker 後,不能馬上被 Consumer 消費,要到特定的時間點或者等待特定的時間後才能被消費。微信

下圖是定時消息的處理邏輯圖:app

定時消息邏輯圖.png
定時消息邏輯圖.png

2.1 延遲級別

RocketMQ 目前只支持固定精度的定時消息。官方說法以下:ide

若是要支持任意的時間精度,在 Broker 層面,必需要作消息排序,若是再涉及到持久化,那麼消息排序要不可避免的產生巨大性能開銷。源碼分析

  • 延遲級別:
延遲級別 時間
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h
  • 核心源碼以下:性能

    1: // ⬇️⬇️⬇️【MessageStoreConfig.java】
        2: /** 3: * 消息延遲級別字符串配置 4: */
        5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        6: 
        7: // ⬇️⬇️⬇️【ScheduleMessageService.java】
        8: /** 9: * 解析延遲級別 10: * 11: * @return 是否解析成功 12: */
       13: public boolean parseDelayLevel() {
       14:     HashMap<String, Long> timeUnitTable = new HashMap<>();
       15:     timeUnitTable.put("s", 1000L);
       16:     timeUnitTable.put("m", 1000L * 60);
       17:     timeUnitTable.put("h", 1000L * 60 * 60);
       18:     timeUnitTable.put("d", 1000L * 60 * 60 * 24);
       19: 
       20:     String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
       21:     try {
       22:         String[] levelArray = levelString.split(" ");
       23:         for (int i = 0; i < levelArray.length; i++) {
       24:             String value = levelArray[i];
       25:             String ch = value.substring(value.length() - 1);
       26:             Long tu = timeUnitTable.get(ch);
       27: 
       28:             int level = i + 1;
       29:             if (level > this.maxDelayLevel) {
       30:                 this.maxDelayLevel = level;
       31:             }
       32:             long num = Long.parseLong(value.substring(0, value.length() - 1));
       33:             long delayTimeMillis = tu * num;
       34:             this.delayLevelTable.put(level, delayTimeMillis);
       35:         }
       36:     } catch (Exception e) {
       37:         log.error("parseDelayLevel exception", e);
       38:         log.info("levelString String = {}", levelString);
       39:         return false;
       40:     }
       41: 
       42:     return true;
       43: }複製代碼

2.2 Producer 發送定時消息

  • 🦅發送時,設置消息的延遲級別
Message msg = new Message(...);
msg.setDelayTimeLevel(level);複製代碼

2.3 Broker 存儲定時消息

  • 🦅 存儲消息時,延遲消息進入 TopicSCHEDULE_TOPIC_XXXX
  • 🦅 延遲級別 與 消息隊列編號 作固定映射:QueueId = DelayLevel - 1

核心代碼以下:this

1: // ⬇️⬇️⬇️【CommitLog.java】
  2: /** 3: * 添加消息,返回消息結果 4: * 5: * @param msg 消息 6: * @return 結果 7: */
  8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
  9:     // ....(省略代碼) 
 10: 
 11:     // 定時消息處理
 12:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 13:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
 14:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 15:         // Delay Delivery
 16:         if (msg.getDelayTimeLevel() > 0) {
 17:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 18:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 19:             }
 20: 
 21:             // 存儲消息時,延遲消息進入 `Topic` 爲 `SCHEDULE_TOPIC_XXXX` 。
 22:             topic = ScheduleMessageService.SCHEDULE_TOPIC;
 23: 
 24:             // 延遲級別 與 消息隊列編號 作固定映射
 25:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 26: 
 27:             // Backup real topic, queueId
 28:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
 30:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 31: 
 32:             msg.setTopic(topic);
 33:             msg.setQueueId(queueId);
 34:         }
 35:     }
 36: 
 37:     // ....(省略代碼) 
 38: }
 39: 
 40: // ⬇️⬇️⬇️【ScheduleMessageService.java】
 41: /** 42: * 根據 延遲級別 計算 消息隊列編號 43: * QueueId = DelayLevel - 1 44: * 45: * @param delayLevel 延遲級別 46: * @return 消息隊列編號 47: */
 48: public static int delayLevel2QueueId(final int delayLevel) {
 49:     return delayLevel - 1;
 50: }複製代碼

  • 🦅 生成 ConsumeQueue 時,每條消息的 tagsCode 使用【消息計劃消費時間】。這樣,ScheduleMessageService 在輪詢 ConsumeQueue 時,能夠使用 tagsCode 進行過濾。

核心代碼以下:spa

1: // ⬇️⬇️⬇️【CommitLog.java】
  2: /** 3: * check the message and returns the message size 4: * 5: * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure 6: */
  7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
  8:     try {
  9:         // // ....(省略代碼)
 10: 
 11:         // 17 properties
 12:         short propertiesLength = byteBuffer.getShort();
 13:         if (propertiesLength > 0) {
 14:             // ....(省略代碼)
 15:             String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
 16:             if (tags != null && tags.length() > 0) {
 17:                 tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
 18:             }
 19: 
 20:             // Timing message processing
 21:             {
 22:                 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
 23:                 if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
 24:                     int delayLevel = Integer.parseInt(t);
 25: 
 26:                     if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 27:                         delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
 28:                     }
 29: 
 30:                     if (delayLevel > 0) {
 31:                         tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
 32:                             storeTimestamp);
 33:                     }
 34:                 }
 35:             }
 36:         }
 37: 
 38:         // ....(省略代碼)
 39: 
 40:         return new DispatchRequest(//
 41:             topic, // 1
 42:             queueId, // 2
 43:             physicOffset, // 3
 44:             totalSize, // 4
 45:             tagsCode, // 5
 46:             storeTimestamp, // 6
 47:             queueOffset, // 7
 48:             keys, // 8
 49:             uniqKey, //9
 50:             sysFlag, // 9
 51:             preparedTransactionOffset// 10
 52:         );
 53:     } catch (Exception e) {
 54:     }
 55: 
 56:     return new DispatchRequest(-1, false /* success */);
 57: }
 58: 
 59: // ⬇️⬇️⬇️【ScheduleMessageService.java】
 60: /** 61: * 計算 投遞時間【計劃消費時間】 62: * 63: * @param delayLevel 延遲級別 64: * @param storeTimestamp 存儲時間 65: * @return 投遞時間【計劃消費時間】 66: */
 67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
 68:     Long time = this.delayLevelTable.get(delayLevel);
 69:     if (time != null) {
 70:         return time + storeTimestamp;
 71:     }
 72: 
 73:     return storeTimestamp + 1000;
 74: }複製代碼

2.4 Broker 發送定時消息

  • 🦅 對 SCHEDULE_TOPIC_XXXX 每條消費隊列對應單獨一個定時任務進行輪詢,發送 到達投遞時間【計劃消費時間】 的消息。

下圖是發送定時消息的處理邏輯圖:

定時消息定時邏輯
定時消息定時邏輯

實現代碼以下:

1: /** 2: * ⬇️⬇️⬇️ 發送(投遞)延遲消息定時任務 3: */
  4: class DeliverDelayedMessageTimerTask extends TimerTask {
  5:     /** 6: * 延遲級別 7: */
  8:     private final int delayLevel;
  9:     /** 10: * 位置 11: */
 12:     private final long offset;
 13: 
 14:     public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
 15:         this.delayLevel = delayLevel;
 16:         this.offset = offset;
 17:     }
 18: 
 19:     @Override
 20:     public void run() {
 21:         try {
 22:             this.executeOnTimeup();
 23:         } catch (Exception e) {
 24:             // XXX: warn and notify me
 25:             log.error("ScheduleMessageService, executeOnTimeup exception", e);
 26:             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
 27:                 this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
 28:         }
 29:     }
 30: 
 31:     /** 32: * 糾正可投遞時間。 33: * 由於發送級別對應的發送間隔能夠調整,若是超過當前間隔,則修正成當前配置,避免後面的消息沒法發送。 34: * 35: * @param now 當前時間 36: * @param deliverTimestamp 投遞時間 37: * @return 糾正結果 38: */
 39:     private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
 40:         long result = deliverTimestamp;
 41: 
 42:         long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
 43:         if (deliverTimestamp > maxTimestamp) {
 44:             result = now;
 45:         }
 46: 
 47:         return result;
 48:     }
 49: 
 50:     public void executeOnTimeup() {
 51:         ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,  delayLevel2QueueId(delayLevel));
 52: 
 53:         long failScheduleOffset = offset;
 54: 
 55:         if (cq != null) {
 56:             SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
 57:             if (bufferCQ != null) {
 58:                 try {
 59:                     long nextOffset = offset;
 60:                     int i = 0;
 61:                     for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
 62:                         long offsetPy = bufferCQ.getByteBuffer().getLong();
 63:                         int sizePy = bufferCQ.getByteBuffer().getInt();
 64:                         long tagsCode = bufferCQ.getByteBuffer().getLong();
 65: 
 66:                         long now = System.currentTimeMillis();
 67:                         long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 68: 
 69:                         nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 70: 
 71:                         long countdown = deliverTimestamp - now;
 72: 
 73:                         if (countdown <= 0) { // 消息到達可發送時間
 74:                             MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
 75:                             if (msgExt != null) {
 76:                                 try {
 77:                                     // 發送消息
 78:                                     MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
 79:                                     PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
 80:                                     if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 發送成功
 81:                                         continue;
 82:                                     } else { // 發送失敗
 83:                                         // XXX: warn and notify me
 84:                                         log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
 85: 
 86:                                         // 安排下一次任務
 87:                                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
 88: 
 89:                                         // 更新進度
 90:                                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
 91:                                         return;
 92:                                     }
 93:                                 } catch (Exception e) {
 94:                                     // XXX: warn and notify me
 95:                                     log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
 96:                                             + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
 97:                                 }
 98:                             }
 99:                         } else {
100:                             // 安排下一次任務
101:                             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
102: 
103:                             // 更新進度
104:                             ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
105:                             return;
106:                         }
107:                     } // end of for
108: 
109:                     nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
110: 
111:                     // 安排下一次任務
112:                     ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
113: 
114:                     // 更新進度
115:                     ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
116:                     return;
117:                 } finally {
118:                     bufferCQ.release();
119:                 }
120:             } // end of if (bufferCQ != null)
121:             else { // 消費隊列已經被刪除部分,跳轉到最小的消費進度
122:                 long cqMinOffset = cq.getMinOffsetInQueue();
123:                 if (offset < cqMinOffset) {
124:                     failScheduleOffset = cqMinOffset;
125:                     log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
126:                         + cqMinOffset + ", queueId=" + cq.getQueueId());
127:                 }
128:             }
129:         } // end of if (cq != null)
130: 
131:         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
132:     }
133: 
134:     /** 135: * 設置消息內容 136: * 137: * @param msgExt 消息 138: * @return 消息 139: */
140:     private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
141:         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
142:         msgInner.setBody(msgExt.getBody());
143:         msgInner.setFlag(msgExt.getFlag());
144:         MessageAccessor.setProperties(msgInner, msgExt.getProperties());
145: 
146:         TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
147:         long tagsCodeValue =
148:             MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
149:         msgInner.setTagsCode(tagsCodeValue);
150:         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
151: 
152:         msgInner.setSysFlag(msgExt.getSysFlag());
153:         msgInner.setBornTimestamp(msgExt.getBornTimestamp());
154:         msgInner.setBornHost(msgExt.getBornHost());
155:         msgInner.setStoreHost(msgExt.getStoreHost());
156:         msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
157: 
158:         msgInner.setWaitStoreMsgOK(false);
159:         MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
160: 
161:         msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
162: 
163:         String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
164:         int queueId = Integer.parseInt(queueIdStr);
165:         msgInner.setQueueId(queueId);
166: 
167:         return msgInner;
168:     }
169: }複製代碼

2.5 Broker 持久化定時發送進度

  • 🦅 定時消息發送進度存儲在文件(../config/delayOffset.json)裏
  • 🦅 每 10s 定時持久化發送進度。

核心代碼以下:

1: // ⬇️⬇️⬇️【ScheduleMessageService.java】
  2: /** 3: public void start() { 4: // 定時發送消息 5: for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 6: Integer level = entry.getKey(); 7: Long timeDelay = entry.getValue(); 8: Long offset = this.offsetTable.get(level); 9: if (null == offset) { 10: offset = 0L; 11: } 12: 13: if (timeDelay != null) { 14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); 15: } 16: } 17: 18: // 定時持久化發送進度 19: this.timer.scheduleAtFixedRate(new TimerTask() { 20: 21: @Override 22: public void run() { 23: try { 24: ScheduleMessageService.this.persist(); 25: } catch (Exception e) { 26: log.error("scheduleAtFixedRate flush exception", e); 27: } 28: } 29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); 30: }複製代碼

3. 消息重試

Consumer 消費消息失敗後,要提供一種重試機制,令消息再消費一次。

  • 🦅 Consumer 將消費失敗的消息發回 Broker,進入延遲消息隊列。即,消費失敗的消息,不會當即消費。

核心代碼以下:

1: // ⬇️⬇️⬇️【SendMessageProcessor.java】
  2: /** 3: * 消費者發回消息 4: * 5: * @param ctx ctx 6: * @param request 請求 7: * @return 響應 8: * @throws RemotingCommandException 當遠程調用異常 9: */
 10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) 11: throws RemotingCommandException {
 12:     // ....(省略代碼)
 13:     // 處理 delayLevel(獨有)。
 14:     int delayLevel = requestHeader.getDelayLevel();
 15:     int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
 16:     if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
 17:         maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
 18:     }
 19:     if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
 20:     // ....(省略代碼)
 21:     } else {
 22:         if (0 == delayLevel) {
 23:             delayLevel = 3 + msgExt.getReconsumeTimes();
 24:         }
 25:         msgExt.setDelayTimeLevel(delayLevel);
 26:     }
 27: 
 28:     // ....(省略代碼)
 29:     return response;
 30: }複製代碼
相關文章
相關標籤/搜索