activiti本身弄了一個job的機制,解決了集羣中job的的若干問題:async
集羣中如何保證一個job只有一個機器執行ui
job在處理過程當中失敗了(其餘異常或server重啓),這個時候如何處理這些failed job,從新跑起來debug
字段名稱 | 字段描述 | 數據類型 | 主鍵 | 爲空 | 說明 | |
---|---|---|---|---|---|---|
ID_ | varchar(64) | √ | ||||
REV_ | integer | √ | ||||
TYPE_ | varchar(255) | |||||
LOCK_EXP_TIME_ | 鎖定釋放時間 | timestamp(3) | √ | |||
LOCK_OWNER_ | 掛起者 | varchar(255) | √ | |||
EXCLUSIVE_ | boolean | √ | ||||
EXECUTION_ID_ | varchar(64) | √ | ||||
PROCESS_INSTANCE_ID_ | varchar(64) | √ | ||||
PROC_DEF_ID_ | varchar(64) | √ | ||||
RETRIES_ | integer | √ | ||||
EXCEPTION_STACK_ID_ | 異常id | varchar(64) | √ | |||
EXCEPTION_MSG_ | 異常信息 | varchar(4000) | √ | |||
DUEDATE_ | 到期時間 | timestamp(3) | √ | |||
REPEAT_ | 重複 | varchar(255) | √ | |||
HANDLER_TYPE_ | 處理類型 | varchar(255) | √ | |||
HANDLER_CFG_ | varchar(4000) | √ | ||||
TENANT_ID_ | varchar(255) | √ |
SELECT RES.* FROM ACT_RU_JOB RES LEFT OUTER JOIN ACT_RU_EXECUTION PI ON PI.ID_ = RES.PROCESS_INSTANCE_ID_ WHERE (RETRIES_ > 0) AND (DUEDATE_ is null or DUEDATE_ <= ?) AND (LOCK_OWNER_ is null or LOCK_EXP_TIME_ <= ?) AND ( (RES.EXECUTION_ID_ is null) or (PI.SUSPENSION_STATE_ = 1) );
查詢條件爲code
retries值>0server
duedate爲空或者duedate小於當前rem
lock_owner爲空或者lock_exp_time小於當前get
AcquireJobsCmdit
protected void lockJob(CommandContext commandContext, JobEntity job, String lockOwner, int lockTimeInMillis) { job.setLockOwner(lockOwner); GregorianCalendar gregorianCalendar = new GregorianCalendar(); gregorianCalendar.setTime(commandContext.getProcessEngineConfiguration().getClock().getCurrentTime()); gregorianCalendar.add(Calendar.MILLISECOND, lockTimeInMillis); job.setLockExpirationTime(gregorianCalendar.getTime()); }
設置lock_owner以及lock_exp_time
lockTimeInMillis好比5 60 1000,5分鐘io
private void removeJobs() { for (Job job: getJobs()) { ((JobEntity) job).delete(); } }
從job表中刪除table
if (activity == null || activity.getFailedJobRetryTimeCycleValue() == null) { log.debug("activitiy or FailedJobRetryTimerCycleValue is null in job " + jobId + "'. only decrementing retries."); job.setRetries(job.getRetries() - 1); job.setLockOwner(null); job.setLockExpirationTime(null); if (job.getDuedate() == null) { // add wait time for failed async job job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getAsyncFailedJobWaitTime(), null)); } else { // add default wait time for failed job job.setDuedate(calculateDueDate(commandContext, processEngineConfig.getDefaultFailedJobWaitTime(), job.getDuedate())); } }
retries減1
清空lock_owner
清空lock_exp_time
從新設置due_date,延時重試
/** define the default wait time for a failed async job in seconds */ protected int asyncFailedJobWaitTime = 10;