上文咱們講到Springboot整合Elastic-Job整合的demo,只是簡單的實現了主要功能。本文在上文基礎上,進行新的調整。git
Elastic-Job提供了事件追蹤功能,可經過事件訂閱的方式處理調度過程的重要事件,用於查詢、統計和監控。Elastic-Job目前提供了基於關係型數據庫兩種事件訂閱方式記錄事件。咱們只須要將添加以下配置便可數據庫
/** * 將做業運行的痕跡進行持久化到DB */ @Bean public JobEventConfiguration jobEventConfiguration(){ return new JobEventRdbConfiguration(dataSource); }
項目運行後,Elastic-Job會自動建立JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG兩張表以及若干索引。app
上文咱們添加一個任務的步驟是,定義一個任務類,再在配置類中定義任務屬性,並加入到SpringJobScheduler。若是咱們有幾百個任務,配置類基本就沒法維護了。那怎麼優化呢,咱們能夠參考@Schedual註解,在job上定義一個註解,每次啓動的時候掃描註解自動將job加入到SpringJobScheduler中。ide
@Component public class ElasticJobHandler { @Autowired private ZookeeperRegistryCenter regCenter; @Resource private JobEventConfiguration jobEventConfiguration; @Resource private ElasticJobListener elasticJobListener; /** * @Description 任務配置類 */ private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration( JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount) .shardingItemParameters(shardingItemParameters).build() , jobClass.getCanonicalName()) ).overwrite(true).build(); } public void addJob(final SimpleJob simpleJob, final String cron, final Integer shardingTotalCount, final String shardingItemParameters) throws IllegalAccessException, InstantiationException { LiteJobConfiguration jobConfig = getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters); new SpringJobScheduler(simpleJob, regCenter, jobConfig, jobEventConfiguration, elasticJobListener).init(); } }
@Component @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) public @interface ElasticScheduler { /** * 任務名稱 * @return */ String name(); /** * cron表達式,用於控制做業觸發時間 * @return */ String cron() default ""; /** * 分片參數 * @return */ String shardingItemParameters() default ""; /** * 總分片數 * @return */ int shardingTotalCount(); /** * 任務描述信息 * @return */ String description() default ""; }
@Component public class ElasticSchedulerAspect implements ApplicationContextAware, InitializingBean { private ApplicationContext applicationContext; @Autowired private ElasticJobHandler elasticJobHandler; @Override public void afterPropertiesSet() throws Exception { registrJob(applicationContext); } /** * 解析context信息,開始註冊 * @param applicationContext */ private void registrJob(ApplicationContext applicationContext) { String[] beanNamesForAnnotation = applicationContext.getBeanNamesForAnnotation(ElasticScheduler.class); for (String beanName : beanNamesForAnnotation) { Class<?> handlerType = applicationContext.getType(beanName); Object bean = applicationContext.getBean(beanName); ElasticScheduler annotation = AnnotationUtils.findAnnotation(handlerType, ElasticScheduler.class); addJobToContext(annotation,bean); } } /** * 將任務添加到容器中 * @param elasticScheduler * @param bean */ private void addJobToContext(ElasticScheduler elasticScheduler, Object bean) { String cron = elasticScheduler.cron(); String name = elasticScheduler.name(); String description = elasticScheduler.description(); String shardingItemParameters = elasticScheduler.shardingItemParameters(); Integer shardingTotalCount = elasticScheduler.shardingTotalCount(); try { elasticJobHandler.addJob((SimpleJob) bean,cron,shardingTotalCount,shardingItemParameters); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext=applicationContext; } }
@Component @ElasticScheduler(cron = "0/5 * * * * ?",shardingTotalCount = 4,name = "測試註解",shardingItemParameters = "0=0,1=0,2=1,3=1") public class StockSimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " + "當前分片項: %s.當前參數: %s," + "當前任務名稱: %s.當前任務參數: %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
注意,該註解只爲了避免想引入太多外部依賴本身隨手寫的,只爲給你們提供思路。git上已經有人對用註解整合Elastic-Job了,你們可自行搜索。測試