(繼續貼一篇以前寫的經驗案例)編程
其實這是一次失敗的項目,雖然最後仍是作出來了,可是付出了很大代價。而且須要較深刻的踩坑改造elastic-job,致使代碼的可讀性,可維護性也不好。json
事實證實 elastic-job lite 並不適合用於作 須要長時間運行(可能幾小時或幾天不中止)的做業調度。服務器
1、 elastic-job 簡介多線程
Elastic-Job是噹噹推出的一個開源分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。Elastic-Job-Lite定位爲輕量級無中心化解決方案。分佈式
詳見官網介紹,傳送門http://elasticjob.io/docs/elastic-job-lite/00-overview/intro/ide
本文從編程踩坑和大量測試中提煉,講解目的在於幫助有基礎的開發者理解elastic-job真實的運行邏輯,解答編程中官方文檔未說起的疑惑,避免重複踩坑。基礎的概念和高深的主從節點選舉失效轉移不是本文要講的內容,請移步官方文檔,本文更專一於怎麼用對。測試
2、 elastic-job 與 zookeeperspa
elastic-job 基於zk實現分佈式協調,重要的做業信息都被存儲在了zk上,如圖:線程
algosmElasticJobs 節點是自定義的命名空間的名稱。server
nameLib-295是做業名
servers 經過子節點記錄做業在哪幾臺機器上正在運行,一個機器對應一個子節點,運行結束後,servers子節點會被刪除。
instances 記錄了job實例與機器的關係
config 存儲做業的配置信息
sharding 展開 是做業的分片信息,及每一個分片運行在哪一個機器上,與運行狀態。
每一個做業對應命名空間algosmElasticJobs下的一個子節點(節點名稱爲做業名)
3、 elastic-job 做業、分片與線程
做業運行起來後,服務器、實例、線程、分片的關係圖:
(2臺服務器,每臺運行2個做業,每一個做業4個分片)
1)做業 job : 實現做業邏輯的class類,需重寫execute(ShardingContext shardingContext) 方法。
elastic-job建立一個做業時,會在當前服務器上拉起一個做業class的實例,並須要爲該實例指定惟一的做業名稱。(以下圖所示做業名稱各不相同)
同一個class能夠建立多個實例,從而生成多個做業(各個做業的名稱不一樣,做業自定義配置也可設置的不同)。
以下圖 nameLib 開頭的是由ClassA 生成, makelabel是由ClassB生成。
2)分片 sharding:
建立做業時,須要設置這個做業有多少個分片來執行,若是做業A在機器1上得到了兩個分片,那麼這兩個分片其實是兩個線程(注意是一臺機器上得到兩個分片,每臺機器上裝一個elastic-job 服務的狀況下),這兩個線程共用的是同一個class實例,也就是說這兩個分片 會共享 此class實例的成員變量。分片1修改了實例的成員變量,就會被分片2讀到,從而影響分片2的做業邏輯。
若是想要爲每一個分片設置獨享的變量,從而不受到其餘分片影響,那麼就須要用到線程變量。
方法是,在該class中定義線程變量,用法以下:
/**
* 與線程相關的變量, key 線程號
*/
private Map<Long,JobBaseParam> threadParam = new ConcurrentHashMap<>();
//初始化線程變量
private void initParam(ShardingContext shardingContext){
//elasticJob 單實例多線程,每次拉起須要清理線程上次殘留的狀態
JobBaseParam jobBaseParam = new JobBaseParam();
jobBaseParam.setShardingItem(shardingContext.getShardingItem());
jobBaseParam.setCompletedActive(false);
jobBaseParam.setOver(true);
jobBaseParam.setReceiveOver(false);
threadParam.put(Thread.currentThread().getId(),jobBaseParam);
jobName = shardingContext.getJobName();
}
//使用線程變量
threadParam.get(Thread.currentThread().getId()).setCompletedActive(true);
threadParam.get(masterThreadId).getReceiveOver()
線程變量使用時,需當心:在哪一個線程裏threadParam.put的變量,就須要在哪一個線程裏threadParam.get,例如在主線程裏put變量,而後在子線程裏get變量,就會get不到,產生邏輯錯誤。
關於做業Class中的靜態變量,該靜態變量將會被由改class new出來的全部做業分片讀到,做用域範圍最大。
jobClass不一樣變量做用域:
變量類型 |
靜態變量 |
成員變量 |
線程變量 |
做用域 |
全部做業實例、全部分片 |
當前做業實例的全部分片 |
當前分片 |
若是想要長久保留分片要用的變量,每次分片拉起時自動從上一次狀態繼續,能夠將與分片相關的變量存儲到zk上,做業對應的分片節點下面,相似:
algosm是自定義的前綴標識,以與elastic-job管理的節點區分。注意分片下加自定義節點,是不會影響elastic-job運行的,也不會被elastic-job 清除,是可行的方案。
4、 elastic-job 分片與失效轉移
想要做業A失效轉移生效,前提是每臺服務器上都要在運行着做業A。
分片序號從0開始,當服務器1同時得到兩個分片,分片1執行完畢,分片2未結束的狀況下,分片1不會被再次觸發,一直要等到分片2結束。
經典模式,正常運行時,會隨機分片,致使做業分片在不一樣機器上切換。
做業的兩個分片在同一臺服務器上時,分片1與分片2用的是同一實例,不一樣線程,如有狀態殘留在實例的成員變量中,須要當心,建議分片每次運行都要初始化一次狀態。
5、 elastic-job 做業重啓恢復
elastic-job 若是發生重啓,是不會自動將做業拉起的,雖然其做業配置存儲到了zk上,須要自行實現重啓,拉起做業功能。
實現要點,是要判斷做業是否爲異常結束,非正常銷燬的做業,會在servers節點下殘留子節點,若是servers的子節點不爲空,說明是異常中止,須要被拉起
config節點中存儲了做業的配置信息
實戰代碼以下:
(代碼中jobPool是自行實現的一個做業池用來管理做業的實例,jobPoolLock是自行實現的細粒度鎖)
@Override
public Boolean loadPreJob(){
try {
String rootPath = "/"+algosmJobConfig.getRegCenterNamespace();
if(!zkClient.exists(rootPath)){
logger.info("初次運行,未發現{}",rootPath);
return true;
}
List<String> zkJobList = zkClient.getChildren(rootPath);
if(!CollectionUtils.isEmpty(zkJobList)){
for(String jobName : zkJobList){
try{
String jobPath = rootPath+"/"+jobName;
//只要servers 不是空的,就說明做業非正常終止,須要將做業拉起來
if(!CollectionUtils.isEmpty(zkClient.getChildren(jobPath+"/servers"))){
//說明是非正常結束的job,須要拉起
String jobConfig = zkClient.readData(jobPath+"/config",true);
if(StringUtils.isEmpty(jobConfig)){
logger.warn("job=[{}] config爲空",jobName);
}else{
//取出參數
JsonNode jobNode = jsonTool.readTree(jobConfig);
String jobParam = jobNode.get("jobParameter").textValue();
//建立任務
String[] jobInfo = jobName.split("-");
if(jobInfo.length != 2 || AlgosmJobType.trans(jobInfo[0])==null){
logger.warn("jobName={} 命名非法",jobName);
}else{
jobPoolLock.lock(jobName);
try{
logger.info("初始化-加載job {}",jobName);
if(jobPool.containsKey(jobName)){
logger.warn("job={}已存在,跳過!",jobName);
}else{
JobEntityConfig jobEntityConfig = jobControl.createJob(jobName,AlgosmJobType.trans(jobInfo[0]),jobParam);
JobScheduler jobScheduler = new SpringJobScheduler(jobEntityConfig.getJobEntity(),
zookeeperRegistryCenter, jobEntityConfig.getJobConfiguration(), new AlgosmElasticJobListener());
jobScheduler.init();
jobPool.put(jobName,jobEntityConfig.getJobEntity());
logger.info("加載job={}成功! param={}",jobName,jobParam);
}
}finally {
jobPoolLock.unlock(jobName);
}
}
}
}
}catch (Exception e){
logger.error("加載做業失敗!jobName={}",jobName,e);
continue;
}
}
}
}catch (Exception e){
logger.error("初始化加載做業失敗!",e);
return false;
}
return true;
}
6、 elastic-job 分佈式做業控制與任務狀態統計
如何控制全部機器上做業的啓停,如何獲取當前做業的運行狀態,考慮到做業是運行在多臺機器上的,因此掛了一臺,做業並不算中止。做業運行也並不是全部機器都在跑就算運行,做業跑在不一樣機器上,每一個機器上又可能不止一個分片全部分片的任務統計數據疊加,纔算是做業準確的統計數據。
這塊是須要自行實現的,elastic-job是不支持的。筆者已實現該部分功能,限於篇幅與時間限制,等下篇再講述。(賣個關子,從下圖分片中的自定義節點命名可看出一二)。