實際項目中,複雜的定時任務都會結合持久化,動態改變定時任務狀態。本文將介紹基於 Quartz 的定時任務單點持久化方式,通過 RESTful 風格演示定時任務的 CRUD,最後使用 Swagger 測試。
//Quartz表
qrtz_calendars:以 Blob 類型存儲 Quartz 的 Calendar 信息
qrtz_cron_triggers:存儲 Cron Trigger,包括 Cron 表達式和時區信息
qrtz_fired_triggers:存儲與已觸發的 Trigger 相關的狀態信息,以及相聯 Job 的執行信息
qrtz_paused_trigger_grps:存儲已暫停的 Trigger 組的信息
qrtz_scheduler_state:存儲少許的有關調度器 (Scheduler) 的狀態,和別的調度器 (Scheduler) 實例(假如是用於一個集羣中)
qrtz_locks:存儲程序的悲觀鎖的信息(假如使用了悲觀鎖)
qrtz_job_details:存儲每個已配置的 Job 的詳細信息(jobDetail)
qrtz_job_listeners:存儲有關已配置的 Job 監聽器的信息
qrtz_simple_triggers:存儲簡單的 Trigger,包括重複次數、間隔,以及已觸發的次數
qrtz_blob_triggers:以 Blob 類型存儲的 Trigger(用於 Quartz 用戶用 JDBC 建立他們本身定製的 Trigger 類型,JobStore 並不知道如何存儲實例的時候)
qrtz_trigger_listeners:存儲已配置的觸發器監聽器 (Trigger Listener) 的信息
qrtz_triggers:存儲已配置的觸發器 (Trigger) 的信息
//新建表
ScheduleJob:自定義定時任務詳細狀態表,方便管理定時任務
/resources/quartz.sql
<!--quartz相關依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency> <!--數據庫相關依賴--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.9</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.2</version> </dependency>
#數據庫鏈接池配置 spring: datasource: name: mysql_test type: com.alibaba.druid.pool.DruidDataSource #druid相關配置 druid: #監控統計攔截的filters filters: stat driver-class-name: com.mysql.jdbc.Driver #基本屬性 url: jdbc:mysql://127.0.0.1:3306/springboot?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true username: root password: 123456 #配置初始化大小/最小/最大 initial-size: 1 min-idle: 1 max-active: 20 #獲取鏈接等待超時時間 max-wait: 60000 #間隔多久進行一次檢測,檢測須要關閉的空閒鏈接 time-between-eviction-runs-millis: 60000 #一個鏈接在池中最小生存的時間 min-evictable-idle-time-millis: 300000 validation-query: SELECT 'x' test-while-idle: true test-on-borrow: false test-on-return: false #打開PSCache,並指定每一個鏈接上PSCache的大小。oracle設爲true,mysql設爲false。分庫分表較多推薦設置爲false pool-prepared-statements: false max-pool-prepared-statement-per-connection-size: 20 #Quartz配置 quartz: jdbc: initialize-schema: always job-store-type: jdbc ##Mybatis配置 mybatis: #Mapper.xml所在的位置 mapper-locations: classpath:mapping/*.xml #entity掃描的包名 type-aliases-package: com.mkeeper.entity
Spring
接管Quartz
@Component public class ScheduleJobFactory extends AdaptableJobFactory { // 讓不受spring管理的類具備spring自動注入的特性 @Autowired private AutowireCapableBeanFactory autowireCapableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); autowireCapableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
Quartz
配置SchedulerFactoryBean
初始化@Configuration public class ScheduleConfig { @Autowired private ScheduleJobFactory scheduleJobFactory; @Bean @Qualifier("scheduleBean") public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("dataSource") DataSource dataSource) { SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); // 名稱 schedulerFactoryBean.setSchedulerName("TASK_EXECUTOR"); // 延遲10秒啓動Scheduler schedulerFactoryBean.setStartupDelay(10); // 經過applicationContextSchedulerContextKey屬性配置spring上下文 schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey"); // 設置是否任意一個已定義的Job會覆蓋現有的Job。默認爲false,即已定義的Job不會覆蓋現有的Job。 schedulerFactoryBean.setOverwriteExistingJobs(true); // 自動開始 schedulerFactoryBean.setAutoStartup(true); // 數據源 schedulerFactoryBean.setDataSource(dataSource); // 將JobFactory改成自定義的,不然在 Job 中注入 Bean 會失敗 schedulerFactoryBean.setJobFactory(scheduleJobFactory); return schedulerFactoryBean; } }
實體
/**
 * Row of the custom {@code ScheduleJob} table describing one scheduled job.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class ScheduleJob implements Serializable {

    /**
     * Serialization version. Must be a primitive {@code long}: the serialization
     * runtime ignores a {@code serialVersionUID} declared as boxed {@code Long}
     * and falls back to a computed default.
     */
    private static final long serialVersionUID = 1435515995276255188L;

    /** Primary key. */
    private Long id;

    /** Name of the job class; ScheduleUtil loads it with {@code Class.forName}. */
    private String className;

    /** Cron expression used to build the job's cron trigger. */
    private String cronExpression;

    /** Job name; together with {@link #jobGroup} it forms the Quartz job key. */
    private String jobName;

    /** Job group; together with {@link #jobName} it forms the Quartz job key. */
    private String jobGroup;

    /** Trigger name; together with {@link #triggerGroup} it forms the trigger key. */
    private String triggerName;

    /** Trigger group; together with {@link #triggerName} it forms the trigger key. */
    private String triggerGroup;

    /** Whether the job should be paused after it is created or updated. */
    private Boolean pause;

    /** Enabled flag (presumably what the "all enabled jobs" query filters on; verify in the mapper). */
    private Boolean enable;

    /** Free-text description copied onto the Quartz job and trigger. */
    private String description;

    /** Creation timestamp. */
    private Date createTime;

    /** Last-update timestamp. */
    private Date lastUpdateTime;
}
爲了節約篇幅,mapping、dao 省略,請參考源碼。
Quartz
任務調度工具類(重點)@Slf4j public class ScheduleUtil { /** * 獲取 Trigger Key * * @param scheduleJob * @return */ public static TriggerKey getTriggerKey(ScheduleJob scheduleJob) { return TriggerKey.triggerKey(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup()); } /** * 獲取 Job Key * * @param scheduleJob * @return */ public static JobKey getJobKey(ScheduleJob scheduleJob) { return JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); } /** * 獲取 Cron Trigger * * @param scheduler * @param scheduleJob * @return * @throws ServiceException */ public static CronTrigger getCronTrigger(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException { try { return (CronTrigger) scheduler.getTrigger(getTriggerKey(scheduleJob)); } catch (SchedulerException e) { throw new ServiceException("Get Cron trigger failed", e); } } /** * 建立任務 * * @param scheduler * @param scheduleJob * @throws ServiceException */ public static void createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException { validateCronExpression(scheduleJob); try { // 要執行的 Job 的類 Class<? extends Job> jobClass = (Class<? 
extends Job>) Class.forName(scheduleJob.getClassName()).newInstance().getClass(); JobDetail jobDetail = JobBuilder.newJob(jobClass) .withIdentity(scheduleJob.getJobName(), scheduleJob.getJobGroup()) .withDescription(scheduleJob.getDescription()) .build(); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()) .withMisfireHandlingInstructionDoNothing(); CronTrigger cronTrigger = TriggerBuilder.newTrigger() .withIdentity(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup()) .withDescription(scheduleJob.getDescription()) .withSchedule(scheduleBuilder) .startNow() .build(); scheduler.scheduleJob(jobDetail, cronTrigger); log.info("Create schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName()); if (scheduleJob.getPause()) { pauseJob(scheduler, scheduleJob); } } catch (Exception e) { e.printStackTrace(); log.error("Execute schedule job failed"); throw new ServiceException("Execute schedule job failed", e); } } /** * 更新任務 * * @param scheduler * @param scheduleJob * @throws ServiceException */ public static void updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException { validateCronExpression(scheduleJob); try { TriggerKey triggerKey = getTriggerKey(scheduleJob); CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()) .withMisfireHandlingInstructionDoNothing(); CronTrigger cronTrigger = getCronTrigger(scheduler, scheduleJob); cronTrigger = cronTrigger.getTriggerBuilder() .withIdentity(triggerKey) .withDescription(scheduleJob.getDescription()) .withSchedule(cronScheduleBuilder).build(); scheduler.rescheduleJob(triggerKey, cronTrigger); log.info("Update schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName()); if (scheduleJob.getPause()) { pauseJob(scheduler, scheduleJob); } } catch (SchedulerException e) { e.printStackTrace(); log.error("Update schedule job failed"); throw new 
ServiceException("Update schedule job failed", e); } } /** * 執行任務 * * @param scheduler * @param scheduleJob * @throws ServiceException */ public static void run(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException { try { scheduler.triggerJob(getJobKey(scheduleJob)); log.info("Run schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName()); } catch (SchedulerException e) { e.printStackTrace(); log.error("Run schedule job failed"); throw new ServiceException("Run schedule job failed", e); } } /** * 暫停任務 * * @param scheduler * @param scheduleJob */ public static void pauseJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException { try { scheduler.pauseJob(getJobKey(scheduleJob)); log.info("Pause schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName()); } catch (SchedulerException e) { e.printStackTrace(); log.error("Pause schedule job failed"); throw new ServiceException("Pause job failed", e); } } /** * 繼續執行任務 * * @param scheduler * @param scheduleJob * @throws ServiceException */ public static void resumeJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException { try { scheduler.resumeJob(getJobKey(scheduleJob)); log.info("Resume schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName()); } catch (SchedulerException e) { e.printStackTrace(); log.error("Resume schedule job failed"); throw new ServiceException("Resume job failed", e); } } /** * 刪除任務 * * @param scheduler * @param scheduleJob * @throws ServiceException */ public static void deleteJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException { try { scheduler.deleteJob(getJobKey(scheduleJob)); log.info("Delete schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName()); } catch (SchedulerException e) { e.printStackTrace(); log.error("Delete schedule job failed"); throw new ServiceException("Delete job failed", e); } } /** * 校驗Cron表達式 */ public static 
void validateCronExpression(ScheduleJob scheduleJob) throws ServiceException { if (!CronExpression.isValidExpression(scheduleJob.getCronExpression())) { throw new ServiceException(String.format("Job %s expression %s is not correct!", scheduleJob.getClassName(), scheduleJob.getCronExpression())); } } }
/**
 * Service that keeps the {@code ScheduleJob} table and the Quartz scheduler in step.
 *
 * <p>Each mutating operation first changes the database row through
 * {@link JobMapper} and then applies the matching change to the scheduler through
 * {@link ScheduleUtil}. The operations roll back on any exception, so a scheduler
 * failure also undoes the database change made in the same call.
 */
@Service
public class JobService {

    @Resource
    private JobMapper jobMapper;

    @Resource
    private Scheduler scheduler;

    /**
     * @return the jobs returned by the mapper's "all enabled jobs" query
     */
    public List<ScheduleJob> getAllEnableJob() {
        return jobMapper.getAllEnableJob();
    }

    /**
     * Loads one job by id.
     *
     * @param jobId the job id
     * @return the job, never {@code null}
     * @throws ServiceException if no job with that id exists
     */
    public ScheduleJob select(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = jobMapper.select(jobId);
        if (scheduleJob == null) {
            throw new ServiceException("ScheduleJob:" + jobId + " not found");
        }
        return scheduleJob;
    }

    /**
     * Updates the stored job and reschedules it.
     *
     * @param jobId       id of the job to update; when non-null it is applied to
     *                    {@code scheduleJob} so the row that is updated is the one the caller named
     * @param scheduleJob the new job definition
     * @return the updated job definition
     * @throws ServiceException if no row was updated or rescheduling fails
     */
    // rollbackFor = Exception.class: ServiceException is declared on every method and
    // so appears to be a checked exception, which Spring does not roll back on by default.
    @Transactional(rollbackFor = Exception.class)
    public ScheduleJob update(Long jobId, ScheduleJob scheduleJob) throws ServiceException {
        if (jobId != null) {
            // Previously jobId was used only in the error message.
            scheduleJob.setId(jobId);
        }
        if (jobMapper.update(scheduleJob) <= 0) {
            throw new ServiceException("Update job:" + jobId + " failed");
        }
        ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);
        return scheduleJob;
    }

    /**
     * Stores a new job and schedules it.
     *
     * @param scheduleJob the job definition
     * @return {@code true} on success
     * @throws ServiceException if the insert affected no row or scheduling fails
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean add(ScheduleJob scheduleJob) throws ServiceException {
        Integer num = jobMapper.insert(scheduleJob);
        if (num <= 0) {
            throw new ServiceException("Add job failed");
        }
        ScheduleUtil.createScheduleJob(scheduler, scheduleJob);
        return true;
    }

    /**
     * Deletes the stored job and removes it from the scheduler.
     *
     * @param jobId the job id
     * @return {@code true} on success
     * @throws ServiceException if the job does not exist, no row was deleted, or the
     *                          scheduler fails to delete the job
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean delete(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);
        Integer num = jobMapper.delete(jobId);
        if (num <= 0) {
            throw new ServiceException("Delete job:" + jobId + " failed");
        }
        ScheduleUtil.deleteJob(scheduler, scheduleJob);
        return true;
    }

    /**
     * @return the jobs returned by the mapper's "all jobs" query
     */
    public List<ScheduleJob> getAllJob() {
        return jobMapper.getAllJob();
    }

    /**
     * Clears the job's pause flag and resumes it in the scheduler.
     *
     * @param jobId the job id
     * @return {@code true} on success
     * @throws ServiceException if the job does not exist or any step fails
     */
    // Declared transactional here because the nested update(...) call is a
    // self-invocation, which bypasses Spring's transactional proxy.
    @Transactional(rollbackFor = Exception.class)
    public boolean resume(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.resumeJob(scheduler, scheduleJob);
        return true;
    }

    /**
     * Sets the job's pause flag and pauses it in the scheduler.
     *
     * @param jobId the job id
     * @return {@code true} on success
     * @throws ServiceException if the job does not exist or any step fails
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean pause(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, true);
        ScheduleUtil.pauseJob(scheduler, scheduleJob);
        return true;
    }

    /**
     * Clears the job's pause flag and triggers the job.
     *
     * @param jobId the job id
     * @return {@code true} on success
     * @throws ServiceException if the job does not exist or any step fails
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean run(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.run(scheduler, scheduleJob);
        return true;
    }

    /**
     * Loads the job, sets its pause flag and persists the change through {@link #update}.
     */
    private ScheduleJob updateScheduleJobStatus(Long jobId, Boolean isPause) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);
        scheduleJob.setPause(isPause);
        update(scheduleJob.getId(), scheduleJob);
        return scheduleJob;
    }
}
/** * 啓動應用時運行定時任務 * * @author mkeeper * @create 2018/10/19 10:05 */ @Slf4j @Component public class ApplicationListener implements CommandLineRunner { @Resource private JobService jobService; @Resource private Scheduler scheduler; @Override public void run(String... args) { List<ScheduleJob> scheduleJobList = jobService.getAllEnableJob(); for (ScheduleJob scheduleJob : scheduleJobList) { try { CronTrigger cronTrigger = ScheduleUtil.getCronTrigger(scheduler, scheduleJob); if (cronTrigger == null) { ScheduleUtil.createScheduleJob(scheduler, scheduleJob); } else { ScheduleUtil.updateScheduleJob(scheduler, scheduleJob); } log.info("Startup {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName()); } catch (ServiceException e) { log.error("Job ERROR", e); } } } }
@Slf4j @Component public class TestJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext){ // Do what you want here log.info("Test job is executing at: " + System.currentTimeMillis()/1000); } }
Controller
/**
 * REST endpoints under {@code /job} for managing scheduled jobs.
 *
 * <p>Every handler delegates to {@link JobService} and wraps the result in an
 * {@code R} response.
 */
@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    private JobService jobService;

    /** Lists all jobs. */
    @GetMapping
    public R getAllJob() {
        return R.isOk().data(jobService.getAllJob());
    }

    /** Returns the job with the given id. */
    @GetMapping("/{id}")
    public R getJob(@PathVariable("id") Long jobId) throws ServiceException {
        ScheduleJob found = jobService.select(jobId);
        return R.isOk().data(found);
    }

    /** Updates the job with the given id from the request body. */
    @PutMapping("/update/{id}")
    public R updateJob(@PathVariable("id") Long jobId, @RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        ScheduleJob updated = jobService.update(jobId, newScheduleJob);
        return R.isOk().data(updated);
    }

    /** Deletes the job with the given id. */
    @DeleteMapping("/delete/{id}")
    public R deleteJob(@PathVariable("id") Long jobId) throws ServiceException {
        boolean deleted = jobService.delete(jobId);
        return R.isOk().data(deleted);
    }

    /** Creates a job from the request body. */
    @PostMapping("/add")
    public R saveJob(@RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        boolean added = jobService.add(newScheduleJob);
        return R.isOk().data(added);
    }

    /** Runs the job with the given id. */
    @GetMapping("/run/{id}")
    public R runJob(@PathVariable("id") Long jobId) throws ServiceException {
        boolean started = jobService.run(jobId);
        return R.isOk().data(started);
    }

    /** Pauses the job with the given id. */
    @GetMapping("/pause/{id}")
    public R pauseJob(@PathVariable("id") Long jobId) throws ServiceException {
        boolean paused = jobService.pause(jobId);
        return R.isOk().data(paused);
    }

    /** Resumes the job with the given id. */
    @GetMapping("/resume/{id}")
    public R resumeJob(@PathVariable("id") Long jobId) throws ServiceException {
        boolean resumed = jobService.resume(jobId);
        return R.isOk().data(resumed);
    }
}
考慮到要測試的接口不少,這裏推薦 Swagger。
Swagger 是一個規範和完整的框架,用於生成、描述、調用和可視化 RESTful 風格的 Web 服務。
添加依賴
<!--swagger2--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.6.1</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.6.1</version> </dependency>
添加配置文件
/**
 * Swagger 2 configuration for the controllers in {@code com.mkeeper.controller}.
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    /** Value of the {@code swagger.enable} property; {@code false} when the property is absent. */
    @Value("${swagger.enable:false}")
    private boolean swaggerEnabled;

    /**
     * Builds the Swagger docket covering every path of the controller package.
     *
     * @return the docket, enabled or disabled according to {@code swagger.enable}
     */
    @Bean
    public Docket createRestApi() {
        final ApiInfo info = buildApiInfo();
        return new Docket(DocumentationType.SWAGGER_2)
                .enable(swaggerEnabled)
                .apiInfo(info)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.mkeeper.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    /** Assembles the title, description and version shown in the generated documentation. */
    private ApiInfo buildApiInfo() {
        ApiInfoBuilder builder = new ApiInfoBuilder();
        builder.title("Quartz定時任務單點持久化接口文檔");
        builder.description("Quartz定時任務單點持久化");
        builder.version("1.0");
        return builder.build();
    }
}
application.yml 中開啓 Swagger
swagger: enable: true
啓動服務,瀏覽器中訪問測試接口,地址:http://localhost:8080/swagger-ui.html
說點什麼呢,有任何建議,歡迎留言探討,本文源碼。
歡迎關注博主公衆號,第一時間推送最新文章