原文:http://hot66hot.iteye.com/blog/1726143html
在公司分享了Quartz,發佈出來,但願你們討論補充.
CRM使用Quartz集羣分享
一:CRM對定時任務的依賴與問題
二:什麼是quartz,如何使用,集羣,優化
三:CRM中quartz與Spring結合使用
1:CRM對定時任務的依賴與問題
1)依賴
(1)天天晚上的定時任務,經過sql腳本 + crontab方式執行 java
- #crm
- 0 2 * * * /opt/***/javafiles/***/shell/***_daily_stat.sql
- 30 7 * * * /opt/***/javafiles/***/shell/***_data_fix
- 30 0 * * * /opt/***/javafiles/***/shell/***_sync_log
- 0 1 * * * /opt/***/javafiles/***/shell/***_clear_log
- 20 8 * * * /opt/***/javafiles/***/shell/***_daily >> /var/***/logs/***_daily.log 2>&1
- 40 1 * * * /opt/***/javafiles/***/shell/***_sync_account2
- 0 2 * * 1 /opt/***/javafiles/***/shell/***_weekly >> /var/***/logs/***_weekly.log 2>&1
存在的問題:當須要跨庫或許數據的,sql無能爲力,引入許多中間表,完成複雜統計需求。大範圍對線上熱表掃描,形成鎖表,延遲嚴重
(2)使用python(多數據源) + SQL的方式 python
- def connectCRM():
- return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")
-
- def connectTemp():
- return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")
-
- def connectOA():
- return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")
-
- def connectCore():
- return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")
-
- def connectCT():
- return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")
存在的問題:直接訪問數據,須要理解各系統的數據結構,沒法知足動態任務問題,各系統業務接口沒有重用
(3)使用spring + JDK timer方式調用接口完成定時任務 web
- <bean id="accountStatusTaskScanner" class="***.impl.AccountStatusTaskScanner" />
- <task:scheduler id="taskScheduler" pool-size="5" />
- <task:scheduled-tasks scheduler="taskScheduler">
- <task:scheduled ref="accountStatusTaskScanner" method="execute" cron="0 0 1 * * ?" />
- </task:scheduled-tasks>
使用寫死服務器Host(srv23)的方式,控制只在一臺服務器上執行task spring
- public abstract class SingletonServerTaskScanner implements TaskScanner {
- private final Logger logger = LoggerFactory.getLogger(SingletonServerTaskScanner.class);
- @Override
- public void execute() {
- String hostname = "";
- try {
- hostname = InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- logger.error(e.getMessage(), e);
- }
-
- if (ConfigUtil.getValueByKey("core.scan.server").equals(hostname)) {
- doScan();
- }
- }
- public abstract void doScan();
- }
//對於srv23的重啓,保存在內存中的任務將丟失,每次重啓srv23從新生成定時任務 sql
- public class CrmInitializer implements InitializingBean {
- private Logger logger = LoggerFactory.getLogger(CrmInitializer.class);
- @Override
- public void afterPropertiesSet() throws Exception {
-
- logger.info("掃描商家狀態,建立定時任務");
- accountStatusTaskScanner.execute();
-
- logger.info("掃描N天未拜訪商家,建立定時任務");
- nDaysActivityScanner.execute();
- }
- }
- public class SingletonServerTaskController {
- @Resource
- private AccountService accountService;
- @RequestMapping(value = "/reschedule")
- public @ResponseBody
- String checkAndRescheduleAccount(Integer accountId) {
- logger.debug("reschedule task for accountId:" + accountId);
- if (isCurrentServer()) {
- accountService.checkAndRescheduleAccount(Arrays.asList(accountId));
- }
- return "ok";
- }
- private boolean isCurrentServer() {
- String hostname = "";
- try {
- hostname = InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- logger.error(e.getMessage(), e);
- }
- if (ConfigUtil.getValueByKey("core.scan.server").equals(hostname)) {
- return true;
- } else {
- return false;
- }
- }
- }
存在的問題:實現步驟複雜,分散,任務調度不能恢復,嚴重依賴於srv23,回調URL時可能失敗等狀況。
CRM定時任務走過了不少彎路:
定時任務多種實現方式,使配置和代碼分散在多處,難以維護和監控
任務執行過程沒有保證,沒有錯誤恢復
任務執行異常沒有反饋(郵件)
沒有集羣支持
CRM須要分佈式的任務調度框架,統一解決問題.
JAVA可使用的任務調度框架:Quartz , Jcrontab , cron4j , taobao-pamirs-schedule
爲何選擇Quartz:
1)資歷夠老,創立於1998年,比struts1還早,可是一直在更新(27 April 2012: Quartz 2.1.5 Released),文檔齊全.
2)徹底由Java寫成,設計用於J2SE和J2EE應用.方便集成:JVM,RMI.
3)設計清晰簡單:核心概念scheduler,trigger,job,jobDetail,listener,calendar
4)支持集羣:org.quartz.jobStore.isClustered
5)支持任務恢復:requestsRecovery
從http://www.quartz-scheduler.org 獲取最新Quartz
1)學習Quartz
圖1 介紹了quartz關鍵的組件和簡單流程
(1)Quartz 的目錄結構和內容
docs/api Quartz 框架的JavaDoc Api 說明文檔
docs/dbTables 建立 Quartz 的數據庫對象的腳本
docs/wikidocs Quartz 的幫助文件,點擊 index.html 開始查看
Examples 多方面使用 Quartz 的例子Lib Quartz 使用到的第三方包
src/java/org/quartz 使用 Quartz 的客戶端程序源代碼,公有 API
src/java/org/quartz/core 使用 Quartz 的服務端程序源代碼,私有 API
src/java/org/quartz/simpl Quartz 提供的不衣賴於第三方產品的簡單實現
src/java/org/quartz/impl 依賴於第三方產品的支持模塊的實現
src/java/org/quartz/utils 整個框架要用到的輔助類和工具組件
src/jboss 提供了特定於 JBoss 特性的源代碼
src/oracle 提供了特定於 Oracle 特性的源代碼
src/weblogic 提供了特定於 WebLogic 特性的源代碼
Quartz 框架包含許多的類和接口,它們分佈在大概 11 個包中。多數所要使用到的類或接口放置在 org.quartz 包中。這個包含蓋了 Quartz 框架的公有 API.
(2)Quartz核心接口 Scheduler

圖2
Scheduler 是 Quartz 的主要 API。與Quartz大部分交互是發生於 Scheduler 之上的。客服端與Scheduler 交互是經過org.quartz.Scheduler接口。
Scheduler的實現:對方法調用會傳遞到 QuartzScheduler 實例上。QuartzScheduler 對於客戶端是不可見的,而且也不存在與此實例的直接交互。
圖3
建立Scheduler
Quartz 框架提供了 org.quartz.SchedulerFactory 接口。
SchedulerFactory 實例就是用來產生 Scheduler 實例的。當 Scheduler 實例被建立以後,就會存到一個倉庫中(org.quartz.impl.SchedulerRepository).
Scheduler 工廠分別是 org.quartz.impl.DirectSchedulerFactory 和 org.quartz.impl.StdSchedulerFactory
DirectSchedulerFactory 是爲精細化控制 Scheduler 實例產生的工廠類,通常不用,不過有利於理解quartz內部組件。 shell
- -- 最簡單
- public void createScheduler(ThreadPool threadPool, JobStore jobStore);
- -- 最複雜
- public void createScheduler(String schedulerName, String schedulerInstanceId,ThreadPool threadPool, JobStore jobStore, String rmiRegistryHost, int rmiRegistryPort);
- public scheduler createScheduler(){
- DirectSchedulerFactory factory=DirectSchedulerFactory.getInstance();
- try {
-
- SimpleThreadPool threadPool = new SimpleThreadPool(10, Thread.NORM_PRIORITY);
- threadPool.initialize();
-
- JobStoreTX jdbcJobStore = new JobStoreTX();
- jdbcJobStore.setDataSource("someDatasource");
- jdbcJobStore.setPostgresStyleBlobs(true);
- jdbcJobStore.setTablePrefix("QRTZ_");
- jdbcJobStore.setInstanceId("My Instance");
-
- logger.info("Scheduler starting up...");
- factory.createScheduler(threadPool,jdbcJobStore);
-
- Scheduler scheduler = factory.getScheduler();
-
-
- scheduler.start();
- return scheduler;
- }
- return null;
- }
org.quartz.impl.StdSchedulerFactory 依賴於屬性類(Properties)決定如何生產 Scheduler 實例
經過加載屬性文件,Properties 提供啓動參數: 數據庫
- public scheduler createScheduler(){
-
- StdSchedulerFactory factory = new StdSchedulerFactory();
-
-
- Properties props = new Properties();
-
- props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,"org.quartz.simpl.SimpleThreadPool");
- props.put("org.quartz.threadPool.threadCount", "10");
-
- try {
-
- factory.initialize(props);
- Scheduler scheduler = factory.getScheduler();
- logger.info("Scheduler starting up...");
- scheduler.start();
- } catch (SchedulerException ex) {
- logger.error(ex);
- }
- }
調用靜態方法 getDefaultScheduler() 方法中調用了空的構造方法。若是以前未調用過任何一個 initialize() 方法,那麼無參的initialize() 方法會被調用。這會開始去按照下面說的順序加載文件。
默認狀況下,quartz.properties 會被定位到,並從中加載屬性。
properties加載順序:
1. 檢查 System.getProperty("org.quartz.properties") 中是否設置了別的文件名
2. 不然,使用 quartz.properties 做爲要加載的文件名
3. 試圖從當前工做目錄中加載這個文件
4. 試圖從系統 classpath 下加載這個文件
在 Quartz Jar 包中有一個默認的 quartz.properties 文件
默認配置以下
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
到此建立Scheduler完成
經過Scheduler理解Quartz
Scheduler 的 API 能夠分組成如下三個類別:
·管理 Scheduler
(1)啓動 Scheduler windows
start() 方法被調用,Scheduler 就開始搜尋須要執行的 Job。在你剛獲得一個 Scheduler 新的實例時,或者 Scheduler
被設置爲 standby 模式後,你才能夠調用 start() 方法。 設計模式
- public void standby() throws SchedulerException;
只要調用了 shutdown() 方法以後,你就不能再調用 Scheduler 實例的 start() 方法了。
這是由於 shutdown() 方法銷燬了爲 Scheduler 建立的全部的資源(線程,數據庫鏈接等)。
你可能須要Standby 模式:設置 Scheduler 爲 standby 模式會致使 Scheduler搜尋要執行的 Job 的線程被暫停下來
中止 Scheduler
- public void shutdown(boolean waitForJobsToComplete) throws SchedulerException;
- public void shutdown() throws SchedulerException;
其它管理Scheduler 方法見API...
管理 Job
什麼是 Quartz Job?
一個Quart Job就是一個任何一個繼承job或job子接口的Java類,你能夠用這個類作任何事情!
org.quartz.Job 接口
- public void execute(JobExecutionContext context)throws JobExecutionException;
- JobExecutionContext
當 Scheduler 調用一個 Job,一個 JobexecutionContext 傳遞給 execute() 方法。JobExecutionContext 對象讓 Job 能
訪問 Quartz 運行時候環境和 Job 自己的數據。相似於在 Java Web 應用中的 servlet 訪問 ServletContext 。
經過 JobExecutionContext,Job 可訪問到所處環境的全部信息,包括註冊到 Scheduler 上與該 Job 相關聯的 JobDetail 和 Trigger。
JobDetail
部署在 Scheduler 上的每個 Job 只建立了一個 JobDetail實例。JobDetail 是做爲 Job 實例進行定義的
// Create the JobDetail
JobDetail jobDetail = new JobDetail("PrintInfoJob",Scheduler.DEFAULT_GROUP, PrintInfoJob.class);
// Create a trigger that fires now and repeats forever
Trigger trigger = TriggerUtils.makeImmediateTrigger(
SimpleTrigger.REPEAT_INDEFINITELY, 10000);
trigger.setName("PrintInfoJobTrigger");// register with the Scheduler
scheduler.scheduleJob(jobDetail, trigger);
JobDetail 被加到 Scheduler 中了,而不是 job。Job 類是做爲 JobDetail 的一部份,job直到Scheduler準備要執行它的時候纔會被實例化的,所以job不存在線成安全性問題.
使用 JobDataMap 對象設定 Job 狀態
- public void executeScheduler() throws SchedulerException{
- scheduler = StdSchedulerFactory.getDefaultScheduler();
- scheduler.start();
- logger.info("Scheduler was started at " + new Date());
-
- JobDetail jobDetail = new JobDetail("PrintJobDataMapJob",Scheduler.DEFAULT_GROUP,PrintJobDataMapJob.class);
-
- jobDetail.getJobDataMap().put("name", "John Doe");
- jobDetail.getJobDataMap().put("age", 23);
- jobDetail.getJobDataMap().put("balance",new BigDecimal(1200.37));
-
- Trigger trigger = TriggerUtils.makeImmediateTrigger(0, 10000);
- trigger.setName("PrintJobDataMapJobTrigger");
- scheduler.scheduleJob(jobDetail, trigger);
- }
- public class PrintJobDataMapJob implements Job {
- public void execute(JobExecutionContext context)throws JobExecutionException {
- logger.info("in PrintJobDataMapJob");
-
- JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
-
- Iterator iter = jobDataMap.keySet().iterator();
- while (iter.hasNext()) {
- Object key = iter.next();
- Object value = jobDataMap.get(key);
- logger.info("Key: " + key + " - Value: " + value);
- }
- }
- }
在Quartz 1.5以後,JobDataMap在 Trigger 級也是可用的。它的用途相似於Job級的JobDataMap,支持在同一個JobDetail上的多個Trigger。
伴隨着加入到 Quartz 1.5 中的這一加強特性,可使用 JobExecutionContext 的一個新的更方便的方法獲取到 Job 和 Trigger 級的並集的 map 中的值。
這個方法就是getMergedJobDataMap() 取job 和 Trigger級的並集map,它可以在 Job 中使用。管法推薦使用這個方法.
* 實際使用中trigger級別有時取不到map中的值, 使用getMergedJobDataMap 能夠獲取到(官方推薦此方法).
有狀態的Job: org.quartz.StatefulJob 接口
當須要在兩次 Job 執行間維護狀態,使用StatefulJob 接口.
Job 和 StatefulJob 在框架中使用中存在兩個關鍵差別。
(一) JobDataMap 在每次執行以後從新持久化到 JobStore 中。這樣就確保你對 Job 數據的改變直到下次執行仍然保持着。
(二) 兩個或多個有狀態的 JobDetail 實例不能併發執行。保證JobDataMap線程安全
注意:實際使用時使用jobStoreTX/jobStoreCMT ,StatefulJob,大量的trigger對應一個JobDetail的狀況下Mysql會產生鎖超時問題.
中斷 Job
Quartz 包括一個接口叫作 org.quartz.InterruptableJob,它擴展了普通的 Job 接口並提供了一個 interrupt() 方法:
沒有深刻研究,只知道 Scheduler會調用自定義的Job的 interrupt()方法。由用戶決定 Job 決定如何中斷.沒有測試!!!
job的特性
易失性 volatility
一個易失性的 Job 是在程序關閉以後不會被持久化。一個 Job 是經過調用 JobDetail 的 setVolatility(true)被設置爲易失.
Job易失性的默認值是 false.
注意:只有採用持久性JobStore時纔有效
Job 持久性 durability
設置JobDetail 的 setDurability(false),在全部的觸發器觸發以後JobDetail將從 JobStore 中移出。
Job持久性默認值是false.
Scheduler將移除沒有trigger關聯的jobDetail
Job 可恢復性 shuldRecover
當一個Job在執行中,Scheduler非正常的關閉,設置JobDetail 的setRequestsRecovery(true) 在 Scheduler 重啓以後可恢復的Job還會再次被執行。這個
Job 會從新開始執行。注意job代碼事務特性.
Job可恢復性默認爲false,Scheduler不會試着去恢復job操做。
圖爲表述沒有執行完成的job數據庫記錄
Scheduler 中移除 Job
移除全部與這個 Job 相關聯的 Trigger;若是這個 Job 是非持久性的,它將會從 Scheduler 中移出。
更直接的方式是使用 deleteJob() 方法,它還會刪除全部與當前job關聯的trigger
public boolean deleteJob(String jobName, String groupName) throws SchedulerException;
quartz 自己提供的 Job
org.quartz.jobs.FileScanJob 檢查某個指定文件是否變化,並在文件被改變時通知到相應監聽器的 Job
org.quartz.jobs.FileScanListener 在文件被修改後通知 FileScanJob 的監聽器
org.quartz.jobs.NativeJob 用來執行本地程序(如 windows 下 .exe 文件) 的 Job
org.quartz.jobs.NoOpJob 什麼也不作,但用來測試監聽器不是頗有用的。一些用戶甚至僅僅用它來致使一個監聽器的運行
org.quartz.jobs.ee.mail.SendMailJob 使用 JavaMail API 發送 e-mail 的 Job
org.quartz.jobs.ee.jmx.JMXInvokerJob 調用 JMX bean 上的方法的 Job
org.quartz.jobs.ee.ejb.EJBInvokerJob 用來調用 EJB 上方法的 Job
job的理解到此結束
理解quartz Trigger
Job 包含了要執行任務的邏輯,可是Job不負責什麼時候執行。這個事情由觸發器(Trigger)負責。
Quartz Trigger繼承了抽象的org.quartz.Trigger 類。
目前,Quartz 有三個可用的實現
org.quartz.SimpleTrigger
org.quartz.CronTrigger
org.quartz.NthIncludeDayTrigger
使用org.quartz.SimpleTrigger
SimpleTrigger 是設置和使用是最爲簡單的一種 Quartz Trigger。它是爲那種須要在特定的日期/時間啓動,且以一個可能的間隔時間重複執行 n 次的 Job 所設計的。
SimpleTrigger 存在幾個變種的構造方法。他們是從無參的版本一直到帶所有參數的版本。
下面代碼版斷顯示了一個僅帶有trigger 的名字和組的簡單構造方法
SimpleTrigger sTrigger = new SimpleTrigger("myTrigger", Scheduler.DEFAULT_GROUP);
這個 Trigger 會當即執行,而不重複。還有一個構造方法帶有多個參數,配置 Triiger 在某一特定時刻觸發,重複執行屢次,和兩
次觸發間的延遲時間。
- public SimpleTrigger(String name, String group,String jobName, String jobGroup,
- Date startTime,Date endTime, int repeatCount, long repeatInterval);
使用org.quartz.CronTrigger
CronTrigger 是基於 Unix 相似於 cron 的表達式觸發,也是功能最強大和最經常使用的Trigger
Cron表達式:
- "0 0 12 * * ?" Fire at 12pm (noon) every day
- "0 15 10 ? * *" Fire at 10:15am every day
- "0 15 10 * * ?" Fire at 10:15am every day
- "0 15 10 * * ? *" Fire at 10:15am every day
- "0 15 10 * * ? 2005" Fire at 10:15am every day during the year 2005
- "0 * 14 * * ?" Fire every minute starting at 2pm and ending at 2:59pm, every day
- "0 0/5 14 * * ?" Fire every 5 minutes starting at 2pm and ending at 2:55pm, every day
- "0 0/5 14,18 * * ?" Fire every 5 minutes starting at 2pm and ending at 2:55pm, AND fire every 5 minutes starting at 6pm and ending at 6:55pm, every day
- "0 0-5 14 * * ?" Fire every minute starting at 2pm and ending at 2:05pm, every day
- "0 10,44 14 ? 3 WED" Fire at 2:10pm and at 2:44pm every Wednesday in the month of March.
- "0 15 10 ? * MON-FRI" Fire at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday
- "0 15 10 15 * ?" Fire at 10:15am on the 15th day of every month
- "0 15 10 L * ?" Fire at 10:15am on the last day of every month
- "0 15 10 ? * 6L" Fire at 10:15am on the last Friday of every month
- "0 15 10 ? * 6L" Fire at 10:15am on the last Friday of every month
- "0 15 10 ? * 6L 2002-2005" Fire at 10:15am on every last Friday of every month during the years 2002, 2003, 2004 and 2005
- "0 15 10 ? * 6#3" Fire at 10:15am on the third Friday of every month
使用 org.quartz.NthIncludedDayTrigger
org.quartz.NthIncludedDayTrigger是設計用於在每一間隔類型的第幾天執行 Job。
例如,你要在每月的 12 號執行發工資提醒的Job。接下來的代碼片段描繪瞭如何建立一個 NthIncludedDayTrigger.
- NthIncludedDayTrigger trigger = new NthIncludedDayTrigger("MyTrigger", Scheduler.DEFAULT_GROUP);
- trigger.setN(12);
- trigger.setIntervalType(NthIncludedDayTrigger.INTERVAL_TYPE_MONTHLY);
jobDetail + trigger組成最基本的定時任務:
特別注意:一個job能夠對應多個Trgger , 一個Trigger只能對應一個job .
如:CRM中N天未拜訪的job對應全部的N天未拜訪商家(一個商家一個trigger) 大約1:1000的比例
job和trigger都是經過name 和 group 屬性肯定惟一性的.
Quartz Calendar
Quartz 的 Calendar 對象與 Java API 的 java.util.Calendar不一樣。
Java 的 Calender 對象是通用的日期和時間工具;
Quartz 的 Calender 專門用於屏閉一個時間區間,使 Trigger 在這個區間中不被觸發。
例如,讓咱們假如取消節假日執行job。
Quartz包括許多的 Calender 實現足以知足大部分的需求.
org.quartz.impl.calendar.BaseCalender 爲高級的 Calender 實現了基本的功能,實現了 org.quartz.Calender 接口
org.quartz.impl.calendar.WeeklyCalendar 排除星期中的一天或多天,例如,可用於排除週末
org.quartz.impl.calendar.MonthlyCalendar 排除月份中的數天,例如,可用於排除每個月的最後一天
org.quartz.impl.calendar.AnnualCalendar 排除年中一天或多天
org.quartz.impl.calendar.HolidayCalendar 特別的用於從 Trigger 中排除節假日
使用Calendar,只需實例化後並加入你要排除的日期,而後用 Scheduler 註冊,最後必須讓Calender依附於Trigger實例。
排除國慶節實例
- private void scheduleJob(Scheduler scheduler, Class jobClass) {
- try {
-
- AnnualCalendar cal = new AnnualCalendar();
-
- Calendar gCal = GregorianCalendar.getInstance();
- gCal.set(Calendar.MONTH, Calendar.OCTOBER);
- List<Calendar> mayHolidays = new ArraysList<Calendar>();
- for(int i=1; i<=7; i++){
- gCal.set(Calendar.DATE, i);
- mayHolidays.add(gCal);
- }
- cal.setDaysExcluded(mayHolidays);
-
- scheduler.addCalendar("crmHolidays", cal, true, true);
-
- Trigger trigger = TriggerUtils.makeImmediateTrigger("myTrigger",-1,60000);
-
- trigger.setCalendarName("crmHolidays");
- JobDetail jobDetail = new JobDetail(jobClass.getName(), Scheduler.DEFAULT_GROUP, jobClass);
-
- scheduler.scheduleJob(jobDetail, trigger);
- } catch (SchedulerException ex) {
- logger.error(ex);
- }
- }
Quartz 監聽器
Quartz 提供了三種類型的監聽器:監聽Job,監聽Trigger,和監聽Scheduler.
監聽器是做爲擴展點存在的.
Quartz 監聽器是擴展點,能夠擴展框架並定製來作特定的事情。跟Spring,Hibernate,Servlet監聽器相似.
實現監聽
1. 建立一個 Java 類,實現監聽器接口
2. 用你的應用中特定的邏輯實現監聽器接口的全部方法
3. 註冊監聽器
全局和非全局監聽器
JobListener 和 TriggerListener 可被註冊爲全局或非全局監聽器。一個全局監聽器能接收到全部的 Job/Trigger 的事件通知。
而一個非全局監聽器只能接收到那些在其上已註冊了監聽器的 Job 或 Triiger 的事件。
做者:James House描述全局和非全局監聽器
全局監聽器是主動意識的,它們爲了執行它們的任務而熱切的去尋找每個可能的事件。一般,全局監聽器要作的工做不用指定到特定的 Job 或 Trigger。
非全局監聽器通常是被動意識的,它們在所關注的 Trigger 激發以前或是 Job 執行以前什麼事也不作。所以,非全局的監聽器比起全局監聽器而言更適合於修改或增長 Job 執行的工做。
相似裝飾設計模式
監聽 Job 事件
org.quartz.JobListener 接口包含一系列的方法,它們會由 Job 在其生命週期中產生的某些關鍵事件時被調用
- public interface JobListener {
-
- public String getName();
-
-
- public void jobToBeExecuted(JobExecutionContext context);
-
- public void jobExecutionVetoed(JobExecutionContext context);
-
- public void jobWasExecuted(JobExecutionContext context,JobExecutionException jobException);
圖7 job listener參與job的執行生命週期
註冊全局監聽器
- scheduler.addGlobalJobListener(jobListener);
註冊非全局監聽器(依次完成,順序不能顛倒)
- scheduler.addJobListener(jobListener);
- jobDetail.addJobListener(jobListener.getName());
- scheduler.addjob(jobDetail,true);
監聽 Trigger 事件
org.quartz.TriggerListener 接口定義Trigger監聽器
- public interface TriggerListener {
-
- public String getName();
-
-
-
- public void triggerFired(Trigger trigger, JobExecutionContext context);
-
-
-
- public boolean vetoJobExecution(Trigger trigger, JobExecutidonContext context);
-
-
-
- public void triggerMisfired(Trigger trigger);
-
-
- public void triggerComplete(Trigger trigger, JobExecutionContext context, int triggerInstructionCode);
- }
triggerListener的註冊與jobListener相同
監聽 Scheduler 事件
org.quartz.SchedulerListener 接口定義Trigger監聽器
- public interface SchedulerListener {
-
- public void jobScheduled(Trigger trigger);
-
-
- public void jobUnscheduled(String triggerName, String triggerGroup);
-
-
- public void triggerFinalized(Trigger trigger);
-
-
- public void triggersPaused(String triggerName, String triggerGroup);
-
-
- public void triggersResumed(String triggerName,String triggerGroup);
-
-
- public void jobsPaused(String jobName, String jobGroup);
-
-
- public void jobsResumed(String jobName, String jobGroup);
-
-
-
- public void schedulerError(String msg, SchedulerException cause);
-
-
- public void schedulerShutdown();
- }
註冊SchedulerListener(SchedulerListener不存在全局非全局性)
scheduler.addSchedulerListener(schedulerListener);
因爲scheduler異常存在不打印問題,CRM使用監聽器代碼打印.
- public class QuartzExceptionSchedulerListener extends SchedulerListenerSupport{
- private Logger logger = LoggerFactory.getLogger(QuartzExceptionSchedulerListener.class);
- @Override
- public void schedulerError(String message, SchedulerException e) {
- super.schedulerError(message, e);
- logger.error(message, e.getUnderlyingException());
- }
- }
- <bean id="quartzExceptionSchedulerListener" class="com.***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>
- <!-- 配置監聽器 -->
- <property name="schedulerListeners">
- <list>
- <ref bean="quartzExceptionSchedulerListener"/>
- </list>
- </property>
quartz與線程
主處理線程:QuartzSchedulerThread
啓動Scheduler時。QuartzScheduler被建立並建立一個org.quartz.core.QuartzSchedulerThread 類的實例。
QuartzSchedulerThread 包含有決定什麼時候下一個Job將被觸發的處理循環。QuartzSchedulerThread 是一個 Java 線程。它做爲一個非守護線程運行在正常優先級下。
QuartzSchedulerThread 的主處理輪循步驟:
1. 當 Scheduler 正在運行時:
A. 檢查是否有轉換爲 standby 模式的請求。
1. 假如 standby 方法被調用,等待繼續的信號
B. 詢問 JobStore 下次要被觸發的 Trigger.
1. 若是沒有 Trigger 待觸發,等候一小段時間後再次檢查
2. 假若有一個可用的 Trigger,等待觸發它的確切時間的到來
D. 時間到了,爲 Trigger 獲取到 triggerFiredBundle.
E. 使用Scheduler和triggerFiredBundle 爲 Job 建立一個JobRunShell實例
F. 在ThreadPool 申請一個線程運行 JobRunShell 實例.
代碼邏輯在QuartzSchedulerThread 的 run() 中,以下:
- public void run() {
- boolean lastAcquireFailed = false;
- while (!halted.get()) {
- try {
-
- synchronized (sigLock) {
- while (paused && !halted.get()) {
- try {
-
- sigLock.wait(1000L);
- } catch (InterruptedException ignore) {
- }
- }
-
- if (halted.get()) {
- break;
- }
- }
-
- int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
- if(availTreadCount > 0) {
- Trigger trigger = null;
-
- long now = System.currentTimeMillis();
- clearSignaledSchedulingChange();
- try {
- trigger = qsRsrcs.getJobStore().acquireNextTrigger(
- ctxt, now + idleWaitTime);
- lastAcquireFailed = false;
- } catch (JobPersistenceException jpe) {
- if(!lastAcquireFailed) {
- qs.notifySchedulerListenersError(
- "An error occured while scanning for the next trigger to fire.",
- jpe);
- }
- lastAcquireFailed = true;
- } catch (RuntimeException e) {
- if(!lastAcquireFailed) {
- getLog().error("quartzSchedulerThreadLoop: RuntimeException "
- +e.getMessage(), e);
- }
- lastAcquireFailed = true;
- }
-
- if (trigger != null) {
- now = System.currentTimeMillis();
- long triggerTime = trigger.getNextFireTime().getTime();
- long timeUntilTrigger = triggerTime - now;
- while(timeUntilTrigger > 2) {
- synchronized(sigLock) {
- if(!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
- try {
-
-
- now = System.currentTimeMillis();
- timeUntilTrigger = triggerTime - now;
- if(timeUntilTrigger >= 1)
- sigLock.wait(timeUntilTrigger);
- } catch (InterruptedException ignore) {
- }
- }
- }
- if(releaseIfScheduleChangedSignificantly(trigger, triggerTime)) {
- trigger = null;
- break;
- }
- now = System.currentTimeMillis();
- timeUntilTrigger = triggerTime - now;
- }
- if(trigger == null)
- continue;
-
-
- TriggerFiredBundle bndle = null;
-
- boolean goAhead = true;
- synchronized(sigLock) {
- goAhead = !halted.get();
- }
-
- if(goAhead) {
- try {
- bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
- trigger);
- } catch (SchedulerException se) {
- qs.notifySchedulerListenersError(
- "An error occured while firing trigger '"
- + trigger.getFullName() + "'", se);
- } catch (RuntimeException e) {
- getLog().error(
- "RuntimeException while firing trigger " +
- trigger.getFullName(), e);
-
-
- releaseTriggerRetryLoop(trigger);
- }
- }
-
-
-
-
- if (bndle == null) {
- try {
- qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
- trigger);
- } catch (SchedulerException se) {
- qs.notifySchedulerListenersError(
- "An error occured while releasing trigger '"
- + trigger.getFullName() + "'", se);
-
-
- releaseTriggerRetryLoop(trigger);
- }
- continue;
- }
-
-
-
-
-
-
-
- JobRunShell shell = null;
- try {
- shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
- shell.initialize(qs, bndle);
- } catch (SchedulerException se) {
- try {
- qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
- trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
- } catch (SchedulerException se2) {
- qs.notifySchedulerListenersError(
- "An error occured while placing job's triggers in error state '"
- + trigger.getFullName() + "'", se2);
-
-
- errorTriggerRetryLoop(bndle);
- }
- continue;
- }
-
- if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
- try {
-
-
-
-
- getLog().error("ThreadPool.runInThread() return false!");
- qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
- trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
- } catch (SchedulerException se2) {
- qs.notifySchedulerListenersError(
- "An error occured while placing job's triggers in error state '"
- + trigger.getFullName() + "'", se2);
-
-
- releaseTriggerRetryLoop(trigger);
- }
- }
- continue;
- }
- } else {
- continue;
- }
-
- long now = System.currentTimeMillis();
- long waitTime = now + getRandomizedIdleWaitTime();
- long timeUntilContinue = waitTime - now;
- synchronized(sigLock) {
- try {
- sigLock.wait(timeUntilContinue);
- } catch (InterruptedException ignore) {
- }
- }
-
- } catch(RuntimeException re) {
- getLog().error("Runtime error occured in main trigger firing loop.", re);
- }
- }
-
-
- qs = null;
- qsRsrcs = null;
- }
quartz工做者線程
Quartz 不會在主線程(QuartzSchedulerThread)中處理用戶的Job。Quartz 把線程管理的職責委託給ThreadPool。
通常的設置使用org.quartz.simpl.SimpleThreadPool。SimpleThreadPool 建立了必定數量的 WorkerThread 實例來使得Job可以在線程中進行處理。
WorkerThread 是定義在 SimpleThreadPool 類中的內部類,它實質上就是一個線程。
要建立 WorkerThread 的數量以及配置他們的優先級是在文件quartz.properties中並傳入工廠。
spring properties
- <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
- <prop key="org.quartz.threadPool.threadCount">20</prop>
- <prop key="org.quartz.threadPool.threadPriority">5</prop>
主線程(QuartzSchedulerThread)請求ThreadPool去運行 JobRunShell 實例,ThreadPool 就檢查看是否有一個可用的工做者線
程。假如因此已配置的工做者線程都是忙的,ThreadPool 就等待直到有一個變爲可用。當一個工做者線程是可用的,
而且有一個JobRunShell 等待執行,工做者線程就會調用 JobRunShell 類的 run() 方法。
Quartz 框架容許替換線程池,但必須實現org.quartz.spi.ThreadPool 接口.
圖4 quartz內部的主線程和工做者線程
Quartz的存儲和持久化
Quartz 用 JobStores 對 Job、Trigger、calendar 和 Schduler 數據提供一種存儲機制。Scheduler 應用已配置的JobStore 來存儲和獲取到部署信息,並決定正被觸發執行的 Job 的職責。
全部的關於哪一個 Job 要執行和以什麼時間表來執行他們的信息都來存儲在 JobStore。
在 Quartz 中兩種可用的 Job 存儲類型是:
內存(非持久化) 存儲
持久化存儲
JobStore 接口
Quartz 爲全部類型的Job存儲提供了一個接口。叫 JobStore。全部的Job存儲機制,無論是在哪裏或是如何存儲他們的信息的,都必須實現這個接口。
JobStore 接口的 API 可概括爲下面幾類:
Job 相關的 API
Trigger 相關的 API
Calendar 相關的 API
Scheduler 相關的 API
使用內存來存儲 Scheduler 信息
Quartz 的內存Job存儲類叫作 org.quartz.simple.RAMJobStore,它實現了JobStore 接口的。
RAMJobStore 是 Quartz 的默認的解決方案。
使用這種內存JobStore的好處。
RAMJobStore是配置最簡單的 JobStore:默認已經配置好了。見quartz.jar:org.quartz.quartz.properties
RAMJobStore的速度很是快。全部的 quartz存儲操做都在計算機內存中
使用持久性的 JobStore
持久性 JobStore = JDBC + 關係型數據庫
Quartz 全部的持久化的 JobStore 都擴展自 org.quartz.impl.jdbcjobstore.JobStoreSupport 類。
圖5
JobStoreSupport 實現了 JobStore 接口,是做爲 Quartz 提供的兩個具體的持久性 JobStore 類的基類。
Quartz 提供了兩種不一樣類型的JobStoreSupport實現類,每個設計爲針對特定的數據庫環境和配置:
·org.quartz.impl.jdbcjobstore.JobStoreTX
·org.quartz.impl.jdbcjobstore.JobStoreCMT
獨立環境中的持久性存儲
JobStoreTX 類設計爲用於獨立環境中。這裏的 "獨立",咱們是指這樣一個環境,在其中不存在與應用容器的事務集成。
#properties配置
org.quartz.jobStore.class = org.quartz.ompl.jdbcjobstore.JobStoreTX
依賴容器相關的持久性存儲
JobStoreCMT 類設計爲與程序容器事務集成,容器管理的事物(Container Managed Transactions (CMT))
crm使用JobStoreTX 由於quart有長時間鎖等待狀況,不參與系統自己事務(crm任務內事務與quartz自己事務分離).
Quartz 數據庫結構
表名描述
QRTZ_CALENDARS 以 Blob 類型存儲 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS 存儲 Cron Trigger,包括 Cron 表達式和時區信息
QRTZ_FIRED_TRIGGERS 存儲與已觸發的 Trigger 相關的狀態信息,以及相聯 Job 的執行信息
QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的 Trigger 組的信息
QRTZ_SCHEDULER_STATE 存儲少許的有關 Scheduler 的狀態信息,和別的 Scheduler 實例(假如是用於一個集羣中)
QRTZ_LOCKS 存儲程序的非觀鎖的信息(假如使用了悲觀鎖)
QRTZ_JOB_DETAILS 存儲每個已配置的 Job 的詳細信息
QRTZ_JOB_LISTENERS 存儲有關已配置的 JobListener 的信息
QRTZ_SIMPLE_TRIGGERS 存儲簡單的 Trigger,包括重複次數,間隔,以及已觸的次數
QRTZ_BLOG_TRIGGERS Trigger 做爲 Blob 類型存儲(用於 Quartz 用戶用 JDBC 建立他們本身定製的 Trigger 類型,JobStore 並不知道如何存儲實例的時候)
QRTZ_TRIGGER_LISTENERS 存儲已配置的 TriggerListener 的信息
QRTZ_TRIGGERS 存儲已配置的 Trigger 的信息
全部的表默認之前綴QRTZ_開始。能夠經過在 quartz.properties配置修改(org.quartz.jobStore.tablePrefix = QRTZ_)。
能夠對不一樣的Scheduler實例使用多套的表,經過改變前綴來實現。
優化 quartz數據表結構
-- 1:對關鍵查詢路徑字段創建索引
- create index idx_qrtz_t_next_fire_time on QRTZ_TRIGGERS(NEXT_FIRE_TIME);
- create index idx_qrtz_t_state on QRTZ_TRIGGERS(TRIGGER_STATE);
- create index idx_qrtz_t_nf_st on QRTZ_TRIGGERS(TRIGGER_STATE,NEXT_FIRE_TIME);
- create index idx_qrtz_ft_trig_group on QRTZ_FIRED_TRIGGERS(TRIGGER_GROUP);
- create index idx_qrtz_ft_trig_name on QRTZ_FIRED_TRIGGERS(TRIGGER_NAME);
- create index idx_qrtz_ft_trig_n_g on QRTZ_FIRED_TRIGGERS(TRIGGER_NAME,TRIGGER_GROUP);
- create index idx_qrtz_ft_trig_inst_name on QRTZ_FIRED_TRIGGERS(INSTANCE_NAME);
- create index idx_qrtz_ft_job_name on QRTZ_FIRED_TRIGGERS(JOB_NAME);
- create index idx_qrtz_ft_job_group on QRTZ_FIRED_TRIGGERS(JOB_GROUP);
-- 2:根據Mysql innodb表結構特性,調整主鍵,下降二級索引的大小
- ALTER TABLE QRTZ_TRIGGERS
- ADD UNIQUE KEY IDX_NAME_GROUP(TRIGGER_NAME,TRIGGER_GROUP),
- DROP PRIMARY KEY,
- ADD ID INT UNSIGNED NOT NULL AUTO_INCREMENT FIRST,
- ADD PRIMARY KEY (ID);
- ALTER TABLE QRTZ_JOB_DETAILS
- ADD UNIQUE KEY IDX_NAME_GROUP(JOB_NAME,JOB_GROUP),
- DROP PRIMARY KEY,
- ADD ID INT UNSIGNED NOT NULL AUTO_INCREMENT FIRST,
- ADD PRIMARY KEY (ID);
Quartz集羣
只有使用持久的JobStore才能完成Quqrtz集羣
圖6
一個 Quartz 集羣中的每一個節點是一個獨立的 Quartz 應用,它又管理着其餘的節點。
須要分別對每一個節點分別啓動或中止。不像應用服務器的集羣,獨立的 Quartz 節點並不與另外一個節點或是管理節點通訊。
Quartz 應用是經過數據庫表來感知到另外一應用。
配置集羣
- <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
- <prop key="org.quartz.jobStore.isClustered">true</prop>
- <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>
- <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>
- <prop key="org.quartz.jobStore.dataSource">myDS</prop>
- <prop key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>
- <prop key="org.quartz.dataSource.myDS.URL">${database.url}</prop>
- <prop key="org.quartz.dataSource.myDS.user">${database.username}</prop>
- <prop key="org.quartz.dataSource.myDS.password">${database.password}</prop>
- <prop key="org.quartz.dataSource.myDS.maxConnections">5</prop>
org.quartz.jobStore.class 屬性爲 JobStoreTX,
將任務持久化到數據中。由於集羣中節點依賴於數據庫來傳播Scheduler實例的狀態,你只能在使用 JDBC JobStore 時應用 Quartz 集羣。
org.quartz.jobStore.isClustered 屬性爲 true,通知Scheduler實例要它參與到一個集羣當中。
org.quartz.jobStore.clusterCheckinInterval
屬性定義了Scheduler 實例檢入到數據庫中的頻率(單位:毫秒)。
Scheduler 檢查是否其餘的實例到了它們應當檢入的時候未檢入;
這能指出一個失敗的 Scheduler 實例,且當前 Scheduler 會以此來接管任何執行失敗並可恢復的 Job。
經過檢入操做,Scheduler 也會更新自身的狀態記錄。clusterChedkinInterval 越小,Scheduler 節點檢查失敗的 Scheduler 實例就越頻繁。默認值是 15000 (即15 秒)
集羣實現分析
Quartz原來碼分析:
基於數據庫表鎖實現多Quartz_Node 對Job,Trigger,Calendar等同步機制
- CREATE TABLE `QRTZ_LOCKS` (
- `LOCK_NAME` varchar(40) NOT NULL,
- PRIMARY KEY (`LOCK_NAME`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- +
- | LOCK_NAME |
- +
- | CALENDAR_ACCESS |
- | JOB_ACCESS |
- | MISFIRE_ACCESS |
- | STATE_ACCESS |
- | TRIGGER_ACCESS |
- +
經過行級別鎖實現多節點處理
- public class StdRowLockSemaphore extends DBSemaphore {
-
-
- public static final String SELECT_FOR_LOCK = "SELECT * FROM "
- + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME
- + " = ? FOR UPDATE";
-
-
- public StdRowLockSemaphore() {
- super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK);
- }
-
- public StdRowLockSemaphore(String tablePrefix, String seletWithLockSQL) {
- super(tablePrefix, selectWithLockSQL, SELECT_FOR_LOCK);
- }
-
-
- protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- ps = conn.prepareStatement(expandedSQL);
- ps.setString(1, lockName);
-
- if (getLog().isDebugEnabled()) {
- getLog().debug(
- "Lock '" + lockName + "' is being obtained: " +
- Thread.currentThread().getName());
- }
- rs = ps.executeQuery();
- if (!rs.next()) {
- throw new SQLException(Util.rtp(
- "No row exists in table " + TABLE_PREFIX_SUBST +
- TABLE_LOCKS + " for lock named: " + lockName, getTablePrefix()));
- }
- } catch (SQLException sqle) {
- if (getLog().isDebugEnabled()) {
- getLog().debug(
- "Lock '" + lockName + "' was not obtained by: " +
- Thread.currentThread().getName());
- }
- throw new LockException("Failure obtaining db row lock: "
- + sqle.getMessage(), sqle);
- } finally {
- if (rs != null) {
- try {
- rs.close();
- } catch (Exception ignore) {
- }
- }
- if (ps != null) {
- try {
- ps.close();
- } catch (Exception ignore) {
- }
- }
- }
- }
-
- protected String getSelectWithLockSQL() {
- return getSQL();
- }
-
- public void setSelectWithLockSQL(String selectWithLockSQL) {
- setSQL(selectWithLockSQL);
- }
- }
-
-
- public boolean obtainLock(Connection conn, String lockName) throws LockException {
- lockName = lockName.intern();
-
- Logger log = getLog();
-
- if(log.isDebugEnabled()) {
- log.debug(
- "Lock '" + lockName + "' is desired by: "
- + Thread.currentThread().getName());
- }
- if (!isLockOwner(conn, lockName)) {
- executeSQL(conn, lockName, expandedSQL);
-
- if(log.isDebugEnabled()) {
- log.debug(
- "Lock '" + lockName + "' given to: "
- + Thread.currentThread().getName());
- }
- getThreadLocks().add(lockName);
-
-
- } else if(log.isDebugEnabled()) {
- log.debug(
- "Lock '" + lockName + "' Is already owned by: "
- + Thread.currentThread().getName());
- }
- return true;
- }
-
-
- public void releaseLock(Connection conn, String lockName) {
- lockName = lockName.intern();
-
- if (isLockOwner(conn, lockName)) {
- if(getLog().isDebugEnabled()) {
- getLog().debug(
- "Lock '" + lockName + "' returned by: "
- + Thread.currentThread().getName());
- }
- getThreadLocks().remove(lockName);
-
- } else if (getLog().isDebugEnabled()) {
- getLog().warn(
- "Lock '" + lockName + "' attempt to return by: "
- + Thread.currentThread().getName()
- + " -- but not owner!",
- new Exception("stack-trace of wrongful returner"));
- }
- }
JobStoreTX 控制併發代碼
- protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {
- return executeInNonManagedTXLock(lockName, txCallback);
- }
-
- 使用JobStoreSupport.executeInNonManagedTXLock 實現:
- protected Object executeInNonManagedTXLock(
- String lockName,
- TransactionCallback txCallback) throws JobPersistenceException {
- boolean transOwner = false;
- Connection conn = null;
- try {
- if (lockName != null) {
-
-
- if (getLockHandler().requiresConnection()) {
- conn = getNonManagedTXConnection();
- }
-
- transOwner = getLockHandler().obtainLock(conn, lockName);
- }
- if (conn == null) {
- conn = getNonManagedTXConnection();
- }
-
- Object result = txCallback.execute(conn);
-
- commitConnection(conn);
- Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
- if(sigTime != null && sigTime >= 0) {
- signalSchedulingChangeImmediately(sigTime);
- }
- return result;
- } catch (JobPersistenceException e) {
- rollbackConnection(conn);
- throw e;
- } catch (RuntimeException e) {
- rollbackConnection(conn);
- throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e);
- } finally {
- try {
-
- releaseLock(conn, lockName, transOwner);
- } finally {
- cleanupConnection(conn);
- }
- }
- }
JobStoreCMT 控制併發代碼
-
- protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {
- boolean transOwner = false;
- Connection conn = null;
- try {
- if (lockName != null) {
-
-
- if (getLockHandler().requiresConnection()) {
- conn = getConnection();
- }
- transOwner = getLockHandler().obtainLock(conn, lockName);
- }
-
- if (conn == null) {
- conn = getConnection();
- }
-
- return txCallback.execute(conn);
- } finally {
- try {
- releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
- } finally {
- cleanupConnection(conn);
- }
- }
- }
CRM中quartz與Spring結合使用
Spring 經過提供org.springframework.scheduling.quartz下的封裝類對quartz支持
可是目前存在問題
1:Spring3.0目前不支持Quartz2.x以上版本
Caused by: java.lang.IncompatibleClassChangeError: class org.springframework.scheduling.quartz.CronTriggerBean
has interface org.quartz.CronTrigger as super class
緣由是 org.quartz.CronTrigger在2.0從class變成了一個interface形成IncompatibleClassChangeError錯誤。
解決:無解,要想使用spring和quartz結合的方式 只能使用Quartz1.x版本。
2:org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean報
java.io.NotSerializableException異常,須要本身實現QuartzJobBean。
解決:spring bug己經在http://jira.springframework.org/browse/SPR-3797找到解決方案,
做者重寫了MethodInvokingJobDetailFactoryBean.
3:Spring內bean必需要實現序列化接口,不然不能經過Sprng 屬性注入的方式爲job提供業務對象
解決:
- @Service("springBeanService")
- public class SpringBeanService implements Serializable{private static final long serialVersionUID = -2228376078979553838L;
- public <T> T getBean(Class<T> clazz,String beanName){
- ApplicationContext context = ContextLoader.getCurrentWebApplicationContext();
- return (T)context.getBean(beanName);
- }
- }
CRM中quartz模塊部分代碼
1:定義全部job的父類,並負責異常發送郵件任務和日誌任務
- public abstract class BaseQuartzJob implements Job, Serializable {
- private static final long serialVersionUID = 3347549365534415931L;
- private Logger logger = LoggerFactory.getLogger(this.getClass());
-
-
- public abstract void action(JobExecutionContext context);
-
- @Override
- public void execute(JobExecutionContext context) throws JobExecutionException {
- try {
- long start = System.currentTimeMillis();
- this.action(context);
- long end = System.currentTimeMillis();
- JobDetail jobDetail = context.getJobDetail();
- Trigger trigger = context.getTrigger();
- StringBuilder buffer = new StringBuilder();
- buffer.append("jobName = ").append(jobDetail.getName()).append(" triggerName = ")
- .append(trigger.getName()).append(" 執行完成 , 耗時: ").append((end - start)).append(" ms");
- logger.info(buffer.toString());
- } catch (Exception e) {
- doResolveException(context != null ? context.getMergedJobDataMap() : null, e);
- }
- }
- @SuppressWarnings("unchecked")
- private void doResolveException(JobDataMap dataMap, Exception ex) {
-
-
- }
- }
2:抽象Quartz操做接口(實現類 toSee: QuartzServiceImpl)
- @Service
- public interface QuartzService {
- List<Map<String, Object>> getQrtzTriggers(Page page, String orderName, String sortType);
- List<Map<String, Object>> getQrtzJobDetails();
- void executeTriggerAction(String name, String group, Integer action);
- void executeJobAction(String name, String group, Integer action);
- void addTrigger(String jobName, String jobGroup, TriggerViewBean triggerBean);
-
- void addTriggerForDate(JobDetail jobDetail, String triggerName , String
- triggerGroup , Date date, Map<String, Object> triggerDataMap) ;
- List<Map<String, Object>> getSchedulers();
- public Trigger getTrigger(String name, String group);
- public JobDetail getJobDetail(String name, String group);
- }
3:在Spring配置job,trigger,Scheduler,Listener組件
- <bean id="accountStatusTaskScannerJobDetail"
- class="org.springframework.scheduling.quartz.JobDetailBean">
- <property name="name" value="accountStatusTaskScannerJobDetail"></property>
- <property name="group" value="CrmAccountGroup"></property>
- <property name="jobClass" value="***.crm.quartz.job.AccountStatusTaskScannerJob"></property>
-
- <property name="requestsRecovery" value="true"/>
-
- <property name="durability" value="true"/>
- <property name="volatility" value="false"></property>
- </bean>
- <bean id="accountStatusTaskScannerTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
- <property name="group" value="CrmDealGroup"></property>
- <property name="name" value="accountStatusTaskScannerTrigger"></property>
- <property name="jobDetail" ref="accountStatusTaskScannerJobDetail"></property>
- <property name="cronExpression" value="0 0 1 * * ?"></property>
- </bean>
-
- <bean id="quartzExceptionSchedulerListener"
- class="***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>
-
- <bean id="quartzScheduler"
- class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
- <property name="quartzProperties">
- <props>
- <prop key="org.quartz.scheduler.instanceName">CRMscheduler</prop>
- <prop key="org.quartz.scheduler.instanceId">AUTO</prop>
-
- <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
- <prop key="org.quartz.threadPool.threadCount">20</prop>
- <prop key="org.quartz.threadPool.threadPriority">5</prop>
-
- <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
-
- <prop key="org.quartz.jobStore.isClustered">false</prop>
- <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>
- <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>
-
- <prop key="org.quartz.jobStore.dataSource">myDS</prop>
- <prop key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>
- <prop key="org.quartz.dataSource.myDS.URL">${database.url}</prop>
- <prop key="org.quartz.dataSource.myDS.user">${database.username}</prop>
- <prop key="org.quartz.dataSource.myDS.password">${database.password}</prop>
- <prop key="org.quartz.dataSource.myDS.maxConnections">5</prop>
- <prop key="org.quartz.jobStore.misfireThreshold">120000</prop>
- </props>
- </property>
- <property name="schedulerName" value="CRMscheduler" />
-
- <property name="startupDelay" value="30"/>
- <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
-
- <property name="overwriteExistingJobs" value="true" />
-
- <property name="autoStartup" value="true" />
-
- <property name="triggers">
- <list>
- <ref bean="dailyStatisticsTrigger" />
- <ref bean="accountGrabedScannerTrigger" />
- <ref bean="syncAccountFromPOITrigger" />
- <ref bean="userSyncScannerTrigger" />
- <ref bean="syncParentBranchFromPOITrigger"/>
- <ref bean="privateReminderTrigger" />
- <ref bean="onlineBranchesScannerTrigger" />
- <ref bean="syncCtContactServiceTrigger" />
- <ref bean="dealLinkDianpingScannerTrigger" />
- <ref bean="accountStatusTaskScannerTrigger"/>
- <ref bean="nDaysActivityScannerTrigger"/>
- </list>
- </property>
- <property name="jobDetails">
- <list>
- <ref bean="myTestQuartzJobDetail"/>
- <ref bean="accountPrivateToProtectedJobDetail"/>
- <ref bean="accountProtectedToPublicJobDetail"/>
- <ref bean="nDaysActivityToProtectedJobDetail"/>
- </list>
- </property>
- <property name="schedulerListeners">
- <list>
- <ref bean="quartzExceptionSchedulerListener"/>
- </list>
- </property>
- </bean>
Crm目前能夠作到對Quartz實例的監控,操做.動態部署Trigger


後續待開發功能和問題 1:目前實現對job,Trigger操做,動態部署Trigger,後續須要加入Calendar(排除特定日期),Listener(動態加載監控),Job的動態部署(只要bean的名稱和方法名,就可完成對job生成,部署) 2:因爲Quartz集羣中的job目前是在任意一臺server中執行,Quartz日誌生成各自的系統目錄中, quartz日誌沒法統一. 3:Quartz2.x已經支持可選節點執行job(期待Spring升級後對新Quartz支持) 4:Quartz內部的DB操做大量Trigger存在嚴重競爭問題,瞬間大量trigger執行,目前只能經過(org.quartz.jobStore.tablePrefix = QRTZ)分表操做,存在長時間lock_wait(新版本聽說有提升); 5:若是有須要,能夠抽取出Quartz,變成單獨的服務,供其它系統調度使用使用