elastic-job

http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/node

Elastic-Job是一個分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。算法

Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供分佈式任務的協調服務。spring

  • 分佈式調度協調
  • 彈性擴容縮容
  • 失效轉移
  • 錯過執行做業重觸發
  • 做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例
  • 自診斷並修復分佈式不穩定形成的問題
  • 支持並行調度
  • 支持做業生命週期操做
  • 豐富的做業類型
  • Spring整合以及命名空間提供
  • 運維平臺

簡介

Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供最輕量級的分佈式任務的協調服務,外部依賴僅Zookeeper。數據庫

基本概念

1. 分片概念

任務的分佈式執行,須要將一個任務拆分爲多個獨立的任務項,而後由分佈式的服務器分別執行某一個或幾個分片項。api

例如:有一個遍歷數據庫某張表的做業,現有2臺服務器。爲了快速的執行做業,那麼每臺服務器應執行做業的50%。 爲知足此需求,可將做業分紅2片,每臺服務器執行1片。做業遍歷數據的邏輯應爲:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 若是分紅10片,則做業遍歷數據的邏輯應爲:每片分到的分片項應爲ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID以0-4結尾的數據;服務器B遍歷ID以5-9結尾的數據。安全

2. 分片項與業務處理解耦

Elastic-Job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的做業服務器,開發者須要自行處理分片項與真實數據的對應關係。springboot

3. 個性化參數的適用場景

個性化參數即shardingItemParameter,能夠和分片項匹配對應關係,用於將分片項的數字轉換爲更加可讀的業務代碼。服務器

例如:按照地區水平拆分數據庫,數據庫A是北京的數據;數據庫B是上海的數據;數據庫C是廣州的數據。 若是僅按照分片項配置,開發者須要瞭解0表示北京;1表示上海;2表示廣州。 合理使用個性化參數可讓代碼更可讀,若是配置爲0=北京,1=上海,2=廣州,那麼代碼中直接使用北京,上海,廣州的枚舉值便可完成分片項和業務邏輯的對應關係。數據結構

核心理念

1. 分佈式調度

Elastic-Job-Lite並沒有做業調度中心節點,而是基於部署做業框架的程序在到達相應時間點時各自觸發調度。架構

註冊中心僅用於做業註冊和監控信息存儲。而主做業節點僅用於處理分片和清理等功能。

2. 做業高可用

Elastic-Job-Lite提供最安全的方式執行做業。將分片總數設置爲1,並使用多於1臺的服務器執行做業,做業將會以1主n從的方式執行。

一旦執行做業的服務器崩潰,等待執行的服務器將會在下次做業啓動時替補執行。開啓失效轉移功能效果更好,能夠保證在本次做業執行時崩潰,備機當即啓動替補執行。

3. 最大限度利用資源

Elastic-Job-Lite也提供最靈活的方式,最大限度的提升執行做業的吞吐量。將分片項設置爲大於服務器的數量,最好是大於服務器倍數的數量,做業將會合理的利用分佈式資源,動態的分配分片項。

例如:3臺服務器,分紅10片,則分片項分配結果爲服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。 若是服務器C崩潰,則分片項分配結果爲服務器A=0,1,2,3,4;服務器B=5,6,7,8,9。在不丟失分片項的狀況下,最大限度的利用現有資源提升吞吐量。

總體架構圖

Elastic-Job-Lite Architecture

做業分片策略

框架提供的分片策略

AverageAllocationJobShardingStrategy

全路徑:

com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy

策略說明:

基於平均分配算法的分片策略,也是默認的分片策略。

若是分片不能整除,則不能整除的多餘分片將依次追加到序號小的服務器。如:

若是有3臺服務器,分紅9片,則每臺服務器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]

若是有3臺服務器,分紅8片,則每臺服務器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]

若是有3臺服務器,分紅10片,則每臺服務器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

OdevitySortByNameJobShardingStrategy

全路徑:

com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy

策略說明:

根據做業名的哈希值奇偶數決定IP升降序算法的分片策略。

做業名的哈希值爲奇數則IP升序。

做業名的哈希值爲偶數則IP降序。

用於不一樣的做業平均分配負載至不一樣的服務器。

AverageAllocationJobShardingStrategy的缺點是,一旦分片數小於做業服務器數,做業將永遠分配至IP地址靠前的服務器,致使IP地址靠後的服務器空閒。而OdevitySortByNameJobShardingStrategy則能夠根據做業名稱從新分配服務器負載。如:

若是有3臺服務器,分紅2片,做業名稱的哈希值爲奇數,則每臺服務器分到的分片是:1=[0], 2=[1], 3=[]

若是有3臺服務器,分紅2片,做業名稱的哈希值爲偶數,則每臺服務器分到的分片是:3=[0], 2=[1], 1=[]

RotateServerByNameJobShardingStrategy

全路徑:

com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy

策略說明:

根據做業名的哈希值對服務器列表進行輪轉的分片策略。

自定義分片策略

實現JobShardingStrategy接口並實現sharding方法,接口方法參數爲做業服務器IP列表和分片策略選項,分片策略選項包括做業名稱,分片總數以及分片序列號和個性化參數對照表,能夠根據需求定製化本身的分片策略。

歡迎將分片策略以插件的形式貢獻至com.dangdang.ddframe.job.lite.api.strategy包。

配置分片策略

與配置一般的做業屬性相同,在spring命名空間或者JobConfiguration中配置jobShardingStrategyClass屬性,屬性值是做業分片策略類的全路徑。

 

實現原理

彈性分佈式實現

  • 第一臺服務器上線觸發主服務器選舉。主服務器一旦下線,則從新觸發選舉,選舉過程當中阻塞,只有主服務器選舉完成,纔會執行其餘任務。

  • 某做業服務器上線時會自動將服務器信息註冊到註冊中心,下線時會自動更新服務器狀態。

  • 主節點選舉,服務器上下線,分片總數變動均更新從新分片標記。

  • 定時任務觸發時,如需從新分片,則經過主服務器分片,分片過程當中阻塞,分片結束後纔可執行任務。如分片過程當中主服務器下線,則先選舉主服務器,再分片。

  • 經過上一項說明可知,爲了維持做業運行時的穩定性,運行過程當中只會標記分片狀態,不會從新分片。分片僅可能發生在下次任務觸發前。

  • 每次分片都會按服務器IP排序,保證分片結果不會產生較大波動。

  • 實現失效轉移功能,在某臺服務器執行完畢後主動抓取未分配的分片,而且在某臺服務器下線後主動尋找可用的服務器執行任務。

註冊中心數據結構

註冊中心在定義的命名空間下,建立做業名稱節點,用於區分不一樣做業,因此做業一旦建立則不能修改做業名稱,若是修更名稱將視爲新的做業。做業名稱節點下又包含4個數據子節點,分別是config, instances, sharding, servers和leader。

config節點

做業配置信息,以JSON格式存儲

instances節點

做業運行實例信息,子節點是當前做業運行實例的主鍵。做業運行實例主鍵由做業運行服務器的IP地址和PID構成。做業運行實例主鍵均爲臨時節點,看成業實例上線時註冊,下線時自動清理。註冊中心監控這些節點的變化來協調分佈式做業的分片以及高可用。 可在做業運行實例節點寫入TRIGGER表示該實例當即執行一次。

sharding節點

做業分片信息,子節點是分片項序號,從零開始,至分片總數減一。分片項序號的子節點存儲詳細信息。每一個分片項下的子節點用於控制和記錄分片運行狀態。節點詳細信息說明:

子節點名 臨時節點 描述
instance 執行該分片項的做業運行實例主鍵
running 分片項正在運行的狀態
僅配置monitorExecution時有效
failover 若是該分片項被失效轉移分配給其餘做業服務器,則此節點值記錄執行此分片的做業服務器IP
misfire 是否開啓錯過任務從新執行
disabled 是否禁用此分片項

servers節點

做業服務器信息,子節點是做業服務器的IP地址。可在IP地址節點寫入DISABLED表示該服務器禁用。 在新的cloud native架構下,servers節點大幅弱化,僅包含控制服務器是否能夠禁用這一功能。爲了更加純粹的實現job核心,servers功能將來可能刪除,控制服務器是否禁用的能力應該下放至自動化部署系統。

leader節點

做業服務器主節點信息,分爲election,sharding和failover三個子節點。分別用於主節點選舉,分片和失效轉移處理。

leader節點是內部使用的節點,若是對做業框架原理不感興趣,可不關注此節點。

子節點名 臨時節點 描述
election\instance 主節點服務器IP地址
一旦該節點被刪除將會觸發從新選舉
從新選舉的過程當中一切主節點相關的操做都將阻塞
election\latch 主節點選舉的分佈式鎖
爲curator的分佈式鎖使用
sharding\necessary 是否須要從新分片的標記
若是分片總數變化,或做業服務器節點上下線或啓用/禁用,以及主節點選舉,會觸發設置重分片標記
做業在下次執行時使用主節點從新分片,且中間不會被打斷
做業執行時不會觸發分片
sharding\processing 主節點在分片時持有的節點
若是有此節點,全部的做業執行都將阻塞,直至分片結束
主節點分片結束或主節點崩潰會刪除此臨時節點
failover\items\分片項 一旦有做業崩潰,則會向此節點記錄
當有空閒做業服務器時,會今後節點抓取需失效轉移的做業項
failover\items\latch 分配失效轉移分片項時佔用的分佈式鎖
爲curator的分佈式鎖使用

流程圖

做業啓動

做業啓動

做業執行

做業執行

在springboot項目中運用 elasticjob 步驟:

1.maven 或者 gradle 中

jar包

2.註冊到zookeeper中

1)

配置文件中:

#elastic job
transformer.job.enabled=false
transformer.job.zk-nodes=192.168.103.128
transformer.job.namespace=transformer

2) lite 實現simpleJob

@Slf4j
public abstract class AbstractConfigSimpleJob implements SimpleJob {

    protected volatile boolean inited;
    @Setter
    @Getter
    private boolean enable;
    @Setter
    @Getter
    private String cron;
    @Getter
    @Setter
    private Integer shards = 1;
    @Getter
    @Setter
    private String name;
    private ZookeeperRegistryCenter regCenter;

    @Autowired(required = false)
    public void setRegCenter(ZookeeperRegistryCenter regCenter) {
        this.regCenter = regCenter;
    }

    @PostConstruct
    public void init() {
        if (regCenter != null) {

            if (inited) {
//                SpringCloud Context 初始化兩次
                log.info("當前Job已經初始化,不須要重複註冊");
            } else {
                LiteJobConfiguration jobConfig = simpleJobConfigBuilder(this.getName(),
                        this.getClass(),
                        this.getShards(),
                        this.getCron()).overwrite(true).build();
                new SpringJobScheduler(this, regCenter, jobConfig).init();
                this.inited = true;
            }
        }
    }

    private LiteJobConfiguration.Builder simpleJobConfigBuilder(
            final String jobName,
            final Class<? extends SimpleJob> jobClass,
            final int shardingTotalCount,
            final String cron) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build(), jobClass.getCanonicalName()));
    }

 

3)自定義的job (多個)

@Component
@Slf4j
@ConfigurationProperties(prefix = "transformer.job.handle-event")
@ConditionalOnProperty(prefix = "transformer.job.handle-event", name = "enable", havingValue = "true")
public class HandleEventJob extends AbstractEventJob {

    @Value("${transformer.job.search-counts}")
    private int searchCounts;
    @Autowired
    private SystemConfig systemConfig;
    @Autowired
    private JobService jobService;
    @Autowired
    private JobDependService jobDependService;
    @Autowired
    private SubEventManager subEventManager;


    @Override
    public void execute(ShardingContext shardingContext) {
        int shardingItem = shardingContext.getShardingItem();
        int shardingTotalCount = shardingContext.getShardingTotalCount();


        // 未處理事件標記
        Byte flag = (byte) 0;
        // 處理事件
        List<SubEventPO> transEvents = subEventManager.getNecessaryEventsByFlag(flag, searchCounts, systemConfig.getTryTimes());
        log.info("推送事件begin,當前shardingItem = {},shardingTotalCount = {}", shardingItem, shardingTotalCount);

        List<SubEventPO> eventsLocalRound = transEvents.stream()
                .filter(transEvent -> (transEvent.getUserId() % shardingTotalCount) == shardingItem).collect(Collectors.toList());

        if (systemConfig.isRelyOn()) {
            log.info("事件依賴開關已打開,事件要按照依賴順序發送!");
            jobDependService.execute(eventsLocalRound);
        } else {
            log.info("事件依賴開關已關閉,事件發送無前後順序!");
            jobService.execute(eventsLocalRound);
        }

    }

 

每個job配置:

#事件中心消息發送job
transformer.job.handle-event.enable=false #是否執行
#cron 表達式 兩分鐘執行一次
transformer.job.handle-event.cron=0 0/2 * * * ? 
#shards 分片數量 3
transformer.job.handle-event.shards=3

1:分片策略使用默認的平均分配策略

2:進入job,搜索到全部的數據,每一臺服務器會有對應的片數

List<SubEventPO> eventsLocalRound = transEvents.stream() .filter(transEvent -> (transEvent.getUserId() % shardingTotalCount) == shardingItem).collect(Collectors.toList());

徹底搞定

相關文章
相關標籤/搜索