SpringBoot-ElasticJob封裝快速上手使用(分佈式定時器)

elastic-job-spring-boot

qq交流羣:812321371java

1 簡介

Elastic-Job是一個分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-LiteElastic-Job-Cloud組成。Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供分佈式任務的協調服務。
基於quartz定時任務框架爲基礎的,所以具有quartz的大部分功能
使用zookeeper作協調,調度中心,更加輕量級
支持任務的分片
支持彈性擴容,能夠水平擴展, 當任務再次運行時,會檢查當前的服務器數量,從新分片,分片結束以後纔會繼續執行任務
失效轉移,容錯處理,當一臺調度服務器宕機或者跟zookeeper斷開鏈接以後,會當即中止做業,而後再去尋找其餘空閒的調度服務器,來運行剩餘的任務
提供運維界面,能夠管理做業和註冊中心。mysql

1.1 使用場景

因爲項目爲微服務,單模塊可能在兩個實例以上的數量,定時器就會出現多實例同時執行的狀況。
通常定時器缺乏管理界面,沒法監控定時器是否執行成功。
市面上常見的解決方案爲定時器加鎖的操做,或者採用第3方分佈式定時器。
分佈式定時器有多種方案,好比阿里內部的ScheduledX,噹噹網的Elastic job,我的開源的xxl-job等。git

1.2 功能列表

  • 分佈式調度協調
  • 彈性擴容縮容
  • 失效轉移
  • 錯過執行做業重觸發
  • 做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例
  • 自診斷並修復分佈式不穩定形成的問題
  • 支持並行調度
  • 支持做業生命週期操做
  • 豐富的做業類型
  • Spring整合以及命名空間提供
  • 運維平臺

1.3 概念

分片:任務的分佈式執行,須要將一個任務拆分爲多個獨立的任務項,而後由分佈式的服務器分別執行某一個或幾個分片項。
例如:有一個遍歷數據庫某張表的做業,現有2臺服務器。爲了快速的執行做業,那麼每臺服務器應執行做業的50%。 爲知足此需求,可將做業分紅2片,每臺服務器執行1片。做業遍歷數據的邏輯應爲:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 若是分紅10片,則做業遍歷數據的邏輯應爲:每片分到的分片項應爲ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID0-4結尾的數據;服務器B遍歷ID5-9結尾的數據。github

歷史軌跡:Elastic-Job提供了事件追蹤功能,可經過事件訂閱的方式處理調度過程的重要事件,用於查詢、統計和監控。spring

1.4 封裝elasticjob

因爲噹噹網Elastic job處於1年間未更新階段,相關jar處於可使用階段功能不全。考慮到使用場景爲多項目使用,將elastic-job-lite-spring簡單封裝便於使用。sql

2.使用說明:

2.1 添加依賴

ps:實際version版本請使用最新版數據庫

<dependency>
  <groupId>com.purgeteam</groupId>
  <artifactId>elasticjob-spring-boot-starter</artifactId>
  <version>0.1.1.RELEASE</version>
</dependency>

2.2 配置

ps: 須要mysql,zookeeper支持,請提早搭建好。bootstrap

配置bootstrap.yml或者application.yml服務器

加入如下配置:app

spring:
  elasticjob:
    datasource: # job須要的記錄數據源
      url: jdbc:mysql://127.0.0.1:3306/batch_log?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password: Rtqw123OpnmER
    regCenter: # 註冊中心
      serverList: 127.0.0.1:2181
      namespace: elasticJobDemo

2.3 定時器實現方法編寫

建立定時器類(惟一不一樣的地方在於將@Scheduled改成實現SimpleJob接口便可)
定時器實現方法編寫在execute方法裏。

@Slf4j
@Component
public class MySimpleJob implements SimpleJob {

    //  @Scheduled(cron = "0 0/1 * * * ?")
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info(String.format("Thread ID: %s, 做業分片總數: %s, " +
                        "當前分片項: %s.當前參數: %s," +
                        "做業名稱: %s.做業自定義參數: %s",
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));
        // 分片大體以下:根據配置的分片參數執行相應的邏輯
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}
log:Thread ID: 66, 做業分片總數: 1, 當前分片項: 0.當前參數: Beijing,做業名稱: PropertiesSimpleJob.做業自定義參數: test

2.4 配置定時器

2.4.1 建立Configuration類

ZookeeperRegistryCenterJobEventConfiguration注入。
建立JobScheduler @Bean(initMethod = "init")
mySimpleJobScheduler方法裏先經過ElasticJobUtils#getLiteJobConfiguration獲取LiteJobConfiguration對象。
建立SpringJobScheduler對象返回便可。

@Configuration
public class MyJobConfig {

    // job 名稱
    private static final String JOB_NAME = "MySimpleJob";

    // 定時器cron參數
    private static final String CRON = "0 0/1 * * * ?";

    // 定時器分片
    private static final int SHARDING_TOTAL_COUNT = 1;

    // 分片參數
    private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai,2=Guangzhou";

    // 自定義參數
    private static final String JOB_PARAMETERS = "parameter";

    @Resource
    private ZookeeperRegistryCenter regCenter;

    @Resource
    private JobEventConfiguration jobEventConfiguration;


    @Bean(initMethod = "init")
    public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) {

        LiteJobConfiguration liteJobConfiguration = ElasticJobUtils
                .getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON,
                        SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);
        // 參數:1.定時器實例,2.註冊中心類,3.LiteJobConfiguration,
        //     3.歷史軌跡(不須要能夠省略)
        return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration);
    }

}

ElasticJobUtils#getLiteJobConfiguration參數簡介:

/**
     * 獲取 {@link LiteJobConfiguration} 對象
     *
     * @param jobClass               定時器實現類
     * @param jobName                定時器名稱
     * @param cron                   定時參數
     * @param shardingTotalCount     做業分片總數
     * @param shardingItemParameters 當前參數 能夠爲null
     * @param jobParameters          做業自定義參數 能夠爲null
     * @return {@link LiteJobConfiguration}
     */
  public static LiteJobConfiguration getLiteJobConfiguration(
      final Class<? extends SimpleJob> jobClass,
      final String jobName,
      final String cron,
      final int shardingTotalCount,
      final String shardingItemParameters,
      final String jobParameters) {
  ...
    return ...;
  }

2.4.2 簡化Configuration類

固然也能夠用下面的@Configuration實現簡化,配置bootstrap.yml或者application.yml

spring:
  elasticjob:
    scheduled:
      jobConfigMap: // 爲map集合
        PropertiesSimpleJob: // 定時器key名稱
          jobName: PropertiesSimpleJob // job名稱
          cron: 0 0/1 * * * ? // cron表達式
          shardingTotalCount: 2 // 分片數量
          shardingItemParameters: 0=123,1=332 // 分片參數
          jobParameters: test // 自定義參數

注入SpringJobSchedulerFactory,在propertiesSimpleJobScheduler方法裏調用gerSpringJobScheduler方法便可。

@Configuration
public class PropertiesSimpleJobConfig {

    @Resource
    private SpringJobSchedulerFactory springJobSchedulerFactory;

    @Bean(initMethod = "init")
    public JobScheduler propertiesSimpleJobScheduler(final PropertiesSimpleJob job) {
        // 參數:1.定時器實例,2.配置名稱,3.是否開啓歷史軌跡
        return springJobSchedulerFactory.getSpringJobScheduler(job,"PropertiesSimpleJob", true);
    }

}

2.4.3 註解方式配置(推薦方式)

ps:這個註解包含了上述方式,簡化定時器注入。

繼承SimpleJob實現方法execute

AnnotationSimpleJob類上加入註解@ElasticJobScheduler便可。
下面爲完整註解。

@Slf4j
@ElasticJobScheduler(
        name = "AnnotationSimpleJob", // 定時器名稱
        cron = "0/8 * * * * ?", // 定時器表達式
        shardingTotalCount = 1, // 做業分片總數 默認爲1
        shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou",  // 分片序列號和參數用等號分隔 不須要參數能夠不加
        jobParameters = "123", // 做業自定義參數 不須要參數能夠不加
        isEvent = true // 是否開啓數據記錄 默認爲true
)
public class AnnotationSimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        log.info(String.format("Thread ID: %s, 做業分片總數: %s, " +
                        "當前分片項: %s.當前參數: %s," +
                        "做業名稱: %s.做業自定義參數: %s",
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));
    }
}

總結

分佈式job能夠解決多個項目同一個定時器都執行的問題,配合elastic-job控制檯能夠直觀監控定時器執行狀況等。

img

示例代碼地址: elastic-job-spring-boot

做者GitHub:
Purgeyao 歡迎關注
本文由博客一文多發平臺 OpenWrite 發佈!

相關文章
相關標籤/搜索