DelayQueue系列(三):持久化方案

原文發表於簡書DelayQueue之持久化方案,本次更新主要是對processTask方法作了優化,以及優化了補償執行的額外延遲時間設置,可對比閱讀。redis

上一篇文章中提到了咱們在項目中運用DelayQueue解決了一些須要延遲執行的任務,可是最近咱們在生產環境上遇到了一個問題。重啓服務器後,那些未執行的延遲任務就消失不見了。因而如何將延遲任務持久化就提上了日程。算法

關於DelayQueue的具體實現方案,已經在上一篇文章DelayQueue系列(二):基礎組件中提到過了。本文就再也不復述了。數據庫

本期的主題主要是探討如何將延遲任務進行持久化。json

何爲延遲任務的持久化?顧名思義,就是將這些延遲任務的執行必要的數據,存儲到數據庫或redis等。bash

那麼爲什麼要進行持久化呢?很簡單,由於延遲任務的數據是放在內存裏,那麼本身須要咱們本身寫持久化的備案以達到高可用,不然服務器故障宕機或新發布版本而致使服務器重啓的時候,那麼那些未執行的延遲任務數據將完全丟失,這顯然是咱們不肯意見到的。服務器

我目前採用的方案以下:
一、在須要使用到DelayQueue的地方,調用saveDelayTask方法,須要的參數有延遲任務函數策略工廠類的路由tag,執行方法所需的json格式的參數messageBody,延遲多久執行以秒爲單位的delayTime。
二、任務調度每5秒去執行getNotCompletedMessageList方法。併發

大多數狀況下,會在預計執行的時間點準時去執行processTask方法,那麼異常情況下,若是服務器重啓,那麼定時任務調度會在必定時間後找到那些沒有如期執行的延遲任務,經過定時任務調度的方式依次執行各自任務的processTask方法。app

異常狀態下,延遲任務執行會比預期執行時間有必定的延後,我設計的方案是目前咱們能夠容許的範圍,這個你們能夠酌情設置備選方案延後的時間。函數

期間會用到yb_delay_task_message這張表,以下是該表的表結構:post

CREATE TABLE `yb_delay_task_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `tag` varchar(128) NOT NULL DEFAULT '' COMMENT '延遲隊列執行函數的key',
  `message_body` longtext CHARACTER SET utf8mb4 COMMENT '消息體,以json格式存儲',
  `status` tinyint(3) unsigned DEFAULT '0' COMMENT '狀態;0:未完成,1:已完成,2:已失敗 3:執行中',
  `error_stack` longtext COMMENT '失敗堆棧',
  `version` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '版本號',
  `ip_address` bigint(20) DEFAULT NULL COMMENT '執行ip地址',
  `delay_time` bigint(20) NOT NULL COMMENT '延遲執行的時間長度',
  `expected_time` datetime DEFAULT NULL COMMENT '預計執行時間',
  `execution_time` datetime DEFAULT NULL COMMENT '實際執行時間',
  `create_time` datetime NOT NULL COMMENT '建立時間',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
  PRIMARY KEY (`id`),
  KEY `idx_expectedTime_status` (`expected_time`,`status`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 COMMENT='延遲隊列消息表';
複製代碼

核心代碼大體以下,其餘代碼很簡單就不一一公佈了。

public void saveDelayTask(String tag, String messageBody, Long delayTime) {
    DelayTaskMessage delayTaskMessage = new DelayTaskMessage();
    delayTaskMessage.setTag(tag);
    LocalDateTime now = LocalDateTime.now();
    delayTaskMessage.setCreateTime(now);
    delayTaskMessage.setUpdateTime(now);
    delayTaskMessage.setDelayTime(delayTime);
    delayTaskMessage.setExpectedTime(now.plusSeconds(delayTime));
    delayTaskMessage.setMessageBody(messageBody);
    delayTaskMessage.setStatus(KafkaMessageStatusEnum.NOT_COMPLETE.getCode());
    int res = delayTaskMessageMapper.insertDelayTaskMessage(delayTaskMessage);
    if (res <= 0) {
        log.error("ybBrokerApp|insertDelayTaskMessage error, res<=0");
        throw new RuntimeException("insertDelayTaskMessage error, res<=0");
    }
    TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
            function -> this.processTask(delayTaskMessage));
    DelayQueue<TaskMessage> queue = taskManager.getQueue();
    queue.offer(taskMessage);
}
複製代碼

首先來分析一下,用來保存延遲任務的saveDelayTask方法。

tag是指延遲任務的標記,用於指定對應的策略類。
messageBody主要用於存儲執行延遲任務的一些必要的數據,以json方法存儲。
delayTime是延遲時間,默認以s爲單位,主要是便於使用。

這個方法的主要功能是首先保存還未執行的延遲任務,自動根據延遲時間計算該延遲任務的預期執行時間,以便於後續的補償算法跟蹤,而後運用DelayQueue的特性,將這個延遲任務提交給延遲隊列執行。

public int processTask(DelayTaskMessage param) {
    DelayTaskMessage delayTaskMessage = delayTaskMessageMapper.getDelayTaskMessageById(param.getId());
    try {
        if (null != delayTaskMessage) {
            if (!Objects.equals(delayTaskMessage.getStatus(), DelayTaskMessageStatusEnum.NOT_COMPLETE.getCode())) {
                log.info("processTask executed already");
                return 1;
            }
            try {
                delayTaskMessage.setIpAddress(InetAddress.getLocalHost().getHostAddress());
            } catch (UnknownHostException ex) {
                log.error("Address.getLocalHost error", ex);
            }
            int res = delayTaskMessageMapper.delayTaskStartProcess(delayTaskMessage);
            if (res <= 0) {
                log.info("delayTaskStartProcess error,maybe processTask executed already");
                return 1;
            }
            //處理邏輯
            DelayTaskExecuteProcessor delayTaskExecuteProcessor = delayTaskExecuteProcessorFactory.getExecuteProcessor(delayTaskMessage.getTag());
            if (delayTaskExecuteProcessor != null) {
                delayTaskExecuteProcessor.execute(delayTaskMessage);
            } else {
                throw new RuntimeException("no such processor,tag=" + delayTaskMessage.getTag());
            }
            delayTaskMessage.setExecutionTime(LocalDateTime.now());
            res = delayTaskMessageMapper.delayTaskProcessSuccess(delayTaskMessage);
            if (res <= 0) {
                log.error("delayTaskProcessSuccess error");
                return 1;
            }
            return 1;
        } else {
            log.error("ybBrokerApp processTask error, delayTaskMessage is null delayTaskMessageId=", param.getId());
            return 0;
        }
    } catch (Exception e) {
        log.error("ybBrokerApp processTask error , param = " + param.toString() + "|", e);
        if (null != delayTaskMessage) {
            delayTaskMessage.setErrorStack(e.getMessage());
            delayTaskMessageMapper.delayTaskProcessFail(delayTaskMessage);
        }
        return 0;
    }
}
複製代碼

而後是核心的處理延遲任務的processTask方法。

一、根據id,在數據庫尋找到對應須要執行的延遲任務的持久化數據。
二、若是這條持久化數據非空且狀態不是未執行的狀態,那麼提示該任務無需再被執行,防止重複執行。這裏的status,一共有四種狀態,未執行,執行中,執行成功和執行失敗。
三、若是這條持久化數據非空,且是未執行的狀態,那麼將這條數據的執行狀態改成執行中,同時將執行方法的ip地址記錄下來,便於後續分析,而後經過version來控制併發問題。只有當version和數據庫中的version一致,且數據庫中的status爲未執行,才容許將狀態改成執行中。
四、若是上一步執行成功,則找到tag對應的策略類執行對應的execute方法。
五、接下來,將這條持久化數據的狀態改成已執行成功的狀態,這裏就不須要受version和status的限制了,直接改成執行成功便可。
六、若是執行過程當中出現其餘異常,那麼就將數據的狀態改成執行失敗。

public List<DelayTaskMessage> getNotCompletedMessageList(int total, int index) {
    LocalDateTime expectedTime = LocalDateTime.now().plusSeconds(1L);
    List<DelayTaskMessage> delayTaskMessageList = delayTaskMessageMapper.getNotCompletedMessageList(expectedTime,total, index);
    if (CollectionUtils.isEmpty(delayTaskMessageList)) {
        return Lists.newArrayList();
    }
    return delayTaskMessageList;
}
複製代碼

最後是補償方案的落實,我是在定時任務中去保證延遲任務必定會被執行至少一次的。至於會不會被重複執行,我是經過在processTask這個方法中去控制的。

個人設計是每隔5s去遍歷一下那些過了預期執行時間+1s依然未執行的的延遲任務。而後將這些列表中的延遲任務從新調用processTask方法。

若是最終是經過補償方案執行的延遲任務會比預期執行時間還要晚執行1到6s。目前在咱們的項目中,這個額外延遲是能夠被接收的。你們仍是要根據實際狀況酌情修改這個額外延遲的時間。

我在yb_delay_task_message表中記錄了expectedTime(預計執行時間)和executionTime(實際執行時間),因此具體的執行性能能夠經過這兩個字段去對比。

以上是我針對DelayQueue設計的的持久化方案,若是你們有更好的意見,能夠一塊兒討論哦。

相關文章
相關標籤/搜索