經過源碼分析Java開源任務調度框架Quartz的主要流程

經過源碼分析Java開源任務調度框架Quartz的主要流程java

從使用效果、調用鏈路跟蹤、E-R圖、循環調度邏輯幾個方面分析Quartz。mysql

github項目地址: https://github.com/tanliwei/spring-quartz-cluster-sample , 補充了SQL輸出git

 

系統說明:github

IDE: IntelliJspring

JDK:1.8sql

Quartz:2.2.1shell

 

使用效果數據庫

1.從github項目https://github.com/tanliwei/spring-quartz-cluster-sample中,拉取項目到本地,導入IDEA。tomcat

    相信讀者都有必定工做經驗,這些細節不贅述。服務器

2.本文采用Mysql數據庫。

    請執行 resources/scripts/tables_mysql_innodb.sql

3.修改jdbc.properties中數據庫配置

 

4.經過IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat

 

 

暴露的Restful 接口 /say-hello.do 以及添加好任務後的調用效果:


 

 

添加任務

在tomcat啓動成功後,在首頁點擊「添加任務」,添加以下任務:


 

 

代碼執行邏輯在SyncJobFactory類中,從Output中能夠看到執行的輸出信息,

調用鏈跟蹤的最後會回到這個類來。

 

如今開始跟蹤調用鏈路。 

 

IDEA 快捷鍵:
進入方法:  Ctrl + 鼠標左鍵
光標前進/後退: Ctrl + Shirt + 右方向鍵/左方向鍵
 
 
1、 調用鏈路跟蹤

從配置文件applicationContext.xml配置中找到任務調度核心類SchedulerFactoryBean

 resources/applicationContext.xml

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
...
</bean>

 

使用IDEA快捷鍵,點擊進入SchedulerFactoryBean類,它實現了InitializingBean接口,

在Spring中凡是實現了InitializingBean接口的Bean,都會在Bean屬性都設置完成後調用afterPropertiesSet()方法.

 SchedulerFactoryBean.java

//---------------------------------------------------------------------
// Implementation of InitializingBean interface
// 實現 InitializingBean 接口
//---------------------------------------------------------------------
public void afterPropertiesSet() throws Exception {
    //...
    // Create SchedulerFactory instance.
    // 建立 SchedulerFactory 調度器工廠實例
    SchedulerFactory schedulerFactory = (SchedulerFactory)
            BeanUtils.instantiateClass(this.schedulerFactoryClass);
    initSchedulerFactory(schedulerFactory);
    //...
    // Get Scheduler instance from SchedulerFactory.
    // 經過調度器工廠 獲取 調度器實例
    try {
        this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
    //...
}

 

 SchedulerFactoryBean.java

/**
 * Create the Scheduler instance for the given factory and scheduler name.
 * 經過制定工廠和調度器名稱建立調度器實例
 * Called by {@link #afterPropertiesSet}.
 * <p>The default implementation invokes SchedulerFactory's <code>getScheduler</code>
 * method. Can be overridden for custom Scheduler creation.
 */
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
        throws SchedulerException {
    //...
    try {
        SchedulerRepository repository = SchedulerRepository.getInstance();
        synchronized (repository) {
            Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
            Scheduler newScheduler = schedulerFactory.getScheduler();
            if (newScheduler == existingScheduler) {
                throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                        "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
            }
            //...
}

 

 這個項目走的邏輯是 StdSchedulerFactory.getScheduler()方法,可自行debug。

 StdSchedulerFactory.java

/**
 * Returns a handle to the Scheduler produced by this factory.
 * 返回該工廠創造的調度器的句柄
 */
public Scheduler getScheduler() throws SchedulerException {
    if (cfg == null) {
        initialize();
    }

    SchedulerRepository schedRep = SchedulerRepository.getInstance();

    Scheduler sched = schedRep.lookup(getSchedulerName());
    //...
    sched = instantiate();
    return sched;
}

 

StdSchedulerFactory.java

private Scheduler instantiate() throws SchedulerException {
    //...
    //大量的配置初始化、實例化代碼
    //...
    //第1298行代碼
    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
    //...
}

 

QuartzScheduler.java

/**
 * Create a <code>QuartzScheduler</code> with the given configuration
 * 根據給定的配置 建立Quartz調度器
 */
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }
        //private QuartzSchedulerThread schedThread;
        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        //經過線程池執行 Quartz調度器線程
        schedThreadExecutor.execute(this.schedThread);
        //...
}

 

 QuartzSchedulerThread.java

/**
 * <p>
 * The main processing loop of the <code>QuartzSchedulerThread</code>.
 * Quartz調度器線程的主循環邏輯
 * </p>
 */
@Override
public void run() {
    //while循環執行,只要調度器爲被暫停
    while(!halted.get()){
                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        }
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false){}

    }
}

 

 JobRunShell.java

public void run() {
        //...
        Job job = jec.getJobInstance();
        //...
        try {
            log.debug("Calling execute on job " + jobDetail.getKey());
            //執行
            job.execute(jec);
            endTime = System.currentTimeMillis();
        }
        //...
        //更新Trigger觸發器狀態,刪除FIRED_TRIGGERS觸發記錄
        instCode = trigger.executionComplete(jec, jobExEx);
        //...
}

 

QuartzJobBean.java

/**
 * This implementation applies the passed-in job data map as bean property
 * values, and delegates to <code>executeInternal</code> afterwards.
 * 這個實現 把傳入的map數據做爲bean屬性值,而後委託給 executeInternal 方法
 */
public final void execute(JobExecutionContext context) throws JobExecutionException {
    try {
    //執行
 executeInternal(context);
}

 

  SyncJobFactory.java

//回到了咱們的業務類SyncJobFactory的executeInternal方法,
//裏面執行咱們的業務代碼
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
    try {
        LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort());
    }
    //...
    System.out.println("jobName:" + scheduleJob.getJobName() + "  " + scheduleJob);
    //...
}

 

 

 2、E-R圖

梳理6張主要的Quartz表:

 

 
QRTZ_TRIGGERS 觸發器表

    SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。 聯合主鍵,QRTZ_JOB_DETAILS表SCHED_NAME外鍵

    JOB_NAME,任務名。自定義值。 聯合主鍵,QRTZ_JOB_DETAILS表JOB_NAME外鍵

    JOB_GROUP,任務組。 自定義值。聯合主鍵,QRTZ_JOB_DETAILS表JOB_GROUP外鍵

    TRIGGER_STATE,觸發器狀態: WAITING , ACQUIRED, BLOCKING

    NEXT_FIRE_TIME, 下次觸發時間:

    MISFIRE_INSTR,執行失敗後的指令,

        非失敗策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1; 

        失敗策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;

    TRIGGER_TYPE, 觸發器類型,例如CRON,cron表達式類型的觸發器

    PRIORITY,優先級

 

QRTZ_CRON_TRIGGERS cron類型觸發器表

    SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。 聯合主鍵,QRTZ_TRIGGERS表SCHED_NAME外鍵

    JOB_NAME,任務名。自定義值。 聯合主鍵,QRTZ_TRIGGERS表JOB_NAME外鍵

    JOB_GROUP,任務組。 自定義值。聯合主鍵,QRTZ_TRIGGERS表JOB_GROUP外鍵

    CRON_EXPRESSION, cron表達式, 例如每30秒執行一次, 0/30 * * * * ?

 

QRTZ_JOB_DETAILS 任務詳細表

    SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵

    JOB_NAME,任務名。自定義值。 聯合主鍵

    JOB_GROUP,任務組。 自定義值。聯合主鍵

    JOB_DATA,blob類型,任務參數

 

QRTZ_FIRED_TRIGGERS 任務觸發表

    SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵

    ENTRY_ID,entry id,聯合主鍵

    JOB_NAME,任務名。自定義值。 

    JOB_GROUP,任務組。 自定義值。

    FIRED_TIME, 任務觸發時間

    STATE,狀態

    INSTANCE_NAME, 服務器實例名

    PRIORITY,優先級

 

QRTZ_SCHEDULER_STATE 

    SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵

    INSTANCE_NAME,服務器實例名。聯合主鍵

    LAST_CHECKIN_TIME,上次檢查時間

    CHECKIN_INTERVAL,檢查間隔

 

QRTZ_LOCKS 全局鎖

    SCHED_NAME,調度器名稱,集羣時爲常量值:「ClusterScheduler」。聯合主鍵

    LOCK_NAME,鎖名稱,例如,TRIGGER_ACCESS。聯合主鍵

   

 

3、循環調度邏輯

    主要流程以下:

 

    源碼以下:

QuartzSchedulerThread.java

 public void run() {
        //...
        while (!halted.get()) {
            try {
                //合理休眠
                //...
                        //獲取接下來的觸發器
                        //1.狀態爲WAITING
                        //2.觸發時間在30秒內
                        //3.不是錯過執行的或者錯過了可是時間不超過兩分鐘
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                       
                                //... 
                                //觸發任務
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                //...
                            JobRunShell shell = null;
                            //...
                            //執行代碼
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            //...
        } // while (!halted)
        //..
    } 

 

 JobRunShell.java

    protected QuartzScheduler qs = null;
    
    public void run() {
        qs.addInternalSchedulerListener(this);
        try {
            //...
            do {
                Job job = jec.getJobInstance();
                // execute the job
                try {
                    //執行任務代碼
                    job.execute(jec);
                //更新觸發器,刪除觸發記錄
                qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                break;
            } while (true);
        } 
    //...
    }

 

 

4、擴展

 

除了對主線程 QuartzSchedulerThread 的分析

繼續分析JobStoreSupport類的兩個線程 ClusterManager 和 MisfireHandler 的分析, 它們維護觸發器的MISFIRE_INSTR狀態,和調度器狀態QRTZ_SCHEDULER_STATE。

相關文章
相關標籤/搜索