在實際生產中利用Elastic-job+spring-boot用註解實現多任務的配置spring
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
<exclusions>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-recipes</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
<exclusions>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-recipes</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
複製代碼
注意:若是項目中單獨導入了高版本的zookeeper的客戶端curator包則須要在Elastic-job包中剔除curator,不然會發生curator的版本衝突數據庫
即在yaml配置文件中配置Elastic-job-lite 依賴的zookeeper 分佈式調度服務器參數apache
@Configuration
public class JobParserAutoConfiguration {
/**
* 服務地址列表 包括IP地址和端口號
*/
@Value("${elasticjob.reg-center.server-lists}")
private String serverList;
/**
* 註冊中心的命名空間
*/
@Value("${elasticjob.reg-center.namespace}")
private String namespace;
/**
* 等待重試的間隔時間的初始值.
* 單位毫秒.
*/
@Value("${elasticjob.reg-center.baseSleepTimeMilliseconds}")
private Integer baseSleepTimeMilliseconds;
/**
* 等待重試的間隔時間的最大值.
* 單位毫秒.
*/
@Value("${elasticjob.reg-center.maxSleepTimeMilliseconds}")
private Integer maxSleepTimeMilliseconds;
/**
* 會話超時時間.
* 單位毫秒.
*/
@Value("${elasticjob.reg-center.sessionTimeoutMilliseconds}")
private Integer sessionTimeoutMilliseconds;
/**
* 鏈接超時時間.
* 單位毫秒.
*/
@Value("${elasticjob.reg-center.connectionTimeoutMilliseconds}")
private Integer connectionTimeoutMilliseconds;
/**
* 最大重試次數.
*/
@Value("${elasticjob.reg-center.maxRetries}")
private Integer maxRetries;
@Bean
public JobConfParser jobConfParser() {
return new JobConfParser();
}
/**
* 初始化zookeeper註冊中心
* @return ZookeeperRegistryCenter
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
zookeeperConfiguration.setBaseSleepTimeMilliseconds(baseSleepTimeMilliseconds);
zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeoutMilliseconds);
zookeeperConfiguration.setMaxSleepTimeMilliseconds(maxSleepTimeMilliseconds);
zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeoutMilliseconds);
zookeeperConfiguration.setMaxRetries(maxRetries);
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
}
複製代碼
@Component
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConf {
/*********************DataflowJobConfiguration START********************/
/**
* 做業名稱
*
* @return
*/
String name();
/**
* 類型
*/
JobType jobType();
/**
* cron表達式,用於控制做業觸發時間
*
* @return
*/
String cron() default "";
/**
* 做業分片總數
*
* @return
*/
String shardingTotalCount() default "1";
/**
* 分片序列號和參數用等號分隔,多個鍵值對用逗號分隔
* <p>分片序列號從0開始,不可大於或等於做業分片總數<p>
* <p>如:<p>
* <p>0=a,1=b,2=c<p>
*
* @return
*/
String shardingItemParameters() default "";
/**
* 做業自定義參數
* <p>做業自定義參數,可經過傳遞該參數爲做業調度的業務方法傳參,用於實現帶參數的做業<p>
* <p>例:每次獲取的數據量、做業實例從數據庫讀取的主鍵等<p>
*
* @return
*/
String jobParameter() default "";
/**
* 是否開啓任務執行失效轉移,開啓表示若是做業在一次任務執行中途宕機,容許將該次未完成的任務在另外一做業節點上補償執行
*
* @return
*/
boolean failover() default false;
/**
* 是否開啓錯過任務從新執行
*
* @return
*/
boolean misfire() default false;
/**
* 做業描述信息
*
* @return
*/
String description() default "";
boolean overwrite() default false;
/*********************DataflowJobConfiguration END********************/
/*********************DataflowJobConfiguration START********************/
/**
* 是否流式處理數據
* <p>若是流式處理數據, 則fetchData不返回空結果將持續執行做業<p>
* <p>若是非流式處理數據, 則處理數據完成後做業結束<p>
*
* @return
*/
boolean streamingProcess() default false;
/*********************DataflowJobConfiguration END********************/
/*********************ScriptJobConfiguration START********************/
/**
* 腳本型做業執行命令行
*
* @return
*/
String scriptCommandLine() default "";
/*********************ScriptJobConfiguration END********************/
/*********************LiteJobConfiguration START********************/
/**
* 監控做業運行時狀態
* <p>每次做業執行時間和間隔時間均很是短的狀況,建議不監控做業運行時狀態以提高效率。<p>
* <p>由於是瞬時狀態,因此無必要監控。請用戶自行增長數據堆積監控。而且不能保證數據重複選取,應在做業中實現冪等性。<p>
* <p>每次做業執行時間和間隔時間均較長的狀況,建議監控做業運行時狀態,可保證數據不會重複選取。<p>
*
* @return
*/
boolean monitorExecution() default true;
/**
* 做業監控端口
* <p>建議配置做業監控端口, 方便開發者dump做業信息。<p>
* <p>使用方法: echo 「dump」 | nc 127.0.0.1 9888<p>
*
* @return
*/
int monitorPort() default -1;
/**
* 大容許的本機與註冊中心的時間偏差秒數
* <p>若是時間偏差超過配置秒數則做業啓動時將拋異常<p>
* <p>配置爲-1表示不校驗時間偏差<p>
*
* @return
*/
int maxTimeDiffSeconds() default -1;
/**
* 做業分片策略實現類全路徑,默認使用平均分配策略
*
* @return
*/
String jobShardingStrategyClass() default "";
/**
* 修復做業服務器不一致狀態服務調度間隔時間,配置爲小於1的任意值表示不執行修復,單位:分鐘
*
* @return
*/
int reconcileIntervalMinutes() default 10;
/**
* 做業事件追蹤的數據源Bean引用
*
* @return
*/
String eventTraceRdbDataSource() default "";
/*********************LiteJobConfiguration END********************/
/**
* 前置後置任務監聽實現類,需實現ElasticJobListener接口
*
* @return
*/
String listener() default "";
/**
* 做業是否禁止啓動,可用於部署做業時,先禁止啓動,部署結束後統一啓動
*
* @return
*/
String disabled() default "false";
/**
* 前置後置任務分佈式監聽實現類,需繼承AbstractDistributeOnceElasticJobListener類
*
* @return
*/
String distributedListener() default "";
/**
* 最後一個做業執行前的執行方法的超時時間,單位:毫秒
*
* @return
*/
long startedTimeoutMilliseconds() default Long.MAX_VALUE;
/**
* 最後一個做業執行後的執行方法的超時時間,單位:毫秒
*
* @return
*/
long completedTimeoutMilliseconds() default Long.MAX_VALUE;
/**
* 自定義異常處理類
*
* @return
*/
String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";
/**
* 自定義業務處理線程池
*
* @return
*/
String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
}
複製代碼
這些配置均可以在官方文檔中查到,這裏只是整合起來方便用註解配置多個執行的任務 配置手冊 固然也能夠自定義一些屬性,最後在初始化任務的時候去實現便可服務器
Map<String, Object> beanMap = ctx.getBeansWithAnnotation(ElasticJobConf.class);
for (Object confBean : beanMap.values()) {
Class<?> clz = confBean.getClass();
ElasticJobConf conf = AnnotationUtils.findAnnotation(clz, ElasticJobConf.class);
/**讀取註解配置屬性略 */
JobCoreConfiguration coreConfig = getJobCoreConfiguration(jobName, cron, shardingItemParameters, description, jobParameter, jobExceptionHandler, executorServiceHandler, failover, misfire, shardingTotalCount);
// 不一樣類型的任務配置處理
JobTypeConfiguration typeConfig = getJobTypeConfiguration(jobTypeName, jobClass, scriptCommandLine, streamingProcess, coreConfig);
LiteJobConfiguration jobConfig = getLiteJobConfiguration(jobShardingStrategyClass, overwrite, disabled, monitorExecution, monitorPort, maxTimeDiffSeconds, reconcileIntervalMinutes, typeConfig);
List<BeanDefinition> elasticJobListeners = getTargetElasticJobListeners(conf);
// 構建SpringJobScheduler對象來初始化任務
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
if (jobTypeName.equals(JobType.SCRIPT)) {
factory.addConstructorArgValue(null);
} else {
factory.addConstructorArgValue(confBean);
}
factory.addConstructorArgValue(zookeeperRegistryCenter);
factory.addConstructorArgValue(jobConfig);
factory.addConstructorArgValue(elasticJobListeners);
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) ctx.getAutowireCapableBeanFactory();
defaultListableBeanFactory.registerBeanDefinition(jobName + "SpringJobScheduler", factory.getBeanDefinition());
SpringJobScheduler springJobScheduler = (SpringJobScheduler) ctx.getBean(jobName + "SpringJobScheduler");
springJobScheduler.init();
log.info("【" + jobName + "】\t" + jobClass + "\tinit success");
}
複製代碼
這裏主要是將註解配置的定時任務類註冊到SpringJobScheduler並由它來統一的初始化各個定時任務,每一個定時任務的實例的管理則是由spring來控制markdown
@Slf4j
@Component
@ElasticJobConf(jobType = JobType.DATAFLOW, cron = "${elasticjob.jobs.dataflowJob.mark.cron}", name = "${elasticjob.jobs.dataflowJob.mark.jobName}",
shardingTotalCount = "${elasticjob.jobs.dataflowJob.mark.shardingTotalCount}", shardingItemParameters = "${elasticjob.jobs.dataflowJob.mark.shardingItemParameters}",disabled =
"${elasticjob.jobs.dataflowJob.mark.disabled}"
)
public class DemoDataflowJob implements DataflowJob<String> {
@Override
public List<String> fetchData(ShardingContext shardingContext) {
//拉取數據邏輯
}
@Override
public void processData(ShardingContext shardingContext, List<String> list) {
//數據處理邏輯
}
}
複製代碼
其中shardingContext能夠作任務的分片處理,具體詳情可見config配置配置手冊session