RocketMQ源碼 — 9、 RocketMQ延時消息

上一節消息重試裏面提到了重試的消息能夠被延時消費,其實除此以外,用戶發送的消息也能夠指定延時時間(更準確的說是延時等級),而後在指定延時時間以後投遞消息,而後被consumer消費。阿里雲的ons還支持定時消息,並且延時消息是直接指定延時時間,其實阿里雲的延時消息也是定時消息的另外一種表述方式,都是經過設置消息被投遞的時間來實現的,可是Apache RocketMQ在版本4.2.0中尚不支持指定時間的延時,只能經過配置延時等級和延時等級對應的時間來實現延時。html

一個延時消息被髮出到消費成功經歷如下幾個過程:java

  1. 設置消息的延時級別delayLevel
  2. producer發送消息
  3. broker收到消息在準備將消息寫入存儲的時候,判斷是延時消息則更改Message的topic爲延時消息隊列的topic,也就是將消息投遞到延時消息隊列
  4. 有定時任務從延時隊列中讀取消息,拿到消息後判斷是否達到延時時間,若是到了則修改topic爲原始topic。並將消息投遞到原始topic的隊列
  5. consumer像消費其餘消息同樣從broker拉取消息進行消費

注意:批量消息是不支持延時消息的apache

tips:下文中說到的延時隊列能夠理解爲一個ConsumeQueueapp

producer發送延時消息

在producer中發送消息的時候,設置Message的delayLevelide

// org.apache.rocketmq.common.message.Message
public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

調用上面的方法設置延時等級的時候,會向message添加"DELAY"屬性,後面broker處理延時消息就是依賴該屬性進行特別的處理。ui

接下來發送消息的流程和正常發送消息的流程基本一致,只是會將該消息標記爲延時消息類型this

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl 
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
    context.setMsgType(MessageType.Delay_Msg);
}

broker處理延時消息

broker收到延時消息和正常消息在前置的處理流程是一致的,對於延時消息的特殊處理體如今將消息寫入存儲(內存或文件)的時候阿里雲

// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // 省略中間代碼...
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    // 拿到原始topic和對應的queueId
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 非事務消息和事務的commit消息纔會進一步判斷delayLevel
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            // 糾正設置過大的level,就是delayLevel設置都大於延時時間等級的最大級
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // 設置爲延時隊列的topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // 每個延時等級一個queue,queueId = delayLevel - 1
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            // 備份原始的topic和queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            // 更新properties
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // 省略中間代碼...
}

上面的SCHEDULE_TOPIC是:線程

public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";code

這個topic是一個特殊的topic,和正常的topic不一樣的地方是:

  1. 不會建立TopicConfig,由於也不須要consumer直接消費這個topic下的消息
  2. 不會將topic註冊到namesrv
  3. 這個topic的隊列個數和延時等級的個數是相同的

後面消息寫入的過程和普通的又是一致的。

上面將消息寫入延時隊列中了,接下來就是處理延時隊列中的消息,而後從新發送回原始topic的隊列中。

在此以前先說明下至今還有疑問的一個個概念——delayLevel。這個概念和咱們接下要須要用到的的類org.apache.rocketmq.store.schedule.ScheduleMessageService有關,這個類的字段delayLevelTable裏面保存了具體的延時等級

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

看下這個字段的初始化過程

// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel
public boolean parseDelayLevel() {
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    // 每一個延時等級延時時間的單位對應的ms數
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    // 延時等級在MessageStoreConfig中配置
    // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        // 根據空格將配置分隔出每一個等級
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            // 時間單位對應的ms數
            Long tu = timeUnitTable.get(ch);

            // 延時等級從1開始
            int level = i + 1;
            if (level > this.maxDelayLevel) {
                // 找出最大的延時等級
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
    // 省略部分代碼...
}

上面這個load方法在broker啓動的時候DefaultMessageStore會調用來初始化延時等級。

接下來就應該解決怎麼處理延時消息隊列中的消息的問題了。處理延時消息的服務是:ScheduleMessageService。

仍是broker啓動的時候DefaultMessageStore會調用org.apache.rocketmq.store.schedule.ScheduleMessageService#start來啓動處理延時消息隊列的服務:

public void start() {

    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        // 記錄隊列的處理進度
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            // 每一個延時隊列啓動一個定時任務來處理該隊列的延時消息
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                // 持久化offsetTable(保存了每一個延時隊列對應的處理進度offset)
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

DeliverDelayedMessageTimerTask是一個TimerTask,啓動之後不斷處理延時隊列中的消息,直到出現異常則終止該線程從新啓動一個新的TimerTask

public void executeOnTimeup() {
    // 找到該延時等級對應的ConsumeQueue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    // 記錄異常狀況下一次啓動TimerTask開始處理的offset
    long failScheduleOffset = offset;

    if (cq != null) {
        // 找到offset所處的MappedFile中offset後面的buffer
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 下面三個字段信息是ConsumeQueue物理存儲的信息
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // 注意這個tagCode,再也不是普通的tag的hashCode,而是該延時消息到期的時間
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    // 省略中間代碼....
                    long now = System.currentTimeMillis();
                    // 計算應該投遞該消息的時間,若是已經超時則當即投遞
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 計算下一個消息的開始位置,用來尋找下一個消息位置(若是有的話)
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 判斷延時消息是否到期
                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                // 將消息恢復到原始消息的格式,恢復topic、queueId、tagCode等,清除屬性"DELAY"
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.defaultMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    // 投遞成功,處理下一個
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    // 投遞失敗,結束當前task,從新啓動TimerTask,從下一個消息開始處理,也就是說當前消息丟棄
                                    // 更新offsetTable中當前隊列的offset爲下一個消息的offset
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                // 從新投遞期間出現任何異常,結束當前task,從新啓動TimerTask,從當前消息開始重試
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for
                // 處理完當前MappedFile中的消息後,從新啓動TimerTask,從下一個消息開始處理
                // 更新offsetTable中當前隊列的offset爲下一個消息的offset
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            // 若是根據offsetTable中的offset沒有找到對應的消息(可能被刪除了),則按照當前ConsumeQueue的最小offset開始處理
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
    
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

對於上面的tagCode作一下特別說明,延時消息的tagCode和普通消息不同:

  • 延時消息的tagCode:存儲的是消息到期的時間
  • 非延時消息的tagCode:tags字符串的hashCode

對延時消息的tagCode的特別處理是在下面這個方法中完成的,也就是在build ConsumeQueue信息的時候

org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)

總結

以上就是RocketMQ延時消息的實現方式,上面沒有詳說的是重試消息的延時是怎麼實現的,其實就是在consumer將延時消息發送回broker的時候設置了(用戶能夠本身設置,若是沒有本身設置默認是0)delayLevel,到了broker處理重試消息的時候若是delayLevel是0(也就是說是默認的延時等級)的時候會在原來的基礎上加3,後面的處理就和上面說的延時消息同樣了,存儲的時候將消息投遞到延時隊列,等待延時到期後再從新投遞到原始topic隊列中等到consumer消費。

相關文章
相關標籤/搜索