經過源碼分析Java開源任務調度框架Quartz的主要流程java
從使用效果、調用鏈路跟蹤、E-R圖、循環調度邏輯幾個方面分析Quartz。mysql
github項目地址: https://github.com/tanliwei/spring-quartz-cluster-sample , 補充了SQL輸出git
系統說明:github
IDE: IntelliJspring
JDK:1.8sql
Quartz:2.2.1shell
使用效果數據庫
1.從github項目https://github.com/tanliwei/spring-quartz-cluster-sample中,拉取項目到本地,導入IDEA。tomcat
相信讀者都有必定工做經驗,這些細節不贅述。服務器
2.本文采用Mysql數據庫。
請執行 resources/scripts/tables_mysql_innodb.sql
3.修改jdbc.properties中數據庫配置
4.經過IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat
暴露的Restful 接口 /say-hello.do 以及添加好任務後的調用效果:
添加任務
在tomcat啓動成功後,在首頁點擊「添加任務」,添加以下任務:
代碼執行邏輯在SyncJobFactory類中,從Output中能夠看到執行的輸出信息,
調用鏈跟蹤的最後會回到這個類來。
如今開始跟蹤調用鏈路。
從配置文件applicationContext.xml配置中找到任務調度核心類SchedulerFactoryBean
resources/applicationContext.xml
<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> ... </bean>
使用IDEA快捷鍵,點擊進入SchedulerFactoryBean類,它實現了InitializingBean接口,
在Spring中凡是實現了InitializingBean接口的Bean,都會在Bean屬性都設置完成後調用afterPropertiesSet()方法.
SchedulerFactoryBean.java
//--------------------------------------------------------------------- // Implementation of InitializingBean interface // 實現 InitializingBean 接口 //--------------------------------------------------------------------- public void afterPropertiesSet() throws Exception { //... // Create SchedulerFactory instance. // 建立 SchedulerFactory 調度器工廠實例 SchedulerFactory schedulerFactory = (SchedulerFactory) BeanUtils.instantiateClass(this.schedulerFactoryClass); initSchedulerFactory(schedulerFactory); //... // Get Scheduler instance from SchedulerFactory. // 經過調度器工廠 獲取 調度器實例 try { this.scheduler = createScheduler(schedulerFactory, this.schedulerName); //... }
SchedulerFactoryBean.java
/** * Create the Scheduler instance for the given factory and scheduler name. * 經過制定工廠和調度器名稱建立調度器實例 * Called by {@link #afterPropertiesSet}. * <p>The default implementation invokes SchedulerFactory's <code>getScheduler</code> * method. Can be overridden for custom Scheduler creation. */ protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName) throws SchedulerException { //... try { SchedulerRepository repository = SchedulerRepository.getInstance(); synchronized (repository) { Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null); Scheduler newScheduler = schedulerFactory.getScheduler(); if (newScheduler == existingScheduler) { throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " + "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!"); } //... }
這個項目走的邏輯是 StdSchedulerFactory.getScheduler()方法,可自行debug。
StdSchedulerFactory.java
/** * Returns a handle to the Scheduler produced by this factory. * 返回該工廠創造的調度器的句柄 */ public Scheduler getScheduler() throws SchedulerException { if (cfg == null) { initialize(); } SchedulerRepository schedRep = SchedulerRepository.getInstance(); Scheduler sched = schedRep.lookup(getSchedulerName()); //... sched = instantiate(); return sched; }
StdSchedulerFactory.java
private Scheduler instantiate() throws SchedulerException { //... //大量的配置初始化、實例化代碼 //... //第1298行代碼 qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry); //... }
QuartzScheduler.java
/** * Create a <code>QuartzScheduler</code> with the given configuration * 根據給定的配置 建立Quartz調度器 */ public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException { this.resources = resources; if (resources.getJobStore() instanceof JobListener) { addInternalJobListener((JobListener)resources.getJobStore()); } //private QuartzSchedulerThread schedThread; this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); //經過線程池執行 Quartz調度器線程 schedThreadExecutor.execute(this.schedThread); //... }
QuartzSchedulerThread.java
/** * <p> * The main processing loop of the <code>QuartzSchedulerThread</code>. * Quartz調度器線程的主循環邏輯 * </p> */ @Override public void run() { //while循環執行,只要調度器爲被暫停 while(!halted.get()){ JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } if (qsRsrcs.getThreadPool().runInThread(shell) == false){} } }
JobRunShell.java
public void run() { //... Job job = jec.getJobInstance(); //... try { log.debug("Calling execute on job " + jobDetail.getKey()); //執行 job.execute(jec); endTime = System.currentTimeMillis(); } //... //更新Trigger觸發器狀態,刪除FIRED_TRIGGERS觸發記錄 instCode = trigger.executionComplete(jec, jobExEx); //... }
QuartzJobBean.java
/** * This implementation applies the passed-in job data map as bean property * values, and delegates to <code>executeInternal</code> afterwards. * 這個實現 把傳入的map數據做爲bean屬性值,而後委託給 executeInternal 方法 */ public final void execute(JobExecutionContext context) throws JobExecutionException { try { //執行 executeInternal(context); }
SyncJobFactory.java
//回到了咱們的業務類SyncJobFactory的executeInternal方法, //裏面執行咱們的業務代碼 protected void executeInternal(JobExecutionContext context) throws JobExecutionException { try { LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort()); } //... System.out.println("jobName:" + scheduleJob.getJobName() + " " + scheduleJob); //... }
2、E-R圖
梳理6張主要的Quartz表:
QRTZ_TRIGGERS 觸發器表
SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。 聯合主鍵,QRTZ_JOB_DETAILS表SCHED_NAME外鍵
JOB_NAME,任務名。自定義值。 聯合主鍵,QRTZ_JOB_DETAILS表JOB_NAME外鍵
JOB_GROUP,任務組。 自定義值。聯合主鍵,QRTZ_JOB_DETAILS表JOB_GROUP外鍵
TRIGGER_STATE,觸發器狀態: WAITING , ACQUIRED, BLOCKING
NEXT_FIRE_TIME, 下次觸發時間:
MISFIRE_INSTR,執行失敗後的指令,
非失敗策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
失敗策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;
TRIGGER_TYPE, 觸發器類型,例如CRON,cron表達式類型的觸發器
PRIORITY,優先級
QRTZ_CRON_TRIGGERS cron類型觸發器表
SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。 聯合主鍵,QRTZ_TRIGGERS表SCHED_NAME外鍵
JOB_NAME,任務名。自定義值。 聯合主鍵,QRTZ_TRIGGERS表JOB_NAME外鍵
JOB_GROUP,任務組。 自定義值。聯合主鍵,QRTZ_TRIGGERS表JOB_GROUP外鍵
CRON_EXPRESSION, cron表達式, 例如每30秒執行一次, 0/30 * * * * ?
QRTZ_JOB_DETAILS 任務詳細表
SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵
JOB_NAME,任務名。自定義值。 聯合主鍵
JOB_GROUP,任務組。 自定義值。聯合主鍵
JOB_DATA,blob類型,任務參數
QRTZ_FIRED_TRIGGERS 任務觸發表
SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵
ENTRY_ID,entry id,聯合主鍵
JOB_NAME,任務名。自定義值。
JOB_GROUP,任務組。 自定義值。
FIRED_TIME, 任務觸發時間
STATE,狀態
INSTANCE_NAME, 服務器實例名
PRIORITY,優先級
QRTZ_SCHEDULER_STATE
SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵
INSTANCE_NAME,服務器實例名。聯合主鍵
LAST_CHECKIN_TIME,上次檢查時間
CHECKIN_INTERVAL,檢查間隔
QRTZ_LOCKS 全局鎖
SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵
LOCK_NAME,鎖名稱,例如,TRIGGER_ACCESS。聯合主鍵
3、循環調度邏輯
主要流程以下:
源碼以下:
QuartzSchedulerThread.java
public void run() { //... while (!halted.get()) { try { //合理休眠 //... //獲取接下來的觸發器 //1.狀態爲WAITING //2.觸發時間在30秒內 //3.不是錯過執行的或者錯過了可是時間不超過兩分鐘 triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //... //觸發任務 List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); //... JobRunShell shell = null; //... //執行代碼 if (qsRsrcs.getThreadPool().runInThread(shell) == false) { //... } // while (!halted) //.. }
JobRunShell.java
protected QuartzScheduler qs = null; public void run() { qs.addInternalSchedulerListener(this); try { //... do { Job job = jec.getJobInstance(); // execute the job try { //執行任務代碼 job.execute(jec); //更新觸發器,刪除觸發記錄 qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode); break; } while (true); } //... }
4、擴展
除了對主線程 QuartzSchedulerThread 的分析
繼續分析JobStoreSupport類的兩個線程 ClusterManager 和 MisfireHandler 的分析, 它們維護觸發器的MISFIRE_INSTR狀態,和調度器狀態QRTZ_SCHEDULER_STATE。