項目使用quartz框架完成了定時任務集羣部署調度,而且對quartz進一步封裝完成在web界面可動態配置定時任務。定時任務若是集羣部署,到達時間點時,同一個任務只能在其中一臺機器上執行。對於quartz框架,其支持分佈式集羣的方案是使用數據庫來加鎖調度。java
如下是quartz分佈式集羣部署,而且能夠動態配置job的代碼。使用了spring和mybatis,數據庫使用了postgresql(用mysql也差很少,只要改下數據源dataSource,以及quartz.properties中的org.quartz.jobStore.driverDelegateClass)。mysql
quartz.properties:web
# Default Properties file for use by StdSchedulerFactory # to create a Quartz Scheduler Instance, if a different # properties file is not explicitly specified. # #org.quartz.scheduler.instanceName: DefaultQuartzScheduler org.quartz.scheduler.instanceName: ClusteredScheduler org.quartz.scheduler.instanceId: AUTO org.quartz.scheduler.rmi.export: false org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: false org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount: 2 org.quartz.threadPool.threadPriority: 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true org.quartz.jobStore.class : org.quartz.impl.jdbcjobstore.JobStoreTX ##這裏使用postgresql數據庫 org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate org.quartz.jobStore.misfireThreshold : 60000 org.quartz.jobStore.useProperties : true org.quartz.jobStore.tablePrefix : QRTZ_ org.quartz.jobStore.isClustered : true org.quartz.jobStore.clusterCheckinInterval : 15000
application-scheduler.xml:spring
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName" default-lazy-init="true"> <bean id="schedulerFactoryBean" lazy-init="true" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <!-- dataSource 需另外配 --> <property name="dataSource" ref="dataSource" /> <property name="waitForJobsToCompleteOnShutdown" value="false" /> <property name="autoStartup" value="true" /> <property name="overwriteExistingJobs" value="true" /> <property name="configLocation" value="classpath:quartz.properties" /> <!-- 因爲是在web界面動態建立job,這裏不定義 <property name="triggers"> <list> <ref bean="testTrigger" /> </list> </property> --> </bean> </beans>
SchedulerJobBO:sql
/** * job信息bean * @author hanxuetong * */ @SuppressWarnings("serial") public class SchedulerJobBO extends BasePO{ /** * 定時任務運行時狀態 */ public static final int SCHEDULER_RUN_STATE=1; /** * 定時任務關閉時狀態 */ public static final int SCHEDULER_STOP_STATE=0; /** * 任務名 */ private String jobName; private String jobGroup; /** * 任務類的路徑 */ private String jobClassPath; /** * cron表達式 */ private String cronExpression; /** * 是否啓動定時 */ private Integer isRun; /** * 世紀運行中的狀態 */ private String triggerState; /** * 世紀運行中的狀態名 */ private String triggerStateName; /** * 描述 */ private String description; public String getJobName() { return jobName; } public void setJobName(String jobName) { this.jobName = jobName; } public String getJobClassPath() { return jobClassPath; } public void setJobClassPath(String jobClassPath) { this.jobClassPath = jobClassPath; } public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; } public Integer getIsRun() { return isRun; } public void setIsRun(Integer isRun) { this.isRun = isRun; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getJobGroup() { return jobGroup; } public void setJobGroup(String jobGroup) { this.jobGroup = jobGroup; } public String getTriggerState() { return triggerState; } public void setTriggerState(String triggerState) { this.triggerState = triggerState; } public String getTriggerStateName() { return triggerStateName; } public void setTriggerStateName(String triggerStateName) { this.triggerStateName = triggerStateName; } }
TriggerStateEnum:數據庫
/** * quartz 任務實時狀態枚舉 * @author hanxuetong * */ public enum TriggerStateEnum { WAITING("WAITING", "等待"), PAUSED("PAUSED", "暫停"), ACQUIRED("ACQUIRED", "正常執行"), BLOCKED("BLOCKED", "阻塞"), ERROR("ERROR", "錯誤"), NORUN("NORUN", "未開啓"); String key; String desc; public static String getDescByKey(String key) { if (key==null) { return ""; } for (TriggerStateEnum triggerStateEnum : TriggerStateEnum.values()) { if (triggerStateEnum.getKey().equals(key)) { return triggerStateEnum.getDesc(); } } return key; } private TriggerStateEnum(String key, String desc) { this.key = key; this.desc = desc; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } }
SchedulerJobDao:express
public interface SchedulerJobDao { public SchedulerJobBO loadById(String id); public void insert(SchedulerJobBO schedulerJobBO); public void update(SchedulerJobBO schedulerJobBO); public void delete(String id); public List<SchedulerJobBO> list(Map<String, Object> params); }
scheduler_job.xml:mybatis
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "classpath:mybatis-3-mapper.dtd"> <mapper namespace="scheduler_job"> <sql id="Base_Column_List" > id ,scheduler_job.job_name jobName ,scheduler_job.job_group jobGroup,cron_expression cronExpression ,job_class_path jobClassPath ,is_run isRun ,scheduler_job.description ,date_create dateCreate ,date_update dateUpdate,date_delete dateDelete ,COALESCE(qrtz_triggers.trigger_state, 'NORUN') triggerState </sql> <select id="loadById" resultType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO" parameterType="String"> SELECT <include refid="Base_Column_List" /> FROM scheduler_job left join qrtz_triggers on qrtz_triggers.job_name=scheduler_job.job_name and qrtz_triggers.job_group=scheduler_job.job_group where scheduler_job.id=#{id} </select> <insert id="insert" parameterType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO"> insert into scheduler_job(id ,job_name ,job_group,cron_expression ,job_class_path ,is_run ,description ,date_create ,date_update ) values(#{id},#{jobName},#{jobGroup},#{cronExpression},#{jobClassPath},#{isRun},#{description}, now(),now()) </insert> <select id="list" resultType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO" parameterType="map"> select <include refid="Base_Column_List" /> from scheduler_job left join qrtz_triggers on qrtz_triggers.job_name=scheduler_job.job_name and qrtz_triggers.job_group=scheduler_job.job_group <trim prefix = "where" prefixOverrides="and|or"> scheduler_job.date_delete is null <if test="id != '' and id != null "> and scheduler_job.id= #{id}</if> <if test="status != '' and status != null"> and scheduler_job.status= #{status}</if> <if test="job_name != '' and job_name != null"> and scheduler_job.job_name= #{jobName}</if> <if test="job_group != '' and job_group != null"> and scheduler_job.job_group= #{jobGroup}</if> </trim> order by scheduler_job.job_name </select> <update id="update" parameterType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO"> update scheduler_job set job_name=#{jobName},job_group=#{jobGroup},cron_expression=#{cronExpression},job_class_path=#{jobClassPath},is_run=#{isRun},description=#{description},date_update=now() where id=#{id} </update> <update id="delete" parameterType="String"> update scheduler_job set date_delete=now() where id=#{id} </update> </mapper>
QuartzSchedulerManage:app
/** * * 動態添加任務 * * @author hanxuetong * */ public interface QuartzSchedulerManage { /** * 建立定時任務 * @param jobName * @param jobGroup * @param cronExpression * @param jobClass * @param jobClassParam 運行時的任務類方法能夠獲取 * @throws SchedulerException */ public void createScheduleJob(String jobName, String jobGroup, String cronExpression,String jobClassPath ,Map<String,Object> jobClassParam) throws Exception; /** * 運行一次任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void runOnce(String jobName, String jobGroup) throws SchedulerException ; /** * 暫停任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void pauseJob(String jobName, String jobGroup) throws SchedulerException ; /** * 恢復任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void resumeJob(String jobName, String jobGroup) throws SchedulerException; /** * 更新定時任務 * * @param scheduler the scheduler * @param jobName the job name * @param jobGroup the job group * @param cronExpression the cron expression * @param isSync the is sync * @param param the param * @throws SchedulerException */ public void updateScheduleJob(String jobName, String jobGroup, String cronExpression) throws SchedulerException; /** * 刪除定時任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void deleteScheduleJob(String jobName, String jobGroup) throws SchedulerException; }
QuartzSchedulerManageImpl:框架
/** * * 動態添加任務 * * @author hanxuetong * */ @Service public class QuartzSchedulerManageImpl implements QuartzSchedulerManage{ @Autowired SchedulerFactoryBean schedulerFactoryBean; private Scheduler getScheduler(){ return schedulerFactoryBean.getScheduler(); } /** * 獲取觸發器key * * @param jobName * @param jobGroup * @return */ private TriggerKey getTriggerKey(String jobName, String jobGroup) { return TriggerKey.triggerKey(jobName, jobGroup); } private Class<? extends Job> getClassByPath(String jobClassPath) throws Exception { Class<? extends Job> clazz; try { clazz = (Class<? extends Job>) Class.forName(jobClassPath); } catch (Exception e) { throw new SchedulerException("任務類加載失敗!!"); } return clazz; } /** * 獲取表達式觸發器 * * @param scheduler the scheduler * @param jobName the job name * @param jobGroup the job group * @return cron trigger * @throws SchedulerException */ public CronTrigger getCronTrigger(String jobName, String jobGroup) throws SchedulerException { Scheduler scheduler = getScheduler(); try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); return (CronTrigger) scheduler.getTrigger(triggerKey); } catch (SchedulerException e) { throw new SchedulerException("獲取定時任務CronTrigger出現異常"); } } /** * 建立定時任務 * @param jobName * @param jobGroup * @param cronExpression * @param jobClass * @param jobClassParam 運行時的任務類方法能夠獲取 * @throws SchedulerException */ public void createScheduleJob(String jobName, String jobGroup, String cronExpression,String jobClassPath ,Map<String,Object> jobClassParam) throws Exception { //同步或異步 // Class<? extends Job> jobClass = isSync ? JobSyncFactory.class : JobFactory.class; try { Class<? extends Job> jobClass=getClassByPath( jobClassPath); Scheduler scheduler = getScheduler(); //構建job信息 JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).build(); if(jobClassParam!=null&&jobClassParam.size()>0){ //放入參數,運行時的方法能夠獲取 for(Map.Entry<String, Object> entry:jobClassParam.entrySet()){ jobDetail.getJobDataMap().put(entry.getKey(), entry.getValue()); } } //表達式調度構建器 加上 withMisfireHandlingInstructionDoNothing防止啓動就運行 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); //按新的cronExpression表達式構建一個新的trigger CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup) .withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException e) { throw new SchedulerException("建立定時任務失敗"); }catch (Exception e) { throw e; } } /** * 運行一次任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void runOnce(String jobName, String jobGroup) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); try { Scheduler scheduler = getScheduler(); scheduler.triggerJob(jobKey); } catch (SchedulerException e) { throw new SchedulerException("運行一次定時任務失敗"); } } /** * 暫停任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void pauseJob(String jobName, String jobGroup) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); try { Scheduler scheduler = getScheduler(); scheduler.pauseJob(jobKey); } catch (SchedulerException e) { throw new SchedulerException("暫停定時任務失敗"); } } /** * 恢復任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void resumeJob(String jobName, String jobGroup) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); try { Scheduler scheduler = getScheduler(); scheduler.resumeJob(jobKey); } catch (SchedulerException e) { throw new SchedulerException("暫停定時任務失敗"); } } /** * 獲取jobKey * * @param jobName the job name * @param jobGroup the job group * @return the job key */ public JobKey getJobKey(String jobName, String jobGroup) { return JobKey.jobKey(jobName, jobGroup); } /** * 更新定時任務 * * @param scheduler the scheduler * @param jobName the job name * @param jobGroup the job group * @param cronExpression the cron expression * @param isSync the is sync * @param param the param * @throws SchedulerException */ public void updateScheduleJob(String jobName, String jobGroup, String cronExpression) throws SchedulerException { //同步或異步 // Class<? extends Job> jobClass = isSync ? JobSyncFactory.class : JobFactory.class; try { Scheduler scheduler = getScheduler(); TriggerKey triggerKey = getTriggerKey(jobName, jobGroup); //表達式調度構建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); //按新的cronExpression表達式從新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder) .build(); //按新的trigger從新設置job執行 scheduler.rescheduleJob(triggerKey, trigger); } catch (SchedulerException e) { throw new SchedulerException("更新定時任務失敗"); } } /** * 刪除定時任務 * * @param scheduler * @param jobName * @param jobGroup * @throws SchedulerException */ public void deleteScheduleJob(String jobName, String jobGroup) throws SchedulerException { try { Scheduler scheduler = getScheduler(); scheduler.deleteJob(getJobKey(jobName, jobGroup)); } catch (SchedulerException e) { throw new SchedulerException("刪除定時任務失敗"); } } public SchedulerFactoryBean getSchedulerFactoryBean() { return schedulerFactoryBean; } public void setSchedulerFactoryBean(SchedulerFactoryBean schedulerFactoryBean) { this.schedulerFactoryBean = schedulerFactoryBean; } }
SchedulerJobService :
public interface SchedulerJobService { public void add(SchedulerJobBO schedulerJobBO) throws Exception ; public void update(SchedulerJobBO schedulerJobBO) throws Exception ; public void delete(String id) throws Exception ; public SchedulerJobBO loadById(String id); public List<SchedulerJobBO> list(); public void startAllJob(); }
SchedulerJobServiceImpl:
@Service public class SchedulerJobServiceImpl implements SchedulerJobService{ private static final Logger logger = LoggerFactory.getLogger(SchedulerJobServiceImpl.class); @Autowired private SchedulerJobDao schedulerJobDao; @Autowired private QuartzSchedulerManage quartzSchedulerManage; @Override public void add(SchedulerJobBO schedulerJobBO) throws Exception { if(schedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE){ quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null); } schedulerJobDao.insert(schedulerJobBO); } @Override public void update(SchedulerJobBO schedulerJobBO) throws Exception { if(StringUtil.isEmpty(schedulerJobBO.getId())){ throw new SchedulerException("id can not null!"); } SchedulerJobBO selectSchedulerJobBO=schedulerJobDao.loadById(schedulerJobBO.getId()); if(selectSchedulerJobBO==null){ throw new SchedulerException("schedulerJob is null!"); } if(schedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE){ //任務啓動 if(!selectSchedulerJobBO.getJobClassPath().equals(schedulerJobBO.getJobClassPath())){ // 任務類路徑已經變,需刪除定時,再從新創建新任務 if(checkHasJobRun(selectSchedulerJobBO)){ quartzSchedulerManage.deleteScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup()); } quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null); }else{ if(!checkHasJobRun(selectSchedulerJobBO)){ // quartz中沒有該任務 quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null); }else{ if(!selectSchedulerJobBO.getCronExpression().equals(schedulerJobBO.getCronExpression())){ //Cron表達式改變 quartzSchedulerManage.updateScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression()); } } } }else{ //任務關閉 if(checkHasJobRun(selectSchedulerJobBO)){ //當前任務quartz中存在 quartzSchedulerManage.deleteScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup()); } } schedulerJobDao.update(schedulerJobBO); } /** * 判斷quartz中是否有該任務 * @param selectSchedulerJobBO * @return true: quartz有任務 , false:quartz無任務 */ private boolean checkHasJobRun(SchedulerJobBO selectSchedulerJobBO){ return !TriggerStateEnum.NORUN.getKey().equals(selectSchedulerJobBO.getTriggerState()); } @Override public void delete(String id) throws Exception { SchedulerJobBO selectSchedulerJobBO=schedulerJobDao.loadById(id); if(selectSchedulerJobBO==null){ throw new SchedulerException("schedulerJob is null!"); } if(selectSchedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE){ quartzSchedulerManage.deleteScheduleJob(selectSchedulerJobBO.getJobName(), selectSchedulerJobBO.getJobGroup()); } schedulerJobDao.delete(id); } @Override public SchedulerJobBO loadById(String id) { return schedulerJobDao.loadById(id); } @Override public List<SchedulerJobBO> list() { return schedulerJobDao.list(new HashMap<String,Object>()); } /* public QuartzSchedulerManageImpl getQuartzSchedulerEngine() { return quartzSchedulerEngine; } public void setQuartzSchedulerEngine(QuartzSchedulerManageImpl quartzSchedulerEngine) { this.quartzSchedulerEngine = quartzSchedulerEngine; } public void setSchedulerJobDao(SchedulerJobDao schedulerJobDao) { this.schedulerJobDao = schedulerJobDao; } */ @Override public void startAllJob() { logger.info("start up all jobs!"); List<SchedulerJobBO> schedulerJobs= list(); for(SchedulerJobBO schedulerJobBO:schedulerJobs){ if(schedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE&&!checkHasJobRun(schedulerJobBO) ){ try { quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null); } catch (Exception e) { e.printStackTrace(); } } } logger.info("end all jobs load!"); } }
BaseJob:
/** * job具體實現須要繼承此類 * @author hanxuetong * */ public abstract class BaseJob implements Job { protected final Logger logger= LoggerFactory.getLogger(BaseJob.class); public abstract void run(); @Override public void execute(JobExecutionContext context) throws JobExecutionException { try{ run(); }catch(Exception t){ logger.error("Job throw exception", t); t.printStackTrace(); } } /** * 根據BeanId獲取Bean實例 */ public <T> T getBean(String beanId, Class<T> clazz){ return ApplicationContextUtil.getBean(beanId, clazz); } /** * 根據BeanId獲取Bean實例 */ public <T> T getBean(Class<T> clazz){ return ApplicationContextUtil.getApplicationContext().getBean(clazz); } }
TestJobTask:
public class TestJobTask extends BaseJob{ private static final Logger logger = LoggerFactory.getLogger(TestJobTask.class); @Override public void run() { System.out.println("run test start "); logger.debug(" run test!"); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("run test end "); } }
固然數據庫中還要導入quartz的對應數據源的sql,在下載的quartz-2.2.2-distribution中\docs\dbTables\ 目錄下。本項目中用到postgresql,則是tables_postgres.sql。