本文源碼:GitHub·點這裏 || GitEE·點這裏java
在前面的文章中,說過QuartJob這個定時任務,被普遍應用的定時任務標準。但Quartz核心點在於執行定時任務並非在於關注的業務模式和場景,缺乏高度自定義的功能。Quartz可以基於數據庫實現任務的高可用,可是不具有分佈式並行調度的功能。git
-> QuartJob定時任務github
Elastic-Job 是一個開源的分佈式調度中間件,由兩個相互獨立的子項目 Elastic-Job-Lite 和 Elastic-Job-Cloud 組成。Elastic-Job-Lite 爲輕量級無中心化解決方案,使用 jar 包提供分佈式任務的調度和治理。 Elastic-Job-Cloud 是一個 Mesos Framework,依託於Mesos額外提供資源治理、應用分發以及進程隔離等服務。spring
分佈式調度協調 彈性擴容縮容 失效轉移 錯過執行做業重觸發 做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例
補刀
:人家官網這樣描述的,這裏贅述一下,充實一下文章。數據庫
該圖片來自ElasticJob官網。編程
由圖可知以下內容:segmentfault
須要Zookeeper組件支持,做爲分佈式的調度任務,有良好的監聽機制,和控制檯,下面的案例也就衝這個圖解來。服務器
這個概念在ElasticJob中是最具備特色的,實用性極好。app
任務的分佈式執行,須要將一個任務拆分爲多個獨立的任務項,而後由分佈式的服務器分別執行某一個或幾個分片項。框架
場景描述:假設有服務3臺,分3片管理,要處理數據表100條,那就能夠100%3,按照餘數0,1,2分散到三臺服務上執行,看到這裏分庫分表的基本邏輯涌上心頭,這就是爲什麼不少大牛講說,編程思惟很重要。
個性化參數即shardingItemParameter,能夠和分片項匹配對應關係,用於將分片項的數字轉換爲更加可讀的業務代碼。
場景描述:這裏猛一讀好像很飄逸,其實就是這個意思,若是分3片,取名[0,1,2]很差看,或者很差標識,能夠分別給個別名標識一下,[0=A,1=B,2=C]。
這裏使用2.0+的版本。
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
這裏主要配置一下Zookeeper中間件,分片和分片參數。
zookeeper: server: 127.0.0.1:2181 namespace: es-job job-config: cron: 0/10 * * * * ? shardCount: 1 shardItem: 0=A,1=B,2=C,3=D
看了官方的案例,沒看到好用的註解,這裏只能本身編寫一個,基於案例的加載過程和核心API做爲參考。
核心配置類:
com.dangdang.ddframe.job.lite.config.LiteJobConfiguration
根據本身想如何使用註解的思路,好比我只想註解定時任務名稱和Cron表達式這兩個功能,其餘參數直接統一配置(這裏多是受QuartJob影響太深,可能根本就是想省事...)
@Inherited @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface TaskJobSign { @AliasFor("cron") String value() default ""; @AliasFor("value") String cron() default ""; String jobName() default ""; }
這裏打印一些基本參數,對照配置和註解,一目瞭然。
@Component @TaskJobSign(cron = "0/5 * * * * ?",jobName = "Hello-Job") public class HelloJob implements SimpleJob { private static final Logger LOG = LoggerFactory.getLogger(HelloJob.class.getName()) ; @Override public void execute(ShardingContext shardingContext) { LOG.info("當前線程: "+Thread.currentThread().getId()); LOG.info("任務分片:"+shardingContext.getShardingTotalCount()); LOG.info("當前分片:"+shardingContext.getShardingItem()); LOG.info("分片參數:"+shardingContext.getShardingParameter()); LOG.info("任務參數:"+shardingContext.getJobParameter()); } }
既然自定義註解,那加載過程天然也要自定義一下,讀取自定義的註解,配置化,加入容器,而後初始化,等着任務執行就好。
@Configuration public class ElasticJobConfig { @Resource private ApplicationContext applicationContext ; @Resource private ZookeeperRegistryCenter zookeeperRegistryCenter; @Value("${job-config.cron}") private String cron ; @Value("${job-config.shardCount}") private int shardCount ; @Value("${job-config.shardItem}") private String shardItem ; /** * 配置任務監聽器 */ @Bean public ElasticJobListener elasticJobListener() { return new TaskJobListener(); } /** * 初始化配置任務 */ @PostConstruct public void initTaskJob() { Map<String, SimpleJob> jobMap = this.applicationContext.getBeansOfType(SimpleJob.class); Iterator iterator = jobMap.entrySet().iterator(); while (iterator.hasNext()) { // 自定義註解管理 Map.Entry<String, SimpleJob> entry = (Map.Entry)iterator.next(); SimpleJob simpleJob = entry.getValue(); TaskJobSign taskJobSign = simpleJob.getClass().getAnnotation(TaskJobSign.class); if (taskJobSign != null){ String cron = taskJobSign.cron() ; String jobName = taskJobSign.jobName() ; // 生成配置 SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration( JobCoreConfiguration.newBuilder(jobName, cron, shardCount) .shardingItemParameters(shardItem).jobParameter(jobName).build(), simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder( simpleJobConfiguration).overwrite(true).build(); TaskJobListener taskJobListener = new TaskJobListener(); // 初始化任務 SpringJobScheduler jobScheduler = new SpringJobScheduler( simpleJob, zookeeperRegistryCenter, liteJobConfiguration, taskJobListener); jobScheduler.init(); } } } }
絮叨一句
:不要疑問這些API是怎麼知道,看下官方文檔的案例,他們怎麼使用這些核心API,這裏就是照着寫過來,就是多一步自定義註解類的加載過程。固然官方文檔大體讀一遍仍是頗有必要的。
補刀一句
:如何快速學習一些組件的用法,首先找到官方文檔,或者開源庫Wiki,再不濟ReadMe文檔(若是都沒有,酌情放棄,另尋其餘),熟悉基本功能是否符合本身的需求,若是符合,就看下基本用法案例,熟悉API,最後就是研究本身須要的功能模塊,我的經驗來看,該過程是彎路最少,坑最少的。
用法很是簡單,實現ElasticJobListener接口。
@Component public class TaskJobListener implements ElasticJobListener { private static final Logger LOG = LoggerFactory.getLogger(TaskJobListener.class); private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); LOG.info(shardingContexts.getJobName()+"===>開始..."); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); LOG.info(shardingContexts.getJobName()+ "===>結束...[耗時:"+(endTime - beginTime)+"]"); } }
絮叨一句
:before和after執行先後,中間執行目標方法,標準的AOP切面思想,因此底層水平決定了對上層框架的理解速度,那本《Java編程思想》上的灰塵是否是該擦擦?
有部分場景須要動態添加和管理定時任務,基於上面的加載流程,在自定義一些步驟就能夠。
@Component public class GetTimeJob implements SimpleJob { private static final Logger LOG = LoggerFactory.getLogger(GetTimeJob.class.getName()) ; private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") ; @Override public void execute(ShardingContext shardingContext) { LOG.info("Job Name:"+shardingContext.getJobName()); LOG.info("Local Time:"+format.format(new Date())); } }
這裏就動態添加上面的任務。
@Service public class TaskJobService { @Resource private ZookeeperRegistryCenter zookeeperRegistryCenter; public void addTaskJob(final String jobName,final SimpleJob simpleJob, final String cron,final int shardCount,final String shardItem) { // 配置過程 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder( jobName, cron, shardCount) .shardingItemParameters(shardItem).build(); JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder( jobTypeConfiguration).overwrite(true).build(); TaskJobListener taskJobListener = new TaskJobListener(); // 加載執行 SpringJobScheduler jobScheduler = new SpringJobScheduler( simpleJob, zookeeperRegistryCenter, liteJobConfiguration, taskJobListener); jobScheduler.init(); } }
補刀一句
:這裏添加以後,任務就會定時執行,如何中止任務又是一個問題,能夠在任務名上作一些配置,好比在數據庫生成一條記錄[1,job1,state],若是調度到state爲中止狀態的任務,直接截胡便可。
@RestController public class TaskJobController { @Resource private TaskJobService taskJobService ; @RequestMapping("/addJob") public String addJob(@RequestParam("cron") String cron,@RequestParam("jobName") String jobName, @RequestParam("shardCount") Integer shardCount, @RequestParam("shardItem") String shardItem) { taskJobService.addTaskJob(jobName, new GetTimeJob(), cron, shardCount, shardItem); return "success"; } }
GitHub·地址 https://github.com/cicadasmile/middle-ware-parent GitEE·地址 https://gitee.com/cicadasmile/middle-ware-parent