Activiti 併發處理

Activiti 版本 5.10 

使用activiti 有一段時間了,目前使用activiti 的大部分公司都是用來作相似於OA 等以用戶任務爲主的流程, 
這我沒什麼好說的,由於咱們的流程是以ServiceTask UserTask 結合來處理定時調度等數據處理任務。 
ServiceTask 以主,採用class 和 Spring bean 的方式。廢話補多少,切入正題: 

Activiti 5.10 設計器是支持 delegateExpression 的注入參數的,但activiti 引擎的解析器卻未能將參數注入從Spring 獲得的bean,該問題在5.11 的版本上是獲得解決了的,但 5.11版本目前尚未發佈,有興趣的同窗能夠去下載正在開發中的代碼進行研究,或者修改源碼 

而後咱們來談談Activiti 對於併發的處理以及其中的問題(以ServiceTask 爲例): 
當咱們將serviceTask 設置 async = "true" (關於 isExclusive 後續會提到) 的時候,流程引擎採用JobExecutor 來異步執行,執行順序爲引擎首先會將該任務實例化一條job記錄,插act_ru_job表,而後JobExecutor 掃描該表並加鎖執行該job,這裏就涉及到定義ServiceTask 的另一個屬性isExclusive,這個屬性默認爲true,即同一流程中當前存在於act_ru_job 且 is_exclusive 爲true的會一塊兒取出來,放入一個AcquireJobsCmd,而後放入一個線程執行,這樣作用來保證該批任務時串行執行的,使用相同的context,這樣作沒有什麼問題, 
但不能達到真正並行的目的。 
附上咱們的流程圖: 



目標,處於parallelGateWay後面的任務並行執行,即任務完成的時間爲單個任務完成的最大時間。 
每一個ServiceTask 的 acitiviti:async = "true" activiti:exclusive="false" 
運行,這時你會遇到一個 ActivitiOptimisticLockingException 異常 
   (toString(updatedObject) " was updated by another transaction concurrently"); 
   
爲何會這樣呢?由於每一個ServiceTask 對應一條act_ru_execution 表的記錄,當該任務完後後,會去跟新其 
parent_id 對應的execution 將其版本 1 , 
  update ${prefix}ACT_RU_EXECUTION set 
      REV_ = #{revisionNext, jdbcType=INTEGER}, 
      PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR}, 
      ACT_ID_ = #{activityId, jdbcType=VARCHAR}, 
      IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN}, 
      IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN}, 
      IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN}, 
      IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN}, 
      PARENT_ID_ = #{parentId, jdbcType=VARCHAR}, 
      SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR}, 
      SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER}, 
      CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER} 
     where ID_ = #{id, jdbcType=VARCHAR} 
      and REV_ = #{revision, jdbcType=INTEGER} 

跟蹤該異常的拋出緣由是 在serviceTask完成後,更新的ExecutionEntity 是同一條記錄,而每個serviceTask 此時處於 
兩個不一樣的線程和事務當中,兩個事務彼此不可見,任務開始時獲取的ExecutionEntity完成相同,當一個事務成功更新後, 
另外一個事務就會失敗。這樣保證了流程的準確執行,當該任務失敗後,會在下一個JobExecutor 掃描時從新執行。此時獲取的 
execution 的版本已經加1,此時任務正常結束。Activiti 引擎如此作有必定的道理,但這不是我要的。爲何這樣說呢? 
假設我兩個 serviceTask,每一個執行都須要30分鐘,僅僅由於這樣,我就須要花費1個小時的時間才能完成,天啊,饒了我吧! 

有木有辦法,我想是有的,只要你夠大膽。跟蹤代碼,更新父execution的時候,惟一變了的就是version,其餘值都沒變化, 
那麼咱們是否能夠將update語句更改一下,以下: 
update ${prefix}ACT_RU_EXECUTION set 
      REV_ = REV_ 1, 
      PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR}, 
      ACT_ID_ = #{activityId, jdbcType=VARCHAR}, 
      IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN}, 
      IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN}, 
      IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN}, 
      IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN}, 
      PARENT_ID_ = #{parentId, jdbcType=VARCHAR}, 
      SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR}, 
      SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER}, 
      CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER} 
     where ID_ = #{id, jdbcType=VARCHAR} 
     
注意到這裏的變化,where 條件只剩id去掉了version ,而後REV_使用表裏面的數據直接 1經測試完成可行,有興趣的朋友能夠本身試試。這裏的關口過了,但後面仍然存在危險,因此修改源碼是有風險的(%>_<%)。問題出在哪兒?有時候你會發現 兩個serviceTask 運行完了,Execution表的記錄也更新了,流程停滯不前了,這是神馬緣由??? 

最後終於讓我問題出現的地方,ParallelGatewayActivityBehavior,咱們前面說過,當每一個ServiceTask執行完成以後,事務並無到結束的地方,根據ServiceTask 的流程指向來到了第二個ParallelGateway,ParallelGatewayActivityBehavior 的做用就是判斷前面的任務是否完成,是否繼續執行,當每一個ServiceTask所在的事務到達此處時,他們都只能看見本身完成的部分,而不能看見與他並行的事務裏面的狀態。因此當到達是否執行下一步的判斷條件時 
if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
      // Fork
      log.fine("parallel gateway '" activity.getId() "' activates: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
      execution.takeAll(outgoingTransitions, joinedExecutions);
      
  } else if (log.isLoggable(Level.FINE)){
     log.fine("parallel gateway '" activity.getId() "' does not activate: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
  }

都會告訴ParallelGatewayActivityBehavior 我已經完成了,其餘的尚未完成。 
有人可能會說,爲何個人程序沒有遇到這種狀況呢? 
第一 若是你不是ServiceTask 任務很難遇到這種狀況 
第二 若是你的ServiceTask 沒有設置爲 async = "true" 和 exclusive="false" 也就不是真正的併發,固然也不會遇到 
     還有其餘緣由我就不贅述了。 
     
這種問題也是能夠解決的,由於針對同一個流程,每個事務都是經過同一個ParallelGatewayActivityBehavior實例來進行判斷的, 
咱們只要記錄每個經過該GateWay的事務的完成狀況,而後彙總起來就OK 了,另外在完成的那一步須要將executio 的parent 的 
executions 對應的更新,不然execution 會有記錄不能刪除,但流程是能夠完整的執行完成,給出個人完整處理方式: 
public class ParallelGatewayActivityBehavior extends GatewayActivityBehavior {
  
  private static Logger log = Logger.getLogger(ParallelGatewayActivityBehavior.class.getName());
  private Map<String,ActivityExecution> activityJoinedExecutions = new ConcurrentHashMap<String,ActivityExecution>();
  
  public void execute(ActivityExecution execution) throws Exception { 
    
    // Join
    PvmActivity activity = execution.getActivity();
    List<PvmTransition> outgoingTransitions = execution.getActivity().getOutgoingTransitions();
    
    execution.inactivate();
    lockConcurrentRoot(execution);
    
    List<ActivityExecution> joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
    int nbrOfExecutionsToJoin = execution.getActivity().getIncomingTransitions().size();
    int nbrOfExecutionsJoined = joinedExecutions.size();
    if(nbrOfExecutionsToJoin!=nbrOfExecutionsJoined){
	for(ActivityExecution e:joinedExecutions){
	    activityJoinedExecutions.put(e.getId(), e);
	}
	nbrOfExecutionsJoined = activityJoinedExecutions.size();
	if(nbrOfExecutionsJoined == nbrOfExecutionsToJoin && execution.getParentId()!=null 
		&& execution instanceof ExecutionEntity){
	    ExecutionEntity et = (ExecutionEntity)execution;
	    while(joinedExecutions.size()!=nbrOfExecutionsToJoin ){
		Thread.sleep(10000);
		for(int i = 0 ; i < et.getParent().getExecutions().size(); i  ){
		    ExecutionEntity ct = et.getParent().getExecutions().get(i);
		    if(activityJoinedExecutions.containsKey(ct.getId()) ){
			et.getParent().getExecutions().set(i, (ExecutionEntity)activityJoinedExecutions.get(ct.getId()));
		    }
		    
		}
		joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
	    }
	}
    }
    
    if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
	activityJoinedExecutions.clear();
      // Fork
      log.fine("parallel gateway '" activity.getId() "' activates: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
      execution.takeAll(outgoingTransitions, joinedExecutions);
      
    } else if (log.isLoggable(Level.FINE)){
      log.fine("parallel gateway '" activity.getId() "' does not activate: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
    }
  }

}
相關文章
相關標籤/搜索