分佈式任務調度框架

1. 框架概況:

LTS是一個輕量級分佈式任務調度框架。有三種角色, JobClient, JobTracker, TaskTracker。各個節點都是無狀態的,能夠部署多個,來實現負載均衡,實現更大的負載量, 而且框架具備很好的容錯能力。 採用多種註冊中心(Zookeeper,redis等)進行節點信息暴露,master選舉。(Mongo or Mysql)存儲任務隊列和任務執行日誌, netty作底層通訊。java

  • JobClient : 主要負責提交任務, 和 接收任務執行反饋結果。git

  • JobTracker : 負責接收並分配任務,任務調度。github

  • TaskTracker: 負責執行任務,執行完反饋給JobTracker。redis

支持任務類型:sql

  • 實時任務api

  • 也支持定時任務 (如:3天以後執行)多線程

  • CronExpression (如:0 0/1 * * * ?)架構

感興趣,請加QQ羣:109500214 一塊兒探討、完善。而且記得star一下哈,3Q負載均衡

github地址:https://github.com/qq254963746/light-task-scheduler框架

2.架構圖

Aaron Swartz

2.1節點組:

  • 1. 一個節點組等同於一個集羣,同一個節點組中的各個節點是對等的,外界不管鏈接節點組中的任務一個節點都是能夠的。

  • 2. 每一個節點組中都有一個master節點(master宕機,會自動選舉出新的master節點),框架會提供接口API來監聽master節點的變化,用戶能夠本身使用master節點作本身想作的事情。

  • 3. JobClient和TaskTracker均可以存在多個節點組。譬如 JobClient 能夠存在多個節點組。 譬如:JobClient 節點組爲 ‘lts_WEB’ 中的一個節點提交提交一個 只有節點組爲’lts_TRADE’的 TaskTracker 才能執行的任務。

  • 4. (每一個集羣中)JobTacker只有一個節點組。

  • 5. 多個JobClient節點組和多個TaskTracker節點組再加上一個JobTacker節點組, 組成一個大的集羣。

3. 工做流程:

  • 1. JobClient 提交一個 任務 給 JobTracker, 這裏我提供了兩種客戶端API, 一種是若是JobTracker 不存在或者提交失敗,直接返回提交失敗。另外一種客戶端是重試客戶端, 若是提交失敗,先存儲到本地FailStore(可使用NFS來達到同個節點組共享leveldb文件的目的,多線程訪問,已經作了文件鎖處理),返回給客戶端提交成功的信息,待JobTracker可用的時候,再將任務提交。

  • 2. JobTracker收到JobClient提交來的任務,將任務存入任務隊列。JobTracker等待TaskTracker的Pull請求,而後將任務Push給TaskTracker去執行。

  • 3. TaskTracker收到JobTracker分發來的任務以後,而後從線程池中拿到一個線程去執行。執行完畢以後,再反饋任務執行結果給JobTracker(成功or 失敗[失敗有失敗錯誤信息]),若是發現JobTacker不可用,那麼存儲本地FailStore,等待TaskTracker可用的時候再反饋。反饋結果的同時,詢問JobTacker有沒有新的任務要執行。

  • 4. JobTacker收到TaskTracker節點的任務結果信息。根據任務信息決定要不要反饋給客戶端。不須要反饋的直接刪除,須要反饋的,直接反饋,反饋失敗進入FeedbackQueue, 等待從新反饋。

  • 5. JobClient收到任務執行結果,進行本身想要的邏輯處理。

4. 特性

  • 負載均衡:

    • JobClient和TaskTracker但是根據本身設置的負載均衡策略來請求JobTracker節點組中的一個節點。當鏈接上後將一直保持鏈接這個節點,保持鏈接通道,直到這個節點不可用,減小每次都從新鏈接一個節點帶來的性能開銷。

  • 健壯性:

    • 當節點組中的一個節點當機以後,自動轉到其餘節點工做。當整個節點組當機以後,將會採用存儲文件的方式,待節點組可用的時候進行重發。

    • 當執行任務的TaskTracker節點當機以後,JobTracker會將這個TaskTracker上的未完成的任務(死任務),從新分配給節點組中其餘節點執行。

  • 伸縮性:

    • 由於各個節點都是無狀態的,能夠動態增長機器部署實例, 節點關注者會自動發現。

  • 擴展性:

    • 採用和dubbo同樣的SPI擴展方式,能夠實現任務隊列擴展,日誌記錄器擴展等

5. 日誌記錄

對於任務的分發,執行,還有用戶經過 (BizLogger) 【LtsLoggerFactory.getBizLogger()】 輸入的業務日誌,LTS都有記錄,用戶能夠在LTS Admin 後臺界面查看某個任務的全部日誌,能夠實時查看這個任務的執行狀況。

6. 開發計劃:

  • WEB後臺管理:性能統計分析,預警等

  • 實現LTS的分佈式隊列存儲

7. LTS Admin

Aaron Swartz

8. 調用示例

下面提供的是最簡單的配置方式。更多配置請查看 lts-example 模塊下的 API 調用方式例子.

8.1 JobTracker 端

    final JobTracker jobTracker = new JobTracker();    
    // 節點信息配置
    jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");    
    // 1. 任務隊列用mongo
    jobTracker.addConfig("job.queue", "mongo");    
    // mongo 配置
    jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); 
    jobTracker.addConfig("mongo.database", "lts");
    jobTracker.setOldDataHandler(new OldDataDeletePolicy());    
    // 啓動節點
    jobTracker.start();

8.2 TaskTracker端

    TaskTracker taskTracker = new TaskTracker();
    taskTracker.setJobRunnerClass(TestJobRunner.class);
    taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
    taskTracker.setNodeGroup("test_trade_TaskTracker");
    taskTracker.setWorkThreads(20);
    taskTracker.start();    // 任務執行類
    public class TestJobRunner implements JobRunner {        
        @Override
        public void run(Job job) throws Throwable {            
           System.out.println("我要執行"+ job);            
           System.out.println(job.getParam("shopId"));            
           // TODO 用戶本身的業務邏輯, 應該保證冪等
            try {                
                  Thread.sleep(5*1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

8.3 JobClient端

    JobClient jobClient = new RetryJobClient();    
    // final JobClient jobClient = new JobClient();
    jobClient.setNodeGroup("test_jobClient");
    jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
    jobClient.start();    // 提交任務
    Job job = new Job();
    job.setTaskId("3213213123");
    job.setParam("shopId", "11111");
    job.setTaskTrackerNodeGroup("test_trade_TaskTracker");    
    // job.setCronExpression("0 0/1 * * * ?");  
    // 支持 cronExpression表達式
    // job.setTriggerTime(new Date()); // 支持指定時間執行
    Response response = jobClient.submitJob(job);
相關文章
相關標籤/搜索