分佈式定時任務框架Quartz

前言

項目中總要寫那麼幾個定時任務來處理一些事情。一些簡單的定時任務使用Spring自帶的定時任務就能完成。可是若是須要大量的定時任務的話要怎麼才能統一管理呢?java

本文介紹Quartz分佈式調度框架。mysql

介紹

Quartz介紹

Quartz是OpenSymphony開源組織在Job scheduling領域又一個開源項目,是徹底由java開發的一個開源的任務日程管理系統。 目前是 Terracotta 旗下的一個項目。官網地址 http://www.quartz-scheduler.org/ 能夠 下載 Quartz 的發佈版本及其源代碼。git

特色

  • 集成方便(徹底使用Java編寫)
  • 無需依賴可集羣部署也可單機運行
  • 能夠經過JVM獨立運行

Job

建立一個任務只須要實現Job接口便可spring

觸發器

  • 能夠經過 Calendar 執行(排除節假日)
  • 指定某個時間無線循環執行 好比每五分鐘執行一次
  • 固定時間執行 例如每週週一上午十點執行

通常狀況使用SimpleTrigger,和CronTrigger,這些觸發器實現了Trigger接口。或者 ScheduleBuilder 子類 SimpleScheduleBuilder和CronScheduleBuilder。sql

對於簡單的時間來講,好比天天執行幾回,使用SimpleTrigger。對於複雜的時間表達式來講,好比每月15日上午幾點幾分,使用CronTrigger以及CromExpression 類。數據庫

注意 :一個job能夠被多個Trigger 綁定,可是一個Trigger只能綁定一個job服務器

存儲

有兩種存儲方式 RAMJobStore和 JDBCJobStore 。併發

RAMJobStore不須要外部數據庫調度信息存儲在JVM內存中 因此,當應用程序中止運行時,全部調度信息將被丟失存儲多少個Job和Trigger也會受到限制。框架

JDBCJobStore 支持集羣全部觸發器和job都存儲在數據庫中不管服務器中止和重啓均可以恢復任務同時支持事務處理。分佈式

實戰

準備

上面簡單的介紹了一下Quartz,而後如今開始實戰,本文使用SpringBoot整合。

項目地址: https://gitee.com/lqlm/toolsL...

首先建立數據庫表由於太多了就不房子文章中了能夠去官方網站下載,也能夠用個人下載地址下載

地址: https://lqcoder.com/quartz.sql

建立完成以後:

Table Name Description
QRTZ_CALENDARS 存儲Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存儲CronTrigger,包括Cron表達式和時區信息
QRTZ_FIRED_TRIGGERS 存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息
QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的Trigger組的信息
QRTZ_SCHEDULER_STATE 存儲少許的有關Scheduler的狀態信息,和別的Scheduler實例
QRTZ_LOCKS 存儲程序的悲觀鎖的信息
QRTZ_JOB_DETAILS 存儲每個已配置的Job的詳細信息
QRTZ_JOB_LISTENERS 存儲有關已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存儲簡單的Trigger,包括重複次數、間隔、以及已觸的次數
QRTZ_BLOG_TRIGGERS Trigger做爲Blob類型存儲
QRTZ_TRIGGER_LISTENERS 存儲已配置的TriggerListener的信息
QRTZ_TRIGGERS 存儲已配置的Trigger的信息

本文統一使用Cron方式來建立。

注意:cron方式須要用到的4張數據表:
qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details

整合項目

建立一個SpringBoot項目而後加入quartz依賴,同時也要加入c3p0的依賴由於quartz使用的數據庫是和項目分開的。

<!--spring boot集成quartz-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.0.2</version>
        </dependency>

同時在resources下建立quartz.properties文件內容

org.quartz.scheduler.instanceName = MyScheduler
org.quartz.threadPool.threadCount = 10
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = myDS
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql:數據庫地址
org.quartz.dataSource.myDS.user = 數據庫帳號
org.quartz.dataSource.myDS.password = 數據庫密碼
org.quartz.dataSource.myDS.maxConnections = 鏈接數

而後建立一個Job類

/**
 * @author snluomeng
 * @date 2019/12/19 16:27
 */
@Slf4j
public class MyJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("==================開始執行任務==================");
    }
}

建立一個工具類而後進行定時任務的增刪改

首先建立一個調度工廠

private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();

添加定時任務

public static void addJob(String jobName, String jobGroupName,
                              String triggerName, String triggerGroupName, Class jobClass, String cron) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            // 任務名,任務組,任務執行類
//            Trigger.TriggerState state = sched.getTriggerState();

            JobDetail jobDetail=  JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
            // 觸發器
            TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
            // 觸發器名,觸發器組
            triggerBuilder.withIdentity(triggerName, triggerGroupName);
            triggerBuilder.startNow();
            // 觸發器時間設定
            triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
            // 建立Trigger對象
            CronTrigger trigger = (CronTrigger) triggerBuilder.build();

            // 調度容器設置JobDetail和Trigger
            sched.scheduleJob(jobDetail, trigger);

            // 啓動
            if (!sched.isShutdown()) {
                sched.start();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

建立流程

經過工廠獲取 Scheduler對象

Scheduler sched = schedulerFactory.getScheduler();

設置Job的實現類和一些靜態信息

//jobClass 設置Job的實現類
  //jobName Job名稱
  //jobGroupName Job組名稱
JobDetail jobDetail=  JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();

構建觸發器

// 觸發器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
 // 觸發器名,觸發器組
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 觸發器時間設定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 建立Trigger對象
CronTrigger trigger = (CronTrigger) triggerBuilder.build();

​ 而後把Job和觸發器都設置到Scheduler對象中

// 調度容器設置JobDetail和Trigger
sched.scheduleJob(jobDetail, trigger);

啓動

// 啓動
sched.start();

運行

由於使用的是SpringBoot項目因此就直接在啓動類加入添加定時任務

參數分別爲:JobName JsobgropName 中間省略 實現類,任務執行時間

QuartUtil.addJob("測試定時任務","test","測試定時任務","testTrigger",MyJob.class,"0/5 * * * * ?");

而後查看輸出日誌:

能夠看到已經在執行了,如今咱們去看一下數據庫中的數據要查看的表有qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details

qrtz_job_details:

已經存在

如今把項目中止而後在從新啓動會發生什麼?

發現拋出了異常,由於咱們已經添加過這個定時任務了因此重複添加是行不通的。

這時候咱們直接啓動便可。

一樣封裝啓動方法

public static void startJobs() {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            sched.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

啓動全部定時任務很是簡單直接獲取Scheduler對象而後start便可。

修改定時任務

修改定時任務一樣須要獲取Scheduler對象,和添加流程基本一致,只不過最後不是調用的scheduleJob()而是調用的rescheduleJob()方法.有兩種方式都須要指定定時器名稱

  • 第一種是調用rescheduleJob()直接修改
  • 第二種是先刪除而後在新增
/**
     * @Description: 修改一個任務的觸發時間
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName 觸發器名
     * @param triggerGroupName 觸發器組名
     * @param cron   時間設置,參考quartz說明文檔
     */
    public static void modifyJobTime(String jobName,
                                     String jobGroupName, String triggerName, String triggerGroupName, String cron) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            CronTrigger trigger = (CronTrigger) sched.getTrigger(triggerKey);
            if (trigger == null) {
                return;
            }

            String oldTime = trigger.getCronExpression();
            if (!oldTime.equalsIgnoreCase(cron)) {
                /** 方式一 :調用 rescheduleJob 開始 */
                // 觸發器
                TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
                // 觸發器名,觸發器組
                triggerBuilder.withIdentity(triggerName, triggerGroupName);
                triggerBuilder.startNow();
                // 觸發器時間設定
                triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
                // 建立Trigger對象
                trigger = (CronTrigger) triggerBuilder.build();
                // 方式一 :修改一個任務的觸發時間
                sched.rescheduleJob(triggerKey, trigger);
                /** 方式一 :調用 rescheduleJob 結束 */

                /** 方式二:先刪除,而後在建立一個新的Job  */
                //JobDetail jobDetail = sched.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
                //Class<? extends Job> jobClass = jobDetail.getJobClass();
                //removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
                //addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, cron);
                /** 方式二 :先刪除,而後在建立一個新的Job */
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

刪除任務

刪除定時任務在修改的時候已經有實例.注意都須要指定任務名稱 任務分組和觸發器名稱觸發器分組

/**
     * @Description: 移除一個任務
     *
     * @param jobName
     * @param jobGroupName
     * @param triggerName
     * @param triggerGroupName
     */
    public static void removeJob(String jobName, String jobGroupName,
                                 String triggerName, String triggerGroupName) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            sched.pauseTrigger(triggerKey);// 中止觸發器
            sched.unscheduleJob(triggerKey);// 移除觸發器
            sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 刪除任務
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

傳遞參數

如今還有一個問題就是我想把參數傳遞到Job實現類裏面咋整?

在添加定時任務時,建立JobDetail的時候有一個setJobData()方法參數爲JobDataMap,看下JobBuilder源碼

能夠看到JobBuilder提供了setJobData方法傳遞的參數爲JobDataMap是Map類型.

在建立定時任務的時候能夠:

JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("testKey","測試傳遞參數");
JobDetail jobDetail=  JobBuilder.newJob(jobClass).setJobData(jobDataMap).withIdentity(jobName, jobGroupName).build();

而後在Job實現類方法中直接取

@Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("==================開始執行任務==================");
        log.info("執行任務線程ID{}",Thread.currentThread().getId());
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        log.info("參數爲{}",jobDataMap.get("testKey"));
    }

JobDataMap能夠直接看成Map進行操做.

單個參數可使用usingJobData()來添加,參數爲K V 取值方法一致,同時參數也是持久化到數據庫的

若是須要查詢管理的話能夠直接查詢數據庫

原理解析

上面簡單的介紹了一下怎麼使用,那麼你必定對它是怎麼運行的感興趣.接下來就分析一下Quartz究竟是怎麼實現的

注意到上面增刪改都要先經過schedulerFactory工廠(工廠模式)來先獲取Scheduler實例,如今就從第一步開始分析

本文就簡單分析一下Scheduler工廠和添加定時任務這兩步驟.

Scheduler工廠

public Scheduler getScheduler() throws SchedulerException {
        //讀取quartz配置文件,未指定則順序遍歷各個path下的quartz.properties文件
        if (this.cfg == null) {
            //若是爲空就初始化
            this.initialize();
        }
        // 獲取調度器池,採用了單例模式
        // 爲了不併發getInstance是synchronized加鎖的
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        // 從調度器池中取出當前配置所用的調度器
        Scheduler sched = schedRep.lookup(this.getSchedulerName());
        if (sched != null) {
            if (!sched.isShutdown()) {
                return sched;
            }

            schedRep.remove(this.getSchedulerName());
        }
        // 若是調度器池中沒有當前配置的調度器,則實例化一個調度器,主要動做包括:
        // 1)初始化threadPool(線程池):開發者能夠經過org.quartz.threadPool.class配置指定使用哪一個線程池類,好比SimpleThreadPool。先class load線程池類,接着動態生成線程池實例bean,而後經過反射,使用setXXX()方法將以org.quartz.threadPool開頭的配置內容賦值給bean成員變量;
        // 2)初始化jobStore(任務存儲方式):開發者能夠經過org.quartz.jobStore.class配置指定使用哪一個任務存儲類,好比RAMJobStore。先class load任務存儲類,接着動態生成實例bean,而後經過反射,使用setXXX()方法將以org.quartz.jobStore開頭的配置內容賦值給bean成員變量;
        // 3)初始化dataSource(數據源):開發者能夠經過org.quartz.dataSource配置指定數據源詳情,好比哪一個數據庫、帳號、密碼等。jobStore要指定爲JDBCJobStore,dataSource纔會有效;
        // 4)初始化其餘配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
        // 5)初始化threadExecutor(線程執行器):默認爲DefaultThreadExecutor;
        // 6)建立工做線程:根據配置建立N個工做thread,執行start()啓動thread,並將N個thread順序add進threadPool實例的空閒線程列表availWorkers中;
        // 7)建立調度器線程:建立QuartzSchedulerThread實例,並經過threadExecutor.execute(實例)啓動調度器線程;
        // 8)建立調度器:建立StdScheduler實例,將上面全部配置和引用組合進實例中,並將實例存入調度器池中
        sched = instantiate();
        return sched;
    }

添加定時任務

public Date scheduleJob(JobDetail jobDetail,
                            Trigger trigger) throws SchedulerException {
        // 檢查調度器是否開啓
        validateState();
        //參數校驗省略
        if (jobDetail == null) {
            throw new SchedulerException("JobDetail cannot be null");
        }.....
        OperableTrigger trig = (OperableTrigger)trigger;
        //校驗觸發器參數
        if (trigger.getJobKey() == null) {
            trig.setJobKey(jobDetail.getKey());
        } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
            throw new SchedulerException(
                    "Trigger does not reference given job!");
        }
        trig.validate();
        Calendar cal = null;
        if (trigger.getCalendarName() != null) {
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        }
        //獲取時間
        Date ft = trig.computeFirstFireTime(cal);
        // 把job和trigger註冊進調度器的jobStore
        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
        // 通知job監聽者
        notifySchedulerListenersJobAdded(jobDetail);
        // 通知調度器線程
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        // 通知trigger監聽者
        notifySchedulerListenersSchduled(trigger);
        return ft;
    }
    public void validateState() throws SchedulerException {
        //若是關閉則拋出異常
        if (isShutdown()) {
            throw new SchedulerException("The Scheduler has been shutdown.");
        }
        // other conditions to check (?)
    }
相關文章
相關標籤/搜索