Spring Boot集成quartz實現定時任務並支持切換任務數據源

org.quartz實現定時任務並自定義切換任務數據源

在工做中常常會須要使用到定時任務處理各類週期性的任務,org.quartz是處理此類定時任務的一個優秀框架。隨着項目一點點推動,此時咱們並不知足於任務僅僅是定時執行,咱們還想要對任務進行更多的控制,隨時能對任務進行人爲干預,就須要對quartz有更深刻的瞭解。而隨着微服務的流行,項目中多數據源的狀況也愈來愈常見,在定時任務中集成多數據源切換的功能也須要集成進來。java

集成quartz實現定時任務

集成quartz實現定時任務mysql

quartz中實現定時任務須要了解的基本概念

Job

經過實現Job類,在實現方法中寫咱們具體想要定時任務完成的工做,而後交給quartz管理。git

JobDetail

Job只負責實現具體任務,因此還須要藉助JobDetail來存儲一些描述Job的基本信息。github

Quartz JobBuilder

爲構造JobDetail實體提供的builder-style API。你能夠這樣使用它來構建一個JobDetailredis

@Bean
public JobDetail jobDetail() {
 return JobBuilder.newJob().ofType(SampleJob.class)
 .storeDurably()
 .withIdentity("Qrtz_Job_Detail")
 .withDescription("Invoke Sample Job service...")
 .build();
}

Spring JobDetailFactoryBean

Spring中配置JobDetail的方式:spring

@Bean
public JobDetailFactoryBean jobDetail() {
 JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
 jobDetailFactory.setJobClass(SampleJob.class);
 jobDetailFactory.setDescription("Invoke Sample Job service...");
 jobDetailFactory.setDurability(true);
 return jobDetailFactory;
}

Trigger

觸發器,表明一個調度參數的配置,何時去調度:sql

@Bean
public Trigger trigger(JobDetail job) {
 return TriggerBuilder.newTrigger().forJob(job)
 .withIdentity("Qrtz_Trigger")
 .withDescription("Sample trigger")
 .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
 .build();
}

Scheduler

調度器,經過JobTrigger來註冊一個調度器:數據庫

@Bean
public Scheduler scheduler(Trigger trigger, JobDetail job) {
 StdSchedulerFactory factory = new StdSchedulerFactory();
 factory.initialize(new ClassPathResource("quartz.properties").getInputStream());

 Scheduler scheduler = factory.getScheduler();
 scheduler.setJobFactory(springBeanJobFactory());
 scheduler.scheduleJob(job, trigger);

 scheduler.start();
 return scheduler;
}

給系統添加一個Job

quartzJob就是咱們須要去執行的任務,由Scheduler調度器負責調度任務們依靠制定好的Trigger來定時執行任務。app

所以首先咱們須要結合以上基礎給系統添加一個Job。框架

addJob

public void addJob(BaseJob job) throws SchedulerException {
        /** 建立JobDetail實例,綁定Job實現類
        * JobDetail 表示一個具體的可執行的調度程序,job是這個可執行調度程序所要執行的內容
        * 另外JobDetail還包含了這個任務調度的方案和策略**/
        // 指明job的名稱,所在組的名稱,以及綁定job類
        JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass())
                .withIdentity(job.getJobKey())
                .withDescription(job.getDescription())
                .usingJobData(job.getDataMap())
                .build();

        /**
         * Trigger表明一個調度參數的配置,何時去調度
         */
        //定義調度觸發規則, 使用cronTrigger規則
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(job.getJobName(),job.getJobGroup())
                .withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
                .startNow()
                .build();
        //將任務和觸發器註冊到任務調度中去
        scheduler.scheduleJob(jobDetail,trigger);
        //判斷調度器是否啓動
        if(!scheduler.isStarted()){
            scheduler.start();
        }
        log.info(String.format("定時任務:%s.%s-已添加到調度器!", job.getJobGroup(),job.getJobName()));
    }

首先須要定義好咱們的Job,以後經過Job初始化JobDetailTrigger,最後將JobDetailTrigger註冊到調度器中。

BaseJob

Job的結構以下:

public abstract class BaseJob implements Job,Serializable {
    private static final long serialVersionUID = 1L;
    private static final String JOB_MAP_KEY = "self";
    /**
     * 任務名稱
     */
    private String jobName;
    /**
     * 任務分組
     */
    private String jobGroup;
    /**
     * 任務狀態 是否啓動任務
     */
    private String jobStatus;
    /**
     * cron表達式
     */
    private String cronExpression;
    /**
     * 描述
     */
    private String description;
    /**
     * 任務執行時調用哪一個類的方法 包名+類名
     */
    private Class beanClass = this.getClass();
    /**
     * 任務是否有狀態
     */
    private String isConcurrent;

    /**
     * Spring bean
     */
    private String springBean;

    /**
     * 任務調用的方法名
     */
    private String methodName;

     /**
     * 該任務所使用的數據源
     */
    private String dataSource = DataSourceEnum.DB1.getName();

    /**
     * 爲了將執行後的任務持久化到數據庫中
     */
    @JsonIgnore
    private JobDataMap dataMap = new JobDataMap();

    public JobKey getJobKey(){
        return JobKey.jobKey(jobName, jobGroup);// 任務名稱和組構成任務key
    }
    ...
}

能夠看到Job中定義了任務的一些基本信息,重點關注其中的dataSourcedataMap屬性。其中dataSource是任務所使用的數據源,並給了一個默認值;因爲任務在添加後會持久化到數據庫中,以後解析任務就會用到dataMap

SchedulerConfig

在添加Job的時候,JobDetailTrigger都是經過關鍵字new生成的,而調度器Scheduler則須要放在容器中維護。

@Configuration
@Order
public class SchedulerConfig {
    @Autowired
    private MyJobFactory myJobFactory;

    @Value("${spring.profiles.active}")
    private String profile;

    /*
     * 經過SchedulerFactoryBean獲取Scheduler的實例
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler() throws Exception {
        return schedulerFactoryBean().getScheduler();
    }
    
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();

        factory.setOverwriteExistingJobs(true);

        // 延時啓動
        factory.setStartupDelay(20);

        // 加載quartz數據源配置
        factory.setQuartzProperties(quartzProperties());

        // 自定義Job Factory,用於Spring注入
        factory.setJobFactory(myJobFactory);
        /*********全局監聽器配置************/
        JobListener myJobListener = new SchedulerListener();
        factory.setGlobalJobListeners(myJobListener);//直接添加爲全局監聽器
        return factory;
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        if (Util.PRODUCT.equals(profile)) {//正式環境
            System.out.println("正式環境quartz配置");
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties"));
        } else {
            System.out.println("測試環境quartz配置");
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        }
        //在quartz.properties中的屬性被讀取並注入後再初始化對象
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /*
     * quartz初始化監聽器
     */
    @Bean
    public QuartzInitializerListener executorListener() {
        return new QuartzInitializerListener();
    }
}

上述代碼中,將scheduler加入到Spring容器中。scheduler是由SchedulerFactoryBean進行維護的,在SchedulerFactoryBean中對調度器工廠作了一些基本設置並從配置文件中加載了quartz數據源配置(配置文件的讀取會根據運行環境profile來進行自動切換),配置了一個全局監聽器用以監放任務的執行過程。

MyJobFactory

使用Spring提供的JobFactory

@Component
public class MyJobFactory extends AdaptableJobFactory {

    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        // 調用父類的方法
        Object jobInstance = super.createJobInstance(bundle);
        // 進行注入
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

quartz.properties

quartz.properties中是quartz鏈接數據庫的一些配置信息。

# \u56FA\u5B9A\u524D\u7F00org.quartz
# \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206
#
#
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

# \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

# threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B
# \u5E76\u53D1\u4E2A\u6570
org.quartz.threadPool.threadCount = 5
# \u4F18\u5148\u7EA7
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

org.quartz.jobStore.misfireThreshold = 5000

# \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

#\u6301\u4E45\u5316
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

#org.quartz.jobStore.useProperties=false

org.quartz.jobStore.tablePrefix = QRTZ_

org.quartz.jobStore.dataSource = qzDS

org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true
org.quartz.dataSource.qzDS.user=quartz
org.quartz.dataSource.qzDS.password=123456

org.quartz.dataSource.qzDS.maxConnections = 30

org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL

org.quartz.dataSource.qzDS.validateOnCheckout = true
org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40


#org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60

quartz會根據這個配置文件將Job持久化到數據庫中,也所以quartz會須要初始化一些數據庫表,表結構文件在文末。

SchedulerListener

調度器監聽器用以監放任務的執行狀態。

public class SchedulerListener implements JobListener {

    private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class);

    public static final String LISTENER_NAME = "QuartSchedulerListener";

    @Override
    public String getName() {
        return LISTENER_NAME; //must return a name
    }

    //任務被調度前
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource");
        // 切換任務的數據源
        DataSourceContextHolder.setDB(dataSource);
        String jobName = context.getJobDetail().getKey().toString();
        LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName());
    }

    //任務調度被拒了
    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().toString();
        LOG.error("job {} is jobExecutionVetoed", jobName);
        //能夠作一些日誌記錄緣由

    }

    //任務被調度後
    @Override
    public void jobWasExecuted(JobExecutionContext context,
                               JobExecutionException jobException) {
        // 清空存儲的數據源
        String jobName = context.getJobDetail().getKey().toString();
        DataSourceContextHolder.clearDB();
        LOG.info("Job : {} is finished", jobName);
        if (jobException != null && !jobException.getMessage().equals("")) {
            LOG.error("Exception thrown by: " + jobName
                    + " Exception: " + jobException.getMessage());
        }

    }
}

SchedulerListener監放任務被調度前、調度後和調度被拒絕時的狀態,在任務被調度以前和以後對任務所使用的數據源進行了處理。若是項目中不須要數據源切換的話,這個監聽器是不須要的,到此已經完成了quartz的集成。

多數據源切換

多數據源切換

經過自定義DynamicDataSource來覆蓋Spring Boot中原有的數據源。

DataSourceConfig

經過讀取配置文件中不一樣的數據源,初始化項目中可能用到的數據源用以切換。

/**
 * 多數據源配置類
 */
@Configuration
public class DataSourceConfig {
    //數據源1
    @Bean(name = "datasource1")
    @ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中對應屬性的前綴
    public DataSource dataSource1() {
        return DataSourceBuilder.create().build();
    }

    //數據源2
    @Bean(name = "datasource2")
    @ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中對應屬性的前綴
    public DataSource dataSource2() {
        return DataSourceBuilder.create().build();
    }

    /**
     * 動態數據源: 經過AOP在不一樣數據源之間動態切換
     *
     * @return
     */
    @Primary
    @Bean(name = "dynamicDataSource")
    public DataSource dynamicDataSource() {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        // 默認數據源
        dynamicDataSource.setDefaultTargetDataSource(dataSource1());
        // 配置多數據源
        Map<Object, Object> dsMap = new HashMap();
        dsMap.put(DataSourceEnum.DB1.getName(), dataSource1());
        dsMap.put(DataSourceEnum.DB2.getName(), dataSource2());

        dynamicDataSource.setTargetDataSources(dsMap);
        return dynamicDataSource;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        //設置數據源
        sqlSessionFactoryBean.setDataSource(dataSource);
        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 配置@Transactional註解事物
     *
     * @return
     */
    @Bean
    public PlatformTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dynamicDataSource());
    }
}

數據源配置

spring:
  datasource:
    db1:
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: doctor
      password: 123456
      type: com.zaxxer.hikari.HikariDataSource
      jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true
    db2:
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: quartz
      password: 123456
      type: com.zaxxer.hikari.HikariDataSource
      jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true

DataSourceContextHolder

因爲quartz在執行過程當中是經過不一樣的線程來執行Job的,所以此處經過ThreadLocal來保存線程所使用的數據源狀況。

/**
 * 保存本地數據源
 */
public class DataSourceContextHolder {
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class);
    /**
     * 默認數據源
     */
    public static final String DEFAULT_DS = DataSourceEnum.DB1.getName();
    /**
     * ThreadLocal以後會進行講解
     */
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    // 設置數據源名
    public static void setDB(String dbType) {
        LOG.info("切換到{}數據源", dbType);
        contextHolder.set(dbType);
    }

    // 獲取數據源名
    public static String getDB() {
        return (contextHolder.get());
    }

    // 清除數據源名
    public static void clearDB() {
        contextHolder.remove();
    }
}

DynamicDataSource

獲取執行中所使用的數據源。因爲數據源被保存在了DataSourceContextHolder中的ThreadLocal中,因此直接獲取就好了。

/**
 * 獲取本地數據源
 */
public class DynamicDataSource extends AbstractRoutingDataSource {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class);

    @Override
    protected Object determineCurrentLookupKey() {
        LOG.info("數據源爲{}", DataSourceContextHolder.getDB());
        return DataSourceContextHolder.getDB();
    }
}

至此就完成了集成quartz及數據源切換的功能。而後就是具體的任務了。

執行任務

具體的任務須要繼承BaseJob並在execute方法中重寫具體須要執行的任務。

execute

@Slf4j
@Service
public class ReadNumJob extends BaseJob {

    @Autowired
    private RedisService redisService;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class);

    @Override
    public void execute(JobExecutionContext context) {
       doSomething();
    }
}

指定數據源

而後在添加任務時指定任務所使用的數據源

ReadNumJob job = new ReadNumJob();
job.setJobName("test");
job.setJobGroup("hys");
job.setDescription("test");
// 指定數據源
job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName());
job.setCronExpression(
"0 */1 * * * ?"
);
try {
jobAndTriggerService.addJob(job);
} catch (SchedulerException e) {
e.printStackTrace();
}

源碼

轉評贊就是最大的鼓勵

相關文章
相關標籤/搜索