【學習】023 分佈式任務調度平臺

什麼是定時任務

 指定時間去執行任務java

Java實現定時任務方式

Thread

package com.hongmoshui.thread;

public class Demo01
{
    static long count = 0;

    public static void main(String[] args)
    {
        Runnable runnable = new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    try
                    {
                        Thread.sleep(1000);
                        count++;
                        System.out.println(count);
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
    }
}

TimerTask

package com.hongmoshui.TimerTask;
import java.util.Timer;
import java.util.TimerTask;

/**
 * 使用TimerTask類實現定時任務
 */
public class Demo02
{
    static long count = 0;

    public static void main(String[] args)
    {
        TimerTask timerTask = new TimerTask()
        {

            @Override
            public void run()
            {
                count++;
                System.out.println(count);
            }
        };
        Timer timer = new Timer();
        // 天數
        long delay = 0;
        // 秒數
        long period = 1000;
        timer.scheduleAtFixedRate(timerTask, delay, period);
    }

}

ScheduledExecutorService

使用ScheduledExecutorService是從Javagit

JavaSE5的java.util.concurrent裏,作爲併發工具類被引進的,這是最理想的定時任務實現方式。github

package com.hongmoshui.scheduled;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo003
{
    public static void main(String[] args)
    {
        Runnable runnable = new Runnable()
        {
            public void run()
            {
                // task to run goes here
                System.out.println("Hello !!");
            }
        };
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        // 第二個參數爲首次執行的延時時間,第三個參數爲定時執行的間隔時間
        service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
    }
}

Quartz

建立一個quartz_demo項目

引入maven依賴

    <dependencies>
        <!-- quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>

任務調度類

package com.hongmoshui.quartz;
import java.util.Date;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MyJob implements Job
{

    public void execute(JobExecutionContext context) throws JobExecutionException
    {
        System.out.println("quartz MyJob date:" + new Date().getTime());
    }

}

啓動類

package com.hongmoshui;
import java.util.Date;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import com.hongmoshui.quartz.MyJob;

public class QuartzStart
{
    public static void main(String[] args) throws SchedulerException
    {
        // 1.建立Scheduler的工廠
        SchedulerFactory sf = new StdSchedulerFactory();
        // 2.從工廠中獲取調度器實例
        Scheduler scheduler = sf.getScheduler();
        // 3.建立JobDetail
        JobDetail jb = JobBuilder.newJob(MyJob.class).withDescription("this is a ram job") // job的描述
                .withIdentity("ramJob", "ramGroup") // job
                                                    // 的name和group
                .build();
        // 任務運行的時間,SimpleSchedle類型觸發器有效
        long time = System.currentTimeMillis() + 3 * 1000L; // 3秒後啓動任務
        Date statTime = new Date(time);
        // 4.建立Trigger
        // 使用SimpleScheduleBuilder或者CronScheduleBuilder
        Trigger t = TriggerBuilder.newTrigger().withDescription("").withIdentity("ramTrigger", "ramTriggerGroup")
                // .withSchedule(SimpleScheduleBuilder.simpleSchedule())
                .startAt(statTime) // 默認當前時間啓動
                .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) // 兩秒執行一次
                .build();
        // 5.註冊任務和定時器
        scheduler.scheduleJob(jb, t);
        // 6.啓動 調度器
        scheduler.start();

    }
}

Quartz表達式

http://cron.qqe2.com/web

分佈式狀況下定時任務會出現哪些問題?

分佈式集羣的狀況下,怎麼保證定時任務不被重複執行spring

分佈式定時任務解決方案

①使用zookeeper實現分佈式鎖 缺點(須要建立臨時節點、和事件通知不易於擴展)數據庫

②使用配置文件作一個開關  缺點發布後,須要重啓apache

③數據庫惟一約束,缺點效率低tomcat

④使用分佈式任務調度平臺XXLJOB安全

XXLJOB介紹

一、簡單:支持經過Web頁面對任務進行CRUD操做,操做簡單,一分鐘上手;服務器

二、動態:支持動態修改任務狀態、暫停/恢復任務,以及終止運行中任務,即時生效;

三、調度中心HA(中心式):調度採用中心式設計,「調度中心」基於集羣Quartz實現,可保證調度中心HA;

四、執行器HA(分佈式):任務分佈式執行,任務"執行器"支持集羣部署,可保證任務執行HA;

五、任務Failover:執行器集羣部署時,任務路由策略選擇"故障轉移"狀況下調度失敗時將會平滑切換執行器進行Failover;

六、一致性:「調度中心」經過DB鎖保證集羣分佈式調度的一致性, 一次任務調度只會觸發一次執行;

七、自定義任務參數:支持在線配置調度任務入參,即時生效;

八、調度線程池:調度系統多線程觸發調度運行,確保調度精確執行,不被堵塞;

九、彈性擴容縮容:一旦有新執行器機器上線或者下線,下次調度時將會從新分配任務;

十、郵件報警:任務失敗時支持郵件報警,支持配置多郵件地址羣發報警郵件;

十一、狀態監控:支持實時監控任務進度;

十二、Rolling執行日誌:支持在線查看調度結果,而且支持以Rolling方式實時查看執行器輸出的完整的執行日誌;

1三、GLUE:提供Web IDE,支持在線開發任務邏輯代碼,動態發佈,實時編譯生效,省略部署上線的過程。支持30個版本的歷史版本回溯。

1四、數據加密:調度中心和執行器之間的通信進行數據加密,提高調度信息安全性;

1五、任務依賴:支持配置子任務依賴,當父任務執行結束且執行成功後將會主動觸發一次子任務的執行, 多個子任務用逗號分隔;

1六、推送maven中央倉庫: 將會把最新穩定版推送到maven中央倉庫, 方便用戶接入和使用;

1七、任務註冊: 執行器會週期性自動註冊任務, 調度中心將會自動發現註冊的任務並觸發執行。同時,也支持手動錄入執行器地址;

1八、路由策略:執行器集羣部署時提供豐富的路由策略,包括:第一個、最後一個、輪詢、隨機、一致性HASH、最不常常使用、最近最久未使用、故障轉移、忙碌轉移等;

1九、運行報表:支持實時查看運行數據,如任務數量、調度次數、執行器數量等;以及調度報表,如調度日期分佈圖,調度成功分佈圖等;

20、腳本任務:支持以GLUE模式開發和運行腳本任務,包括Shell、Python等類型腳本;

2一、阻塞處理策略:調度過於密集執行器來不及處理時的處理策略,策略包括:單機串行(默認)、丟棄後續調度、覆蓋以前調度;

2二、失敗處理策略;調度失敗時的處理策略,策略包括:失敗告警(默認)、失敗重試;

2三、分片廣播任務:執行器集羣部署時,任務路由策略選擇"分片廣播"狀況下,一次任務調度將會廣播觸發對應集羣中全部執行器執行一次任務,同時傳遞分片參數;可根據分片參數開發分片任務;

2四、動態分片:分片廣播任務以執行器爲維度進行分片,支持動態擴容執行器集羣從而動態增長分片數量,協同進行業務處理;在進行大數據量業務操做時可顯著提高任務處理能力和速度。

2五、事件觸發:除了"Cron方式"和"任務依賴方式"觸發任務執行以外,支持基於事件的觸發任務方式。調度中心提供觸發任務單次執行的API服務,可根據業務事件靈活觸發。

XXLJOBGitHub地址

https://github.com/xuxueli/xxl-job

文檔

http://www.xuxueli.com/xxl-job/#/?id=_21-%e5%88%9d%e5%a7%8b%e5%8c%96%e8%b0%83%e5%ba%a6%e6%95%b0%e6%8d%ae%e5%ba%93

步驟:

①  部署: xxl-job-admin  做爲註冊中心

②  建立執行器(具體調度地址) 能夠支持集羣

③  配置文件須要填寫xxl-job註冊中心地址

④  每一個具體執行job服務器須要建立一個netty鏈接端口號

⑤  須要執行job的任務類,集成IJobHandler抽象類註冊到job容器中

⑥  Execute方法中編寫具體job任務

客戶端對接

引入依賴

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hongmoshui.www</groupId>
    <artifactId>springBoot-xxLjob</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <!-- <dependencies></dependencies> -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.6.RELEASE</version>
    </parent>
    <dependencies>
        <!-- jetty -->
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-util</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-http</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-io</artifactId>
        </dependency>
        <!-- spring-boot-starter-web (spring-webmvc + tomcat) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- xxl-job-core -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>1.8.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin </artifactId>
            </plugin>
        </plugins>
    </build>
</project>

配置文件

# web port
server.port=8084
# log config
logging.config=classpath:logback.xml
# xxl-job
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8081/xxl-job-admin
### xxl-job executor address
xxl.job.executor.appname=test
xxl.job.executor.ip=192.168.1.3
xxl.job.executor.port=9999
### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler/
### xxl-job, access token
xxl.job.accessToken=

Java代碼

 

package com.hongmoshui.xxljob;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import com.xxl.job.core.executor.XxlJobExecutor;

/**
 * xxl-job config
 * @author hongmoshui
 */
@Configuration
@ComponentScan(basePackages = "com.xxl.job.executor.service.jobhandler")
public class XxlJobConfig
{
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String addresses;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logpath;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobExecutor xxlJobExecutor()
    {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobExecutor xxlJobExecutor = new XxlJobExecutor();
        xxlJobExecutor.setIp(ip);
        xxlJobExecutor.setPort(port);
        xxlJobExecutor.setAppName(appname);
        xxlJobExecutor.setAdminAddresses(addresses);
        xxlJobExecutor.setLogPath(logpath);
        xxlJobExecutor.setAccessToken(accessToken);
        return xxlJobExecutor;
    }

}
package com.hongmoshui.xxljob;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander;
import com.xxl.job.core.log.XxlJobLogger;

/**
 * <任務Handler的一個Demo(Bean模式)>
 *  <開發步驟>
 * 一、繼承 「IJobHandler」 ; 二、裝配到Spring,例如加「@Service」 註解;
 * 三、加 「@JobHander」註解,註解value值爲新增任務生成的JobKey的值;多個JobKey用逗號分割;
 * 四、執行日誌:須要經過 "XxlJobLogger.log"打印執行日誌;
 * @author xuxueli 2015-12-19 19:43:36
 */
@JobHander(value = "demoJobHandler")
@Service
public class DemoJobHandler extends IJobHandler
{
    @Value("${xxl.job.executor.port}")
    private String port;

    @Override
    public ReturnT<String> execute(String... params) throws Exception
    {
        XxlJobLogger.log("XXL-JOB, Hello World." + port);
        System.out.println("XXL-JOB, Hello World." + port);
        for (int i = 0; i < 5; i++)
        {
            XxlJobLogger.log("beat at:" + i);
            // TimeUnit.SECONDS.sleep(2);
        }
        return ReturnT.SUCCESS;
    }

}

XXLJOB路由策略

相關文章
相關標籤/搜索