LTS(light-task-scheduler)主要用於解決分佈式任務調度問題,支持實時任務,定時任務和Cron任務。有較好的伸縮性,擴展性,健壯穩定性而被多家公司使用,同時也但願開源愛好者一塊兒貢獻。java
github地址: https://github.com/qq254963746/light-task-schedulernode
oschina地址: http://git.oschina.net/hugui/light-task-schedulermysql
這兩個地址都會同步更新。感興趣,請加QQ羣:109500214 一塊兒探討、完善。越多人支持,就越有動力去更新,喜歡記得右上角star哈。linux
LTS 有主要有如下四種節點:git
其中JobClinet,JobTracker,TaskTracker節點都是無狀態
的。 能夠部署多個並動態的進行刪減,來實現負載均衡,實現更大的負載量, 而且框架採用FailStore策略使LTS具備很好的容錯能力。github
LTS註冊中心提供多種實現(Zookeeper,redis等),註冊中心進行節點信息暴露,master選舉。(Mongo or Mysql)存儲任務隊列和任務執行日誌, netty or mina作底層通訊, 並提供多種序列化方式fastjson, hessian2, java等。redis
LTS支持任務類型:spring
###節點組sql
###FailStoreshell
下圖是一個標準的實時任務執行流程。
目先後臺帶有由ztajy提供的一個簡易的認證功能. 用戶名密碼在auth.cfg中,用戶自行修改.
##特性 ###一、Spring支持 LTS能夠徹底不用Spring框架,可是考慮到很用用戶項目中都是用了Spring框架,因此LTS也提供了對Spring的支持,包括Xml和註解,引入lts-spring.jar
便可。 ###二、業務日誌記錄器 在TaskTracker端提供了業務日誌記錄器,供應用程序使用,經過這個業務日誌器,能夠將業務日誌提交到JobTracker,這些業務日誌能夠經過任務ID串聯起來,能夠在LTS-Admin中實時查看任務的執行進度。 ###三、SPI擴展支持 SPI擴展能夠達到零侵入,只須要實現相應的接口,並實現便可被LTS使用,目前開放出來的擴展接口有
###四、故障轉移 當正在執行任務的TaskTracker宕機以後,JobTracker會立馬將分配在宕機的TaskTracker的全部任務再分配給其餘正常的TaskTracker節點執行。 ###五、節點監控 能夠對JobTracker,TaskTracker節點進行資源監控,任務監控等,能夠實時的在LTS-Admin管理後臺查看,進而進行合理的資源調配。 ###六、多樣化任務執行結果支持 LTS框架提供四種執行結果支持,EXECUTE_SUCCESS
,EXECUTE_FAILED
,EXECUTE_LATER
,EXECUTE_EXCEPTION
,並對每種結果採起相應的處理機制,譬如重試。
###七、FailStore容錯 採用FailStore機制來進行節點容錯,Fail And Store,不會由於遠程通訊的不穩定性而影響當前應用的運行。具體FailStore說明,請參考概念說明中的FailStore說明。
##項目編譯打包 項目主要採用maven進行構建,目前提供shell腳本的打包。 環境依賴:Java(jdk1.7)
Maven
用戶使用通常分爲兩種: ###一、Maven構建 能夠經過maven命令將lts的jar包上傳到本地倉庫中。在父pom.xml中添加相應的repository,並用deploy命令上傳便可。具體引用方式能夠參考lts中的例子便可。 ###二、直接Jar引用 須要將lts的各個模塊打包成單獨的jar包,而且將全部lts依賴包引入。具體引用哪些jar包能夠參考lts中的例子便可。
##JobTracker和LTS-Admin部署 提供(cmd)windows
和(shell)linux
兩種版本腳原本進行編譯和部署:
運行根目錄下的sh build.sh
或build.cmd
腳本,會在dist
目錄下生成lts-{version}-bin
文件夾
下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啓動腳本。jobtracker
中是 JobTracker的配置文件和須要使用到的jar包,lts-admin
是LTS-Admin相關的war包和配置文件。 lts-{version}-bin的文件結構
├── bin │ ├── jobtracker.cmd │ ├── jobtracker.sh │ ├── lts-admin.cmd │ └── lts-admin.sh ├── jobtracker │ ├── conf │ │ └── zoo │ │ ├── jobtracker.cfg │ │ └── log4j.properties │ └── lib │ └── *.jar ├── lts-admin │ ├── conf │ │ ├── log4j.properties │ │ └── lts-admin.cfg │ ├── lib │ │ └── *.jar │ └── lts-admin.war └── tasktracker ├── bin │ └── tasktracker.sh ├── conf │ ├── log4j.properties │ └── tasktracker.cfg └── lib └── *.jar
conf/zoo
下的配置文件,而後運行 sh jobtracker.sh zoo start
便可,若是你想啓動兩個JobTracker節點,那麼你須要拷貝一份zoo,譬如命名爲zoo2
,修改下zoo2
下的配置文件,而後運行sh jobtracker.sh zoo2 start
便可。logs文件夾下生成jobtracker-zoo.out
日誌。lts-admin/conf
下的配置,而後運行bin
下的sh lts-admin.sh
或lts-admin.cmd
腳本便可。logs文件夾下會生成lts-admin.out
日誌,啓動成功在日誌中會打印出訪問地址,用戶能夠經過這個訪問地址訪問了。##JobClient(部署)使用 須要引入lts的jar包有lts-jobclient-{version}.jar
,lts-core-{version}.jar
及其它第三方依賴jar。 ###API方式啓動
JobClient jobClient = new RetryJobClient(); jobClient.setNodeGroup("test_jobClient"); jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); jobClient.start(); // 提交任務 Job job = new Job(); job.setTaskId("3213213123"); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表達式 // job.setTriggerTime(new Date()); // 支持指定時間執行 Response response = jobClient.submitJob(job);
###Spring XML方式啓動
<bean id="jobClient" class="com.lts.spring.JobClientFactoryBean"> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_jobClient"/> <property name="masterChangeListeners"> <list> <bean class="com.lts.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="jobFinishedHandler"> <bean class="com.lts.example.support.JobFinishedHandlerImpl"/> </property> <property name="configs"> <props> <!-- 參數 --> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
###Spring 全註解方式
@Configuration public class LTSSpringConfig { @Bean(name = "jobClient") public JobClient getJobClient() throws Exception { JobClientFactoryBean factoryBean = new JobClientFactoryBean(); factoryBean.setClusterName("test_cluster"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setNodeGroup("test_jobClient"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); return factoryBean.getObject(); } }
##TaskTracker(部署使用) 須要引入lts的jar包有lts-tasktracker-{version}.jar
,lts-core-{version}.jar
及其它第三方依賴jar。 ###定義本身的任務執行類
public class MyJobRunner implements JobRunner { private final static BizLogger bizLogger = LtsLoggerFactory.getBizLogger(); @Override public Result run(Job job) throws Throwable { try { // TODO 業務邏輯 // 會發送到 LTS (JobTracker上) bizLogger.info("測試,業務日誌啊啊啊啊啊"); } catch (Exception e) { return new Result(Action.EXECUTE_FAILED, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈"); } }
###API方式啓動
TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(MyJobRunner.class); taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setWorkThreads(20); taskTracker.start();
###Spring XML方式啓動
<bean id="taskTracker" class="com.lts.spring.TaskTrackerAnnotationFactoryBean" init-method="start"> <property name="jobRunnerClass" value="com.lts.example.support.MyJobRunner"/> <property name="bizLoggerLevel" value="INFO"/> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_trade_TaskTracker"/> <property name="workThreads" value="20"/> <property name="masterChangeListeners"> <list> <bean class="com.lts.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="configs"> <props> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
###Spring註解方式啓動
@Configuration public class LTSSpringConfig implements ApplicationContextAware { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Bean(name = "taskTracker") public TaskTracker getTaskTracker() throws Exception { TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean(); factoryBean.setApplicationContext(applicationContext); factoryBean.setClusterName("test_cluster"); factoryBean.setJobRunnerClass(MyJobRunner.class); factoryBean.setNodeGroup("test_trade_TaskTracker"); factoryBean.setBizLoggerLevel("INFO"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); factoryBean.setWorkThreads(20); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); // factoryBean.start(); return factoryBean.getObject(); } }
##參數說明
參數 | 是否必須 | 默認值 | 使用範圍 | 設置方式 | 參數說明 |
---|---|---|---|---|---|
registryAddress | 必須 | 無 | JobClient,JobTracker,TaskTracker | setRegistryAddress("xxxx") | 註冊中心,能夠選用zk或者redis,參考值: zookeeper://127.0.0.1:2181 |
clusterName | 必須 | 無 | JobClient,JobTracker,TaskTracker | setClusterName("xxxx") | 集羣名稱,clusterName相同的全部節點纔會組成整個LTS架構 |
listenPort | 必須 | 35001 | JobTracker | setListenPort(xxx) | JobTracker的遠程監聽端口 |
job.logger | 必須 | console | JobTracker | addConfig("job.logger","xxx") | LTS業務日誌記錄器,可選值console,mysql,mongo,或者本身實現SPI擴展 |
job.queue | 必須 | mongo | JobTracker | addConfig("job.queue", "xx") | LTS任務隊列,可選值mongo,mysql,或者本身實現SPI擴展 |
jdbc.url | 可選 | 無 | JobTracker | addConfig("jdbc.url", "xxx") | mysql鏈接URL,當job.queue爲mysql的時候起做用 |
jdbc.username | 可選 | 無 | JobTracker | addConfig("jdbc.username", "xxx") | mysql鏈接密碼,當job.queue爲mysql的時候起做用 |
jdbc.password | 可選 | 無 | JobTracker | addConfig("jdbc.password", "xxx") | mysql鏈接密碼,當job.queue爲mysql的時候起做用 |
mongo.addresses | 可選 | 無 | JobTracker | addConfig("mongo.addresses", "xxx") | mongo鏈接URL,當job.queue爲mongo的時候起做用 |
mongo.database | 可選 | 無 | JobTracker | addConfig("mongo.database", "xxx") | mongo數據庫名,當job.queue爲mongo的時候起做用 |
zk.client | 可選 | zkclient | JobClient,JobTracker,TaskTracker | addConfig("zk.client", "xxx") | zookeeper客戶端,可選值zkclient, curator |
job.pull.frequency | 可選 | 3 | TaskTracker | addConfig("job.pull.frequency", "xx") | TaskTracker去向JobTracker Pull任務的頻率,針對不一樣的場景能夠作相應的調整,單位秒 |
job.max.retry.times | 可選 | 10 | JobTracker | addConfig("job.max.retry.times", "xx") | 任務的最大重試次數 |
lts.monitor.url | 可選 | 無 | JobTracker,TaskTracker | addConfig("lts.monitor.url", "xx") | 監控中心地址,也就是LTS-Admin地址,如 http://localhost:8081 |
stop.working | 可選 | false | TaskTracker | addConfig("stop.working", "true") | 主要用於當TaskTracker與JobTracker出現網絡隔離的時候,超過必定時間隔離以後,TaskTracker自動中止當前正在運行的任務 |
job.fail.store | 可選 | leveldb | JobClient,TaskTracker | addConfig("job.fail.store", "leveldb") | 可選值:leveldb(默認), rocksdb, berkeleydb, mapdb FailStore實現, leveldb有問題的同窗,能夠試試mapdb |
lazy.job.logger | 可選 | false | JobTracker | addConfig("lazy.job.logger", "true") | 可選值:ture,false, 是否延遲批量刷盤日誌, 若是啓用,採用隊列的方式批量將日誌刷盤(在應用關閉的時候,可能會形成日誌丟失) |
dataPath | 可選 | user.home | JobClient,TaskTracker,JobTracker | setDataPath("xxxx") | FailStore文件存儲路徑及其它數據存儲路徑 |
lts.monitor.interval | 可選 | 1 | JobClient,TaskTracker,JobTracker | addConfig("lts.monitor.interval", "2") | 分鐘,整數,建議1-5分鐘 |
lts.remoting | 可選 | netty | JobClient,TaskTracker,JobTracker | addConfig("lts.remoting", "netty") | 底層通信框架,可選值netty和mina,能夠混用,譬如JobTracker是netty, JobClient採用mina |
lts.remoting.serializable.default | 可選 | fastjson | JobClient,TaskTracker,JobTracker | addConfig("lts.remoting.serializable.default", "fastjson") | 底層通信默認序列化方式,可選值 fastjson, hessian2 ,java,底層會自動識別你請求的序列化方式,而後返回數據也是採用與請求的序列化方式返回,假設JobTracker設置的是fastjson,而JobClient是hessian2,那麼JobClient提交任務的時候,序列化方式是hessian2,當JobTracker收到請求的時候採用hessian2解碼,而後也會將響應數據採用hessian2編碼返回給JobClient |
lts.compiler | 可選 | javassist | JobClient,TaskTracker,JobTracker | addConfig("lts.compiler", "javassist") | java編譯器,可選值 jdk, javassist(須要引入javassist包) |
##使用建議 通常在一個JVM中只須要一個JobClient實例便可,不要爲每種任務都新建一個JobClient實例,這樣會大大的浪費資源,由於一個JobClient能夠提交多種任務。相同的一個JVM通常也儘可能保持只有一個TaskTracker實例便可,多了就可能形成資源浪費。當遇到一個TaskTracker要運行多種任務的時候,請參考下面的 "一個TaskTracker執行多種任務"。 ##一個TaskTracker執行多種任務 有的時候,業務場景須要執行多種任務,有些人會問,是否是要每種任務類型都要一個TaskTracker去執行。個人答案是否認的,若是在一個JVM中,最好使用一個TaskTracker去運行多種任務,由於一個JVM中使用多個TaskTracker實例比較浪費資源(固然當你某種任務量比較多的時候,能夠將這個任務單獨使用一個TaskTracker節點來執行)。那麼怎麼才能實現一個TaskTracker執行多種任務呢。下面是我給出來的參考例子。
/** * 總入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class) * JobClient 提交 任務時指定 Job 類型 job.setParam("type", "aType") */ public class JobRunnerDispatcher implements JobRunner { private static final ConcurrentHashMap<String/*type*/, JobRunner> JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>(); static { JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也能夠從Spring中拿 JOB_RUNNER_MAP.put("bType", new JobRunnerB()); } @Override public Result run(Job job) throws Throwable { String type = job.getParam("type"); return JOB_RUNNER_MAP.get(type).run(job); } } class JobRunnerA implements JobRunner { @Override public Result run(Job job) throws Throwable { // TODO A類型Job的邏輯 return null; } } class JobRunnerB implements JobRunner { @Override public Result run(Job job) throws Throwable { // TODO B類型Job的邏輯 return null; } }
##TaskTracker的JobRunner測試 通常在編寫TaskTracker的時候,只須要測試JobRunner的實現邏輯是否正確,又不想啓動LTS進行遠程測試。爲了方便測試,LTS提供了JobRunner的快捷測試方法。本身的測試類集成com.lts.tasktracker.runner.JobRunnerTester
便可,並實現initContext
和newJobRunner
方法便可。如lts-example
中的例子:
public class TestJobRunnerTester extends JobRunnerTester { public static void main(String[] args) throws Throwable { // 1. Mock Job 數據 Job job = new Job(); job.setTaskId("2313213"); // 2. 運行測試 TestJobRunnerTester tester = new TestJobRunnerTester(); Result result = tester.run(job); System.out.println(JSONUtils.toJSONString(result)); } @Override protected void initContext() { // TODO 初始化Spring容器等 } @Override protected JobRunner newJobRunner() { return new TestJobRunner(); } }
##多網卡選擇問題 當機器有內網兩個網卡的時候,有時候,用戶想讓LTS的流量走外網網卡,那麼須要在host中,把主機名稱的映射地址改成外網網卡地址便可,內網同理。
##打包成獨立jar 請在lts-parent/lts
下install便可,會在 lts-parent/lts/target
下生成lts-{version}.jar
##關於節點標識問題 若是在節點啓動的時候設置節點標識,LTS會默認設置一個UUID爲節點標識,可讀性會比較差,可是能保證每一個節點的惟一性,若是用戶能本身保證節點標識的惟一性,能夠經過 setIdentity
來設置,譬如若是每一個節點都是部署在一臺機器(一個虛擬機)上,那麼能夠將identity設置爲主機名稱
##SPI擴展說明 ###LTS-Logger擴展
lts-logger-api-{version}.jar
JobLogger
和JobLoggerFactory
接口META-INF/lts/com.lts.biz.logger.JobLoggerFactory
文件,文件內容爲xxx=com.lts.biz.logger.xxx.XxxJobLoggerFactory
jobtracker.addConfig("job.logger", "xxx"))
###LTS-Queue擴展 實現方式和LTS-Logger擴展相似,具體參考lts-queue-mysql
或lts-queue-mongo
模塊的實現 ##和其它解決方案比較 ###和MQ比較 見docs/LTS業務場景說明.pdf ###和Quartz比較 見docs/LTS業務場景說明.pdf