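The previous article, "Quartz Database Table Analysis", introduced the 11 tables that Quartz provides by default. This article looks at how Quartz actually schedules, and how it uses the database to implement distributed scheduling.
The scheduling class inside Quartz is QuartzScheduler, which delegates the real-time scheduling work to QuartzSchedulerThread. When a trigger is due and its job has to run, QuartzSchedulerThread does not run the job itself; it hands the job to a ThreadPool. Which ThreadPool to use, and how many threads to initialize, can be set in the configuration file: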
    org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
    org.quartz.threadPool.threadCount: 10
    org.quartz.threadPool.threadPriority: 5
The commonly used thread pool is SimpleThreadPool. With the settings above it starts 10 threads: SimpleThreadPool creates 10 WorkerThread instances, and the WorkerThreads run the actual jobs.
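To make the hand-off between the scheduler thread and the pool concrete, here is a minimal sketch of a pool that exposes the two calls the scheduler loop relies on: blockForAvailableThreads() and runInThread(). MiniThreadPool is a hypothetical name and a deliberate simplification, not the SimpleThreadPool source: it starts a fresh thread per job instead of reusing WorkerThreads, and it rejects a job when no slot is free instead of waiting.

```java
// Hypothetical, simplified stand-in for a worker pool; not the Quartz source.
class MiniThreadPool {
    private final Object lock = new Object();
    private int idle; // number of free worker slots

    MiniThreadPool(int threadCount) {
        this.idle = threadCount;
    }

    // Blocks until at least one worker slot is free, then returns the free count.
    // Returns 0 only if the calling thread is interrupted while waiting.
    int blockForAvailableThreads() {
        synchronized (lock) {
            while (idle < 1) {
                try {
                    lock.wait(500L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return 0;
                }
            }
            return idle;
        }
    }

    // Runs the job on its own thread if a slot is free; returns false otherwise.
    boolean runInThread(Runnable job) {
        if (job == null) {
            return false;
        }
        synchronized (lock) {
            if (idle < 1) {
                return false;
            }
            idle--;
        }
        Thread worker = new Thread(() -> {
            try {
                job.run();
            } finally {
                // Give the slot back and wake up anyone blocked in blockForAvailableThreads().
                synchronized (lock) {
                    idle++;
                    lock.notifyAll();
                }
            }
        });
        worker.start();
        return true;
    }
}
```

The point of the design is that the scheduler thread never runs a job itself: it only waits for a free slot and then hands the job over.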
QuartzSchedulerThread is the core scheduling class. To see how Quartz implements scheduling, look at the core of the QuartzSchedulerThread source:
    public void run() {
        boolean lastAcquireFailed = false;
        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }
                    if (halted.get()) {
                        break;
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
                    List<OperableTrigger> triggers = null;
                    long now = System.currentTimeMillis();
                    clearSignaledSchedulingChange();
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        lastAcquireFailed = false;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if(!lastAcquireFailed) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        lastAcquireFailed = true;
                        continue;
                    } catch (RuntimeException e) {
                        if(!lastAcquireFailed) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        lastAcquireFailed = true;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {
                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }
                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result = bndles.get(i);
                            TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time... or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }
                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                        if(!halted.get()) {
                            // QTZ-336 A job might have been completed in the mean time and we might have
                            // missed the scheduled changed signal by not waiting for the notify() yet
                            // Check that before waiting for too long in case this very job needs to be
                            // scheduled very soon
                            if (!isScheduleChanged()) {
                                sigLock.wait(timeUntilContinue);
                            }
                        }
                    } catch (InterruptedException ignore) {
                    }
                }
            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }
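halted and paused are two boolean flags, meaning "stopped" and "paused" respectively. halted defaults to false and becomes true only when QuartzScheduler executes shutdown(); paused defaults to true and becomes false when QuartzScheduler executes start(). After a normal start, QuartzSchedulerThread can therefore move past this check.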
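The pause check at the top of the loop can be isolated into a small runnable sketch. PauseLoopSketch, its halt() method, and the iterations() counter are names made up for this illustration; only the halted/paused/sigLock pattern is taken from the source above, and the "scheduling pass" is reduced to incrementing a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the halted/paused handshake; not the Quartz source.
class PauseLoopSketch extends Thread {
    private final Object sigLock = new Object();
    private final AtomicBoolean halted = new AtomicBoolean(false); // true after halt()
    private boolean paused = true;                                  // false after togglePause(false)
    private final AtomicInteger iterations = new AtomicInteger();

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }

    int iterations() {
        return iterations.get();
    }

    @Override
    public void run() {
        while (!halted.get()) {
            synchronized (sigLock) {
                // Stay here while paused; wake up at least once a second to re-check.
                while (paused && !halted.get()) {
                    try {
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }
                if (halted.get()) {
                    break;
                }
            }
            iterations.incrementAndGet(); // stands in for one scheduling pass
            try {
                Thread.sleep(1L);
            } catch (InterruptedException ignore) {
            }
        }
    }
}
```

Starting the thread does nothing visible until togglePause(false) is called, which mirrors how the scheduler thread stays idle until start().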
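Next, the thread asks SimpleThreadPool whether any WorkerThread is available. If availThreadCount > 0 it continues with the rest of the logic; otherwise it keeps checking.
It then calls acquireNextTriggers to query the triggers that are due within the coming period. Three parameters matter here: idleWaitTime, maxBatchSize, and batchTimeWindow. All three can be set in the configuration file: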
    org.quartz.scheduler.idleWaitTime:30000
    org.quartz.scheduler.batchTriggerAcquisitionMaxCount:1
    org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow:0
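idleWaitTime: the time, in milliseconds, that the scheduler waits before re-querying for available triggers when it is otherwise idle; the default is 30 seconds.
batchTriggerAcquisitionMaxCount: the maximum number of triggers a scheduler node may acquire (for firing) at one time; the default is 1.
batchTriggerAcquisitionFireAheadTimeWindow: how far ahead of its scheduled fire time, in milliseconds, a trigger may be acquired and fired; the default is 0.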
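As a quick worked example of how these three settings combine, the sketch below computes the two values the scheduler loop derives from them: the upper bound on a trigger's next fire time and the batch size. AcquireWindowSketch and its method names are hypothetical; the arithmetic follows the acquireNextTriggers call in run() and the noLaterThan + timeWindow argument passed on to the trigger query.

```java
// Hypothetical helper that only reproduces the arithmetic; not the Quartz source.
class AcquireWindowSketch {
    // Latest NEXT_FIRE_TIME (epoch millis) a trigger may have and still be selected.
    static long noLaterThan(long now, long idleWaitTime, long batchTimeWindow) {
        return now + idleWaitTime + batchTimeWindow;
    }

    // Number of triggers requested in one pass: never more than the free worker threads.
    static int maxCount(int availThreadCount, int maxBatchSize) {
        return Math.min(availThreadCount, maxBatchSize);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // With the defaults (30000, 1, 0) and 10 free threads:
        System.out.println("window end = now + " + (noLaterThan(now, 30000L, 0L) - now) + " ms");
        System.out.println("batch size = " + maxCount(10, 1));
    }
}
```

With the default settings each pass therefore looks 30 seconds ahead and asks for a single trigger, no matter how many worker threads are free.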
Next, look at the source of the acquireNextTriggers method:
    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
            throws JobPersistenceException {
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) {
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName,
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                ......
                });
    }
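The LOCK_TRIGGER_ACCESS lock is used only when acquireTriggersWithinLock is set or batchTriggerAcquisitionMaxCount > 1. In other words, with the default configuration no lock is taken here. So if several nodes execute acquireNextTriggers at the same time, could the same trigger end up being executed on more than one node?
Note: acquireTriggersWithinLock can be set in the configuration file:
    org.quartz.jobStore.acquireTriggersWithinLock=true
acquireTriggersWithinLock: whether to take a lock while acquiring triggers; the default is false. If batchTriggerAcquisitionMaxCount > 1, it is best to set acquireTriggersWithinLock to true as well.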
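The lock decision is small enough to restate as a pure function. In this sketch LockChoiceSketch and lockNameFor are hypothetical names, and the string value given to LOCK_TRIGGER_ACCESS is an assumption made for illustration; the condition itself is the one in acquireNextTriggers.

```java
// Hypothetical restatement of the lock decision; not the Quartz source.
class LockChoiceSketch {
    // Assumed value, used here only as a label for the lock row.
    static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";

    // Returns the lock to take while acquiring triggers, or null for "no lock".
    static String lockNameFor(boolean acquireTriggersWithinLock, int maxCount) {
        if (acquireTriggersWithinLock || maxCount > 1) {
            return LOCK_TRIGGER_ACCESS;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(lockNameFor(false, 1)); // default configuration
        System.out.println(lockNameFor(false, 5)); // batch acquisition
        System.out.println(lockNameFor(true, 1));  // lock explicitly requested
    }
}
```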
With that question in mind, look at the acquireNextTrigger method called inside the TransactionCallback:
    protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
            throws JobPersistenceException {
        if (timeWindow < 0) {
            throw new IllegalArgumentException();
        }

        List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
        Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        final int MAX_DO_LOOP_RETRY = 3;
        int currentLoopCount = 0;
        do {
            currentLoopCount ++;
            try {
                List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

                // No trigger is ready to fire yet.
                if (keys == null || keys.size() == 0)
                    return acquiredTriggers;

                long batchEnd = noLaterThan;

                for(TriggerKey triggerKey: keys) {
                    // If our trigger is no longer available, try a new one.
                    OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                    if(nextTrigger == null) {
                        continue; // next trigger
                    }

                    // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
                    // put it back into the timeTriggers set and continue to search for next trigger.
                    JobKey jobKey = nextTrigger.getJobKey();
                    JobDetail job;
                    try {
                        job = retrieveJob(conn, jobKey);
                    } catch (JobPersistenceException jpe) {
                        try {
                            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                            getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                        } catch (SQLException sqle) {
                            getLog().error("Unable to set trigger state to ERROR.", sqle);
                        }
                        continue;
                    }

                    if (job.isConcurrentExectionDisallowed()) {
                        if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                            continue; // next trigger
                        } else {
                            acquiredJobKeysForNoConcurrentExec.add(jobKey);
                        }
                    }

                    if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
                        break;
                    }
                    // We now have a acquired trigger, let's add to return list.
                    // If our trigger was no longer in the expected state, try a new one.
                    int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                    if (rowsUpdated <= 0) {
                        continue; // next trigger
                    }
                    nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                    getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                    if(acquiredTriggers.isEmpty()) {
                        batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                    }
                    acquiredTriggers.add(nextTrigger);
                }

                // if we didn't end up with any trigger to fire from that first
                // batch, try again for another batch. We allow with a max retry count.
                if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                    continue;
                }

                // We are done with the while loop.
                break;
            } catch (Exception e) {
                throw new JobPersistenceException(
                        "Couldn't acquire next trigger: " + e.getMessage(), e);
            }
        } while (true);

        // Return the acquired trigger list
        return acquiredTriggers;
    }
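First, the call to selectTriggerToAcquire introduces a new parameter: misfireTime = current time - misfireThreshold. misfireThreshold can be set in the configuration file:
    org.quartz.jobStore.misfireThreshold: 60000
misfireThreshold is the trigger's misfire tolerance. For example, with 10 threads and 11 tasks due, one task is delayed; the threshold is how much of that delay the scheduling engine tolerates. The query SQL is as follows: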
    SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY
    FROM qrtz_TRIGGERS
    WHERE SCHED_NAME = 'myScheduler'
      AND TRIGGER_STATE = 'WAITING'
      AND NEXT_FIRE_TIME <= noLaterThan
      AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= noEarlierThan))
    ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
Here noLaterThan = current time + idleWaitTime + batchTriggerAcquisitionFireAheadTimeWindow, and noEarlierThan = current time - misfireThreshold.
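To see what the WHERE and ORDER BY clauses select, the query can be replayed over an in-memory list. Everything here (TriggerSelectSketch, Row, select) is a hypothetical illustration rather than Quartz code, and applying maxCount as a simple cut-off after sorting is an assumption, since the SQL shown carries no explicit limit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory replay of the trigger query; not the Quartz source.
class TriggerSelectSketch {
    static final class Row {
        final String name;
        final String state;
        final long nextFireTime;
        final int priority;
        final int misfireInstr;

        Row(String name, String state, long nextFireTime, int priority, int misfireInstr) {
            this.name = name;
            this.state = state;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
            this.misfireInstr = misfireInstr;
        }
    }

    static List<String> select(List<Row> rows, long noLaterThan, long noEarlierThan, int maxCount) {
        List<Row> hits = new ArrayList<>();
        for (Row r : rows) {
            boolean waiting = "WAITING".equals(r.state);
            boolean due = r.nextFireTime <= noLaterThan;
            // MISFIRE_INSTR = -1 means the misfire window is not applied to this trigger.
            boolean notMisfired = r.misfireInstr == -1 || r.nextFireTime >= noEarlierThan;
            if (waiting && due && notMisfired) {
                hits.add(r);
            }
        }
        // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
        hits.sort(Comparator.comparingLong((Row r) -> r.nextFireTime)
                .thenComparing(Comparator.comparingInt((Row r) -> r.priority).reversed()));
        List<String> names = new ArrayList<>();
        for (Row r : hits) {
            if (names.size() >= maxCount) {
                break; // assumed cut-off, see the note above
            }
            names.add(r.name);
        }
        return names;
    }
}
```

A trigger whose fire time is older than noEarlierThan is skipped here unless its misfire instruction is -1.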
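After the query, each returned trigger is passed to updateTriggerStateFromOtherState(), which updates its state from STATE_WAITING to STATE_ACQUIRED, and the code checks whether rowsUpdated is greater than 0. So even if several nodes select the same trigger, only one node's update can succeed. After the state update, a row is inserted into the qrtz_fired_triggers table to record that the trigger has been acquired, with state STATE_ACQUIRED.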
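This "update only if the state is still WAITING" step is a compare-and-set, and the reason only one node can win is easy to demonstrate in memory. The sketch below uses ConcurrentMap.replace as a stand-in for the conditional UPDATE statement; StateCasSketch, store, and race are hypothetical names, and threads play the role of nodes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for the conditional state update; not the Quartz source.
class StateCasSketch {
    private final ConcurrentMap<String, String> triggerState = new ConcurrentHashMap<>();

    void store(String triggerKey, String state) {
        triggerState.put(triggerKey, state);
    }

    // Changes the state only if it currently equals oldState; returns "rows updated" (0 or 1).
    int updateTriggerStateFromOtherState(String triggerKey, String newState, String oldState) {
        return triggerState.replace(triggerKey, oldState, newState) ? 1 : 0;
    }

    // Lets `nodes` threads race for the same trigger and returns how many acquired it.
    static int race(int nodes) {
        StateCasSketch store = new StateCasSketch();
        store.store("myTrigger", "WAITING");
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[nodes];
        for (int i = 0; i < nodes; i++) {
            threads[i] = new Thread(() -> {
                if (store.updateTriggerStateFromOtherState("myTrigger", "ACQUIRED", "WAITING") > 0) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }
}
```

However many threads race, exactly one sees a non-zero update count; the others skip the trigger, just as the `rowsUpdated <= 0` branch does.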
Quartz uses its distributed lock in many places. To see how the lock is implemented, here is the source of the executeInNonManagedTXLock method:
    protected <T> T executeInNonManagedTXLock(
            String lockName,
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection
                // until after acquiring the lock since it isn't needed.
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }

                transOwner = getLockHandler().obtainLock(conn, lockName);
            }

            if (conn == null) {
                conn = getNonManagedTXConnection();
            }

            final T result = txCallback.execute(conn);
            try {
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                    @Override
                    public Boolean execute(Connection conn) throws JobPersistenceException {
                        return txValidator.validate(conn, result);
                    }
                })) {
                    throw e;
                }
            }

            Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
            if(sigTime != null && sigTime >= 0) {
                signalSchedulingChangeImmediately(sigTime);
            }

            return result;
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (RuntimeException e) {
            rollbackConnection(conn);
            throw new JobPersistenceException("Unexpected runtime exception: "
                    + e.getMessage(), e);
        } finally {
            try {
                releaseLock(lockName, transOwner);
            } finally {
                cleanupConnection(conn);
            }
        }
    }
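The method breaks down into three steps: obtain the lock, run the logic, release the lock. getLockHandler().obtainLock obtains the lock, txCallback.execute(conn) runs the logic, and commitConnection(conn) releases the lock, because committing the transaction gives up the row lock held in the database.
The interface for Quartz's distributed lock is Semaphore, and the default implementation is StdRowLockSemaphore. The interface is: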
    public interface Semaphore {
        boolean obtainLock(Connection conn, String lockName) throws LockException;

        void releaseLock(String lockName) throws LockException;

        boolean requiresConnection();
    }
Here is how obtainLock() obtains the lock:
    public boolean obtainLock(Connection conn, String lockName)
            throws LockException {
        if (!isLockOwner(lockName)) {
            executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
            getThreadLocks().add(lockName);
        } else if(log.isDebugEnabled()) {
        }
        return true;
    }

    protected void executeSQL(Connection conn, final String lockName, final String expandedSQL, final String expandedInsertSQL) throws LockException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        SQLException initCause = null;
        int count = 0;
        do {
            count++;
            try {
                ps = conn.prepareStatement(expandedSQL);
                ps.setString(1, lockName);
                rs = ps.executeQuery();
                if (!rs.next()) {
                    getLog().debug(
                            "Inserting new lock row for lock: '" + lockName + "' being obtained by thread: " +
                            Thread.currentThread().getName());
                    rs.close();
                    rs = null;
                    ps.close();
                    ps = null;
                    ps = conn.prepareStatement(expandedInsertSQL);
                    ps.setString(1, lockName);
                    int res = ps.executeUpdate();
                    if(res != 1) {
                        if(count < 3) {
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException ignore) {
                                Thread.currentThread().interrupt();
                            }
                            continue;
                        }
                    }
                }
                return; // obtained lock, go
            } catch (SQLException sqle) {
                ......
        } while(count < 4);
    }
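obtainLock first checks whether the current thread already holds the lock. If not, it calls executeSQL, which uses two important SQL statements, expandedSQL and expandedInsertSQL. With SCHED_NAME = 'myScheduler' as an example: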
    SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'myScheduler' AND LOCK_NAME = ? FOR UPDATE
    INSERT INTO QRTZ_LOCKS(SCHED_NAME, LOCK_NAME) VALUES ('myScheduler', ?)
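The SELECT statement ends with FOR UPDATE. If the LOCK_NAME row exists and several nodes execute this SQL, only the first node succeeds; the others wait.
If the LOCK_NAME row does not exist and several nodes execute expandedInsertSQL at the same time, only one node's insert succeeds; the nodes whose insert fails retry and execute expandedSQL again.
Once txCallback has run, commitConnection is executed, which makes the current node release LOCK_NAME so that other nodes can compete for the lock; finally releaseLock is executed.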
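The obtain / execute / release shape can be tried out without a database. In the sketch below a java.util.concurrent.Semaphore with one permit plays the part of the locked QRTZ_LOCKS row, and a ThreadLocal set plays the part of the isLockOwner bookkeeping. RowLockSketch, executeInLock, and demo are hypothetical names; note also that java.util.concurrent.Semaphore is unrelated to the Quartz Semaphore interface shown earlier, and that this sketch frees the "row" inside releaseLock, whereas in the real flow the row lock is given up by the commit.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical in-memory analogue of the row lock; not the Quartz source.
class RowLockSketch {
    private final ConcurrentMap<String, Semaphore> rows = new ConcurrentHashMap<>();
    private final ThreadLocal<Set<String>> threadLocks = ThreadLocal.withInitial(HashSet::new);

    boolean isLockOwner(String lockName) {
        return threadLocks.get().contains(lockName);
    }

    boolean obtainLock(String lockName) {
        if (!isLockOwner(lockName)) {
            // Analogue of SELECT ... FOR UPDATE: block until the "row" is free.
            rows.computeIfAbsent(lockName, n -> new Semaphore(1)).acquireUninterruptibly();
            threadLocks.get().add(lockName);
        }
        return true;
    }

    void releaseLock(String lockName) {
        if (isLockOwner(lockName)) {
            threadLocks.get().remove(lockName);
            rows.get(lockName).release();
        }
    }

    // Obtain the lock, run the callback, always release.
    <T> T executeInLock(String lockName, Supplier<T> callback) {
        boolean owner = false;
        try {
            if (lockName != null) {
                owner = obtainLock(lockName);
            }
            return callback.get();
        } finally {
            if (owner) {
                releaseLock(lockName);
            }
        }
    }

    // Several threads increment a plain int under the lock; returns the final value.
    static int demo(int threads, int incrementsPerThread) {
        RowLockSketch locks = new RowLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    locks.executeInLock("TRIGGER_ACCESS", () -> ++counter[0]);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

Because every increment runs while the lock is held, no update is lost even though the counter itself is not thread-safe.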
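The triggersFired call in the scheduler loop fires the acquired triggers. The code of the per-trigger method triggerFired is as follows: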
    protected TriggerFiredBundle triggerFired(Connection conn,
            OperableTrigger trigger)
        throws JobPersistenceException {
        JobDetail job;
        Calendar cal = null;

        // Make sure trigger wasn't deleted, paused, or completed...
        try { // if trigger was deleted, state will be STATE_DELETED
            String state = getDelegate().selectTriggerState(conn,
                    trigger.getKey());
            if (!state.equals(STATE_ACQUIRED)) {
                return null;
            }
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't select trigger state: "
                    + e.getMessage(), e);
        }

        try {
            job = retrieveJob(conn, trigger.getJobKey());
            if (job == null) { return null; }
        } catch (JobPersistenceException jpe) {
            try {
                getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                getDelegate().updateTriggerState(conn, trigger.getKey(),
                        STATE_ERROR);
            } catch (SQLException sqle) {
                getLog().error("Unable to set trigger state to ERROR.", sqle);
            }
            throw jpe;
        }

        if (trigger.getCalendarName() != null) {
            cal = retrieveCalendar(conn, trigger.getCalendarName());
            if (cal == null) { return null; }
        }

        try {
            getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't insert fired trigger: "
                    + e.getMessage(), e);
        }

        Date prevFireTime = trigger.getPreviousFireTime();

        // call triggered - to update the trigger's next-fire-time state...
        trigger.triggered(cal);

        String state = STATE_WAITING;
        boolean force = true;

        if (job.isConcurrentExectionDisallowed()) {
            state = STATE_BLOCKED;
            force = false;
            try {
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_WAITING);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_ACQUIRED);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_PAUSED_BLOCKED, STATE_PAUSED);
            } catch (SQLException e) {
                throw new JobPersistenceException(
                        "Couldn't update states of blocked triggers: "
                                + e.getMessage(), e);
            }
        }

        if (trigger.getNextFireTime() == null) {
            state = STATE_COMPLETE;
            force = true;
        }

        storeTrigger(conn, trigger, job, true, state, force, false);

        job.getJobDataMap().clearDirtyFlag();

        return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
                .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
                .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
    }
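First the method checks that the trigger is in STATE_ACQUIRED; if it is not, it returns null. It then loads the JobDetail by jobKey and updates the corresponding fired-trigger record to the EXECUTING state. Finally it checks whether DisallowConcurrentExecution is enabled for the job. If it is, the job must not run concurrently and the trigger's state becomes STATE_BLOCKED; otherwise it becomes STATE_WAITING. A trigger in STATE_BLOCKED is not picked up by the next scheduling pass; only after the job has finished and the state has been set back to STATE_WAITING can it run again, which keeps the job's executions serial.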
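The state that triggerFired stores for the trigger depends on just two facts, so the decision can be written as a small function. FiredStateSketch and stateAfterFire are hypothetical names and the states are plain strings here; the order of the checks follows the triggerFired source above, where a missing next fire time overrides the blocked state.

```java
// Hypothetical restatement of the state decision in triggerFired; not the Quartz source.
class FiredStateSketch {
    static String stateAfterFire(boolean concurrentExecutionDisallowed, boolean hasNextFireTime) {
        String state = "WAITING";
        if (concurrentExecutionDisallowed) {
            state = "BLOCKED";  // not selectable again until the running job completes
        }
        if (!hasNextFireTime) {
            state = "COMPLETE"; // the trigger will never fire again
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFire(false, true));
        System.out.println(stateAfterFire(true, true));
        System.out.println(stateAfterFire(true, false));
    }
}
```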
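The JobRunShell that wraps the job is then executed through the ThreadPool.
The article "Spring Integration with Quartz Distributed Scheduling" ended with several distributed scheduling tests; their results can now be explained:
As shown above, Quartz uses a distributed lock together with trigger states to ensure that only one node executes a given trigger.
The scheduling thread and the job execution threads are separate; jobs run in the ThreadPool, so the two do not affect each other.
If DisallowConcurrentExecution is used, triggerFired introduces the STATE_BLOCKED state, which keeps the job's executions serial.
This article has outlined the Quartz scheduling flow from the source code, without going into the finest details. It should be enough to give a reasonable explanation of the behavior observed with multi-node scheduling.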