一個簡單的基於 Redis 的分佈式任務調度器 —— Java 語言實現

折騰了一週的 Java Quartz 集羣任務調度,很遺憾沒能搞定,網上的相關文章也少得可憐,在多節點(多進程)環境下 Quartz 彷佛沒法動態增減任務,惱火。無奈之下本身擼了一個簡單的任務調度器,結果只花了不到 2天時間,並且感受很是簡單好用,代碼量也很少,擴展性很好。java

實現一個分佈式的任務調度器有幾個關鍵的考慮點git

  1. 單次任務和循環任務好作,難的是 cron 表達式的解析和時間計算怎麼作?
  2. 多進程同一時間如何保證一個任務的互斥性?
  3. 如何動態變動增長和減小任務?

代碼實例

在深刻講解實現方法以前,咱們先來看看這個調度器是如何使用的github

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // sample 爲任務分組名稱
        var store = new RedisTaskStore(redis, "sample");
        // 5s 爲任務鎖壽命
        var scheduler = new DistributedScheduler(store, 5);
        // 註冊一個單次任務
        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // 註冊一個循環任務
        scheduler.register(Trigger.periodOfDelay(5, 5), Task.of("period2", () -> {
            System.out.println("period2");
        }));
        // 註冊一個 CRON 任務
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // 設置全局版本號
        scheduler.version(1);
        // 註冊監聽器
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // 啓動調度器
        scheduler.start();
    }
}
複製代碼

當代碼升級任務須要增長減小時(或者變動調度時間),只須要遞增全局版本號,現有的進程中的任務會自動被從新調度,那些沒有被註冊的任務(任務減小)會自動清除。新增的任務(新任務)在老代碼的進程裏是不會被調度的(沒有新任務的代碼沒法調度),被清除的任務(老任務)在老代碼的進程裏會被取消調度。redis

好比咱們要取消 period2 任務,增長 period4 任務數據庫

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // sample 爲任務分組名稱
        var store = new RedisTaskStore(redis, "sample");
        // 5s 爲任務鎖壽命
        var scheduler = new DistributedScheduler(store, 5);
        // 註冊一個單次任務
        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // 註冊一個 CRON 任務
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // 註冊一個循環任務
        scheduler.register(Trigger.periodOfDelay(5, 10), Task.of("period4", () -> {
            System.out.println("period4");
        }));
        // 遞增全局版本號
        scheduler.version(2);
        // 註冊監聽器
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // 啓動調度器
        scheduler.start();
    }
}
複製代碼

cron4j

<dependency>
	<groupId>it.sauronsoftware.cron4j</groupId>
	<artifactId>cron4j</artifactId>
	<version>2.2.5</version>
</dependency>
複製代碼

這個開源的 library 包含了基礎的 cron 表達式解析功能,它還提供了任務的調度功能,不過這裏並不須要使用它的調度器。我只會用到它的表達式解析功能,以及一個簡單的方法用來判斷當前的時間是否匹配表達式(是否該運行任務了)。bash

咱們對 cron 的時間精度要求很低,1 分鐘判斷一次當前的時間是否到了該運行任務的時候就能夠了。分佈式

class SchedulingPattern {
    // 表達式是否有效
    boolean validate(String cronExpr);
    // 是否應該運行任務了(一分鐘判斷一次)
    boolean match(long nowTs);
}
複製代碼

任務的互斥性

由於是分佈式任務調度器,多進程環境下要控制同一個任務在調度的時間點只能有一個進程運行。使用 Redis 分佈式鎖很容易就能夠搞定。鎖須要保持必定的時間(好比默認 5s)。this

全部的進程都會在同一時間調度這個任務,可是隻有一個進程能夠搶到鎖。由於分佈式環境下時間的不一致性,不一樣機器上的進程會有較小的時間差別窗口,鎖必須保持一個窗口時間,這裏我默認設置爲 5s(可定製),這就要求不一樣機器的時間差不能超過 5s,超出了這個值就會出現重複調度。spa

public boolean grabTask(String name) {
    var holder = new Holder<Boolean>();
    redis.execute(jedis -> {
        var lockKey = keyFor("task_lock", name);
        var ok = jedis.set(lockKey, "true", SetParams.setParams().nx().ex(lockAge));
        holder.value(ok != null);
    });
    return holder.value();
}
複製代碼

全局版本號

咱們給任務列表附上一個全局的版本號,當業務上須要增長或者減小調度任務時,經過變動版本號來觸發進程的任務重加載。這個重加載的過程包含輪詢全局版本號(Redis 的一個key),若是發現版本號變更,當即從新加載任務列表配置並從新調度全部的任務。線程

private void scheduleReload() {
    // 1s 對比一次
    this.scheduler.scheduleWithFixedDelay(() -> {
        try {
            if (this.reloadIfChanged()) {
                this.rescheduleTasks();
            }
        } catch (Exception e) {
            LOG.error("reloading tasks error", e);
        }
    }, 0, 1, TimeUnit.SECONDS);
}
複製代碼

從新調度任務先要取消當前全部正在調度的任務,而後調度剛剛加載的全部任務。

private void rescheduleTasks() {
    this.cancelAllTasks();
    this.scheduleTasks();
}

private void cancelAllTasks() {
    this.futures.forEach((name, future) -> {
        LOG.warn("cancelling task {}", name);
        future.cancel(false);
    });
    this.futures.clear();
}
複製代碼

由於須要將任務持久化,因此設計了一套任務的序列化格式,這個也很簡單,使用文本符號分割任務配置屬性就行。

// 一次性任務(startTime)
ONCE@2019-04-29T15:26:29.946+0800
// 循環任務,(startTime,endTime,period),這裏任務的結束時間是天荒地老
PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5
// cron 任務,一分鐘一次
CRON@*/1 * * * *

$ redis-cli
127.0.0.1:6379> hgetall sample_triggers
1) "task3"
2) "CRON@*/1 * * * *"
3) "task2"
4) "PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5"
5) "task1"
6) "ONCE@2019-04-29T15:26:29.946+0800"
7) "task4"
8) "PERIOD@2019-04-29T15:26:29.957+0800|292278994-08-17T15:12:55.807+0800|10"
複製代碼

線程池

時間調度會有一個單獨的線程(單線程線程池),任務的運行由另一個線程池來完成(數量可定製)。

class DistributedScheduler {
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ExecutorService executor = Executors.newFixedThreadPool(threads);
}
複製代碼

之因此要將線程池分開,是爲了不任務的執行(IO)影響了時間的精確調度。

FixedDelay vs FixedRate

Java 的內置調度器提供兩種調度策略 FixedDelay 和 FixedRate。FixedDelay 保證同一個任務的連續兩次運行有相等的時延(nextRun.startTime - lastRun.endTime),FixedRate 保證同一個任務的連續運行有肯定的間隔(nextRun.startTime - lastRun.startTime)。

FixedDelay 就比如你加班到深夜12點,能夠次日12點再來上班(保證固定的休息時間),而 FixedRate 就沒那麼體貼了,次日你繼續 9點過來上班。若是你不走運到次日 9 點了還在加班,那你今天就沒有休息時間了,繼續上班吧。

class ScheduledExecutorService {
    void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    void scheduleAtFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
複製代碼

分佈式調度器要求有精確的調度時間,因此必須採用 FixedRate 模式,保證多節點同一個任務在同一時間被爭搶。若是採用 FixedDelay 模式,會致使不一樣進程的調度時間錯開了,分佈式鎖的默認 5s 時間窗口將起不到互斥做用。

支持無互斥任務

互斥任務要求任務的單進程運行,無互斥任務就是沒有加分佈式鎖的任務,能夠多進程同時運行。默認須要互斥。

class Task {
    /** * 是否須要考慮多進程互斥(true表示不互斥,多進程能同時跑) */
    private boolean concurrent;
    private String name;
    private Runnable runner;
    ...
    public static Task of(String name, Runnable runner) {
        return new Task(name, false, runner);
    }

    public static Task concurrent(String name, Runnable runner) {
        return new Task(name, true, runner);
    }
}
複製代碼

增長回調接口

考慮到調度器的使用者可能須要對任務運行狀態進行監控,這裏增長了一個簡單的回調接口,目前功能比較簡單。能彙報運行結果(成功仍是異常)和運行的耗時

class TaskContext {
    private Task task;
    private long cost;  // 運行時間
    private boolean ok;
    private Throwable e;
}

interface ISchedulerListener {
    public void onComplete(TaskContext ctx);
}
複製代碼

支持存儲擴展

目前只實現了 Redis 和 Memory 形式的任務存儲,擴展到 zk、etcd、關係數據庫也是可行的,實現下面的接口便可。

interface ITaskStore {
  public long getRemoteVersion();
  public Map<String, String> getAllTriggers();
  public void saveAllTriggers(long version, Map<String, String> triggers);
  public boolean grabTask(String name);
}
複製代碼

代碼地址

github.com/pyloque/tas…

相關文章
相關標籤/搜索