Spring Boot Quartz 分佈式集羣任務調度實現

Spring Boot Quartz

主要內容

  • Spring Scheduler 框架
  • Quartz 框架,功能強大,配置靈活
  • Quartz 集羣
  • mysql 持久化定時任務腳本(tables_mysql.sql)

介紹

在工程中時常會遇到一些需求,例如定時刷新一下配置、隔一段時間檢查下網絡狀態併發送郵件等諸如此類的定時任務。
定時任務本質就是一個異步的線程,線程能夠查詢或修改並執行一系列的操做。因爲本質是線程,在 Java 中能夠自行編寫一個線程池對定時任務進行控制,但這樣效率過低了,且功能有限,屬於重複造輪子。html

分佈式任務調度應用場景

Quartz的集羣功能經過故障轉移和負載平衡功能爲您的調度程序帶來高可用性和可擴展性。java

調度程序中會有不少定時任務須要執行,一臺服務器已經不能知足使用,須要解決定時任務單機單點故障問題。mysql

用Quartz框架,在集羣環境下,經過數據庫鎖機制來實現定時任務的執行;獨立的 Quartz 節點並不與另外一其的節點或是管理節點通訊。git

Spring Scheduler 實現定時任務

1.定義 Task 類

/**
 * Spring Scheduled示例
 */
@Component
public class ScheduledTask {

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private Integer count0 = 1;
    private Integer count1 = 1;
    private Integer count2 = 1;

    @Scheduled(fixedRate = 5000)
    public void reportCurrentTime() throws InterruptedException {
        System.out.println(String.format("reportCurrentTime第%s次執行,當前時間爲:%s", count0++, dateFormat.format(new Date())));
    }

    @Scheduled(fixedDelay = 5000)
    public void reportCurrentTimeAfterSleep() throws InterruptedException {
        System.out.println(String.format("reportCurrentTimeAfterSleep第%s次執行,當前時間爲:%s", count1++, dateFormat.format(new Date())));
    }

    @Scheduled(cron = "0 0 1 * * *")
    public void reportCurrentTimeCron() throws InterruptedException {
        System.out.println(String.format("reportCurrentTimeCron第%s次執行,當前時間爲:%s", count2++, dateFormat.format(new Date())));
    }
}

2.啓動定時任務

在Spring Boot的主類中加入@EnableScheduling註解,啓用定時任務的配置github

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@EnableScheduling
public class ScheduledTaskTests {
    @Test
    public void test() {
        log.info("啓動了ScheduledTask定時做業");
        while (true) {
        }
    }
}

quartz實現分佈式定時任務

quartz 是一個開源的分佈式調度庫,它基於java實現。
> 它有着強大的調度功能,支持豐富多樣的調度方式,好比簡單調度,基於cron表達式的調度等等。
> 支持調度任務的多種持久化方式。好比支持內存存儲,數據庫存儲,Terracotta server 存儲。
> 支持分佈式和集羣能力。
> 採用JDBCJobStore方式存儲時,針對事務的處理方式支持全局事務(和業務服務共享同一個事務)和局部事務(quarzt 單獨管理本身的事務)
> 基於plugin機制以及listener機制支持靈活的擴展。

1.pom.xml配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

<!-- mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- orm -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

2.spring-quartz.properties集羣配置

#============================================================================
# 配置JobStore
#============================================================================
# JobDataMaps是否都爲String類型,默認false
org.quartz.jobStore.useProperties=false

# 表的前綴,默認QRTZ_
org.quartz.jobStore.tablePrefix = QRTZ_

# 是否加入集羣
org.quartz.jobStore.isClustered = true

# 調度實例失效的檢查時間間隔 ms
org.quartz.jobStore.clusterCheckinInterval = 5000

# 當設置爲「true」時,此屬性告訴Quartz 在非託管JDBC鏈接上調用setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)。
org.quartz.jobStore.txIsolationLevelReadCommitted = true

# 數據保存方式爲數據庫持久化
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

# 數據庫代理類,通常org.quartz.impl.jdbcjobstore.StdJDBCDelegate能夠知足大部分數據庫
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#============================================================================
# Scheduler 調度器屬性配置
#============================================================================
# 調度標識名 集羣中每個實例都必須使用相同的名稱
org.quartz.scheduler.instanceName = ClusterQuartz
# ID設置爲自動獲取 每個必須不一樣
org.quartz.scheduler.instanceId= AUTO

#============================================================================
# 配置ThreadPool
#============================================================================
# 線程池的實現類(通常使用SimpleThreadPool便可知足幾乎全部用戶的需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool

# 指定線程數,通常設置爲1-100直接的整數,根據系統資源配置
org.quartz.threadPool.threadCount = 5

# 設置線程的優先級(能夠是Thread.MIN_PRIORITY(即1)和Thread.MAX_PRIORITY(這是10)之間的任何int 。默認值爲Thread.NORM_PRIORITY(5)。)
org.quartz.threadPool.threadPriority = 5

3.定義兩個job

  • QuartzJob.java
//持久化
@PersistJobDataAfterExecution
//禁止併發執行(Quartz不要併發地執行同一個job定義(這裏指一個job類的多個實例))
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String taskName = context.getJobDetail().getJobDataMap().getString("name");
        log.info("---> Quartz job {}, {} <----", new Date(), taskName);
    }
}
  • QuartzJob2.java
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob2 extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String taskName = context.getJobDetail().getJobDataMap().getString("name");
        log.info("---> Quartz job 2 {}, {} <----", new Date(), taskName);
    }
}

4.初始化觸發器等信息,這裏經過Listener初始化

@Slf4j
public class StartApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    SchedulerConfig schedulerConfig;
    public static AtomicInteger count = new AtomicInteger(0);
    private static String TRIGGER_GROUP_NAME = "test_trriger";
    private static String JOB_GROUP_NAME = "test_job";

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 防止重複執行
        if (event.getApplicationContext().getParent() == null && count.incrementAndGet() <= 1) {
            initMyJob();
        }
    }

    public void initMyJob() {
        Scheduler scheduler = null;
        try {
            scheduler = schedulerConfig.scheduler();

            TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (null == trigger) {
                Class clazz = QuartzJob.class;
                JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).build();
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
                trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)
                        .withSchedule(scheduleBuilder).build();
                scheduler.scheduleJob(jobDetail, trigger);
                log.info("Quartz 建立了job:...:{}", jobDetail.getKey());
            } else {
                log.info("job已存在:{}", trigger.getKey());
            }

            TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);
            CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);
            if (null == trigger2) {
                Class clazz = QuartzJob2.class;
                JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).build();
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/15 * * * * ?");
                trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)
                        .withSchedule(scheduleBuilder).build();
                scheduler.scheduleJob(jobDetail2, trigger2);
                log.info("Quartz 建立了job:...:{}", jobDetail2.getKey());
            } else {
                log.info("job已存在:{}", trigger2.getKey());
            }
            scheduler.start();
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }
}

5.啓動定時器

啓動兩個Application,分別是示例中的DemoQuartzApplication和DemoQuartzApplication2,會發現,兩個Job會分別在兩個應用執行。spring

當手動中止一個應用的時候,另外一個應用會自動接管全部任務並繼續執行,若是任務太多,咱們能夠再開一臺服務便可。實現了調度任務的高可用性和可擴展性sql

運行效果如圖:
執行結果數據庫

資料

相關文章
相關標籤/搜索