Quartz中核心的元素有scheduler, trigger和job, 其中 trigger 和 job 是任務調度的元數據, scheduler 是實際執行調度的控制器; mysql
本文將首先介紹Job,Trigger相關類及持久化的數據庫表, 再經過源碼分析建立任務時job和trigger的持久化相關內容。sql
Job:是一個接口,只有一個方法void execute(JobExecutionContext context),開發者實現該接口定義運行任務,JobExecutionContext類提供了調度上下文的各類信息。Job運行時的信息保存在JobDataMap實例中。
數據庫
JobDetail:用來描述Job實現類及其它相關的靜態信息,如Job名字、是否禁止併發(IS_NONCONCURRENT)、是否持久化(IS_DURABLE),是否失敗轉移(REQUESTS_RECOVERY)等信息。bash
Job持久化只涉及到一張表:qrtz_job_details 比較簡單併發
CronTrigger,經過Cron表達式,結合Calendar能夠支持各類複雜的調度策略;源碼分析
SimpleTrigger,指定從某一個時間開始,以必定的時間間隔(單位是毫秒)執行必定次數任務,能夠方便的支持到秒、分、時。ui
CalendarIntervalTrigger,指定從某一個時間開始,以必定的時間間隔執行必定次數任務,支持的間隔單位更全,有秒,分鐘,小時,天,月,年,星期this
DailyTimeIntervalTrigger,指定天天的某個時間段內,以必定的時間間隔執行任務。而且它能夠支持指定星期。它適合的任務相似於:指定天天9:00 至 18:00 ,每隔30秒執行一次spa
先介紹5張表,4張表是用於存儲trigger具體實例信息,1張表是存儲trigger的通用信息,(假設表名前綴是qrtz_) 設計
1. qrtz_cron_triggers,存儲CronTrigger基本信息
2. qrtz_simple_triggers,存儲SimpleTrigger基本信息
3. qrtz_simprop_triggers,存儲DailyTimeIntervalTrigger、CalendarIntervalTrigger基本信息
4. qrtz_blob_triggers, 除了上面介紹的四種Trigger,自定義Trigger 默認會使用此表來存儲自定義Trigger的信息
5. qrtz_triggers,存儲通用trigger信息;包括上次觸發時間、下次觸發時間,狀態等; 全部類型的Trigger在持久化時都會保存一份通用數據到此表;獲取任務、執行任務等都是經過此表來操做的
再介紹另1張表名字中也包含trigger的表,用途與上述5張表徹底不同
6. fired_triggers, 存儲全部類型Trigger即將觸發或正在觸發時所產生的運行時信息
先看一個簡單的Quartz實例, 例子中定義了相應的Job類,包裝Job的JobDetail, 調度策略的Trigger, 例子中使用的是CronTrigger, 而後由scheduler進行調度
public static void main(String[] args) throws Throwable {
SchedulerFactory factory = new StdSchedulerFactory();
// 從工廠裏面拿到一個scheduler實例
Scheduler scheduler = factory.getScheduler();
// 真正執行的任務並非Job接口的實例,而是用反射的方式實例化的一個JobDetail實例
JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job1", "group1").build();
// 定義一個觸發器,startAt方法定義了任務應當開始的時間
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?").withMisfireHandlingInstructionDoNothing();
Trigger trigger = newTrigger().withIdentity("trigger1", "group1").withSchedule(cronScheduleBuilder).build();
// 將任務和Trigger放入scheduler
scheduler.scheduleJob(job, trigger);
scheduler.start();
}複製代碼
本文要講的內容是Quartz是持久化配置時的Trigger是如何持久化的,即項目的 resources/quartz.properties 有相似以下定義:
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
# for cluster
org.quartz.jobStore.tablePrefix: qrtz_
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.dataSource:qzDS
#============================================================================
# Configure Datasources
#============================================================================
#JDBC驅動
org.quartz.dataSource.qzDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL:jdbc:mysql://localhost:3306/quartz
org.quartz.dataSource.qzDS.user:root
org.quartz.dataSource.qzDS.password:
org.quartz.dataSource.qzDS.maxConnection:10
複製代碼
調用Scheduler#scheduleJob()時,Quartz都會將JobDetail和Trigger的信息保存到數據庫中,咱們來一步一步分析一下trigger的持久化部分的源碼實現
scheduler#scheduleJob(job, trigger); Quartz的scheduler最終都委託給了QuartzScheduler, 代碼以下
// 先看 Scheduler 中的代碼
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException {
// Quartz的scheduler最終都委託給了QuartzScheduler
return sched.scheduleJob(jobDetail, trigger);
}
// 再看 QuartzScheduler 中的代碼
public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
validateState();
// 此處省略了一部分代碼。。。。
// name, group, jobName, jobGroup的非空檢查 等等
trig.validate();
Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
// 計算出trig的下一次觸發時間nextFireTime, 由於是新增的因此也能夠說是第一次的觸發時間
Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
}
// 開始持久化,即調用JobStoreSupport#storeJobAndTrigger, 下個段落將展開分析
// 抽象類JobStoreSupport實現了Quartz任務持久化接口JobStore,實現了持久化的主要邏輯
// JobStoreSupport的實現子類主要有JobStoreTX, JobStoreCMT
// 這些子類基本上都是對得到Connection進行了封裝,而沒有去覆蓋實現持久化任務的邏輯
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 發送事件通知,能夠自定義SchedulerListener來接收處理jobAdded
notifySchedulerListenersJobAdded(jobDetail);
// 向QuartzSchedulerThread發送通知,更新signaled爲true,通知 QuartzSchedulerThread從新獲取待執行的任務列表
notifySchedulerThread(trigger.getNextFireTime().getTime());
// 發送事件通知,能夠自定義SchedulerListener來接收處理jobScheduled
notifySchedulerListenersSchduled(trigger);
return ft;
}複製代碼
// 本方法開始對Job和Trigger進行持久化
public void storeJobAndTrigger(final JobDetail newJob,
final OperableTrigger newTrigger)
throws JobPersistenceException {
// 加鎖處理 (默認配置)
executeInLock(
(isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
// 保存Job, 比較簡單,本文不具體展開了
storeJob(conn, newJob, false);
// 保存Trigger, 調用加鎖後事務回調的具體方法
storeTrigger(conn, newTrigger, newJob, false,
Constants.STATE_WAITING, false, false);
}
});
}
// 被回調的具體幹活的方法
protected void storeTrigger(Connection conn,
OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
boolean forceState, boolean recovering)
throws JobPersistenceException {
// storeTrigger方法會被多個地方調用,但在保存Trigger的這個場景下existingTrigger確定是false
boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
if ((existingTrigger) && (!replaceExisting)) {
throw new ObjectAlreadyExistsException(newTrigger);
}
try {
boolean shouldBepaused;
// 此處省略了一部分代碼。。。。
if (job.isConcurrentExectionDisallowed() && !recovering) {
state = checkBlockedState(conn, job.getKey(), state);
}
if (existingTrigger) {
getDelegate().updateTrigger(conn, newTrigger, state, job);
} else {
// getDelegate()得到所委託的具體實現類即StdJDBCDelegate,見下個段落的分析
getDelegate().insertTrigger(conn, newTrigger, state, job);
}
} catch (Exception e) {
throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '"
+ newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
}
}
複製代碼
// JobStoreSupport#getDelegate 得到任務持久化的委託執行類, 默認返回 StdJDBCDelegate
protected DriverDelegate getDelegate() throws NoSuchDelegateException {
synchronized(this) {
// 第一次訪問getDelegate方法時delegate確定爲null
if(null == delegate) {
try {
if(delegateClassName != null) {
// quartz.properties文件指定了delegateClassName則加載相應的類,默認爲null
delegateClass = getClassLoadHelper().loadClass(delegateClassName, DriverDelegate.class);
}
// JobStoreSupport定義了默認的delegateClass
// protected Class<? extends DriverDelegate> delegateClass = StdJDBCDelegate.class;
// 加載DriverDelegate具體實例
delegate = delegateClass.newInstance();
// StdJDBCDelegate 初始化
delegate.initialize(getLog(), tablePrefix, instanceName, instanceId, getClassLoadHelper(), canUseProperties(), getDriverDelegateInitString());
} catch (InstantiationException e) {
// 各類catch, 代碼省略...
}
}
return delegate;複製代碼
// StdJDBCDelegate#initialize
public void initialize(Logger logger, String tablePrefix, String schedName, String instanceId, ClassLoadHelper classLoadHelper, boolean useProperties, String initString) throws NoSuchDelegateException {
this.logger = logger;
this.tablePrefix = tablePrefix;
this.schedName = schedName;
this.instanceId = instanceId;
this.useProperties = useProperties;
this.classLoadHelper = classLoadHelper;
// 添加默認的持久化委託類, 此方法很重要,見下文分析
addDefaultTriggerPersistenceDelegates();
if(initString == null)
return;
// 默認此時已返回, 若是有自定義的持久化委託,則繼續解析initString, 添加到具體的委託類列表中
String[] settings = initString.split("\\|");
for(String setting: settings) {
String[] parts = setting.split("=");
String name = parts[0];
if(parts.length == 1 || parts[1] == null || parts[1].equals(""))
continue;
if(name.equals("triggerPersistenceDelegateClasses")) {
String[] trigDelegates = parts[1].split(",");
for(String trigDelClassName: trigDelegates) {
try {
Class<?> trigDelClass = classLoadHelper.loadClass(trigDelClassName);
addTriggerPersistenceDelegate((TriggerPersistenceDelegate) trigDelClass.newInstance());
} catch (Exception e) {
throw new NoSuchDelegateException("Error instantiating TriggerPersistenceDelegate of type: " + trigDelClassName, e);
}
}
}
else
throw new NoSuchDelegateException("Unknown setting: '" + name + "'");
}
}
// 添加默認的4種Trigger的持久化委託類,全部的委託類都要實現接口方法 canHandleTriggerType
// canHandleTriggerType 方法是肯定Trigger由哪個委託來執行的關鍵
protected void addDefaultTriggerPersistenceDelegates() {
addTriggerPersistenceDelegate(new SimpleTriggerPersistenceDelegate());
addTriggerPersistenceDelegate(new CronTriggerPersistenceDelegate());
addTriggerPersistenceDelegate(new CalendarIntervalTriggerPersistenceDelegate());
addTriggerPersistenceDelegate(new DailyTimeIntervalTriggerPersistenceDelegate());
}
複製代碼
public int insertTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
ByteArrayOutputStream baos = null;
if(trigger.getJobDataMap().size() > 0) {
baos = serializeJobData(trigger.getJobDataMap());
}
PreparedStatement ps = null;
int insertResult = 0;
try {
// 建立statement保存qrtz_TRIGGERS
ps = conn.prepareStatement(rtp(INSERT_TRIGGER));
ps.setString(1, trigger.getKey().getName());
ps.setString(2, trigger.getKey().getGroup());
ps.setString(3, trigger.getJobKey().getName());
ps.setString(4, trigger.getJobKey().getGroup());
ps.setString(5, trigger.getDescription());
if(trigger.getNextFireTime() != null)
ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger
.getNextFireTime().getTime())));
else
ps.setBigDecimal(6, null);
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(8, state);
// 這一句是重點查找當前Trigger的持久化委託類, 委託類中的委託,設計得有點複雜, 在下面段落會有分析
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(9, type);
// 此處省略了一部分代碼。。。。
// 執行保存操做,qrtz_TRIGGERS
insertResult = ps.executeUpdate();
if(tDel == null)
// 不是那4種默認Trigger類則執行
insertBlobTrigger(conn, trigger);
else
// 4種Trigger都有各自的實現類
// CronTriggerPersistenceDelegate 操做的表是qrtz_CRON_TRIGGERS
// SimpleTriggerPersistenceDelegate 操做的表是qrtz_SIMPLE_TRIGGERS
// DailyTimeIntervalTriggerPersistenceDelegate和CalendarIntervalTriggerPersistenceDelegate
// 最後兩個實現類都是繼承了SimplePropertiesTriggerPersistenceDelegateSupport
// 最後兩個實現類操做的表都是qrtz_SIMPROP_TRIGGERS
tDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);
} finally {
closeStatement(ps);
}
return insertResult;
}
複製代碼
擴展:
Quartz的頂層設計各功能模塊均可以擴展自定義的,拿本文來講能夠擴展實現本身的Trigger, 也能夠擴展Trigger的存儲方式;
能夠自定義一個Trigger,繼承上面的4種Trigger, 也能夠自定義該Trigger相應的存儲TriggerPersistenceDelegate;
自定義的Trigger存儲器怎麼才能生效呢?
固然是修改quartz.properties文件,增長一行配置,例:
org.quartz.jobStore.driverDelegateInitString: triggerPersistenceDelegateClasses=com.shengu.quartz.trigger.MyPersistenceDelegateClasses複製代碼
也能夠支持定義多個Trigger存儲器,經過上面的源碼能夠看出來,多個的定義是經過 | 來分隔的;
quartz的各類擴展通常都是經過在quartz.properties中增長配置來實現的
小結:
建立任務時,Job操做一張表 qrtz_job_detail;Trigger會操做兩張表
1.保存Trigger通用信息到qrtz_triggers
2.保存Trigger具體實例信息到下面對應的一個表中
2.1 系統默認的4種Trigger分別對應 qrtz_cron_triggers,qrtz_simple_triggers, qrtz_simprop_triggers,qrtz_simprop_triggers
2.2 自定義Trigger默認對應保存到qrtz_blob_triggers
Trigger存儲是支持擴展的,能夠自定義Trigger存儲在哪張表中