[學習筆記]Elastic-job+spring-boot實現多任務配置

目的

在實際生產中利用Elastic-job+spring-boot用註解實現多任務的配置spring

準備

  1. maven中添加Elastic-job相關jar包
<dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${elastic-job.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>curator-client</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-framework</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-recipes</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
            </exclusions>

        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${elastic-job.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>curator-client</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-framework</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-recipes</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
            </exclusions>
        </dependency>
複製代碼

注意:若是項目中單獨導入了高版本的zookeeper的客戶端curator包則須要在Elastic-job包中剔除curator,不然會發生curator的版本衝突數據庫

  1. 編寫配置類

即在yaml配置文件中配置Elastic-job-lite 依賴的zookeeper 分佈式調度服務器參數apache

@Configuration
public class JobParserAutoConfiguration {

    /**
     * 服務地址列表 包括IP地址和端口號
     */
    @Value("${elasticjob.reg-center.server-lists}")
    private String serverList;

    /**
     * 註冊中心的命名空間 
     */
    @Value("${elasticjob.reg-center.namespace}")
    private String namespace;

    /**
     * 等待重試的間隔時間的初始值.
     * 單位毫秒.
     */
    @Value("${elasticjob.reg-center.baseSleepTimeMilliseconds}")
    private Integer baseSleepTimeMilliseconds;
    /**
     * 等待重試的間隔時間的最大值.
     * 單位毫秒.
     */
    @Value("${elasticjob.reg-center.maxSleepTimeMilliseconds}")
    private Integer maxSleepTimeMilliseconds;
    /**
     * 會話超時時間.
     * 單位毫秒.
     */
    @Value("${elasticjob.reg-center.sessionTimeoutMilliseconds}")
    private Integer sessionTimeoutMilliseconds;
    /**
     * 鏈接超時時間.
     * 單位毫秒.
     */
    @Value("${elasticjob.reg-center.connectionTimeoutMilliseconds}")
    private Integer connectionTimeoutMilliseconds;
    /**
     * 最大重試次數.
     */
    @Value("${elasticjob.reg-center.maxRetries}")
    private Integer maxRetries;


    @Bean
    public JobConfParser jobConfParser() {
        return new JobConfParser();
    }

    /**
     * 初始化zookeeper註冊中心
     * @return ZookeeperRegistryCenter
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
        zookeeperConfiguration.setBaseSleepTimeMilliseconds(baseSleepTimeMilliseconds);
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeoutMilliseconds);
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(maxSleepTimeMilliseconds);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeoutMilliseconds);
        zookeeperConfiguration.setMaxRetries(maxRetries);
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }

}
複製代碼
  1. 新增自定義任務配置註解
@Component
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConf {

    /*********************DataflowJobConfiguration START********************/

    /**
     * 做業名稱
     *
     * @return
     */
    String name();

    /**
     * 類型
     */
    JobType jobType();

    /**
     * cron表達式,用於控制做業觸發時間
     *
     * @return
     */
    String cron() default "";

    /**
     * 做業分片總數
     *
     * @return
     */
    String shardingTotalCount() default "1";

    /**
     * 分片序列號和參數用等號分隔,多個鍵值對用逗號分隔
     * <p>分片序列號從0開始,不可大於或等於做業分片總數<p>
     * <p>如:<p>
     * <p>0=a,1=b,2=c<p>
     *
     * @return
     */
    String shardingItemParameters() default "";

    /**
     * 做業自定義參數
     * <p>做業自定義參數,可經過傳遞該參數爲做業調度的業務方法傳參,用於實現帶參數的做業<p>
     * <p>例:每次獲取的數據量、做業實例從數據庫讀取的主鍵等<p>
     *
     * @return
     */
    String jobParameter() default "";

    /**
     * 是否開啓任務執行失效轉移,開啓表示若是做業在一次任務執行中途宕機,容許將該次未完成的任務在另外一做業節點上補償執行
     *
     * @return
     */
    boolean failover() default false;

    /**
     * 是否開啓錯過任務從新執行
     *
     * @return
     */
    boolean misfire() default false;

    /**
     * 做業描述信息
     *
     * @return
     */
    String description() default "";

    boolean overwrite() default false;

    /*********************DataflowJobConfiguration END********************/


    /*********************DataflowJobConfiguration START********************/

    /**
     * 是否流式處理數據
     * <p>若是流式處理數據, 則fetchData不返回空結果將持續執行做業<p>
     * <p>若是非流式處理數據, 則處理數據完成後做業結束<p>
     *
     * @return
     */
    boolean streamingProcess() default false;

    /*********************DataflowJobConfiguration END********************/


    /*********************ScriptJobConfiguration START********************/

    /**
     * 腳本型做業執行命令行
     *
     * @return
     */
    String scriptCommandLine() default "";

    /*********************ScriptJobConfiguration END********************/


    /*********************LiteJobConfiguration START********************/

    /**
     * 監控做業運行時狀態
     * <p>每次做業執行時間和間隔時間均很是短的狀況,建議不監控做業運行時狀態以提高效率。<p>
     * <p>由於是瞬時狀態,因此無必要監控。請用戶自行增長數據堆積監控。而且不能保證數據重複選取,應在做業中實現冪等性。<p>
     * <p>每次做業執行時間和間隔時間均較長的狀況,建議監控做業運行時狀態,可保證數據不會重複選取。<p>
     *
     * @return
     */
    boolean monitorExecution() default true;

    /**
     * 做業監控端口
     * <p>建議配置做業監控端口, 方便開發者dump做業信息。<p>
     * <p>使用方法: echo 「dump」 | nc 127.0.0.1 9888<p>
     *
     * @return
     */
    int monitorPort() default -1;

    /**
     * 大容許的本機與註冊中心的時間偏差秒數
     * <p>若是時間偏差超過配置秒數則做業啓動時將拋異常<p>
     * <p>配置爲-1表示不校驗時間偏差<p>
     *
     * @return
     */
    int maxTimeDiffSeconds() default -1;

    /**
     * 做業分片策略實現類全路徑,默認使用平均分配策略
     *
     * @return
     */
    String jobShardingStrategyClass() default "";

    /**
     * 修復做業服務器不一致狀態服務調度間隔時間,配置爲小於1的任意值表示不執行修復,單位:分鐘
     *
     * @return
     */
    int reconcileIntervalMinutes() default 10;

    /**
     * 做業事件追蹤的數據源Bean引用
     *
     * @return
     */
    String eventTraceRdbDataSource() default "";

    /*********************LiteJobConfiguration END********************/

    /**
     * 前置後置任務監聽實現類,需實現ElasticJobListener接口
     *
     * @return
     */
    String listener() default "";

    /**
     * 做業是否禁止啓動,可用於部署做業時,先禁止啓動,部署結束後統一啓動
     *
     * @return
     */
    String disabled() default "false";

    /**
     * 前置後置任務分佈式監聽實現類,需繼承AbstractDistributeOnceElasticJobListener類
     *
     * @return
     */
    String distributedListener() default "";

    /**
     * 最後一個做業執行前的執行方法的超時時間,單位:毫秒
     *
     * @return
     */
    long startedTimeoutMilliseconds() default Long.MAX_VALUE;

    /**
     * 最後一個做業執行後的執行方法的超時時間,單位:毫秒
     *
     * @return
     */
    long completedTimeoutMilliseconds() default Long.MAX_VALUE;

    /**
     * 自定義異常處理類
     *
     * @return
     */
    String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";

    /**
     * 自定義業務處理線程池
     *
     * @return
     */
    String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
}
複製代碼

這些配置均可以在官方文檔中查到,這裏只是整合起來方便用註解配置多個執行的任務 配置手冊 固然也能夠自定義一些屬性,最後在初始化任務的時候去實現便可服務器

  1. 初始化配置的定時任務及定時任務監聽
Map<String, Object> beanMap = ctx.getBeansWithAnnotation(ElasticJobConf.class);
        for (Object confBean : beanMap.values()) {
            Class<?> clz = confBean.getClass();

            ElasticJobConf conf = AnnotationUtils.findAnnotation(clz, ElasticJobConf.class);
            
	/**讀取註解配置屬性略 */

            JobCoreConfiguration coreConfig = getJobCoreConfiguration(jobName, cron, shardingItemParameters, description, jobParameter, jobExceptionHandler, executorServiceHandler, failover, misfire, shardingTotalCount);

            // 不一樣類型的任務配置處理
            JobTypeConfiguration typeConfig = getJobTypeConfiguration(jobTypeName, jobClass, scriptCommandLine, streamingProcess, coreConfig);

            LiteJobConfiguration jobConfig = getLiteJobConfiguration(jobShardingStrategyClass, overwrite, disabled, monitorExecution, monitorPort, maxTimeDiffSeconds, reconcileIntervalMinutes, typeConfig);

            List<BeanDefinition> elasticJobListeners = getTargetElasticJobListeners(conf);

            // 構建SpringJobScheduler對象來初始化任務
            BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
            factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
            if (jobTypeName.equals(JobType.SCRIPT)) {
                factory.addConstructorArgValue(null);
            } else {
                factory.addConstructorArgValue(confBean);
            }

            factory.addConstructorArgValue(zookeeperRegistryCenter);
            factory.addConstructorArgValue(jobConfig);

            factory.addConstructorArgValue(elasticJobListeners);
            DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) ctx.getAutowireCapableBeanFactory();
            defaultListableBeanFactory.registerBeanDefinition(jobName + "SpringJobScheduler", factory.getBeanDefinition());
            SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean(jobName + "SpringJobScheduler");
            springJobScheduler.init();
            log.info("【" + jobName + "】\t" + jobClass + "\tinit success");
        }
複製代碼

這裏主要是將註解配置的定時任務類註冊到SpringJobScheduler並由它來統一的初始化各個定時任務,每一個定時任務的實例的管理則是由spring來控制markdown

  1. 實現定時任務
@Slf4j
@Component
@ElasticJobConf(jobType = JobType.DATAFLOW, cron = "${elasticjob.jobs.dataflowJob.mark.cron}", name = "${elasticjob.jobs.dataflowJob.mark.jobName}",
        shardingTotalCount = "${elasticjob.jobs.dataflowJob.mark.shardingTotalCount}", shardingItemParameters = "${elasticjob.jobs.dataflowJob.mark.shardingItemParameters}",disabled =
        "${elasticjob.jobs.dataflowJob.mark.disabled}"
)
public class DemoDataflowJob implements DataflowJob<String> {

    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        //拉取數據邏輯
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> list) {
       //數據處理邏輯
    }
}
複製代碼

其中shardingContext能夠作任務的分片處理,具體詳情可見config配置配置手冊session

相關文章
相關標籤/搜索