Activiti 版本 5.10
使用activiti 有一段時間了,目前使用activiti 的大部分公司都是用來作相似於OA 等以用戶任務爲主的流程,
這我沒什麼好說的,由於咱們的流程是以ServiceTask UserTask 結合來處理定時調度等數據處理任務。
ServiceTask 以主,採用class 和 Spring bean 的方式。廢話補多少,切入正題:
Activiti 5.10 設計器是支持 delegateExpression 的注入參數的,但activiti 引擎的解析器卻未能將參數注入從Spring 獲得的bean,該問題在5.11 的版本上是獲得解決了的,但 5.11版本目前尚未發佈,有興趣的同窗能夠去下載正在開發中的代碼進行研究,或者修改源碼
而後咱們來談談Activiti 對於併發的處理以及其中的問題(以ServiceTask 爲例):
當咱們將serviceTask 設置 async = "true" (關於 isExclusive 後續會提到) 的時候,流程引擎採用JobExecutor 來異步執行,執行順序爲引擎首先會將該任務實例化一條job記錄,插act_ru_job表,而後JobExecutor 掃描該表並加鎖執行該job,這裏就涉及到定義ServiceTask 的另一個屬性isExclusive,這個屬性默認爲true,即同一流程中當前存在於act_ru_job 且 is_exclusive 爲true的會一塊兒取出來,放入一個AcquireJobsCmd,而後放入一個線程執行,這樣作用來保證該批任務時串行執行的,使用相同的context,這樣作沒有什麼問題,
但不能達到真正並行的目的。
附上咱們的流程圖:
目標,處於parallelGateWay後面的任務並行執行,即任務完成的時間爲單個任務完成的最大時間。
每一個ServiceTask 的 acitiviti:async = "true" activiti:exclusive="false"
運行,這時你會遇到一個 ActivitiOptimisticLockingException 異常
(toString(updatedObject) " was updated by another transaction concurrently");
爲何會這樣呢?由於每一個ServiceTask 對應一條act_ru_execution 表的記錄,當該任務完後後,會去跟新其
parent_id 對應的execution 將其版本 1 ,
update ${prefix}ACT_RU_EXECUTION set
REV_ = #{revisionNext, jdbcType=INTEGER},
PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
ACT_ID_ = #{activityId, jdbcType=VARCHAR},
IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER}
where ID_ = #{id, jdbcType=VARCHAR}
and REV_ = #{revision, jdbcType=INTEGER}
跟蹤該異常的拋出緣由是 在serviceTask完成後,更新的ExecutionEntity 是同一條記錄,而每個serviceTask 此時處於
兩個不一樣的線程和事務當中,兩個事務彼此不可見,任務開始時獲取的ExecutionEntity完成相同,當一個事務成功更新後,
另外一個事務就會失敗。這樣保證了流程的準確執行,當該任務失敗後,會在下一個JobExecutor 掃描時從新執行。此時獲取的
execution 的版本已經加1,此時任務正常結束。Activiti 引擎如此作有必定的道理,但這不是我要的。爲何這樣說呢?
假設我兩個 serviceTask,每一個執行都須要30分鐘,僅僅由於這樣,我就須要花費1個小時的時間才能完成,天啊,饒了我吧!
有木有辦法,我想是有的,只要你夠大膽。跟蹤代碼,更新父execution的時候,惟一變了的就是version,其餘值都沒變化,
那麼咱們是否能夠將update語句更改一下,以下:
update ${prefix}ACT_RU_EXECUTION set
REV_ = REV_ 1,
PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
ACT_ID_ = #{activityId, jdbcType=VARCHAR},
IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER}
where ID_ = #{id, jdbcType=VARCHAR}
注意到這裏的變化,where 條件只剩id去掉了version ,而後REV_使用表裏面的數據直接 1經測試完成可行,有興趣的朋友能夠本身試試。這裏的關口過了,但後面仍然存在危險,因此修改源碼是有風險的(%>_<%)。問題出在哪兒?有時候你會發現 兩個serviceTask 運行完了,Execution表的記錄也更新了,流程停滯不前了,這是神馬緣由???
最後終於讓我問題出現的地方,ParallelGatewayActivityBehavior,咱們前面說過,當每一個ServiceTask執行完成以後,事務並無到結束的地方,根據ServiceTask 的流程指向來到了第二個ParallelGateway,ParallelGatewayActivityBehavior 的做用就是判斷前面的任務是否完成,是否繼續執行,當每一個ServiceTask所在的事務到達此處時,他們都只能看見本身完成的部分,而不能看見與他並行的事務裏面的狀態。因此當到達是否執行下一步的判斷條件時
if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
// Fork
log.fine("parallel gateway '" activity.getId() "' activates: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
execution.takeAll(outgoingTransitions, joinedExecutions);
} else if (log.isLoggable(Level.FINE)){
log.fine("parallel gateway '" activity.getId() "' does not activate: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
}
都會告訴ParallelGatewayActivityBehavior 我已經完成了,其餘的尚未完成。
有人可能會說,爲何個人程序沒有遇到這種狀況呢?
第一 若是你不是ServiceTask 任務很難遇到這種狀況
第二 若是你的ServiceTask 沒有設置爲 async = "true" 和 exclusive="false" 也就不是真正的併發,固然也不會遇到
還有其餘緣由我就不贅述了。
這種問題也是能夠解決的,由於針對同一個流程,每個事務都是經過同一個ParallelGatewayActivityBehavior實例來進行判斷的,
咱們只要記錄每個經過該GateWay的事務的完成狀況,而後彙總起來就OK 了,另外在完成的那一步須要將executio 的parent 的
executions 對應的更新,不然execution 會有記錄不能刪除,但流程是能夠完整的執行完成,給出個人完整處理方式:
public class ParallelGatewayActivityBehavior extends GatewayActivityBehavior {
private static Logger log = Logger.getLogger(ParallelGatewayActivityBehavior.class.getName());
private Map<String,ActivityExecution> activityJoinedExecutions = new ConcurrentHashMap<String,ActivityExecution>();
public void execute(ActivityExecution execution) throws Exception {
// Join
PvmActivity activity = execution.getActivity();
List<PvmTransition> outgoingTransitions = execution.getActivity().getOutgoingTransitions();
execution.inactivate();
lockConcurrentRoot(execution);
List<ActivityExecution> joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
int nbrOfExecutionsToJoin = execution.getActivity().getIncomingTransitions().size();
int nbrOfExecutionsJoined = joinedExecutions.size();
if(nbrOfExecutionsToJoin!=nbrOfExecutionsJoined){
for(ActivityExecution e:joinedExecutions){
activityJoinedExecutions.put(e.getId(), e);
}
nbrOfExecutionsJoined = activityJoinedExecutions.size();
if(nbrOfExecutionsJoined == nbrOfExecutionsToJoin && execution.getParentId()!=null
&& execution instanceof ExecutionEntity){
ExecutionEntity et = (ExecutionEntity)execution;
while(joinedExecutions.size()!=nbrOfExecutionsToJoin ){
Thread.sleep(10000);
for(int i = 0 ; i < et.getParent().getExecutions().size(); i ){
ExecutionEntity ct = et.getParent().getExecutions().get(i);
if(activityJoinedExecutions.containsKey(ct.getId()) ){
et.getParent().getExecutions().set(i, (ExecutionEntity)activityJoinedExecutions.get(ct.getId()));
}
}
joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
}
}
}
if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
activityJoinedExecutions.clear();
// Fork
log.fine("parallel gateway '" activity.getId() "' activates: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
execution.takeAll(outgoingTransitions, joinedExecutions);
} else if (log.isLoggable(Level.FINE)){
log.fine("parallel gateway '" activity.getId() "' does not activate: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
}
}
}