項目中使用Quartz集羣分享--轉載

原文:http://hot66hot.iteye.com/blog/1726143html

在公司分享了Quartz,發佈出來,但願你們討論補充. 

CRM使用Quartz集羣分享 
一:CRM對定時任務的依賴與問題 
二:什麼是quartz,如何使用,集羣,優化 
三:CRM中quartz與Spring結合使用 

1:CRM對定時任務的依賴與問題 
1)依賴 
(1)天天晚上的定時任務,經過sql腳本 + crontab方式執行 java

Xml代碼   收藏代碼
  1. #crm  
  2. 0 2 * * * /opt/***/javafiles/***/shell/***_daily_stat.sql  
  3. 30 7 * * * /opt/***/javafiles/***/shell/***_data_fix  
  4. 30 0 * * * /opt/***/javafiles/***/shell/***_sync_log  
  5. 0 1 * * * /opt/***/javafiles/***/shell/***_clear_log  
  6. 20 8 * * * /opt/***/javafiles/***/shell/***_daily >> /var/***/logs/***_daily.log 2>&1  
  7. 40 1 * * * /opt/***/javafiles/***/shell/***_sync_account2  
  8. 0 2 * * 1 /opt/***/javafiles/***/shell/***_weekly >> /var/***/logs/***_weekly.log 2>&1  


存在的問題:當須要跨庫或許數據的,sql無能爲力,引入許多中間表,完成複雜統計需求。大範圍對線上熱表掃描,形成鎖表,延遲嚴重 
(2)使用python(多數據源) + SQL的方式 python

Python代碼   收藏代碼
  1. def connectCRM():  
  2.     return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")  
  3.   
  4. def connectTemp():  
  5.     return MySQLdb.Connection("localhost", "***", "***",  "***", 3306, charset="utf8")  
  6.   
  7. def connectOA():  
  8.     return MySQLdb.Connection("localhost", "***", "***",  "***", 3306, charset="utf8")  
  9.   
  10. def connectCore():  
  11.     return MySQLdb.Connection("localhost", "***", "***",  "***", 3306, charset="utf8")  
  12.   
  13. def connectCT():  
  14.     return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")  



存在的問題:直接訪問數據,須要理解各系統的數據結構,沒法知足動態任務問題,各系統業務接口沒有重用 

(3)使用spring + JDK timer方式調用接口完成定時任務 web

Xml代碼   收藏代碼
  1. <bean id="accountStatusTaskScanner"  class="***.impl.AccountStatusTaskScanner" />  
  2.     <task:scheduler id="taskScheduler" pool-size="5" />  
  3.     <task:scheduled-tasks scheduler="taskScheduler">  
  4.     <task:scheduled ref="accountStatusTaskScanner" method="execute" cron="0 0 1 * * ?" />  
  5. </task:scheduled-tasks>  


使用寫死服務器Host(srv23)的方式,控制只在一臺服務器上執行task spring

Java代碼   收藏代碼
  1. public abstract class SingletonServerTaskScanner implements TaskScanner {  
  2.     private final Logger logger = LoggerFactory.getLogger(SingletonServerTaskScanner.class);  
  3.     @Override  
  4.     public void execute() {  
  5.         String hostname = "";  
  6.         try {  
  7.             hostname = InetAddress.getLocalHost().getHostName();  
  8.         } catch (UnknownHostException e) {  
  9.             logger.error(e.getMessage(), e);  
  10.         }  
  11.         //判斷是否爲當前可執行服務器  
  12.         if (ConfigUtil.getValueByKey("core.scan.server").equals(hostname)) {  
  13.             doScan();  
  14.         }  
  15.     }  
  16.     public abstract void doScan();  
  17. }  



//對於srv23的重啓,保存在內存中的任務將丟失,每次重啓srv23從新生成定時任務 sql

Java代碼   收藏代碼
  1. public class CrmInitializer implements InitializingBean {  
  2.     private Logger logger = LoggerFactory.getLogger(CrmInitializer.class);  
  3.     @Override  
  4.     public void afterPropertiesSet() throws Exception {  
  5.         // 掃描商家狀態,建立定時任務  
  6.         logger.info("掃描商家狀態,建立定時任務");  
  7.         accountStatusTaskScanner.execute();  
  8.         // 掃描N天未拜訪商家,建立定時任務  
  9.         logger.info("掃描N天未拜訪商家,建立定時任務");  
  10.         nDaysActivityScanner.execute();  
  11.     }  
  12. }  

 

Java代碼   收藏代碼
  1. //經過調用srv23的特定URL的方式,動態指定任務(如取消N天未拜訪,私海進保護期,保護期進公海等)  
  2. public class SingletonServerTaskController {  
  3.         @Resource  
  4.         private AccountService accountService;  
  5.         @RequestMapping(value = "/reschedule")  
  6.         public @ResponseBody  
  7.             String checkAndRescheduleAccount(Integer accountId) {  
  8.             logger.debug("reschedule task for accountId:" + accountId);  
  9.             if (isCurrentServer()) {  
  10.                 accountService.checkAndRescheduleAccount(Arrays.asList(accountId));  
  11.             }  
  12.             return "ok";  
  13.         }  
  14.     private boolean isCurrentServer() {  
  15.         String hostname = "";  
  16.         try {  
  17.             hostname = InetAddress.getLocalHost().getHostName();  
  18.         } catch (UnknownHostException e) {  
  19.             logger.error(e.getMessage(), e);  
  20.         }  
  21.         if (ConfigUtil.getValueByKey("core.scan.server").equals(hostname)) {  
  22.             return true;  
  23.         } else {  
  24.             return false;  
  25.         }  
  26.     }  
  27. }  


存在的問題:實現步驟複雜,分散,任務調度不能恢復,嚴重依賴於srv23,回調URL時可能失敗等狀況。 
CRM定時任務走過了不少彎路: 
定時任務多種實現方式,使配置和代碼分散在多處,難以維護和監控 
任務執行過程沒有保證,沒有錯誤恢復 
任務執行異常沒有反饋(郵件) 
沒有集羣支持 
CRM須要分佈式的任務調度框架,統一解決問題. 
JAVA可使用的任務調度框架:Quartz , Jcrontab , cron4j , taobao-pamirs-schedule 
爲何選擇Quartz: 
1)資歷夠老,創立於1998年,比struts1還早,可是一直在更新(27 April 2012: Quartz 2.1.5 Released),文檔齊全. 
2)徹底由Java寫成,設計用於J2SE和J2EE應用.方便集成:JVM,RMI. 
3)設計清晰簡單:核心概念scheduler,trigger,job,jobDetail,listener,calendar 
4)支持集羣:org.quartz.jobStore.isClustered 
5)支持任務恢復:requestsRecovery 

從http://www.quartz-scheduler.org 獲取最新Quartz 
1)學習Quartz 

 
圖1 介紹了quartz關鍵的組件和簡單流程 

(1)Quartz 的目錄結構和內容 

docs/api                                      Quartz 框架的JavaDoc Api 說明文檔 
docs/dbTables                            建立 Quartz 的數據庫對象的腳本 
docs/wikidocs                             Quartz 的幫助文件,點擊 index.html 開始查看 
Examples                                    多方面使用 Quartz 的例子Lib Quartz 使用到的第三方包 
src/java/org/quartz                      使用 Quartz 的客戶端程序源代碼,公有 API 
src/java/org/quartz/core              使用 Quartz 的服務端程序源代碼,私有 API 
src/java/org/quartz/simpl            Quartz 提供的不衣賴於第三方產品的簡單實現 
src/java/org/quartz/impl              依賴於第三方產品的支持模塊的實現 
src/java/org/quartz/utils              整個框架要用到的輔助類和工具組件 
src/jboss                                     提供了特定於 JBoss 特性的源代碼 
src/oracle                                   提供了特定於 Oracle 特性的源代碼 
src/weblogic                              提供了特定於 WebLogic 特性的源代碼 

Quartz 框架包含許多的類和接口,它們分佈在大概 11 個包中。多數所要使用到的類或接口放置在 org.quartz 包中。這個包含蓋了 Quartz 框架的公有 API. 

(2)Quartz核心接口 Scheduler 


圖2 
Scheduler 是 Quartz 的主要 API。與Quartz大部分交互是發生於 Scheduler 之上的。客服端與Scheduler 交互是經過org.quartz.Scheduler接口。 
Scheduler的實現:對方法調用會傳遞到 QuartzScheduler 實例上。QuartzScheduler 對於客戶端是不可見的,而且也不存在與此實例的直接交互。 

 

圖3 

建立Scheduler 
Quartz 框架提供了 org.quartz.SchedulerFactory 接口。 
SchedulerFactory 實例就是用來產生 Scheduler 實例的。當 Scheduler 實例被建立以後,就會存到一個倉庫中(org.quartz.impl.SchedulerRepository). 
Scheduler 工廠分別是 org.quartz.impl.DirectSchedulerFactory 和 org.quartz.impl.StdSchedulerFactory 
DirectSchedulerFactory 是爲精細化控制 Scheduler 實例產生的工廠類,通常不用,不過有利於理解quartz內部組件。 shell

Java代碼   收藏代碼
  1. -- 最簡單  
  2. public void createScheduler(ThreadPool threadPool, JobStore jobStore);  
  3. -- 最複雜  
  4. public void createScheduler(String schedulerName, String schedulerInstanceId,ThreadPool threadPool, JobStore jobStore, String rmiRegistryHost, int rmiRegistryPort);  

 

Java代碼   收藏代碼
  1. public scheduler createScheduler(){  
  2.  DirectSchedulerFactory factory=DirectSchedulerFactory.getInstance();  
  3.  try {  
  4.     //建立線程池  
  5.     SimpleThreadPool threadPool = new SimpleThreadPool(10, Thread.NORM_PRIORITY);  
  6.     threadPool.initialize();  
  7.     //建立job存儲類  
  8.     JobStoreTX jdbcJobStore = new JobStoreTX();  
  9.     jdbcJobStore.setDataSource("someDatasource");  
  10.         jdbcJobStore.setPostgresStyleBlobs(true);  
  11.         jdbcJobStore.setTablePrefix("QRTZ_");  
  12.         jdbcJobStore.setInstanceId("My Instance");  
  13.       
  14.     logger.info("Scheduler starting up...");  
  15.     factory.createScheduler(threadPool,jdbcJobStore);  
  16.     // Get a scheduler from the factory  
  17.         Scheduler scheduler = factory.getScheduler();  
  18.   
  19.     // 必須啓動scheduler  
  20.         scheduler.start();  
  21.         return scheduler;  
  22.     }  
  23.         return null;  
  24. }  



org.quartz.impl.StdSchedulerFactory 依賴於屬性類(Properties)決定如何生產 Scheduler 實例 

經過加載屬性文件,Properties 提供啓動參數: 數據庫

Java代碼   收藏代碼
  1. public scheduler createScheduler(){  
  2.     // Create an instance of the factory  
  3.     StdSchedulerFactory factory = new StdSchedulerFactory();  
  4.       
  5.     // Create the properties to configure the factory  
  6.     Properties props = new Properties();  
  7.     // required to supply threadpool class and num of threads  
  8.     props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,"org.quartz.simpl.SimpleThreadPool");  
  9.     props.put("org.quartz.threadPool.threadCount", "10");  
  10.       
  11.     try {  
  12.         // Initialize the factory with properties  
  13.         factory.initialize(props);  
  14.         Scheduler scheduler = factory.getScheduler();  
  15.         logger.info("Scheduler starting up...");  
  16.         scheduler.start();  
  17.     } catch (SchedulerException ex) {  
  18.         logger.error(ex);  
  19.     }  
  20. }  


調用靜態方法 getDefaultScheduler() 方法中調用了空的構造方法。若是以前未調用過任何一個 initialize() 方法,那麼無參的initialize() 方法會被調用。這會開始去按照下面說的順序加載文件。 
默認狀況下,quartz.properties 會被定位到,並從中加載屬性。 

properties加載順序: 
1. 檢查 System.getProperty("org.quartz.properties") 中是否設置了別的文件名 
2. 不然,使用 quartz.properties 做爲要加載的文件名 
3. 試圖從當前工做目錄中加載這個文件 
4. 試圖從系統 classpath 下加載這個文件 
在 Quartz Jar 包中有一個默認的 quartz.properties 文件 

默認配置以下 
# Default Properties file for use by StdSchedulerFactory 
# to create a Quartz Scheduler Instance, if a different 
# properties file is not explicitly specified. 

org.quartz.scheduler.instanceName = DefaultQuartzScheduler 
org.quartz.scheduler.rmi.export = false 
org.quartz.scheduler.rmi.proxy = false 
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false 
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool 
org.quartz.threadPool.threadCount = 10 
org.quartz.threadPool.threadPriority = 5 
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true 
org.quartz.jobStore.misfireThreshold = 60000 
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore 
到此建立Scheduler完成 

經過Scheduler理解Quartz 
Scheduler 的 API 能夠分組成如下三個類別: 
·管理 Scheduler 

(1)啓動 Scheduler windows

Java代碼   收藏代碼
  1. //Start the scheduler  
  2.  scheduler.start();  


start() 方法被調用,Scheduler 就開始搜尋須要執行的 Job。在你剛獲得一個 Scheduler 新的實例時,或者 Scheduler 
被設置爲 standby 模式後,你才能夠調用 start() 方法。 設計模式

Java代碼   收藏代碼
  1. public void standby() throws SchedulerException;  


只要調用了 shutdown() 方法以後,你就不能再調用 Scheduler 實例的 start() 方法了。 
這是由於 shutdown() 方法銷燬了爲 Scheduler 建立的全部的資源(線程,數據庫鏈接等)。 
你可能須要Standby 模式:設置 Scheduler 爲 standby 模式會致使 Scheduler搜尋要執行的 Job 的線程被暫停下來 

中止 Scheduler 

Java代碼   收藏代碼
  1. //waitForJobsToComplete 是否讓當前正在進行的Job正常執行完成才中止Scheduler  
  2. public void shutdown(boolean waitForJobsToComplete) throws SchedulerException;  
  3. public void shutdown() throws SchedulerException;  


其它管理Scheduler 方法見API... 
管理 Job 
什麼是 Quartz Job? 
一個Quart Job就是一個任何一個繼承job或job子接口的Java類,你能夠用這個類作任何事情! 

org.quartz.Job 接口 

Java代碼   收藏代碼
  1. public void execute(JobExecutionContext context)throws JobExecutionException;  
  2. JobExecutionContext  


當 Scheduler 調用一個 Job,一個 JobexecutionContext 傳遞給 execute() 方法。JobExecutionContext 對象讓 Job 能 
訪問 Quartz 運行時候環境和 Job 自己的數據。相似於在 Java Web 應用中的 servlet 訪問 ServletContext 。 
經過 JobExecutionContext,Job 可訪問到所處環境的全部信息,包括註冊到 Scheduler 上與該 Job 相關聯的 JobDetail 和 Trigger。 
JobDetail 
部署在 Scheduler 上的每個 Job 只建立了一個 JobDetail實例。JobDetail 是做爲 Job 實例進行定義的 
// Create the JobDetail 
JobDetail jobDetail = new JobDetail("PrintInfoJob",Scheduler.DEFAULT_GROUP, PrintInfoJob.class); 
// Create a trigger that fires now and repeats forever 
Trigger trigger = TriggerUtils.makeImmediateTrigger( 
SimpleTrigger.REPEAT_INDEFINITELY, 10000); 
trigger.setName("PrintInfoJobTrigger");// register with the Scheduler 
scheduler.scheduleJob(jobDetail, trigger); 
JobDetail 被加到 Scheduler 中了,而不是 job。Job 類是做爲 JobDetail 的一部份,job直到Scheduler準備要執行它的時候纔會被實例化的,所以job不存在線成安全性問題. 

使用 JobDataMap 對象設定 Job 狀態 

Java代碼   收藏代碼
  1. public void executeScheduler() throws SchedulerException{  
  2.     scheduler = StdSchedulerFactory.getDefaultScheduler();  
  3.     scheduler.start();  
  4.     logger.info("Scheduler was started at " + new Date());  
  5.     // Create the JobDetail  
  6.     JobDetail jobDetail = new JobDetail("PrintJobDataMapJob",Scheduler.DEFAULT_GROUP,PrintJobDataMapJob.class);  
  7.     // Store some state for the Job  
  8.     jobDetail.getJobDataMap().put("name", "John Doe");  
  9.     jobDetail.getJobDataMap().put("age", 23);  
  10.     jobDetail.getJobDataMap().put("balance",new BigDecimal(1200.37));  
  11.     // Create a trigger that fires once  
  12.     Trigger trigger = TriggerUtils.makeImmediateTrigger(0, 10000);  
  13.     trigger.setName("PrintJobDataMapJobTrigger");  
  14.     scheduler.scheduleJob(jobDetail, trigger);  
  15. }  
  16. //Job 能經過 JobExecutionContext 對象訪問 JobDataMap  
  17. public class PrintJobDataMapJob implements Job {  
  18.     public void execute(JobExecutionContext context)throws JobExecutionException {  
  19.         logger.info("in PrintJobDataMapJob");  
  20.         // Every job has its own job detail  
  21.         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();  
  22.         // Iterate through the key/value pairs  
  23.         Iterator iter = jobDataMap.keySet().iterator();  
  24.         while (iter.hasNext()) {  
  25.             Object key = iter.next();  
  26.             Object value = jobDataMap.get(key);  
  27.             logger.info("Key: " + key + " - Value: " + value);  
  28.         }  
  29.     }  
  30.  }  


在Quartz 1.5以後,JobDataMap在 Trigger 級也是可用的。它的用途相似於Job級的JobDataMap,支持在同一個JobDetail上的多個Trigger。 
伴隨着加入到 Quartz 1.5 中的這一加強特性,可使用 JobExecutionContext 的一個新的更方便的方法獲取到 Job 和 Trigger 級的並集的 map 中的值。 

這個方法就是getMergedJobDataMap() 取job 和 Trigger級的並集map,它可以在 Job 中使用。管法推薦使用這個方法. 

* 實際使用中trigger級別有時取不到map中的值, 使用getMergedJobDataMap 能夠獲取到(官方推薦此方法). 

有狀態的Job: org.quartz.StatefulJob 接口 
當須要在兩次 Job 執行間維護狀態,使用StatefulJob 接口. 

Job 和 StatefulJob 在框架中使用中存在兩個關鍵差別。 
(一) JobDataMap 在每次執行以後從新持久化到 JobStore 中。這樣就確保你對 Job 數據的改變直到下次執行仍然保持着。 
(二) 兩個或多個有狀態的 JobDetail 實例不能併發執行。保證JobDataMap線程安全 

注意:實際使用時使用jobStoreTX/jobStoreCMT ,StatefulJob,大量的trigger對應一個JobDetail的狀況下Mysql會產生鎖超時問題. 

中斷 Job 
Quartz 包括一個接口叫作 org.quartz.InterruptableJob,它擴展了普通的 Job 接口並提供了一個 interrupt() 方法:
沒有深刻研究,只知道 Scheduler會調用自定義的Job的 interrupt()方法。由用戶決定 Job 決定如何中斷.沒有測試!!!

job的特性 
易失性 volatility 
一個易失性的 Job 是在程序關閉以後不會被持久化。一個 Job 是經過調用 JobDetail 的 setVolatility(true)被設置爲易失. 
Job易失性的默認值是 false. 
注意:只有採用持久性JobStore時纔有效 

Job 持久性 durability 
設置JobDetail 的 setDurability(false),在全部的觸發器觸發以後JobDetail將從 JobStore 中移出。 
Job持久性默認值是false. 
Scheduler將移除沒有trigger關聯的jobDetail 

Job 可恢復性 shuldRecover 
當一個Job在執行中,Scheduler非正常的關閉,設置JobDetail 的setRequestsRecovery(true) 在 Scheduler 重啓以後可恢復的Job還會再次被執行。這個 
Job 會從新開始執行。注意job代碼事務特性. 
Job可恢復性默認爲false,Scheduler不會試着去恢復job操做。 

 

圖爲表述沒有執行完成的job數據庫記錄 

Scheduler 中移除 Job 
移除全部與這個 Job 相關聯的 Trigger;若是這個 Job 是非持久性的,它將會從 Scheduler 中移出。 
更直接的方式是使用 deleteJob() 方法,它還會刪除全部與當前job關聯的trigger 

public boolean deleteJob(String jobName, String groupName) throws SchedulerException; 
quartz 自己提供的 Job 
org.quartz.jobs.FileScanJob 檢查某個指定文件是否變化,並在文件被改變時通知到相應監聽器的 Job 
org.quartz.jobs.FileScanListener 在文件被修改後通知 FileScanJob 的監聽器 
org.quartz.jobs.NativeJob 用來執行本地程序(如 windows 下 .exe 文件) 的 Job 
org.quartz.jobs.NoOpJob 什麼也不作,但用來測試監聽器不是頗有用的。一些用戶甚至僅僅用它來致使一個監聽器的運行 
org.quartz.jobs.ee.mail.SendMailJob 使用 JavaMail API 發送 e-mail 的 Job 
org.quartz.jobs.ee.jmx.JMXInvokerJob 調用 JMX bean 上的方法的 Job 
org.quartz.jobs.ee.ejb.EJBInvokerJob 用來調用 EJB 上方法的 Job 

job的理解到此結束 

理解quartz Trigger 
Job 包含了要執行任務的邏輯,可是Job不負責什麼時候執行。這個事情由觸發器(Trigger)負責。 
Quartz Trigger繼承了抽象的org.quartz.Trigger 類。 
目前,Quartz 有三個可用的實現 

org.quartz.SimpleTrigger 
org.quartz.CronTrigger 
org.quartz.NthIncludeDayTrigger 
使用org.quartz.SimpleTrigger 
SimpleTrigger 是設置和使用是最爲簡單的一種 Quartz Trigger。它是爲那種須要在特定的日期/時間啓動,且以一個可能的間隔時間重複執行 n 次的 Job 所設計的。 

SimpleTrigger 存在幾個變種的構造方法。他們是從無參的版本一直到帶所有參數的版本。 

下面代碼版斷顯示了一個僅帶有trigger 的名字和組的簡單構造方法 

SimpleTrigger sTrigger = new SimpleTrigger("myTrigger", Scheduler.DEFAULT_GROUP); 
這個 Trigger 會當即執行,而不重複。還有一個構造方法帶有多個參數,配置 Triiger 在某一特定時刻觸發,重複執行屢次,和兩 
次觸發間的延遲時間。 

Java代碼   收藏代碼
  1. public SimpleTrigger(String name, String group,String jobName, String jobGroup,  
  2.  Date startTime,Date endTime, int repeatCount, long repeatInterval);  


使用org.quartz.CronTrigger 
CronTrigger 是基於 Unix 相似於 cron 的表達式觸發,也是功能最強大和最經常使用的Trigger 
Cron表達式: 

Java代碼   收藏代碼
  1. "0 0 12 * * ?"                     Fire at 12pm (noon) every day  
  2. "0 15 10 ? * *"                   Fire at 10:15am every day  
  3. "0 15 10 * * ?"                   Fire at 10:15am every day  
  4. "0 15 10 * * ? *"                 Fire at 10:15am every day  
  5. "0 15 10 * * ? 2005"           Fire at 10:15am every day during the year 2005  
  6. "0 * 14 * * ?"                     Fire every minute starting at 2pm and ending at 2:59pm, every day  
  7. "0 0/5 14 * * ?"                  Fire every 5 minutes starting at 2pm and ending at 2:55pm, every day  
  8. "0 0/5 14,18 * * ?"              Fire every 5 minutes starting at 2pm and ending at 2:55pm, AND fire every 5 minutes starting at 6pm and ending at 6:55pm, every day  
  9. "0 0-5 14 * * ?"                   Fire every minute starting at 2pm and ending at 2:05pm, every day  
  10. "0 10,44 14 ? 3 WED"         Fire at 2:10pm and at 2:44pm every Wednesday in the month of March.  
  11. "0 15 10 ? * MON-FRI"        Fire at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday  
  12. "0 15 10 15 * ?"                  Fire at 10:15am on the 15th day of every month  
  13. "0 15 10 L * ?"                    Fire at 10:15am on the last day of every month  
  14. "0 15 10 ? * 6L"                   Fire at 10:15am on the last Friday of every month  
  15. "0 15 10 ? * 6L"                   Fire at 10:15am on the last Friday of every month  
  16. "0 15 10 ? * 6L 2002-2005"   Fire at 10:15am on every last Friday of every month during the years 2002, 2003, 2004 and 2005  
  17. "0 15 10 ? * 6#3"                 Fire at 10:15am on the third Friday of every month  



使用 org.quartz.NthIncludedDayTrigger 
org.quartz.NthIncludedDayTrigger是設計用於在每一間隔類型的第幾天執行 Job。 
例如,你要在每月的 12 號執行發工資提醒的Job。接下來的代碼片段描繪瞭如何建立一個 NthIncludedDayTrigger. 

Java代碼   收藏代碼
  1. //建立每月的12號的NthIncludedDayTrigger  
  2. NthIncludedDayTrigger trigger = new NthIncludedDayTrigger("MyTrigger", Scheduler.DEFAULT_GROUP);  
  3. trigger.setN(12);  
  4. trigger.setIntervalType(NthIncludedDayTrigger.INTERVAL_TYPE_MONTHLY);  


jobDetail + trigger組成最基本的定時任務: 
特別注意:一個job能夠對應多個Trgger , 一個Trigger只能對應一個job . 

如:CRM中N天未拜訪的job對應全部的N天未拜訪商家(一個商家一個trigger) 大約1:1000的比例 
    job和trigger都是經過name 和 group 屬性肯定惟一性的. 

Quartz Calendar 
Quartz 的 Calendar 對象與 Java API 的 java.util.Calendar不一樣。 
Java 的 Calender 對象是通用的日期和時間工具; 
Quartz 的 Calender 專門用於屏閉一個時間區間,使 Trigger 在這個區間中不被觸發。 
例如,讓咱們假如取消節假日執行job。 

Quartz包括許多的 Calender 實現足以知足大部分的需求. 

org.quartz.impl.calendar.BaseCalender 爲高級的 Calender 實現了基本的功能,實現了 org.quartz.Calender 接口 
org.quartz.impl.calendar.WeeklyCalendar 排除星期中的一天或多天,例如,可用於排除週末 
org.quartz.impl.calendar.MonthlyCalendar 排除月份中的數天,例如,可用於排除每個月的最後一天 
org.quartz.impl.calendar.AnnualCalendar 排除年中一天或多天 
org.quartz.impl.calendar.HolidayCalendar 特別的用於從 Trigger 中排除節假日 

使用Calendar,只需實例化後並加入你要排除的日期,而後用 Scheduler 註冊,最後必須讓Calender依附於Trigger實例。 

排除國慶節實例 

Java代碼   收藏代碼
  1. private void scheduleJob(Scheduler scheduler, Class jobClass) {  
  2.     try {  
  3.         // Create an instance of the Quartz AnnualCalendar  
  4.         AnnualCalendar cal = new AnnualCalendar();  
  5.         // exclude 國慶節  
  6.         Calendar gCal = GregorianCalendar.getInstance();  
  7.         gCal.set(Calendar.MONTH, Calendar.OCTOBER);  
  8.         List<Calendar> mayHolidays = new ArraysList<Calendar>();  
  9.         for(int i=1; i<=7; i++){  
  10.             gCal.set(Calendar.DATE, i);  
  11.             mayHolidays.add(gCal);  
  12.         }  
  13.         cal.setDaysExcluded(mayHolidays);  
  14.         // Add to scheduler, replace existing, update triggers  
  15.         scheduler.addCalendar("crmHolidays", cal, true, true);  
  16.         /* 
  17.         * Set up a trigger to start firing now, repeat forever 
  18.         * and have (60000 ms) between each firing. 
  19.         */  
  20.         Trigger trigger = TriggerUtils.makeImmediateTrigger("myTrigger",-1,60000);  
  21.         // Trigger will use Calendar to exclude firing times  
  22.         trigger.setCalendarName("crmHolidays");  
  23.         JobDetail jobDetail = new JobDetail(jobClass.getName(), Scheduler.DEFAULT_GROUP, jobClass);  
  24.         // Associate the trigger with the job in the scheduler  
  25.         scheduler.scheduleJob(jobDetail, trigger);  
  26.     } catch (SchedulerException ex) {  
  27.         logger.error(ex);  
  28.     }  
  29. }  



Quartz 監聽器 
Quartz 提供了三種類型的監聽器:監聽Job,監聽Trigger,和監聽Scheduler. 

監聽器是做爲擴展點存在的. 
Quartz 監聽器是擴展點,能夠擴展框架並定製來作特定的事情。跟Spring,Hibernate,Servlet監聽器相似. 
實現監聽 
1. 建立一個 Java 類,實現監聽器接口 
2. 用你的應用中特定的邏輯實現監聽器接口的全部方法 
3. 註冊監聽器 

全局和非全局監聽器 
JobListener 和 TriggerListener 可被註冊爲全局或非全局監聽器。一個全局監聽器能接收到全部的 Job/Trigger 的事件通知。 
而一個非全局監聽器只能接收到那些在其上已註冊了監聽器的 Job 或 Triiger 的事件。 

做者:James House描述全局和非全局監聽器 
全局監聽器是主動意識的,它們爲了執行它們的任務而熱切的去尋找每個可能的事件。一般,全局監聽器要作的工做不用指定到特定的 Job 或 Trigger。 
非全局監聽器通常是被動意識的,它們在所關注的 Trigger 激發以前或是 Job 執行以前什麼事也不作。所以,非全局的監聽器比起全局監聽器而言更適合於修改或增長 Job 執行的工做。 
相似裝飾設計模式 
監聽 Job 事件 
org.quartz.JobListener 接口包含一系列的方法,它們會由 Job 在其生命週期中產生的某些關鍵事件時被調用 

Java代碼   收藏代碼
  1. public interface JobListener {  
  2.     //命名jobListener 只對非全局監聽器有效  
  3.     public String getName();  
  4.   
  5.     //Scheduler 在 JobDetail 將要被執行時調用這個方法。  
  6.     public void jobToBeExecuted(JobExecutionContext context);  
  7.   
  8. //Scheduler 在 JobDetail 即將被執行,但又被否決時調用這個方法。  
  9.     public void jobExecutionVetoed(JobExecutionContext context);  
  10.   
  11. //Scheduler 在 JobDetail 被執行以後調用這個方法。  
  12.     public void jobWasExecuted(JobExecutionContext context,JobExecutionException jobException);  



 
圖7 job listener參與job的執行生命週期 

註冊全局監聽器 

Java代碼   收藏代碼
  1. scheduler.addGlobalJobListener(jobListener);  



註冊非全局監聽器(依次完成,順序不能顛倒) 

Java代碼   收藏代碼
  1. scheduler.addJobListener(jobListener);  
  2. jobDetail.addJobListener(jobListener.getName());  
  3. //若是已經存在jobDetail則覆蓋.  
  4. scheduler.addjob(jobDetail,true);  


監聽 Trigger 事件 
org.quartz.TriggerListener 接口定義Trigger監聽器 

Java代碼   收藏代碼
  1. public interface TriggerListener {  
  2.     //命名triggerListener 只對非全局監聽器有效  
  3.     public String getName();  
  4.   
  5.     //當與監聽器相關聯的 Trigger 被觸發,Job 上的 execute() 方法將要被執行時,調用這個方法。  
  6.     //在全局TriggerListener 狀況下,這個方法爲全部 Trigger 被調用。(不要作耗時操做)  
  7.     public void triggerFired(Trigger trigger, JobExecutionContext context);  
  8.   
  9.     //在 Trigger 觸發後,Job 將要被執行時由調用這個方法。  
  10.     //TriggerListener給了一個選擇去否決 Job 的執行。假如這個方法返回 true,這個 Job 將不會爲這次 Trigger 觸發而獲得執行。  
  11.     public boolean vetoJobExecution(Trigger trigger, JobExecutidonContext context);  
  12.   
  13.     // Scheduler 調用這個方法是在 Trigger 錯過觸發時。  
  14.     // JavaDoc 指出:你應該關注此方法中持續時間長的邏輯:在出現許多錯過觸發的 Trigger 時,長邏輯會致使骨牌效應。你應當保持這上方法儘可能的小  
  15.     public void triggerMisfired(Trigger trigger);  
  16.   
  17.     //Trigger 被觸發而且完成了Job的執行時調用這個方法。  
  18.     public void triggerComplete(Trigger trigger, JobExecutionContext context, int triggerInstructionCode);  
  19. }  


triggerListener的註冊與jobListener相同 

監聽 Scheduler 事件 
org.quartz.SchedulerListener 接口定義Trigger監聽器 

Java代碼   收藏代碼
  1. public interface SchedulerListener {  
  2.     //有新的JobDetail部署調用這個方法。  
  3.     public void jobScheduled(Trigger trigger);  
  4.   
  5.     //卸載時調用這個方法。  
  6.     public void jobUnscheduled(String triggerName, String triggerGroup);  
  7.   
  8.     //當一個Trigger到達不再會觸發時調用這個方法。  
  9.     public void triggerFinalized(Trigger trigger);  
  10.   
  11.     //Scheduler 調用這個方法是發生在一個Trigger或多個Trigger被暫停時。假如是多個Trigger的話,triggerName 參數將爲null。  
  12.     public void triggersPaused(String triggerName, String triggerGroup);  
  13.   
  14.     //Scheduler 調用這個方法是發生成一個 Trigger 或 Trigger 組從暫停中恢復時。假如是多個Trigger的話,triggerName 參數將爲 null。  
  15.     public void triggersResumed(String triggerName,String triggerGroup);  
  16.   
  17.     //當一個或一組 JobDetail 暫停時調用這個方法。  
  18.     public void jobsPaused(String jobName, String jobGroup);  
  19.   
  20.     //當一個或一組 Job 從暫停上恢復時調用這個方法。假如是多個Job,jobName參數將爲 null。  
  21.     public void jobsResumed(String jobName, String jobGroup);  
  22.   
  23.     // 在Scheduler 的正常運行期間產生一個嚴重錯誤時調用這個方法。錯誤的類型會各式的,可是下面列舉了一些錯誤例子:  
  24.     // 可使用 SchedulerException 的 getErrorCode() 或者 getUnderlyingException() 方法或獲取到特定錯誤的更詳盡的信息  
  25.     public void schedulerError(String msg, SchedulerException cause);  
  26.   
  27.     //Scheduler 調用這個方法用來通知 SchedulerListener Scheduler 將要被關閉。  
  28.     public void schedulerShutdown();  
  29. }  



註冊SchedulerListener(SchedulerListener不存在全局非全局性) 
scheduler.addSchedulerListener(schedulerListener); 
因爲scheduler異常存在不打印問題,CRM使用監聽器代碼打印. 

Java代碼   收藏代碼
  1. public class QuartzExceptionSchedulerListener extends SchedulerListenerSupport{  
  2.     private Logger logger = LoggerFactory.getLogger(QuartzExceptionSchedulerListener.class);  
  3.     @Override  
  4.     public void schedulerError(String message, SchedulerException e) {  
  5.         super.schedulerError(message, e);  
  6.         logger.error(message, e.getUnderlyingException());  
  7.     }  
  8. }  

 

Java代碼   收藏代碼
  1. <bean  id="quartzExceptionSchedulerListener"  class="com.***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>  
  2. <!-- 配置監聽器 -->  
  3. <property name="schedulerListeners">  
  4.     <list>  
  5.         <ref bean="quartzExceptionSchedulerListener"/>  
  6.     </list>  
  7. </property>  


quartz與線程 
主處理線程:QuartzSchedulerThread 
啓動Scheduler時。QuartzScheduler被建立並建立一個org.quartz.core.QuartzSchedulerThread 類的實例。 
QuartzSchedulerThread 包含有決定什麼時候下一個Job將被觸發的處理循環。QuartzSchedulerThread 是一個 Java 線程。它做爲一個非守護線程運行在正常優先級下。 

QuartzSchedulerThread 的主處理輪循步驟: 
1. 當 Scheduler 正在運行時: 
A. 檢查是否有轉換爲 standby 模式的請求。 
1. 假如 standby 方法被調用,等待繼續的信號 
B. 詢問 JobStore 下次要被觸發的 Trigger. 
1. 若是沒有 Trigger 待觸發,等候一小段時間後再次檢查 
2. 假若有一個可用的 Trigger,等待觸發它的確切時間的到來 
D. 時間到了,爲 Trigger 獲取到 triggerFiredBundle. 
E. 使用Scheduler和triggerFiredBundle 爲 Job 建立一個JobRunShell實例 
F. 在ThreadPool 申請一個線程運行 JobRunShell 實例. 

代碼邏輯在QuartzSchedulerThread 的 run() 中,以下: 

Java代碼   收藏代碼
  1. /** 
  2.  * QuartzSchedulerThread.run 
  3.     * <p> 
  4.     * The main processing loop of the <code>QuartzSchedulerThread</code>. 
  5.     * </p> 
  6.     */  
  7.    public void run() {  
  8.        boolean lastAcquireFailed = false;  
  9.        while (!halted.get()) {  
  10.            try {  
  11.                // check if we're supposed to pause...  
  12.                synchronized (sigLock) {  
  13.                    while (paused && !halted.get()) {  
  14.                        try {  
  15.                            // wait until togglePause(false) is called...  
  16.                            sigLock.wait(1000L);  
  17.                        } catch (InterruptedException ignore) {  
  18.                        }  
  19.                    }  
  20.      
  21.                    if (halted.get()) {  
  22.                        break;  
  23.                    }  
  24.                }  
  25.   
  26.                int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();  
  27.                if(availTreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...  
  28.                    Trigger trigger = null;  
  29.   
  30.                    long now = System.currentTimeMillis();  
  31.                    clearSignaledSchedulingChange();  
  32.                    try {  
  33.                        trigger = qsRsrcs.getJobStore().acquireNextTrigger(  
  34.                                ctxt, now + idleWaitTime);  
  35.                        lastAcquireFailed = false;  
  36.                    } catch (JobPersistenceException jpe) {  
  37.                        if(!lastAcquireFailed) {  
  38.                            qs.notifySchedulerListenersError(  
  39.                                "An error occured while scanning for the next trigger to fire.",  
  40.                                jpe);  
  41.                        }  
  42.                        lastAcquireFailed = true;  
  43.                    } catch (RuntimeException e) {  
  44.                        if(!lastAcquireFailed) {  
  45.                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "  
  46.                                    +e.getMessage(), e);  
  47.                        }  
  48.                        lastAcquireFailed = true;  
  49.                    }  
  50.   
  51.                    if (trigger != null) {  
  52.                        now = System.currentTimeMillis();  
  53.                        long triggerTime = trigger.getNextFireTime().getTime();  
  54.                        long timeUntilTrigger = triggerTime - now;  
  55.                        while(timeUntilTrigger > 2) {  
  56.                         synchronized(sigLock) {  
  57.                             if(!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {  
  58.                                 try {  
  59.                                     // we could have blocked a long while  
  60.                                     // on 'synchronize', so we must recompute  
  61.                                     now = System.currentTimeMillis();  
  62.                                     timeUntilTrigger = triggerTime - now;  
  63.                                     if(timeUntilTrigger >= 1)  
  64.                                         sigLock.wait(timeUntilTrigger);  
  65.                                 } catch (InterruptedException ignore) {  
  66.                                 }  
  67.                             }  
  68.                         }                                 
  69.                         if(releaseIfScheduleChangedSignificantly(trigger, triggerTime)) {  
  70.                             trigger = null;  
  71.                             break;  
  72.                         }  
  73.                         now = System.currentTimeMillis();  
  74.                         timeUntilTrigger = triggerTime - now;  
  75.                        }  
  76.                        if(trigger == null)  
  77.                         continue;  
  78.                          
  79.                        // set trigger to 'executing'  
  80.                        TriggerFiredBundle bndle = null;  
  81.   
  82.                        boolean goAhead = true;  
  83.                        synchronized(sigLock) {  
  84.                         goAhead = !halted.get();  
  85.                        }  
  86.   
  87.                        if(goAhead) {  
  88.                            try {  
  89.                                bndle = qsRsrcs.getJobStore().triggerFired(ctxt,  
  90.                                        trigger);  
  91.                            } catch (SchedulerException se) {  
  92.                                qs.notifySchedulerListenersError(  
  93.                                        "An error occured while firing trigger '"  
  94.                                                + trigger.getFullName() + "'", se);  
  95.                            } catch (RuntimeException e) {  
  96.                                getLog().error(  
  97.                                    "RuntimeException while firing trigger " +  
  98.                                    trigger.getFullName(), e);  
  99.                                // db connection must have failed... keep  
  100.                                // retrying until it's up...  
  101.                                releaseTriggerRetryLoop(trigger);  
  102.                            }  
  103.                        }  
  104.                          
  105.                        // it's possible to get 'null' if the trigger was paused,  
  106.                        // blocked, or other similar occurrences that prevent it being  
  107.                        // fired at this time...  or if the scheduler was shutdown (halted)  
  108.                        if (bndle == null) {  
  109.                            try {  
  110.                                qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,  
  111.                                        trigger);  
  112.                            } catch (SchedulerException se) {  
  113.                                qs.notifySchedulerListenersError(  
  114.                                        "An error occured while releasing trigger '"  
  115.                                                + trigger.getFullName() + "'", se);  
  116.                                // db connection must have failed... keep retrying  
  117.                                // until it's up...  
  118.                                releaseTriggerRetryLoop(trigger);  
  119.                            }  
  120.                            continue;  
  121.                        }  
  122.   
  123.                        // TODO: improvements:  
  124.                        //  
  125.                        // 2- make sure we can get a job runshell before firing trigger, or  
  126.                        //   don't let that throw an exception (right now it never does,  
  127.                        //   but the signature says it can).  
  128.                        // 3- acquire more triggers at a time (based on num threads available?)  
  129.                        JobRunShell shell = null;  
  130.                        try {  
  131.                            shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();  
  132.                            shell.initialize(qs, bndle);  
  133.                        } catch (SchedulerException se) {  
  134.                            try {  
  135.                                qsRsrcs.getJobStore().triggeredJobComplete(ctxt,  
  136.                                        trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);  
  137.                            } catch (SchedulerException se2) {  
  138.                                qs.notifySchedulerListenersError(  
  139.                                        "An error occured while placing job's triggers in error state '"  
  140.                                                + trigger.getFullName() + "'", se2);  
  141.                                // db connection must have failed... keep retrying  
  142.                                // until it's up...  
  143.                                errorTriggerRetryLoop(bndle);  
  144.                            }  
  145.                            continue;  
  146.                        }  
  147.   
  148.                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {  
  149.                            try {  
  150.                                // this case should never happen, as it is indicative of the  
  151.                                // scheduler being shutdown or a bug in the thread pool or  
  152.                                // a thread pool being used concurrently - which the docs  
  153.                                // say not to do...  
  154.                                getLog().error("ThreadPool.runInThread() return false!");  
  155.                                qsRsrcs.getJobStore().triggeredJobComplete(ctxt,  
  156.                                        trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);  
  157.                            } catch (SchedulerException se2) {  
  158.                                qs.notifySchedulerListenersError(  
  159.                                        "An error occured while placing job's triggers in error state '"  
  160.                                                + trigger.getFullName() + "'", se2);  
  161.                                // db connection must have failed... keep retrying  
  162.                                // until it's up...  
  163.                                releaseTriggerRetryLoop(trigger);  
  164.                            }  
  165.                        }  
  166.                        continue;  
  167.                    }  
  168.                } else { // if(availTreadCount > 0)  
  169.                    continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract  
  170.                }  
  171.   
  172.                long now = System.currentTimeMillis();  
  173.                long waitTime = now + getRandomizedIdleWaitTime();  
  174.                long timeUntilContinue = waitTime - now;  
  175.                synchronized(sigLock) {  
  176.                 try {  
  177.                     sigLock.wait(timeUntilContinue);  
  178.                 } catch (InterruptedException ignore) {  
  179.                 }  
  180.                }  
  181.   
  182.            } catch(RuntimeException re) {  
  183.                getLog().error("Runtime error occured in main trigger firing loop.", re);  
  184.            }  
  185.        } // loop...  
  186.   
  187.        // drop references to scheduler stuff to aid garbage collection...  
  188.        qs = null;  
  189.        qsRsrcs = null;  
  190.    }  



quartz工做者線程 
Quartz 不會在主線程(QuartzSchedulerThread)中處理用戶的Job。Quartz 把線程管理的職責委託給ThreadPool。
通常的設置使用org.quartz.simpl.SimpleThreadPool。SimpleThreadPool 建立了必定數量的 WorkerThread 實例來使得Job可以在線程中進行處理。 
WorkerThread 是定義在 SimpleThreadPool 類中的內部類,它實質上就是一個線程。 
要建立 WorkerThread 的數量以及配置他們的優先級是在文件quartz.properties中並傳入工廠。 

spring properties 

Java代碼   收藏代碼
  1. <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>  
  2. <prop key="org.quartz.threadPool.threadCount">20</prop>  
  3. <prop key="org.quartz.threadPool.threadPriority">5</prop>  


主線程(QuartzSchedulerThread)請求ThreadPool去運行 JobRunShell 實例,ThreadPool 就檢查看是否有一個可用的工做者線 
程。假如因此已配置的工做者線程都是忙的,ThreadPool 就等待直到有一個變爲可用。當一個工做者線程是可用的, 
而且有一個JobRunShell 等待執行,工做者線程就會調用 JobRunShell 類的 run() 方法。 

Quartz 框架容許替換線程池,但必須實現org.quartz.spi.ThreadPool 接口. 

 
圖4 quartz內部的主線程和工做者線程 

Quartz的存儲和持久化 
Quartz 用 JobStores 對 Job、Trigger、calendar 和 Schduler 數據提供一種存儲機制。Scheduler 應用已配置的JobStore 來存儲和獲取到部署信息,並決定正被觸發執行的 Job 的職責。 
全部的關於哪一個 Job 要執行和以什麼時間表來執行他們的信息都來存儲在 JobStore。 

在 Quartz 中兩種可用的 Job 存儲類型是: 
內存(非持久化) 存儲 
持久化存儲 

JobStore 接口 
Quartz 爲全部類型的Job存儲提供了一個接口。叫 JobStore。全部的Job存儲機制,無論是在哪裏或是如何存儲他們的信息的,都必須實現這個接口。 
JobStore 接口的 API 可概括爲下面幾類: 
Job 相關的 API 
Trigger 相關的 API 
Calendar 相關的 API 
Scheduler 相關的 API 

使用內存來存儲 Scheduler 信息 
Quartz 的內存Job存儲類叫作 org.quartz.simple.RAMJobStore,它實現了JobStore 接口的。 
RAMJobStore 是 Quartz 的默認的解決方案。 
使用這種內存JobStore的好處。 

RAMJobStore是配置最簡單的 JobStore:默認已經配置好了。見quartz.jar:org.quartz.quartz.properties 
RAMJobStore的速度很是快。全部的 quartz存儲操做都在計算機內存中 

使用持久性的 JobStore 
持久性 JobStore = JDBC + 關係型數據庫 

Quartz 全部的持久化的 JobStore 都擴展自 org.quartz.impl.jdbcjobstore.JobStoreSupport 類。 

 

圖5 
JobStoreSupport 實現了 JobStore 接口,是做爲 Quartz 提供的兩個具體的持久性 JobStore 類的基類。 
Quartz 提供了兩種不一樣類型的JobStoreSupport實現類,每個設計爲針對特定的數據庫環境和配置: 
·org.quartz.impl.jdbcjobstore.JobStoreTX 
·org.quartz.impl.jdbcjobstore.JobStoreCMT 

獨立環境中的持久性存儲 
JobStoreTX 類設計爲用於獨立環境中。這裏的 "獨立",咱們是指這樣一個環境,在其中不存在與應用容器的事務集成。 

#properties配置 
org.quartz.jobStore.class = org.quartz.ompl.jdbcjobstore.JobStoreTX 


依賴容器相關的持久性存儲 
JobStoreCMT 類設計爲與程序容器事務集成,容器管理的事物(Container Managed Transactions (CMT)) 

crm使用JobStoreTX 由於quart有長時間鎖等待狀況,不參與系統自己事務(crm任務內事務與quartz自己事務分離).

Quartz 數據庫結構 

表名描述 
QRTZ_CALENDARS 以 Blob 類型存儲 Quartz 的 Calendar 信息 
QRTZ_CRON_TRIGGERS 存儲 Cron Trigger,包括 Cron 表達式和時區信息 
QRTZ_FIRED_TRIGGERS 存儲與已觸發的 Trigger 相關的狀態信息,以及相聯 Job 的執行信息 
QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的 Trigger 組的信息 
QRTZ_SCHEDULER_STATE 存儲少許的有關 Scheduler 的狀態信息,和別的 Scheduler 實例(假如是用於一個集羣中) 
QRTZ_LOCKS 存儲程序的非觀鎖的信息(假如使用了悲觀鎖) 
QRTZ_JOB_DETAILS 存儲每個已配置的 Job 的詳細信息 
QRTZ_JOB_LISTENERS 存儲有關已配置的 JobListener 的信息 
QRTZ_SIMPLE_TRIGGERS 存儲簡單的 Trigger,包括重複次數,間隔,以及已觸的次數 
QRTZ_BLOG_TRIGGERS Trigger 做爲 Blob 類型存儲(用於 Quartz 用戶用 JDBC 建立他們本身定製的 Trigger 類型,JobStore 並不知道如何存儲實例的時候) 
QRTZ_TRIGGER_LISTENERS 存儲已配置的 TriggerListener 的信息 
QRTZ_TRIGGERS 存儲已配置的 Trigger 的信息 
全部的表默認之前綴QRTZ_開始。能夠經過在 quartz.properties配置修改(org.quartz.jobStore.tablePrefix = QRTZ_)。 
能夠對不一樣的Scheduler實例使用多套的表,經過改變前綴來實現。 

優化 quartz數據表結構 
-- 1:對關鍵查詢路徑字段創建索引 

Java代碼   收藏代碼
  1. create index idx_qrtz_t_next_fire_time on QRTZ_TRIGGERS(NEXT_FIRE_TIME);  
  2. create index idx_qrtz_t_state on QRTZ_TRIGGERS(TRIGGER_STATE);  
  3. create index idx_qrtz_t_nf_st on QRTZ_TRIGGERS(TRIGGER_STATE,NEXT_FIRE_TIME);  
  4. create index idx_qrtz_ft_trig_group on QRTZ_FIRED_TRIGGERS(TRIGGER_GROUP);  
  5. create index idx_qrtz_ft_trig_name on QRTZ_FIRED_TRIGGERS(TRIGGER_NAME);  
  6. create index idx_qrtz_ft_trig_n_g on QRTZ_FIRED_TRIGGERS(TRIGGER_NAME,TRIGGER_GROUP);  
  7. create index idx_qrtz_ft_trig_inst_name on QRTZ_FIRED_TRIGGERS(INSTANCE_NAME);  
  8. create index idx_qrtz_ft_job_name on QRTZ_FIRED_TRIGGERS(JOB_NAME);  
  9. create index idx_qrtz_ft_job_group on QRTZ_FIRED_TRIGGERS(JOB_GROUP);  



-- 2:根據Mysql innodb表結構特性,調整主鍵,下降二級索引的大小 

Java代碼   收藏代碼
  1. ALTER TABLE QRTZ_TRIGGERS  
  2. ADD UNIQUE KEY IDX_NAME_GROUP(TRIGGER_NAME,TRIGGER_GROUP),  
  3. DROP PRIMARY KEY,  
  4. ADD ID INT UNSIGNED NOT NULL AUTO_INCREMENT FIRST,  
  5. ADD PRIMARY KEY (ID);  
  6. ALTER TABLE QRTZ_JOB_DETAILS  
  7. ADD UNIQUE KEY IDX_NAME_GROUP(JOB_NAME,JOB_GROUP),  
  8. DROP PRIMARY KEY,  
  9. ADD ID INT UNSIGNED NOT NULL AUTO_INCREMENT FIRST,  
  10. ADD PRIMARY KEY (ID);  


Quartz集羣 
只有使用持久的JobStore才能完成Quqrtz集羣 

 
圖6 
一個 Quartz 集羣中的每一個節點是一個獨立的 Quartz 應用,它又管理着其餘的節點。 
須要分別對每一個節點分別啓動或中止。不像應用服務器的集羣,獨立的 Quartz 節點並不與另外一個節點或是管理節點通訊。 
Quartz 應用是經過數據庫表來感知到另外一應用。 

配置集羣 

Xml代碼   收藏代碼
  1. <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>  
  2. <!-- 集羣配置 -->  
  3. <prop key="org.quartz.jobStore.isClustered">true</prop>  
  4. <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>  
  5. <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>  
  6. <!-- 數據源配置 使用DBCP鏈接池 數據源與dataSource一致 -->  
  7. <prop key="org.quartz.jobStore.dataSource">myDS</prop>  
  8. <prop key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>  
  9. <prop key="org.quartz.dataSource.myDS.URL">${database.url}</prop>  
  10. <prop key="org.quartz.dataSource.myDS.user">${database.username}</prop>  
  11. <prop key="org.quartz.dataSource.myDS.password">${database.password}</prop>  
  12. <prop key="org.quartz.dataSource.myDS.maxConnections">5</prop>  



org.quartz.jobStore.class 屬性爲 JobStoreTX, 
將任務持久化到數據中。由於集羣中節點依賴於數據庫來傳播Scheduler實例的狀態,你只能在使用 JDBC JobStore 時應用 Quartz 集羣。 

org.quartz.jobStore.isClustered 屬性爲 true,通知Scheduler實例要它參與到一個集羣當中。 

org.quartz.jobStore.clusterCheckinInterval 

屬性定義了Scheduler 實例檢入到數據庫中的頻率(單位:毫秒)。 
Scheduler 檢查是否其餘的實例到了它們應當檢入的時候未檢入; 
這能指出一個失敗的 Scheduler 實例,且當前 Scheduler 會以此來接管任何執行失敗並可恢復的 Job。 
經過檢入操做,Scheduler 也會更新自身的狀態記錄。clusterChedkinInterval 越小,Scheduler 節點檢查失敗的 Scheduler 實例就越頻繁。默認值是 15000 (即15 秒) 

集羣實現分析 
Quartz原來碼分析: 
基於數據庫表鎖實現多Quartz_Node 對Job,Trigger,Calendar等同步機制 

Sql代碼   收藏代碼
  1. -- 數據庫鎖定表  
  2. CREATE TABLE `QRTZ_LOCKS` (  
  3.   `LOCK_NAME` varchar(40) NOT NULL,  
  4.   PRIMARY KEY (`LOCK_NAME`)  
  5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;  
  6. -- 記錄  
  7. +-----------------+  
  8. | LOCK_NAME       |  
  9. +-----------------+  
  10. | CALENDAR_ACCESS |   
  11. | JOB_ACCESS      |   
  12. | MISFIRE_ACCESS  |   
  13. | STATE_ACCESS    |   
  14. | TRIGGER_ACCESS  |   
  15. +-----------------+  



經過行級別鎖實現多節點處理 

Java代碼   收藏代碼
  1. /** 
  2.  * Internal database based lock handler for providing thread/resource locking  
  3.  * in order to protect resources from being altered by multiple threads at the  
  4.  * same time. 
  5.  *  
  6.  * @author jhouse 
  7.  */  
  8. public class StdRowLockSemaphore extends DBSemaphore {  
  9.   
  10.     /* 
  11.      * Constants. 
  12.      * 鎖定SQL語句 
  13.      *  
  14.      */  
  15.     public static final String SELECT_FOR_LOCK = "SELECT * FROM "  
  16.             + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME  
  17.             + " = ? FOR UPDATE";  
  18.   
  19.     /** 
  20.      * This constructor is for using the <code>StdRowLockSemaphore</code> as 
  21.      * a bean. 
  22.      */  
  23.     public StdRowLockSemaphore() {  
  24.         super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK);  
  25.     }  
  26.   
  27.     public StdRowLockSemaphore(String tablePrefix, String seletWithLockSQL) {  
  28.         super(tablePrefix, selectWithLockSQL, SELECT_FOR_LOCK);  
  29.     }  
  30.   
  31.     /** 
  32.      * Execute the SQL select for update that will lock the proper database row. 
  33.      * 指定鎖定SQL 
  34.      */  
  35.     protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {  
  36.         PreparedStatement ps = null;  
  37.         ResultSet rs = null;  
  38.         try {  
  39.             ps = conn.prepareStatement(expandedSQL);  
  40.             ps.setString(1, lockName);  
  41.   
  42.             if (getLog().isDebugEnabled()) {  
  43.                 getLog().debug(  
  44.                     "Lock '" + lockName + "' is being obtained: " +   
  45.                     Thread.currentThread().getName());  
  46.             }  
  47.             rs = ps.executeQuery();  
  48.             if (!rs.next()) {  
  49.                 throw new SQLException(Util.rtp(  
  50.                     "No row exists in table " + TABLE_PREFIX_SUBST +   
  51.                     TABLE_LOCKS + " for lock named: " + lockName, getTablePrefix()));  
  52.             }  
  53.         } catch (SQLException sqle) {  
  54.             if (getLog().isDebugEnabled()) {  
  55.                 getLog().debug(  
  56.                     "Lock '" + lockName + "' was not obtained by: " +   
  57.                     Thread.currentThread().getName());  
  58.             }  
  59.             throw new LockException("Failure obtaining db row lock: "  
  60.                     + sqle.getMessage(), sqle);  
  61.         } finally {  
  62.             if (rs != null) {   
  63.                 try {  
  64.                     rs.close();  
  65.                 } catch (Exception ignore) {  
  66.                 }  
  67.             }  
  68.             if (ps != null) {  
  69.                 try {  
  70.                     ps.close();  
  71.                 } catch (Exception ignore) {  
  72.                 }  
  73.             }  
  74.         }  
  75.     }  
  76.   
  77.     protected String getSelectWithLockSQL() {  
  78.         return getSQL();  
  79.     }  
  80.   
  81.     public void setSelectWithLockSQL(String selectWithLockSQL) {  
  82.         setSQL(selectWithLockSQL);  
  83.     }  
  84. }  
  85.   
  86.    /** 
  87.      * Grants a lock on the identified resource to the calling thread (blocking 
  88.      * until it is available). 
  89.      * 獲取QRTZ_LOCKS行級鎖 
  90.      * @return true if the lock was obtained. 
  91.      */  
  92.     public boolean obtainLock(Connection conn, String lockName) throws LockException {  
  93.         lockName = lockName.intern();  
  94.   
  95.         Logger log = getLog();  
  96.   
  97.         if(log.isDebugEnabled()) {  
  98.             log.debug(  
  99.                 "Lock '" + lockName + "' is desired by: "  
  100.                         + Thread.currentThread().getName());  
  101.         }  
  102.         if (!isLockOwner(conn, lockName)) {  
  103.             executeSQL(conn, lockName, expandedSQL);  
  104.               
  105.             if(log.isDebugEnabled()) {  
  106.                 log.debug(  
  107.                     "Lock '" + lockName + "' given to: "  
  108.                             + Thread.currentThread().getName());  
  109.             }  
  110.             getThreadLocks().add(lockName);  
  111.             //getThreadLocksObtainer().put(lockName, new  
  112.             // Exception("Obtainer..."));  
  113.         } else if(log.isDebugEnabled()) {  
  114.             log.debug(  
  115.                 "Lock '" + lockName + "' Is already owned by: "  
  116.                         + Thread.currentThread().getName());  
  117.         }  
  118.         return true;  
  119.     }  
  120.   
  121.     /** 
  122.      * Release the lock on the identified resource if it is held by the calling thread. 
  123.      * 釋放QRTZ_LOCKS行級鎖 
  124.      */  
  125.     public void releaseLock(Connection conn, String lockName) {  
  126.         lockName = lockName.intern();  
  127.   
  128.         if (isLockOwner(conn, lockName)) {  
  129.             if(getLog().isDebugEnabled()) {  
  130.                 getLog().debug(  
  131.                     "Lock '" + lockName + "' returned by: "  
  132.                             + Thread.currentThread().getName());  
  133.             }  
  134.             getThreadLocks().remove(lockName);  
  135.             //getThreadLocksObtainer().remove(lockName);  
  136.         } else if (getLog().isDebugEnabled()) {  
  137.             getLog().warn(  
  138.                 "Lock '" + lockName + "' attempt to return by: "  
  139.                         + Thread.currentThread().getName()  
  140.                         + " -- but not owner!",  
  141.                 new Exception("stack-trace of wrongful returner"));  
  142.         }  
  143.     }  



JobStoreTX 控制併發代碼 

Java代碼   收藏代碼
  1. /** 
  2.  * Execute the given callback having optionally aquired the given lock. 
  3.  * For <code>JobStoreTX</code>, because it manages its own transactions 
  4.  * and only has the one datasource, this is the same behavior as  
  5.  * executeInNonManagedTXLock().  
  6.  * @param lockName The name of the lock to aquire, for example  
  7.  * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the 
  8.  * lockCallback is still executed in a transaction. 
  9.  *  
  10.  * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback) 
  11.     * @see JobStoreCMT#executeInLock(String, TransactionCallback) 
  12.     * @see JobStoreSupport#getNonManagedTXConnection() 
  13.     * @see JobStoreSupport#getConnection() 
  14. */  
  15.    protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {  
  16.        return executeInNonManagedTXLock(lockName, txCallback);  
  17.    }  
  18.   
  19. 使用JobStoreSupport.executeInNonManagedTXLock 實現:  
  20. /** 
  21.     * Execute the given callback having optionally aquired the given lock. 
  22.     * This uses the non-managed transaction connection. 
  23.     *  
  24.     * @param lockName The name of the lock to aquire, for example  
  25.     * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the 
  26.     * lockCallback is still executed in a non-managed transaction.  
  27.     */  
  28.    protected Object executeInNonManagedTXLock(  
  29.            String lockName,   
  30.            TransactionCallback txCallback) throws JobPersistenceException {  
  31.        boolean transOwner = false;  
  32.        Connection conn = null;  
  33.        try {  
  34.            if (lockName != null) {  
  35.                // If we aren't using db locks, then delay getting DB connection   
  36.                // until after acquiring the lock since it isn't needed.  
  37.                if (getLockHandler().requiresConnection()) {  
  38.                    conn = getNonManagedTXConnection();  
  39.                }  
  40.             //獲取鎖  
  41.                transOwner = getLockHandler().obtainLock(conn, lockName);  
  42.            }  
  43.            if (conn == null) {  
  44.                conn = getNonManagedTXConnection();  
  45.            }  
  46.             //回調須要執行的sql語句如:(更新Trigger爲運行中(ACQUIRED),刪除執行過的Trigger等)  
  47.            Object result = txCallback.execute(conn);  
  48.         //JobStoreTX自身維護事務  
  49.            commitConnection(conn);  
  50.            Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();  
  51.            if(sigTime != null && sigTime >= 0) {  
  52.                signalSchedulingChangeImmediately(sigTime);  
  53.            }  
  54.            return result;  
  55.        } catch (JobPersistenceException e) {  
  56.            rollbackConnection(conn);  
  57.            throw e;  
  58.        } catch (RuntimeException e) {  
  59.            rollbackConnection(conn);  
  60.            throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e);  
  61.        } finally {  
  62.            try {  
  63.             //釋放鎖  
  64.                releaseLock(conn, lockName, transOwner);  
  65.            } finally {  
  66.                cleanupConnection(conn);  
  67.            }  
  68.        }  
  69.    }  



JobStoreCMT 控制併發代碼 

Java代碼   收藏代碼
  1. /** 
  2.     * Execute the given callback having optionally acquired the given lock.   
  3.     * Because CMT assumes that the connection is already part of a managed 
  4.     * transaction, it does not attempt to commit or rollback the  
  5.     * enclosing transaction. 
  6.     *  
  7.     * @param lockName The name of the lock to acquire, for example  
  8.     * "TRIGGER_ACCESS".  If null, then no lock is acquired, but the 
  9.     * txCallback is still executed in a transaction. 
  10.     *  
  11.     * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback) 
  12.     * @see JobStoreTX#executeInLock(String, TransactionCallback) 
  13.     * @see JobStoreSupport#getNonManagedTXConnection() 
  14.     * @see JobStoreSupport#getConnection() 
  15.     */  
  16.   
  17.    protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {  
  18.        boolean transOwner = false;  
  19.        Connection conn = null;  
  20.        try {  
  21.            if (lockName != null) {  
  22.                // If we aren't using db locks, then delay getting DB connection   
  23.                // until after acquiring the lock since it isn't needed.  
  24.                if (getLockHandler().requiresConnection()) {  
  25.                    conn = getConnection();  
  26.                }  
  27.                transOwner = getLockHandler().obtainLock(conn, lockName);  
  28.            }  
  29.   
  30.            if (conn == null) {  
  31.                conn = getConnection();  
  32.            }  
  33.         //沒有事務提交操做,與任務共享一個事務  
  34.            return txCallback.execute(conn);  
  35.        } finally {  
  36.            try {  
  37.                releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);  
  38.            } finally {  
  39.                cleanupConnection(conn);  
  40.            }  
  41.        }  
  42.    }  



CRM中quartz與Spring結合使用 
Spring 經過提供org.springframework.scheduling.quartz下的封裝類對quartz支持 
可是目前存在問題 
1:Spring3.0目前不支持Quartz2.x以上版本 

Caused by: java.lang.IncompatibleClassChangeError: class org.springframework.scheduling.quartz.CronTriggerBean 
has interface org.quartz.CronTrigger as super class 
緣由是 org.quartz.CronTrigger在2.0從class變成了一個interface形成IncompatibleClassChangeError錯誤。 

解決:無解,要想使用spring和quartz結合的方式 只能使用Quartz1.x版本。 

2:org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean報 
java.io.NotSerializableException異常,須要本身實現QuartzJobBean。 

解決:spring bug己經在http://jira.springframework.org/browse/SPR-3797找到解決方案, 
做者重寫了MethodInvokingJobDetailFactoryBean. 

3:Spring內bean必需要實現序列化接口,不然不能經過Sprng 屬性注入的方式爲job提供業務對象 

解決: 

Java代碼   收藏代碼
  1. //使用可序列化工具類獲取Spring容器對象  
  2. @Service("springBeanService")  
  3. public class SpringBeanService implements Serializable{private static final long serialVersionUID = -2228376078979553838L;  
  4.     public <T> T getBean(Class<T> clazz,String beanName){  
  5.         ApplicationContext context = ContextLoader.getCurrentWebApplicationContext();  
  6.         return (T)context.getBean(beanName);  
  7.     }  
  8. }  



CRM中quartz模塊部分代碼 
1:定義全部job的父類,並負責異常發送郵件任務和日誌任務 

Java代碼   收藏代碼
  1. public abstract class BaseQuartzJob implements Job, Serializable {  
  2.     private static final long serialVersionUID = 3347549365534415931L;  
  3.     private Logger logger = LoggerFactory.getLogger(this.getClass());  
  4.       
  5.     //定義抽象方法,供子類實現  
  6.     public abstract void action(JobExecutionContext context);  
  7.       
  8.     @Override  
  9.     public void execute(JobExecutionContext context) throws JobExecutionException {  
  10.         try {  
  11.             long start = System.currentTimeMillis();  
  12.             this.action(context);  
  13.             long end = System.currentTimeMillis();  
  14.             JobDetail jobDetail = context.getJobDetail();  
  15.             Trigger trigger = context.getTrigger();  
  16.             StringBuilder buffer = new StringBuilder();  
  17.             buffer.append("jobName = ").append(jobDetail.getName()).append(" triggerName = ")  
  18.             .append(trigger.getName()).append(" 執行完成 , 耗時: ").append((end - start)).append(" ms");  
  19.             logger.info(buffer.toString());  
  20.         } catch (Exception e) {  
  21.             doResolveException(context != null ? context.getMergedJobDataMap() : null, e);  
  22.         }  
  23.     }  
  24.     @SuppressWarnings("unchecked")  
  25.     private void doResolveException(JobDataMap dataMap, Exception ex) {  
  26.         //發送郵件實現此處省略  
  27.         //...  
  28.     }  
  29. }  



2:抽象Quartz操做接口(實現類 toSee: QuartzServiceImpl) 

Java代碼   收藏代碼
  1. /** 
  2.  * 
  3.  * @author zhangyijun 
  4.  * @created 2012-10-22 
  5.  * 
  6.  * @version 1.0 
  7.  */  
  8. @Service  
  9. public interface QuartzService {  
  10. /** 
  11.  * 獲取全部trigger 
  12.  * @param page 
  13.  * @param orderName 
  14.  * @param sortType 
  15.  * @return 
  16.  */  
  17.  List<Map<String, Object>> getQrtzTriggers(Page page, String orderName, String sortType);  
  18. /** 
  19.  * 獲取全部jobDetail 
  20.  * 
  21.  * @return 
  22.  */  
  23.  List<Map<String, Object>> getQrtzJobDetails();  
  24. /** 
  25.  * 執行Trigger操做 
  26.  * 
  27.  * @param name 
  28.  * @param group 
  29.  * @param action 
  30.  * <br/> 
  31.  */  
  32.  void executeTriggerAction(String name, String group, Integer action);  
  33. /** 
  34.  * 執行JobDetail操做 
  35.  * 
  36.  * @param name 
  37.  * @param group 
  38.  * @param action 
  39.  * <br/> 
  40.  */  
  41.  void executeJobAction(String name, String group, Integer action);  
  42. /** 
  43.  * 動態添加trigger 
  44.  * 
  45.  * @param jobName 
  46.  * @param jobGroup 
  47.  * @param triggerBean 
  48.  */  
  49.  void addTrigger(String jobName, String jobGroup, TriggerViewBean triggerBean);  
  50. /** 
  51.  * 定時執行任務 
  52.  * 
  53.  * @param jobDetail 
  54.  * @param data 
  55.  */  
  56.   
  57.  void addTriggerForDate(JobDetail jobDetail, String triggerName , String  
  58.  triggerGroup , Date date, Map<String, Object> triggerDataMap) ;  
  59. /** 
  60.  * 獲取分佈式Scheduler列表 
  61.  * 
  62.  * @return 
  63.  */  
  64.  List<Map<String, Object>> getSchedulers();  
  65. /** 
  66.  * 獲取觸發器 
  67.  * @param name 
  68.  * @param group 
  69.  * @return 
  70.  */  
  71.  public Trigger getTrigger(String name, String group);  
  72. /** 
  73.  * 獲取JobDetail 
  74.  * @param name 
  75.  * @param group 
  76.  * @return 
  77.  */  
  78.  public JobDetail getJobDetail(String name, String group);  
  79. }  


3:在Spring配置job,trigger,Scheduler,Listener組件 

Xml代碼   收藏代碼
  1. <!-- 掃描商家狀態建立定時任務 -->  
  2. <bean id="accountStatusTaskScannerJobDetail"  
  3.  class="org.springframework.scheduling.quartz.JobDetailBean">  
  4.     <property name="name" value="accountStatusTaskScannerJobDetail"></property>  
  5.     <property name="group" value="CrmAccountGroup"></property>  
  6.     <property name="jobClass" value="***.crm.quartz.job.AccountStatusTaskScannerJob"></property>  
  7.     <!-- requestsRecovery屬性爲true,則當Quartz服務被停止後,再次啓動任務時會嘗試恢復執行以前未完成的全部任務-->  
  8.     <property name="requestsRecovery" value="true"/>  
  9.     <!-- 標識job是持久的,刪除全部觸發器的時候不被刪除 -->  
  10.     <property name="durability" value="true"/>  
  11.     <property name="volatility" value="false"></property>  
  12. </bean>  
  13. <bean id="accountStatusTaskScannerTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">  
  14.      <property name="group" value="CrmDealGroup"></property>  
  15.      <property name="name" value="accountStatusTaskScannerTrigger"></property>  
  16.     <property name="jobDetail" ref="accountStatusTaskScannerJobDetail"></property>  
  17.     <property name="cronExpression" value="0 0 1 * * ?"></property>  
  18. </bean>  
  19.   
  20. <!-- 定義Quartz 監聽器 -->  
  21. <bean id="quartzExceptionSchedulerListener"   
  22. class="***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>  
  23.   
  24. <!-- Quartz調度工廠 -->  
  25. <bean id="quartzScheduler"  
  26.  class="org.springframework.scheduling.quartz.SchedulerFactoryBean">  
  27.     <property name="quartzProperties">  
  28.     <props>  
  29.         <prop key="org.quartz.scheduler.instanceName">CRMscheduler</prop>  
  30.         <prop key="org.quartz.scheduler.instanceId">AUTO</prop>  
  31.         <!-- 線程池配置 -->  
  32.         <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>  
  33.         <prop key="org.quartz.threadPool.threadCount">20</prop>  
  34.         <prop key="org.quartz.threadPool.threadPriority">5</prop>  
  35.         <!-- JobStore 配置 -->  
  36.         <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>  
  37.         <!-- 集羣配置 -->  
  38.         <prop key="org.quartz.jobStore.isClustered">false</prop>  
  39.         <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>  
  40.         <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>  
  41.         <!-- 數據源配置 使用DBCP鏈接池 數據源與dataSource一致 -->  
  42.         <prop key="org.quartz.jobStore.dataSource">myDS</prop>  
  43.         <prop key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>  
  44.         <prop key="org.quartz.dataSource.myDS.URL">${database.url}</prop>  
  45.         <prop key="org.quartz.dataSource.myDS.user">${database.username}</prop>  
  46.         <prop key="org.quartz.dataSource.myDS.password">${database.password}</prop>  
  47.         <prop key="org.quartz.dataSource.myDS.maxConnections">5</prop>  
  48.         <prop key="org.quartz.jobStore.misfireThreshold">120000</prop>  
  49.     </props>  
  50.     </property>  
  51.     <property name="schedulerName" value="CRMscheduler" />  
  52.     <!--必須的,QuartzScheduler 延時啓動,應用啓動完後 QuartzScheduler 再啓動-->  
  53.     <property name="startupDelay" value="30"/>  
  54.     <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />  
  55.     <!--可選,QuartzScheduler 啓動時更新己存在的Job,這樣就不用每次修改targetObject後刪除qrtz_job_details表對應記錄了 -->  
  56.     <property name="overwriteExistingJobs" value="true" />  
  57.     <!-- 設置自動啓動 -->  
  58.     <property name="autoStartup" value="true" />  
  59.     <!-- 註冊觸發器 -->  
  60.     <property name="triggers">  
  61.  <list>  
  62.     <ref bean="dailyStatisticsTrigger" />  
  63.     <ref bean="accountGrabedScannerTrigger" />  
  64.     <ref bean="syncAccountFromPOITrigger" />  
  65.     <ref bean="userSyncScannerTrigger" />  
  66.     <ref bean="syncParentBranchFromPOITrigger"/>  
  67.     <ref bean="privateReminderTrigger" />  
  68.     <ref bean="onlineBranchesScannerTrigger" />  
  69.     <ref bean="syncCtContactServiceTrigger" />  
  70.     <ref bean="dealLinkDianpingScannerTrigger" />  
  71.     <ref bean="accountStatusTaskScannerTrigger"/>  
  72.     <ref bean="nDaysActivityScannerTrigger"/>  
  73.  </list>  
  74.  </property>  
  75. <!-- 註冊jobDetail -->  
  76.  <property name="jobDetails">  
  77.     <list>  
  78.         <ref bean="myTestQuartzJobDetail"/>  
  79.         <ref bean="accountPrivateToProtectedJobDetail"/>  
  80.         <ref bean="accountProtectedToPublicJobDetail"/>  
  81.  <ref bean="nDaysActivityToProtectedJobDetail"/>  
  82.  </list>  
  83.  </property>  
  84. <property name="schedulerListeners">  
  85.     <list>  
  86.         <ref bean="quartzExceptionSchedulerListener"/>  
  87.     </list>  
  88.  </property>  
  89. </bean>  



Crm目前能夠作到對Quartz實例的監控,操做.動態部署Trigger 


 
 後續待開發功能和問題 1:目前實現對job,Trigger操做,動態部署Trigger,後續須要加入Calendar(排除特定日期),Listener(動態加載監控),Job的動態部署(只要bean的名稱和方法名,就可完成對job生成,部署) 2:因爲Quartz集羣中的job目前是在任意一臺server中執行,Quartz日誌生成各自的系統目錄中, quartz日誌沒法統一. 3:Quartz2.x已經支持可選節點執行job(期待Spring升級後對新Quartz支持) 4:Quartz內部的DB操做大量Trigger存在嚴重競爭問題,瞬間大量trigger執行,目前只能經過(org.quartz.jobStore.tablePrefix = QRTZ)分表操做,存在長時間lock_wait(新版本聽說有提升); 5:若是有須要,能夠抽取出Quartz,變成單獨的服務,供其它系統調度使用使用

相關文章
相關標籤/搜索