基於redis的分佈式任務管理

業務背景

系統中有各類定時任務,須要知足如下要求:html

  1. 定時任務須要可以動態增刪改查
  2. 須要可以設置任務的有效時間範圍(只在此範圍內執行)
  3. 任務執行週期須要可以靈活配置
  4. 須要可以輕鬆接入各類任務實現
  5. 須要可以靈活配置任務實現的運行參數
  6. 系統爲分佈式集羣,須要保證同一時刻同一任務只能被一個節點執行,同時增刪改動做須要同步到每一個節點

定時任務實現方式

1. Timer

Java API中的Timer提供了多種定時實現,對照需求:java

  1. [√] 能夠經過動態建立、取消實現
  2. [x] Timer自己不支持
  3. [x] 目前Timer實現的執行週期還很簡單
  4. [√] 須要實現不一樣Job
  5. [√] 須要在建立Job時指定
  6. [x] Timer目前不支持

2. ScheduledExecutorService

ScheduledExecutorService與Timer相似,對照需求:git

  1. [√] 能夠經過動態建立、取消實現
  2. [x] ScheduledExecutorService自己不支持
  3. [x] 目前ScheduledExecutorService實現的執行週期一樣很簡單
  4. [√] 須要實現不一樣Job
  5. [√] 須要在建立Job時指定
  6. [x] ScheduledExecutorService目前一樣不支持

3. Spring Schedule

spring提供了開箱即用的輕量版定時任務schedule,以註解的形式使用,支持cron表達式,用起來可謂十分方便順手
但針對這裏的需求:github

  1. [x] 因爲以註解的形式使用,自己並不支持動態增刪改
  2. [x] 目前不支持
  3. [√] spring schedule提供了cron、fixedRate兩種方式,基本知足平常需求
  4. [√] 須要實現不一樣job
  5. [?] 因爲不能動態增刪改,job均是事先編碼的,不直接支持(能夠經過其餘方式實現)
  6. [x] 目前不支持

4. Quartz

Quartz的強大已經深得人心,使用Quartz實現這裏的需求也是綽綽有餘,針對第6條分佈式集羣Quartz也提出了解決方案redis

  1. [√] 支持
  2. [√] 支持
  3. [√] 支持
  4. [√] 支持
  5. [√] 支持
  6. [√] 支持

但在Quartz官方文檔中出現瞭如下兩段提示spring

Never run clustering on separate machines, unless their clocks are synchronized using some form of time-sync service (daemon) that runs very regularly (the clocks must be within a second of each other). See http://www.boulder.nist.gov/t... if you are unfamiliar with how to do this.
Never start (scheduler.start()) a non-clustered instance against the same set of database tables that any other instance is running (start()ed) against. You may get serious data corruption, and will definitely experience erratic behavior.

其中須要關注的一點是,各節點機器之間服務器時間偏差不能超過1秒。雖然服務器時間同步實現起來並不困難,但針對貴公司雲主機環境近1年來的表現,只能說不信任,Quartz只能作保底方案。數據庫

5. jesque

jesqueresque的java版實現,jesque的定位主要是延遲任務及簡單的定時任務,不支持cron表達式,對照需求:springboot

  1. [√] 支持
  2. [x] jesque自己不支持
  3. [x] 延遲任務及簡單的定時任務,不支持cron表達式
  4. [√] 須要實現不一樣Job
  5. [√] 能夠配置Job類的構造函數參數及屬性參數值
  6. [√] 基於redis實現

綜上服務器

需求編號 Timer ScheduledExecutorService Spring Schedule Quartz jesque
1 x
2 x x x x
3 x x x
4
5 ?
6 x x x

儘管Quartz秒殺當前需求,但鑑於貴公司雲主機環境的表現,同時考慮到沒有SQL數據庫環境,故從新擼一套,奉上源碼 https://github.com/manerfan/m...app

基於Redis實現分佈式任務管理

實現思路

基本思路爲,將任務數據存放到redis,定時獲取redis中的任務數據,利用反射建立任務實例並執行。這裏分別用到了redis中的有序集合及哈希表

建立/更新任務流程:

  1. 以任務執行時間爲score,任務ID爲member存入有序集合(ZADD
  2. 以任務ID爲field,任務實體爲value存入哈希表(HSET

獲取/執行任務流程:

  1. 從有序集合中獲取score小於當前時間的任務ID(ZRANGEBYSCORE
  2. 根據任務ID從哈希表中取出任務實體(HMGET
  3. 使用反射建立任務實例並執行

編碼實現

任務實體Job

@JsonInclude(JsonInclude.Include.NON_EMPTY)
class JobEntity(
        var uuid: String = UUID.randomUUID().toString(),
        var name: String? = null,

        var className: String,
        var args: Array<Any?>? = null,
        @JsonInclude(JsonInclude.Include.NON_EMPTY) 
        var vars: Map<String, Any?>? = null,

        @JsonDeserialize(using = DateTimeDeserializer::class)
        @JsonSerialize(using = DateTimeSerializer::class)
        var startedAt: DateTime? = null,

        @JsonDeserialize(using = DateTimeDeserializer::class)
        @JsonSerialize(using = DateTimeSerializer::class)
        var endedAt: DateTime? = null,

        cron: String? = null, //  優先使用此參數
        var fixedRate: Long? = null
) {
    /**
     * cron表達式解析器
     */
    @JsonIgnore
    private var cronGenerator: CronSequenceGenerator? = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) }

    /**
     * cron表達式
     */
    var cron = cron
        set(cron) {
            field = cron
            cronGenerator = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) }
        }

    /**
     * 記錄下次須要執行的時間
     */
    var nextScheduledAt: Long = -1 private set

    /**
     * 計算並更新下次執行時間
     * 若指定endedAt且下次執行時間晚於endedAt,則說明任務已結束,並返回false
     *
     * @return 是否須要更新 | 是否已失效
     */
    fun updateNextScheduledAt(timestamp: Long = System.currentTimeMillis()): Boolean {
        val limit = startedAt?.let { max(it.millis, timestamp) } ?: timestamp

        nextScheduledAt = when {
            null != cronGenerator -> cronGenerator!!.next(Date(limit)).time
            null != fixedRate -> limit + fixedRate!!
            else -> nextScheduledAt
        }

        return endedAt?.let { it.millis > nextScheduledAt } ?: true
    }
}

其中

className 真正任務實例的類路徑
args 對應className類構造函數參數
vars 對應className類中的屬性-值

startedAt 任務的開始時間,不指定則當即開始
endedAt 任務的結束時間,不指定則永久執行

cron 任務執行cron表達式
fixedRate 以該速率(毫秒)循環執行(若指定了cron,則該參數失效)

updateNextScheduledAt函數用於計算任務下次執行時間,若下次執行時間晚於endedAt則說明任務已結束

任務管理

/**
 * 添加任務Job
 *
 * 計算並更新job.nextScheduledAt
 * 若指定endedAt且nextScheduledAt晚於endedAt,則說明任務已結束,直接返回
 * 反之,將更新後的job存入redis
 *
 * @param job 任務
 *
 * @return job
 */
fun add(job: JobEntity): JobEntity {
    if (!job.updateNextScheduledAt(now)) {
        logger.warn("Job is Arrived! {}", job.toString())
        // Update to DB
        return job
    }

    val connection = jobTemplate.connectionFactory.connection
    try {
        connection.multi()
        connection.hSet(hKey, job.uuid.toByteArray(), objectMapper.writeValueAsBytes(job))
        connection.zAdd(zKey, job.nextScheduledAt.toDouble(), job.uuid.toByteArray())
        connection.exec()
    } finally {
        connection.close()
    }

    return job
}

/**
 * 更新任務
 *
 * 1. 刪除任務
 * 2. 計算並更新job.nextScheduledAt
 * 若指定endedAt且nextScheduledAt晚於endedAt,則說明任務已結束,直接返回
 * 反之,將更新後的job存入redis
 *
 * @param job 任務
 *
 * @return job
 */
fun update(job: JobEntity): JobEntity {
    delete(job.uuid)
    return add(job)
}

/**
 * 刪除任務
 *
 * 1. 從Hash中刪除
 * 2. 從SortedSet中刪除
 *
 * @param uuid 任務uuid
 */
fun delete(uuid: String) {
    val connection = jobTemplate.connectionFactory.connection
    try {
        connection.multi()
        connection.hDel(hKey, uuid.toByteArray())
        connection.zRem(zKey, uuid.toByteArray())
        connection.exec()
    } finally {
        connection.close()
    }
}

其中

add 計算任務下次執行時間,若晚於結束時間則直接放回,反之更新到redis中
update 計算任務下次執行時間,若晚於結束時間則從redis中刪除,反之更新到redis中
delete 將任務從redis中刪除

任務執行

/**
 * 任務Job執行器
 *
 * 每隔1秒運行一次
 * 1. 從SortedSet中將score在(0,now)之間的uuid取出
 * 2. 從Hash中將uuid對應的job取出
 * 3. 解析job,計算job的nextScheduledAt,並將job回寫到redis中
 * 4. 執行job
 */
@Scheduled(fixedRate = 1000) // 不使用cron是爲了使集羣中各節點執行時間隨機分散開
fun schedule() {

    /**
     * SortedSet(有序集合)中,member爲job.uuid,score爲job.nextScheduledAt
     * 將score在 (0, now) 之間的uuid取出
     * 其對應的便是如今須要執行的job
     */

    var connection = jobTemplate.connectionFactory.connection
    var keys: Set<ByteArray>?
    try {
        val now = System.currentTimeMillis().toDouble()
        connection.multi()
        connection.zRangeByScore(zKey, 0.0, now) // 將score在(0,now)之間的uuid取出
        connection.zRemRangeByScore(zKey, 0.0, now) // 同時從redis中刪除
        keys = connection.exec()[0] as? Set<ByteArray>
    } finally {
        connection.close()
    }

    if (ObjectUtils.isEmpty(keys)) {
        return
    }

    /**
     * Hash(哈希表)中,field爲job.uuid,value爲job
     * 經過uuid將對應job取出
     */

    connection = jobTemplate.connectionFactory.connection
    var values: List<ByteArray>?
    try {
        connection.multi()
        connection.hMGet(hKey, *keys!!.toTypedArray()) // 將uuid對應的job取出
        connection.hDel(hKey, *keys.toTypedArray()) // 同時從redis中刪除
        values = connection.exec()[0] as? List<ByteArray>
    } finally {
        connection.close()
    }

    if (ObjectUtils.isEmpty(values)) {
        return
    }

    // 解析jobs並回寫到redis中
    val jobs = values!!.map {
        try {
            // 計算job的nextScheduledAt,並將其回寫到redis中
            add(objectMapper.readValue(it, JobEntity::class.java))
        } catch (e: Exception) {
            logger.warn("JSON Parse Error {} {}", it.toString(), e.message)
            null
        }
    }

    // 執行jobs
    jobs.filterNotNull().forEach {
        var job = ReflectionUtils.createObject(Class.forName(it.className), it.args, it.vars)
        when (job) {
            is Runnable -> executorService.submit(job)
            else -> logger.warn("Job Must Implement Runnable {}", job)
        }
    }
}

這裏使用spring schedule,每1秒執行一次

首先,從有序集合中獲取score小於當前時間的任務ID,並刪除
其次,根據任務ID從哈希表中取出任務實體Job,並刪除
以後,利用反射,根據Job中的className args vargs建立任務實例,並放入線程池執行任務
最後,計算更新任務Job下次執行時間,若任務未過時,則將其更新到redis中,等待下次執行

示例代碼見 https://github.com/manerfan/m...


訂閱號

相關文章
相關標籤/搜索