Spring Boot 應用系列 6 -- Spring Boot 2 整合Quartz

Quartz是實現定時任務的利器,Quartz主要有四個組成部分,分別是:java

1. Job(任務):包含具體的任務邏輯;git

2. JobDetail(任務詳情):是對Job的一種詳情描述;github

3. Trigger(觸發器):負責管理觸發JobDetail的機制;spring

4. Scheduler(調度器):負責Job的執行。app

有兩種方式能夠實現Spring Boot與Quartz的整合:ide

1、使用Spring提供的工廠類測試

spring-context.jar和spring-context-support.jar類庫提供了一些org.quartz包的擴展,使用這些擴展並經過注入bean的方式能夠實現與Quartz的整合。ui

1. pom.xmlspa

添加依賴:.net

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>

Spring Boot 2.0.4指定的quartz.jar的版本是2.3.0

2. Job

首先定義一個執行器:

import java.util.Date;

import devutility.internal.text.format.DateFormatUtils;

public class Executor {
    public static void execute(String name, long costMillis) {
        Date startDate = new Date();
        Date endDate = new Date(startDate.getTime() + costMillis);

        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(String.format("[%s]: ", DateFormatUtils.format(startDate, "yyyy-MM-dd HH:mm:ss:SSS")));
        stringBuilder.append(String.format("%s executing on %s, ", name, Thread.currentThread().getName()));
        stringBuilder.append(String.format("will finish at %s.", DateFormatUtils.format(endDate, "yyyy-MM-dd HH:mm:ss:SSS")));
        System.out.println(stringBuilder.toString());

        try {
            Thread.sleep(costMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

定義Job:

import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

import devutility.test.app.quartz.common.Executor;

@Component
@EnableScheduling
public class SpringJobs {
    public void job1() {
        Executor.execute("Job1", 5000);
    }

    public void job2() {
        Executor.execute("Job2", 6000);
    }
}

@EnableScheduling註解是必需要有的。

3. JobDetail

 1 package devutility.test.app.quartz.config;
 2 
 3 import org.quartz.Trigger;
 4 import org.springframework.beans.factory.annotation.Qualifier;
 5 import org.springframework.context.annotation.Bean;
 6 import org.springframework.context.annotation.Configuration;
 7 import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
 8 import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
 9 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
10 
11 import devutility.test.app.quartz.jobs.SpringJobs;
12 
13 @Configuration
14 public class SpringJobConfiguration {
15     @Bean
16     public MethodInvokingJobDetailFactoryBean jobDetailFactory1(SpringJobs springJobs) {
17         MethodInvokingJobDetailFactoryBean jobDetailFactory = new MethodInvokingJobDetailFactoryBean();
18         jobDetailFactory.setName("Spring-Job1");
19         jobDetailFactory.setGroup("Spring-Group");
20 
21         jobDetailFactory.setTargetObject(springJobs);
22         jobDetailFactory.setTargetMethod("job1");
23 
24         jobDetailFactory.setConcurrent(false);
25         return jobDetailFactory;
26     }
27 
28     @Bean
29     public MethodInvokingJobDetailFactoryBean jobDetailFactory2(SpringJobs springJobs) {
30         MethodInvokingJobDetailFactoryBean jobDetailFactory = new MethodInvokingJobDetailFactoryBean();
31         jobDetailFactory.setName("Spring-Job2");
32         jobDetailFactory.setGroup("Spring-Group");
33 
34         jobDetailFactory.setTargetObject(springJobs);
35         jobDetailFactory.setTargetMethod("job2");
36 
37         jobDetailFactory.setConcurrent(true);
38         return jobDetailFactory;
39     }

MethodInvokingJobDetailFactoryBean是來自spring-context-support.jar的一個工廠類,它實現了FactoryBean<JobDetail>接口,完成了對具體job的封裝。

21行指定具體的任務是咱們在2中定義的SpringJobs,22行指定咱們使用SpringJobs中的job1方法做爲定時任務的具體業務實現。

注意24和37行,若是一個任務每隔5秒執行一次,可是每次須要執行10秒,那麼它有兩種執行方式,串行(執行完上一個再開啓下一個)和並行(間隔時間到了就開始並行執行下一個),24和37就分別設置成了並行和串行執行。

4. Trigger

Quartz支持多種trigger,其中配置最爲靈活的一種當屬CronTrigger,它屬於表達式類型的配置方式,有關cron表達式的配置請參考

 1 package devutility.test.app.quartz.config;
 2 
 3 import org.quartz.Trigger;
 4 import org.springframework.beans.factory.annotation.Qualifier;
 5 import org.springframework.context.annotation.Bean;
 6 import org.springframework.context.annotation.Configuration;
 7 import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
 8 import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
 9 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
10 
11 import devutility.test.app.quartz.jobs.SpringJobs;
12 
13 @Configuration
14 public class SpringJobConfiguration {
15     @Bean
16     public CronTriggerFactoryBean cronTriggerFactory1(@Qualifier("jobDetailFactory1") MethodInvokingJobDetailFactoryBean jobDetailFactory1) {
17         CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
18         cronTriggerFactoryBean.setName("cronTriggerFactoryForJobDetailFactory1");
19         cronTriggerFactoryBean.setJobDetail(jobDetailFactory1.getObject());
20         cronTriggerFactoryBean.setCronExpression("0/3 * * * * ?");
21         return cronTriggerFactoryBean;
22     }
23 
24     @Bean
25     public CronTriggerFactoryBean cronTriggerFactory2(@Qualifier("jobDetailFactory2") MethodInvokingJobDetailFactoryBean jobDetailFactory2) {
26         CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
27         cronTriggerFactoryBean.setName("cronTriggerFactoryForJobDetailFactory2");
28         cronTriggerFactoryBean.setJobDetail(jobDetailFactory2.getObject());
29         cronTriggerFactoryBean.setCronExpression("0/4 * * * * ?");
30         return cronTriggerFactoryBean;
31     }

本例使用的Cron表達式很是簡單,分別是每隔3秒和每隔4秒執行一次。

5. Scheduler

1     @Bean
2     public SchedulerFactoryBean schedulerFactory1(Trigger cronTriggerFactory1, Trigger cronTriggerFactory2) {
3         SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
4         schedulerFactoryBean.setStartupDelay(2);
5         schedulerFactoryBean.setTriggers(cronTriggerFactory1, cronTriggerFactory2);
6         return schedulerFactoryBean;
7     }

行4指明系統啓動以後須要延遲2秒執行;

行5用於註冊須要執行的觸發器;

SchedulerFactoryBean還有一個叫autoStartup的屬性,用於指明任務在系統啓動以後是否當即執行,默認是true。

6. 測試

因而可知,Job1是按照咱們的預期串行執行的。

Job2則是並行執行的。

2、使用org.quartz原生類和方法

1. pom.xml,只須要添加quartz

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
</dependency>

2. 定義一個Scheduler類型的Bean

package devutility.test.app.quartz.config;

import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QuartzConfiguration {
    @Bean
    public Scheduler scheduler() throws SchedulerException {
        return StdSchedulerFactory.getDefaultScheduler();
    }
}

3. Job

package devutility.test.app.quartz.jobs;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import devutility.test.app.quartz.common.Executor;

@DisallowConcurrentExecution
public class ScheduleJob1 implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        Executor.execute("ScheduleJob1", 5000);

        JobDetail jobDetail = context.getJobDetail();
        System.out.println(String.format("key: %s", jobDetail.getKey()));
    }
}

這種方式每個Job都須要定義一個類實現org.quartz.Job接口,形式上要比第一種方式更條理。

3. 咱們定義一個建立任意Job的公共方法,來實現Job類的定時執行:

 1 package devutility.test.app.quartz.services;
 2 
 3 import org.quartz.CronScheduleBuilder;
 4 import org.quartz.CronTrigger;
 5 import org.quartz.Job;
 6 import org.quartz.JobBuilder;
 7 import org.quartz.JobDetail;
 8 import org.quartz.JobKey;
 9 import org.quartz.Scheduler;
10 import org.quartz.SchedulerException;
11 import org.quartz.TriggerBuilder;
12 import org.quartz.TriggerKey;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.stereotype.Service;
15 
16 import devutility.internal.models.OperationResult;
17 
18 @Service
19 public class JobServiceImpl implements JobService {
20     @Autowired
21     private Scheduler schedulerFactory1;
22 
23     @Autowired
24     private Scheduler scheduler;
25 
26     @Override
27     public void start(String name, String group, String cronExpression, Class<? extends Job> clazz) throws SchedulerException {
28         JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(name, group).build();
29 
30         String triggerName = String.format("trigger_%s", name);
31         CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
32         CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerName, group).withSchedule(scheduleBuilder).build();
33         scheduler.scheduleJob(jobDetail, trigger);
34 
35         if (!scheduler.isStarted()) {
36             scheduler.start();
37         }
38     }

28行建立一個新的JobDetail,並將Job實現類的Class對象賦值給它;

32行建立了一個新的Trigger;

33-37行使用注入進來的scheduler來運行一個定時任務。

3、對Job的CRUD操做

建立和運行Job上面已經講過了,下面說一下Job的暫停、中斷、刪除和更新操做。

1. 暫停

scheduler有一個方法public void pauseJob(JobKey jobKey),該方法經過暫停trigger的觸發來實現暫停job的功能,調用該方法以後正在運行的job會持續運行到業務邏輯處理完畢,等下一個觸發條件知足時再也不開啓新的job。

    public OperationResult pause(String group, String name) {
        OperationResult result = new OperationResult();
        JobKey jobKey = JobKey.jobKey(name, group);

        try {
            scheduler.pauseJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
            result.setErrorMessage(String.format("Pause Job with name = \"%s\" group = \"%s\" failed, system error!", name, group));
        }

        return result;
    }

2. 中斷

針對須要中斷的Job,quartz專門爲其定義了一個接口org.quartz.InterruptableJob:

public interface InterruptableJob extends Job {

    /*
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     * 
     * Interface.
     * 
     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     */

    /**
     * <p>
     * Called by the <code>{@link Scheduler}</code> when a user
     * interrupts the <code>Job</code>.
     * </p>
     * 
     * @throws UnableToInterruptJobException
     *           if there is an exception while interrupting the job.
     */
    void interrupt()
        throws UnableToInterruptJobException;
}

全部須要支持中斷的Job都須要實現這個接口:

package devutility.test.app.quartz.jobs;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.UnableToInterruptJobException;

import devutility.test.app.quartz.common.Executor;

@DisallowConcurrentExecution
public class ScheduleJob2 implements InterruptableJob {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        Executor.execute(context, 5000);
    }

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        System.out.println("ScheduleJob2 is interrupting now。。。");
    }
}

而後經過調用Scheduler實例的boolean interrupt(JobKey jobKey)方法,查看StdScheduler的源碼,咱們發現Scheduler的interrupt方法僅僅是調用了InterruptableJob中的interrupt()方法實現,而後設置了一下本身的interrupted屬性就完了,並未作任何其餘操做。

因此,若是要實現能夠中斷的Job,咱們須要在InterruptableJob實現類中增長中斷的邏輯:

package devutility.test.app.quartz.jobs;

import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.UnableToInterruptJobException;

import devutility.test.app.quartz.common.Executor;

public class ScheduleJob3 implements InterruptableJob {
    private boolean interrupted = false;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        Executor.execute("ScheduleJob3 sub-task 1", 1000);

        if (interrupted) {
            return;
        }

        Executor.execute("ScheduleJob3 sub-task 2", 1000);

        if (interrupted) {
            return;
        }

        Executor.execute("ScheduleJob3 sub-task 3", 1000);
    }

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        interrupted = true;
    }
}

3. 刪除

    public OperationResult delete(String jobName, String jobGroup) {
        OperationResult result = new OperationResult();
        result.append(String.format("Removing Quartz job: %s", jobName));

        try {
            if (!schedulerFactory1.deleteJob(JobKey.jobKey(jobName, jobGroup))) {
                result.setErrorMessage(String.format("Removing job %s failed!", jobName));
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            result.setErrorMessage(String.format("Removing job %s failed with error!", jobName));
        }

        return result;
    }

4. 更新

    public OperationResult update(CronTrigger cronTrigger, String cronExpression) {
        OperationResult result = new OperationResult();
        TriggerKey triggerKey = cronTrigger.getKey();
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
        CronTrigger newTrigger = cronTrigger.getTriggerBuilder().withSchedule(scheduleBuilder).build();

        try {
            schedulerFactory1.rescheduleJob(triggerKey, newTrigger);
            result.append(String.format("Update cron trigger %s succeeded!", triggerKey.getName()));
        } catch (SchedulerException e) {
            e.printStackTrace();
            result.setErrorMessage(String.format("Update cron trigger %s failed!", triggerKey.getName()));
        }

        return result;
    }

Demo代碼

相關文章
相關標籤/搜索