Schedule 調度系統設計(單機版)

  • 鑑於對Spring實現的@Scheduled的調度和SchedulerFactoryBean的研究發現,基於Spring的調度封裝雖知足了大多需求,但其爲了簡化使用方式,過分封裝使得Job並不容易控制和運維,致使開發對Job的控制和運維成本上升;下面是本人基於Quartz和Spring及Annotation開發的單機版調度配置DEMO,知足單機調度的大部分需求和管理、運維操做並解放對配置文件的繁瑣操做;
    下面對Spring的@Scheduled註解和SchedulerFactoryBean配置及自定義@SchedulerJob作對比;

源碼地址


功能點描述

功能點 Spring @Scheduled 自定義@SchedulerJob
可控制
可運維
可頁面化
可統一跟蹤業務狀態
可統一跟蹤調度狀態
支持cron表達式
支持相似ScheduledExecutorService的定時調度

代碼演示

  • 未改造前的做業配置
    對於圖示調度配置,再實現對Job的可控制和可運維的前提下保證代碼的簡潔和程序性能,須要細讀Spring源碼,將targetObject和targetMethod從代理中剝離出來,統一放到容器裏作統一調度控制(不少時候基於最小改動、系統穩定性、資源利用的原則,不得不採起這種剝離方式);
/**
  * @author baiyunpeng
  * 抽取Job控制信息
  */
 MethodInvokingJobDetailFactoryBean methodInvoker = (MethodInvokingJobDetailFactoryBean) jobDetail.getJobDataMap().get("methodInvoker");
                String jobName = jobDetail.getKey().getName();
                String group = jobDetail.getKey().getGroup();
                String className = methodInvoker.getTargetClass().getName();
複製代碼

  • 基於註解進行做業配置
@Slf4j(topic = "dynamic-datasource")
@Component
public class DetectJob {
   /**
     * 做業配置 value=做業名,group=做業所屬組,init=true爲容器建立完畢時當即觸發
     */
    @SchedulerJob(value = "detectDataSource",cron = "${cron.detect.data.source}",group = "dynamic-datasource",
            descrption="動態數據源切換",init = true)
    public void detectDataSource(){
        log.info(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"dynamic-datasource","detectDataSource"));
    }
}
複製代碼
##cron表達式
cron.detect.data.source=1 * * * * ? 
複製代碼
  • 代碼執行效果


頁面演示

經過頁面可對做業進行統一的監控和管理(觸發、暫停、恢復、動態添加、參數下發)及報警等操做;bash

簡要列出如下功能點:併發

  • 做業展現
  • 做業運維報警
  • 做業參數下發
  • 做業事件跟蹤


設計思路

  • 應當知足什麼業務場景
  • 如何簡化操做、下降開發成本
  • 如何對業務、系統功能進行監控、控制、運維
  • 如何設計才能便於後期業務和功能的擴展

功能設計

  • 設計思路
    • 如何獲取方法上的註解及配置
    • 如何實現經過Quartz定時執行註解方法
    • 如何對每一個方法上的註解進行統一的資源管理和監控、控制、運維
    • 如何對調度進行性能的優化
  • 功能點分析
    • 基本調度
      • 初始化當即調度
      • 人工或系統控制調度(任務建立後不執行調度,控制權交給外部)
      • 定時執行調度(及按照指定cron配置週期調度)
      • 是否可併發執行
    • 資源管理
      • 統一管理系統內所有的配置資源(做業所屬組、描述、cron表達式、是否開啓報警、是否開啓監控等)
    • 調度管理
      • 調度狀態管理(系統狀態、業務狀態)
      • 調度行爲管理
      • 做業業務參數下發(彌補業務過失)
      • 調度跟蹤、業務跟蹤
      • 調度報警、業務報警
  • 基本功能點實現
    • 註解配置
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJob {
  /**
   * 做業名
   * @return
   */
  String value();

  /**
   * 表達式
   * @return
   */
  String cron();

  /**
   * 是否初始化時當即執行
   * @return
   */
  boolean init() default false;

  /**
   * 是否人爲控制
   * @return
   */
  boolean control() default false;

  /**
   * 所屬組
   * @return
   */
  String group() default "default";

  /**
   * 做業描述
   * @return
   */
  String descrption() default "";

  /**
   * 做業執行器
   * @return
   */
  Class jobClass() default SimpleJob.class;
}

@Target({ElementType.METHOD,ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJobs {
    /**
     * 註解集
     * @return
     */
    SchedulerJob[] value();
}
複製代碼
  • 調度建立
@Slf4j
@Configuration
public class SchedulerBean implements InitializingBean, DisposableBean {

    private Scheduler scheduler;

   @Value("#{schdulerProperties['quartz.thread.count']}")
    private String threadCount;

    @Override
    public void destroy() throws Exception {
        scheduler.shutdown();
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        createScheduler();
    }

    /**
     * 建立調度
     * @throws SchedulerException
     */
    public void createScheduler() throws SchedulerException {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        this.scheduler = factory.getScheduler();
    }

    /**
     * 做業配置
     * @return
     */
    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", threadCount);
        result.put("org.quartz.scheduler.threadName", "baiyunpeng-scheduler");
        result.put("org.quartz.scheduler.instanceName", "baiyunpeng-scheduler");
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        return result;
    }

    /**
     * 建立做業
     * @param jobParam
     * @throws SchedulerException
     */
    public void createJob(JobParam jobParam) throws SchedulerException {
        SchedulerJob schedulerJob = jobParam.getSchedulerJob();
        JobDetail jobDetail = JobBuilder.newJob(schedulerJob.jobClass())
                .withIdentity(jobParam.getJobKey())
                .withDescription(jobParam.getJobKey().getName())
                .build();
        addJobDataMap(jobDetail,jobParam.getTarget(),jobParam.getTargetMethod());
        this.scheduler.scheduleJob(jobDetail,createTrigger(jobParam.getJobKey(),jobParam.getCron()));
    }

    /**
     * 建立觸發器
     * @param jobKey
     * @param cron
     * @return
     */
    private Trigger createTrigger(JobKey jobKey, String cron) {
        return TriggerBuilder.newTrigger().withIdentity(jobKey.getName(),jobKey.getGroup())
                .withSchedule(CronScheduleBuilder.cronSchedule(cron)
                .withMisfireHandlingInstructionDoNothing()).build();
    }

    /**
     * 添加做業map
     * @param jobDetail
     * @param target
     * @param targetMethod
     */
    private void addJobDataMap(JobDetail jobDetail, Object target, Method targetMethod) {
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        jobDataMap.put("executeJob",target);
        jobDataMap.put("executeMethod",targetMethod);
    }

    public Scheduler getScheduler() {
        return scheduler;
    }

    public void start() throws SchedulerException {
        this.scheduler.start();
    }
}
複製代碼
  • 簡單的做業執行器建立
/**
 * 做業抽象類
 * @author baiyunpeng
 */
public abstract class ExecuteJob implements Job {

    protected Object executeJob;

    protected Method executeMethod;

    protected void setExecuteJob(Object executeJob) {
        this.executeJob = executeJob;
    }

    protected void setExecuteMethod(Method executeMethod) {
        this.executeMethod = executeMethod;
    }
}  

/**
 * 非併發執行
 * @author baiyunpeng
 */
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SimpleJob extends ExecuteJob {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            executeMethod.invoke(executeJob);
        } catch (IllegalAccessException | InvocationTargetException e) {
            log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
        }
    }
}

/**
 * 可併發執行
 * @author baiyunpeng
 */
@Slf4j
public class ConcurrentJob extends ExecuteJob{
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            executeMethod.invoke(executeJob);
        } catch (IllegalAccessException | InvocationTargetException e) {
            log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
        }
    }
}
複製代碼
  • 做業建立
/**
       * 做業配置解析
       * @param scheduled
       * @param method
       * @param bean
      */
    protected void processScheduled(SchedulerJob scheduled, Method method, Object bean) {
            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            String cron = scheduled.cron();
            if(StringUtils.hasText(cron)){
                if(Objects.nonNull(this.embeddedValueResolver)){
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                }
                jobParams.add(new JobParam(scheduled,bean,invocableMethod,new JobKey(scheduled.value(),scheduled.group()),cron));
            }
        }
    
    /**
     * 做業初始化
     */
    private void finishRegister() {
        if(Objects.isNull(this.schedulerBean)){
            SchedulerBean schedulerBean = beanFactory.getBean(SCHEDULER_BEAN, SchedulerBean.class);
            AssertUtil.assertNull(schedulerBean, SystemErrorCode.NS000000,"the scheduler bean init error");
            this.schedulerBean = schedulerBean;
            try {
                jobParams.parallelStream().forEach(jobParam -> {
                    try {
                        this.schedulerBean.createJob(jobParam);
                        SchedulerJob schedulerJob = jobParam.getSchedulerJob();
                        if(!schedulerJob.control()){
                            if (schedulerJob.init()){
                                this.schedulerBean.getScheduler().triggerJob(jobParam.getJobKey());
                            }
                        }else {
                            this.schedulerBean.getScheduler().pauseJob(jobParam.getJobKey());
                        }
                    } catch (SchedulerException e) {
                        log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
                       System.exit(1);
                    }
                });
                schedulerBean.start();
            }catch (Exception e){
                log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
                System.exit(1);
            }
        }
    }
    
複製代碼

總結

  • 如何異步執行方法,首先得獲取該方法的實例
  • 如何定時執行,首先建立並獲取定時器
  • 如何基於Quarzt監控做業執行,需獲Schedule和Jobkey等
  • 如何對做業調度狀態作監控,應當抽象出統一的JobWrapper來實現對調度的記錄
  • 如何對做業的業務狀態作監控,定義接口返回的Result,對返回值和狀態進行封裝;並借鑑上條思路作統一健康;
  • 如何採集全部做業信息,在上訴功能點描述中的做業建立過程當中對做業進行內存存儲或持久存儲;
  • 如何將調度行爲可控,獲取JobKey便可對調度做業控制;
  • 如何動態爲做業添加報警,一樣基於內存或DB實現對做業的報警的開啓和關閉;
  • 如何進行調度或業務的狀態上報,建議使用事件機制作異步上報(分佈式下有采用Http、MQ等);

後續更新

  • 如何統一監控調度狀態和業務狀態
  • 如何解決work線程池被任務阻塞的問題
  • 如何作任務補發(note:除了misfire機制外還有哪些作法)
  • 如何基於單機調度實現基本的分佈式調度
  • 分佈式調度須要考慮的點有哪些
相關文章
相關標籤/搜索