微服務的概念能夠說給程序設計打開了一個新世界,帶來了衆多的優勢,可是也將一些以往容易處理的問題變得複雜,例如:緩存、事務、定時任務等。緩存能夠用中間件例如redis、memcached等,事務有諸多分佈式事務框架解決,定時任務也有分佈式的解決方案,例如quartz、elastic job等,今天我要講的是就是定時任務。java
既然已經有成熟的分佈式定時任務框架,我要講的東西並非用另外一種設計去實現相同的功能,而是從不一樣的角度去解決分佈式定時任務的問題。web
這個問題來起源於一個小功能,咱們有一個發送短信的微服務,須要獲取短信的狀態報告,狀態報告對於短信發送不是同步的,短信提交到服務商,服務商要提交運營商發送以後才能生成狀態報告,所以有必定的延遲,須要異步獲取,而且服務商提供的接口有頻率限制,所以須要作一個定時任務,且須要單點執行,那麼問題來了,由於這一個功能我就須要引入一個定時任務框架嗎,總感受有點大材小用的意思。redis
以前咱們的定時任務處理既有用過quartz,也用過elastic job,可是隻爲這樣一個小功能就引入一個框架,再加上配置又得好半天,想一想都不划算。spring
例如要用quartz,要建立一堆數據庫表,但表裏面只存儲了一個任務信息。數據庫
用elastic job吧,還要使用zookeeper,即使用lite版,也須要一堆配置,遠比我寫業務的時間要長。緩存
我只想簡簡單單的寫邏輯!!!websocket
談分佈式解決方案大體總離不開中間件,聯想到上次解決websocket的分佈式方案(參見Spring Cloud 微服務架構下的 WebSocket 解決方案)使用到的Spring Cloud Stream,大概有了思路:架構
根據上一步的方案,須要確認一些細節,以及一些特殊的狀況,例如定時任務多是由微服務集羣中單個實例執行,也可能存在集體執行(例如更新內存中的緩存),還可能存在分區執行。app
客戶端(須要定時任務的爲服務端)須要創建如下消息隊列:框架
客戶端與服務端須要經過惟一的任務id來確認須要執行的定時任務
服務端(任務分發微服務)須要根據狀況將消息推送到不一樣的隊列,不能直接使用Spring Cloud Stream,須要使用rabbitmq
服務端自己也是分佈式的,所以須要一個定時任務框架用於任務觸發,我這裏選擇了quartz
Spring Cloud Stream的基本知識我再也不復述了,Spring Cloud 微服務架構下的 WebSocket 解決方案中有講解。
data class ScheduleTask(
/** 任務的id,全局惟一,與客戶端的taskId徹底匹配 */
var taskId: String = "",
/** 定時任務的cron 表達式 */
var cron: String = "",
/** 關聯應用 */
var appId: Int = 0,
/** 任務描述 */
var description: String = "",
/** 接收任務的分區 */
var zone: String? = null,
/** 調度方式,廣播到集羣或單例執行,默認單例 */
var dispatchMode: DispatchMode = DispatchMode.Singleton,
/** 是否啓用 */
var enabled: Boolean = true,
/** 任務的數據庫記錄 id,自增 */
var id: Int = -1)
複製代碼
使用quartz進行任務調度
private fun scheduleJob(task: ScheduleTask) {
val job = JobBuilder.newJob(TaskEmitterJob::class.java)
.withIdentity(task.taskId, task.appId.toString())
.withDescription(task.description)
.storeDurably()
.requestRecovery()
.usingJobData("id", task.id)
.usingJobData("taskId", task.taskId)
.build()
val trigger = TriggerBuilder.newTrigger()
.withIdentity(task.taskId, task.appId.toString())
.withSchedule(CronScheduleBuilder.cronSchedule(task.cron))
.forJob(job)
.build()
scheduler.addJob(job, true, true)
if (scheduler.checkExists(trigger.key)) {
scheduler.rescheduleJob(trigger.key, trigger)
} else {
scheduler.scheduleJob(trigger)
}
}
複製代碼
ScheduleTask是持久化的,插入的時候同時向quartz插入任務,更新的時候也要向quartz更新,刪除的時候同時刪除
class TaskEmitterJob : Job {
companion object {
private val log = LogFactory.getLog(TaskEmitterJob::class.java)
}
override fun execute(context: JobExecutionContext) {
try {
val taskId = context.jobDetail.jobDataMap["taskId"] as String
log.info("任務分發:$taskId")
val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java)
service.launch(taskId)
} catch (e: Exception) {
log.error("任務失敗$[taskId]", e)
}
}
}
複製代碼
/** * 發佈定時任務事件 */
fun launch(task: ScheduleTask) {
val exchange = when (task.dispatchMode) {
Cluster -> "aegisScheduleCluster"
Singleton -> "aegisScheduleSingleton"
}
val routingKey = when (task.dispatchMode) {
Cluster -> exchange
Singleton -> "$exchange.${task.appName}"
}
val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)
amqpTemplate.convertAndSend(exchange, routingKey,
executeTaskInfo)
taskExecuteRecordDAO.save(
TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())
)
}
複製代碼
@FunctionalInterface
interface ScheduledJob {
/** * 執行定時任務 */
fun execute(properties: Map<String, Any>)
/** * 獲取定時任務id * @return 定時任務id,對應任務分發中心ScheduleTask的taskId */
fun getId(): String
}
複製代碼
/**
* 接收單例任務
*/
@StreamListener(SINGLETON_INPUT)
fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {
if (taskInfo.app == application) {
val receivedTime = Date()
val job = jobsProvider.ifAvailable?.firstOrNull {
it.getId() == taskInfo.id
}
job?.execute(taskInfo.properties ?: mapOf())
singletonOutput.send(GenericMessage(
ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())
))
}
}
複製代碼
集羣全體執行任務與單例任務的區別只在stream的配置,一個須要聲明binding的group,一個不須要,這屬於Spring Cloud Stream的知識範疇,能夠本身看官方文檔或查看我前面提到的文檔,若是有不懂的能夠私聊我。
/** * 定時任務信息的事件流接口 * @author 吳昊 * @since 0.1.0 */
interface AegisScheduleClient {
companion object {
const val CLUSTER_INPUT = "aegisScheduleClusterInput"
const val SINGLETON_INPUT = "aegisScheduleSingletonInput"
const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"
}
/** * * @return */
@Input(CLUSTER_INPUT)
fun scheduleInput(): SubscribableChannel
/** * * @return */
@Input(SINGLETON_INPUT)
fun singletonScheduleInput(): SubscribableChannel
/** * * @return */
@Output(CONFIRM_OUTPUT)
fun confirmOutput(): MessageChannel
}
複製代碼
最後再加上服務端確認消息的接收代碼:
@StreamListener(CONFIRM_INPUT)
fun acceptGroupTask(confirmInfo: ConfirmInfo) {
LOG.info("接收到確認消息:$confirmInfo")
scheduleTaskService.confirm(confirmInfo)
}
複製代碼
主要的代碼已經所有放上來了,總體思路也很簡單,後面仍有不少須要優化的地方,例如消息推送失敗,或者確認消息未送達等等,於總體設計並無多大的影響了。
這樣在微服務端若是須要添加定時任務,只須要
至於在任務中心添加任務,主題代碼有了,實現個簡單管理界面很容易對不對,也就幾個字段的輸入。
最後附上管理界面的截圖:
任務列表
任務詳情
個人其餘文章: