微服務架構下的輕量級定時任務解決方案

微服務的概念能夠說給程序設計打開了一個新世界,帶來了衆多的優勢,可是也將一些以往容易處理的問題變得複雜,例如:緩存、事務、定時任務等。緩存能夠用中間件例如redis、memcached等,事務有諸多分佈式事務框架解決,定時任務也有分佈式的解決方案,例如quartz、elastic job等,今天我要講的是就是定時任務。java

既然已經有成熟的分佈式定時任務框架,我要講的東西並非用另外一種設計去實現相同的功能,而是從不一樣的角度去解決分佈式定時任務的問題。web

問題來源

這個問題來起源於一個小功能,咱們有一個發送短信的微服務,須要獲取短信的狀態報告,狀態報告對於短信發送不是同步的,短信提交到服務商,服務商要提交運營商發送以後才能生成狀態報告,所以有必定的延遲,須要異步獲取,而且服務商提供的接口有頻率限制,所以須要作一個定時任務,且須要單點執行,那麼問題來了,由於這一個功能我就須要引入一個定時任務框架嗎,總感受有點大材小用的意思。redis

以前咱們的定時任務處理既有用過quartz,也用過elastic job,可是隻爲這樣一個小功能就引入一個框架,再加上配置又得好半天,想一想都不划算。spring

例如要用quartz,要建立一堆數據庫表,但表裏面只存儲了一個任務信息。數據庫

用elastic job吧,還要使用zookeeper,即使用lite版,也須要一堆配置,遠比我寫業務的時間要長。緩存

我只想簡簡單單的寫邏輯!!!websocket

解決方案

談分佈式解決方案大體總離不開中間件,聯想到上次解決websocket的分佈式方案(參見Spring Cloud 微服務架構下的 WebSocket 解決方案)使用到的Spring Cloud Stream,大概有了思路:架構

  1. 我須要一個任務分發中心,專門負責觸發定時任務
  2. 其餘服務若是須要觸發定時任務,接收特定的觸發消息
  3. 任務執行完成向任務分發中心推送任務完成的確認消息
  4. 爲任務執行端提供一個公共的spring boot starter晚上2,3的步驟,實際須要編碼的幾乎就剩下業務邏輯自己了

詳細設計

根據上一步的方案,須要確認一些細節,以及一些特殊的狀況,例如定時任務多是由微服務集羣中單個實例執行,也可能存在集體執行(例如更新內存中的緩存),還可能存在分區執行。app

客戶端(須要定時任務的爲服務端)須要創建如下消息隊列:框架

  1. 集羣接收的隊列,每一個微服務實例創建一個,每一個微服務實例都會收到相同消息
  2. 單獨接收的隊列,每一個應用集羣創建一個,確保消息只被一個實例消費
  3. 按分區接收的隊列,每一個分區創建一個,確保只被分區內一個實例消費

客戶端與服務端須要經過惟一的任務id來確認須要執行的定時任務

服務端(任務分發微服務)須要根據狀況將消息推送到不一樣的隊列,不能直接使用Spring Cloud Stream,須要使用rabbitmq

服務端自己也是分佈式的,所以須要一個定時任務框架用於任務觸發,我這裏選擇了quartz

代碼實現

Spring Cloud Stream的基本知識我再也不復述了,Spring Cloud 微服務架構下的 WebSocket 解決方案中有講解。

定時任務分發服務

定義定時任務

data class ScheduleTask(
    /** 任務的id,全局惟一,與客戶端的taskId徹底匹配 */
    var taskId: String = "",
    /** 定時任務的cron 表達式 */
    var cron: String = "",
    /** 關聯應用 */
    var appId: Int = 0,
    /** 任務描述 */
    var description: String = "",
    /** 接收任務的分區 */
    var zone: String? = null,
    /** 調度方式,廣播到集羣或單例執行,默認單例 */
    var dispatchMode: DispatchMode = DispatchMode.Singleton,
    /** 是否啓用 */
    var enabled: Boolean = true,
    /** 任務的數據庫記錄 id,自增 */
    var id: Int = -1) 
複製代碼

任務調度

使用quartz進行任務調度

private fun scheduleJob(task: ScheduleTask) {
    val job = JobBuilder.newJob(TaskEmitterJob::class.java)
        .withIdentity(task.taskId, task.appId.toString())
        .withDescription(task.description)
        .storeDurably()
        .requestRecovery()
        .usingJobData("id", task.id)
        .usingJobData("taskId", task.taskId)
        .build()
    val trigger = TriggerBuilder.newTrigger()
        .withIdentity(task.taskId, task.appId.toString())
        .withSchedule(CronScheduleBuilder.cronSchedule(task.cron))
        .forJob(job)
        .build()
    scheduler.addJob(job, true, true)
    if (scheduler.checkExists(trigger.key)) {
      scheduler.rescheduleJob(trigger.key, trigger)
    } else {
      scheduler.scheduleJob(trigger)
    }
  }
複製代碼

ScheduleTask是持久化的,插入的時候同時向quartz插入任務,更新的時候也要向quartz更新,刪除的時候同時刪除

quartz的任務觸發

class TaskEmitterJob : Job {

  companion object {
    private val log = LogFactory.getLog(TaskEmitterJob::class.java)
  }

  override fun execute(context: JobExecutionContext) {
    try {
      val taskId = context.jobDetail.jobDataMap["taskId"] as String
      log.info("任務分發:$taskId")
      val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java)
      service.launch(taskId)
    } catch (e: Exception) {
      log.error("任務失敗$[taskId]", e)
    }
  }

}
複製代碼

rabbitmq的發送邏輯

/** * 發佈定時任務事件 */
  fun launch(task: ScheduleTask) {
    val exchange = when (task.dispatchMode) {
      Cluster   -> "aegisScheduleCluster"
      Singleton -> "aegisScheduleSingleton"
    }
    val routingKey = when (task.dispatchMode) {
      Cluster   -> exchange
      Singleton -> "$exchange.${task.appName}"
    }
    val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)
    amqpTemplate.convertAndSend(exchange, routingKey,
        executeTaskInfo)
    taskExecuteRecordDAO.save(
        TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())
    )
  }
複製代碼

客戶端spring boot starter的實現

定義定時任務接口,只要在項目中實現該接口並將實現聲明爲bean,便可完成定時任務的定義

@FunctionalInterface
interface ScheduledJob {

  /** * 執行定時任務 */
  fun execute(properties: Map<String, Any>)

  /** * 獲取定時任務id * @return 定時任務id,對應任務分發中心ScheduleTask的taskId */
  fun getId(): String

}
複製代碼

接收任務

/**
   * 接收單例任務
   */
  @StreamListener(SINGLETON_INPUT)
  fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {
    if (taskInfo.app == application) {
      val receivedTime = Date()
      val job = jobsProvider.ifAvailable?.firstOrNull {
        it.getId() == taskInfo.id
      }
      job?.execute(taskInfo.properties ?: mapOf())
      singletonOutput.send(GenericMessage(
          ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())
      ))
    }
  }
複製代碼

集羣全體執行任務與單例任務的區別只在stream的配置,一個須要聲明binding的group,一個不須要,這屬於Spring Cloud Stream的知識範疇,能夠本身看官方文檔或查看我前面提到的文檔,若是有不懂的能夠私聊我。

stream的事件流聲明

/** * 定時任務信息的事件流接口 * @author 吳昊 * @since 0.1.0 */
interface AegisScheduleClient {

  companion object {
    const val CLUSTER_INPUT = "aegisScheduleClusterInput"
    const val SINGLETON_INPUT = "aegisScheduleSingletonInput"
    const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"
  }

  /** * * @return */
  @Input(CLUSTER_INPUT)
  fun scheduleInput(): SubscribableChannel

  /** * * @return */
  @Input(SINGLETON_INPUT)
  fun singletonScheduleInput(): SubscribableChannel

  /** * * @return */
  @Output(CONFIRM_OUTPUT)
  fun confirmOutput(): MessageChannel

}
複製代碼

最後再加上服務端確認消息的接收代碼:

@StreamListener(CONFIRM_INPUT)
  fun acceptGroupTask(confirmInfo: ConfirmInfo) {
    LOG.info("接收到確認消息:$confirmInfo")
    scheduleTaskService.confirm(confirmInfo)
  }
複製代碼

主要的代碼已經所有放上來了,總體思路也很簡單,後面仍有不少須要優化的地方,例如消息推送失敗,或者確認消息未送達等等,於總體設計並無多大的影響了。

這樣在微服務端若是須要添加定時任務,只須要

  1. 引入starter
  2. 實現ScheduledJob接口
  3. 在任務調度中心添加任務

至於在任務中心添加任務,主題代碼有了,實現個簡單管理界面很容易對不對,也就幾個字段的輸入。

最後附上管理界面的截圖:

任務列表

任務詳情

個人其餘文章:

Spring Cloud 微服務架構下的 WebSocket 解決方案

Mybatis去xml化:我不再想寫xml了

Spring Security OAuth2 緩存使用jackson序列化的處理

相關文章
相關標籤/搜索