昨天雖然試用了一下惟品會的「土星」,可是我實在沒想明白他的Job該怎麼用Spring來託管,因此沒有使用。今天來講一下噹噹的Elastic-Job.linux
安裝管理平臺git
先說一下Elastic-Job的管理平臺跟Java的Job開發沒有半毛錢關係,他只是把註冊進Zookeeper的信息讀取出來,進行控制。github
先下載Elastic-Job的源代碼git clone https://github.com/elasticjob/elastic-job-lite.git,編譯以後找到elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz上傳到服務器,解壓以後進入/bin目錄。spring
先不要急着執行start.sh,由於這個文件裏面有Windows字符,linux沒法識別。apache
因此先執行 sed -i 's/\r$//' start.sh 改掉Windows字符。api
而後執行nohup ./start.sh &瀏覽器
在瀏覽器打開服務器的8899端口(用戶名root,密碼root)tomcat
先配置好zookeeper的註冊中心地址,命名空間,基本上這個就裝好了。springboot
編寫Java Job服務器
pom
<dependency> <groupId>com.github.kuhn-he</groupId> <artifactId>elastic-job-lite-spring-boot-starter</artifactId> <version>2.1.5</version> <exclusions> <exclusion> <artifactId>curator-client</artifactId> <groupId>org.apache.curator</groupId> </exclusion> <exclusion> <artifactId>curator-framework</artifactId> <groupId>org.apache.curator</groupId> </exclusion> <exclusion> <artifactId>curator-recipes</artifactId> <groupId>org.apache.curator</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>2.12.0</version> </dependency>
資源文件添加
elaticjob: zookeeper: server-lists: 192.168.5.129:2188 namespace: elastic-job-lite-springboot
這裏要跟你管理平臺的保持一致
配置文件
@Configuration @ConditionalOnExpression("'${elatic.zookeeper.server-lists}'.length() >0") public class ElasticConfig { /** * 初始化配置 * @param serverList * @param namespace * @return */ @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter(@Value("${elaticjob.zookeeper.server-lists}") String serverList , @Value("${elaticjob.zookeeper.namespace}") String namespace) { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } /** * 設置活動監聽,前提是已經設置好了監聽,見下一個目錄 * @return */ @Bean public ElasticJobListener elasticJobListener() { return new ElasticJobListener(100, 100); } }
監聽器
@Component public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener { /** * 設置間隔時間 * @param startedTimeoutMilliseconds * @param completedTimeoutMilliseconds */ public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) { super(startedTimeoutMilliseconds, completedTimeoutMilliseconds); } /** * 任務開始 * @param shardingContexts */ @Override public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) { System.out.println("任務開始"); } /** * 任務結束 * @param shardingContexts */ @Override public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) { System.err.println("任務結束"); } }
Job
@Component @ElasticSimpleJob(cron="0/5 * * * * ?",jobName="jobTask",shardingTotalCount=2,jobParameter="測試參數",shardingItemParameters="0=Chengdu0,1=Chengdu1") public class StockSimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { switch (shardingContext.getShardingItem()) { case 0: System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " + "當前分片項: %s.當前參數: %s," + "當前任務名稱: %s.當前任務參數: %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() ) ); break; case 1: System.out.println("啦啦啦"); break; default: break; } } }
啓動項目能夠看到
2018-11-07 19:29:08.900 INFO [schedule-center,,,] 21244 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8011 (http) with context path '/api-sc'
2018-11-07 19:29:08.901 INFO [schedule-center,,,] 21244 --- [ main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8011
2018-11-07 19:29:08.903 INFO [schedule-center,,,] 21244 --- [ main] c.c.schedule.ScheduleCenterApplication : Started ScheduleCenterApplication in 16.618 seconds (JVM running for 17.331)
啦啦啦
------Thread ID: 96, 任務總片數: 2, 當前分片項: 0.當前參數: Chengdu0,當前任務名稱: com.cloud.schedule.jobs.StockSimpleJob.當前任務參數:
------Thread ID: 100, 任務總片數: 2, 當前分片項: 0.當前參數: Chengdu0,當前任務名稱: com.cloud.schedule.jobs.StockSimpleJob.當前任務參數:
啦啦啦
------Thread ID: 102, 任務總片數: 2, 當前分片項: 0.當前參數: Chengdu0,當前任務名稱: com.cloud.schedule.jobs.StockSimpleJob.當前任務參數:
啦啦啦
查看平臺的做業
咱們能夠在這裏面修改他的配置
經啓動多個實例,咱們能夠看到分片會被多個實例均攤,相同的分片只會在一個進程內執行,多個Job也是同樣,不會重複執行。退出一個進程,單個進程就會執行所有分片,實現了高可用。
動態添加Job
//@Component //@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="testTask",shardingTotalCount=2,jobParameter="測試參數",shardingItemParameters="0=A,1=B") public class TestJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println("TestJob"); } }
咱們只寫了一個Job,不進行配置
咱們能夠在Restful中進行動態添加
@RestController public class TestController { @Autowired private ZookeeperRegistryCenter zookeeperRegistryCenter; /** * 動態添加任務邏輯 */ @GetMapping("/test") public void test(@RequestParam("cron") String cron) { int shardingTotalCount = 2; String jobName = UUID.randomUUID().toString() + "-test123"; JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration .newBuilder(jobName, cron, shardingTotalCount) .shardingItemParameters("0=A,1=B") .build(); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, TestJob.class.getCanonicalName()); JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()); try { jobScheduler.init(); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("定時任務建立失敗"); } } }
經過@RequestParam("cron") String cron參數,咱們能夠動態給該Job添加可變的時間配置。
在zookeeper中咱們能夠看到他的註冊信息
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2188(CONNECTED) 0] ls /
[zookeeper, elastic-job-lite-springboot, spark, hadoop-ha, dubbo, zoo, pushservers, guanjian]
[zk: localhost:2188(CONNECTED) 1] ls /elastic-job-lite-springboot
[com.guanjian.job.TestJob, e5d89fde-a665-46e2-8cac-7560a48812c9-test123, com.cloud.schedule.jobs.MyJob, com.cloud.schedule.jobs.StockSimpleJob, com.guanjian.job.StockSimpleJob]
[zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.
com.guanjian.job.TestJob com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob com.guanjian.job.StockSimpleJob
[zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.
com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.StockSimpleJob [leader, servers, config, instances, sharding] [zk: localhost:2188(CONNECTED) 3]