目錄:html
1、Quartz 基本介紹前端
1.1 Quartz 概述java
1.2 Quartz特色mysql
1.3 Quartz 集羣配置redis
2、Quartz 原理及流程spring
2.1 quartz基本原理sql
2.2 quartz啓動流程shell
3、Spring + Quartz 實現企業級調度的實現示例數據庫
3.1 環境信息express
3.2 相關代碼及配置
4、問題及解決方案
5、相關知識
6、參考資料
總結
Quartz 是 OpenSymphony 開源組織在任務調度領域的一個開源項目,徹底基於 Java 實現。該項目於 2009 年被 Terracotta 收購,目前是 Terracotta 旗下的一個項目。讀者能夠到 http://www.quartz-scheduler.org/站點下載 Quartz 的發佈版本及其源代碼。
做爲一個優秀的開源調度框架,Quartz 具備如下特色:
另外,做爲 Spring 默認的調度框架,Quartz 很容易與 Spring 集成實現靈活可配置的調度功能。
quartz調度核心元素:
quartz集羣是經過數據庫表來感知其餘的應用的,各個節點之間並無直接的通訊。只有使用持久的JobStore才能完成Quartz集羣。
數據庫表:之前有12張表,如今只有11張表,如今沒有存儲listener相關的表,多了QRTZ_SIMPROP_TRIGGERS表:
Table name | Description |
---|---|
QRTZ_CALENDARS | 存儲Quartz的Calendar信息 |
QRTZ_CRON_TRIGGERS | 存儲CronTrigger,包括Cron表達式和時區信息 |
QRTZ_FIRED_TRIGGERS | 存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存儲已暫停的Trigger組的信息 |
QRTZ_SCHEDULER_STATE | 存儲少許的有關Scheduler的狀態信息,和別的Scheduler實例 |
QRTZ_LOCKS | 存儲程序的悲觀鎖的信息 |
QRTZ_JOB_DETAILS | 存儲每個已配置的Job的詳細信息 |
QRTZ_SIMPLE_TRIGGERS | 存儲簡單的Trigger,包括重複次數、間隔、以及已觸的次數 |
QRTZ_BLOG_TRIGGERS | Trigger做爲Blob類型存儲 |
QRTZ_TRIGGERS | 存儲已配置的Trigger的信息 |
QRTZ_SIMPROP_TRIGGERS |
QRTZ_LOCKS就是Quartz集羣實現同步機制的行鎖表,包括如下幾個鎖:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。
核心元素
Quartz 任務調度的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任務調度的元數據, scheduler 是實際執行調度的控制器。
在 Quartz 中,trigger 是用於定義調度時間的元素,即按照什麼時間規則去執行任務。Quartz 中主要提供了四種類型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。這四種 trigger 能夠知足企業應用中的絕大部分需求。咱們將在企業應用一節中進一步討論四種 trigger 的功能。
在 Quartz 中,job 用於表示被調度的任務。主要有兩種類型的 job:無狀態的(stateless)和有狀態的(stateful)。對於同一個 trigger 來講,有狀態的 job 不能被並行執行,只有上一次觸發的任務被執行完以後,才能觸發下一次執行。Job 主要有兩種屬性:volatility 和 durability,其中 volatility 表示任務是否被持久化到數據庫存儲,而 durability 表示在沒有 trigger 關聯的時候任務是否被保留。二者都是在值爲 true 的時候任務被持久化或保留。一個 job 能夠被多個 trigger 關聯,可是一個 trigger 只能關聯一個 job。
在 Quartz 中, scheduler 由 scheduler 工廠建立:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二種工廠 StdSchedulerFactory 使用較多,由於 DirectSchedulerFactory 使用起來不夠方便,須要做許多詳細的手工編碼設置。 Scheduler 主要有三種:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最經常使用的 StdScheduler 爲例講解。這也是筆者在項目中所使用的 scheduler 類。
Quartz 核心元素之間的關係以下圖所示:
圖 1. Quartz 核心元素關係圖
線程視圖
在 Quartz 中,有兩類線程,Scheduler 調度線程和任務執行線程,其中任務執行線程一般使用一個線程池維護一組線程。
圖 2. Quartz 線程視圖
Scheduler 調度線程主要有兩個: 執行常規調度的線程,和執行 misfired trigger 的線程。常規調度線程輪詢存儲的全部 trigger,若是有須要觸發的 trigger,即到達了下一次觸發的時間,則從任務執行線程池獲取一個空閒線程,執行與該 trigger 關聯的任務。Misfire 線程是掃描全部的 trigger,查看是否有 misfired trigger,若是有的話根據 misfire 的策略分別處理。下圖描述了這兩個線程的基本流程:
圖 3. Quartz 調度線程流程圖
關於 misfired trigger,咱們在企業應用一節中將進一步描述。
數據存儲
Quartz 中的 trigger 和 job 須要存儲下來才能被使用。Quartz 中有兩種存儲方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是將 trigger 和 job 存儲在內存中,而 JobStoreSupport 是基於 jdbc 將 trigger 和 job 存儲到數據庫中。RAMJobStore 的存取速度很是快,可是因爲其在系統被中止後全部的數據都會丟失,因此在一般應用中,都是使用 JobStoreSupport。
在 Quartz 中,JobStoreSupport 使用一個驅動代理來操做 trigger 和 job 的數據存儲:StdJDBCDelegate。StdJDBCDelegate 實現了大部分基於標準 JDBC 的功能接口,可是對於各類數據庫來講,須要根據其具體實現的特色作某些特殊處理,所以各類數據庫須要擴展 StdJDBCDelegate 以實現這些特殊處理。Quartz 已經自帶了一些數據庫的擴展實現,能夠直接使用,以下圖所示:
圖 4. Quartz 數據庫驅動代理
做爲嵌入式數據庫的表明,Derby 近來很是流行。若是使用 Derby 數據庫,可使用上圖中的 CloudscapeDelegate 做爲 trigger 和 job 數據存儲的代理類。
若quartz是配置在spring中,當服務器啓動時,就會裝載相關的bean。SchedulerFactoryBean實現了InitializingBean接口,所以在初始化bean的時候,會執行afterPropertiesSet方法,該方法將會調用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,一般用StdSchedulerFactory)建立Scheduler。SchedulerFactory在建立quartzScheduler的過程當中,將會讀取配置參數,初始化各個組件,關鍵組件以下:
ThreadPool:通常是使用SimpleThreadPool,SimpleThreadPool建立了必定數量的WorkerThread實例來使得Job可以在線程中進行處理。WorkerThread是定義在SimpleThreadPool類中的內部類,它實質上就是一個線程。在SimpleThreadPool中有三個list:workers-存放池中全部的線程引用,availWorkers-存放全部空閒的線程,busyWorkers-存放全部工做中的線程;
線程池的配置參數以下所示:
1 2 3 |
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount=3 org.quartz.threadPool.threadPriority=5 |
JobStore:分爲存儲在內存的RAMJobStore和存儲在數據庫的JobStoreSupport(包括JobStoreTX和JobStoreCMT兩種實現,JobStoreCMT是依賴於容器來進行事務的管理,而JobStoreTX是本身管理事務),若要使用集羣要使用JobStoreSupport的方式;
另外,SchedulerFactoryBean還實現了SmartLifeCycle接口,所以初始化完成後,會執行start()方法,該方法將主要會執行如下的幾個動做:
此示例中的環境: Spring 4.1.6.RELEASE + quartz 2.2.1 + Mysql 5.6
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for task_schedule_job
-- ----------------------------
DROP TABLE IF EXISTS `task_schedule_job`;
CREATE TABLE `task_schedule_job` (
`job_id` bigint(20) NOT NULL AUTO_INCREMENT,
`create_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`job_name` varchar(255) DEFAULT NULL,
`job_group` varchar(255) DEFAULT NULL,
`job_status` varchar(255) DEFAULT NULL,
`cron_expression` varchar(255) NOT NULL,
`description` varchar(255) DEFAULT NULL,
`bean_class` varchar(255) DEFAULT NULL,
`is_concurrent` varchar(255) DEFAULT NULL COMMENT '1',
`spring_id` varchar(255) DEFAULT NULL,
`method_name` varchar(255) NOT NULL
PRIMARY KEY (`job_id`),
UNIQUE KEY `name_group` (`job_name`,`job_group`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
在Quartz包下docs/dbTables,選擇對應的數據庫腳本,建立相應的數據庫表便可,我用的是mysql5.6,這裏有一個須要注意的地方,mysql5.5以前用的表存儲引擎是MyISAM,使用的是表級鎖,鎖發生衝突的機率比較高,併發度低;5.6以後默認的存儲引擎爲InnoDB,InnoDB採用的鎖機制是行級鎖,併發度也較高。而quartz集羣使用數據庫鎖的
機制來來實現同一個任務在同一個時刻只被實例執行,因此爲了防止衝突,咱們建表的時候要選取InnoDB做爲表的存
儲引擎。以下:
<1>spring-quartz.xml 配置 在application.xml 文件中引入
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <!-- 註冊本地調度任務 <bean id="localQuartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"></bean>--> <!-- 註冊集羣調度任務 --> <bean id="schedulerFactoryBean" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy"> <!--可選,QuartzScheduler 啓動時更新己存在的Job,這樣就不用每次修改targetObject後刪除qrtz_job_details表對應記錄了 --> <property name="overwriteExistingJobs" value="true" /> <!--必須的,QuartzScheduler 延時啓動,應用啓動完後 QuartzScheduler 再啓動 --> <property name="startupDelay" value="3" /> <!-- 設置自動啓動 --> <property name="autoStartup" value="true" /> <property name="applicationContextSchedulerContextKey" value="applicationContext" /> <property name="configLocation" value="classpath:quartz.properties" /> </bean> </beans>
<2>quartz.properties 文件配置
#============================================================== #Configure Main Scheduler Properties #============================================================== org.quartz.scheduler.instanceName = KuanrfGSQuartzScheduler org.quartz.scheduler.instanceId = AUTO #============================================================== #Configure JobStore #============================================================== org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.jobStore.clusterCheckinInterval = 20000 org.quartz.jobStore.dataSource = myDS org.quartz.jobStore.maxMisfiresToHandleAtATime = 1 org.quartz.jobStore.misfireThreshold = 120000 org.quartz.jobStore.txIsolationLevelSerializable = false #============================================================== #Configure DataSource #============================================================== org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.myDS.URL = 你的數據連接 org.quartz.dataSource.myDS.user = 用戶名 org.quartz.dataSource.myDS.password = 密碼 org.quartz.dataSource.myDS.maxConnections = 30 org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE #============================================================== #Configure ThreadPool #============================================================== org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount= 10 org.quartz.threadPool.threadPriority= 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true #============================================================== #Skip Check Update #update:true #not update:false #============================================================== org.quartz.scheduler.skipUpdateCheck = true #============================================================================ # Configure Plugins #============================================================================ org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin org.quartz.plugin.shutdownhook.cleanShutdown = true
<3>關鍵代碼
package com.netease.ad.omp.service.sys; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Set; import javax.annotation.PostConstruct; import javax.annotation.Resource; import com.netease.ad.omp.common.utils.SpringUtils; import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper; import com.netease.ad.omp.entity.sys.ScheduleJob; import com.netease.ad.omp.quartz.job.JobUtils; import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean; import com.netease.ad.omp.quartz.job.QuartzJobFactory; import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution; import org.apache.log4j.Logger; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.quartz.impl.matchers.GroupMatcher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; /** * 計劃任務管理 */ @Service public class JobTaskService { public final Logger log = Logger.getLogger(this.getClass()); @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Autowired private ScheduleJobMapper scheduleJobMapper; /** * 從數據庫中取 區別於getAllJob * * @return */ public List<ScheduleJob> getAllTask() { return scheduleJobMapper.select(null); } /** * 添加到數據庫中 區別於addJob */ public void addTask(ScheduleJob job) { job.setCreateTime(new Date()); scheduleJobMapper.insertSelective(job); } /** * 從數據庫中查詢job */ public ScheduleJob getTaskById(Long jobId) { return scheduleJobMapper.selectByPrimaryKey(jobId); } /** * 更改任務狀態 * * @throws SchedulerException */ public void changeStatus(Long jobId, String cmd) throws SchedulerException { ScheduleJob job = getTaskById(jobId); if (job == null) { return; } if ("stop".equals(cmd)) { deleteJob(job); job.setJobStatus(JobUtils.STATUS_NOT_RUNNING); } else if ("start".equals(cmd)) { job.setJobStatus(JobUtils.STATUS_RUNNING); addJob(job); } scheduleJobMapper.updateByPrimaryKeySelective(job); } /** * 更改任務 cron表達式 * * @throws SchedulerException */ public void updateCron(Long jobId, String cron) throws SchedulerException { ScheduleJob job = getTaskById(jobId); if (job == null) { return; } job.setCronExpression(cron); if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) { updateJobCron(job); } scheduleJobMapper.updateByPrimaryKeySelective(job); } /** * 添加任務 * * @throws SchedulerException */ public void addJob(ScheduleJob job) throws SchedulerException { if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) { return; } Scheduler scheduler = schedulerFactoryBean.getScheduler(); log.debug(scheduler + ".......................................................................................add"); TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 不存在,建立一個 if (null == trigger) { Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class; JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build(); jobDetail.getJobDataMap().put("scheduleJob", job); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); } else { // Trigger已存在,那麼更新相應的定時設置 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); // 按新的cronExpression表達式從新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); // 按新的trigger從新設置job執行 scheduler.rescheduleJob(triggerKey, trigger); } } @PostConstruct public void init() throws Exception { // 這裏獲取任務信息數據 List<ScheduleJob> jobList = scheduleJobMapper.select(null); for (ScheduleJob job : jobList) { addJob(job); } } /** * 獲取全部計劃中的任務列表 * * @return * @throws SchedulerException */ public List<ScheduleJob> getAllJob() throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup(); Set<JobKey> jobKeys = scheduler.getJobKeys(matcher); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(); for (JobKey jobKey : jobKeys) { List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) { ScheduleJob job = new ScheduleJob(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } } return jobList; } /** * 全部正在運行的job * * @return * @throws SchedulerException */ public List<ScheduleJob> getRunningJob() throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs(); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size()); for (JobExecutionContext executingJob : executingJobs) { ScheduleJob job = new ScheduleJob(); JobDetail jobDetail = executingJob.getJobDetail(); JobKey jobKey = jobDetail.getKey(); Trigger trigger = executingJob.getTrigger(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } return jobList; } /** * 暫停一個job * * @param scheduleJob * @throws SchedulerException */ public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.pauseJob(jobKey); } /** * 恢復一個job * * @param scheduleJob * @throws SchedulerException */ public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.resumeJob(jobKey); } /** * 刪除一個job * * @param scheduleJob * @throws SchedulerException */ public void deleteJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.deleteJob(jobKey); } /** * 當即執行job * * @param scheduleJob * @throws SchedulerException */ public void runAJobNow(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.triggerJob(jobKey); } /** * 更新job時間表達式 * * @param scheduleJob * @throws SchedulerException */ public void updateJobCron(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()); trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); scheduler.rescheduleJob(triggerKey, trigger); } public static void main(String[] args) { CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("xxxxx"); } }
package com.netease.ad.omp.quartz.job; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import com.netease.ad.omp.common.utils.SpringUtils; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.quartz.JobExecutionContext; import org.springframework.context.ApplicationContext; /** * Created with IntelliJ IDEA * ProjectName: omp * Author: bjsonghongxu * CreateTime : 15:58 * Email: bjsonghongxu@crop.netease.com * Class Description: * 定時任務工具類 */ public class JobUtils { public final static Logger log = Logger.getLogger(JobUtils.class); public static final String STATUS_RUNNING = "1"; //啓動狀態 public static final String STATUS_NOT_RUNNING = "0"; //未啓動狀態 public static final String CONCURRENT_IS = "1"; public static final String CONCURRENT_NOT = "0"; private ApplicationContext ctx; /** * 經過反射調用scheduleJob中定義的方法 * * @param scheduleJob */ public static void invokMethod(ScheduleJob scheduleJob,JobExecutionContext context) { Object object = null; Class clazz = null; if (StringUtils.isNotBlank(scheduleJob.getSpringId())) { object = SpringUtils.getBean(scheduleJob.getSpringId()); } else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) { try { clazz = Class.forName(scheduleJob.getBeanClass()); object = clazz.newInstance(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (object == null) { log.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啓動成功,請檢查是否配置正確!!!"); return; } clazz = object.getClass(); Method method = null; try { method = clazz.getMethod(scheduleJob.getMethodName(), new Class[] {JobExecutionContext.class}); } catch (NoSuchMethodException e) { log.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啓動成功,方法名設置錯誤!!!"); } catch (SecurityException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (method != null) { try { method.invoke(object, new Object[] {context}); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } } log.info("任務名稱 = [" + scheduleJob.getJobName() + "]----------啓動成功"); } }
package com.netease.ad.omp.quartz.job; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.log4j.Logger; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * * @Description: 計劃任務執行處 無狀態 * Spring調度任務 (重寫 quartz 的 QuartzJobBean 類緣由是在使用 quartz+spring 把 quartz 的 task 實例化進入數據庫時,會產生: serializable 的錯誤) */ public class QuartzJobFactory implements Job { public final Logger log = Logger.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob"); JobUtils.invokMethod(scheduleJob,context); } }
package com.netease.ad.omp.quartz.job; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * * @Description: 若一個方法一次執行不完下次輪轉時則等待該方法執行完後才執行下一次操做 * Spring調度任務 (重寫 quartz 的 QuartzJobBean 類緣由是在使用 quartz+spring 把 quartz 的 task 實例化進入數據庫時,會產生: serializable 的錯誤) */ @DisallowConcurrentExecution public class QuartzJobFactoryDisallowConcurrentExecution implements Job { public final Logger log = Logger.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob"); JobUtils.invokMethod(scheduleJob,context); } }
package com.netease.ad.omp.entity.sys; import javax.persistence.Id; import javax.persistence.Table; import java.io.Serializable; import java.util.Date; /** * Created with IntelliJ IDEA * ProjectName: omp * Author: bjsonghongxu * CreateTime : 15:48 * Email: bjsonghongxu@crop.netease.com * Class Description: * 計劃任務信息 */ @Table(name = "task_schedule_job") public class ScheduleJob implements Serializable { @Id private Long jobId; private Date createTime; private Date updateTime; /** * 任務名稱 */ private String jobName; /** * 任務分組 */ private String jobGroup; /** * 任務狀態 是否啓動任務 */ private String jobStatus; /** * cron表達式 */ private String cronExpression; /** * 描述 */ private String description; /** * 任務執行時調用哪一個類的方法 包名+類名 */ private String beanClass; /** * 任務是否有狀態 */ private String isConcurrent; /** * spring bean */ private String springId; /** * 任務調用的方法名 */ private String methodName; public Long getJobId() { return jobId; } public void setJobId(Long jobId) { this.jobId = jobId; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getJobName() { return jobName; } public void setJobName(String jobName) { this.jobName = jobName; } public String getJobGroup() { return jobGroup; } public void setJobGroup(String jobGroup) { this.jobGroup = jobGroup; } public String getJobStatus() { return jobStatus; } public void setJobStatus(String jobStatus) { this.jobStatus = jobStatus; } public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getBeanClass() { return beanClass; } public void setBeanClass(String beanClass) { this.beanClass = beanClass; } public String getIsConcurrent() { return isConcurrent; } public void setIsConcurrent(String isConcurrent) { this.isConcurrent = isConcurrent; } public String getSpringId() { return springId; } public void setSpringId(String springId) { this.springId = springId; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } }
package com.netease.ad.omp.common.utils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; public final class SpringUtils implements BeanFactoryPostProcessor { private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環境 @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } /** * 獲取對象 * * @param name * @return Object 一個以所給名字註冊的bean的實例 * @throws BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 獲取類型爲requiredType的對象 * * @param clz * @return * @throws BeansException * */ public static <T> T getBean(Class<T> clz) throws BeansException { @SuppressWarnings("unchecked") T result = (T) beanFactory.getBean(clz); return result; } /** * 若是BeanFactory包含一個與所給名稱匹配的bean定義,則返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判斷以給定名字註冊的bean定義是一個singleton仍是一個prototype。 * 若是與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws NoSuchBeanDefinitionException * */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 註冊對象的類型 * @throws NoSuchBeanDefinitionException * */ public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 若是給定的bean名字在bean定義中有別名,則返回這些別名 * * @param name * @return * @throws NoSuchBeanDefinitionException * */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } }
至於前端本身畫個簡單的界面便可使用了。
quartz文檔提到,若是在集羣環境下,最好將配置項org.quartz.jobStore.txIsolationLevelSerializable設置爲true
問題:
這個選項在mysql下會很是容易出現死鎖問題。
2014-12-29 09:55:28.006 [QuartzScheduler_clusterQuartzSchedular-BJ-YQ-64.2491419487774923_ClusterManager] ERROR o.q.impl.jdbcjobstore.JobStoreTX [U][] - ClusterManager: Error managing cluster: Failure updating scheduler state when checking-in: Deadlock found when trying to get lock; try restarting transaction
這個選項存在乎義:
quartz須要提高隔離級別來保障本身的運做,不過,因爲各數據庫實現的隔離級別定義都不同,因此quartz提供一個設置序列化這樣的隔離級別存在,由於例如oracle中是沒有未提交讀和可重複讀這樣的隔離級別存在。可是因爲mysql默認的是可重複讀,比提交讀高了一個級別,因此已經能夠知足quartz集羣的正常運行。
線程的主要邏輯代碼以下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
while (!halted.get()) { int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow()); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } List<TriggerFiredResult> bndle = qsRsrcs.getJobStore().triggersFired(triggers); for(int i = 0;i < res.size();i++){ JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); qsRsrcs.getThreadPool().runInThread(shell); } } |
線程執行流程以下圖所示:
QuartzSchedulerThread時序圖
任務調度執行過程當中,trigger的狀態變化以下圖所示:
該圖來自參考文獻5
下面這些緣由可能形成 misfired job:
執行流程:
misfireHandler線程執行流程以下圖所示:
misfireHandler線程時序圖
初始化:
failedInstance=failed+self+firedTrigger表中的schedulerName在scheduler_state表中找不到的(孤兒)
線程執行:
每一個服務器會定時(org.quartz.jobStore.clusterCheckinInterval這個時間)更新SCHEDULER_STATE表的LAST_CHECKIN_TIME,若這個字段遠遠超出了該更新的時間,則認爲該服務器實例掛了;
注意:每一個服務器實例有惟一的id,若配置爲AUTO,則爲hostname+current_time
線程執行的具體流程:
目前代碼中行鎖只用到了STATE_ACCESS 和TRIGGER_ACCESS 這兩種。
一、TRIGGER_ACCESS
先了解一篇文章,經過源碼來分析quartz是如何經過加鎖來實現集羣環境,觸發器狀態的一致性。
http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml
能夠看到觸發器的操做主要用主線程StdScheduleThread來完成,無論是獲取須要觸發的30S內的觸發器,仍是觸發過程。select和update觸發器表時
都會先加鎖,後解鎖。若是數據庫資源競爭比較大的話,鎖會影響整個性能。能夠考慮將任務信息放在分佈式內存,如redis上進行處理。數據庫只是定時從redis上load數據下來作統計。
實現都在JobStoreSupport類
加鎖類型 | 加鎖方法 | 底層數據庫操做 | 備註 |
executeInNonManagedTXLock | acquireNextTrigger | selectTriggerToAcquire selectTrigger selectJobDetail insertFiredTrigger |
查詢須要點火的trigger 選擇須要執行的trigger加入到fired_trigger表 |
for執行 triggerFired | selectJobDetail selectCalendar updateFiredTrigger triggerExists updateTrigger |
點火trigger 修改trigger狀態爲可執行狀態。 |
|
recoverJobs | updateTriggerStatesFromOtherStates hasMisfiredTriggersInState doUpdateOfMisfiredTrigger selectTriggersForRecoveringJobs selectTriggersInState deleteFiredTriggers |
非集羣環境下從新執行 failed與misfired的trigger |
|
retryExecuteInNonManagedTXLock | releaseAcquiredTrigger | updateTriggerStateFromOtherState deleteFiredTrigger |
異常狀況下從新釋放trigger到初使狀態。 |
triggeredJobComplete | selectTriggerStatus removeTrigger updateTriggerState deleteFiredTrigger |
觸發JOB任務完成後的處理。 | |
obtainLock | recoverMisfiredJobs | hasMisfiredTriggersInState doUpdateOfMisfiredTrigger | 從新執行misfired的trigger 能夠在啓動時執行,也能夠由misfired線程按期執行。 |
clusterRecover | selectInstancesFiredTriggerRecords updateTriggerStatesForJobFromOtherState storeTrigger deleteFiredTriggers selectFiredTriggerRecords removeTrigger deleteSchedulerState |
集羣有結點faied,讓JOB能從新執行。 | |
executeInLock 數據庫集羣裏等同於 executeInNonManagedTXLock |
storeJobAndTrigger | updateJobDetail insertJobDetail triggerExists selectJobDetail updateTrigger insertTrigger |
保存JOB和TRIGGER配置 |
storeJob | 保存JOB | ||
removeJob | 刪除JOB | ||
removeJobs | 批量刪除JOB | ||
removeTriggers | 批量刪除triggers | ||
storeJobsAndTriggers | 保存JOB和多個trigger配置 | ||
removeTrigger | 刪除trigger | ||
replaceTrigger | 替換trigger | ||
storeCalendar | 保存定時日期 | ||
removeCalendar | 刪除定時日期 | ||
clearAllSchedulingData | 清除全部定時數據 | ||
pauseTrigger | 中止觸發器 | ||
pauseJob | 中止任務 | ||
pauseJobs | 批量中止任務 | ||
resumeTrigger | 恢復觸發器 | ||
resumeJob | 恢復任務 | ||
resumeJobs | 批量恢復任務 | ||
pauseTriggers | 批量中止觸發器 | ||
resumeTriggers | 批量恢復觸發器 | ||
pauseAll | 中止全部 | ||
resumeAll | 恢復全部 |
---
二、STATE_TRIGGER
實現都在JobStoreSupport類
加鎖類型 | 加鎖方法 | 底層數據庫操做 | 備註 |
obtainLock | doCheckin | clusterCheckIn | 判斷集羣狀態 先用LOCK_STATE_ACCESS鎖集羣狀態 再用LOCK_TRIGGER_ACCESS恢復集羣運行 |
---
經過這段時間對quartz資料的整理和結合工做中的運用,深刻理解了quartz這一優秀的調度框架。在技術這條路上,作技術千萬不要淺嘗輒止,必定要深刻的去理解所用的東西,纔會使本身的能力提高,此外,一些系統的知識分析對本身和他人都是十分有益的。