spring4.x 集成quartz2.x 集羣化配置項目實例

前言

前段時間領導讓將一個老項目中的定時發送短信的中定時任務獨立出來,實現一個可公用的定時任務平臺,且須要支持集羣環境.
基於以上須要實現的功能有:
1. 定時任務管理:包括任務的crud, 任務的暫停、恢復
2. 任務可持久化

具體實現

1. pom.xml文件中加入所需jar包(這裏spring相關的jar就不展現了)java

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.2.3</version>
</dependency>

2. quartz.properties 配置文件spring

# Configure Main Scheduler Properties ===========================================
#能夠爲任何值,用在jdbc Jobstore中來惟一標識實例,集羣中必須相同
org.quartz.scheduler.instanceName: MyClusteredScheduler
#AUTO 基於主機和時間戳來產生成實例ID,集羣中的每個實例都必須有一個惟一的「instance id」, 應該有相同的「scheduler instance name」
org.quartz.scheduler.instanceId: AUTO
#禁用quartz軟件更新
org.quartz.scheduler.skipUpdateCheck: true

#Configure ThreadPool  執行任務線程池配置==============================================
#線程池類型,執行任務的線程
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
#線程數量
org.quartz.threadPool.threadCount: 2
#線程優先級
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true


# Configure JobStore  任務存儲方式 ====================================================
org.quartz.jobStore.misfireThreshold: 60000
#兩種存儲方式:基於內存的RAMJobStore和基於數據庫的JobStoreSuppot
#(包括JobStoreTX和JobStoreCMT兩種實現,JobStoreCMT是依賴於容器來進行事務管理,而JobStoreTX是本身管理事務)
#這裏的屬性爲JobStoreTX,將任務持久化到數據庫,
#由於集羣中節點依賴數據庫來傳播Scheduler實例的狀態,意味着在集羣裏必須使用JobStoreTX或是JobStoreCMT做爲job存儲
#org.quartz.simpl.RAMJobStore
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX

#JobStoreSupport 使用一個驅動代理來操做 trigger 和 job 的數據存儲
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.OracleDelegate
#若要設置爲true,則將JobDataMaps中的值看成string
org.quartz.jobStore.useProperties=false
#你就告訴了Scheduler實例要它參與到一個集羣當中。這一屬性會貫穿於調度框架的始終,用於修改集羣環境中操做的默認行爲。
org.quartz.jobStore.isClustered=true
#屬性定義了Scheduler實例檢入到數據庫中的頻率(單位:毫秒)。默認值是 15000 (即15 秒)。
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.jobStore.isClustered=true

#這是 JobStore 能處理的錯過觸發的 Trigger 的最大數量。
#處理太多(超過兩打) 很快會致使數據庫表被鎖定夠長的時間,這樣就妨礙了觸發別的(還未錯過觸發) trigger 執行的性能。
org.quartz.jobStore.maxMisfiresToHandleAtATime = 1
org.quartz.jobStore.misfireThreshold = 120000
org.quartz.jobStore.txIsolationLevelSerializable = false

#當事件的JVM終止後,在調度器上也將此事件終止
org.quartz.plugin.shutdownHook.class: org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownHook.cleanShutdown: true




#============================================================================
# Other Example Delegates 其餘的數據庫驅動管理委託類
#============================================================================
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DB2v6Delegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DB2v7Delegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.DriverDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.HSQLDBDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.MSSQLDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PointbaseDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.WebLogicDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.OracleDelegate
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.WebLogicOracleDelegate

3. springContext-quartz.xml 與spring整合sql

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd 
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.3.xsd ">
    
    <!-- 調度器 -->
    <bean id="clusterScheduler" lazy-init="false" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <!--由於集羣環境下須要將任務狀態持久化  這裏須要爲調度器指定一個數據源,
        同時須要將quartz框架相關的幾個表在你的數據庫中初始化好,去官網下載quartz的壓縮包quartz-2.2.3-distribution.tar.gz,解壓以後在docs\dbTables文件下就能夠找到與之相對應的sql文件
        -->
           <property name="dataSource" ref="dataSourceRWORR"/>
        <property name="configLocation" value="classpath:conf/quartz.properties" />
        <property name="applicationContextSchedulerContextKey" value="applicationContext"/>
        <property name="autoStartup" value="true"></property>
    </bean>
    
    <!--quartz定時任務管理類 
        QuartzManager: 該類由本身實現,須要爲該類注入上文中定義的調度器:clusterScheduler
        有關定時器的全部操做都在該類中實現
    -->  
    <bean id="quartzManager" class="com.fotic.common.quartz.QuartzManager" init-method="init">
        <property name="clusterScheduler" ref="clusterScheduler"></property>
    </bean>
</beans>

4. 編寫定時器管理類:QuartzManager.java數據庫

package com.fotic.common.quartz;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* @Description: 定時器管理
* @author dgq 
* @date 2018年4月25日
*/
public class QuartzManager{
    
    private static Logger logger = LoggerFactory.getLogger(QuartzManager.class);
    
    private Scheduler clusterScheduler;//項目啓動時注入  
    
    public void init() throws SchedulerException{
        /*JobKey jobKey = JobKey.jobKey("sms_job", "sms_job_group");
        JobDetail jobDetail = clusterScheduler.getJobDetail(jobKey);//xml中配置了 
   
        List<? extends Trigger> tigge1 = clusterScheduler.getTriggersOfJob(jobKey);
        System.out.println(tigge1.size());
        Trigger trigger3 = tigge1.get(0);
        
        TriggerKey triggerKey = TriggerKey.triggerKey("sms_trigger", "sms_trigger_group");
        Trigger trigger = clusterScheduler.getTrigger(triggerKey);
        
        TriggerState triggerState2 = clusterScheduler.getTriggerState(triggerKey);
        
        Set<String> pausedTriggerGroups = clusterScheduler.getPausedTriggerGroups();
        
        
        clusterScheduler.scheduleJob(trigger);*/
        
         /*TriggerKey triggerKey = TriggerKey.triggerKey("sms_trigger", "sms_trigger_group");
        clusterScheduler.getListenerManager().addTriggerListener(new MyTriggerListeners(), KeyMatcher.keyEquals(triggerKey));*/
        try {
            
            this.addJob("push_overduedata", "com.fotic.management.sms.job.PushOverdueDataJob", "0 55 17 * * ?");
            this.addJob("sms_send", "com.fotic.sms.job.SmsSendJob", "0/5 * * ? * *");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 啓動定時器
     */
    public boolean start(){
        try {
            clusterScheduler.start();
            return true;
        } catch ( SchedulerException e) {
            e.printStackTrace();
            return false;
        }
    }
    
    /**
     * 關閉調度器
     * @return
     */
    public boolean shutdown(){
        try {
            clusterScheduler.shutdown(true);
            return true;
        } catch (Exception e) {
            logger.info("定時器中止失敗................");
            e.printStackTrace();
            return false;
        }
    }
    
    public void setClusterScheduler(Scheduler clusterScheduler) {  
        this.clusterScheduler = clusterScheduler;  
    }
    
    /**
     * 新增一個job
     * @param jobName job名稱
     * @param jobClass  job類,該類必須繼承: org.quartz.job
     * @param cronExpression "0/5 * * ? * *"
     * @throws ClassNotFoundException 
     * @throws SchedulerException 
     */
    public void addJob(String jobName, String jobClassPath, String cronExpression) throws ClassNotFoundException, SchedulerException{
            
        JobKey jobKey = new JobKey(jobName+"_job", jobName+"_group");
        
        JobDetail jobDetail = clusterScheduler.getJobDetail(jobKey);
        if(!clusterScheduler.checkExists(jobKey)){
            @SuppressWarnings("unchecked")
            Class<? extends Job> targetJob = (Class<? extends Job>) Class.forName(jobClassPath);
            jobDetail = JobBuilder
                    .newJob(targetJob)
                    .withIdentity(jobKey)
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(jobName+"_trigger", jobName+"_trigger_group")
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                    .build();
            
            clusterScheduler.scheduleJob(jobDetail, trigger);
        }
        
    }
    
    /** 
     * 暫停定時任務 
     * @param allPushMessage 
     * @throws Exception 
     */  
    public void pauseJob(String jobName) throws Exception {  
        JobKey jobKey = JobKey.jobKey(jobName+"_job", jobName+"_group");  
        try {
            clusterScheduler.pauseJob(jobKey); 
        } catch (SchedulerException e) {  
            logger.info("暫停定時任務失敗"+e);  
            throw new Exception("暫停定時任務失敗");  
        }  
    }  
  
    /** 
     * 恢復任務 
     * @param 
     * @param 
     * @param 
     * @throws Exception 
     */  
    public void resumeJob(String jobName) throws Exception {
  
        JobKey jobKey = JobKey.jobKey(jobName+"_job", jobName+"_group");  
        try {
            clusterScheduler.resumeJob(jobKey);
        } catch (SchedulerException e) {  
            logger.info("恢復定時任務失敗"+e);  
            throw new Exception("恢復定時任務失敗");  
        }  
    }
    
    /**
     * 刪除任務
     * @param jobName
     * @throws Exception
     */
    public void deleteJob(String jobName) throws Exception {
        JobKey jobKey = JobKey.jobKey(jobName+"_job", jobName+"_group"); 
        try {
            clusterScheduler.deleteJob(jobKey);
        } catch (SchedulerException e) {
            logger.info("刪除定時任務失敗"+e);
            throw new Exception("刪除定時任務失敗");  
        }
    }
    
    /**
     * 修改一個觸發器的觸發規則cron Expression
     * @param triggerName
     * @param cron
     * @return
     */
    public boolean updateTrigger(String triggerName, String cron){
        try {
            CronTrigger oldtrigger = (CronTrigger) clusterScheduler.getTrigger(TriggerKey.triggerKey(triggerName+"_trigger", triggerName+"_trigger_group"));
            TriggerBuilder<CronTrigger> tb = oldtrigger.getTriggerBuilder();
            Trigger newTrigger = tb.withSchedule(CronScheduleBuilder.cronSchedule(cron)).build();
            
            clusterScheduler.rescheduleJob(oldtrigger.getKey(), newTrigger);
            return true;
        } catch (Exception e) {
            logger.info("修改定觸發器失敗................");
            e.printStackTrace();
            return false;
        }
    }
}

**5. 提供一個我實現好的job:服務器

實現一個job很簡單,只需實現org.quartz.Job接口便可,org.quartz.Job接口只有一個
execute()方法。這個方法就是定時任務執行時調用的方法,這裏就是咱們業務代碼的入口。
由於job須要持久化,因此也必須實現Serializable類。因爲quartz的bean是由本身管理的沒有交友spring ioc管理,因此沒法經過相似@Autowired這樣的註解注入bean。可是咱們在springContext-quartz.xml中配置了applicationContextSchedulerContextKey,就能夠經過execute 方法的JobExecutionContext參數獲取到ApplicationContext,那麼也就能夠同bean name拿到bea了

**oracle

package com.fotic.management.sms.job;

import java.io.Serializable;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerContext;
import org.quartz.SchedulerException;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

import com.fotic.management.sms.service.impl.SmsServiceImpl;

/*
* @Description: 按期向短信平臺推送數據
* @author dgq 
* @date 2018年4月25日
*/
@Service
public class PushOverdueDataJob implements Job, Serializable{
    private static final long serialVersionUID = -6605766126594260962L;
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            SchedulerContext schedulerContext = context.getScheduler().getContext();
            ApplicationContext applicationContext = (ApplicationContext) schedulerContext.get("applicationContext");
            SmsServiceImpl ss = (SmsServiceImpl)applicationContext.getBean("smsServiceImpl");//獲取spring bean實例 
            ss.sendSms();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}

結尾
項目打包集羣部署就能夠看到同一個job並不會在兩個服務器中同時被觸發。我這裏在本地用
Apache搭建了一個簡單的集羣,測試沒問,這裏就不提供Apache集羣的搭建代碼了。核心代碼都在上面了,以此記錄。app

相關文章
相關標籤/搜索