spring-boot-starter-quartz集羣實踐

前情提要】因爲項目須要,須要一個定時任務集羣,故此有了這個spring-boot-starter-quartz集羣的實踐。springboot的版本爲:2.1.6.RELEASE;quartz的版本爲:2.3.1.假如這裏一共有兩個定時任務的節點,它們的代碼徹底同樣。java

壹.jar包依賴

~~~pom 1.8 mysql

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>複製代碼
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>複製代碼
<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>複製代碼
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>複製代碼
<dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>複製代碼
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
~~~
這裏選擇將定時任務的數據入庫,避免數據直接存在內存中,因應用重啓形成的數據丟失和作集羣控制。複製代碼

貳、項目配置

~~~yamlspring:server:port: 8080servlet:context-path: /lovindatasource:url: jdbc:mysql://127.0.0.1:3306/training?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=trueusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driverquartz:job-store-type: jdbc #數據庫方式jdbc:initialize-schema: never #不初始化表結構properties:org:quartz:scheduler:instanceId: AUTO #默認主機名和時間戳生成實例ID,能夠是任何字符串,但對於全部調度程序來講,必須是惟一的 對應qrtzschedulerstate INSTANCE_NAME字段git

quartzScheduler

jobStore:class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化配置driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #咱們僅爲數據庫製做了特定於數據庫的代理useProperties: false #以指示JDBCJobStore將JobDataMaps中的全部值都做爲字符串,所以能夠做爲名稱 - 值對存儲而不是在BLOB列中以其序列化形式存儲更多複雜的對象。從長遠來看,這是更安全的,由於您避免了將非String類序列化爲BLOB的類版本問題。tablePrefix: qrtz_ #數據庫表前綴misfireThreshold: 60000 #在被認爲「失火」以前,調度程序將「容忍」一個Triggers將其下一個啓動時間經過的毫秒數。默認值(若是您在配置中未輸入此屬性)爲60000(60秒)。clusterCheckinInterval: 5000 #設置此實例「檢入」*與羣集的其餘實例的頻率(以毫秒爲單位)。影響檢測失敗實例的速度。isClustered: true #打開羣集功能threadPool: #鏈接池class: org.quartz.simpl.SimpleThreadPoolthreadCount: 10threadPriority: 5threadsInheritContextClassLoaderOfInitializingThread: true~~~這裏須要注意的是兩個節點的端口號應該不一致,避免衝突github

叄、實現一個Job

~~~java@Slf4jpublic class Job extends QuartzJobBean {@Overrideprotected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {// 獲取參數JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();// 業務邏輯 ...log.info("------springbootquartzonejob執行"+jobDataMap.get("name").toString()+"###############"+jobExecutionContext.getTrigger());web

}
~~~複製代碼

其中的日誌輸出是爲了便於觀察任務執行狀況spring

肆、封裝定時任務操做

~~~java@Servicepublic class QuartzService {@Autowiredprivate Scheduler scheduler;sql

@PostConstruct
    public void startScheduler() {
        try {
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 增長一個job
     *
     * @param jobClass
     *            任務實現類
     * @param jobName
     *            任務名稱
     * @param jobGroupName
     *            任務組名
     * @param jobTime
     *            時間表達式 (這是每隔多少秒爲一次任務)
     * @param jobTimes
     *            運行的次數 (<0:表示不限次數)
     * @param jobData
     *            參數
     */
    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
                       int jobTimes, Map jobData) {
        try {
            // 任務名稱和組構成任務key
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
                    .build();
            // 設置job參數
            if(jobData!= null && jobData.size()>0){
                jobDetail.getJobDataMap().putAll(jobData);
            }
            // 使用simpleTrigger規則
            Trigger trigger = null;
            if (jobTimes < 0) {
                trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
                        .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
                        .startNow().build();
            } else {
                trigger = TriggerBuilder
                        .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
                                .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
                        .startNow().build();
            }
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 增長一個job
     *
     * @param jobClass
     *            任務實現類
     * @param jobName
     *            任務名稱(建議惟一)
     * @param jobGroupName
     *            任務組名
     * @param jobTime
     *            時間表達式 (如:0/5 * * * * ? )
     * @param jobData
     *            參數
     */
    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
        try {
            // 建立jobDetail實例,綁定Job實現類
            // 指明job的名稱,所在組的名稱,以及綁定job類
            // 任務名稱和組構成任務key
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
                    .build();
            // 設置job參數
            if(jobData!= null && jobData.size()>0){
                jobDetail.getJobDataMap().putAll(jobData);
            }
            // 定義調度觸發規則
            // 使用cornTrigger規則
            // 觸發器key
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
                    .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
            // 把做業和觸發器註冊到任務調度中
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 修改 一個job的 時間表達式
     *
     * @param jobName
     * @param jobGroupName
     * @param jobTime
     */
    public void updateJob(String jobName, String jobGroupName, String jobTime) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
            // 重啓觸發器
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 刪除任務一個job
     *
     * @param jobName
     *            任務名稱
     * @param jobGroupName
     *            任務組名
     */
    public void deleteJob(String jobName, String jobGroupName) {
        try {
            scheduler.deleteJob(new JobKey(jobName, jobGroupName));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 暫停一個job
     *
     * @param jobName
     * @param jobGroupName
     */
    public void pauseJob(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.pauseJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 恢復一個job
     *
     * @param jobName
     * @param jobGroupName
     */
    public void resumeJob(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.resumeJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 當即執行一個job
     *
     * @param jobName
     * @param jobGroupName
     */
    public void runAJobNow(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.triggerJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }複製代碼
/**
     * 獲取全部計劃中的任務列表
     *
     * @return
     */
    public List<Map<String, Object>> queryAllJob() {
        List<Map<String, Object>> jobList = null;
        try {
            GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
            Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
            jobList = new ArrayList<Map<String, Object>>();
            for (JobKey jobKey : jobKeys) {
                List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                for (Trigger trigger : triggers) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("jobName", jobKey.getName());
                    map.put("jobGroupName", jobKey.getGroup());
                    map.put("description", "觸發器:" + trigger.getKey());
                    Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                    map.put("jobStatus", triggerState.name());
                    if (trigger instanceof CronTrigger) {
                        CronTrigger cronTrigger = (CronTrigger) trigger;
                        String cronExpression = cronTrigger.getCronExpression();
                        map.put("jobTime", cronExpression);
                    }
                    jobList.add(map);
                }
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }複製代碼
/**
     * 獲取全部正在運行的job
     *
     * @return
     */
    public List<Map<String, Object>> queryRunJob() {
        List<Map<String, Object>> jobList = null;
        try {
            List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
            jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
            for (JobExecutionContext executingJob : executingJobs) {
                Map<String, Object> map = new HashMap<String, Object>();
                JobDetail jobDetail = executingJob.getJobDetail();
                JobKey jobKey = jobDetail.getKey();
                Trigger trigger = executingJob.getTrigger();
                map.put("jobName", jobKey.getName());
                map.put("jobGroupName", jobKey.getGroup());
                map.put("description", "觸發器:" + trigger.getKey());
                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                map.put("jobStatus", triggerState.name());
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String cronExpression = cronTrigger.getCronExpression();
                    map.put("jobTime", cronExpression);
                }
                jobList.add(map);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }複製代碼

~~~數據庫

陸、初始化任務

這裏不許備給用戶用web界面來配置定時任務,故此採用CommandLineRunner來子啊應用初始化的時候來初始化任務。只須要實現CommandLineRunner的run()方法便可。~~~java@Overridepublic void run(String... args) throws Exception {HashMap map = new HashMap<>();map.put("name",1);quartzService.deleteJob("job", "test");quartzService.addJob(Job.class, "job", "test", "0 ?", map); 安全

map.put("name",2);
        quartzService.deleteJob("job2", "test");
        quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map);複製代碼
map.put("name",3);
        quartzService.deleteJob("job3", "test2");
        quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map);複製代碼
}
~~~
# 柒、測試驗證
分別夏侯啓動兩個應用,而後觀察任務執行,以及在運行過程當中殺死某個服務,來觀察定時任務的執行。
![SpringbootquartzoneApplication](https://eelve.com/upload/2019/8/1-a8a710a578ad47a8afc8ace72f3cbd7c.png)
![SpringbootquartztwoApplication](https://eelve.com/upload/2019/8/2-db731d38c3ed4b4b8123482c9b3ef28d.png)複製代碼

寫在後面的話】下面給出的是所須要腳本的鏈接地址:腳本下載地址,另外這邊又一個本身實現的demospringboot

相關文章
相關標籤/搜索