elastic-job學習 第一篇

根據官方文檔實現了一個簡單的simple類型(Elastic-Job提供Simple、Dataflow和Script 3種做業類型)的Demo。java

官方介紹:

基本概念

1. 分片概念

任務的分佈式執行,須要將一個任務拆分爲多個獨立的任務項,而後由分佈式的服務器分別執行某一個或幾個分片項。linux

例如:有一個遍歷數據庫某張表的做業,現有2臺服務器。爲了快速的執行做業,那麼每臺服務器應執行做業的50%。 爲知足此需求,可將做業分紅2片,每臺服務器執行1片。做業遍歷數據的邏輯應爲:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 若是分紅10片,則做業遍歷數據的邏輯應爲:每片分到的分片項應爲ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID以0-4結尾的數據;服務器B遍歷ID以5-9結尾的數據。數據庫

2. 分片項與業務處理解耦

Elastic-Job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的做業服務器,開發者須要自行處理分片項與真實數據的對應關係。windows

3. 個性化參數的適用場景

個性化參數即shardingItemParameter,能夠和分片項匹配對應關係,用於將分片項的數字轉換爲更加可讀的業務代碼。api

例如:按照地區水平拆分數據庫,數據庫A是北京的數據;數據庫B是上海的數據;數據庫C是廣州的數據。 若是僅按照分片項配置,開發者須要瞭解0表示北京;1表示上海;2表示廣州。 合理使用個性化參數可讓代碼更可讀,若是配置爲0=北京,1=上海,2=廣州,那麼代碼中直接使用北京,上海,廣州的枚舉值便可完成分片項和業務邏輯的對應關係。安全

核心理念

1. 分佈式調度

Elastic-Job-Lite並沒有做業調度中心節點,而是基於部署做業框架的程序在到達相應時間點時各自觸發調度。服務器

註冊中心僅用於做業註冊和監控信息存儲。而主做業節點僅用於處理分片和清理等功能。架構

2. 做業高可用

Elastic-Job-Lite提供最安全的方式執行做業。將分片總數設置爲1,並使用多於1臺的服務器執行做業,做業將會以1主n從的方式執行。框架

一旦執行做業的服務器崩潰,等待執行的服務器將會在下次做業啓動時替補執行。開啓失效轉移功能效果更好,能夠保證在本次做業執行時崩潰,備機當即啓動替補執行。分佈式

3. 最大限度利用資源

Elastic-Job-Lite也提供最靈活的方式,最大限度的提升執行做業的吞吐量。將分片項設置爲大於服務器的數量,最好是大於服務器倍數的數量,做業將會合理的利用分佈式資源,動態的分配分片項。

例如:3臺服務器,分紅10片,則分片項分配結果爲服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。 若是服務器C崩潰,則分片項分配結果爲服務器A=0,1,2,3,4;服務器B=5,6,7,8,9。在不丟失分片項的狀況下,最大限度的利用現有資源提升吞吐量。

總體架構圖

環境說明

這裏使用去中心化的版本elastic-job-lite。

JDK:1.8(elastic-job要1.7以上)

zookeeper:zookeeper-3.4.12 (官方要求zookeeper-3.4.6以上)

elastic-job-lite-core:2.0.3

依賴

<dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.0.3</version>
        </dependency>

由於我使用了slf4j因此還須要引入依賴:

<dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.2</version>
        </dependency>

代碼

我寫了兩個類一個是任務做業的實現類MyELasticJob,很是簡單,這裏只是寫了一些日誌的打印:

package com.elasticjob.demo;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 * @Description: 做業實現類
 * @Author: 張穎輝(yh)
 * @CreateDate: 2018/7/20 14:01
 * @UpdateUser: 張穎輝(yh)
 * @UpdateDate: 2018/7/20 14:01
 * @UpdateRemark: The modified content
 * @Version: 1.0
 */
public class MyELasticJob implements SimpleJob {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * @Description: 要執行的做業(分片)
     * @Author: 張穎輝(yh)
     * @Date: 2018/7/23 14:42
     * @param: [shardingContext]
     * @return: void
     * @Version: 1.0
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        //任務分片
        switch (shardingContext.getShardingItem()) {
            case 0:
                logger.info("job名稱={},分片數量={},當前分片={},當前分片名稱={},當前自定義參數={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter());
                break;
            case 1:
                logger.info("job名稱={},分片數量={},當前分片={},當前分片名稱={},當前自定義參數={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter());
                break;
            case 2:
                logger.info("job名稱={},分片數量={},當前分片={},當前分片名稱={},當前自定義參數={} -----------", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobParameter());
                break;
        }
    }
}

另一個就是關於任務做業的設置和註冊中心的設置以及任務的啓動:

package com.elasticjob.demo;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @Description: Description
 * @Author: 張穎輝(yh)
 * @CreateDate: 2018/7/23 11:30
 * @UpdateUser: 張穎輝(yh)
 * @UpdateDate: 2018/7/23 11:30
 * @UpdateRemark: The modified content
 * @Version: 1.0
 */
public class TestJob {
    public static void main(String[] args) {
        //做業參數
        String jobParameter = "做業節點-linux";
        if (isWindows()) {
            jobParameter = "做業節點-windows";
        }
        new JobScheduler(createRegistryCenter(), createJobConfiguration(jobParameter)).init();
    }

    /**
     * @Description: 建立註冊中心
     * @Author: 張穎輝(yh)
     * @Date: 2018/7/20 15:09
     * @param: []
     * @return: com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter
     * @Version: 1.0
     */
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("192.168.5.100:2181", "elastic-job-demo"));
        registryCenter.init();
        return registryCenter;
    }


    /**
     * @Description: 建立做業配置
     * @Author: 張穎輝(yh)
     * @Date: 2018/7/20 15:08
     * @param:
     * @return:
     * @Version: 1.0
     */
    private static LiteJobConfiguration createJobConfiguration(String jobParameter) {
        // demoSimpleJob 爲jobname, 0/10 * * * * ?爲cron表達式,  3 分片數量,  0=北京,1=上海,2=廣州 分片對應參數內容,  jobParameter 做業自定義參數
        // 定義做業核心配置
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("demoSimpleJob_3p", "0/10 * * * * ?", 3).shardingItemParameters("0=北京,1=上海,2=廣州").jobParameter(jobParameter).build();
        // 定義SIMPLE類型配置
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, MyELasticJob.class.getCanonicalName());
        // 定義Lite做業根配置
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build();
        return liteJobConfiguration;
    }

    /**
     * @Description: 是否是windows
     * @Author: 張穎輝(yh)
     * @Date: 2018/7/23 15:15
     * @param:
     * @return:
     * @Version: 1.0
     */
    private static boolean isWindows() {
        return System.getProperties().getProperty("os.name").toUpperCase().indexOf("WINDOWS") != -1;
    }
}

關於做業的分片數量要與做業類中的execute方法裏分支相匹配。

而後我在虛擬機下linux中安裝了註冊中心zookeeper(對應上面代碼中指定的zookeeper配置)安裝教程

將上面的兩個類打成jar,上傳到linux中執行,本地windows也執行一份。就會發現三個任務分片有兩個在window下執行,

剩下的一個在linux中執行了。

注意:須要兩臺不一樣ip的機器去啓動job(做業服務器) 由於ElasticJob默認的分片機制是根據ip來分片的 若是ip相同 它會默認爲一臺服務器 。因此這裏我把代碼上傳到linux上執行一份。

相關文章
相關標籤/搜索