美團CRM系統中天天有大量的後臺任務須要調度執行,如構建索引、統計報表、週期同步數據等等,要求任務調度系統具有高可用性、負載均衡特性,能夠管理並監控任務的執行流程,以保證任務的正確執行。html
美團CRM系統的任務調度模塊經歷瞭如下歷史方案。python
1. Crontab+SQLspring
天天晚上運行定時任務,經過SQL腳本+crontab方式執行,例如,sql
#crm 0 2 * * * /xxx/mtcrm/shell/mtcrm_daily_stat.sql //天天凌晨2:00執行統計 30 7 * * * /xxx/mtcrm/shell/mtcrm_data_fix.sql //天天早上7:30執行數據修復
該方案存在如下問題:shell
2. Python+SQL數據庫
採用python腳本(多數據源)+SQL方式執行,例如,服務器
def connectCRM(): return MySQLdb.Connection("host1", "uname", "xxx", "crm", 3306, charset="utf8") def connectTemp(): return MySQLdb.Connection("host1", "uname", "xxx", "temp", 3306, charset="utf8")
該方案存在問題:數據結構
3. Spring+JDK Timer架構
該方案使用spring+JDK Timer方式,調用接口完成定時任務,在分佈式部署環境下,防止多個節點同時運行任務,須要寫死host,控制在一臺服務器上執行task。併發
<bean id="accountStatusTaskScanner" class="xxx.crm.service.impl.AccountStatusTaskScanner" /> <task:scheduler id="taskScheduler" pool-size="5" /> <task:scheduled-tasks scheduler="taskScheduler"> <task:scheduled ref="accountStatusTaskScanner" method="execute" cron="0 0 1 * * ?" /> </task:scheduled-tasks>
該方案較方案1,2有很大改進,但仍存在如下問題:
CRM系統定時任務走過了不少彎路:定時任務多種實現方式,使配置和代碼分散在多處,難以維護和監控;任務執行過程沒有保證,沒有錯誤恢復;任務執行異常沒有反饋(郵件);沒有集羣支持、負載均衡。CRM系統須要分佈式的任務調度框架,統一解決問題,Java可使用的任務調度框架有Quartz,Jcrontab,cron4j,咱們選擇了Quartz。
Quartz是Java領域最著名的開源任務調度工具。Quartz提供了極爲普遍的特性如持久化任務,集羣和分佈式任務等,其特色以下:
CRM中Quartz與Spring結合使用,Spring經過提供org.springframework.scheduling.quartz下的封裝類對Quartz支持。
Quartz集羣部署:
Quartz集羣中的每一個節點是一個獨立的Quartz應用,它又管理着其餘的節點。該集羣須要分別對每一個節點分別啓動或中止,不像應用服務器的集羣,獨立的Quartz節點並不與另外一個節點或是管理節點通訊。Quartz應用是經過數據庫表來感知到另外一應用。只有使用持久的JobStore才能完成Quqrtz集羣。
基於Spring的集羣配置:
<!-- 調度工廠 --> <bean id="quartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="dataSource" ref="dataSource" /> <property name="quartzProperties"> <props> <prop key="org.quartz.scheduler.instanceName">CRMscheduler</prop> <prop key="org.quartz.scheduler.instanceId">AUTO</prop> <!-- 線程池配置 --> <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop> <prop key="org.quartz.threadPool.threadCount">20</prop> <prop key="org.quartz.threadPool.threadPriority">5</prop> <!-- JobStore 配置 --> <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop> <!-- 集羣配置 --> <prop key="org.quartz.jobStore.isClustered">true</prop> <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop> <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop> <prop key="org.quartz.jobStore.misfireThreshold">120000</prop> <prop key="org.quartz.jobStore.tablePrefix">QRTZ_</prop> </props> </property> <property name="schedulerName" value="CRMscheduler" /> <!--必須的,QuartzScheduler 延時啓動,應用啓動完後 QuartzScheduler 再啓動 --> <property name="startupDelay" value="30" /> <property name="applicationContextSchedulerContextKey" value="applicationContextKey" /> <!--可選,QuartzScheduler 啓動時更新己存在的Job,這樣就不用每次修改targetObject後刪除qrtz_job_details表對應記錄了 --> <property name="overwriteExistingJobs" value="true" /> <!-- 設置自動啓動 --> <property name="autoStartup" value="true" /> <!-- 註冊觸發器 --> <property name="triggers"> <list> <ref bean="userSyncScannerTrigger" /> ...... </list> </property> <!-- 註冊jobDetail --> <property name="jobDetails"> <list> </list> </property> <property name="schedulerListeners"> <list> <ref bean="quartzExceptionSchedulerListener" /> </list> </property> </bean> org.quartz.jobStore.class屬性爲JobStoreTX,將任務持久化到數據中。由於集羣中節點依賴於數據庫來傳播Scheduler實例的狀態,你只能在使用JDBC JobStore時應用Quartz集羣。 org.quartz.jobStore.isClustered屬性爲true,通知Scheduler實例要它參與到一個集羣當中。 org.quartz.jobStore.clusterCheckinInterval屬性定義了Scheduler實例檢入到數據庫中的頻率(單位:毫秒)。Scheduler檢查是否其餘的實例到了它們應當檢入的時候未檢入;這能指出一個失敗的Scheduler實例,且當前 Scheduler會以此來接管任何執行失敗並可恢復的Job。經過檢入操做,Scheduler 也會更新自身的狀態記錄。clusterChedkinInterval越小,Scheduler節點檢查失敗的Scheduler實例就越頻繁。默認值是 15000 (即15 秒)。 其他參數在後文將會詳細介紹。
Quartz監控
CRM後臺目前能夠作到對Quartz實例的監控、操做以及動態部署Trigger.
Triggers監控:
JobDetails監控:
1. Quartz集羣數據庫表
Quartz的集羣部署方案在架構上是分佈式的,沒有負責集中管理的節點,而是利用數據庫鎖的方式來實現集羣環境下進行併發控制。BTW,分佈式部署時須要保證各個節點的系統時間一致。
Quartz數據庫核心表以下:
Table Name | Description |
---|---|
QRTZ_CALENDARS | 存儲Quartz的Calendar信息 |
QRTZ_CRON_TRIGGERS | 存儲CronTrigger,包括Cron表達式和時區信息 |
QRTZ_FIRED_TRIGGERS | 存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存儲已暫停的Trigger組的信息 |
QRTZ_SCHEDULER_STATE | 存儲少許的有關Scheduler的狀態信息,和別的Scheduler實例 |
QRTZ_LOCKS | 存儲程序的悲觀鎖的信息 |
QRTZ_JOB_DETAILS | 存儲每個已配置的Job的詳細信息 |
QRTZ_JOB_LISTENERS | 存儲有關已配置的JobListener的信息 |
QRTZ_SIMPLE_TRIGGERS | 存儲簡單的Trigger,包括重複次數、間隔、以及已觸的次數 |
QRTZ_BLOG_TRIGGERS | Trigger做爲Blob類型存儲 |
QRTZ_TRIGGER_LISTENERS | 存儲已配置的TriggerListener的信息 |
QRTZ_TRIGGERS | 存儲已配置的Trigger的信息 |
其中,QRTZ_LOCKS就是Quartz集羣實現同步機制的行鎖表,其表結構以下:
--QRTZ_LOCKS表結構 CREATE TABLE `QRTZ_LOCKS` ( `LOCK_NAME` varchar(40) NOT NULL, PRIMARY KEY (`LOCK_NAME`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; --QRTZ_LOCKS記錄 +-----------------+ | LOCK_NAME | +-----------------+ | CALENDAR_ACCESS | | JOB_ACCESS | | MISFIRE_ACCESS | | STATE_ACCESS | | TRIGGER_ACCESS | +-----------------+
能夠看出QRTZ_LOCKS中有5條記錄,表明5把鎖,分別用於實現多個Quartz Node對Job、Trigger、Calendar訪問的同步控制。
2. Quartz線程模型
在Quartz中有兩類線程:Scheduler調度線程和任務執行線程。任務執行線程:Quartz不會在主線程(QuartzSchedulerThread)中處理用戶的Job。Quartz把線程管理的職責委託給ThreadPool,通常的設置使用SimpleThreadPool。SimpleThreadPool建立了必定數量的WorkerThread實例來使得Job可以在線程中進行處理。WorkerThread是定義在SimpleThreadPool類中的內部類,它實質上就是一個線程。例如,CRM中配置以下:
<!-- 線程池配置 --> <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop> <prop key="org.quartz.threadPool.threadCount">20</prop> <prop key="org.quartz.threadPool.threadPriority">5</prop>
QuartzSchedulerThread調度主線程:QuartzScheduler被建立時建立一個QuartzSchedulerThread實例。
3. 集羣源碼分析
Quartz到底是如何保證集羣狀況下trgger處理的信息同步?
下面跟着源碼一步一步分析,QuartzSchedulerThread包含有決定什麼時候下一個Job將被觸發的處理循環,主要邏輯在其run()方法中:
public void run() { boolean lastAcquireFailed = false; while (!halted.get()) { ...... int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { ...... //調度器在trigger隊列中尋找30秒內必定數目的trigger(須要保證集羣節點的系統時間一致) triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); ...... //觸發trigger List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); ...... //釋放trigger for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } } }
由此可知,QuartzScheduler調度線程不斷獲取trigger,觸發trigger,釋放trigger。下面分析trigger的獲取過程,qsRsrcs.getJobStore()返回對象是JobStore,集羣環境配置以下:
<!-- JobStore 配置 --> <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
JobStoreTX繼承自JobStoreSupport,而JobStoreSupport的acquireNextTriggers、triggersFired、releaseAcquiredTrigger方法負責具體trigger相關操做,都必須得到TRIGGER_ACCESS鎖。核心邏輯在executeInNonManagedTXLock方法中:
protected <T> T executeInNonManagedTXLock( String lockName, TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException { boolean transOwner = false; Connection conn = null; try { if (lockName != null) { if (getLockHandler().requiresConnection()) { conn = getNonManagedTXConnection(); } //獲取鎖 transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) { conn = getNonManagedTXConnection(); } final T result = txCallback.execute(conn); try { commitConnection(conn); } catch (JobPersistenceException e) { rollbackConnection(conn); if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() { @Override public Boolean execute(Connection conn) throws JobPersistenceException { return txValidator.validate(conn, result); } })) { throw e; } } Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null && sigTime >= 0) { signalSchedulingChangeImmediately(sigTime); } return result; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (RuntimeException e) { rollbackConnection(conn); throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e); } finally { try { releaseLock(lockName, transOwner); //釋放鎖 } finally { cleanupConnection(conn); } } }
由上代碼可知Quartz集羣基於數據庫鎖的同步操做流程以下圖所示:
一個調度器實例在執行涉及到分佈式問題的數據庫操做前,首先要獲取QUARTZ_LOCKS表中對應的行級鎖,獲取鎖後便可執行其餘表中的數據庫操做,隨着操做事務的提交,行級鎖被釋放,供其餘調度實例獲取。集羣中的每個調度器實例都遵循這樣一種嚴格的操做規程。
getLockHandler()方法返回的對象類型是Semaphore,獲取鎖和釋放鎖的具體邏輯由該對象維護
public interface Semaphore { boolean obtainLock(Connection conn, String lockName) throws LockException; void releaseLock(String lockName) throws LockException; boolean requiresConnection(); }
該接口的實現類完成具體操做鎖的邏輯,在JobStoreSupport的初始化方法中注入的Semaphore具體類型是StdRowLockSemaphore
setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
StdRowLockSemaphore的源碼以下所示:
public class StdRowLockSemaphore extends DBSemaphore { //鎖定SQL語句 public static final String SELECT_FOR_LOCK = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME + " = ? FOR UPDATE"; public static final String INSERT_LOCK = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)"; //指定鎖定SQL protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException { PreparedStatement ps = null; ResultSet rs = null; try { ps = conn.prepareStatement(expandedSQL); ps.setString(1, lockName); ...... rs = ps.executeQuery(); if (!rs.next()) { throw new SQLException(Util.rtp( "No row exists in table " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " for lock named: " + lockName, getTablePrefix())); } } catch (SQLException sqle) { } finally { ...... //release resources } } } //獲取QRTZ_LOCKS行級鎖 public boolean obtainLock(Connection conn, String lockName) throws LockException { lockName = lockName.intern(); if (!isLockOwner(conn, lockName)) { executeSQL(conn, lockName, expandedSQL); getThreadLocks().add(lockName); } return true; } //釋放QRTZ_LOCKS行級鎖 public void releaseLock(Connection conn, String lockName) { lockName = lockName.intern(); if (isLockOwner(conn, lockName)) { getThreadLocks().remove(lockName); } ...... }
至此,總結一下Quartz集羣同步機制:每當要進行與某種業務相關的數據庫操做時,先去QRTZ_LOCKS表中查詢操做相關的業務對象所須要的鎖,在select語句以後加for update來實現。例如,TRIGGER_ACCESS表示對任務觸發器相關的信息進行修改、刪除操做時所須要得到的鎖。這時,執行查詢這個表數據的SQL形如:
select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
當一個線程使用上述的SQL對錶中的數據執行查詢操做時,若查詢結果中包含相關的行,數據庫就對該行進行ROW LOCK;若此時,另一個線程使用相同的SQL對錶的數據進行查詢,因爲查詢出的數據行已經被數據庫鎖住了,此時這個線程就只能等待,直到擁有該行鎖的線程完成了相關的業務操做,執行了commit動做後,數據庫纔會釋放了相關行的鎖,這個線程才能繼續執行。
經過這樣的機制,在集羣環境下,結合悲觀鎖的機制就能夠防止一個線程對數據庫數據的操做的結果被另一個線程所覆蓋,從而能夠避免一些難以覺察的錯誤發生。固然,達到這種效果的前提是須要把Connection設置爲手動提交,即autoCommit爲false。