http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/node
Elastic-Job是一個分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。算法
Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供分佈式任務的協調服務。spring
Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供最輕量級的分佈式任務的協調服務,外部依賴僅Zookeeper。數據庫
任務的分佈式執行,須要將一個任務拆分爲多個獨立的任務項,而後由分佈式的服務器分別執行某一個或幾個分片項。api
例如:有一個遍歷數據庫某張表的做業,現有2臺服務器。爲了快速的執行做業,那麼每臺服務器應執行做業的50%。 爲知足此需求,可將做業分紅2片,每臺服務器執行1片。做業遍歷數據的邏輯應爲:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 若是分紅10片,則做業遍歷數據的邏輯應爲:每片分到的分片項應爲ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID以0-4結尾的數據;服務器B遍歷ID以5-9結尾的數據。安全
Elastic-Job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的做業服務器,開發者須要自行處理分片項與真實數據的對應關係。springboot
個性化參數即shardingItemParameter,能夠和分片項匹配對應關係,用於將分片項的數字轉換爲更加可讀的業務代碼。服務器
例如:按照地區水平拆分數據庫,數據庫A是北京的數據;數據庫B是上海的數據;數據庫C是廣州的數據。 若是僅按照分片項配置,開發者須要瞭解0表示北京;1表示上海;2表示廣州。 合理使用個性化參數可讓代碼更可讀,若是配置爲0=北京,1=上海,2=廣州,那麼代碼中直接使用北京,上海,廣州的枚舉值便可完成分片項和業務邏輯的對應關係。數據結構
Elastic-Job-Lite並沒有做業調度中心節點,而是基於部署做業框架的程序在到達相應時間點時各自觸發調度。架構
註冊中心僅用於做業註冊和監控信息存儲。而主做業節點僅用於處理分片和清理等功能。
Elastic-Job-Lite提供最安全的方式執行做業。將分片總數設置爲1,並使用多於1臺的服務器執行做業,做業將會以1主n從的方式執行。
一旦執行做業的服務器崩潰,等待執行的服務器將會在下次做業啓動時替補執行。開啓失效轉移功能效果更好,能夠保證在本次做業執行時崩潰,備機當即啓動替補執行。
Elastic-Job-Lite也提供最靈活的方式,最大限度的提升執行做業的吞吐量。將分片項設置爲大於服務器的數量,最好是大於服務器倍數的數量,做業將會合理的利用分佈式資源,動態的分配分片項。
例如:3臺服務器,分紅10片,則分片項分配結果爲服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。 若是服務器C崩潰,則分片項分配結果爲服務器A=0,1,2,3,4;服務器B=5,6,7,8,9。在不丟失分片項的狀況下,最大限度的利用現有資源提升吞吐量。
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
策略說明:
基於平均分配算法的分片策略,也是默認的分片策略。
若是分片不能整除,則不能整除的多餘分片將依次追加到序號小的服務器。如:
若是有3臺服務器,分紅9片,則每臺服務器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
若是有3臺服務器,分紅8片,則每臺服務器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
若是有3臺服務器,分紅10片,則每臺服務器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy
策略說明:
根據做業名的哈希值奇偶數決定IP升降序算法的分片策略。
做業名的哈希值爲奇數則IP升序。
做業名的哈希值爲偶數則IP降序。
用於不一樣的做業平均分配負載至不一樣的服務器。
AverageAllocationJobShardingStrategy的缺點是,一旦分片數小於做業服務器數,做業將永遠分配至IP地址靠前的服務器,致使IP地址靠後的服務器空閒。而OdevitySortByNameJobShardingStrategy則能夠根據做業名稱從新分配服務器負載。如:
若是有3臺服務器,分紅2片,做業名稱的哈希值爲奇數,則每臺服務器分到的分片是:1=[0], 2=[1], 3=[]
若是有3臺服務器,分紅2片,做業名稱的哈希值爲偶數,則每臺服務器分到的分片是:3=[0], 2=[1], 1=[]
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy
策略說明:
根據做業名的哈希值對服務器列表進行輪轉的分片策略。
實現JobShardingStrategy接口並實現sharding方法,接口方法參數爲做業服務器IP列表和分片策略選項,分片策略選項包括做業名稱,分片總數以及分片序列號和個性化參數對照表,能夠根據需求定製化本身的分片策略。
歡迎將分片策略以插件的形式貢獻至com.dangdang.ddframe.job.lite.api.strategy包。
與配置一般的做業屬性相同,在spring命名空間或者JobConfiguration中配置jobShardingStrategyClass屬性,屬性值是做業分片策略類的全路徑。
第一臺服務器上線觸發主服務器選舉。主服務器一旦下線,則從新觸發選舉,選舉過程當中阻塞,只有主服務器選舉完成,纔會執行其餘任務。
某做業服務器上線時會自動將服務器信息註冊到註冊中心,下線時會自動更新服務器狀態。
主節點選舉,服務器上下線,分片總數變動均更新從新分片標記。
定時任務觸發時,如需從新分片,則經過主服務器分片,分片過程當中阻塞,分片結束後纔可執行任務。如分片過程當中主服務器下線,則先選舉主服務器,再分片。
經過上一項說明可知,爲了維持做業運行時的穩定性,運行過程當中只會標記分片狀態,不會從新分片。分片僅可能發生在下次任務觸發前。
每次分片都會按服務器IP排序,保證分片結果不會產生較大波動。
實現失效轉移功能,在某臺服務器執行完畢後主動抓取未分配的分片,而且在某臺服務器下線後主動尋找可用的服務器執行任務。
註冊中心在定義的命名空間下,建立做業名稱節點,用於區分不一樣做業,因此做業一旦建立則不能修改做業名稱,若是修更名稱將視爲新的做業。做業名稱節點下又包含4個數據子節點,分別是config, instances, sharding, servers和leader。
做業配置信息,以JSON格式存儲
做業運行實例信息,子節點是當前做業運行實例的主鍵。做業運行實例主鍵由做業運行服務器的IP地址和PID構成。做業運行實例主鍵均爲臨時節點,看成業實例上線時註冊,下線時自動清理。註冊中心監控這些節點的變化來協調分佈式做業的分片以及高可用。 可在做業運行實例節點寫入TRIGGER表示該實例當即執行一次。
做業分片信息,子節點是分片項序號,從零開始,至分片總數減一。分片項序號的子節點存儲詳細信息。每一個分片項下的子節點用於控制和記錄分片運行狀態。節點詳細信息說明:
子節點名 | 臨時節點 | 描述 |
---|---|---|
instance | 否 | 執行該分片項的做業運行實例主鍵 |
running | 是 | 分片項正在運行的狀態 僅配置monitorExecution時有效 |
failover | 是 | 若是該分片項被失效轉移分配給其餘做業服務器,則此節點值記錄執行此分片的做業服務器IP |
misfire | 否 | 是否開啓錯過任務從新執行 |
disabled | 否 | 是否禁用此分片項 |
做業服務器信息,子節點是做業服務器的IP地址。可在IP地址節點寫入DISABLED表示該服務器禁用。 在新的cloud native架構下,servers節點大幅弱化,僅包含控制服務器是否能夠禁用這一功能。爲了更加純粹的實現job核心,servers功能將來可能刪除,控制服務器是否禁用的能力應該下放至自動化部署系統。
做業服務器主節點信息,分爲election,sharding和failover三個子節點。分別用於主節點選舉,分片和失效轉移處理。
leader節點是內部使用的節點,若是對做業框架原理不感興趣,可不關注此節點。
子節點名 | 臨時節點 | 描述 |
---|---|---|
election\instance | 是 | 主節點服務器IP地址 一旦該節點被刪除將會觸發從新選舉 從新選舉的過程當中一切主節點相關的操做都將阻塞 |
election\latch | 否 | 主節點選舉的分佈式鎖 爲curator的分佈式鎖使用 |
sharding\necessary | 否 | 是否須要從新分片的標記 若是分片總數變化,或做業服務器節點上下線或啓用/禁用,以及主節點選舉,會觸發設置重分片標記 做業在下次執行時使用主節點從新分片,且中間不會被打斷 做業執行時不會觸發分片 |
sharding\processing | 是 | 主節點在分片時持有的節點 若是有此節點,全部的做業執行都將阻塞,直至分片結束 主節點分片結束或主節點崩潰會刪除此臨時節點 |
failover\items\分片項 | 否 | 一旦有做業崩潰,則會向此節點記錄 當有空閒做業服務器時,會今後節點抓取需失效轉移的做業項 |
failover\items\latch | 否 | 分配失效轉移分片項時佔用的分佈式鎖 爲curator的分佈式鎖使用 |
在springboot項目中運用 elasticjob 步驟:
1.maven 或者 gradle 中
jar包
2.註冊到zookeeper中
1)
配置文件中:
#elastic job transformer.job.enabled=false transformer.job.zk-nodes=192.168.103.128 transformer.job.namespace=transformer
2) lite 實現simpleJob
@Slf4j public abstract class AbstractConfigSimpleJob implements SimpleJob { protected volatile boolean inited; @Setter @Getter private boolean enable; @Setter @Getter private String cron; @Getter @Setter private Integer shards = 1; @Getter @Setter private String name; private ZookeeperRegistryCenter regCenter; @Autowired(required = false) public void setRegCenter(ZookeeperRegistryCenter regCenter) { this.regCenter = regCenter; } @PostConstruct public void init() { if (regCenter != null) { if (inited) { // SpringCloud Context 初始化兩次 log.info("當前Job已經初始化,不須要重複註冊"); } else { LiteJobConfiguration jobConfig = simpleJobConfigBuilder(this.getName(), this.getClass(), this.getShards(), this.getCron()).overwrite(true).build(); new SpringJobScheduler(this, regCenter, jobConfig).init(); this.inited = true; } } } private LiteJobConfiguration.Builder simpleJobConfigBuilder( final String jobName, final Class<? extends SimpleJob> jobClass, final int shardingTotalCount, final String cron) { return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration( JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build(), jobClass.getCanonicalName())); }
3)自定義的job (多個)
@Component @Slf4j @ConfigurationProperties(prefix = "transformer.job.handle-event") @ConditionalOnProperty(prefix = "transformer.job.handle-event", name = "enable", havingValue = "true") public class HandleEventJob extends AbstractEventJob { @Value("${transformer.job.search-counts}") private int searchCounts; @Autowired private SystemConfig systemConfig; @Autowired private JobService jobService; @Autowired private JobDependService jobDependService; @Autowired private SubEventManager subEventManager; @Override public void execute(ShardingContext shardingContext) { int shardingItem = shardingContext.getShardingItem(); int shardingTotalCount = shardingContext.getShardingTotalCount(); // 未處理事件標記 Byte flag = (byte) 0; // 處理事件 List<SubEventPO> transEvents = subEventManager.getNecessaryEventsByFlag(flag, searchCounts, systemConfig.getTryTimes()); log.info("推送事件begin,當前shardingItem = {},shardingTotalCount = {}", shardingItem, shardingTotalCount); List<SubEventPO> eventsLocalRound = transEvents.stream() .filter(transEvent -> (transEvent.getUserId() % shardingTotalCount) == shardingItem).collect(Collectors.toList()); if (systemConfig.isRelyOn()) { log.info("事件依賴開關已打開,事件要按照依賴順序發送!"); jobDependService.execute(eventsLocalRound); } else { log.info("事件依賴開關已關閉,事件發送無前後順序!"); jobService.execute(eventsLocalRound); } }
每個job配置:
#事件中心消息發送job transformer.job.handle-event.enable=false #是否執行 #cron 表達式 兩分鐘執行一次 transformer.job.handle-event.cron=0 0/2 * * * ? #shards 分片數量 3 transformer.job.handle-event.shards=3
1:分片策略使用默認的平均分配策略
2:進入job,搜索到全部的數據,每一臺服務器會有對應的片數
List<SubEventPO> eventsLocalRound = transEvents.stream() .filter(transEvent -> (transEvent.getUserId() % shardingTotalCount) == shardingItem).collect(Collectors.toList());
徹底搞定