在工做中常常會須要使用到定時任務處理各類週期性的任務,org.quartz是處理此類定時任務的一個優秀框架。隨着項目一點點推動,此時咱們並不知足於任務僅僅是定時執行,咱們還想要對任務進行更多的控制,隨時能對任務進行人爲干預,就須要對quartz有更深刻的瞭解。而隨着微服務的流行,項目中多數據源的狀況也愈來愈常見,在定時任務中集成多數據源切換的功能也須要集成進來。java
集成quartz實現定時任務mysql
經過實現Job
類,在實現方法中寫咱們具體想要定時任務完成的工做,而後交給quartz
管理。git
Job
只負責實現具體任務,因此還須要藉助JobDetail
來存儲一些描述Job
的基本信息。github
爲構造JobDetail
實體提供的builder-style API
。你能夠這樣使用它來構建一個JobDetail
:redis
@Bean public JobDetail jobDetail() { return JobBuilder.newJob().ofType(SampleJob.class) .storeDurably() .withIdentity("Qrtz_Job_Detail") .withDescription("Invoke Sample Job service...") .build(); }
在Spring
中配置JobDetail
的方式:spring
@Bean public JobDetailFactoryBean jobDetail() { JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean(); jobDetailFactory.setJobClass(SampleJob.class); jobDetailFactory.setDescription("Invoke Sample Job service..."); jobDetailFactory.setDurability(true); return jobDetailFactory; }
觸發器,表明一個調度參數的配置,何時去調度:sql
@Bean public Trigger trigger(JobDetail job) { return TriggerBuilder.newTrigger().forJob(job) .withIdentity("Qrtz_Trigger") .withDescription("Sample trigger") .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1)) .build(); }
調度器,經過Job
和Trigger
來註冊一個調度器:數據庫
@Bean public Scheduler scheduler(Trigger trigger, JobDetail job) { StdSchedulerFactory factory = new StdSchedulerFactory(); factory.initialize(new ClassPathResource("quartz.properties").getInputStream()); Scheduler scheduler = factory.getScheduler(); scheduler.setJobFactory(springBeanJobFactory()); scheduler.scheduleJob(job, trigger); scheduler.start(); return scheduler; }
在quartz
中Job
就是咱們須要去執行的任務,由Scheduler
調度器負責調度任務們依靠制定好的Trigger
來定時執行任務。app
所以首先咱們須要結合以上基礎給系統添加一個Job。框架
public void addJob(BaseJob job) throws SchedulerException { /** 建立JobDetail實例,綁定Job實現類 * JobDetail 表示一個具體的可執行的調度程序,job是這個可執行調度程序所要執行的內容 * 另外JobDetail還包含了這個任務調度的方案和策略**/ // 指明job的名稱,所在組的名稱,以及綁定job類 JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass()) .withIdentity(job.getJobKey()) .withDescription(job.getDescription()) .usingJobData(job.getDataMap()) .build(); /** * Trigger表明一個調度參數的配置,何時去調度 */ //定義調度觸發規則, 使用cronTrigger規則 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(job.getJobName(),job.getJobGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression())) .startNow() .build(); //將任務和觸發器註冊到任務調度中去 scheduler.scheduleJob(jobDetail,trigger); //判斷調度器是否啓動 if(!scheduler.isStarted()){ scheduler.start(); } log.info(String.format("定時任務:%s.%s-已添加到調度器!", job.getJobGroup(),job.getJobName())); }
首先須要定義好咱們的Job,以後經過Job初始化JobDetail
和Trigger
,最後將JobDetail
和Trigger
註冊到調度器中。
Job
的結構以下:
public abstract class BaseJob implements Job,Serializable { private static final long serialVersionUID = 1L; private static final String JOB_MAP_KEY = "self"; /** * 任務名稱 */ private String jobName; /** * 任務分組 */ private String jobGroup; /** * 任務狀態 是否啓動任務 */ private String jobStatus; /** * cron表達式 */ private String cronExpression; /** * 描述 */ private String description; /** * 任務執行時調用哪一個類的方法 包名+類名 */ private Class beanClass = this.getClass(); /** * 任務是否有狀態 */ private String isConcurrent; /** * Spring bean */ private String springBean; /** * 任務調用的方法名 */ private String methodName; /** * 該任務所使用的數據源 */ private String dataSource = DataSourceEnum.DB1.getName(); /** * 爲了將執行後的任務持久化到數據庫中 */ @JsonIgnore private JobDataMap dataMap = new JobDataMap(); public JobKey getJobKey(){ return JobKey.jobKey(jobName, jobGroup);// 任務名稱和組構成任務key } ... }
能夠看到Job
中定義了任務的一些基本信息,重點關注其中的dataSource
和dataMap
屬性。其中dataSource
是任務所使用的數據源,並給了一個默認值;因爲任務在添加後會持久化到數據庫中,以後解析任務就會用到dataMap
。
在添加Job
的時候,JobDetail
和Trigger
都是經過關鍵字new
生成的,而調度器Scheduler
則須要放在容器中維護。
@Configuration @Order public class SchedulerConfig { @Autowired private MyJobFactory myJobFactory; @Value("${spring.profiles.active}") private String profile; /* * 經過SchedulerFactoryBean獲取Scheduler的實例 */ @Bean(name = "scheduler") public Scheduler scheduler() throws Exception { return schedulerFactoryBean().getScheduler(); } @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setOverwriteExistingJobs(true); // 延時啓動 factory.setStartupDelay(20); // 加載quartz數據源配置 factory.setQuartzProperties(quartzProperties()); // 自定義Job Factory,用於Spring注入 factory.setJobFactory(myJobFactory); /*********全局監聽器配置************/ JobListener myJobListener = new SchedulerListener(); factory.setGlobalJobListeners(myJobListener);//直接添加爲全局監聽器 return factory; } @Bean public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); if (Util.PRODUCT.equals(profile)) {//正式環境 System.out.println("正式環境quartz配置"); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties")); } else { System.out.println("測試環境quartz配置"); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); } //在quartz.properties中的屬性被讀取並注入後再初始化對象 propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } /* * quartz初始化監聽器 */ @Bean public QuartzInitializerListener executorListener() { return new QuartzInitializerListener(); } }
上述代碼中,將scheduler
加入到Spring
容器中。scheduler
是由SchedulerFactoryBean
進行維護的,在SchedulerFactoryBean
中對調度器工廠作了一些基本設置並從配置文件中加載了quartz數據源配置(配置文件的讀取會根據運行環境profile
來進行自動切換),配置了一個全局監聽器用以監放任務的執行過程。
使用Spring提供的JobFactory
。
@Component public class MyJobFactory extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { // 調用父類的方法 Object jobInstance = super.createJobInstance(bundle); // 進行注入 capableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
quartz.properties
中是quartz鏈接數據庫的一些配置信息。
# \u56FA\u5B9A\u524D\u7F00org.quartz # \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206 # # org.quartz.scheduler.instanceName = DefaultQuartzScheduler org.quartz.scheduler.rmi.export = false org.quartz.scheduler.rmi.proxy = false org.quartz.scheduler.wrapJobExecutionInUserTransaction = false # \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool # threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B # \u5E76\u53D1\u4E2A\u6570 org.quartz.threadPool.threadCount = 5 # \u4F18\u5148\u7EA7 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true org.quartz.jobStore.misfireThreshold = 5000 # \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D #org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore #\u6301\u4E45\u5316 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #org.quartz.jobStore.useProperties=false org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.dataSource = qzDS org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true org.quartz.dataSource.qzDS.user=quartz org.quartz.dataSource.qzDS.password=123456 org.quartz.dataSource.qzDS.maxConnections = 30 org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL org.quartz.dataSource.qzDS.validateOnCheckout = true org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40 #org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60
quartz
會根據這個配置文件將Job
持久化到數據庫中,也所以quartz
會須要初始化一些數據庫表,表結構文件在文末。
調度器監聽器用以監放任務的執行狀態。
public class SchedulerListener implements JobListener { private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class); public static final String LISTENER_NAME = "QuartSchedulerListener"; @Override public String getName() { return LISTENER_NAME; //must return a name } //任務被調度前 @Override public void jobToBeExecuted(JobExecutionContext context) { String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource"); // 切換任務的數據源 DataSourceContextHolder.setDB(dataSource); String jobName = context.getJobDetail().getKey().toString(); LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName()); } //任務調度被拒了 @Override public void jobExecutionVetoed(JobExecutionContext context) { String jobName = context.getJobDetail().getKey().toString(); LOG.error("job {} is jobExecutionVetoed", jobName); //能夠作一些日誌記錄緣由 } //任務被調度後 @Override public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { // 清空存儲的數據源 String jobName = context.getJobDetail().getKey().toString(); DataSourceContextHolder.clearDB(); LOG.info("Job : {} is finished", jobName); if (jobException != null && !jobException.getMessage().equals("")) { LOG.error("Exception thrown by: " + jobName + " Exception: " + jobException.getMessage()); } } }
SchedulerListener
監放任務被調度前、調度後和調度被拒絕時的狀態,在任務被調度以前和以後對任務所使用的數據源進行了處理。若是項目中不須要數據源切換的話,這個監聽器是不須要的,到此已經完成了quartz
的集成。
經過自定義DynamicDataSource
來覆蓋Spring Boot中原有的數據源。
經過讀取配置文件中不一樣的數據源,初始化項目中可能用到的數據源用以切換。
/** * 多數據源配置類 */ @Configuration public class DataSourceConfig { //數據源1 @Bean(name = "datasource1") @ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中對應屬性的前綴 public DataSource dataSource1() { return DataSourceBuilder.create().build(); } //數據源2 @Bean(name = "datasource2") @ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中對應屬性的前綴 public DataSource dataSource2() { return DataSourceBuilder.create().build(); } /** * 動態數據源: 經過AOP在不一樣數據源之間動態切換 * * @return */ @Primary @Bean(name = "dynamicDataSource") public DataSource dynamicDataSource() { DynamicDataSource dynamicDataSource = new DynamicDataSource(); // 默認數據源 dynamicDataSource.setDefaultTargetDataSource(dataSource1()); // 配置多數據源 Map<Object, Object> dsMap = new HashMap(); dsMap.put(DataSourceEnum.DB1.getName(), dataSource1()); dsMap.put(DataSourceEnum.DB2.getName(), dataSource2()); dynamicDataSource.setTargetDataSources(dsMap); return dynamicDataSource; } @Bean public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); //設置數據源 sqlSessionFactoryBean.setDataSource(dataSource); return sqlSessionFactoryBean.getObject(); } /** * 配置@Transactional註解事物 * * @return */ @Bean public PlatformTransactionManager transactionManager() { return new DataSourceTransactionManager(dynamicDataSource()); } }
spring: datasource: db1: driver-class-name: com.mysql.cj.jdbc.Driver username: doctor password: 123456 type: com.zaxxer.hikari.HikariDataSource jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true db2: driver-class-name: com.mysql.cj.jdbc.Driver username: quartz password: 123456 type: com.zaxxer.hikari.HikariDataSource jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true
因爲quartz
在執行過程當中是經過不一樣的線程來執行Job
的,所以此處經過ThreadLocal
來保存線程所使用的數據源狀況。
/** * 保存本地數據源 */ public class DataSourceContextHolder { private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class); /** * 默認數據源 */ public static final String DEFAULT_DS = DataSourceEnum.DB1.getName(); /** * ThreadLocal以後會進行講解 */ private static final ThreadLocal<String> contextHolder = new ThreadLocal<>(); // 設置數據源名 public static void setDB(String dbType) { LOG.info("切換到{}數據源", dbType); contextHolder.set(dbType); } // 獲取數據源名 public static String getDB() { return (contextHolder.get()); } // 清除數據源名 public static void clearDB() { contextHolder.remove(); } }
獲取執行中所使用的數據源。因爲數據源被保存在了DataSourceContextHolder
中的ThreadLocal
中,因此直接獲取就好了。
/** * 獲取本地數據源 */ public class DynamicDataSource extends AbstractRoutingDataSource { private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class); @Override protected Object determineCurrentLookupKey() { LOG.info("數據源爲{}", DataSourceContextHolder.getDB()); return DataSourceContextHolder.getDB(); } }
至此就完成了集成quartz
及數據源切換的功能。而後就是具體的任務了。
具體的任務須要繼承BaseJob
並在execute
方法中重寫具體須要執行的任務。
@Slf4j @Service public class ReadNumJob extends BaseJob { @Autowired private RedisService redisService; @Autowired private JdbcTemplate jdbcTemplate; private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class); @Override public void execute(JobExecutionContext context) { doSomething(); } }
而後在添加任務時指定任務所使用的數據源
ReadNumJob job = new ReadNumJob(); job.setJobName("test"); job.setJobGroup("hys"); job.setDescription("test"); // 指定數據源 job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName()); job.setCronExpression( "0 */1 * * * ?" ); try { jobAndTriggerService.addJob(job); } catch (SchedulerException e) { e.printStackTrace(); }
轉評贊就是最大的鼓勵