spring boot / cloud (十五) 分佈式調度中心進階

spring boot / cloud (十五) 分佈式調度中心進階

<spring boot / cloud (十) 使用quartz搭建調度中心>這篇文章中介紹瞭如何在spring boot項目中集成quartz.java

今天這篇文章則會進一步跟你們討論一下設計和搭建分佈式調度中心所須要關注的事情.mysql

下面先看一下,整體的邏輯架構圖:git

分佈式調度-邏輯架構示意

分佈式調度-邏輯架構示意

架構設計

整體思路是,將調度執行兩個概念分離開來,造成調度中心執行節點兩個模塊:web

調度中心

是一個公共的平臺,負責全部任務的調度,以及任務的管理,不涉及任何業務邏輯,從上圖能夠看到,它主要包括以下模塊:spring

  • 核心調度器quartz : 調度中心的核心,按照jobDetail和trigger的設定發起做業調度,而且提供底層的管理apisql

  • 管理功能 : 可經過restful和web頁面的方式動態的管理做業,觸發器的CURD操做,而且實時生效,並且還能夠記錄調度日誌,以及能夠以圖表,表格,等各類可視化的方式展示調度中心的各個維度的指標信息數據庫

  • RmsJob和RmsJobDisallowConcurrent : 基於http遠程調用(RMS)的做業和禁止併發執行的做業api

  • Callback : 用於接收"執行節點"異步執行完成後的信息restful

執行節點

是嵌入在各個微服務中的一個執行模塊,負責接收調度中心的調度,專一於執行業務邏輯,無需關係調度細節,而且理論上來講,它主要包括以下模塊:架構

  • 同步執行器 : 同步執行而且返回調度中心觸發的任務

  • 異步執行器 : 異步執行調度中心觸發的任務,而且經過callback將執行結果反饋給調度中心

  • 做業鏈 : 可任意組合不一樣任務的執行順序和依賴關係,知足更復雜的業務需求

  • 業務bean : 業務邏輯的載體

架構優勢

這樣一來,調度中心只負責調度,執行節點只負責業務,相互經過http協議進行溝通,兩部分能夠徹底解耦合,加強系統總體的擴展性

而且引入了異步執行器的概念,這同樣一來,調度中心就能以非阻塞的形式觸發執行器,能夠不受任務業務邏輯帶來的性能影響,進一步提升了系統的性能

而後理論上來講執行節點是不侷限於任何的語言或者平臺的,而且與調度中心採用的是通用的http協議,真正的能夠作到跨平臺

特色

集羣,高可用,故障轉移

總體的解決方案是創建在spring cloud基礎上的,依賴於服務發現eureka,可以使全部的服務去中心化,來實現集羣和高可用

調度中心的核心依賴於quartz,而quartz是原生支持集羣的,它經過將做業和觸發器的細節持久化到數據庫中,而後在經過db鎖的方式,與集羣中的各個節點通信,從而實現了去中心化

執行節點調度中心都是註冊在eureka上的,經過ribbon的客戶端負載均衡的特性,自動屏蔽壞掉的節點,自動發現新增長的節點,可以使雙方的http通訊都作到高可用.

以下是quartz集羣配置的片斷:

#Configure scheduler
org.quartz.scheduler.instanceName=clusterQuartzScheduler #實例名稱
org.quartz.scheduler.instanceId=AUTO #自動設定實例ID
org.quartz.scheduler.skipUpdateCheck=true

#Configure JobStore and Cluster
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX #使用jdbc持久化到數據中
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate #sql代理,mysql
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_ #表前綴
org.quartz.jobStore.isClustered=true #開啓集羣模式
org.quartz.jobStore.clusterCheckinInterval=20000
org.quartz.jobStore.misfireThreshold=60000

線程池調優

quartz的默認配置,可根據實際狀況進行調整.

#Configure ThreadPool
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool #線程池類型
org.quartz.threadPool.threadCount=5 #線程池數量
org.quartz.threadPool.threadPriority=5 #優先級

這裏就體現出了分離調度的業務邏輯的好處,在傳統的作法中,調度器承載着業務邏輯,必然會佔用執行線程更長時間,併發能力受業務邏輯限制.

將業務邏輯分離出去後,而且採用異步任務的方式,調度器觸發某個任務後,將當即返回,這時佔用執行線程的時間會大幅縮短.

因此在相同的線程池數量下,採用這種架構,是能夠大幅度的提升調度中心的併發能力的.

集中化配置管理

一樣,整個解決方案也依賴於spring cloud config server.

咱們在系統中抽象出了一系列的元數據用於作系統配置,這些元數據在org.itkk.udf.scheduler.meta包下,你們能夠查看,這些元數據基本囊括了全部做業和觸發器的屬性,經過@ConfigurationProperties特性,可輕鬆的將這些元數據類轉化爲配置文件.

而且設計上簡化了後續管理api的複雜度,咱們某個做業或者某個觸發器的一套屬性概括到一個CODE中,而後後續經過這個CODE就能操做所對應的做業或者觸發器.

配置片斷以下:

#jobGroup
org.itkk.scheduler.properties.jobGroup.general=通用
#triggerGroup
org.itkk.scheduler.properties.triggerGroup.general=通用
#rmsJob
org.itkk.scheduler.properties.jobDetail.rmsJob.name=generalJob
org.itkk.scheduler.properties.jobDetail.rmsJob.group=general
org.itkk.scheduler.properties.jobDetail.rmsJob.className=org.itkk.udf.scheduler.job.RmsJob
org.itkk.scheduler.properties.jobDetail.rmsJob.description=通用做業
org.itkk.scheduler.properties.jobDetail.rmsJob.recovery=false
org.itkk.scheduler.properties.jobDetail.rmsJob.durability=true
org.itkk.scheduler.properties.jobDetail.rmsJob.autoInit=true
#rmsJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.name=generalJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.group=general
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.className=org.itkk.udf.scheduler.job.RmsJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.description=通用做業(禁止併發)
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.recovery=false
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.durability=true
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.autoInit=true
#simpleTrigger
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.jobCode=rmsJob
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.name=testSimpleTrigger
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.group=general
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.intervalInMilliseconds=10000
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.autoInit=true
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.description=測試簡單觸發器
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.serviceCode=SCH_CLIENT_UDF_SERVICE_A_DEMO
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.beanName=testBean
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.async=true
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param1=a
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param2=b
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param3=123

以上能夠看,咱們能夠經過properties配置文件設定做業和觸發器的任何屬性,而且經過如:simpleTrigger這個code,就能隨意的經過管理api進行curd操做.

基於rms的JobDetail

從上面的配置能夠看到,解決方案中內置了兩個默認的jobDetail,一個是rmsJob另外一個是rmsJobDisallowConcurrent.

想要使用它們很簡單,爲它們配置一個觸發器便可,rmsjob經過如下屬性來肯定本身將要調用那個任務:

#配置simple或者corn觸發器的dataMap屬性,而且添加以下值:

#指定要調用那個rms,這裏設定的是rmscode,不太清楚的話能夠回看第八篇文章
省略.serviceCode=SCH_CLIENT_UDF_SERVICE_A_DEMO 
#指定要調用哪個bean
省略.beanName=testBean 
#是否採用異步方式
省略.async=true 
#業務參數
省略.param1=a 
省略.param2=b
省略.param3=123

以下方式能夠在執行節點中定義一個執行器

@Component("testBean")
public class TestSch extends AbstractExecutor {
    @Override
    public void handle(String id, Map<String, Object> jobDataMap) {
        try {
            LOGGER.info("任務執行了------id:{}, jobDataMap:{}", id, jobDataMap);
        } catch (JsonProcessingException e) {
            throw new SchException(e);
        }
    }
}

這樣就能爲某一個執行器設定觸發器,從而作到調度的功能.

而rmsJob是能夠併發的觸發執行器的.

禁止併發的基於rms的JobDetail

在這個解決方案中禁止併發有兩個層次

第一個層次就是默認實現的rmsJobDisallowConcurrent,你們看源碼就知道,這個類上標註了@DisallowConcurrentExecution,這個註解的含義是禁止做業併發執行.

在傳統的作法中jobdetail中包含了業務邏輯,沒有異步的遠程操做,因此說在類上標註這個註解能作到禁止併發.

可是如今有了異步任務的概念,觸發器觸發執行器後當即就返回結束了,若是這個時候,觸發器的觸發間隔小於執行器的執行時間,那麼依然仍是會有任務併發執行的.

這顯然是不但願發生的,既然禁止併發,那麼就必定要徹底的作到禁止併發,以下設定保證了這一點:

protected void disallowConcurrentExecute(RmsJobParam rmsJobParam) throws JobExecutionException {
    if (!this.hasRunning(rmsJobParam)) { //沒有正在運行的任務才能運行
        this.execute(rmsJobParam);
    } else { //跳過執行,而且記錄
        RmsJobResult result = new RmsJobResult();
        result.setId(rmsJobParam.getId());
        result.setStats(RmsJobStats.SKIP.value());
        save(rmsJobParam, result);
    }
}

在禁止併發的異步任務觸發前,會校驗當前這個任務是否正在執行,若是正在執行的話,跳過而且記錄.

異步任務,異步回調

執行節點中的任務便可同步執行也可異步執行,經過配置觸發器的async屬性來控制的,

同步執行 : 的任務適合執行時間短,執行時間穩定,而且有必要當即知道返回結果的任務

異步執行 : 高併發,高性能的執行方式,沒有特別的限制,推薦使用

以下實現片斷:

//SchClientController中
public RestResponse<RmsJobResult> execute(@RequestBody RmsJobParam param) {
    //記錄來接收時間
    Date receiveTime = new Date();
    //定義返回值
    RmsJobResult result = new RmsJobResult();
    result.setClientReceiveTime(receiveTime);
    result.setId(param.getId());
    result.setClientStartExecuteTime(new Date());
    //執行(區分同步跟異步)
    if (param.getAsync()) {
        schClientHandle.asyncHandle(param, result);
        result.setStats(RmsJobStats.EXECUTING.value());
    } else {
        schClientHandle.handle(param);
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.COMPLETE.value());
    }
    //返回
    return new RestResponse<>(result);
}
//SchClientHandle中
//異步執行
@Async
public void asyncHandle(RmsJobParam param, RmsJobResult result) {
    try {
        //執行
        this.handle(param);
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.COMPLETE.value());
        //回調
        this.callback(result);
    } catch (Exception e) {
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.ERROR.value());
        result.setErrorMsg(ExceptionUtils.getStackTrace(e));
        //回調
        this.callback(result);
        //拋出異常
        log.error("asyncHandle error:", e);
        throw new SchException(e);
    }

}
//同步執行
public void handle(RmsJobParam param) {
    //判斷bean是否存在
    if (!applicationContext.containsBean(param.getBeanName())) {
        throw new SchException(param.getBeanName() + " not definition");
    }
    //得到bean
    AbstractExecutor bean = applicationContext.getBean(param.getBeanName(), AbstractExecutor.class);
    //執行
    bean.handle(param);
}
//異步回調(重處理)
@Retryable(maxAttempts = 3, value = Exception.class)
private void callback(RmsJobResult result) {
    log.info("try to callback");
    final String serviceCode = "SCH_CLIENT_CALLBACK_1";
    rms.call(serviceCode, result, null, new ParameterizedTypeReference<RestResponse<String>>() {
    }, null);
}
//回調失敗後的處理
@Recover
public void recover(Exception e) {
    log.error("try to callback failed:", e);
}

任務鏈

在執行器父類中提供以下方法,可在執行節點觸發其餘執行器:

//調用鏈 (容許併發,異步調用)
protected String chain(boolean isConcurrent, String parentId, String serviceCode, 
                String beanName, boolean async, Map<String, String> param)

而在執行器中的使用樣例:

@Component("testBean")
public class TestSch extends AbstractExecutor {
    @Override
    public void handle(String id, Map<String, Object> jobDataMap) {
        try {
            LOGGER.info("任務執行了------id:{}, jobDataMap:{}", id, xssObjectMapper.writeValueAsString(jobDataMap)); //NOSONAR
            if (!jobDataMap.containsKey(TriggerDataMapKey.PARENT_TRIGGER_ID.value())) {
                LOGGER.info("job鏈---->"); //NOSONAR
                Map<String, String> param = new HashMap<>();
                param.put("chain1", "1");
                param.put("chain2", "2");
                this.chain(id, "SCH_CLIENT_UDF_SERVICE_A_DEMO", "testBean", param);
            }
        } catch (JsonProcessingException e) {
            throw new SchException(e);
        }
    }
}

這樣可使得執行器更加靈活,能夠隨意組合

管理api

依賴於quartz的底層管理api,咱們能夠抽象出一系列restFul的api,目前實現的功能以下:

做業管理 : 保存做業 , 保存做業(覆蓋) , 移除做業 , 當即觸發做業

觸發器管理 : 保存簡單觸發器 , 保存簡單觸發器(覆蓋) , 保存CRON觸發器 , 保存CRON觸發器(覆蓋) , 刪除觸發器

計劃任務管理 : 清理數據

misfire設定

quartz原生的設定,表示那些錯過了觸發時間的觸發器,後續處理的規則,多是由於 : 服務不可用 , 線程阻塞,線程池耗盡 , 等..

simple觸發器

MISFIRE_INSTRUCTION_FIRE_NOW

以當前時間爲觸發頻率當即觸發執行

MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT

不觸發當即執行
等待下次觸發頻率週期時刻執行
以總次數-已執行次數做爲剩餘週期次數,從新計算FinalTime
調整後的FinalTime會略大於根據starttime計算的到的FinalTime值

MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT

不觸發當即執行
等待下次觸發頻率週期時刻,執行至FinalTime的剩餘週期次數
保持FinalTime不變,從新計算剩餘週期次數(至關於錯過的當作已執行)

MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT

以當前時間爲觸發頻率當即觸發執行
以總次數-已執行次數做爲剩餘週期次數,從新計算FinalTime
調整後的FinalTime會略大於根據starttime計算的到的FinalTime值

MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT

以當前時間爲觸發頻率當即觸發執行
保持FinalTime不變,從新計算剩餘週期次數(至關於錯過的當作已執行)

MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY

以錯過的第一個頻率時間馬上開始執行

MISFIRE_INSTRUCTION_SMART_POLICY(默認)

智能根據trigger屬性選擇策略:
repeatCount爲0,則策略同MISFIRE_INSTRUCTION_FIRE_NOW
repeatCount爲REPEAT_INDEFINITELY,則策略同MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT
不然策略同MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT

cron觸發器

MISFIRE_INSTRUCTION_DO_NOTHING

是什麼都不作,繼續等下一次預約時間再觸發

MISFIRE_INSTRUCTION_FIRE_ONCE_NOW

是當即觸發一次,觸發後恢復正常的頻率

MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY

以錯過的第一個頻率時間馬上開始執行

MISFIRE_INSTRUCTION_SMART_POLICY(默認)

根據建立CronTrigger時選擇的MISFIRE_INSTRUCTION_XXX更新CronTrigger的狀態。
若是misfire指令設置爲MISFIRE_INSTRUCTION_SMART_POLICY,則將使用如下方案:
指令將解釋爲MISFIRE_INSTRUCTION_FIRE_ONCE_NOW

你們可根據自身狀況進行設定

結束

今天跟你們分享了分佈式調度的設計思路和想法,因爲我的時間問題,這個設計的核心部分雖然已經完成,可是好比web界面,restful api,都尚未完成,後續有空就會把這些東西都弄上去的.

不過整體來講,把核心的思想講出來了,也歡迎你們提出意見和建議

代碼倉庫 (博客配套代碼)


想得到最快更新,請關注公衆號

想得到最快更新,請關注公衆號

相關文章
相關標籤/搜索