elastic-job lite 編程實戰經驗

(繼續貼一篇以前寫的經驗案例)編程

                                                                       elastic-job lite 編程實戰經驗

其實這是一次失敗的項目,雖然最後仍是作出來了,可是付出了很大代價。而且須要較深刻的踩坑改造elastic-job,致使代碼的可讀性,可維護性也不好。json

事實證實 elastic-job lite 並不適合用於作 須要長時間運行(可能幾小時或幾天不中止)的做業調度。服務器

 

1、           elastic-job 簡介多線程

Elastic-Job是噹噹推出的一個開源分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。Elastic-Job-Lite定位爲輕量級無中心化解決方案。分佈式

詳見官網介紹,傳送門http://elasticjob.io/docs/elastic-job-lite/00-overview/intro/ide

 

本文從編程踩坑和大量測試中提煉,講解目的在於幫助有基礎的開發者理解elastic-job真實的運行邏輯,解答編程中官方文檔未說起的疑惑,避免重複踩坑。基礎的概念和高深的主從節點選舉失效轉移不是本文要講的內容,請移步官方文檔,本文更專一於怎麼用對。測試

2、           elastic-job zookeeperspa

elastic-job 基於zk實現分佈式協調,重要的做業信息都被存儲在了zk上,如圖:線程

algosmElasticJobs 節點是自定義的命名空間的名稱。server

nameLib-295是做業名

servers 經過子節點記錄做業在哪幾臺機器上正在運行,一個機器對應一個子節點,運行結束後,servers子節點會被刪除。

instances 記錄了job實例與機器的關係

config 存儲做業的配置信息

sharding 展開 是做業的分片信息,及每一個分片運行在哪一個機器上,與運行狀態。

 

每一個做業對應命名空間algosmElasticJobs下的一個子節點(節點名稱爲做業名)

 

 

 

 

3、           elastic-job 做業、分片與線程

做業運行起來後,服務器、實例、線程、分片的關係圖:

(2臺服務器,每臺運行2個做業,每一個做業4個分片)

 

 

1)做業 job 實現做業邏輯的class類,需重寫execute(ShardingContext shardingContext) 方法。

elastic-job建立一個做業時,會在當前服務器上拉起一個做業class的實例,並須要爲該實例指定惟一的做業名稱。(以下圖所示做業名稱各不相同)

同一個class能夠建立多個實例,從而生成多個做業(各個做業的名稱不一樣,做業自定義配置也可設置的不同)。

以下圖 nameLib 開頭的是由ClassA 生成, makelabel是由ClassB生成。

 

 

 

2)分片 sharding: 

建立做業時,須要設置這個做業有多少個分片來執行,若是做業A在機器1上得到了兩個分片,那麼這兩個分片其實是兩個線程(注意是一臺機器上得到兩個分片,每臺機器上裝一個elastic-job 服務的狀況下),這兩個線程共用的是同一個class實例,也就是說這兩個分片 會共享 此class實例的成員變量。分片1修改了實例的成員變量,就會被分片2讀到,從而影響分片2的做業邏輯。

 

若是想要爲每一個分片設置獨享的變量,從而不受到其餘分片影響,那麼就須要用到線程變量。

         方法是,在該class中定義線程變量,用法以下:

 

/**
 *
與線程相關的變量, key 線程號
 
*/
private Map<Long,JobBaseParam> threadParam = new ConcurrentHashMap<>();

 

//初始化線程變量
private void initParam(ShardingContext shardingContext){
    //elasticJob 單實例多線程,每次拉起須要清理線程上次殘留的狀態
   
JobBaseParam jobBaseParam = new JobBaseParam();
    jobBaseParam.setShardingItem(shardingContext.getShardingItem());
    jobBaseParam.setCompletedActive(false);
    jobBaseParam.setOver(true);
    jobBaseParam.setReceiveOver(false);
    threadParam.put(Thread.currentThread().getId(),jobBaseParam);
    jobName = shardingContext.getJobName();

}

 

//使用線程變量

threadParam.get(Thread.currentThread().getId()).setCompletedActive(true);
threadParam.get(masterThreadId).getReceiveOver()

 

線程變量使用時,需當心:在哪一個線程裏threadParam.put的變量,就須要在哪一個線程裏threadParam.get,例如在主線程裏put變量,而後在子線程裏get變量,就會get不到,產生邏輯錯誤。

 

關於做業Class中的靜態變量,該靜態變量將會被由改class new出來的全部做業分片讀到,做用域範圍最大。

 

jobClass不一樣變量做用域:    

變量類型

靜態變量

成員變量

線程變量

做用域

全部做業實例、全部分片

當前做業實例的全部分片

當前分片

                                           

若是想要長久保留分片要用的變量,每次分片拉起時自動從上一次狀態繼續,能夠將與分片相關的變量存儲到zk上,做業對應的分片節點下面,相似:

 

 

algosm是自定義的前綴標識,以與elastic-job管理的節點區分。注意分片下加自定義節點,是不會影響elastic-job運行的,也不會被elastic-job 清除,是可行的方案。

 

4、           elastic-job 分片與失效轉移

 

想要做業A失效轉移生效,前提是每臺服務器上都要在運行着做業A。

分片序號從0開始,當服務器1同時得到兩個分片,分片1執行完畢,分片2未結束的狀況下,分片1不會被再次觸發,一直要等到分片2結束。

經典模式,正常運行時,會隨機分片,致使做業分片在不一樣機器上切換。

做業的兩個分片在同一臺服務器上時,分片1與分片2用的是同一實例,不一樣線程,如有狀態殘留在實例的成員變量中,須要當心,建議分片每次運行都要初始化一次狀態。

 

 

 

 

5、           elastic-job 做業重啓恢復

 

elastic-job 若是發生重啓,是不會自動將做業拉起的,雖然其做業配置存儲到了zk上,須要自行實現重啓,拉起做業功能。

 

 

實現要點,是要判斷做業是否爲異常結束,非正常銷燬的做業,會在servers節點下殘留子節點,若是servers的子節點不爲空,說明是異常中止,須要被拉起

config節點中存儲了做業的配置信息

實戰代碼以下:

(代碼中jobPool是自行實現的一個做業池用來管理做業的實例,jobPoolLock是自行實現的細粒度鎖)

@Override
public  Boolean loadPreJob(){
    try {
        String rootPath = "/"+algosmJobConfig.getRegCenterNamespace();
        if(!zkClient.exists(rootPath)){
            logger.info("初次運行,未發現{}",rootPath);
            return true;
        }
        List<String> zkJobList = zkClient.getChildren(rootPath);
        if(!CollectionUtils.isEmpty(zkJobList)){
            for(String jobName : zkJobList){
                try{
                    String jobPath = rootPath+"/"+jobName;
                    //只要servers 不是空的,就說明做業非正常終止,須要將做業拉起來
                   
if(!CollectionUtils.isEmpty(zkClient.getChildren(jobPath+"/servers"))){
                        //說明是非正常結束的job,須要拉起
                       
String jobConfig = zkClient.readData(jobPath+"/config",true);
                        if(StringUtils.isEmpty(jobConfig)){
                            logger.warn("job=[{}] config爲空",jobName);
                        }else{
                            //取出參數
                           
JsonNode jobNode = jsonTool.readTree(jobConfig);
                            String jobParam = jobNode.get("jobParameter").textValue();
                            //建立任務
                           
String[] jobInfo = jobName.split("-");
                            if(jobInfo.length != 2 || AlgosmJobType.trans(jobInfo[0])==null){
                                logger.warn("jobName={} 命名非法",jobName);
                            }else{
                                jobPoolLock.lock(jobName);
                                try{
                                    logger.info("初始化-加載job {}",jobName);
                                    if(jobPool.containsKey(jobName)){
                                        logger.warn("job={}已存在,跳過!",jobName);
                                    }else{
                                        JobEntityConfig jobEntityConfig = jobControl.createJob(jobName,AlgosmJobType.trans(jobInfo[0]),jobParam);
                                        JobScheduler jobScheduler = new SpringJobScheduler(jobEntityConfig.getJobEntity(),
                                                zookeeperRegistryCenter, jobEntityConfig.getJobConfiguration(), new AlgosmElasticJobListener());
                                        jobScheduler.init();
                                        jobPool.put(jobName,jobEntityConfig.getJobEntity());
                                        logger.info("加載job={}成功! param={}",jobName,jobParam);
                                    }

                                }finally {
                                    jobPoolLock.unlock(jobName);
                                }
                            }
                        }

                    }
                }catch (Exception e){
                    logger.error("加載做業失敗!jobName={}",jobName,e);
                    continue;
                }

            }
        }
    }catch (Exception e){
        logger.error("初始化加載做業失敗!",e);
        return false;
    }

    return true;
}

 

 

6、           elastic-job 分佈式做業控制與任務狀態統計

 

如何控制全部機器上做業的啓停,如何獲取當前做業的運行狀態,考慮到做業是運行在多臺機器上的,因此掛了一臺,做業並不算中止。做業運行也並不是全部機器都在跑就算運行,做業跑在不一樣機器上,每一個機器上又可能不止一個分片全部分片的任務統計數據疊加,纔算是做業準確的統計數據。

這塊是須要自行實現的,elastic-job是不支持的。筆者已實現該部分功能,限於篇幅與時間限制,等下篇再講述。(賣個關子,從下圖分片中的自定義節點命名可看出一二)。

 

相關文章
相關標籤/搜索