LTS 輕量級分佈式任務調度框架(Light Task Scheduler)用戶文檔.md

LTS用戶文檔

LTS(light-task-scheduler)主要用於解決分佈式任務調度問題,支持實時任務,定時任務和Cron任務。有較好的伸縮性,擴展性,健壯穩定性而被多家公司使用,同時也但願開源愛好者一塊兒貢獻。java

項目地址

github地址: https://github.com/qq254963746/light-task-schedulernode

oschina地址: http://git.oschina.net/hugui/light-task-schedulermysql

這兩個地址都會同步更新。感興趣,請加QQ羣:109500214 一塊兒探討、完善。越多人支持,就越有動力去更新,喜歡記得右上角star哈。linux

框架概況

LTS 有主要有如下四種節點:git

  • JobClient:主要負責提交任務, 並接收任務執行反饋結果。
  • JobTracker:負責接收並分配任務,任務調度。
  • TaskTracker:負責執行任務,執行完反饋給JobTracker。
  • LTS-Admin:(管理後臺)主要負責節點管理,任務隊列管理,監控管理等。

其中JobClinet,JobTracker,TaskTracker節點都是無狀態的。 能夠部署多個並動態的進行刪減,來實現負載均衡,實現更大的負載量, 而且框架採用FailStore策略使LTS具備很好的容錯能力。github

LTS註冊中心提供多種實現(Zookeeper,redis等),註冊中心進行節點信息暴露,master選舉。(Mongo or Mysql)存儲任務隊列和任務執行日誌, netty or mina作底層通訊, 並提供多種序列化方式fastjson, hessian2, java等。redis

LTS支持任務類型:spring

  • 實時任務:提交了以後當即就要執行的任務。
  • 定時任務:在指定時間點執行的任務,譬如 今天3點執行(單次)。
  • Cron任務:CronExpression,和quartz相似(可是不是使用quartz實現的)譬如 0 0/1 * * * ?

架構圖

LTS architecture

概念說明

###節點組sql

  1. 英文名稱 NodeGroup,一個節點組等同於一個小的集羣,同一個節點組中的各個節點是對等的,等效的,對外提供相同的服務。
  2. 每一個節點組中都有一個master節點,這個master節點是由LTS動態選出來的,當一個master節點掛掉以後,LTS會立馬選出另一個master節點,框架提供API監聽接口給用戶。

###FailStoreshell

  1. 顧名思義,這個主要是用於失敗了存儲的,主要用於節點容錯,當遠程數據交互失敗以後,存儲在本地,等待遠程通訊恢復的時候,再將數據提交。
  2. FailStore主要用戶JobClient的任務提交,TaskTracker的任務反饋,TaskTracker的業務日誌傳輸的場景下。
  3. FailStore目前提供幾種實現:leveldb,rocksdb,berkeleydb,mapdb,ltsdb,用於能夠自由選擇使用哪一種,用戶也能夠採用SPI擴展使用本身的實現。

流程圖

下圖是一個標準的實時任務執行流程。

LTS progress

LTS-Admin新版界面預覽

LTS Admin 目先後臺帶有由ztajy提供的一個簡易的認證功能. 用戶名密碼在auth.cfg中,用戶自行修改.

##特性 ###一、Spring支持 LTS能夠徹底不用Spring框架,可是考慮到很用用戶項目中都是用了Spring框架,因此LTS也提供了對Spring的支持,包括Xml和註解,引入lts-spring.jar便可。 ###二、業務日誌記錄器 在TaskTracker端提供了業務日誌記錄器,供應用程序使用,經過這個業務日誌器,能夠將業務日誌提交到JobTracker,這些業務日誌能夠經過任務ID串聯起來,能夠在LTS-Admin中實時查看任務的執行進度。 ###三、SPI擴展支持 SPI擴展能夠達到零侵入,只須要實現相應的接口,並實現便可被LTS使用,目前開放出來的擴展接口有

  1. 對任務隊列的擴展,用戶能夠不選擇使用mysql或者mongo做爲隊列存儲,也能夠本身實現。
  2. 對業務日誌記錄器的擴展,目前主要支持console,mysql,mongo,用戶也能夠經過擴展選擇往其餘地方輸送日誌。

###四、故障轉移 當正在執行任務的TaskTracker宕機以後,JobTracker會立馬將分配在宕機的TaskTracker的全部任務再分配給其餘正常的TaskTracker節點執行。 ###五、節點監控 能夠對JobTracker,TaskTracker節點進行資源監控,任務監控等,能夠實時的在LTS-Admin管理後臺查看,進而進行合理的資源調配。 ###六、多樣化任務執行結果支持 LTS框架提供四種執行結果支持,EXECUTE_SUCCESSEXECUTE_FAILEDEXECUTE_LATEREXECUTE_EXCEPTION,並對每種結果採起相應的處理機制,譬如重試。

  • EXECUTE_SUCCESS: 執行成功,這種狀況,直接反饋客戶端(若是任務被設置了要反饋給客戶端)。
  • EXECUTE_FAILED:執行失敗,這種狀況,直接反饋給客戶端,不進行重試。
  • EXECUTE_LATER:稍後執行(須要重試),這種狀況,不反饋客戶端,重試策略採用1min,2min,3min的策略,默認最大重試次數爲10次,用戶能夠經過參數設置修改這個重試次數。
  • EXECUTE_EXCEPTION:執行異常, 這中狀況也會重試(重試策略,同上)

###七、FailStore容錯 採用FailStore機制來進行節點容錯,Fail And Store,不會由於遠程通訊的不穩定性而影響當前應用的運行。具體FailStore說明,請參考概念說明中的FailStore說明。

##項目編譯打包 項目主要採用maven進行構建,目前提供shell腳本的打包。 環境依賴:Java(jdk1.7) Maven

用戶使用通常分爲兩種: ###一、Maven構建 能夠經過maven命令將lts的jar包上傳到本地倉庫中。在父pom.xml中添加相應的repository,並用deploy命令上傳便可。具體引用方式能夠參考lts中的例子便可。 ###二、直接Jar引用 須要將lts的各個模塊打包成單獨的jar包,而且將全部lts依賴包引入。具體引用哪些jar包能夠參考lts中的例子便可。

##JobTracker和LTS-Admin部署 提供(cmd)windows(shell)linux兩種版本腳原本進行編譯和部署:

  1. 運行根目錄下的sh build.shbuild.cmd腳本,會在dist目錄下生成lts-{version}-bin文件夾

  2. 下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啓動腳本。jobtracker 中是 JobTracker的配置文件和須要使用到的jar包,lts-admin是LTS-Admin相關的war包和配置文件。 lts-{version}-bin的文件結構

├── bin
│   ├── jobtracker.cmd
│   ├── jobtracker.sh
│   ├── lts-admin.cmd
│   └── lts-admin.sh
├── jobtracker
│   ├── conf
│   │   └── zoo
│   │       ├── jobtracker.cfg
│   │       └── log4j.properties
│   └── lib
│       └── *.jar
├── lts-admin
│   ├── conf
│   │   ├── log4j.properties
│   │   └── lts-admin.cfg
│   ├── lib
│   │   └── *.jar
│   └── lts-admin.war
└── tasktracker
    ├── bin
    │   └── tasktracker.sh
    ├── conf
    │   ├── log4j.properties
    │   └── tasktracker.cfg
    └── lib
        └── *.jar
  1. JobTracker啓動。若是你想啓動一個節點,直接修改下conf/zoo下的配置文件,而後運行 sh jobtracker.sh zoo start便可,若是你想啓動兩個JobTracker節點,那麼你須要拷貝一份zoo,譬如命名爲zoo2,修改下zoo2下的配置文件,而後運行sh jobtracker.sh zoo2 start便可。logs文件夾下生成jobtracker-zoo.out日誌。
  2. LTS-Admin啓動.修改lts-admin/conf下的配置,而後運行bin下的sh lts-admin.shlts-admin.cmd腳本便可。logs文件夾下會生成lts-admin.out日誌,啓動成功在日誌中會打印出訪問地址,用戶能夠經過這個訪問地址訪問了。

##JobClient(部署)使用 須要引入lts的jar包有lts-jobclient-{version}.jarlts-core-{version}.jar 及其它第三方依賴jar。 ###API方式啓動

JobClient jobClient = new RetryJobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
jobClient.start();
 
// 提交任務
Job job = new Job();
job.setTaskId("3213213123");
job.setParam("shopId", "11111");
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
// job.setCronExpression("0 0/1 * * * ?");  // 支持 cronExpression表達式
// job.setTriggerTime(new Date()); // 支持指定時間執行
Response response = jobClient.submitJob(job);

###Spring XML方式啓動

<bean id="jobClient" class="com.lts.spring.JobClientFactoryBean">
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_jobClient"/>
    <property name="masterChangeListeners">
        <list>
            <bean class="com.lts.example.support.MasterChangeListenerImpl"/>
        </list>
    </property>
    <property name="jobFinishedHandler">
        <bean class="com.lts.example.support.JobFinishedHandlerImpl"/>
    </property>
    <property name="configs">
        <props>
            <!-- 參數 -->
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

###Spring 全註解方式

@Configuration
public class LTSSpringConfig {
 
    @Bean(name = "jobClient")
    public JobClient getJobClient() throws Exception {
        JobClientFactoryBean factoryBean = new JobClientFactoryBean();
        factoryBean.setClusterName("test_cluster");
        factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
        factoryBean.setNodeGroup("test_jobClient");
        factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
                new MasterChangeListenerImpl()
        });
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        factoryBean.setConfigs(configs);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }
}

##TaskTracker(部署使用) 須要引入lts的jar包有lts-tasktracker-{version}.jarlts-core-{version}.jar 及其它第三方依賴jar。 ###定義本身的任務執行類

public class MyJobRunner implements JobRunner {
    private final static BizLogger bizLogger = LtsLoggerFactory.getBizLogger();
    @Override
    public Result run(Job job) throws Throwable {
        try {
            // TODO 業務邏輯
            // 會發送到 LTS (JobTracker上)
            bizLogger.info("測試,業務日誌啊啊啊啊啊");
 
        } catch (Exception e) {
            return new Result(Action.EXECUTE_FAILED, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈");
    }
}

###API方式啓動

TaskTracker taskTracker = new TaskTracker();
taskTracker.setJobRunnerClass(MyJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
taskTracker.setNodeGroup("test_trade_TaskTracker");
taskTracker.setWorkThreads(20);
taskTracker.start();

###Spring XML方式啓動

<bean id="taskTracker" class="com.lts.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
    <property name="jobRunnerClass" value="com.lts.example.support.MyJobRunner"/>
    <property name="bizLoggerLevel" value="INFO"/>
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_trade_TaskTracker"/>
    <property name="workThreads" value="20"/>
    <property name="masterChangeListeners">
        <list>
            <bean class="com.lts.example.support.MasterChangeListenerImpl"/>
        </list>
    </property>
    <property name="configs">
        <props>
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

###Spring註解方式啓動

@Configuration
public class LTSSpringConfig implements ApplicationContextAware {
    private ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
   @Bean(name = "taskTracker")
    public TaskTracker getTaskTracker() throws Exception {
        TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
        factoryBean.setApplicationContext(applicationContext);
        factoryBean.setClusterName("test_cluster");
        factoryBean.setJobRunnerClass(MyJobRunner.class);
        factoryBean.setNodeGroup("test_trade_TaskTracker");
        factoryBean.setBizLoggerLevel("INFO");
        factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
        factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
                new MasterChangeListenerImpl()
        });
        factoryBean.setWorkThreads(20);
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        factoryBean.setConfigs(configs);
 
        factoryBean.afterPropertiesSet();
//        factoryBean.start();
        return factoryBean.getObject();
    }
}

##參數說明

參數 是否必須 默認值 使用範圍 設置方式 參數說明
registryAddress 必須 JobClient,JobTracker,TaskTracker setRegistryAddress("xxxx") 註冊中心,能夠選用zk或者redis,參考值: zookeeper://127.0.0.1:2181
clusterName 必須 JobClient,JobTracker,TaskTracker setClusterName("xxxx") 集羣名稱,clusterName相同的全部節點纔會組成整個LTS架構
listenPort 必須 35001 JobTracker setListenPort(xxx) JobTracker的遠程監聽端口
job.logger 必須 console JobTracker addConfig("job.logger","xxx") LTS業務日誌記錄器,可選值console,mysql,mongo,或者本身實現SPI擴展
job.queue 必須 mongo JobTracker addConfig("job.queue", "xx") LTS任務隊列,可選值mongo,mysql,或者本身實現SPI擴展
jdbc.url 可選 JobTracker addConfig("jdbc.url", "xxx") mysql鏈接URL,當job.queue爲mysql的時候起做用
jdbc.username 可選 JobTracker addConfig("jdbc.username", "xxx") mysql鏈接密碼,當job.queue爲mysql的時候起做用
jdbc.password 可選 JobTracker addConfig("jdbc.password", "xxx") mysql鏈接密碼,當job.queue爲mysql的時候起做用
mongo.addresses 可選 JobTracker addConfig("mongo.addresses", "xxx") mongo鏈接URL,當job.queue爲mongo的時候起做用
mongo.database 可選 JobTracker addConfig("mongo.database", "xxx") mongo數據庫名,當job.queue爲mongo的時候起做用
zk.client 可選 zkclient JobClient,JobTracker,TaskTracker addConfig("zk.client", "xxx") zookeeper客戶端,可選值zkclient, curator
job.pull.frequency 可選 3 TaskTracker addConfig("job.pull.frequency", "xx") TaskTracker去向JobTracker Pull任務的頻率,針對不一樣的場景能夠作相應的調整,單位秒
job.max.retry.times 可選 10 JobTracker addConfig("job.max.retry.times", "xx") 任務的最大重試次數
lts.monitor.url 可選 JobTracker,TaskTracker addConfig("lts.monitor.url", "xx") 監控中心地址,也就是LTS-Admin地址,如 http://localhost:8081
stop.working 可選 false TaskTracker addConfig("stop.working", "true") 主要用於當TaskTracker與JobTracker出現網絡隔離的時候,超過必定時間隔離以後,TaskTracker自動中止當前正在運行的任務
job.fail.store 可選 leveldb JobClient,TaskTracker addConfig("job.fail.store", "leveldb") 可選值:leveldb(默認), rocksdb, berkeleydb, mapdb FailStore實現, leveldb有問題的同窗,能夠試試mapdb
lazy.job.logger 可選 false JobTracker addConfig("lazy.job.logger", "true") 可選值:ture,false, 是否延遲批量刷盤日誌, 若是啓用,採用隊列的方式批量將日誌刷盤(在應用關閉的時候,可能會形成日誌丟失)
dataPath 可選 user.home JobClient,TaskTracker,JobTracker setDataPath("xxxx") FailStore文件存儲路徑及其它數據存儲路徑
lts.monitor.interval 可選 1 JobClient,TaskTracker,JobTracker addConfig("lts.monitor.interval", "2") 分鐘,整數,建議1-5分鐘
lts.remoting 可選 netty JobClient,TaskTracker,JobTracker addConfig("lts.remoting", "netty") 底層通信框架,可選值netty和mina,能夠混用,譬如JobTracker是netty, JobClient採用mina
lts.remoting.serializable.default 可選 fastjson JobClient,TaskTracker,JobTracker addConfig("lts.remoting.serializable.default", "fastjson") 底層通信默認序列化方式,可選值 fastjson, hessian2 ,java,底層會自動識別你請求的序列化方式,而後返回數據也是採用與請求的序列化方式返回,假設JobTracker設置的是fastjson,而JobClient是hessian2,那麼JobClient提交任務的時候,序列化方式是hessian2,當JobTracker收到請求的時候採用hessian2解碼,而後也會將響應數據採用hessian2編碼返回給JobClient
lts.compiler 可選 javassist JobClient,TaskTracker,JobTracker addConfig("lts.compiler", "javassist") java編譯器,可選值 jdk, javassist(須要引入javassist包)

##使用建議 通常在一個JVM中只須要一個JobClient實例便可,不要爲每種任務都新建一個JobClient實例,這樣會大大的浪費資源,由於一個JobClient能夠提交多種任務。相同的一個JVM通常也儘可能保持只有一個TaskTracker實例便可,多了就可能形成資源浪費。當遇到一個TaskTracker要運行多種任務的時候,請參考下面的 "一個TaskTracker執行多種任務"。 ##一個TaskTracker執行多種任務 有的時候,業務場景須要執行多種任務,有些人會問,是否是要每種任務類型都要一個TaskTracker去執行。個人答案是否認的,若是在一個JVM中,最好使用一個TaskTracker去運行多種任務,由於一個JVM中使用多個TaskTracker實例比較浪費資源(固然當你某種任務量比較多的時候,能夠將這個任務單獨使用一個TaskTracker節點來執行)。那麼怎麼才能實現一個TaskTracker執行多種任務呢。下面是我給出來的參考例子。

/**
 * 總入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
 * JobClient 提交 任務時指定 Job 類型  job.setParam("type", "aType")
 */
public class JobRunnerDispatcher implements JobRunner {
 
    private static final ConcurrentHashMap<String/*type*/, JobRunner>
            JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();
 
    static {
        JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也能夠從Spring中拿
        JOB_RUNNER_MAP.put("bType", new JobRunnerB());
    }
 
    @Override
    public Result run(Job job) throws Throwable {
        String type = job.getParam("type");
        return JOB_RUNNER_MAP.get(type).run(job);
    }
}
 
class JobRunnerA implements JobRunner {
    @Override
    public Result run(Job job) throws Throwable {
        //  TODO A類型Job的邏輯
        return null;
    }
}
 
class JobRunnerB implements JobRunner {
    @Override
    public Result run(Job job) throws Throwable {
        // TODO B類型Job的邏輯
        return null;
    }
}

##TaskTracker的JobRunner測試 通常在編寫TaskTracker的時候,只須要測試JobRunner的實現邏輯是否正確,又不想啓動LTS進行遠程測試。爲了方便測試,LTS提供了JobRunner的快捷測試方法。本身的測試類集成com.lts.tasktracker.runner.JobRunnerTester便可,並實現initContextnewJobRunner方法便可。如lts-example中的例子:

public class TestJobRunnerTester extends JobRunnerTester {
 
    public static void main(String[] args) throws Throwable {
        //  1. Mock Job 數據
        Job job = new Job();
        job.setTaskId("2313213");
        // 2. 運行測試
        TestJobRunnerTester tester = new TestJobRunnerTester();
        Result result = tester.run(job);
        System.out.println(JSONUtils.toJSONString(result));
    }
 
    @Override
    protected void initContext() {
        // TODO 初始化Spring容器等
    }
 
    @Override
    protected JobRunner newJobRunner() {
        return new TestJobRunner();
    }
}

##多網卡選擇問題 當機器有內網兩個網卡的時候,有時候,用戶想讓LTS的流量走外網網卡,那麼須要在host中,把主機名稱的映射地址改成外網網卡地址便可,內網同理。

##打包成獨立jar 請在lts-parent/lts 下install便可,會在 lts-parent/lts/target 下生成lts-{version}.jar

##關於節點標識問題 若是在節點啓動的時候設置節點標識,LTS會默認設置一個UUID爲節點標識,可讀性會比較差,可是能保證每一個節點的惟一性,若是用戶能本身保證節點標識的惟一性,能夠經過 setIdentity 來設置,譬如若是每一個節點都是部署在一臺機器(一個虛擬機)上,那麼能夠將identity設置爲主機名稱

##SPI擴展說明 ###LTS-Logger擴展

  1. 引入lts-logger-api-{version}.jar
  2. 實現JobLoggerJobLoggerFactory接口
  3. 在 resources META-INF/lts/com.lts.biz.logger.JobLoggerFactory文件,文件內容爲xxx=com.lts.biz.logger.xxx.XxxJobLoggerFactory
  4. 使用本身的logger擴展,修改jobtracker參數配置 configs.job.logger=xxx。(若是你本身引入JobTracker jar包的方式的話,使用 jobtracker.addConfig("job.logger", "xxx"))

###LTS-Queue擴展 實現方式和LTS-Logger擴展相似,具體參考lts-queue-mysqllts-queue-mongo模塊的實現 ##和其它解決方案比較 ###和MQ比較 見docs/LTS業務場景說明.pdf ###和Quartz比較 見docs/LTS業務場景說明.pdf

相關文章
相關標籤/搜索