Quartz詞義爲"石英"水晶,而後聰明的人類利用它發明了石英手錶,因石英晶體在受到電流影響時,它會產生規律的振動,因而,這種時間上的規律,也被應用到了軟件界,來命名了一款任務調度框架--Quartz。現實軟件邏輯中,週期任務有着普遍的存在,如定時刷新配置信息,按期盤點庫存,定時收發郵件等,至於定時任務處理,也有Spring的ScheduledThreadPool,還有基於註解@Scheduled的方式,ScheduledThreadPool主要是基於相對時間,不方便控制,而@Scheduled則會致使連鎖錯誤,因此咱們來用下Quartz,看看有啥優點。ios
工具:Idea201902/JDK11/ZK3.5.5/Gradle5.4.1/RabbitMQ3.7.13/Mysql8.0.11/Lombok0.26/Erlang21.2/postman7.5.0/Redis3.2/RocketMQ4.5.2/Sentinel1.6.3/SpringBoot2.1.6/Quartz2.3.1/Nacos1.1.3git
難度: 新手--戰士--老兵--大師github
目標:1.使用Quartz實現物流訂單按期延誤檢查;sql
步驟:數據庫
1.總體框架依舊,多模塊微服務框架商城系統,一個共享模塊,多個功能模塊,具體見項目代碼結構。併發
2.按照慣例,先上幾個Quartz的核心概念的菜:app
Job-任務:一個接口,只有一個execute方法,使用時該方法內容即爲須要執行的任務邏輯,還有個關聯的JobDetail接口,注意這二者並非繼承關係,Quartz在每次執行Job時,都從新建立一個Job實例,因此它不直接接受一個Job的實例,相反它接收一個Job實現類,以便運行時經過newInstance()的反射機制實例化Job。所以須要經過一個類來描述Job的實現類及其它相關的靜態信息,如Job名字、描述、關聯監聽器等信息,JobDetail承擔了這一角色,Quartz源碼中描述二者關係:"Quartz不會保存一個Job接口的實例,但能夠經過JobDetail來定義一個實例",JobDetail包含一個getJobClass()得到Job實例字節碼的方法;框架
Trigger-觸發器:手槍的扳機,何時發射,就看何時觸發了該類設定的條件,可自由定義觸發規則,多個觸發器可做用於一個Job,但一個觸發器只可做用於一個Job;異步
Scheduler-調度器:表明一個Quartz的獨立運行容器,Trigger和JobDetail能夠註冊到Scheduler中,二者在Scheduler中擁有各自的組及名稱,組及名稱是Scheduler查找定位容器中某一對象的依據,Trigger的組及名稱必須惟一,JobDetail的組和名稱也必須惟一(但能夠和Trigger的組和名稱相同,由於它們是不一樣類型的);async
3.先作個簡單的上手小菜,定義一個HelloJob類,內容就是打印HelloWrold:
public class HelloJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println(System.currentTimeMillis()+"helloWorld"); } }
再直接使用main入口,定義jobDetail -->Trigger-->Scheduler,能夠看到這裏並無直接使用HelloJob類,而是以Class形式放入JobDetail 中,很明顯使用的就是Java反射機制了,代碼清晰簡單,不解釋了。
public class ScheduledTaskMain { public static void main(String[] args) throws SchedulerException { /*建立一個jobDetail的實例,將該實例與HelloJob Class綁定*/ JobDetail jobDetail = JobBuilder.newJob(HelloJob.class).withIdentity("HelloJob").build(); /*建立一個觸發器,每2秒執行一次任務,一直持續下去*/ SimpleTrigger cronTrigger= TriggerBuilder.newTrigger().withIdentity("HelloTrigger").startNow() .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(2).repeatForever()).build(); /*建立schedule實例*/ StdSchedulerFactory factory = new StdSchedulerFactory(); Scheduler scheduler = factory.getScheduler(); /*將Job和trigger放入Scheduler容器*/ scheduler.scheduleJob(jobDetail,cronTrigger); scheduler.start(); } }
4.愉快地跑一個:
從輸出能夠看到Quartz內部系列對象的建立過程,並創建了10線程的ThreadPool,最後執行了HelloWorld任務。
5.固然,咱們的主菜不多是作HelloWorld任務,那還不簡單!改起來!爲了試驗方便,我只改動logistic模塊,新建一個Job類:com.biao.mall.logistic.schedule.HelloJob2,直接注入SpringBean任務,這個任務就是按期檢查過時未發的物流單,具體見deliveryService.checkDelayed()方法:
@Component @DisallowConcurrentExecution //標識這個任務是串行執行,不是併發執行 public class HelloJob2 implements Job, Serializable { @Autowired private DubboDeliveryService deliveryService; /*經測試,下面這種構造方法注入deliveryService的方式會致使NPE!*/ /* @Autowired public HelloJob2(DubboDeliveryService deliveryService){ this.deliveryService = deliveryService; } public HelloJob2(){} */ @Override public void execute(JobExecutionContext context) throws JobExecutionException { int num = deliveryService.checkDelayed(); System.out.println("delayed num is >>> "+ num); } }
com.biao.mall.logistic.impl.DubboDeliveryServiceImpl中,實現deliveryService.checkDelayed()方法:
//檢查延誤未發的物流單, /*僅供邏輯測試,直接查找出全部10天以前的訂單,生產邏輯確定比這複雜*/ @Override public int checkDelayed() { QueryWrapper qw = new QueryWrapper(); LocalDateTime timeNow = LocalDateTime.now(ZoneId.systemDefault()); qw.lt(true,"gmt_create",timeNow.minusDays(10L)); List<DubboDeliveryEntity> list = deliveryDao.selectList(qw); return Objects.isNull(list)? 0: list.size(); }
再將ScheduledTaskMain中替換爲Job2,運行結果NPE:
這就有點讓人失望了,緣由在哪?這是由於經過實現Job接口的方式來建立定時任務,這個類在實例化時是被Quartz實例化的,同時沒有注入到Spring中。而自定義的Service是Spring容器管理的,所以就致使了被Spring所管理的Bean不能被自動注入進來,Quartz也沒法感知自定義的ServiceBean的存在!
6.關於@DisallowConcurrentExecution 註解:即該Job不併發執行,好比當前一個Job未執行完,而下一個Job也知足Trigger條件,此時就會被阻塞。(詳細解釋請見後記)
7.該NPE解決思路,就是要將Scheduler也歸入Spring容器管理, 先定義com.biao.mall.logistic.schedule.MyJobFactory,繼承自AdaptableJobFactory:
AdaptableJobFactory 是一個支持Runnable和Job對象的工廠,實現了JobFactory接口;
TriggerFiredBundle是一個接收從JobStore到QuartzSchedulerThread執行時數據的類;
@Component public class MyJobFactory extends AdaptableJobFactory { /** * AutowireCapableBeanFactory接口是BeanFactory的子類 * 能夠鏈接和填充那些生命週期不被Spring管理的已存在的bean實例 */ private AutowireCapableBeanFactory capableBeanFactory; @Autowired public MyJobFactory(AutowireCapableBeanFactory capableBeanFactory){ this.capableBeanFactory = capableBeanFactory; } @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception{ //調用父類方法 Object jobInstance = super.createJobInstance(bundle); //進行注入(Spring接管該Bean) capableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
8.再定義一個com.biao.mall.logistic.schedule.QuartzConf配置類,能夠經過SchedulerFactoryBean這個橋樑來完成ApplicationContext和SchedulerContext關聯,以下,再運行程序即正常執行!
@Configuration public class QuartzConfig { //不推薦這裏註解@Autowired,使用構造函數注入 private MyJobFactory myJobFactory; @Autowired public QuartzConfig(MyJobFactory myJobFactory){ this.myJobFactory = myJobFactory; } @Bean(name = "factoryBean") public SchedulerFactoryBean schedulerFactoryBean(){ // Spring提供SchedulerFactoryBean爲Scheduler提供配置信息,並被Spring容器管理其生命週期 SchedulerFactoryBean factoryBean = new SchedulerFactoryBean(); factoryBean.setOverwriteExistingJobs(true); //設置是否自動啓動 factoryBean.setAutoStartup(false); //設置系統啓動後,Starting Quartz Scheduler的延遲時間 factoryBean.setStartupDelay(30); // 設置自定義Job Factory,用於Spring管理Job bean factoryBean.setJobFactory(myJobFactory); return factoryBean; } @Bean(name = "myScheduler") public Scheduler getScheduler(){ Scheduler scheduler = schedulerFactoryBean().getScheduler(); return scheduler; } }
來看點SchedulerFactoryBean的源碼(部分):
/** * {@link FactoryBean} that creates and configures a Quartz {@link org.quartz.Scheduler}, * manages its lifecycle as part of the Spring application context, and exposes the * Scheduler as bean reference for dependency injection. * * <p>Allows registration of JobDetails, Calendars and Triggers, automatically * starting the scheduler on initialization and shutting it down on destruction. * In scenarios that just require static registration of jobs at startup, there * is no need to access the Scheduler instance itself in application code. // 代碼省略部分 ... ... */ public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>, BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle { public static final String PROP_THREAD_COUNT = "org.quartz.threadPool.threadCount"; public static final int DEFAULT_THREAD_COUNT = 10; private static final ThreadLocal<ResourceLoader> configTimeResourceLoaderHolder = new ThreadLocal<>(); private static final ThreadLocal<Executor> configTimeTaskExecutorHolder = new ThreadLocal<>(); private static final ThreadLocal<DataSource> configTimeDataSourceHolder = new ThreadLocal<>(); private static final ThreadLocal<DataSource> configTimeNonTransactionalDataSourceHolder = new ThreadLocal<>(); // 代碼省略部分 ... ... //--------------------------------------------------------------------- // 實現接口InitializingBean,即SpringBean生命週期中的afterPropertiesSet()方法,dataSource是持久化屬性, @Override public void afterPropertiesSet() throws Exception { if (this.dataSource == null && this.nonTransactionalDataSource != null) { this.dataSource = this.nonTransactionalDataSource; } if (this.applicationContext != null && this.resourceLoader == null) { this.resourceLoader = this.applicationContext; } // 初始化Scheduler實例,將Jobs/Triggers註冊 this.scheduler = prepareScheduler(prepareSchedulerFactory()); try { registerListeners(); registerJobsAndTriggers(); } catch (Exception ex) { try { this.scheduler.shutdown(true); } catch (Exception ex2) { logger.debug("Scheduler shutdown exception after registration failure", ex2); } throw ex; } } // 代碼省略部分 ... ... /** * 初始化當前的SchedulerFactory, 應用本地定義的屬性值 * @param參數schedulerFactory爲須要初始化的對象 */ private void initSchedulerFactory(StdSchedulerFactory schedulerFactory) throws SchedulerException, IOException { Properties mergedProps = new Properties(); if (this.resourceLoader != null) { mergedProps.setProperty(StdSchedulerFactory.PROP_SCHED_CLASS_LOAD_HELPER_CLASS, ResourceLoaderClassLoadHelper.class.getName()); } if (this.taskExecutor != null) { mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, LocalTaskExecutorThreadPool.class.getName()); } else { // 設置必要的默認屬性,Quartz會使用顯式屬性設置覆蓋默認屬性 mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName()); mergedProps.setProperty(PROP_THREAD_COUNT, Integer.toString(DEFAULT_THREAD_COUNT)); } if (this.configLocation != null) { if (logger.isInfoEnabled()) { logger.info("Loading Quartz config from [" + this.configLocation + "]"); } PropertiesLoaderUtils.fillProperties(mergedProps, this.configLocation); } CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps); if (this.dataSource != null) { mergedProps.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, LocalDataSourceJobStore.class.getName()); } if (this.schedulerName != null) { mergedProps.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, this.schedulerName); } schedulerFactory.initialize(mergedProps); } // 代碼省略部分 ... ... /** * 根據給定的factory 和scheduler name生成Scheduler實例,由afterPropertiesSet調用 缺省實現將觸發SchedulerFactory的getScheduler方法 * @param 參數schedulerFactory生產Scheduler的工廠 * @param schedulerName the name of the scheduler to create * @return the Scheduler instance * @throws SchedulerException if thrown by Quartz methods * @see #afterPropertiesSet * @see org.quartz.SchedulerFactory#getScheduler */ protected Scheduler createScheduler(SchedulerFactory schedulerFactory, @Nullable String schedulerName) throws SchedulerException { // Override thread context ClassLoader to work around native Quartz ClassLoadHelper loading. Thread currentThread = Thread.currentThread(); ClassLoader threadContextClassLoader = currentThread.getContextClassLoader(); boolean overrideClassLoader = (this.resourceLoader != null && this.resourceLoader.getClassLoader() != threadContextClassLoader); if (overrideClassLoader) { currentThread.setContextClassLoader(this.resourceLoader.getClassLoader()); } try { SchedulerRepository repository = SchedulerRepository.getInstance(); synchronized (repository) { Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null); Scheduler newScheduler = schedulerFactory.getScheduler(); if (newScheduler == existingScheduler) { throw new IllegalStateException("Active Scheduler of name ' " + schedulerName + " ' already registered " + "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!"); } if (!this.exposeSchedulerInRepository) { // Need to remove it in this case, since Quartz shares the Scheduler instance by default! SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName()); } return newScheduler; } } finally { if (overrideClassLoader) { // 重置初始的線程ClassLoader. currentThread.setContextClassLoader(threadContextClassLoader); } } } /** * 將指定的或當前的ApplicationContext暴露給Quartz SchedulerContext. */ private void populateSchedulerContext(Scheduler scheduler) throws SchedulerException { // 將對象放入Scheduler context. if (this.schedulerContextMap != null) { scheduler.getContext().putAll(this.schedulerContextMap); } // 在Scheduler context中註冊ApplicationContext. if (this.applicationContextSchedulerContextKey != null) { if (this.applicationContext == null) { throw new IllegalStateException( "SchedulerFactoryBean needs to be set up in an ApplicationContext " + "to be able to handle an ' applicationContextSchedulerContextKey'"); } scheduler.getContext().put(this.applicationContextSchedulerContextKey, this.applicationContext); } } /** * 根據startupDelay設置啓動Scheduler,注意這裏是異步啓動 * @param scheduler the Scheduler to start * @param startupDelay the number of seconds to wait before starting * the Scheduler asynchronously */ protected void startScheduler(final Scheduler scheduler, final int startupDelay) throws SchedulerException { if (startupDelay <= 0) { logger.info("Starting Quartz Scheduler now"); scheduler.start(); } else { if (logger.isInfoEnabled()) { logger.info("Will start Quartz Scheduler [" + scheduler.getSchedulerName() + "] in " + startupDelay + " seconds"); } // 因這裏明確須要一個守護線程,因此不使用Quartz的startDelayed方法, // 這樣當其餘線程所有終止時,應用就終止,JVM也會退出 Thread schedulerThread = new Thread() { @Override public void run() { try { Thread.sleep(TimeUnit.SECONDS.toMillis(startupDelay)); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); // 簡單處理 } if (logger.isInfoEnabled()) { logger.info("Starting Quartz Scheduler now, after delay of " + startupDelay + " seconds"); } try { scheduler.start(); } catch (SchedulerException ex) { throw new SchedulingException("Could not start Quartz Scheduler after delay", ex); } } }; schedulerThread.setName("Quartz Scheduler [" + scheduler.getSchedulerName() + "]"); schedulerThread.setDaemon(true); // 指定thread爲deamon類型 schedulerThread.start(); } } // 代碼省略部分 ... ... }
類描述的兩段:此類做爲Bean工廠產生並配置Scheduler,並將其歸入Spring應用上下文中的Bean生命週期管理;提供JobDetails, Calendars and Triggers的註冊,在應用啓動時,自動啓動Scheduler,在應用關閉時,自動中止Scheduler;
實現了ApplicationContextAware接口,即能被ApplicationContext接管,因此這以後使用ApplicationContext.getBean()也可取得Scheduler;
多個static final型的類變量,其DEFAULT_THREAD_COUNT指定了Quartz後臺線程池大小,幾個ThreadLocal用於保存線程Context,這也是Job能保持獨立性的關鍵基礎;
initSchedulerFactory方法初始化當前的SchedulerFactory, 應用本地定義的屬性值,好比指定線程池大小;
afterPropertiesSet方法,經過實現接口InitializingBean,使用SpringBean生命週期中的afterPropertiesSet()方法來設置Scheduler;
createScheduler建立Scheduler,並交給Spring來接管,並對「同名Scheduler」異常作處理;
startScheduler異步守護線程方式啓動Scheduler;
總結,Scheduler就是一個容器,有本身的內部對象和上下文,屬於重量級對象,理解上能夠類比SpringContext,可以使用scheduler.getContext()取得上下文信息。
9.Quartz的任務管理,經過Scheduler能夠start、pause、resume、stop,addJob和deleteJob等重要方法來調度Job的執行。這裏只須要注意一點,若是stop以後,就沒法再直接start,必須重啓應用,不知道這個是否屬於bug。
10.定義一個com.biao.mall.logistic.schedule.QuartzService類,來封裝這些方法:
@Service public class QuartzService { private final String groupName = "group1"; @Autowired @Qualifier(value = "factoryBean") private SchedulerFactoryBean factoryBean; //如下方式也能夠獲取bean // Scheduler scheduler = SpringUtil.getBean("myScheduler"); @Autowired @Qualifier(value = "myScheduler") private Scheduler scheduler; // 啓動 Scheduler public void startScheduleJobs() throws SchedulerException { if (this.scheduler.isStarted()){ return; } this.setCheckScheduler(scheduler); this.scheduler.start(); } // 中止 Scheduler public void stopScheduleJobs() { scheduler = factoryBean.getScheduler(); try { if (!scheduler.isShutdown()){ scheduler.shutdown(); } } catch (SchedulerException e) { e.printStackTrace(); } } // 添加 Job 並替換 public void addJobandReplace(){ //打印hellowrold的job JobDetail jobDetail = JobBuilder.newJob(HelloJob.class).withIdentity("HelloJob").storeDurably(true).build(); // 第二個參數爲replace,是否替換存在的同名job // jobDetail必須是durable屬性,表示任務完成以後是否依然保留到數據庫,且無定義關聯的trigger try { this.scheduler.addJob(jobDetail,true); } catch (SchedulerException e) { e.printStackTrace(); } } // 添加 Job 不替換 public void addJobwithoutReplace(){ //打印hellowrold的job JobDetail jobDetail = JobBuilder.newJob(HelloJob.class).withIdentity("HelloJob").storeDurably(true).build(); // 第二個參數爲replace,是否替換存在的同名job try { this.scheduler.addJob(jobDetail,false); } catch (SchedulerException e) { e.printStackTrace(); } } // 暫停全部 Job,還可指定具體的Job public void pauseScheduler(){ try { this.scheduler.pauseAll(); } catch (SchedulerException e) { e.printStackTrace(); } } // 恢復並繼續全部Job執行,還可指定具體的Job public void resumeJobs(){ try { this.scheduler.resumeAll(); } catch (SchedulerException e) { e.printStackTrace(); } } //配置一個自定義的scheduler private void setCheckScheduler(Scheduler scheduler) throws SchedulerException { //添加HelloJob2做爲任務內容 JobDetail jobDetail = JobBuilder.newJob(HelloJob2.class) .withIdentity("job1",groupName).build(); //cron表達式制定觸發規則,每10秒執行一次 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?"); CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity("trigger1",groupName) .withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail,cronTrigger); } }
還有很多其餘方法,就不一一列舉了,注意下:
setCheckScheduler()中加入自定義的JobDetail和Trigger,並註冊進Scheduler容器,若是有多個,就定義多個相似方法加入便可,而後startScheduleJobs()啓動Scheduler;
cron表達式:用一個cron字符串表示一個時間規則;
11.測試:啓動ZK-->Nacos-->RocketMQ-->business-->logistic, 寫幾個簡陋的controller方法:
@RestController public class DubboDeliveryController { // 代碼省略部分 ... ... @RequestMapping("/delivery/start") public String start() throws SchedulerException { quartzService.startScheduleJobs(); return "startScheduleJobs success"; } @RequestMapping("/delivery/stop") public String stop(){ quartzService.stopScheduleJobs(); return "stopScheduleJobs success"; } @RequestMapping("/delivery/add") public String addJob(){ quartzService.addJobandReplace(); return "addJobandReplace success"; } @RequestMapping("/delivery/pause") public String pauseJob(){ quartzService.pauseScheduler(); return "pauseScheduler success"; } @RequestMapping("/delivery/resume") public String resumeJob(){ quartzService.resumeJobs(); return "resumeJobs success"; } }
數據庫狀況:
URL給個訪問:
結果:
12.項目代碼地址:其中的day15 https://github.com/xiexiaobiao/dubbo-project.git
後記:
1.Quartz使用FixedThreadPool(固定數線程池)來執行Job,默認數量爲10(本例中可經過QuartzConfig修改),此ThreadPool接收Runnable對象,若是併發過大,就阻塞。各線程經過ThreadLocal來保存本身的獨立上下文。
2.關於Job併發解釋:
Job有一個StatefulJob子接口,表明有狀態的任務,該接口是一個沒有方法的標籤接口,其目的是讓Quartz知道任務的類型,以便採用不一樣的執行方案。無狀態任務在執行時擁有本身的JobDataMap拷貝,對JobDataMap的更改不會影響下次的執行。而有狀態任務共享同一個JobDataMap實例,每次任務執行對JobDataMap所作的更改會保存下來,後面的執行能夠看到這個更改,也即每次執行任務後都會對後面的執行發生影響。
正由於這個緣由,無狀態的Job能夠併發執行,而有狀態的StatefulJob不能併發執行,這意味着若是前次的StatefulJob尚未執行完畢,下一次的任務將阻塞等待,直到前次任務執行完畢。有狀態任務比無狀態任務須要考慮更多的因素,程序每每擁有更高的複雜度,所以除非必要,應該儘可能使用無狀態的Job。
若是Quartz使用了數據庫持久化任務調度信息,無狀態的JobDataMap僅會在Scheduler註冊任務時保持一次,而有狀態任務對應的JobDataMap在每次執行任務後都會進行保存。
3.Quartz支持集羣模式和持久化機制,能夠寫入後臺DB進行保存和恢復。請君自行研究。另尋時間我另起一篇。
4.爲何Quartz的各個Job執行互不影響?源碼註釋:
"Note that Quartz instantiates a new Job for each execution, in contrast to Timer which uses a TimerTask instance that is shared between repeated executions. Just JobDetail descriptors are shared."
核心總結即「每次執行Quartz都是實例化一個新的Job
」!
5.Cron表達式在線生成,輕輕鬆鬆不傷腦:http://cron.qqe2.com/
推薦閱讀: