1、簡介java
Quartz是一個優秀的調度框架,徹底基於Java實現。具備如下幾大特色:mysql
(1)強大的調度功能,例如支持豐富多樣的調度方法,能夠知足各類常規及特殊需求;git
(2)靈活的應用方式,例如支持任務和調度的多種組合方式,支持調度數據的多種存儲方式;github
(3)分佈式和集羣能力,Terracotta收購後在原來功能基礎上做了進一步提高。算法
核心概念sql
Job 表示一個工做,要執行的具體內容。此接口中只有一個方法,以下:數據庫
void execute(JobExecutionContext context)
JobDetail 表示一個具體的可執行的調度程序,Job 是這個可執行程調度程序所要執行的內容,另外 JobDetail 還包含了這個任務調度的方案和策略。
服務器
Trigger 觸發器,指定什麼時候觸發任務。
架構
Scheduler 表明一個調度容器,一個調度容器中能夠註冊多個 JobDetail 和 Trigger。當 Trigger 與 JobDetail 組合,就能夠被 Scheduler 容器調度了。 併發
Quartz線程視圖
在Quartz中,有兩類線程,Scheduler調度線程和任務執行線程,其中任務執行線程一般使用一個線程池維護一組線程。
Scheduler調度線程主要有兩個:執行常規調度的線程,和執行misfiredtrigger的線程。常規調度線程輪詢存儲的全部trigger,若是有須要觸發的trigger,即到達了下一次觸發的時間,則從任務執行線程池獲取一個空閒線程,執行與該trigger關聯的任務。Misfire線程是掃描全部的trigger,查看是否有misfiredtrigger,若是有的話根據misfire的策略分別處理(fire now OR wait for the next fire)。
Quartz Job數據存儲
Quartz中的trigger和job須要存儲下來才能被使用。Quartz中有兩種存儲方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是將trigger和job存儲在內存中,而JobStoreSupport是基於jdbc將trigger和job存儲到數據庫中。RAMJobStore的存取速度很是快,可是因爲其在系統被中止後全部的數據都會丟失,因此在集羣應用中,必須使用JobStoreSupport
2、Quartz集羣架構
一個Quartz集羣中的每一個節點是一個獨立的Quartz應用,它又管理着其餘的節點。這就意味着你必須對每一個節點分別啓動或中止。Quartz集羣中,獨立的Quartz節點並不與另外一其的節點或是管理節點通訊,而是經過相同的數據庫表來感知到另外一Quartz應用的。
數據庫準備
由於Quzrtz集羣依賴於數據庫,因此必須先建立數據庫表,數據表示官方提供的,我用的是quartz2.3.0版本,有11張表,以下:
表信息介紹
qrtz_blob_triggers : 以Blob 類型存儲的觸發器。
qrtz_calendars存儲Quartz的Calendar信息
qrtz_cron_triggers存儲CronTrigger,包括Cron表達式和時區信息
qrtz_fired_triggers存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息
qrtz_job_details存儲每個已配置的Job的詳細信息
qrtz_locks存儲程序的悲觀鎖的信息
qrtz_paused_trigger_grps存儲已暫停的Trigger組的信息
qrtz_scheduler_state存儲少許的有關Scheduler的狀態信息,和別的Scheduler實例
qrtz_simple_triggers存儲簡單的Trigger,包括重複次數、間隔、以及已觸的次數
qrtz_simprop_triggers 存儲CalendarIntervalTrigger和DailyTimeIntervalTrigger兩種類型的觸發器
qrtz_triggers存儲已配置的Trigger的信息
qrtz_locks就是Quartz集羣實現同步機制的行鎖表,包括如下幾個鎖:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS
從故障實例中恢復Job
當一個Sheduler實例在執行某個Job時失敗了,有可能由另外一正常工做的Scheduler實例接過這個Job從新運行。要實現這種行爲,配置給JobDetail對象的Job可恢復屬性必須設置爲true(job.setRequestsRecovery(true))。若是可恢復屬性被設置爲false(默認爲false),當某個Scheduler在運行該job失敗時,它將不會從新運行;而是由另外一個Scheduler實例在下一次觸發時間觸發。Scheduler實例出現故障後多快能被偵測到取決於每一個Scheduler的檢入間隔(即2.3中提到的org.quartz.jobStore.clusterCheckinInterval)。
測試項目及配置
項目代碼下載:https://github.com/feixiameiruhua/my-quartz-cluster.git
quartz.properties文件
# 固定前綴org.quartz # 主要分爲scheduler、threadPool、jobStore、plugin等部分 # # #調度器實例編號自動生成 org.quartz.scheduler.instanceId = AUTO #調度器實例名稱 org.quartz.scheduler.instanceName = DefaultQuartzScheduler org.quartz.scheduler.rmi.export = false org.quartz.scheduler.rmi.proxy = false org.quartz.scheduler.wrapJobExecutionInUserTransaction = false # 實例化ThreadPool時,使用的線程類爲SimpleThreadPool org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool # threadCount和threadPriority將以setter的形式注入ThreadPool實例 # 併發個數 org.quartz.threadPool.threadCount = 5 # 優先級 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true org.quartz.jobStore.misfireThreshold = 5000 # 默認存儲在內存中 #org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore #持久化 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #開啓分佈式部署 org.quartz.jobStore.isClustered = true org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.dataSource = qzDS org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.qzDS.URL = jdbc:mysql://localhost:3306/quartz_test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true org.quartz.dataSource.qzDS.user = root org.quartz.dataSource.qzDS.password = 123456 org.quartz.dataSource.qzDS.maxConnections = 10
配置文件說明
org.quartz.jobStore.isClustered = true
在集羣中的每個實例都必須有一個惟一的"instance id" ("org.quartz.scheduler.instanceId" 屬性), 默認爲AUTO就能夠。還要有相同的"scheduler instance name" ("org.quartz.scheduler.instanceName"),也就是說集羣中的每個實例都必須使用相同的quartz.properties 配置文件。
調度任務代碼
package com.fwmagic.quartz.schedule; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; import java.util.ResourceBundle; public class SchedulerExecJob implements Job { private static Logger logger = LoggerFactory.getLogger(SchedulerExecJob.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { String jobName = context.getJobDetail().getKey().getName(); switch (jobName) { /*每5s執行一次*/ case "quartz_test1": System.err.println(getAddress()+" "+getDate()+"====>quartz_test1<===="); break; /*每5s執行一次*/ case "quartz_test2": System.err.println(getAddress()+" "+getDate()+"====>quartz_test2<===="); break; /*每5s執行一次*/ case "quartz_test3": System.err.println(getAddress()+" "+getDate()+"====>quartz_test3<===="); break; default: System.err.println(getAddress()+" "+getDate()+"====>other task<===="); break; } } public static String getDate(){ return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); } public static String getAddress(){ return "http://localhost:"+ResourceBundle.getBundle("application").getString("server.port"); } }
2、集羣任務測試
項目中有三個任務,我用不一樣端口(三個服務同時啓動,端口不同,分別爲:9099(ServerA),,9098(ServerB),9097(ServerC))在本機啓動同一個項目,效果圖以下:
當開啓ServerA時,三個任務都在9099端口執行,控制檯信息爲:
開啓ServerB後,有部分任務分配過來:
開啓ServerC,集羣所有啓動的狀況下,會有任務交錯執行或者任務在同一臺機器上執行的效果:
注意事項
Quartz實際並不關心你是在相同仍是不一樣的機器上運行節點。當集羣放置在不一樣的機器上時,稱之爲水平集羣。節點跑在同一臺機器上時,稱之爲垂直集羣。對於垂直集羣,存在着單點故障的問題。這對高可用性的應用來講是沒法接受的,由於一旦機器崩潰了,全部的節點也就被終止了。對於水平集羣,存在着時間同步問題。
節點用時間戳來通知其餘實例它本身的最後檢入時間。假如節點的時鐘被設置爲未來的時間,那麼運行中的Scheduler將再也意識不到那個結點已經宕掉了。另外一方面,若是某個節點的時鐘被設置爲過去的時間,也許另外一節點就會認定那個節點已宕掉並試圖接過它的Job重運行。最簡單的同步計算機時鐘的方式是使用某一個Internet時間服務器(Internet Time Server ITS)。
節點爭搶Job問題
由於Quartz使用了一個隨機的負載均衡算法, Job以隨機的方式由不一樣的實例執行。Quartz官網上提到當前,還不存在一個方法來指派(釘住) 一個 Job 到集羣中特定的節點。
3、集羣源碼分析
Quartz如何保證多個節點的應用只進行一次調度(即某一時刻的調度任務只由其中一臺服務器執行)?,正如Quartz集羣架構上的那副圖,
Quartz的集羣是在同一個數據庫下, 由數據庫的數據來肯定調度任務是否正在執行, 正在執行則其餘服務器就不能去執行該行調度數據。 這個跟不少項目是用Zookeeper作集羣不同, 這些項目是靠Zookeeper選舉出來的的服務器去執行, 能夠理解爲Quartz靠數據庫選舉一個服務器來執行。
Quartz最主要的一個類QuartzSchedulerThread職責是觸發任務, 是一個不斷運行的Quartz主線程, 仍是從這裏入手瞭解集羣原理。
QuartzSchedulerThread繼承自Thread,實現了run方法,在run方法中調用了以下幾個重要的方法,都進行了加鎖的操做:
一、qsRsrcs.getJobStore().acquireNextTriggers【查找即將觸發的Trigger】
二、sigLock.wait(timeUntilTrigger)【等待執行】
三、qsRsrcs.getJobStore().triggersFired(triggers)【執行】
四、qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)) 【釋放Trigger】
以acquireNextTriggers爲例,能夠看到:
protected static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";//數據庫鎖名字
將鎖名傳入核心的加鎖方法(executeInNonManagedTXLock)中:
protected <T> T executeInNonManagedTXLock( String lockName, TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException { boolean transOwner = false; Connection conn = null; try { if (lockName != null) { // If we aren't using db locks, then delay getting DB connection // until after acquiring the lock since it isn't needed. if (getLockHandler().requiresConnection()) { conn = getNonManagedTXConnection(); } //獲取鎖 transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) { conn = getNonManagedTXConnection(); } //回調執行 final T result = txCallback.execute(conn); try { commitConnection(conn); } catch (JobPersistenceException e) { rollbackConnection(conn); if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() { @Override public Boolean execute(Connection conn) throws JobPersistenceException { return txValidator.validate(conn, result); } })) { throw e; } } Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null && sigTime >= 0) { signalSchedulingChangeImmediately(sigTime); } return result; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (RuntimeException e) { rollbackConnection(conn); throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e); } finally { try { //釋放鎖 releaseLock(lockName, transOwner); } finally { cleanupConnection(conn); } } }
經過這行代碼查找鎖是怎麼來的
transOwner = getLockHandler().obtainLock(conn, lockName);
在JobStoreSupport的initialize方法中:
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) throws SchedulerConfigException { if (dsName == null) { throw new SchedulerConfigException("DataSource name not set."); } // If the user hasn't specified an explicit lock handler, then // choose one based on CMT/Clustered/UseDBLocks. if (getLockHandler() == null) { // If the user hasn't specified an explicit lock handler, // then we *must* use DB locks with clustering if (isClustered()) { setUseDBLocks(true); } //…… // 在初始化方法裏面賦值了 setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL())); } else { getLog().info( "Using thread monitor-based data access locking (synchronization)."); setLockHandler(new SimpleSemaphore()); } } }
在new StdRowLockSemaphore構造方法中
public StdRowLockSemaphore(String tablePrefix, String schedName, String selectWithLockSQL) { super(tablePrefix, schedName, selectWithLockSQL != null ? selectWithLockSQL : SELECT_FOR_LOCK, INSERT_LOCK); }
能夠發現有兩個鎖名稱:
public static final String SELECT_FOR_LOCK = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE"; public static final String INSERT_LOCK = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";
數據庫的qrtz_locks中存放兩個鎖的記錄
能夠看出採用了Quartz集羣採用了悲觀鎖的方式對triggers表進行行加鎖, 以保證任務同步的正確性。
當線程使用上述的SQL對錶中的數據執行操做時,數據庫對該行進行行加鎖; 於此同時, 另外一個線程對該行數據執行操做前須要獲取鎖, 而此時已被佔用, 那麼這個線程就只能等待, 直到該行鎖被釋放。