Introduction
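Quartz is currently the most mature and most widely used Java job-scheduling framework. It is powerful, flexible to configure, and plays an important role in enterprise applications. How to run Quartz in a clustered environment is a question every enterprise system has to answer. As early as 2006 there was a discussion of Quartz clustering schemes on ITeye: http://www.iteye.com/topic/40970. ITeye founder @Robbin gave his own view on clustered Quartz deployment in post #8 of that thread.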
Later, someone summarized three Quartz clustering approaches: http://www.iteye.com/topic/114965
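1. Start a separate Job Server to run the jobs, outside the web container. When another web node needs to start an asynchronous task, it notifies the Job Server by some means (DB, JMS, Web Service, etc.). On receiving the notification, the Job Server loads the asynchronous task into its own task queue.
2. Split out a job server that runs a Spring + Quartz application whose only purpose is to trigger tasks. Hessian on the job server exposes the business interfaces, so the job server can call business operations in the web containers. The task itself is still executed by the Tomcat instances in the cluster. After the job server fires a scheduled task, it calls the business operation on each address in turn, much as Apache distributes requests across Tomcat instances. Different scheduled tasks therefore run on different nodes, which reduces the load on any single node.
3. Quartz itself supports clustering. In this approach every node in the cluster runs Quartz, and the state recorded in the database is used to decide whether an operation is already being executed. This requires the clocks of all nodes in the cluster to be synchronized. Because every node runs the application, every node also needs its own thread pool for Quartz.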
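Overall, the first approach runs jobs on a separate server, which severely limits what the jobs can do: accessing the various resources of the web environment is very cumbersome. On the other hand, centralized management sidesteps, at the architecture level, the many synchronization problems of a distributed environment. The second approach builds on the first and lightens the job server. The job server only sends invocation requests and does not execute tasks itself. This removes the problem that a standalone server cannot reach the web environment, and the job server can poll the nodes in turn, which balances load effectively. The third approach is Quartz's own clustering scheme. Its architecture is fully distributed, with no central management. Quartz uses database locks and state columns to ensure that no two nodes acquire the same job, and it has load-balancing and fault-tolerance mechanisms. It trades a small amount of redundancy for high availability (HA) and high reliability. (Personally, I think this is similar in spirit to Git: a distributed, redundant design that buys reliability and speed.)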
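The round-robin dispatch in the second approach can be sketched as follows. This is a minimal illustration that assumes the job server knows a fixed list of node addresses. RoundRobinDispatcher and the addresses are hypothetical names, and the actual remote call (for example over Hessian) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of approach 2: the job server only decides WHERE a task
// runs and hands the call to the web nodes in turn (round-robin).
class RoundRobinDispatcher {
    private final List<String> nodes;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinDispatcher(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    /** Returns the node that should execute the next fired job. */
    String pickNode() {
        // floorMod keeps the index valid even after the counter overflows
        int i = Math.floorMod(next.getAndIncrement(), nodes.size());
        return nodes.get(i);
    }

    public static void main(String[] args) {
        RoundRobinDispatcher d = new RoundRobinDispatcher(
                Arrays.asList("http://node-a:8080", "http://node-b:8080"));
        for (int k = 0; k < 4; k++) {
            // In approach 2 the job server would call the business interface
            // (e.g. via Hessian) on the chosen node at this point.
            System.out.println("fire job on " + d.pickNode());
        }
    }
}
```

Running main prints node-a, node-b, node-a, node-b. A real job server would also have to skip nodes that are down, which this sketch does not attempt.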
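This article examines the mechanisms Quartz uses to solve problems in distributed job scheduling, such as preventing duplicate execution and balancing load. It follows the order of the scheduling flow and reads the source code along the way to understand the underlying principles.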
For Quartz configuration and concrete usage, see another article by the CRM project team: "Summary of Using Quartz Clustering in CRM".
Quartz cluster architecture
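Quartz's distributed architecture is shown in the figure above. The database is the hub for the schedulers on all nodes. Nodes are not aware of each other and communicate only indirectly, through the database.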
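In fact, Quartz's distribution strategy is a concurrency strategy that uses the database as the boundary resource. Every node follows the same operating rules, so operations on the database are executed serially, while schedulers with different names can run in parallel without affecting each other.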
The communication between components is shown below (note: the main SQL statements are listed at the end of the article):
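At runtime, Quartz is driven by the QuartzSchedulerThread class, which runs the scheduling flow in a loop. JobStore is the middle layer: it performs the database operations according to Quartz's concurrency strategy and carries out the main scheduling logic. JobRunShellFactory creates the JobRunShell that instantiates the job described by a JobDetail, and the shell is then run in the thread pool. LockHandler obtains the database locks in the LOCKS table.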
The overall timing of Quartz job scheduling is roughly as follows:
Laid out step by step, the flow is:
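0. Scheduler thread run()
1. Acquire the triggers that are due
1.1 Lock the TRIGGER_ACCESS row in the LOCKS table
1.2 Read the JobDetail information
1.3 Read the trigger information from the triggers table and mark the triggers as "acquired"
1.4 Commit the transaction, releasing the lock
2. Fire the triggers
2.1 Lock the TRIGGER_ACCESS row in the LOCKS table
2.2 Confirm the trigger's state
2.3 Read the trigger's JobDetail information
2.4 Read the trigger's Calendar information
2.5 Update the trigger information
2.6 Commit the transaction, releasing the lock
3. Instantiate and execute the Job
3.1 Take a thread from the thread pool and execute JobRunShell's run() method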
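This flow contains two similar stages. Both update database tables, and both acquire a lock before the operation and release it afterwards. This rule can be regarded as the core idea behind how Quartz solves the clustering problem.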
Flowchart of the rule:
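Stated explicitly, the rule is as follows. Before a scheduler instance performs a database operation that involves distributed concerns, it first acquires the row lock in the QRTZ2_LOCKS table for its own scheduler name and the relevant LOCK_NAME. While holding the lock, it may operate on the other tables. When the transaction commits, the row lock is released and becomes available to other scheduler instances.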
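Every scheduler instance in the cluster follows this strict procedure. As a result, instances of the same scheduler (the same scheduler name) can only operate on the database serially, while schedulers with different names can run in parallel.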
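The scope of the rule is one lock per (scheduler name, lock name) row, and it can be modeled in memory as below. This is only an analogy: a ReentrantLock stands in for the database row lock, and unlike SELECT ... FOR UPDATE it does not work across JVMs. LocksTableModel is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical in-memory model of QRTZ2_LOCKS: one lock per (SCHED_NAME, LOCK_NAME) row.
// Analogy only: the real lock is a database row lock shared by all nodes.
class LocksTableModel {
    // Key = (scheduler name, lock name), like the primary key of QRTZ2_LOCKS
    private final ConcurrentMap<List<String>, ReentrantLock> rows = new ConcurrentHashMap<>();

    ReentrantLock row(String schedName, String lockName) {
        return rows.computeIfAbsent(Arrays.asList(schedName, lockName), k -> new ReentrantLock());
    }

    /** Lock the row, run the work, release the lock as a COMMIT would. */
    <T> T inLock(String schedName, String lockName, Supplier<T> work) {
        ReentrantLock lock = row(schedName, lockName);
        lock.lock();              // like SELECT ... FOR UPDATE: waits while another holder exists
        try {
            return work.get();    // the updates to the other tables happen under the lock
        } finally {
            lock.unlock();        // like COMMIT releasing the row lock
        }
    }
}
```

Two instances of CRMscheduler that ask for TRIGGER_ACCESS get the same lock and run one after the other. A scheduler with a different name gets a different lock and is never blocked by them.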
Next, we dig into the source code and look at the details of Quartz cluster scheduling up close.
Scheduler instantiation
The simplest Quartz hello-world application looks like this:
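```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerUtils;
import org.quartz.impl.StdSchedulerFactory;

// Quartz 1.x-style API, as in the original example: the JobDetail constructor and
// TriggerUtils.makeMinutelyTrigger() were replaced by JobBuilder/TriggerBuilder in 2.x.
// HelloWorldJob is assumed to be a class implementing org.quartz.Job.
public class HelloWorldMain {
    Log log = LogFactory.getLog(HelloWorldMain.class);

    public void run() {
        try {
            // Obtain a scheduler from the factory (configured from quartz.properties)
            SchedulerFactory sf = new StdSchedulerFactory();
            Scheduler sch = sf.getScheduler();
            JobDetail jd = new JobDetail("HelloWorldJobDetail", Scheduler.DEFAULT_GROUP, HelloWorldJob.class);
            // Fire once every minute
            Trigger tg = TriggerUtils.makeMinutelyTrigger(1);
            tg.setName("HelloWorldTrigger");
            sch.scheduleJob(jd, tg);
            sch.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        HelloWorldMain hw = new HelloWorldMain();
        hw.run();
    }
}
```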
Initializing a scheduler requires obtaining an instance from a factory class:
```java
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sch = sf.getScheduler();
```
The scheduler is then started with sch.start().
Next, we follow StdSchedulerFactory's getScheduler() method:
```java
public Scheduler getScheduler() throws SchedulerException {
    // Load the configuration on first use
    if (cfg == null) {
        initialize();
    }
    // Schedulers are looked up by name in the process-wide SchedulerRepository
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    Scheduler sched = schedRep.lookup(getSchedulerName());
    if (sched != null) {
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            return sched;
        }
    }
    // Not cached, or shut down: build a new one
    sched = instantiate();
    return sched;
}
```
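Following the initialization call sched = instantiate(), we find a method of more than 700 lines. It covers:
- reading the configuration resources;
- creating the QuartzScheduler object;
- creating that object's run thread and starting the thread;
- initializing JobStore, QuartzScheduler, DBConnectionManager and other important components.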
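That completes scheduler initialization. During initialization Quartz also prepares the locks for the current scheduler that are stored in the database. In the CRM schema these are rows of the QRTZ2_LOCKS table with the two LOCK_NAME values STATE_ACCESS and TRIGGER_ACCESS. The lock handler is chosen in JobStoreSupport.initialize(). In clustered mode it is a database row lock (StdRowLockSemaphore):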
```java
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler)
        throws SchedulerConfigException {
    if (dsName == null) {
        throw new SchedulerConfigException("DataSource name not set.");
    }
    classLoadHelper = loadHelper;
    if (isThreadsInheritInitializersClassLoadContext()) {
        log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: "
                + Thread.currentThread().getName());
        initializersLoader = Thread.currentThread().getContextClassLoader();
    }
    this.schedSignaler = signaler;

    // Choose the locking strategy unless one was configured explicitly
    if (getLockHandler() == null) {
        // A clustered store must synchronize across JVMs, so it always uses DB locks
        if (isClustered()) {
            setUseDBLocks(true);
        }
        if (getUseDBLocks()) {
            // SQL Server uses lock hints instead of SELECT ... FOR UPDATE
            if (getDriverDelegateClass() != null
                    && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
                if (getSelectWithLockSQL() == null) {
                    String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE "
                            + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
                    getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '"
                            + msSqlDflt + "'.");
                    setSelectWithLockSQL(msSqlDflt);
                }
            }
            getLog().info("Using db table-based data access locking (synchronization).");
            // Row locks on the {prefix}LOCKS table
            setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
        } else {
            getLog().info("Using thread monitor-based data access locking (synchronization).");
            // Not clustered: an in-JVM monitor is enough
            setLockHandler(new SimpleSemaphore());
        }
    }
}
```
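Whether the isClustered() branch above is taken depends on configuration. The Properties object below sketches a clustered JDBC JobStore setup. The keys are standard Quartz properties. The values (the scheduler name, the data source name crmDS, the thread count) are illustrative assumptions and not the CRM project's actual settings. The org.quartz.dataSource.crmDS.* connection properties are omitted.

```java
import java.util.Properties;

// Sketch of a clustered JDBC JobStore configuration. Keys are standard Quartz
// properties; the values are illustrative assumptions (hypothetical "crmDS").
class ClusterConfigSketch {
    static Properties clusteredProps() {
        Properties p = new Properties();
        p.setProperty("org.quartz.scheduler.instanceName", "CRMscheduler"); // same on every node
        p.setProperty("org.quartz.scheduler.instanceId", "AUTO");           // unique per node
        p.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        p.setProperty("org.quartz.jobStore.driverDelegateClass",
                "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        p.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ2_");
        p.setProperty("org.quartz.jobStore.dataSource", "crmDS");
        // isClustered=true is what makes initialize() call setUseDBLocks(true)
        p.setProperty("org.quartz.jobStore.isClustered", "true");
        p.setProperty("org.quartz.threadPool.threadCount", "10");
        return p;
    }
}
```

All nodes in the cluster share the same instanceName, which becomes the SCHED_NAME key of the lock rows. instanceId=AUTO gives each node a unique ID. A Properties object like this can be passed to new StdSchedulerFactory(props).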
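When sch.start() is called, the scheduler does the following:
1. Notifies listeners that it is starting
2. On the first start, tells the JobStore that the scheduler has started and starts the plugins (on later starts, tells the JobStore that it has resumed)
3. Unpauses the scheduler thread, which was already created and started in instantiate()
4. Notifies listeners that startup is complete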
```java
public void start() throws SchedulerException {
    if (shuttingDown || closed) {
        throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");
    }
    // Tell listeners the scheduler is starting
    notifySchedulerListenersStarting();
    if (initialStart == null) {
        // First start: let the JobStore do its startup work, then start plugins
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();
        startPlugins();
    } else {
        // Restart after standby
        resources.getJobStore().schedulerResumed();
    }
    // Unpause the scheduler thread created during instantiate()
    schedThread.togglePause(false);
    getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");
    // Tell listeners the scheduler has started
    notifySchedulerListenersStarted();
}
```
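The handshake between start() and the already running scheduler thread can be reproduced in a toy class: the thread is parked on sigLock until togglePause(false) wakes it. PausableLoop is a hypothetical name. Only the waiting pattern mirrors QuartzSchedulerThread, and the 10 ms sleep stands in for a scheduling pass.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model of the pause/unpause handshake; not Quartz code.
class PausableLoop extends Thread {
    private final Object sigLock = new Object();
    private boolean paused = true;            // guarded by sigLock; a new thread starts paused
    private volatile boolean halted = false;
    private final CountDownLatch firstPass = new CountDownLatch(1);

    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();              // wake the loop if it is waiting
        }
    }

    void halt() {
        halted = true;
        togglePause(false);
    }

    /** True once the loop has completed at least one pass within the timeout. */
    boolean awaitFirstPass(long millis) {
        try {
            return firstPass.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void run() {
        while (!halted) {
            synchronized (sigLock) {
                while (paused && !halted) {
                    try {
                        sigLock.wait(1000L);  // same bounded wait as the scheduler thread
                    } catch (InterruptedException ignore) {
                    }
                }
            }
            if (halted) {
                break;
            }
            firstPass.countDown();            // stands in for one scheduling pass
            try {
                Thread.sleep(10);
            } catch (InterruptedException ignore) {
            }
        }
    }
}
```

The thread can be started early and does nothing until it is unpaused. This is why instantiate() can start the scheduler thread while start() only flips the pause flag.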
Scheduling process
Once the scheduler has started, its thread is running and begins Quartz's main work: scheduling jobs.
As introduced earlier, scheduling consists of roughly three steps:
1. Acquire the triggers that are due
2. Fire the triggers
3. Instantiate and execute the Job
The source code of each of the three stages is analyzed below.
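QuartzSchedulerThread is the scheduler thread class. All three steps of the scheduling process live in its run() method. The analysis is given in the code comments: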
```java
public void run() {
    boolean lastAcquireFailed = false;

    while (!halted.get()) {
        try {
            // Wait here while the scheduler is paused (e.g. before start())
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }
                if (halted.get()) {
                    break;
                }
            }

            // Number of available threads in the pool (blocks until at least one is free)
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if (availThreadCount > 0) {
                List<OperableTrigger> triggers = null;
                long now = System.currentTimeMillis();
                clearSignaledSchedulingChange();
                try {
                    // Step 1: acquire triggers that fire within the next idleWaitTime
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime,
                            Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),
                            qsRsrcs.getBatchTimeWindow());
                    lastAcquireFailed = false;
                    if (log.isDebugEnabled())
                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                } catch (JobPersistenceException jpe) {
                    // Report only the first of several consecutive failures
                    if (!lastAcquireFailed) {
                        qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.", jpe);
                    }
                    lastAcquireFailed = true;
                    continue;
                } catch (RuntimeException e) {
                    if (!lastAcquireFailed) {
                        getLog().error("quartzSchedulerThreadLoop: RuntimeException " + e.getMessage(), e);
                    }
                    lastAcquireFailed = true;
                    continue;
                }

                if (triggers != null && !triggers.isEmpty()) {
                    // Wait until the earliest acquired trigger is at most 2 ms from firing
                    now = System.currentTimeMillis();
                    long triggerTime = triggers.get(0).getNextFireTime().getTime();
                    long timeUntilTrigger = triggerTime - now;
                    while (timeUntilTrigger > 2) {
                        synchronized (sigLock) {
                            if (halted.get()) {
                                break;
                            }
                            // Sleep unless a sufficiently earlier trigger has been signaled
                            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                try {
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    if (timeUntilTrigger >= 1)
                                        sigLock.wait(timeUntilTrigger);
                                } catch (InterruptedException ignore) {
                                }
                            }
                        }
                        // If the schedule changed enough, release the acquired triggers and start over
                        if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                            break;
                        }
                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                    }
                    if (triggers.isEmpty())
                        continue;

                    // Step 2: fire the triggers (under the TRIGGER_ACCESS lock)
                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                    boolean goAhead = true;
                    synchronized (sigLock) {
                        goAhead = !halted.get();
                    }
                    if (goAhead) {
                        try {
                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                            if (res != null)
                                bndles = res;
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError(
                                    "An error occurred while firing triggers '" + triggers + "'", se);
                            // Hand the triggers back so they can be acquired again
                            for (int i = 0; i < triggers.size(); i++) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            }
                            continue;
                        }
                    }

                    // Step 3: wrap each fired trigger in a JobRunShell and run it in the pool
                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result = bndles.get(i);
                        TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                        Exception exception = result.getException();
                        if (exception instanceof RuntimeException) {
                            getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }
                        // A null bundle means the trigger could not be fired (e.g. its state changed)
                        if (bndle == null) {
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }
                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
                                    CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
                                    CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }
                    }
                    // Triggers were handled: go straight to the next pass
                    continue;
                }
            } else {
                // Should not happen: blockForAvailableThreads() waits for a free thread
                continue;
            }

            // Reached only when no trigger was acquired: idle for a randomized time
            long now = System.currentTimeMillis();
            long waitTime = now + getRandomizedIdleWaitTime();
            long timeUntilContinue = waitTime - now;
            synchronized (sigLock) {
                try {
                    if (!halted.get()) {
                        if (!isScheduleChanged()) {
                            sigLock.wait(timeUntilContinue);
                        }
                    }
                } catch (InterruptedException ignore) {
                }
            }
        } catch (RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    }

    qs = null;
    qsRsrcs = null;
}
```
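Each pass acquires the triggers that are due within the next 30 s (the default idleWaitTime). The thread then waits until the earliest of them is no more than 2 ms from its fire time. A newly added, more urgent trigger may show up during this wait. isCandidateNewTimeEarlierWithinReason() decides whether the new trigger is early enough to matter. If it is, releaseIfScheduleChangedSignificantly() releases the acquired triggers and the loop goes back to acquisition.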
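The decision whether a newly signaled trigger justifies giving up the acquired ones can be modeled as a pure function. This is a hypothetical simplification of the idea behind isCandidateNewTimeEarlierWithinReason(). minSavingMillis is an assumed threshold for the cost of releasing and re-acquiring triggers. It is not a Quartz setting.

```java
// Hypothetical model of the early wake-up decision; not Quartz code.
// minSavingMillis is an assumed "cost of release + re-acquire" threshold.
class EarlierTriggerCheck {
    /**
     * @param scheduleChanged  whether a schedule change was signaled while waiting
     * @param signaledFireTime fire time of the new trigger, or 0 if unknown
     * @param acquiredFireTime fire time of the trigger already acquired
     * @param now              current time in ms
     */
    static boolean worthReacquiring(boolean scheduleChanged, long signaledFireTime,
                                    long acquiredFireTime, long now, long minSavingMillis) {
        if (!scheduleChanged) {
            return false;                         // nothing new: keep waiting
        }
        // An unknown time (0) is treated as "possibly earlier"
        boolean earlier = signaledFireTime == 0 || signaledFireTime < acquiredFireTime;
        if (!earlier) {
            return false;
        }
        // Only worth it if enough time is left before the acquired trigger fires;
        // otherwise releasing and re-acquiring would just thrash the database.
        return acquiredFireTime - now >= minSavingMillis;
    }
}
```

The last check matters most in a cluster. Every release and re-acquire is another round trip under the database lock, so skipping it when the acquired trigger is about to fire anyway keeps lock contention down.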
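As long as the scheduler is running, it keeps repeating this loop. Note that a pass which acquires no trigger ends with the thread waiting for a random amount of time (getRandomizedIdleWaitTime()). Passes that did fire triggers continue immediately. This random wait is Quartz's built-in load-balancing mechanism.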
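Here is a minimal sketch of the randomized idle wait. It assumes the wait is drawn as idleWaitTime minus a random amount below a fixed variability. The variability Quartz actually uses is an internal detail that may differ between versions, so it is a parameter here. The 30 s / 6 s values in main are assumptions.

```java
import java.util.Random;

// Sketch of a randomized idle wait; the variability value is an assumption.
class IdleWait {
    /**
     * Returns a wait in (idleWaitTime - variability, idleWaitTime].
     * Random.nextInt(bound) yields 0..bound-1, so idleWaitTime itself is reachable.
     */
    static long randomizedIdleWait(long idleWaitTime, int variability, Random random) {
        return idleWaitTime - random.nextInt(variability);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (String node : new String[] {"node-a", "node-b", "node-c"}) {
            System.out.println(node + " idles for " + randomizedIdleWait(30_000L, 6_000, random) + " ms");
        }
    }
}
```

Each node draws its own value. Nodes that became idle at the same moment will therefore usually come back to the database at different times instead of queuing for the lock together.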
The three steps in detail:
Acquiring triggers
The scheduler calls:
```java
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
        now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
```
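This call looks up the triggers in the database that will fire within a given time range. The parameters mean the following. Parameter 1 is noLaterThan = now + idleWaitTime, which is now + 30,000 ms by default, so it selects triggers that fire within the next 30 s. Parameter 2 is the maximum number of triggers to acquire: the smaller of the number of free threads in the pool and the configured maximum batch size. Parameter 3 is the time window, 0 by default, which is added to noLaterThan when selecting triggers. After each firing, Quartz computes the trigger's next fire time and records it in the NEXT_FIRE_TIME column of QRTZ2_TRIGGERS. Comparing that column against the cutoff finds the triggers due in the coming period. The lookup is done by this method in JobStoreSupport: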
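A small worked example shows the arithmetic behind the three arguments. The helper names are hypothetical. The inputs assume now = 1,000 ms, idleWaitTime = 30,000 ms, 10 free threads and a maximum batch size of 1.

```java
// Hypothetical helpers reproducing the argument arithmetic of acquireNextTriggers().
class AcquireWindow {
    static long noLaterThan(long now, long idleWaitTime) {
        return now + idleWaitTime;
    }

    static int maxCount(int availThreads, int maxBatchSize) {
        return Math.min(availThreads, maxBatchSize);
    }

    /** A trigger qualifies if NEXT_FIRE_TIME <= noLaterThan + timeWindow. */
    static boolean qualifies(long nextFireTime, long noLaterThan, long timeWindow) {
        return nextFireTime <= noLaterThan + timeWindow;
    }
}
```

With those inputs noLaterThan is 31,000 ms and maxCount is 1. Only a trigger with NEXT_FIRE_TIME <= 31,000 qualifies unless a time window is configured. A 5 ms window would also admit a trigger at 31,001.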
```java
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
    String lockName;
    // Take the TRIGGER_ACCESS row lock for batch acquisition or when configured to;
    // otherwise (lockName == null) acquisition runs without a row lock
    if (isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    // Select due WAITING triggers and change them to ACQUIRED
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result)
                        throws JobPersistenceException {
                    // Body omitted in this excerpt
                }
            });
}
```
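The key point of this method is the call to executeInNonManagedTXLock(), which takes a lock name and two callbacks. The lock is obtained when execution starts and released when the transaction commits after the method finishes. Underneath, the method uses a FOR UPDATE statement to take a row lock in the database. While it runs, any other scheduler that tries to acquire triggers waits until this scheduler releases the lock. This method is the concrete implementation of the Quartz clustering strategy described earlier, and the same template method is used again in the trigger-firing step. Note that lockName is null when maxCount is 1 and acquireTriggersWithinLock is off. In that case no row lock is taken, and duplicate acquisition is prevented only by the conditional state update in acquireNextTrigger().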
```java
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
```
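java.text.MessageFormat can show how the {0} and {1} placeholders become concrete SQL. The sketch assumes that TABLE_PREFIX_SUBST is {0}, SCHED_NAME_SUBST is {1}, TABLE_LOCKS is LOCKS and COL_SCHEDULER_NAME is SCHED_NAME, which is consistent with the MSSQL default shown earlier. It also assumes the scheduler name is substituted as a quoted literal. LockSql is a hypothetical helper and not Quartz's own substitution code.

```java
import java.text.MessageFormat;

// Hypothetical helper rendering the SELECT_FOR_LOCK template for one scheduler.
// The template below is an illustrative copy with the constants expanded (assumed values).
class LockSql {
    static final String SELECT_FOR_LOCK =
            "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE";

    static String render(String tablePrefix, String schedName) {
        // Quoting the name turns it into a SQL string literal. A name containing a
        // single quote would need escaping; this sketch ignores that case.
        return MessageFormat.format(SELECT_FOR_LOCK, tablePrefix, "'" + schedName + "'");
    }
}
```

For table prefix QRTZ2_ and scheduler CRMscheduler this yields SELECT * FROM QRTZ2_LOCKS WHERE SCHED_NAME = 'CRMscheduler' AND LOCK_NAME = ? FOR UPDATE. The lock name (TRIGGER_ACCESS or STATE_ACCESS) is bound to the ? parameter.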
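To explain further: before touching the database resources, Quartz first reads the row with the relevant LOCK_NAME in the LOCKS table using FOR UPDATE, which locks that row. If the row is already locked, it waits. If it is not locked, Quartz reads the triggers that meet the criteria (state WAITING and due in time) and sets their state to STATE_ACQUIRED. A trigger that has already been set to STATE_ACQUIRED was claimed by another scheduler instance, so there is no need to claim it again and the scheduler ignores it. This is where the indirect communication between scheduler instances takes place.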
In JobStoreSupport.acquireNextTrigger():
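```java
// Changes the state to ACQUIRED only if it is still WAITING; rowsUpdated tells whether we won
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
```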
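Finally the lock is released. If the next scheduler is queuing to acquire triggers, it then goes through the same steps. The update above only changes a row whose state is still WAITING and reports how many rows it changed. A result of 0 means another instance claimed the trigger first, so the trigger is skipped. This mechanism ensures that a trigger is never acquired twice. When acquisition runs without the lock, some of the triggers an instance has just read may already be marked as acquired by the time it tries to claim them.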
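The claim step can be simulated in memory. The sketch is an analogy and not Quartz code. ConcurrentMap.replace(key, WAITING, ACQUIRED) plays the role of the conditional UPDATE and succeeds for exactly one caller per trigger. The optional ReentrantLock stands in for the TRIGGER_ACCESS row lock. TriggerClaims is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory stand-in for QRTZ2_TRIGGERS.TRIGGER_STATE.
// replace(key, WAITING, ACQUIRED) mimics "UPDATE ... SET TRIGGER_STATE = 'ACQUIRED'
// WHERE ... AND TRIGGER_STATE = 'WAITING'" returning 1 row (true) or 0 rows (false).
class TriggerClaims {
    static final String WAITING = "WAITING";
    static final String ACQUIRED = "ACQUIRED";

    final ConcurrentMap<String, String> state = new ConcurrentHashMap<>();
    private final ReentrantLock triggerAccess = new ReentrantLock(); // the TRIGGER_ACCESS row

    /** One scheduler instance's claim pass; useLock = false models lockName == null. */
    List<String> acquire(List<String> candidates, boolean useLock) {
        if (useLock) {
            triggerAccess.lock();
        }
        try {
            List<String> mine = new ArrayList<>();
            for (String key : candidates) {
                if (state.replace(key, WAITING, ACQUIRED)) {
                    mine.add(key);   // "1 row updated": this instance owns the trigger
                }                    // "0 rows updated": another instance got it first, skip
            }
            return mine;
        } finally {
            if (useLock) {
                triggerAccess.unlock();
            }
        }
    }
}
```

Even with useLock set to false, two instances that read the same WAITING rows never both win a trigger. The lock only serializes the whole read-and-claim pass.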
That completes trigger acquisition.
Firing triggers:
QuartzSchedulerThread, line 336:
```java
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
```
This calls JobStoreSupport's triggersFired() method:
```java
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers)
        throws JobPersistenceException {
    // Same template as acquisition, always under the TRIGGER_ACCESS row lock
    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
            new TransactionCallback<List<TriggerFiredResult>>() {
                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
                    TriggerFiredResult result;
                    for (OperableTrigger trigger : triggers) {
                        // A failure is recorded per trigger instead of aborting the batch
                        try {
                            TriggerFiredBundle bundle = triggerFired(conn, trigger);
                            result = new TriggerFiredResult(bundle);
                        } catch (JobPersistenceException jpe) {
                            result = new TriggerFiredResult(jpe);
                        } catch (RuntimeException re) {
                            result = new TriggerFiredResult(re);
                        }
                        results.add(result);
                    }
                    return results;
                }
            },
            new TransactionValidator<List<TriggerFiredResult>>() {
                @Override
                public Boolean validate(Connection conn, List<TriggerFiredResult> result)
                        throws JobPersistenceException {
                    // Body omitted in this excerpt
                }
            });
}
```
Here Quartz's rule of conduct is applied again: executeInNonManagedTXLock() fires the triggers while holding the lock. The firing details are as follows:
```java
protected TriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger)
        throws JobPersistenceException {
    JobDetail job;
    Calendar cal = null;

    // 1. Only fire a trigger that is still in ACQUIRED state
    try {
        String state = getDelegate().selectTriggerState(conn, trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: " + e.getMessage(), e);
    }

    // 2. Load the JobDetail referenced by the trigger's JobKey
    try {
        job = retrieveJob(conn, trigger.getJobKey());
        if (job == null) {
            return null;
        }
    } catch (JobPersistenceException jpe) {
        try {
            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
            getDelegate().updateTriggerState(conn, trigger.getKey(), STATE_ERROR);
        } catch (SQLException sqle) {
            getLog().error("Unable to set trigger state to ERROR.", sqle);
        }
        throw jpe;
    }

    // Load the Calendar, if the trigger uses one
    if (trigger.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trigger.getCalendarName());
        if (cal == null) {
            return null;
        }
    }

    // 3. Record the firing (state EXECUTING) in the fired-triggers table
    try {
        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't insert fired trigger: " + e.getMessage(), e);
    }

    // 4. Advance the trigger: computes its next fire time, honoring the calendar
    Date prevFireTime = trigger.getPreviousFireTime();
    trigger.triggered(cal);

    String state = STATE_WAITING;
    boolean force = true;

    // @DisallowConcurrentExecution: block the job's other triggers until this run ends
    if (job.isConcurrentExectionDisallowed()) {
        state = STATE_BLOCKED;
        force = false;
        try {
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_WAITING);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_ACQUIRED);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_PAUSED_BLOCKED, STATE_PAUSED);
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't update states of blocked triggers: " + e.getMessage(), e);
        }
    }

    // No next fire time: the trigger is finished
    if (trigger.getNextFireTime() == null) {
        state = STATE_COMPLETE;
        force = true;
    }

    // 5. Persist the trigger with its new state and NEXT_FIRE_TIME
    storeTrigger(conn, trigger, job, true, state, force, false);
    job.getJobDataMap().clearDirtyFlag();

    // 6. Hand everything the JobRunShell needs back to the scheduler thread
    return new TriggerFiredBundle(job, trigger, cal,
            trigger.getKey().getGroup().equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(),
            trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}
```
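The method does the following:
1. Reads the trigger's current state (only ACQUIRED triggers are fired)
2. Reads the Job information through the trigger's JobKey
3. Records the firing in the fired-triggers table with state EXECUTING
4. Fires the trigger, taking the Calendar into account; this involves several state updates
5. Updates the trigger in the database: computes the next fire time and sets the state to WAITING, BLOCKED (if concurrent execution is disallowed) or STATE_COMPLETE (if there is no next fire time)
6. Returns TriggerFiredBundle, the data transfer object that holds the firing result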
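The state logic of steps 1, 4 and 5 can be summarized as a small function. It is a simplification for illustration, and FiredState is a hypothetical name. The state strings are assumed to match the values of the STATE_* constants in the excerpt.

```java
// Hypothetical simplification of the state bookkeeping in triggerFired(); not Quartz code.
class FiredState {
    /** Returns null when the trigger must not be fired (its state is no longer ACQUIRED). */
    static String stateAfterFiring(String currentState, boolean concurrentExecutionDisallowed,
                                   Long nextFireTime) {
        if (!"ACQUIRED".equals(currentState)) {
            return null;                       // e.g. paused or deleted in the meantime
        }
        String state = "WAITING";              // ready for its next firing
        if (concurrentExecutionDisallowed) {
            state = "BLOCKED";                 // the job's other triggers are blocked as well
        }
        if (nextFireTime == null) {
            state = "COMPLETE";                // no further firings
        }
        return state;
    }
}
```

The order of the checks mirrors the excerpt: a trigger with no next fire time ends up COMPLETE even when its job disallows concurrent execution.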
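Once this method returns, the trigger's firing is essentially complete. Control returns to executeInNonManagedTXLock(), the method that enforces Quartz's operating rule, and the database lock is released.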
Trigger firing is complete.
Job execution:
Back in the thread class QuartzSchedulerThread, at line 353, all triggers have now been fired and the job details are in place.
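QuartzSchedulerThread, line 368 (simplified, error handling omitted):
```java
JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
qsRsrcs.getThreadPool().runInThread(shell);
```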
A runnable JobRunShell is created for each job and handed to the thread pool.
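The contract between the scheduler loop and the thread pool can be sketched on top of an ExecutorService: count the free workers, hand over a JobRunShell, and get false back if no worker is free. ToyThreadPool is a hypothetical stand-in and not Quartz's SimpleThreadPool. Unlike blockForAvailableThreads(), availableThreads() here does not block.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical toy pool illustrating the runInThread() contract; not Quartz code.
class ToyThreadPool {
    private final Semaphore free;             // one permit per idle worker
    private final ExecutorService workers;

    ToyThreadPool(int size) {
        free = new Semaphore(size);
        workers = Executors.newFixedThreadPool(size);
    }

    /** Number of idle workers (the real pool blocks until at least one is free). */
    int availableThreads() {
        return free.availablePermits();
    }

    /** Runs the shell on an idle worker; returns false if none is free. */
    boolean runInThread(Runnable shell) {
        if (!free.tryAcquire()) {
            return false;                     // the scheduler loop logs an error in this case
        }
        workers.execute(() -> {
            try {
                shell.run();
            } finally {
                free.release();               // the worker is idle again
            }
        });
        return true;
    }

    void shutdown() {
        workers.shutdown();
    }
}
```

Because the scheduler loop asks for at most as many triggers as there are free threads, runInThread() returning false should be rare. That is why the loop treats it as an error rather than a normal outcome.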
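At the end of a pass in which no trigger was acquired, the scheduler thread draws a random wait time and pauses briefly. This gives the schedulers on other nodes a chance to get at the database resources, and it is how Quartz achieves load balancing.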
That ends one complete scheduling pass, and the scheduler thread enters the next iteration of its loop.
Summary:
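Simply put, Quartz's distributed scheduling strategy is an asynchronous strategy that uses the database as the boundary resource. Every scheduler follows an operating rule based on database locks, which guarantees that each operation is performed only once. At the same time, the asynchronous operation of multiple nodes keeps the service reliable. This strategy does have limitations. Here is an excerpt from the official documentation describing Quartz's clustering features: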
Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers.
The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).
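The documentation points out that clustering works well for CPU-intensive jobs. With a large number of short jobs, however, every node competes for the database lock, and many threads end up waiting for the resource. The problem gets worse as more nodes are added.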
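The documentation suggests partitioning jobs across several distinct schedulers. This can be sketched as a stable mapping from job name to scheduler name. The scheduler names and SchedulerPartitioner are hypothetical. Each name would be a separately configured (possibly clustered) scheduler with its own rows in QRTZ2_LOCKS, so jobs in different partitions no longer compete for a single lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assign each job to one of several independently named schedulers.
class SchedulerPartitioner {
    private final List<String> schedulerNames;

    SchedulerPartitioner(List<String> schedulerNames) {
        if (schedulerNames.isEmpty()) {
            throw new IllegalArgumentException("at least one scheduler name is required");
        }
        this.schedulerNames = new ArrayList<>(schedulerNames);
    }

    /** Stable choice: the same job name always maps to the same scheduler. */
    String schedulerFor(String jobName) {
        // floorMod avoids a negative index for negative hash codes
        return schedulerNames.get(Math.floorMod(jobName.hashCode(), schedulerNames.size()));
    }
}
```

The mapping must stay stable across nodes and restarts, because a job is stored under the scheduler it was scheduled with. String.hashCode() is specified by the Java language, so it gives the same result on every JVM.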
Appendix:
Main SQL statements for the key steps in the communication diagram:
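```sql
-- Step 3: lock the TRIGGER_ACCESS row for this scheduler
SELECT * FROM QRTZ2_LOCKS
WHERE SCHED_NAME = 'CRMscheduler' AND LOCK_NAME = 'TRIGGER_ACCESS'
FOR UPDATE

-- Step 4: find the WAITING triggers due within the next 30 s
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY
FROM QRTZ2_TRIGGERS
WHERE SCHED_NAME = 'CRMscheduler'
  AND TRIGGER_STATE = 'WAITING'
  AND NEXT_FIRE_TIME <= {now + 30 s, in epoch milliseconds}
  AND ( MISFIRE_INSTR = -1 ...
```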