系統中有各類定時任務,須要知足如下要求:html
Java API中的Timer提供了多種定時實現,對照需求:java
ScheduledExecutorService與Timer相似,對照需求:git
spring提供了開箱即用的輕量版定時任務schedule,以註解的形式使用,支持cron表達式,用起來可謂十分方便順手
但針對這裏的需求:github
Quartz的強大已經深得人心,使用Quartz實現這裏的需求也是綽綽有餘,針對第6條分佈式集羣Quartz也提出了解決方案redis
但在Quartz官方文檔中出現瞭如下兩段提示spring
Never run clustering on separate machines, unless their clocks are synchronized using some form of time-sync service (daemon) that runs very regularly (the clocks must be within a second of each other). See http://www.boulder.nist.gov/t... if you are unfamiliar with how to do this.
Never start (scheduler.start()) a non-clustered instance against the same set of database tables that any other instance is running (start()ed) against. You may get serious data corruption, and will definitely experience erratic behavior.
其中須要關注的一點是,各節點機器之間服務器時間偏差不能超過1秒。雖然服務器時間同步實現起來並不困難,但針對貴公司雲主機環境近1年來的表現,只能說不信任,Quartz只能作保底方案。數據庫
jesque是resque的java版實現,jesque的定位主要是延遲任務及簡單的定時任務,不支持cron表達式,對照需求:springboot
綜上服務器
需求編號 | Timer | ScheduledExecutorService | Spring Schedule | Quartz | jesque |
---|---|---|---|---|---|
1 | √ | √ | x | √ | √ |
2 | x | x | x | √ | x |
3 | x | x | √ | √ | x |
4 | √ | √ | √ | √ | √ |
5 | √ | √ | ? | √ | √ |
6 | x | x | x | √ | √ |
儘管Quartz秒殺當前需求,但鑑於貴公司雲主機環境的表現,同時考慮到沒有SQL數據庫環境,故從新擼一套,奉上源碼 https://github.com/manerfan/m...app
基本思路爲,將任務數據存放到redis,定時獲取redis中的任務數據,利用反射建立任務實例並執行。這裏分別用到了redis中的有序集合及哈希表
建立/更新任務流程:
獲取/執行任務流程:
@JsonInclude(JsonInclude.Include.NON_EMPTY) class JobEntity( var uuid: String = UUID.randomUUID().toString(), var name: String? = null, var className: String, var args: Array<Any?>? = null, @JsonInclude(JsonInclude.Include.NON_EMPTY) var vars: Map<String, Any?>? = null, @JsonDeserialize(using = DateTimeDeserializer::class) @JsonSerialize(using = DateTimeSerializer::class) var startedAt: DateTime? = null, @JsonDeserialize(using = DateTimeDeserializer::class) @JsonSerialize(using = DateTimeSerializer::class) var endedAt: DateTime? = null, cron: String? = null, // 優先使用此參數 var fixedRate: Long? = null ) { /** * cron表達式解析器 */ @JsonIgnore private var cronGenerator: CronSequenceGenerator? = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) } /** * cron表達式 */ var cron = cron set(cron) { field = cron cronGenerator = cron?.let { CronSequenceGenerator(it, TimeZone.getTimeZone("Asia/Shanghai")) } } /** * 記錄下次須要執行的時間 */ var nextScheduledAt: Long = -1 private set /** * 計算並更新下次執行時間 * 若指定endedAt且下次執行時間晚於endedAt,則說明任務已結束,並返回false * * @return 是否須要更新 | 是否已失效 */ fun updateNextScheduledAt(timestamp: Long = System.currentTimeMillis()): Boolean { val limit = startedAt?.let { max(it.millis, timestamp) } ?: timestamp nextScheduledAt = when { null != cronGenerator -> cronGenerator!!.next(Date(limit)).time null != fixedRate -> limit + fixedRate!! else -> nextScheduledAt } return endedAt?.let { it.millis > nextScheduledAt } ?: true } }
其中
className
真正任務實例的類路徑args
對應className
類構造函數參數vars
對應className
類中的屬性-值
startedAt
任務的開始時間,不指定則當即開始endedAt
任務的結束時間,不指定則永久執行
cron
任務執行cron表達式fixedRate
以該速率(毫秒)循環執行(若指定了cron,則該參數失效)
updateNextScheduledAt
函數用於計算任務下次執行時間,若下次執行時間晚於endedAt則說明任務已結束
/** * 添加任務Job * * 計算並更新job.nextScheduledAt * 若指定endedAt且nextScheduledAt晚於endedAt,則說明任務已結束,直接返回 * 反之,將更新後的job存入redis * * @param job 任務 * * @return job */ fun add(job: JobEntity): JobEntity { if (!job.updateNextScheduledAt(now)) { logger.warn("Job is Arrived! {}", job.toString()) // Update to DB return job } val connection = jobTemplate.connectionFactory.connection try { connection.multi() connection.hSet(hKey, job.uuid.toByteArray(), objectMapper.writeValueAsBytes(job)) connection.zAdd(zKey, job.nextScheduledAt.toDouble(), job.uuid.toByteArray()) connection.exec() } finally { connection.close() } return job } /** * 更新任務 * * 1. 刪除任務 * 2. 計算並更新job.nextScheduledAt * 若指定endedAt且nextScheduledAt晚於endedAt,則說明任務已結束,直接返回 * 反之,將更新後的job存入redis * * @param job 任務 * * @return job */ fun update(job: JobEntity): JobEntity { delete(job.uuid) return add(job) } /** * 刪除任務 * * 1. 從Hash中刪除 * 2. 從SortedSet中刪除 * * @param uuid 任務uuid */ fun delete(uuid: String) { val connection = jobTemplate.connectionFactory.connection try { connection.multi() connection.hDel(hKey, uuid.toByteArray()) connection.zRem(zKey, uuid.toByteArray()) connection.exec() } finally { connection.close() } }
其中
add
計算任務下次執行時間,若晚於結束時間則直接放回,反之更新到redis中update
計算任務下次執行時間,若晚於結束時間則從redis中刪除,反之更新到redis中delete
將任務從redis中刪除
/** * 任務Job執行器 * * 每隔1秒運行一次 * 1. 從SortedSet中將score在(0,now)之間的uuid取出 * 2. 從Hash中將uuid對應的job取出 * 3. 解析job,計算job的nextScheduledAt,並將job回寫到redis中 * 4. 執行job */ @Scheduled(fixedRate = 1000) // 不使用cron是爲了使集羣中各節點執行時間隨機分散開 fun schedule() { /** * SortedSet(有序集合)中,member爲job.uuid,score爲job.nextScheduledAt * 將score在 (0, now) 之間的uuid取出 * 其對應的便是如今須要執行的job */ var connection = jobTemplate.connectionFactory.connection var keys: Set<ByteArray>? try { val now = System.currentTimeMillis().toDouble() connection.multi() connection.zRangeByScore(zKey, 0.0, now) // 將score在(0,now)之間的uuid取出 connection.zRemRangeByScore(zKey, 0.0, now) // 同時從redis中刪除 keys = connection.exec()[0] as? Set<ByteArray> } finally { connection.close() } if (ObjectUtils.isEmpty(keys)) { return } /** * Hash(哈希表)中,field爲job.uuid,value爲job * 經過uuid將對應job取出 */ connection = jobTemplate.connectionFactory.connection var values: List<ByteArray>? try { connection.multi() connection.hMGet(hKey, *keys!!.toTypedArray()) // 將uuid對應的job取出 connection.hDel(hKey, *keys.toTypedArray()) // 同時從redis中刪除 values = connection.exec()[0] as? List<ByteArray> } finally { connection.close() } if (ObjectUtils.isEmpty(values)) { return } // 解析jobs並回寫到redis中 val jobs = values!!.map { try { // 計算job的nextScheduledAt,並將其回寫到redis中 add(objectMapper.readValue(it, JobEntity::class.java)) } catch (e: Exception) { logger.warn("JSON Parse Error {} {}", it.toString(), e.message) null } } // 執行jobs jobs.filterNotNull().forEach { var job = ReflectionUtils.createObject(Class.forName(it.className), it.args, it.vars) when (job) { is Runnable -> executorService.submit(job) else -> logger.warn("Job Must Implement Runnable {}", job) } } }
這裏使用spring schedule,每1秒執行一次
首先,從有序集合中獲取score小於當前時間的任務ID,並刪除
其次,根據任務ID從哈希表中取出任務實體Job,並刪除
以後,利用反射,根據Job中的className
args
vargs
建立任務實例,並放入線程池執行任務
最後,計算更新任務Job下次執行時間,若任務未過時,則將其更新到redis中,等待下次執行
示例代碼見 https://github.com/manerfan/m...