根據官方文檔實現了一個簡單的simple類型(Elastic-Job提供Simple、Dataflow和Script 3種做業類型)的Demo。java
基本概念1. 分片概念任務的分佈式執行,須要將一個任務拆分爲多個獨立的任務項,而後由分佈式的服務器分別執行某一個或幾個分片項。linux 例如:有一個遍歷數據庫某張表的做業,現有2臺服務器。爲了快速的執行做業,那麼每臺服務器應執行做業的50%。 爲知足此需求,可將做業分紅2片,每臺服務器執行1片。做業遍歷數據的邏輯應爲:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 若是分紅10片,則做業遍歷數據的邏輯應爲:每片分到的分片項應爲ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID以0-4結尾的數據;服務器B遍歷ID以5-9結尾的數據。數據庫 2. 分片項與業務處理解耦Elastic-Job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的做業服務器,開發者須要自行處理分片項與真實數據的對應關係。windows 3. 個性化參數的適用場景個性化參數即shardingItemParameter,能夠和分片項匹配對應關係,用於將分片項的數字轉換爲更加可讀的業務代碼。api 例如:按照地區水平拆分數據庫,數據庫A是北京的數據;數據庫B是上海的數據;數據庫C是廣州的數據。 若是僅按照分片項配置,開發者須要瞭解0表示北京;1表示上海;2表示廣州。 合理使用個性化參數可讓代碼更可讀,若是配置爲0=北京,1=上海,2=廣州,那麼代碼中直接使用北京,上海,廣州的枚舉值便可完成分片項和業務邏輯的對應關係。安全 核心理念1. 分佈式調度Elastic-Job-Lite並沒有做業調度中心節點,而是基於部署做業框架的程序在到達相應時間點時各自觸發調度。服務器 註冊中心僅用於做業註冊和監控信息存儲。而主做業節點僅用於處理分片和清理等功能。架構 2. 做業高可用Elastic-Job-Lite提供最安全的方式執行做業。將分片總數設置爲1,並使用多於1臺的服務器執行做業,做業將會以1主n從的方式執行。框架 一旦執行做業的服務器崩潰,等待執行的服務器將會在下次做業啓動時替補執行。開啓失效轉移功能效果更好,能夠保證在本次做業執行時崩潰,備機當即啓動替補執行。分佈式 3. 最大限度利用資源Elastic-Job-Lite也提供最靈活的方式,最大限度的提升執行做業的吞吐量。將分片項設置爲大於服務器的數量,最好是大於服務器倍數的數量,做業將會合理的利用分佈式資源,動態的分配分片項。 例如:3臺服務器,分紅10片,則分片項分配結果爲服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。 若是服務器C崩潰,則分片項分配結果爲服務器A=0,1,2,3,4;服務器B=5,6,7,8,9。在不丟失分片項的狀況下,最大限度的利用現有資源提升吞吐量。 總體架構圖 |
這裏使用去中心化的版本elastic-job-lite。
JDK:1.8(elastic-job要1.7以上) zookeeper:zookeeper-3.4.12 (官方要求zookeeper-3.4.6以上) elastic-job-lite-core:2.0.3 |
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.0.3</version> </dependency>
由於我使用了slf4j因此還須要引入依賴:
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency>
我寫了兩個類一個是任務做業的實現類MyELasticJob,很是簡單,這裏只是寫了一些日誌的打印:
package com.elasticjob.demo; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; /** * @Description: 做業實現類 * @Author: 張穎輝(yh) * @CreateDate: 2018/7/20 14:01 * @UpdateUser: 張穎輝(yh) * @UpdateDate: 2018/7/20 14:01 * @UpdateRemark: The modified content * @Version: 1.0 */ public class MyELasticJob implements SimpleJob { private Logger logger = LoggerFactory.getLogger(getClass()); /** * @Description: 要執行的做業(分片) * @Author: 張穎輝(yh) * @Date: 2018/7/23 14:42 * @param: [shardingContext] * @return: void * @Version: 1.0 */ @Override public void execute(ShardingContext shardingContext) { //任務分片 switch (shardingContext.getShardingItem()) { case 0: logger.info("job名稱={},分片數量={},當前分片={},當前分片名稱={},當前自定義參數={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter()); break; case 1: logger.info("job名稱={},分片數量={},當前分片={},當前分片名稱={},當前自定義參數={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter()); break; case 2: logger.info("job名稱={},分片數量={},當前分片={},當前分片名稱={},當前自定義參數={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter()); break; } } }
另一個就是關於任務做業的設置和註冊中心的設置以及任務的啓動:
package com.elasticjob.demo; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.JobRootConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; /** * @Description: Description * @Author: 張穎輝(yh) * @CreateDate: 2018/7/23 11:30 * @UpdateUser: 張穎輝(yh) * @UpdateDate: 2018/7/23 11:30 * @UpdateRemark: The modified content * @Version: 1.0 */ public class TestJob { public static void main(String[] args) { //做業參數 String jobParameter = "做業節點-linux"; if (isWindows()) { jobParameter = "做業節點-windows"; } new JobScheduler(createRegistryCenter(), createJobConfiguration(jobParameter)).init(); } /** * @Description: 建立註冊中心 * @Author: 張穎輝(yh) * @Date: 2018/7/20 15:09 * @param: [] * @return: com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter * @Version: 1.0 */ private static CoordinatorRegistryCenter createRegistryCenter() { CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("192.168.5.100:2181", "elastic-job-demo")); registryCenter.init(); return registryCenter; } /** * @Description: 建立做業配置 * @Author: 張穎輝(yh) * @Date: 2018/7/20 15:08 * @param: * @return: * @Version: 1.0 */ private static LiteJobConfiguration createJobConfiguration(String jobParameter) { // demoSimpleJob 爲jobname, 0/10 * * * * ?爲cron表達式, 3 分片數量, 0=北京,1=上海,2=廣州 分片對應參數內容, jobParameter 做業自定義參數 // 定義做業核心配置 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("demoSimpleJob_3p", "0/10 * * * * ?", 3).shardingItemParameters("0=北京,1=上海,2=廣州").jobParameter(jobParameter).build(); // 定義SIMPLE類型配置 SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, MyELasticJob.class.getCanonicalName()); // 定義Lite做業根配置 LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build(); return liteJobConfiguration; } /** * @Description: 是否是windows * @Author: 張穎輝(yh) * @Date: 2018/7/23 15:15 * @param: * @return: * @Version: 1.0 */ private static boolean isWindows() { return System.getProperties().getProperty("os.name").toUpperCase().indexOf("WINDOWS") != -1; } }
關於做業的分片數量要與做業類中的execute方法裏分支相匹配。
而後我在虛擬機下linux中安裝了註冊中心zookeeper(對應上面代碼中指定的zookeeper配置)安裝教程
將上面的兩個類打成jar,上傳到linux中執行,本地windows也執行一份。就會發現三個任務分片有兩個在window下執行,
剩下的一個在linux中執行了。
注意:須要兩臺不一樣ip的機器去啓動job(做業服務器) 由於ElasticJob默認的分片機制是根據ip來分片的 若是ip相同 它會默認爲一臺服務器 。因此這裏我把代碼上傳到linux上執行一份。