用Quartz實現工做流

Quartz簡介

做爲一個優秀的開源調度框架,Quartz 具備如下特色:
強大的調度功能,支持當即調度、定時調度、週期調度、併發調度;
靈活的應用方式,支持job間經過listener實現依賴調度,能夠方便的進行調度組合,支持調度數據的多種存儲方式;
分佈式和集羣能力;
做爲 Spring 默認的調度框架,Quartz 很容易與 Spring 集成實現靈活可配置的調度功能。算法

需求

對於工做流須要知足如下需求:
支持任務按照順序進行調度(這是工做流的基本需求)
存在多任務併發調度的狀況
存在某一個任務等待多個上游任務都結束才啓動調度的狀況
任務失敗後,依賴該任務的下游結點要中止運行數據庫

Quartz的主要組件

Quartz的主要組件以下圖所示,任務調度三個主要的類是 Scheduler、Trigger、Job。
Scheduler 是執行調度的控制器。
Trigger 是用於定義調度時間的元素,咱們項目沒有定時調度的需求,全部調度都選用理解觸發就能夠了。
Job 表示被調度的任務,Job和Trigger成對傳遞給Scheduler,當Trigger的條件知足時,它對應的Job就會被Scheduler觸發。
clipboard.png
Trigger/Job的組合不能實現順序調度,實現順序調度須要用到JobListener,JobListener對指定Job進行監聽,如上圖所示,JobLisener能夠捕捉到三個任務觸發點.
咱們須要的是在Job已執行完成這個觸發點,把下一個Job啓動起來。
也有TriggerLisener/SchedulerLisener,觸發點和Trigger、Scheduler相關,和咱們的需求關係不大,暫忽略。緩存

實現

爲每一個算法任務建立一個Job,任務失敗不能啓動後續任務,因此在job運行失敗的狀況下,須要把啓動Job的JobLisener刪除掉。併發

public class HelloJob implements Job {
    private String JobName;
     
    public HelloJob(String name) {
        JobName = name;
    }
 
    public void execute(JobExecutionContext context)  throws JobExecutionException {
 
        /* 獲取傳遞參數 */
        JobDataMap jobDataMap = context.getMergedJobDataMap();
 
       /* 從jobDataMap中獲取下游JobLisener名稱  */
 
       /* 執行spark mlib 做業 */
 
       if (/* 執行失敗 */){
           /* 刪除依賴本任務的JobLisener */
           context.getScheduler().getListenerManager().removeJobListener("next_job_lisener");
       }  
  
       /* 當前任務結果寫入數據庫 */
    }
}

基於全部的依賴關係,建立JobLisener,並將JobLisener與它依賴的Job綁定,在JobLisener中將下一步的Job啓動起來。框架

public class HelloJobListener implements JobListener {
    private String lisenerName;
    private JobDetail nextJob;
    HelloJobListener(String name, JobDetail job){
        lisenerName = name;
        nextJob = job;
    }
 
    public String getName() {
        return lisenerName;
    }
 
    public void jobWasExecuted(JobExecutionContext inContext,JobExecutionException inException) {
 
        /* 建立Trigger */
        Trigger trigger =  newTrigger()
                .withIdentity(lisenerName)
                .startNow()
                .build();
 
        inContext.getScheduler().scheduleJob(nextJob, trigger);
 
        try {
            /* 拉起下一個Job */
            inContext.getScheduler().scheduleJob(nextJob, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}

當前任務依賴多個上游Job時,試驗了AndMatcher,這個方法是對多個條件進行判斷的接口,不能進行多上游依賴判斷。
須要本身在JobLisener中實現多個依賴是否完成的檢查。JobLisener須要知道其它依賴的完成狀況,而且在本身完成後更新本身的狀態。
全部Job、JobLisener的關係配置好之後,調用scheduler.start()就能夠啓動整個調度。
後續主線程的任務就是檢查工做流是否已經完成。每一個任務結點在任務完成後,會將當前任務結點的的運行結果寫入數據庫或緩存。
主線程依據上下游依賴關係去數據庫中定時檢查數據的結果,當全部分支都運行完成或運行失敗後,得出算法的整體結果。
爲提升更新效率,上一輪檢查事後,已經完成的任務記錄已查閱標記,下一輪檢查從未查閱結點開始檢查。分佈式

相關文章
相關標籤/搜索