建議前置閱讀內容:
爲何把定時消息與消息重試放在一塊兒?你猜。
你猜我猜不猜。
定時消息是指消息發到 Broker 後,不能馬上被 Consumer 消費,要到特定的時間點或者等待特定的時間後才能被消費。
下圖是定時消息的處理邏輯圖:
RocketMQ
目前只支持固定精度的定時消息。官方說法如下:
如果要支持任意的時間精度,在 Broker 層面,必須要做消息排序,如果再涉及到持久化,那麼消息排序要不可避免地產生巨大性能開銷。
延遲級別 | 時間 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
核心源碼如下:
1: // 【MessageStoreConfig.java】 2: /** 3: * 消息延遲級別字符串配置 4: */ 5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 6: 7: // 【ScheduleMessageService.java】 8: /** 9: * 解析延遲級別 10: * 11: * @return 是否解析成功 12: */ 13: public boolean parseDelayLevel() { 14: HashMap<String, Long> timeUnitTable = new HashMap<>(); 15: timeUnitTable.put("s", 1000L); 16: timeUnitTable.put("m", 1000L * 60); 17: timeUnitTable.put("h", 1000L * 60 * 60); 18: timeUnitTable.put("d", 1000L * 60 * 60 * 24); 19: 20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); 21: try { 22: String[] levelArray = levelString.split(" "); 23: for (int i = 0; i < levelArray.length; i++) { 24: String value = levelArray[i]; 25: String ch = value.substring(value.length() - 1); 26: Long tu = timeUnitTable.get(ch); 27: 28: int level = i + 1; 29: if (level > this.maxDelayLevel) { 30: this.maxDelayLevel = level; 31: } 32: long num = Long.parseLong(value.substring(0, value.length() - 1)); 33: long delayTimeMillis = tu * num; 34: this.delayLevelTable.put(level, delayTimeMillis); 35: } 36: } catch (Exception e) { 37: log.error("parseDelayLevel exception", e); 38: log.info("levelString String = {}", levelString); 39: return false; 40: } 41: 42: return true; 43: }
Message msg = new Message(...); msg.setDelayTimeLevel(level);
Topic
爲 SCHEDULE_TOPIC_XXXX
。核心代碼如下:
// [CommitLog.java]
/**
 * Appends a message and returns the put result.
 * <p>
 * Only the scheduled-message handling is shown here; the statements before and
 * after it are elided in this excerpt ({@code topic} and {@code queueId} are
 * declared in the elided part).
 *
 * @param msg the message
 * @return the result
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // ....

    // Scheduled (delayed) message handling
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            // Clamp the requested level to the highest configured level.
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // When stored, a delayed message goes to the topic `SCHEDULE_TOPIC_XXXX`.
            topic = ScheduleMessageService.SCHEDULE_TOPIC;

            // Delay level and queue id have a fixed mapping.
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            // Redirect the message to the schedule topic/queue.
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    // ....
}

// [ScheduleMessageService.java]
/**
 * Computes the queue id for a delay level.
 * QueueId = DelayLevel - 1
 *
 * @param delayLevel the delay level
 * @return the queue id
 */
public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}
ConsumeQueue
時,每條消息的 tagsCode
使用【消息計劃消費時間】。這樣,ScheduleMessageService
在輪詢 ConsumeQueue
時,可以使用 tagsCode
進行過濾。核心代碼如下:
// [CommitLog.java]
/**
 * Checks a message read from the buffer and builds the dispatch request for it.
 * <p>
 * Only the properties/tagsCode part is shown; other statements are elided in
 * this excerpt. For a message stored under the schedule topic that carries a
 * delay-level property, {@code tagsCode} is overwritten with the deliver
 * timestamp computed from the delay level and the store timestamp.
 *
 * @return the dispatch request for the message; if any exception is thrown,
 *         {@code new DispatchRequest(-1, false)} is returned instead
 */
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
    try {
        // ....

        // 17 properties
        short propertiesLength = byteBuffer.getShort();
        if (propertiesLength > 0) {
            // ....
            String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
            if (tags != null && tags.length() > 0) {
                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
            }

            // Timing message processing
            {
                String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);

                    // Clamp to the highest configured level.
                    if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    }

                    // tagsCode now carries the planned deliver timestamp instead of a tag hash.
                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                            storeTimestamp);
                    }
                }
            }
        }

        // ....

        return new DispatchRequest(//
            topic, // 1
            queueId, // 2
            physicOffset, // 3
            totalSize, // 4
            tagsCode, // 5
            storeTimestamp, // 6
            queueOffset, // 7
            keys, // 8
            uniqKey, // 9
            sysFlag, // 10
            preparedTransactionOffset// 11
        );
    } catch (Exception e) {
        // NOTE(review): the exception is swallowed without logging in this excerpt;
        // control falls through to the failure request below.
    }

    return new DispatchRequest(-1, false /* success */);
}

// [ScheduleMessageService.java]
/**
 * Computes the deliver time (planned consume time) of a delayed message.
 *
 * @param delayLevel     the delay level
 * @param storeTimestamp the store time
 * @return {@code storeTimestamp} plus the delay found in {@code delayLevelTable}
 *         for the level, or {@code storeTimestamp + 1000} when the level has no entry
 */
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
    Long time = this.delayLevelTable.get(delayLevel);
    if (time != null) {
        return time + storeTimestamp;
    }

    // Fallback for a level missing from the table.
    return storeTimestamp + 1000;
}
SCHEDULE_TOPIC_XXXX
每條消費隊列對應單獨一個定時任務進行輪詢,發送 到達投遞時間【計劃消費時間】 的消息。下圖是發送定時消息的處理邏輯圖:
實現代碼以下:
/**
 * Timer task that sends (delivers) delayed messages for one delay level,
 * starting from a given offset in that level's consume queue. Every run ends
 * by scheduling a new task instance for the next run.
 */
class DeliverDelayedMessageTimerTask extends TimerTask {
    /**
     * Delay level handled by this task.
     */
    private final int delayLevel;
    /**
     * Consume-queue offset this run starts from.
     */
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    /**
     * Runs one delivery pass; if it throws, logs the error and reschedules the
     * same level and offset after {@code DELAY_FOR_A_PERIOD}.
     */
    @Override
    public void run() {
        try {
            this.executeOnTimeup();
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    /**
     * Corrects the deliver time.
     * The interval configured for a delay level can be changed, so a deliver
     * time later than {@code now} plus the currently configured interval is
     * reset to {@code now}; this keeps later messages from being blocked.
     *
     * @param now              current time
     * @param deliverTimestamp deliver time read from the queue entry
     * @return the corrected deliver time
     */
    private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
        long result = deliverTimestamp;

        long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
        if (deliverTimestamp > maxTimestamp) {
            result = now;
        }

        return result;
    }

    /**
     * Walks the consume queue of this delay level from {@code offset},
     * re-puts every message whose deliver time has arrived, and schedules the
     * next task at the first entry that is not yet due, at the first entry
     * whose re-put fails, or at the end of the buffer.
     */
    public void executeOnTimeup() {
        ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));

        // Offset used when rescheduling after nothing could be read.
        long failScheduleOffset = offset;

        if (cq != null) {
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        // One queue entry is read as long + int + long, in this order.
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        // The value read as tagsCode is used as the deliver timestamp below.
                        long tagsCode = bufferCQ.getByteBuffer().getLong();

                        long now = System.currentTimeMillis();
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                        // Queue offset of the entry currently being processed.
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        long countdown = deliverTimestamp - now;

                        if (countdown <= 0) { // the message is due
                            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                            if (msgExt != null) {
                                try {
                                    // Send the message
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                    PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
                                    if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // sent successfully
                                        continue;
                                    } else { // send failed
                                        // XXX: warn and notify me
                                        log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());

                                        // Schedule the next task; it starts again at this same entry.
                                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);

                                        // Update the progress
                                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                        return;
                                    }
                                } catch (Exception e) {
                                    // The failing message is logged and skipped; the loop moves on.
                                    // XXX: warn and notify me
                                    log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
                                }
                            }
                        } else {
                            // Not due yet: schedule the next task to run after the remaining countdown.
                            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);

                            // Update the progress
                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                            return;
                        }
                    } // end of for

                    // Every entry in the buffer was handled; continue after the last one.
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    // Schedule the next task
                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);

                    // Update the progress
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                    return;
                } finally {
                    bufferCQ.release();
                }
            } // end of if (bufferCQ != null)
            else { // part of the consume queue has been deleted; jump to the minimum offset
                long cqMinOffset = cq.getMinOffsetInQueue();
                if (offset < cqMinOffset) {
                    failScheduleOffset = cqMinOffset;
                    log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                        + cqMinOffset + ", queueId=" + cq.getQueueId());
                }
            }
        } // end of if (cq != null)

        // No queue or no readable buffer: try again later from failScheduleOffset.
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
    }

    /**
     * Builds the message to re-put from a stored delayed message: copies body,
     * flag, properties and metadata, clears the delay-level property, and
     * restores the topic and queue id from the backed-up "real" properties.
     *
     * @param msgExt the stored delayed message
     * @return the message to put
     */
    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        // NOTE(review): msgInner.getSysFlag() is read here before setSysFlag(...) is
        // called further down, so the filter type is derived from msgInner's initial
        // sysFlag rather than msgExt's — confirm this is intended.
        TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
        long tagsCodeValue =
            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

        msgInner.setWaitStoreMsgOK(false);
        // Remove the delay-level property from the message being re-put.
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

        // Restore the original topic and queue id backed up in the properties.
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);

        return msgInner;
    }
}
../config/delayOffset.json
)裏核心代碼如下:
1: // 【ScheduleMessageService.java】 2: /** 3: public void start() { 4: // 定時發送消息 5: for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 6: Integer level = entry.getKey(); 7: Long timeDelay = entry.getValue(); 8: Long offset = this.offsetTable.get(level); 9: if (null == offset) { 10: offset = 0L; 11: } 12: 13: if (timeDelay != null) { 14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); 15: } 16: } 17: 18: // 定時持久化發送進度 19: this.timer.scheduleAtFixedRate(new TimerTask() { 20: 21: @Override 22: public void run() { 23: try { 24: ScheduleMessageService.this.persist(); 25: } catch (Exception e) { 26: log.error("scheduleAtFixedRate flush exception", e); 27: } 28: } 29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); 30: }
Consumer 消費消息失敗後,要提供一種重試機制,令消息再消費一次。
Consumer
將消費失敗的消息發回 Broker
,進入延遲消息隊列。即,消費失敗的消息,不會立即消費。核心代碼如下:
// [SendMessageProcessor.java]
/**
 * Handles a message sent back by a consumer.
 * <p>
 * Only the delay-level handling is shown; other statements are elided in this
 * excerpt, including the rest of the {@code if} condition below and the body
 * of its first branch.
 *
 * @param ctx     ctx
 * @param request the request
 * @return the response
 * @throws RemotingCommandException on a remoting call error
 */
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    // ....
    // delayLevel handling (specific to this method)
    int delayLevel = requestHeader.getDelayLevel();
    // Start from the subscription group's retry limit; requests at or above the
    // V3_4_9 version ordinal use the limit carried in the request header instead.
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    }
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
        // ....
    } else {
        // A requested level of 0 is replaced by 3 + the number of times already re-consumed.
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }

    // ....
    return response;
}