分佈式調度Elastic-Job攻略

昨天雖然試用了一下惟品會的「土星」,可是我實在沒想明白他的Job該怎麼用Spring來託管,因此沒有使用。今天來講一下噹噹的Elastic-Job.linux

安裝管理平臺git

先說一下Elastic-Job的管理平臺跟Java的Job開發沒有半毛錢關係,他只是把註冊進Zookeeper的信息讀取出來,進行控制。github

先下載Elastic-Job的源代碼git clone https://github.com/elasticjob/elastic-job-lite.git,編譯以後找到elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz上傳到服務器,解壓以後進入/bin目錄。spring

先不要急着執行start.sh,由於這個文件裏面有Windows字符,linux沒法識別。apache

因此先執行   sed -i 's/\r$//' start.sh  改掉Windows字符。api

而後執行nohup ./start.sh &瀏覽器

在瀏覽器打開服務器的8899端口(用戶名root,密碼root)tomcat

先配置好zookeeper的註冊中心地址,命名空間,基本上這個就裝好了。springboot

編寫Java Job服務器

pom

<dependency>
   <groupId>com.github.kuhn-he</groupId>
   <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
   <version>2.1.5</version>
   <exclusions>
      <exclusion>
         <artifactId>curator-client</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-framework</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-recipes</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-client</artifactId>
   <version>2.12.0</version>
</dependency>

資源文件添加

elaticjob:
  zookeeper:
    server-lists: 192.168.5.129:2188
    namespace: elastic-job-lite-springboot

這裏要跟你管理平臺的保持一致

配置文件

@Configuration
@ConditionalOnExpression("'${elatic.zookeeper.server-lists}'.length() >0")
public class ElasticConfig {
    /**
     * 初始化配置
     * @param serverList
     * @param namespace
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${elaticjob.zookeeper.server-lists}") String serverList
            , @Value("${elaticjob.zookeeper.namespace}") String namespace) {

        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

    /**
     * 設置活動監聽,前提是已經設置好了監聽,見下一個目錄
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        return new ElasticJobListener(100, 100);
    }
}

監聽器

@Component
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
    /**
     * 設置間隔時間
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    /**
     * 任務開始
     * @param shardingContexts
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("任務開始");
    }

    /**
     * 任務結束
     * @param shardingContexts
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.err.println("任務結束");
    }

}

Job

@Component
@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="jobTask",shardingTotalCount=2,jobParameter="測試參數",shardingItemParameters="0=Chengdu0,1=Chengdu1")
public class StockSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0:
                System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " +
                                "當前分片項: %s.當前參數: %s," +
                                "當前任務名稱: %s.當前任務參數: %s"
                        ,
                        Thread.currentThread().getId(),
                        shardingContext.getShardingTotalCount(),
                        shardingContext.getShardingItem(),
                        shardingContext.getShardingParameter(),
                        shardingContext.getJobName(),
                        shardingContext.getJobParameter()

                        )
                );
                break;
            case 1:
                System.out.println("啦啦啦");
                break;
            default:
                break;
        }
    }
}

啓動項目能夠看到

2018-11-07 19:29:08.900  INFO [schedule-center,,,] 21244 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8011 (http) with context path '/api-sc'
2018-11-07 19:29:08.901  INFO [schedule-center,,,] 21244 --- [           main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8011
2018-11-07 19:29:08.903  INFO [schedule-center,,,] 21244 --- [           main] c.c.schedule.ScheduleCenterApplication   : Started ScheduleCenterApplication in 16.618 seconds (JVM running for 17.331)
啦啦啦
------Thread ID: 96, 任務總片數: 2, 當前分片項: 0.當前參數: Chengdu0,當前任務名稱: com.cloud.schedule.jobs.StockSimpleJob.當前任務參數: 
------Thread ID: 100, 任務總片數: 2, 當前分片項: 0.當前參數: Chengdu0,當前任務名稱: com.cloud.schedule.jobs.StockSimpleJob.當前任務參數: 
啦啦啦
------Thread ID: 102, 任務總片數: 2, 當前分片項: 0.當前參數: Chengdu0,當前任務名稱: com.cloud.schedule.jobs.StockSimpleJob.當前任務參數: 
啦啦啦

查看平臺的做業

咱們能夠在這裏面修改他的配置

經啓動多個實例,咱們能夠看到分片會被多個實例均攤,相同的分片只會在一個進程內執行,多個Job也是同樣,不會重複執行。退出一個進程,單個進程就會執行所有分片,實現了高可用。

動態添加Job

//@Component
//@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="testTask",shardingTotalCount=2,jobParameter="測試參數",shardingItemParameters="0=A,1=B")
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("TestJob");
    }
}

咱們只寫了一個Job,不進行配置

咱們能夠在Restful中進行動態添加

@RestController
public class TestController {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;


    /**
     * 動態添加任務邏輯
     */
    @GetMapping("/test")
    public void test(@RequestParam("cron") String cron) {
        int shardingTotalCount = 2;
        String jobName = UUID.randomUUID().toString() + "-test123";

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder(jobName, cron, shardingTotalCount)
                .shardingItemParameters("0=A,1=B")
                .build();

        SimpleJobConfiguration simpleJobConfiguration =
                new SimpleJobConfiguration(jobCoreConfiguration, TestJob.class.getCanonicalName());
        JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());



        try {
            jobScheduler.init();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("定時任務建立失敗");
        }
    }
}

經過@RequestParam("cron") String cron參數,咱們能夠動態給該Job添加可變的時間配置。

在zookeeper中咱們能夠看到他的註冊信息

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2188(CONNECTED) 0] ls /
[zookeeper, elastic-job-lite-springboot, spark, hadoop-ha, dubbo, zoo, pushservers, guanjian]
[zk: localhost:2188(CONNECTED) 1] ls /elastic-job-lite-springboot
[com.guanjian.job.TestJob, e5d89fde-a665-46e2-8cac-7560a48812c9-test123, com.cloud.schedule.jobs.MyJob, com.cloud.schedule.jobs.StockSimpleJob, com.guanjian.job.StockSimpleJob]
[zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.

com.guanjian.job.TestJob                 com.cloud.schedule.jobs.MyJob            com.cloud.schedule.jobs.StockSimpleJob   com.guanjian.job.StockSimpleJob
[zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.

com.cloud.schedule.jobs.MyJob            com.cloud.schedule.jobs.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.StockSimpleJob [leader, servers, config, instances, sharding] [zk: localhost:2188(CONNECTED) 3] 

相關文章
相關標籤/搜索