上一節消息重試裏面提到了重試的消息能夠被延時消費,其實除此以外,用戶發送的消息也能夠指定延時時間(更準確的說是延時等級),而後在指定延時時間以後投遞消息,而後被consumer消費。阿里雲的ons還支持定時消息,並且延時消息是直接指定延時時間,其實阿里雲的延時消息也是定時消息的另外一種表述方式,都是經過設置消息被投遞的時間來實現的,可是Apache RocketMQ在版本4.2.0中尚不支持指定時間的延時,只能經過配置延時等級和延時等級對應的時間來實現延時。html
一個延時消息被髮出到消費成功經歷如下幾個過程:java
注意:批量消息是不支持延時消息的apache
tips:下文中說到的延時隊列能夠理解爲一個ConsumeQueue
app
在producer中發送消息的時候,設置Message的delayLevelide
// org.apache.rocketmq.common.message.Message public void setDelayTimeLevel(int level) { this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); }
調用上面的方法設置延時等級的時候,會向message添加"DELAY"屬性,後面broker處理延時消息就是依賴該屬性進行特別的處理。ui
接下來發送消息的流程和正常發送消息的流程基本一致,只是會將該消息標記爲延時消息類型this
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); }
broker收到延時消息和正常消息在前置的處理流程是一致的,對於延時消息的特殊處理體如今將消息寫入存儲(內存或文件)的時候阿里雲
// org.apache.rocketmq.store.CommitLog#putMessage public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 省略中間代碼... StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); // 拿到原始topic和對應的queueId String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // 非事務消息和事務的commit消息纔會進一步判斷delayLevel if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { // 糾正設置過大的level,就是delayLevel設置都大於延時時間等級的最大級 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 設置爲延時隊列的topic topic = ScheduleMessageService.SCHEDULE_TOPIC; // 每個延時等級一個queue,queueId = delayLevel - 1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId // 備份原始的topic和queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); // 更新properties msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } // 省略中間代碼... }
上面的SCHEDULE_TOPIC是:線程
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
code
這個topic是一個特殊的topic,和正常的topic不一樣的地方是:
後面消息寫入的過程和普通的又是一致的。
上面將消息寫入延時隊列中了,接下來就是處理延時隊列中的消息,而後從新發送回原始topic的隊列中。
在此以前先說明下至今還有疑問的一個個概念——delayLevel。這個概念和咱們接下要須要用到的的類org.apache.rocketmq.store.schedule.ScheduleMessageService有關,這個類的字段delayLevelTable裏面保存了具體的延時等級
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
看下這個字段的初始化過程
// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel public boolean parseDelayLevel() { HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(); // 每一個延時等級延時時間的單位對應的ms數 timeUnitTable.put("s", 1000L); timeUnitTable.put("m", 1000L * 60); timeUnitTable.put("h", 1000L * 60 * 60); timeUnitTable.put("d", 1000L * 60 * 60 * 24); // 延時等級在MessageStoreConfig中配置 // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); try { // 根據空格將配置分隔出每一個等級 String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) { String value = levelArray[i]; String ch = value.substring(value.length() - 1); // 時間單位對應的ms數 Long tu = timeUnitTable.get(ch); // 延時等級從1開始 int level = i + 1; if (level > this.maxDelayLevel) { // 找出最大的延時等級 this.maxDelayLevel = level; } long num = Long.parseLong(value.substring(0, value.length() - 1)); long delayTimeMillis = tu * num; this.delayLevelTable.put(level, delayTimeMillis); // 省略部分代碼... }
上面這個load方法在broker啓動的時候DefaultMessageStore會調用來初始化延時等級。
接下來就應該解決怎麼處理延時消息隊列中的消息的問題了。處理延時消息的服務是:ScheduleMessageService。
仍是broker啓動的時候DefaultMessageStore會調用org.apache.rocketmq.store.schedule.ScheduleMessageService#start來啓動處理延時消息隊列的服務:
public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); // 記錄隊列的處理進度 Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { // 每一個延時隊列啓動一個定時任務來處理該隊列的延時消息 this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 持久化offsetTable(保存了每一個延時隊列對應的處理進度offset) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
DeliverDelayedMessageTimerTask是一個TimerTask,啓動之後不斷處理延時隊列中的消息,直到出現異常則終止該線程從新啓動一個新的TimerTask
public void executeOnTimeup() { // 找到該延時等級對應的ConsumeQueue ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); // 記錄異常狀況下一次啓動TimerTask開始處理的offset long failScheduleOffset = offset; if (cq != null) { // 找到offset所處的MappedFile中offset後面的buffer SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 下面三個字段信息是ConsumeQueue物理存儲的信息 long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); // 注意這個tagCode,再也不是普通的tag的hashCode,而是該延時消息到期的時間 long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略中間代碼.... long now = System.currentTimeMillis(); // 計算應該投遞該消息的時間,若是已經超時則當即投遞 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // 計算下一個消息的開始位置,用來尋找下一個消息位置(若是有的話) nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 判斷延時消息是否到期 long countdown = deliverTimestamp - now; if (countdown <= 0) { MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { // 將消息恢復到原始消息的格式,恢復topic、queueId、tagCode等,清除屬性"DELAY" MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 投遞成功,處理下一個 continue; } else { // XXX: warn and notify me log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); // 投遞失敗,結束當前task,從新啓動TimerTask,從下一個消息開始處理,也就是說當前消息丟棄 // 更新offsetTable中當前隊列的offset爲下一個消息的offset ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } catch (Exception e) { // 從新投遞期間出現任何異常,結束當前task,從新啓動TimerTask,從當前消息開始重試 /* * XXX: warn and notify me */ log.error( "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); } } } else { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for // 處理完當前MappedFile中的消息後,從新啓動TimerTask,從下一個消息開始處理 // 更新offsetTable中當前隊列的offset爲下一個消息的offset nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } // end of if (bufferCQ != null) else { // 若是根據offsetTable中的offset沒有找到對應的消息(可能被刪除了),則按照當前ConsumeQueue的最小offset開始處理 long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
對於上面的tagCode作一下特別說明,延時消息的tagCode和普通消息不同:
對延時消息的tagCode的特別處理是在下面這個方法中完成的,也就是在build ConsumeQueue信息的時候
org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)
以上就是RocketMQ延時消息的實現方式,上面沒有詳說的是重試消息的延時是怎麼實現的,其實就是在consumer將延時消息發送回broker的時候設置了(用戶能夠本身設置,若是沒有本身設置默認是0)delayLevel,到了broker處理重試消息的時候若是delayLevel是0(也就是說是默認的延時等級)的時候會在原來的基礎上加3,後面的處理就和上面說的延時消息同樣了,存儲的時候將消息投遞到延時隊列,等待延時到期後再從新投遞到原始topic隊列中等到consumer消費。