activiti的job設計淺析

activiti本身弄了一個job的機制,解決了集羣中job的的若干問題:async

  • 集羣中如何保證一個job只有一個機器執行ui

  • job在處理過程當中失敗了(其餘異常或server重啓),這個時候如何處理這些failed job,從新跑起來debug

ACT_RU_JOB表結構

字段名稱 字段描述 數據類型 主鍵 爲空 說明
ID_ varchar(64)
REV_ integer
TYPE_ varchar(255)
LOCK_EXP_TIME_ 鎖定釋放時間 timestamp(3)
LOCK_OWNER_ 掛起者 varchar(255)
EXCLUSIVE_ boolean
EXECUTION_ID_ varchar(64)
PROCESS_INSTANCE_ID_ varchar(64)
PROC_DEF_ID_ varchar(64)
RETRIES_ integer
EXCEPTION_STACK_ID_ 異常id varchar(64)
EXCEPTION_MSG_ 異常信息 varchar(4000)
DUEDATE_ 到期時間 timestamp(3)
REPEAT_ 重複 varchar(255)
HANDLER_TYPE_ 處理類型 varchar(255)
HANDLER_CFG_ varchar(4000)
TENANT_ID_ varchar(255)

查詢待處理job

SELECT RES.* FROM
ACT_RU_JOB RES
LEFT OUTER JOIN
ACT_RU_EXECUTION PI ON PI.ID_ = RES.PROCESS_INSTANCE_ID_ 
WHERE (RETRIES_ > 0)
AND (DUEDATE_ is null or DUEDATE_ <= ?)
AND (LOCK_OWNER_ is null or LOCK_EXP_TIME_ <= ?)
AND ( (RES.EXECUTION_ID_ is null) or (PI.SUSPENSION_STATE_ = 1) );

查詢條件爲code

  • retries值>0server

  • duedate爲空或者duedate小於當前rem

  • lock_owner爲空或者lock_exp_time小於當前get

搶佔任務

AcquireJobsCmdit

protected void lockJob(CommandContext commandContext, JobEntity job, String lockOwner, int lockTimeInMillis) {    
    job.setLockOwner(lockOwner);
    GregorianCalendar gregorianCalendar = new GregorianCalendar();
    gregorianCalendar.setTime(commandContext.getProcessEngineConfiguration().getClock().getCurrentTime());
    gregorianCalendar.add(Calendar.MILLISECOND, lockTimeInMillis);
    job.setLockExpirationTime(gregorianCalendar.getTime());    
  }

設置lock_owner以及lock_exp_time
lockTimeInMillis好比5 60 1000,5分鐘io

job執行成功

private void removeJobs() {
    for (Job job: getJobs()) {
      ((JobEntity) job).delete();
    }
  }

從job表中刪除table

job執行失敗

if (activity == null || activity.getFailedJobRetryTimeCycleValue() == null) {
      log.debug("activitiy or FailedJobRetryTimerCycleValue is null in job " + jobId + "'. only decrementing retries.");
      job.setRetries(job.getRetries() - 1);
      job.setLockOwner(null);
      job.setLockExpirationTime(null);
      if (job.getDuedate() == null) {
        // add wait time for failed async job
        job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getAsyncFailedJobWaitTime(), null));
      } else {
        // add default wait time for failed job
        job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getDefaultFailedJobWaitTime(), job.getDuedate()));
      }
      
    }
  • retries減1

  • 清空lock_owner

  • 清空lock_exp_time

  • 從新設置due_date,延時重試

/** define the default wait time for a failed async job in seconds */
  protected int asyncFailedJobWaitTime = 10;
相關文章
相關標籤/搜索