1、前言java
Elastic-Job是一個優秀的分佈式做業調度框架。框架
Elastic-Job是一個分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。運維
Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供分佈式任務的協調服務。分佈式
Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及進程隔離等服務。ide
1. Elastic-Job-Liteui
分佈式調度協調this
彈性擴容縮容spa
失效轉移線程
錯過執行做業重觸發code
做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例
自診斷並修復分佈式不穩定形成的問題
支持並行調度
支持做業生命週期操做
豐富的做業類型
Spring整合以及命名空間提供
運維平臺
2. Elastic-Job-Cloud
應用自動分發
基於Fenzo的彈性資源分配
分佈式調度協調
彈性擴容縮容
失效轉移
錯過執行做業重觸發
做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例
支持並行調度
支持做業生命週期操做
豐富的做業類型
Spring整合
運維平臺
基於Docker的進程隔離(TBD)
2、導讀
一、Elastic-Job的核心思想
二、Elastic-Job的基本使用
3、Elastic-Job的核心思想
對於分佈式計算而言,分片是最基本的思想,Elastic-Job也是沿用了這個思想,每一個job跑部分數據,全部job執行完成,即是全量數據,官網給出的SimpleJob例子以下:
public class MyElasticJob implements SimpleJob { @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
用switch case循環來對應分片的業務邏輯,case分片的index,進入業務邏輯執行。固然這裏也有不適應的場景,相似於MapReduce須要shuffle的場景就不適合了,比方說,要根據某一個字段全局分組聚合求結果,這時候怎麼分片均可能會不合理,由於每一個分片只能處理N分之一的數據,沒辦法shuffle再聚合,這一點,也要根據具體的業務來使用。
那麼ShardingContext能夠拿到那些信息呢?源碼以下
public final class ShardingContext { /** * 做業名稱. */ private final String jobName; /** * 做業任務ID. */ private final String taskId; /** * 分片總數. */ private final int shardingTotalCount; /** * 做業自定義參數. * 能夠配置多個相同的做業, 可是用不一樣的參數做爲不一樣的調度實例. */ private final String jobParameter; /** * 分配於本做業實例的分片項. */ private final int shardingItem; /** * 分配於本做業實例的分片參數. */ private final String shardingParameter; public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) { jobName = shardingContexts.getJobName(); taskId = shardingContexts.getTaskId(); shardingTotalCount = shardingContexts.getShardingTotalCount(); jobParameter = shardingContexts.getJobParameter(); this.shardingItem = shardingItem; shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem); } }
以上代碼,jobParameter和shardingItem是最有用的參數,shardingItem決定switch case循環的走向,shardingParameter能夠用業務的查詢條件,也能夠用字符串拼接的方式組裝很複雜的參數用於特定的業務。
4、Elastic-Job的基本使用
一、Job配置項
public class ElasticJobConfig { private static CoordinatorRegistryCenter createRegistryCenter() { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job"); CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); regCenter.init(); return regCenter; } private static LiteJobConfiguration createJobConfiguration() { JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3) .shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName()); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true) .build(); return simpleJobRootConfig; } public static void main(String[] args) { new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); } }
幾點說明:
註冊中心配置項,設置zookeeper集羣地址,我這裏用的本地單節點,因此只有一個,固然能夠配置任務名稱,命名空間(namespace,本質上會在zk裏生成一個目錄),超時時間,最大重試次數等等
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite參數很是重要,設置這個參數爲true,修改過job配置信息纔會覆蓋zookeeper裏的數據,要否則不會生效。
二、SimpleJob的實現
public class MyElasticJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { switch (shardingContext.getShardingItem()) { case 0: { System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:" + shardingContext.getShardingParameter() + " =====" + Thread.currentThread()); break; } case 1: { System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:" + shardingContext.getShardingParameter() + " =====" + Thread.currentThread()); break; } case 2: { System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:" + shardingContext.getShardingParameter() + " =====" + Thread.currentThread()); break; } default: { System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:" + shardingContext.getShardingParameter() + " =====" + Thread.currentThread()); break; } } } }
上面設置每5秒鐘執行一次,執行ElasticJobConfig的main方法,執行結果以下:
從上面的結果,能夠看出,執行每一個分片的任務,實際上是放到一個線程池去執行的,對應的分片信息和參數信息在shardingContext能夠拿到,實現業務很是方便。
最後,若是啓動多個JVM,那麼這些任務就分散到各個節點裏,若是一個節點宕機,下次觸發任務時,將把該分片任務丟到健康機器執行,這裏作到了節點容錯。可是某個分片的任務在執行過程當中失敗了,那麼這裏是不會從新觸發改分片任務的執行的。