quartz集羣調度機制調研及源碼分析---轉載

quartz2.2.1集羣調度機制調研及源碼分析
引言
quartz集羣架構
調度器實例化
調度過程
觸發器的獲取
觸發trigger:
Job執行過程:
總結:
附:java

 

引言

quratz是目前最爲成熟,使用最普遍的java任務調度框架,功能強大配置靈活.在企業應用中佔重要地位.quratz在集羣環境中的使用方式是每一個企業級系統都要考慮的問題.早在2006年,在ITeye上就有一篇關於quratz集羣方案的討論:http://www.iteye.com/topic/40970 ITeye創始人@Robbin在8樓給出了本身對quartz集羣應用方案的意見.node

後來有人總結了三種quratz集羣方案:http://www.iteye.com/topic/114965git

1.單獨啓動一個Job Server來跑job,不部署在web容器中.其餘web節點當須要啓動異步任務的時候,能夠經過種種方式(DB, JMS, Web Service, etc)通知Job Server,而Job Server收到這個通知以後,把異步任務加載到本身的任務隊列中去。web

2.獨立出一個job server,這個server上跑一個spring+quartz的應用,這個應用專門用來啓動任務。在jobserver上加上hessain,獲得業務接口,這樣jobserver就能夠調用web container中的業務操做,也就是正真執行任務的仍是在cluster中的tomcat。在jobserver啓動定時任務以後,輪流調用各地址上的業務操做(相似apache分發tomcat同樣),這樣可讓不一樣的定時任務在不一樣的節點上運行,減低了一臺某個node的壓力算法

3.quartz自己事實上也是支持集羣的。在這種方案下,cluster上的每個node都在跑quartz,而後也是經過數據中記錄的狀態來判斷這個操做是否正在執行,這就要求cluster上全部的node的時間應該是同樣的。並且每個node都跑應用就意味着每個node都須要有本身的線程池來跑quartz.spring

總的來講,第一種方法,在單獨的server上執行任務,對任務的適用範圍有很大的限制,要訪問在web環境中的各類資源很是麻煩.可是集中式的管理容易從架構上規避了分佈式環境的種種同步問題.第二種方法在在第一種方法的基礎上減輕了jobserver的重量,只發送調用請求,不直接執行任務,這樣解決了獨立server沒法訪問web環境的問題,並且能夠作到節點的輪詢.能夠有效地均衡負載.第三種方案是quartz自身支持的集羣方案,在架構上徹底是分佈式的,沒有集中的管理,quratz經過數據庫鎖以及標識字段保證多個節點對任務不重複獲取,而且有負載平衡機制和容錯機制,用少許的冗餘,換取了高可用性(high avilable HA)和高可靠性.(我的認爲和git的機制有殊途同歸之處,分佈式的冗餘設計,換取可靠性和速度).sql

本文旨在研究quratz爲解決分佈式任務調度中存在的防止重複執行和負載均衡等問題而創建的機制.以調度流程做爲順序,配合源碼理解其中原理.shell

quratz的配置,及具體應用請參考CRM項目組的另外一篇文章:CRM使用Quartz集羣總結分享數據庫

quartz集羣架構

quartz的分佈式架構如上圖,能夠看到數據庫是各節點上調度器的樞紐.各個節點並不感知其餘節點的存在,只是經過數據庫來進行間接的溝通.apache

實際上,quartz的分佈式策略就是一種以數據庫做爲邊界資源的併發策略.每一個節點都遵照相同的操做規範,使得對數據庫的操做能夠串行執行.而不一樣名稱的調度器又能夠互不影響的並行運行.

組件間的通信圖以下:(*注:主要的sql語句附在文章最後)

quartz運行時由QuartzSchedulerThread類做爲主體,循環執行調度流程。JobStore做爲中間層,按照quartz的併發策略執行數據庫操做,完成主要的調度邏輯。JobRunShellFactory負責實例化JobDetail對象,將其放入線程池運行。LockHandler負責獲取LOCKS表中的數據庫鎖。

整個quartz對任務調度的時序大體以下:

梳理一下其中的流程,能夠表示爲:

0.調度器線程run()

1.獲取待觸發trigger

    1.1數據庫LOCKS表TRIGGER_ACCESS行加鎖

    1.2讀取JobDetail信息

    1.3讀取trigger表中觸發器信息並標記爲"已獲取"

    1.4commit事務,釋放鎖

2.觸發trigger

    2.1數據庫LOCKS表STATE_ACCESS行加鎖

    2.2確認trigger的狀態

    2.3讀取trigger的JobDetail信息

    2.4讀取trigger的Calendar信息

    2.3更新trigger信息

    2.3commit事務,釋放鎖

3實例化並執行Job

    3.1從線程池獲取線程執行JobRunShell的run方法

能夠看到,這個過程當中有兩個類似的過程:一樣是對數據表的更新操做,一樣是在執行操做前獲取鎖 操做完成後釋放鎖.這一規則能夠看作是quartz解決集羣問題的核心思想.

規則流程圖:

進一步解釋這條規則就是:一個調度器實例在執行涉及到分佈式問題的數據庫操做前,首先要獲取QUARTZ2_LOCKS表中對應當前調度器的行級鎖,獲取鎖後便可執行其餘表中的數據庫操做,隨着操做事務的提交,行級鎖被釋放,供其餘調度器實例獲取.

集羣中的每個調度器實例都遵循這樣一種嚴格的操做規程,那麼對於同一類調度器來講,每一個實例對數據庫的操做只能是串行的.而不一樣名的調度器之間卻能夠並行執行.

下面咱們深刻源碼,從微觀上觀察quartz集羣調度的細節

調度器實例化

一個最簡單的quartz helloworld應用以下:

public class HelloWorldMain {
    Log log = LogFactory.getLog(HelloWorldMain.class);
     
    public void run() {
        try {
            //取得Schedule對象
            SchedulerFactory sf = new StdSchedulerFactory();
            Scheduler sch = sf.getScheduler(); 
             
            JobDetail jd = new JobDetail("HelloWorldJobDetail",Scheduler.DEFAULT_GROUP,HelloWorldJob.class);
            Trigger tg = TriggerUtils.makeMinutelyTrigger(1);
            tg.setName("HelloWorldTrigger");
             
            sch.scheduleJob(jd, tg);
            sch.start();
        } catch ( Exception e ) {
            e.printStackTrace();
             
        }
    }
    public static void main(String[] args) {
        HelloWorldMain hw = new HelloWorldMain();
        hw.run();
    }
}

咱們看到初始化一個調度器須要用工廠類獲取實例:

SchedulerFactory sf =  new  StdSchedulerFactory();
Scheduler sch = sf.getScheduler(); 

而後啓動:

sch.start();
下面跟進StdSchedulerFactory的getScheduler()方法:
public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        //從"調度器倉庫"中根據properties的SchedulerName配置獲取一個調度器實例
        Scheduler sched = schedRep.lookup(getSchedulerName());
        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }
        //初始化調度器
        sched = instantiate();
        return sched;
    }

跟進初始化調度器方法sched = instantiate();發現是一個700多行的初始化方法,涉及到

    • 讀取配置資源,
    • 生成QuartzScheduler對象,
    • 建立該對象的運行線程,並啓動線程;
    • 初始化JobStore,QuartzScheduler,DBConnectionManager等重要組件,
      至此,調度器的初始化工做已完成,初始化工做中quratz讀取了數據庫中存放的對應當前調度器的鎖信息,對應CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS兩個LOCK_NAME.
public void initialize(ClassLoadHelper loadHelper,
            SchedulerSignaler signaler) throws SchedulerConfigException {
        if (dsName == null) {
            throw new SchedulerConfigException("DataSource name not set.");
        }
        classLoadHelper = loadHelper;
        if(isThreadsInheritInitializersClassLoadContext()) {
            log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
            initializersLoader = Thread.currentThread().getContextClassLoader();
        }
         
        this.schedSignaler = signaler;
        // If the user hasn't specified an explicit lock handler, then
        // choose one based on CMT/Clustered/UseDBLocks.
        if (getLockHandler() == null) {
             
            // If the user hasn't specified an explicit lock handler,
            // then we *must* use DB locks with clustering
            if (isClustered()) {
                setUseDBLocks(true);
            }
             
            if (getUseDBLocks()) {
                if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
                    if(getSelectWithLockSQL() == null) {
                        //讀取數據庫LOCKS表中對應當前調度器的鎖信息
                        String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
                        getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
                        setSelectWithLockSQL(msSqlDflt);
                    }
                }
                getLog().info("Using db table-based data access locking (synchronization).");
                setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
            } else {
                getLog().info(
                    "Using thread monitor-based data access locking (synchronization).");
                setLockHandler(new SimpleSemaphore());
            }
        }
    }

當調用sch.start();方法時,scheduler作了以下工做:

1.通知listener開始啓動

2.啓動調度器線程

3.啓動plugin

4.通知listener啓動完成

public void start() throws SchedulerException {
        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }
        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        //通知該調度器的listener啓動開始
        notifySchedulerListenersStarting();
        if (initialStart == null) {
            initialStart = new Date();
            //啓動調度器的線程
            this.resources.getJobStore().schedulerStarted();            
            //啓動plugins
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }
        schedThread.togglePause(false);
        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        //通知該調度器的listener啓動完成
        notifySchedulerListenersStarted();
    }

調度過程

調度器啓動後,調度器的線程就處於運行狀態了,開始執行quartz的主要工做–調度任務.

前面已介紹過,任務的調度過程大體分爲三步:

1.獲取待觸發trigger

2.觸發trigger

3.實例化並執行Job

下面分別分析三個階段的源碼.

QuartzSchedulerThread是調度器線程類,調度過程的三個步驟就承載在run()方法中,分析見代碼註釋:

public void run() {
        boolean lastAcquireFailed = false;
        //
        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }
                    if (halted.get()) {
                        break;
                    }
                }
                /獲取當前線程池中線程的數量
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
                    List<OperableTrigger> triggers = null;
                    long now = System.currentTimeMillis();
                    clearSignaledSchedulingChange();
                    try {
                        //調度器在trigger隊列中尋找30秒內必定數目的trigger準備執行調度,
                        //參數1:nolaterthan = now+3000ms,參數2 最大獲取數量,大小取線程池線程剩餘量與定義值得較小者
                        //參數3 時間窗口 默認爲0,程序會在nolaterthan後加上窗口大小來選擇trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        //上一步獲取成功將失敗標誌置爲false;
                        lastAcquireFailed = false;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if(!lastAcquireFailed) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        //捕捉到異常則值標誌爲true,再次嘗試獲取
                        lastAcquireFailed = true;
                        continue;
                    } catch (RuntimeException e) {
                        if(!lastAcquireFailed) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        lastAcquireFailed = true;
                        continue;
                    }
                    if (triggers != null && !triggers.isEmpty()) {
                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;//計算距離trigger觸發的時間
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                //若是這時調度器發生了改變,新的trigger添加進來,那麼有可能新添加的trigger比當前待執行的trigger
                                //更急迫,那麼須要放棄當前trigger從新獲取,然而,這裏存在一個值不值得的問題,若是從新獲取新trigger
                                //的時間要長於當前時間到新trigger出發的時間,那麼即便放棄當前的trigger,仍然會致使xntrigger獲取失敗,
                                //但咱們又不知道獲取新的trigger須要多長時間,因而,咱們作了一個主觀的評判,若jobstore爲RAM,那麼
                                //假定獲取時間須要7ms,若jobstore是持久化的,假定其須要70ms,當前時間與新trigger的觸發時間之差小於
                                // 這個值的咱們認爲不值得從新獲取,返回false
                                //這裏判斷是否有上述狀況發生,值不值得放棄本次trigger,若斷定不放棄,則線程直接等待至trigger觸發的時刻
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            //該方法調用了上面的斷定方法,做爲再次斷定的邏輯
                            //到達這裏有兩種狀況1.決定放棄當前trigger,那麼再斷定一次,若是仍然有放棄,那麼清空triggers列表並
                            // 退出循環 2.不放棄當前trigger,且線程已經wait到trigger觸發的時刻,那麼什麼也不作
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                            //這時觸發器已經即將觸發,值會<2
                        }
                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;
                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                //觸發triggers,結果付給bndles,注意,從這裏返回後,trigger在數據庫中已經通過了鎖定,解除鎖定,這一套過程
                                //因此說,quratz定不是等到job執行完才釋放trigger資源的佔有,而是讀取完本次觸發所需的信息後當即釋放資源
                                //而後再執行jobs
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }
                        }
                        //迭代trigger的信息,分別跑job
                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();
                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }
                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            //在特殊狀況下,bndle可能爲null,看triggerFired方法能夠看到,當從數據庫獲取trigger時,若是status不是
                            //STATE_ACQUIRED,那麼會直接返回空.quratz這種狀況下本調度器啓動重試流程,從新獲取4次,若仍有問題,
                            // 則拋出異常.
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }
                            //執行job
                            JobRunShell shell = null;
                            try {
                                //建立一個job的Runshell
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            //把runShell放在線程池裏跑
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }
                        }
                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }
                //保證負載平衡的方法,每次執行一輪觸發後,本scheduler會等待一個隨機的時間,這樣就使得其餘節點上的scheduler能夠獲得資源.
                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }
            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)
        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

調度器每次獲取到的trigger是30s內須要執行的,因此要等待一段時間至trigger執行前2ms.在等待過程當中涉及到一個新加進來更緊急的trigger的處理邏輯.分析寫在註釋中,再也不贅述.

能夠看到調度器的只要在運行狀態,就會不停地執行調度流程.值得注意的是,在流程的最後線程會等待一個隨機的時間.這就是quartz自帶的負載平衡機制.

如下是三個步驟的跟進:

觸發器的獲取

調度器調用:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

在數據庫中查找必定時間範圍內將會被觸發的trigger.參數的意義以下:參數1:nolaterthan = now+3000ms,即將來30s內將會被觸發.參數2 最大獲取數量,大小取線程池線程剩餘量與定義值得較小者.參數3 時間窗口 默認爲0,程序會在nolaterthan後加上窗口大小來選擇trigger.quratz會在每次觸發trigger後計算出trigger下次要執行的時間,並在數據庫QRTZ2_TRIGGERS中的NEXT_FIRE_TIME字段中記錄.查找時將當前毫秒數與該字段比較,就能找出下一段時間內將會觸發的觸發器.查找時,調用在JobStoreSupport類中的方法:

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
         
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) {
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName,
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        //...異常處理回調方法
                    }
                });
    }

該方法關鍵的一點在於執行了executeInNonManagedTXLock()方法,這一方法指定了一個鎖名,兩個回調函數.在開始執行時得到鎖,在方法執行完畢後隨着事務的提交鎖被釋放.在該方法的底層,使用 for update語句,在數據庫中加入行級鎖,保證了在該方法執行過程當中,其餘的調度器對trigger進行獲取時將會等待該調度器釋放該鎖.此方法是前面介紹的quartz集羣策略的的具體實現,這一模板方法在後面的trigger觸發過程還會被使用.

public  static  final  String SELECT_FOR_LOCK =  "SELECT * FROM "
             + TABLE_PREFIX_SUBST + TABLE_LOCKS +  " WHERE "  + COL_SCHEDULER_NAME +  " = "  + SCHED_NAME_SUBST
             " AND "  + COL_LOCK_NAME +  " = ? FOR UPDATE" ;

進一步解釋:quratz在獲取數據庫資源以前,先要以for update方式訪問LOCKS表中相應LOCK_NAME數據將改行鎖定.若是在此前該行已經被鎖定,那麼等待,若是沒有被鎖定,那麼讀取知足要求的trigger,並把它們的status置爲STATE_ACQUIRED,若是有tirgger已被置爲STATE_ACQUIRED,那麼說明該trigger已被別的調度器實例認領,無需再次認領,調度器會忽略此trigger.調度器實例之間的間接通訊就體如今這裏.

JobStoreSupport.acquireNextTrigger()方法中:

int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);

最後釋放鎖,這時若是下一個調度器在排隊獲取trigger的話,則仍會執行相同的步驟.這種機制保證了trigger不會被重複獲取.按照這種算法正常運行狀態下調度器每次讀取的trigger中會有至關一部分已被標記爲被獲取.

獲取trigger的過程進行完畢.

觸發trigger:

QuartzSchedulerThread line336:

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

調用JobStoreSupport類的triggersFired()方法:

public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
        return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
                new TransactionCallback<List<TriggerFiredResult>>() {
                    public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                        List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
                        TriggerFiredResult result;
                        for (OperableTrigger trigger : triggers) {
                            try {
                              TriggerFiredBundle bundle = triggerFired(conn, trigger);
                              result = new TriggerFiredResult(bundle);
                            } catch (JobPersistenceException jpe) {
                                result = new TriggerFiredResult(jpe);
                            } catch(RuntimeException re) {
                                result = new TriggerFiredResult(re);
                            }
                            results.add(result);
                        }
                        return results;
                    }
                },
                new TransactionValidator<List<TriggerFiredResult>>() {
                    @Override
                    public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                        //...異常處理回調方法
                    }
                });
    }

此處再次用到了quratz的行爲規範:executeInNonManagedTXLock()方法,在獲取鎖的狀況下對trigger進行觸發操做.其中的觸發細節以下:

protected TriggerFiredBundle triggerFired(Connection conn,
            OperableTrigger trigger)
        throws JobPersistenceException {
        JobDetail job;
        Calendar cal = null;
        // Make sure trigger wasn't deleted, paused, or completed...
        try { // if trigger was deleted, state will be STATE_DELETED
            String state = getDelegate().selectTriggerState(conn,
                    trigger.getKey());
            if (!state.equals(STATE_ACQUIRED)) {
                return null;
            }
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't select trigger state: "
                    + e.getMessage(), e);
        }
        try {
            job = retrieveJob(conn, trigger.getJobKey());
            if (job == null) { return null; }
        } catch (JobPersistenceException jpe) {
            try {
                getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                getDelegate().updateTriggerState(conn, trigger.getKey(),
                        STATE_ERROR);
            } catch (SQLException sqle) {
                getLog().error("Unable to set trigger state to ERROR.", sqle);
            }
            throw jpe;
        }
        if (trigger.getCalendarName() != null) {
            cal = retrieveCalendar(conn, trigger.getCalendarName());
            if (cal == null) { return null; }
        }
        try {
            getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't insert fired trigger: "
                    + e.getMessage(), e);
        }
        Date prevFireTime = trigger.getPreviousFireTime();
        // call triggered - to update the trigger's next-fire-time state...
        trigger.triggered(cal);
        String state = STATE_WAITING;
        boolean force = true;
         
        if (job.isConcurrentExectionDisallowed()) {
            state = STATE_BLOCKED;
            force = false;
            try {
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_WAITING);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_ACQUIRED);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_PAUSED_BLOCKED, STATE_PAUSED);
            } catch (SQLException e) {
                throw new JobPersistenceException(
                        "Couldn't update states of blocked triggers: "
                                + e.getMessage(), e);
            }
        }
             
        if (trigger.getNextFireTime() == null) {
            state = STATE_COMPLETE;
            force = true;
        }
        storeTrigger(conn, trigger, job, true, state, force, false);
        job.getJobDataMap().clearDirtyFlag();
        return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
                .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
                .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
    }

該方法作了如下工做:

1.獲取trigger當前狀態

2.經過trigger中的JobKey讀取trigger包含的Job信息

3.將trigger更新至觸發狀態

4.結合calendar的信息觸發trigger,涉及屢次狀態更新

5.更新數據庫中trigger的信息,包括更改狀態至STATE_COMPLETE,及計算下一次觸發時間.

6.返回trigger觸發結果的數據傳輸類TriggerFiredBundle

 

從該方法返回後,trigger的執行過程已基本完畢.回到執行quratz操做規範的executeInNonManagedTXLock方法,將數據庫鎖釋放.

trigger觸發操做完成

Job執行過程:

再回到線程類QuartzSchedulerThread的 line353這時觸發器都已出發完畢,job的詳細信息都已就位

QuartzSchedulerThread line:368

 

qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
shell.initialize(qs);

 

爲每一個Job生成一個可運行的RunShell,並放入線程池運行.

在最後調度線程生成了一個隨機的等待時間,進入短暫的等待,這使得其餘節點的調度器都有機會獲取數據庫資源.如此就實現了quratz的負載平衡.

這樣一次完整的調度過程就結束了.調度器線程進入下一次循環.

總結:

簡單地說,quartz的分佈式調度策略是以數據庫爲邊界資源的一種異步策略.各個調度器都遵照一個基於數據庫鎖的操做規則保證了操做的惟一性.同時多個節點的異步運行保證了服務的可靠.但這種策略有本身的侷限性.摘錄官方文檔中對quratz集羣特性的說明:

Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers. 

The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).

說明指出,集羣特性對於高cpu使用率的任務效果很好,可是對於大量的短任務,各個節點都會搶佔數據庫鎖,這樣就出現大量的線程等待資源.這種狀況隨着節點的增長會愈來愈嚴重.

附:

通信圖中關鍵步驟的主要sql語句:

3.
select TRIGGER_ACCESS from QRTZ2_LOCKS for update
4.
SELECT TRIGGER_NAME,
TRIGGER_GROUP,
NEXT_FIRE_TIME,
PRIORITY
FROM QRTZ2_TRIGGERS
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND TRIGGER_STATE = 'ACQUIRED'
AND NEXT_FIRE_TIME <= '{timekey 30s latter}'
AND ( MISFIRE_INSTR = -1
OR ( MISFIRE_INSTR != -1
AND NEXT_FIRE_TIME >= '{timekey now}' ) )
ORDER BY NEXT_FIRE_TIME ASC,
PRIORITY DESC;
5.
SELECT *
FROM QRTZ2_JOB_DETAILS
WHERE SCHEDULER_NAME = CRMscheduler
AND JOB_NAME = ?
AND JOB_GROUP = ?;
6.
UPDATE TQRTZ2_TRIGGERS
SET TRIGGER_STATE = 'ACQUIRED'
WHERE SCHED_NAME = 'CRMscheduler'
AND TRIGGER_NAME = '{triggerName}'
AND TRIGGER_GROUP = '{triggerGroup}'
AND TRIGGER_STATE = 'waiting';
7.
INSERT INTO QRTZ2_FIRED_TRIGGERS
(SCHEDULER_NAME,
ENTRY_ID,
TRIGGER_NAME,
TRIGGER_GROUP,
INSTANCE_NAME,
FIRED_TIME,
SCHED_TIME,
STATE,
JOB_NAME,
JOB_GROUP,
IS_NONCONCURRENT,
REQUESTS_RECOVERY,
PRIORITY)
VALUES( 'CRMscheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
8.
commit;
12.
select STAT_ACCESS from QRTZ2_LOCKS for update
13.
SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
14.
SELECT TRIGGER_STATE
FROM QRTZ2_TRIGGERS
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND TRIGGER_NAME = ?
AND TRIGGER_GROUP = ?;
14.
SELECT *
FROM QRTZ2_JOB_DETAILS
WHERE SCHEDULER_NAME = CRMscheduler
AND JOB_NAME = ?
AND JOB_GROUP = ?;
15.
SELECT *
FROM QRTZ2_CALENDARS
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND CALENDAR_NAME = ?;
16.
UPDATE QRTZ2_FIRED_TRIGGERS
SET INSTANCE_NAME = ?,
FIRED_TIME = ?,
SCHED_TIME = ?,
ENTRY_STATE = ?,
JOB_NAME = ?,
JOB_GROUP = ?,
IS_NONCONCURRENT = ?,
REQUESTS_RECOVERY = ?
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND ENTRY_ID = ?;
17.
UPDATE TQRTZ2_TRIGGERS
SET TRIGGER_STATE = ?
WHERE SCHED_NAME = 'CRMscheduler'
AND TRIGGER_NAME = '{triggerName}'
AND TRIGGER_GROUP = '{triggerGroup}'
AND TRIGGER_STATE = ?;
18.
UPDATE QRTZ2_TRIGGERS
SET JOB_NAME = ?,
JOB_GROUP = ?,
DESCRIPTION = ?,
NEXT_FIRE_TIME = ?,
PREV_FIRE_TIME = ?,
TRIGGER_STATE = ?,
TRIGGER_TYPE = ?,
START_TIME = ?,
END_TIME = ?,
CALENDAR_NAME = ?,
MISFIRE_INSTRUCTION = ?,
PRIORITY = ?,
JOB_DATAMAP = ?
WHERE SCHEDULER_NAME = SCHED_NAME_SUBST
AND TRIGGER_NAME = ?
AND TRIGGER_GROUP = ?;
19.
commit;

原文地址:http://demo.netfoucs.com/gklifg/article/details/27090179

相關文章
相關標籤/搜索