org.quartz-scheduler 代碼分析

Scheduler 

  1. 經過調度器工廠SchedulerFactory的實例對象StdSchedulerFactory構建Scheduler ;

     
    1. 從指定的文件初始化配置信息(默認文件名"quartz.properties", 系統變量爲"org.quartz.properties")

      定義的默認執行QuartzSchedulerThread的線程池爲:
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
    2.  通常返回StdScheduler,但核心是QuartzScheduler!

      StdSchedulerFactory.instantiate() 很重要!

      初始化RAMJobStoreSimpleThreadPoolQuartzSchedulerJobFactorySchedulerPluginJobRunShellFactoryQuartzSchedulerResources 等對象,返回StdScheduler實例。java

  2. QuartzScheduler核心: 線程對象QuartzSchedulerThread。隨着Scheduler實例化而被建立並扔進線程池執行。
    該線程就是調度線程,主要任務就是不停的從JobStore中獲取即將被觸發的觸發器來執行。
    緩存

scheduleJob

  1. 構建JobDetail<JobDetailImpl>
    JobDetail result = JobBuilder.newJob(Class <? extends Job> jobClass).withIdentity(String name).build();
  2.  構建Trigger (有各類類型,由ScheduleBuilder來指定。 eg. SimpleScheduleBuilder -> SimpleTrigger<SimpleTriggerImpl>;  CronScheduleBuilder-> CronTrigger<CronTriggerImpl>)
    TriggerBuilder.newTrigger().withIdentity(String name).withSchedule(ScheduleBuilder schedBuilder).build()
  3. scheduler<QuartzScheduler>.scheduleJob(JobDetail jobDetail, Trigger trigger)
    1. Trigger的JobKey必定與JobDetail的Key得相同, 不然異常!
    2. 計算出trigger在scheduler中可以第一次執行的時間,若無效則異常!(eg. CronCalendar
    3. 在JobStore中註冊jobDetail和trigger;
    4. 喚醒QuartzSchedulerThread中的sigLock等待鎖;並將trigger下一次要實行的時間NextFireTime經過SchedulerSignalerImpl傳遞到QuartzSchedulerThread
  4. scheduler<QuartzScheduler>.start(): 設置QuartzSchedulerThread中的paused爲false,觸發任務的執行。

SimpleThreadPool

根據count生成WorkerThread並保存在availWorkers 中;
當使用線程池時,經過synchronized 來控制併發。從availWorkers 移出第一個WorkerThread使用並保存到busyWorkers中 ;若是沒有空閒線程則wait。併發

與JDK提供的線程池有很大的不一樣, 沒有緩存隊列、最大線程數拒絕策略等。 經過阻塞wait直到有空閒workers再執行。若是Shutdow後,還有任務被提交執行,則直接新實例化WorkerThreadui

public class SimpleThreadPool implements ThreadPool {

    private int count = -1; // 線程個數
    private List<WorkerThread> workers; //只是維護初始化的workerThread集合
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); //可用線程
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); //繁忙線程
       ......
    public void initialize() throws SchedulerConfigException {
       ......
        // create the worker threads and start them
        Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
        while(workerThreads.hasNext()) {
            WorkerThread wt = workerThreads.next();
            wt.start();
            availWorkers.add(wt);
        }
    }

 public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }

        synchronized (nextRunnableLock) {
            handoffPending = true;
            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }
            if (!isShutdown) {
                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the pool).
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
            nextRunnableLock.notifyAll();
            handoffPending = false;
        }

        return true;
    }

WorkerThread

控制單個Runnable對象的執行過程。
this

成員變量有個Object類型對象做爲lock,WorkerThread實例化後run()過程當中若是尚未注入runnable且執行標記run==false時循環調用:lock.wait(500);spa

當SimpleThreadPool.runThread(Runnable) 調用WorkerThread.run(Runnable)時,注入執行Runnable並 lock.notifyAll()
線程

QuartzSchedulerThread

quartz核心處理過程, 經過synchronized(sigLock ) 來控制線程併發。code

 run()對象

  1. 經過Object對象sigLock & paused & 原子整型 halted 來控制當先線程是否運行任務。
    1. sigLock: 方法signalSchedulingChange、togglePause、halt 都觸發notifyAll()。
    2. paused:  初始化爲true,默認阻塞線程執行。
  2. 執行會優先斷定線程池<SimpleThreadPool>中是否有空閒有效的WorkerThread,沒有則阻塞。blog

    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  3. 有空閒有效工做線程時從JobStore中獲取指定時間內<默認30s>要執行的的Trigger列表。

    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                    now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
  4. 若是triggers中的第一個trigger的NextFireTime距離當前時間大於2ms, 則等待直到<2ms。

  5. 從JobStore中獲取TriggerFiredResult列表(綁定了JobDetail和OperableTrigger<CronTrigger>間的關係)。

    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
  6. 依每個TriggerFiredResult初始化JobRunShellPropertySettingJobFactory經過TriggerFiredBundle獲取到JobDetail, 初始化Job實例對象 ,反射設置好屬性 最後封裝到執行上下文中JobExecutionContextImpl。

JobRunShell

最終執行實際任務的對象。

JobExecutionContextImpl中獲取必要的JobDetail、trigger、Job等信息,並執行:job.execute(jec)  ---->本身的業務邏輯

當執行結束時trigger須要肯定下一個狀態碼,在JobStore.triggeredJobComplete處依此來斷定trigger的生命週期

AbstractTrigger.executionComplete

RAMJobStore.triggeredJobComplete

相關文章
相關標籤/搜索