Only the moon over the spring courtyard remains tender, still shining on fallen blossoms for the one who has left.
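Our project uses scheduled jobs and the service is deployed as multiple instances, so we need to keep scheduled jobs from being executed repeatedly. That is, at any given point in time, each scheduled job should run on only one node. Common open-source solutions include elastic-job, xxl-job, quartz, saturn, opencron, and antares. We ultimately decided to use elastic-job. The main highlights of elastic-job are as follows: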
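In actual development, however, we found that elastic-job does not support sharding for dynamically added scheduled jobs. In a multi-instance deployment, a job that is added dynamically on one instance keeps running on that node only. To have it run on the other instances as well, you have to call the same interface on each of them with the same parameters. See: "elastic-job: adding jobs dynamically". After many searches on Baidu and Google, I found that the article "Adding jobs dynamically with Elastic-Job" describes the same problem I ran into. In my own tests, however, sharding of dynamically added jobs worked only intermittently, and as long as a job is registered in zookeeper, it is still initialized automatically on restart. (For a description of dynamic jobs, see the links above; I will not explain them further here.)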
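To make the symptom concrete, here is a minimal plain-Java sketch of how sharding items end up distributed depending on which instances have initialized a job. It is a simplified model for illustration, not elastic-job's actual strategy code; the class name ShardingSketch and the node names are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model: split sharding items across the instances that know a job. */
final class ShardingSketch {

    private ShardingSketch() {
    }

    /**
     * Spreads items 0..shardingTotalCount-1 evenly over the given instances.
     * Leftover items are handed to the first instances, one each.
     */
    static Map<String, List<Integer>> allocate(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size();
        int item = 0;
        for (String instance : instances) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < perInstance; i++) {
                items.add(item++);
            }
            result.put(instance, items);
        }
        // Hand out the remainder one item at a time, starting from the first instance.
        for (String instance : instances) {
            if (item >= shardingTotalCount) {
                break;
            }
            result.get(instance).add(item++);
        }
        return result;
    }

    public static void main(String[] args) {
        // Job initialized on one instance only: that instance runs every shard.
        System.out.println(allocate(Collections.singletonList("node-8081"), 2));
        // Job initialized on both instances: one shard each.
        System.out.println(allocate(Arrays.asList("node-8081", "node-8082"), 2));
    }
}
```

With a single known instance both items land on it, which matches the behaviour described above; once the second instance also initializes the job, each instance gets one item.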
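Following Yin's approach, all job nodes are managed centrally: no matter which node a dynamic job is registered on, the request has to be forwarded to the other nodes so that they initialize the job too. This ensures that jobs sharded across multiple nodes run correctly.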
The code is as follows:
/**
 * Start the job listener: when a job is added, watch for the new data in zk
 * and automatically initialize that job on the other nodes as well.
 */
public void monitorJobRegister() {
    CuratorFramework client = zookeeperRegistryCenter.getClient();
    @SuppressWarnings("resource")
    PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true);
    PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            ChildData data = event.getData();
            switch (event.getType()) {
                case CHILD_ADDED:
                    // Read the job configuration stored under the newly added node
                    String config = new String(client.getData().forPath(data.getPath() + "/config"));
                    Job job = JsonUtils.toBean(Job.class, config);
                    Object bean = null;
                    // If the bean cannot be obtained, add the job
                    try {
                        bean = ctx.getBean("SpringJobScheduler" + job.getJobName());
                    } catch (BeansException e) {
                        logger.error("ERROR NO BEAN,CREATE BEAN SpringJobScheduler" + job.getJobName());
                    }
                    if (Objects.isNull(bean)) {
                        addJob(job);
                    }
                    break;
                default:
                    break;
            }
        }
    };
    childrenCache.getListenable().addListener(childrenCacheListener);
    try {
        // https://blog.csdn.net/u010402202/article/details/79581575
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
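The important property of the listener is that it is idempotent: every instance receives the "child added" notification, including the one that already initialized the job through the REST call, so initialization must happen at most once per instance. The sketch below models only that guard in plain Java, using a concurrent set instead of a bean lookup. LocalJobRegistry and its methods are hypothetical names for illustration, and the initLog list merely stands in for the real addJob(job) call.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Models one application instance that reacts to "job added" notifications. */
final class LocalJobRegistry {

    // Names of jobs whose scheduler has already been created on this instance.
    private final Set<String> initialized = ConcurrentHashMap.newKeySet();
    // Records each actual initialization so the effect is observable in this sketch.
    private final List<String> initLog = new CopyOnWriteArrayList<>();

    /**
     * Called for every "child added" notification. Returns true only when this
     * call performed the initialization, so repeated events are harmless.
     */
    boolean onJobAdded(String jobName) {
        if (!initialized.add(jobName)) {
            return false; // already initialized here, e.g. by the original REST call
        }
        initLog.add(jobName); // stands in for the real addJob(job) call
        return true;
    }

    List<String> initLog() {
        return initLog;
    }

    public static void main(String[] args) {
        LocalJobRegistry node8081 = new LocalJobRegistry();
        LocalJobRegistry node8082 = new LocalJobRegistry();

        // The REST call lands on 8081, which initializes the job directly.
        node8081.onJobAdded("DynamicJob01");
        // The registry then notifies every instance, including 8081 itself.
        System.out.println(node8081.onJobAdded("DynamicJob01")); // false
        System.out.println(node8082.onJobAdded("DynamicJob01")); // true
    }
}
```

Set.add is atomic on the concurrent set, so two notifications arriving at the same time for the same job still lead to a single initialization.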
Test: dynamically added scheduled jobs, with sharding and failover support.
Use the maven install command to install into the local repository.

The demo-elastic-job project:

demo-elastic-job
├── mvnw
├── mvnw.cmd
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── demo
│   │   │               ├── job
│   │   │               │   ├── DynamicJob.java
│   │   │               │   └── TestJob.java
│   │   │               └── DemoApplication.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── application-dev.yml
│   └── test
│       └── java
│           └── com
│               └── example
│                   └── demo
│                       └── DemoApplicationTests.java
├── pom.xml
└── demo.iml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>com.cxytiandi</groupId>
            <artifactId>elastic-job-spring-boot-starter</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
package com.example.demo;

import com.cxytiandi.elasticjob.annotation.EnableElasticJob;
import com.cxytiandi.elasticjob.dynamic.service.JobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@EnableElasticJob
@ComponentScan(basePackages = {"com.cxytiandi", "com.example.demo"})
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Autowired
    private JobService jobService;

    @Override
    public void run(String... args) throws Exception {
        // Simulate reading jobs from the database at startup and adding them
        // Job job1 = new Job();
        // job1.setJobName("job1");
        // job1.setCron("0/10 * * * * ? ");
        // job1.setJobType("SIMPLE");
        // job1.setJobClass("com.example.demo.job.DynamicJob");
        // job1.setShardingItemParameters("");
        // job1.setShardingTotalCount(2);
        // jobService.addJob(job1);

        // Job job2 = new Job();
        // job2.setJobName("job2");
        // job2.setCron("0/10 * * * * ? ");
        // job2.setJobType("SIMPLE");
        // job2.setJobClass("com.example.demo.job.DynamicJob");
        // job2.setShardingItemParameters("0=A,1=B");
        // job2.setShardingTotalCount(2);
        // jobService.addJob(job2);
    }
}
package com.example.demo.job;

import com.cxytiandi.elasticjob.annotation.ElasticJobConf;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Created by zhenglongfei on 2019/7/22
 *
 * @VERSION 1.0
 */
@Component
@Slf4j
@ElasticJobConf(name = "dayJob", cron = "0/10 * * * * ?", shardingTotalCount = 2,
        shardingItemParameters = "0=AAAA,1=BBBB", description = "simple job", failover = true)
public class TestJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        // Log the job name, the total number of shards, and this shard's parameter
        log.info("TestJob job name: 【{}】, sharding total: 【{}】, param=【{}】",
                shardingContext.getJobName(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingParameter());
    }
}
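The shardingItemParameters value "0=AAAA,1=BBBB" pairs each sharding item with the parameter that the job reads through getShardingParameter() when it runs that item. As a rough illustration of that mapping, the following plain-Java sketch parses the same format. ShardingParametersSketch is a hypothetical helper written for this post, not part of elastic-job, and the real parser may treat edge cases differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Parses "item=value" pairs separated by commas, e.g. "0=AAAA,1=BBBB". */
final class ShardingParametersSketch {

    private ShardingParametersSketch() {
    }

    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result; // no parameters configured
        }
        for (String pair : shardingItemParameters.split(",")) {
            String[] parts = pair.trim().split("=", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected item=value but got: " + pair);
            }
            result.put(Integer.parseInt(parts[0].trim()), parts[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> params = parse("0=AAAA,1=BBBB");
        System.out.println(params.get(0)); // AAAA
        System.out.println(params.get(1)); // BBBB
    }
}
```

In this sketch, the instance that runs item 0 would see AAAA and the instance that runs item 1 would see BBBB; an empty string yields no parameters at all.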
package com.example.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Created by zhenglongfei on 2019/7/24
 *
 * @VERSION 1.0
 */
@Component
@Slf4j
public class DynamicJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0:
                log.info("【0】 is running");
                break;
            case 1:
                log.info("【1】 is running");
                break;
            default:
                break;
        }
    }
}
elastic:
  job:
    zk:
      serverLists: 172.25.66.137:2181
      namespace: demo_test
server:
  port: 8082
spring:
  redis:
    host: 127.0.0.1
    port: 6379
Start two instances of the project, on ports 8081 and 8082, and use the REST API to register jobs dynamically.
POST http://localhost:8081/job
The parameters are as follows:
{
  "jobName": "DynamicJob01",
  "cron": "0/3 * * * * ?",
  "jobType": "SIMPLE",
  "jobClass": "com.example.demo.job.DynamicJob",
  "jobParameter": "test",
  "shardingTotalCount": 2,
  "shardingItemParameters": "0=AAAA,1=BBBB"
}
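For reference, the same request can be sent from code. The sketch below uses the JDK's built-in java.net.http.HttpClient, so it needs Java 11 or later for the client side (the demo project itself targets 1.8). The class name RegisterJobRequest is made up for this example, and the Content-Type header is an assumption that the endpoint expects a JSON body.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Builds and sends the job registration request shown above. */
final class RegisterJobRequest {

    // Same parameters as the JSON body above.
    static final String BODY = "{"
            + "\"jobName\": \"DynamicJob01\", "
            + "\"cron\": \"0/3 * * * * ?\", "
            + "\"jobType\": \"SIMPLE\", "
            + "\"jobClass\": \"com.example.demo.job.DynamicJob\", "
            + "\"jobParameter\": \"test\", "
            + "\"shardingTotalCount\": 2, "
            + "\"shardingItemParameters\": \"0=AAAA,1=BBBB\""
            + "}";

    private RegisterJobRequest() {
    }

    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/job"))
                // Assumption: the endpoint accepts a JSON request body.
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response = client.send(
                    build("http://localhost:8081"), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Reached when the demo application is not running on port 8081.
            System.out.println("Could not reach the demo application: " + e.getMessage());
        }
    }
}
```

Sending the request to only one of the two instances is enough to exercise the scenario in this post: the other instance should pick the job up through the zookeeper listener.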
Reference links: