精通Quartz-源碼分析-建立任務

Quartz中核心的元素有scheduler, trigger和job, 其中 trigger 和 job 是任務調度的元數據, scheduler 是實際執行調度的控制器; mysql

本文將首先介紹Job,Trigger相關類及持久化的數據庫表, 再經過源碼分析建立任務時job和trigger的持久化相關內容。sql

1.Job介紹

Job:是一個接口,只有一個方法void execute(JobExecutionContext context),開發者實現該接口定義運行任務,JobExecutionContext類提供了調度上下文的各類信息。Job運行時的信息保存在JobDataMap實例中。
數據庫

JobDetail:用來描述Job實現類及其它相關的靜態信息,如Job名字、是否禁止併發(IS_NONCONCURRENT)、是否持久化(IS_DURABLE),是否失敗轉移(REQUESTS_RECOVERY)等信息。bash

Job持久化只涉及到一張表:qrtz_job_details 比較簡單併發

2.Trigger介紹

4種類型Trigger

CronTrigger,經過Cron表達式,結合Calendar能夠支持各類複雜的調度策略;源碼分析

SimpleTrigger,指定從某一個時間開始,以必定的時間間隔(單位是毫秒)執行必定次數任務,能夠方便的支持到秒、分、時。ui

CalendarIntervalTrigger,指定從某一個時間開始,以必定的時間間隔執行必定次數任務,支持的間隔單位更全,有秒,分鐘,小時,天,月,年,星期this

DailyTimeIntervalTrigger,指定天天的某個時間段內,以必定的時間間隔執行任務。而且它能夠支持指定星期。它適合的任務相似於:指定天天9:00 至 18:00 ,每隔30秒執行一次spa

6張Trigger相關表

先介紹5張表,4張表是用於存儲trigger具體實例信息,1張表是存儲trigger的通用信息,(假設表名前綴是qrtz_) 設計

1. qrtz_cron_triggers,存儲CronTrigger基本信息

2. qrtz_simple_triggers,存儲SimpleTrigger基本信息

3. qrtz_simprop_triggers,存儲DailyTimeIntervalTrigger、CalendarIntervalTrigger基本信息

4. qrtz_blob_triggers, 除了上面介紹的四種Trigger,自定義Trigger 默認會使用此表來存儲自定義Trigger的信息

5. qrtz_triggers,存儲通用trigger信息;包括上次觸發時間、下次觸發時間,狀態等; 全部類型的Trigger在持久化時都會保存一份通用數據到此表;獲取任務、執行任務等都是經過此表來操做的


再介紹另1張表名字中也包含trigger的表,用途與上述5張表徹底不同

6. fired_triggers, 存儲全部類型Trigger即將觸發或正在觸發時所產生的運行時信息

3.Trigger持久化源碼分析

Quartz簡單實例

先看一個簡單的Quartz實例, 例子中定義了相應的Job類,包裝Job的JobDetail, 調度策略的Trigger, 例子中使用的是CronTrigger, 而後由scheduler進行調度

public static void main(String[] args) throws Throwable {
    SchedulerFactory factory = new StdSchedulerFactory();
    // 從工廠裏面拿到一個scheduler實例
    Scheduler scheduler = factory.getScheduler();
    // 真正執行的任務並非Job接口的實例,而是用反射的方式實例化的一個JobDetail實例
    JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job1", "group1").build();
    // 定義一個觸發器,startAt方法定義了任務應當開始的時間
    CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?").withMisfireHandlingInstructionDoNothing();
    Trigger trigger = newTrigger().withIdentity("trigger1", "group1").withSchedule(cronScheduleBuilder).build();
    // 將任務和Trigger放入scheduler
    scheduler.scheduleJob(job, trigger);
    scheduler.start();
}複製代碼

本文要講的內容是Quartz是持久化配置時的Trigger是如何持久化的,即項目的 resources/quartz.properties 有相似以下定義:

org.quartz.threadPool.threadCount: 10 
org.quartz.threadPool.threadPriority: 5  
# for cluster
org.quartz.jobStore.tablePrefix: qrtz_
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.dataSource:qzDS
   
#============================================================================ 
# Configure Datasources 
#============================================================================ 
#JDBC驅動 
org.quartz.dataSource.qzDS.driver:com.mysql.jdbc.Driver  
org.quartz.dataSource.qzDS.URL:jdbc:mysql://localhost:3306/quartz  
org.quartz.dataSource.qzDS.user:root  
org.quartz.dataSource.qzDS.password:
org.quartz.dataSource.qzDS.maxConnection:10 

複製代碼

QuartzScheduler#scheduleJob

調用Scheduler#scheduleJob()時,Quartz都會將JobDetail和Trigger的信息保存到數據庫中,咱們來一步一步分析一下trigger的持久化部分的源碼實現

scheduler#scheduleJob(job, trigger); Quartz的scheduler最終都委託給了QuartzScheduler, 代碼以下

// 先看 Scheduler 中的代碼
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
    throws SchedulerException {
    //  Quartz的scheduler最終都委託給了QuartzScheduler
    return sched.scheduleJob(jobDetail, trigger);
}

// 再看 QuartzScheduler 中的代碼
public Date scheduleJob(JobDetail jobDetail,
        Trigger trigger) throws SchedulerException {
    validateState();

    // 此處省略了一部分代碼。。。。

    // name, group, jobName, jobGroup的非空檢查 等等
    trig.validate();

    Calendar cal = null;
    if (trigger.getCalendarName() != null) {
        cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
    }
    // 計算出trig的下一次觸發時間nextFireTime, 由於是新增的因此也能夠說是第一次的觸發時間
    Date ft = trig.computeFirstFireTime(cal);

    if (ft == null) {
        throw new SchedulerException(
                "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
    }

    // 開始持久化,即調用JobStoreSupport#storeJobAndTrigger, 下個段落將展開分析
    // 抽象類JobStoreSupport實現了Quartz任務持久化接口JobStore,實現了持久化的主要邏輯
    // JobStoreSupport的實現子類主要有JobStoreTX, JobStoreCMT
    // 這些子類基本上都是對得到Connection進行了封裝,而沒有去覆蓋實現持久化任務的邏輯
    resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
   
    // 發送事件通知,能夠自定義SchedulerListener來接收處理jobAdded
    notifySchedulerListenersJobAdded(jobDetail);

    // 向QuartzSchedulerThread發送通知,更新signaled爲true,通知 QuartzSchedulerThread從新獲取待執行的任務列表
    notifySchedulerThread(trigger.getNextFireTime().getTime());

    // 發送事件通知,能夠自定義SchedulerListener來接收處理jobScheduled
    notifySchedulerListenersSchduled(trigger);

    return ft;
}複製代碼

JobStoreSupport#storeJobAndTrigger

// 本方法開始對Job和Trigger進行持久化
public void storeJobAndTrigger(final JobDetail newJob,
        final OperableTrigger newTrigger) 
    throws JobPersistenceException {
    // 加鎖處理 (默認配置)
    executeInLock(
        (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
        new VoidTransactionCallback() {
            public void executeVoid(Connection conn) throws JobPersistenceException {
                // 保存Job, 比較簡單,本文不具體展開了
                storeJob(conn, newJob, false);
                // 保存Trigger, 調用加鎖後事務回調的具體方法
                storeTrigger(conn, newTrigger, newJob, false,
                        Constants.STATE_WAITING, false, false);
            }
        });
}
// 被回調的具體幹活的方法
protected void storeTrigger(Connection conn,
        OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
        boolean forceState, boolean recovering)
    throws JobPersistenceException {
    // storeTrigger方法會被多個地方調用,但在保存Trigger的這個場景下existingTrigger確定是false
    boolean existingTrigger = triggerExists(conn, newTrigger.getKey());

    if ((existingTrigger) && (!replaceExisting)) { 
        throw new ObjectAlreadyExistsException(newTrigger); 
    }
    
    try {
        boolean shouldBepaused;
        
        // 此處省略了一部分代碼。。。。

        if (job.isConcurrentExectionDisallowed() && !recovering) { 
            state = checkBlockedState(conn, job.getKey(), state);
        }
        
        if (existingTrigger) {
            getDelegate().updateTrigger(conn, newTrigger, state, job);
        } else {
             // getDelegate()得到所委託的具體實現類即StdJDBCDelegate,見下個段落的分析            
            getDelegate().insertTrigger(conn, newTrigger, state, job);
        }
    } catch (Exception e) {
        throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" 
                + newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
    }
}
複製代碼
// JobStoreSupport#getDelegate 得到任務持久化的委託執行類, 默認返回 StdJDBCDelegate
protected DriverDelegate getDelegate() throws NoSuchDelegateException {
    synchronized(this) {
        // 第一次訪問getDelegate方法時delegate確定爲null
        if(null == delegate) {
            try {
                if(delegateClassName != null) {
                    // quartz.properties文件指定了delegateClassName則加載相應的類,默認爲null
                    delegateClass = getClassLoadHelper().loadClass(delegateClassName, DriverDelegate.class);
                }
                // JobStoreSupport定義了默認的delegateClass                
                // protected Class<? extends DriverDelegate> delegateClass = StdJDBCDelegate.class;
                // 加載DriverDelegate具體實例
                delegate = delegateClass.newInstance();
                // StdJDBCDelegate 初始化
                delegate.initialize(getLog(), tablePrefix, instanceName, instanceId, getClassLoadHelper(), canUseProperties(), getDriverDelegateInitString());
                
            } catch (InstantiationException e) {
               // 各類catch, 代碼省略...
            }
        }
        return delegate;複製代碼

StdJDBCDelegate初始化及TriggerPersistenceDelegate列表的初始化

//  StdJDBCDelegate#initialize
public void initialize(Logger logger, String tablePrefix, String schedName, String instanceId, ClassLoadHelper classLoadHelper, boolean useProperties, String initString) throws NoSuchDelegateException {

    this.logger = logger;
    this.tablePrefix = tablePrefix;
    this.schedName = schedName;
    this.instanceId = instanceId;
    this.useProperties = useProperties;
    this.classLoadHelper = classLoadHelper;
    // 添加默認的持久化委託類, 此方法很重要,見下文分析
    addDefaultTriggerPersistenceDelegates();

    if(initString == null)
        return;
    // 默認此時已返回, 若是有自定義的持久化委託,則繼續解析initString, 添加到具體的委託類列表中
    String[] settings = initString.split("\\|");
    
    for(String setting: settings) {
        String[] parts = setting.split("=");
        String name = parts[0];
        if(parts.length == 1 || parts[1] == null || parts[1].equals(""))
            continue;

        if(name.equals("triggerPersistenceDelegateClasses")) {
            
            String[] trigDelegates = parts[1].split(",");
            
            for(String trigDelClassName: trigDelegates) {
                try {
                    Class<?> trigDelClass = classLoadHelper.loadClass(trigDelClassName);
                    addTriggerPersistenceDelegate((TriggerPersistenceDelegate) trigDelClass.newInstance());
                } catch (Exception e) {
                    throw new NoSuchDelegateException("Error instantiating TriggerPersistenceDelegate of type: " + trigDelClassName, e);
                } 
            }
        }
        else
            throw new NoSuchDelegateException("Unknown setting: '" + name + "'");
    }
}

// 添加默認的4種Trigger的持久化委託類,全部的委託類都要實現接口方法 canHandleTriggerType
// canHandleTriggerType 方法是肯定Trigger由哪個委託來執行的關鍵
protected void addDefaultTriggerPersistenceDelegates() {
    addTriggerPersistenceDelegate(new SimpleTriggerPersistenceDelegate());
    addTriggerPersistenceDelegate(new CronTriggerPersistenceDelegate());
    addTriggerPersistenceDelegate(new CalendarIntervalTriggerPersistenceDelegate());
    addTriggerPersistenceDelegate(new DailyTimeIntervalTriggerPersistenceDelegate());
}




複製代碼

StdJDBCDelegate#insertTrigger

public int insertTrigger(Connection conn, OperableTrigger trigger, String state,
        JobDetail jobDetail) throws SQLException, IOException {

    ByteArrayOutputStream baos = null;
    if(trigger.getJobDataMap().size() > 0) {
        baos = serializeJobData(trigger.getJobDataMap());
    }
    
    PreparedStatement ps = null;

    int insertResult = 0;

    try {
        // 建立statement保存qrtz_TRIGGERS
        ps = conn.prepareStatement(rtp(INSERT_TRIGGER));
        ps.setString(1, trigger.getKey().getName());
        ps.setString(2, trigger.getKey().getGroup());
        ps.setString(3, trigger.getJobKey().getName());
        ps.setString(4, trigger.getJobKey().getGroup());
        ps.setString(5, trigger.getDescription());
        if(trigger.getNextFireTime() != null)
            ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger
                    .getNextFireTime().getTime())));
        else
            ps.setBigDecimal(6, null);
        long prevFireTime = -1;
        if (trigger.getPreviousFireTime() != null) {
            prevFireTime = trigger.getPreviousFireTime().getTime();
        }
        ps.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));
        ps.setString(8, state);
        // 這一句是重點查找當前Trigger的持久化委託類, 委託類中的委託,設計得有點複雜, 在下面段落會有分析
        TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
        
        String type = TTYPE_BLOB;
        if(tDel != null)
            type = tDel.getHandledTriggerTypeDiscriminator();
        ps.setString(9, type);
        
        // 此處省略了一部分代碼。。。。
        
         // 執行保存操做,qrtz_TRIGGERS
        insertResult = ps.executeUpdate();
        
        if(tDel == null)
            // 不是那4種默認Trigger類則執行
            insertBlobTrigger(conn, trigger);
        else
            // 4種Trigger都有各自的實現類
            // CronTriggerPersistenceDelegate 操做的表是qrtz_CRON_TRIGGERS
            // SimpleTriggerPersistenceDelegate 操做的表是qrtz_SIMPLE_TRIGGERS
            // DailyTimeIntervalTriggerPersistenceDelegate和CalendarIntervalTriggerPersistenceDelegate
            // 最後兩個實現類都是繼承了SimplePropertiesTriggerPersistenceDelegateSupport
            // 最後兩個實現類操做的表都是qrtz_SIMPROP_TRIGGERS
            tDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);
        
    } finally {
        closeStatement(ps);
    }

    return insertResult;
}
複製代碼


擴展:

Quartz的頂層設計各功能模塊均可以擴展自定義的,拿本文來講能夠擴展實現本身的Trigger, 也能夠擴展Trigger的存儲方式;

能夠自定義一個Trigger,繼承上面的4種Trigger, 也能夠自定義該Trigger相應的存儲TriggerPersistenceDelegate;

自定義的Trigger存儲器怎麼才能生效呢?

固然是修改quartz.properties文件,增長一行配置,例:

org.quartz.jobStore.driverDelegateInitString: triggerPersistenceDelegateClasses=com.shengu.quartz.trigger.MyPersistenceDelegateClasses複製代碼

也能夠支持定義多個Trigger存儲器,經過上面的源碼能夠看出來,多個的定義是經過 | 來分隔的;

quartz的各類擴展通常都是經過在quartz.properties中增長配置來實現的


小結:

建立任務時,Job操做一張表 qrtz_job_detail;Trigger會操做兩張表

1.保存Trigger通用信息到qrtz_triggers

2.保存Trigger具體實例信息到下面對應的一個表中

2.1 系統默認的4種Trigger分別對應 qrtz_cron_triggers,qrtz_simple_triggers, qrtz_simprop_triggers,qrtz_simprop_triggers

2.2 自定義Trigger默認對應保存到qrtz_blob_triggers 


Trigger存儲是支持擴展的,能夠自定義Trigger存儲在哪張表中

相關文章
相關標籤/搜索