Springboot整合Elastic-Job(二)

上文咱們講到Springboot整合Elastic-Job整合的demo,只是簡單的實現了主要功能。本文在上文基礎上,進行新的調整。git

事件追蹤


 Elastic-Job提供了事件追蹤功能,可經過事件訂閱的方式處理調度過程的重要事件,用於查詢、統計和監控。Elastic-Job目前提供了基於關係型數據庫兩種事件訂閱方式記錄事件。咱們只須要將添加以下配置便可數據庫

/**
     * 將做業運行的痕跡進行持久化到DB
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration(){
        return new JobEventRdbConfiguration(dataSource);
    }

項目運行後,Elastic-Job會自動建立JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG兩張表以及若干索引。app

 

使用註解


 

上文咱們添加一個任務的步驟是,定義一個任務類,再在配置類中定義任務屬性,並加入到SpringJobScheduler。若是咱們有幾百個任務,配置類基本就沒法維護了。那怎麼優化呢,咱們能夠參考@Schedual註解,在job上定義一個註解,每次啓動的時候掃描註解自動將job加入到SpringJobScheduler中。ide

1.抽象添加job方法

@Component
public class ElasticJobHandler {
    @Autowired
    private ZookeeperRegistryCenter regCenter;
    @Resource
    private JobEventConfiguration jobEventConfiguration;
    @Resource
    private ElasticJobListener elasticJobListener;

    /**
     * @Description 任務配置類
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                        , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }

    public void addJob(final SimpleJob simpleJob,
                       final String cron,
                       final Integer shardingTotalCount,
                       final String shardingItemParameters)
            throws IllegalAccessException, InstantiationException {
        LiteJobConfiguration jobConfig =
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters);

         new SpringJobScheduler(simpleJob, regCenter, jobConfig, jobEventConfiguration, elasticJobListener).init();
    }
}

 

2.添加ElasticScheduler註解

@Component
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticScheduler {
    /**
     * 任務名稱
     * @return
     */
    String name();

    /**
     * cron表達式,用於控制做業觸發時間
     * @return
     */
    String cron() default "";

    /**
     * 分片參數
     * @return
     */
    String shardingItemParameters() default "";

    /**
     * 總分片數
     * @return
     */
    int shardingTotalCount();

    /**
     * 任務描述信息
     * @return
     */
    String description() default "";
}

3.定義掃描方法

@Component
public class ElasticSchedulerAspect implements ApplicationContextAware, InitializingBean {
    private ApplicationContext applicationContext;
    @Autowired
    private ElasticJobHandler elasticJobHandler;
    @Override
    public void afterPropertiesSet() throws Exception {
        registrJob(applicationContext);
    }

    /**
     * 解析context信息,開始註冊
     * @param applicationContext
     */
    private void registrJob(ApplicationContext applicationContext) {
        String[] beanNamesForAnnotation = applicationContext.getBeanNamesForAnnotation(ElasticScheduler.class);
        for (String beanName : beanNamesForAnnotation) {
            Class<?> handlerType = applicationContext.getType(beanName);
            Object bean = applicationContext.getBean(beanName);
            ElasticScheduler annotation = AnnotationUtils.findAnnotation(handlerType, ElasticScheduler.class);
            addJobToContext(annotation,bean);
        }
    }

    /**
     * 將任務添加到容器中
     * @param elasticScheduler
     * @param bean
     */
    private void addJobToContext(ElasticScheduler elasticScheduler, Object bean) {
        String cron = elasticScheduler.cron();
        String name = elasticScheduler.name();
        String description = elasticScheduler.description();
        String shardingItemParameters = elasticScheduler.shardingItemParameters();
        Integer shardingTotalCount = elasticScheduler.shardingTotalCount();
        try {
            elasticJobHandler.addJob((SimpleJob) bean,cron,shardingTotalCount,shardingItemParameters);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext=applicationContext;
    }

}

4.使用註解

@Component
@ElasticScheduler(cron = "0/5 * * * * ?",shardingTotalCount = 4,name = "測試註解",shardingItemParameters = "0=0,1=0,2=1,3=1")
public class StockSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " +
                        "當前分片項: %s.當前參數: %s," +
                        "當前任務名稱: %s.當前任務參數: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()

        ));
    }
}

注意,該註解只爲了避免想引入太多外部依賴本身隨手寫的,只爲給你們提供思路。git上已經有人對用註解整合Elastic-Job了,你們可自行搜索。測試

相關文章
相關標籤/搜索