Activiti架構分析及源碼詳解

Activiti架構分析及源碼詳解

[TOC]java

引言

工做流引擎,應用於解決流程審批和流程編排方面等問題,有效的提供了擴展性的支撐。而目前來講,工做流領域也有了相對通行化的標準規範,也就是BPMN2.0。支持這個規範的開源引擎主要有:Activiti,flowable,Jbpm4等。本文着重對Activiti的架構設計進行分析和梳理,同時對流程啓動和原子操做的相關代碼進行完整走讀。算法

本文的閱讀對象須要對Activiti有必定的理解而且已經可以初步的使用Activiti進行流程流轉方面開發。數據庫

若是對文章內容有疑惑,歡迎加入技術交流羣186233599,做者會不定時解答相關問題。緩存

1、Activiti設計解析-架構&領域模型

1.1 架構

Activiti採用了一個分層架構完成自底向上的包裝。架構圖以下架構

大體包括:併發

  • 核心接口層,被PVM接口定義。PVM會在後面的章節中詳細講述。
  • 核心實現層,基於PVM的思想和接口,定義了一些關鍵實體包含ActivityImpl(該類抽象了節點實現),FlowElementBehavior實現(該類抽象了節點指令動做),ExecutionImpl(流程執行實體類)
  • 命令層,Activiti在編碼模式上直接限定總體風格爲命令模式。也就是將業務邏輯封裝爲一個個的Command接口實現類。這樣新增一個業務功能時只須要新增一個Command實現便可。這裏須要特別提到的是,命令自己須要運行在命令上下文中,也就是CommandContext類對象。
  • 命令攔截層,採用責任鏈模式,經過責任鏈模式的攔截器層,爲命令的執行創造條件。諸如開啓事務,建立CommandContext上下文,記錄日誌等
  • 業務接口層,面向業務,提供了各類接口。這部分的接口就再也不面向框架開發者了,而是面向框架的使用者。
  • 部署層,嚴格來講,這個與上面說到的並非一個完整的分層體系。可是爲了突出重要性,單獨拿出來講。流程運轉的前提是流程定義。而流程定義解析就是一切的開始。從領域語言解析爲Java的POJO對象依靠的就是部署層。後文還會細說這個環節。
  • 流程引擎,全部接口的總入口。上面提到的業務接口層,部署層均可以從流程引擎類中獲得。所以這裏的流程引擎接口其實相似門面模式,只做爲提供入口。

1.1.1 命令模式

Activit總體上採用命令模式進行代碼功能解耦。將流程引擎的大部分涉及到客戶端的需求讓外部以具體命令實現類的方式實現。app

完成這個編碼模式,有幾個重點類須要關注框架

  • Command命令接口,全部的具體命令都須要實現該類,最終業務就是執行該類的execute方法。
  • CommandContext命令上下文,爲具體命令的執行提供上下文支撐。該上下文的生成是依靠命令攔截器中的上下文攔截器org.activiti.engine.impl.interceptor.CommandContextInterceptor來生成的。該攔截器會判斷是複用當前的上下文仍是生成新的上下文。

引擎內的大部分功能都是經過單獨的命令完成。ide

1.1.2 責任鏈模式

Activiti的命令模式還須要搭配其對應的責任鏈來完成。具體來講,Activiti中存在一個命令攔截器鏈條,該命令攔截器鏈條由幾大塊的攔截器實現組成,以下oop

其中重要的默認攔截器有2個:

  • 事務攔截器,主要職責是使得後續命令運行在事務環境下。
  • CommandContext攔截器,主要職責是在有必要的時候建立CommandContext對象,並在使用完成後關閉該上下文。
1.1.2.1 事務攔截器

事務攔截器是否提供取決於org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl的子類對方法createTransactionInterceptor的實現。獨立使用時的org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration該方法返回爲空。也就是不提供事務攔截器。此時,命令的運行就沒法經過事務攔截器來提供事務環境了。

1.1.2.2 命令上下文攔截器

實現類:org.activiti.engine.impl.interceptor.CommandContextInterceptor。

該攔截器的功能很是重要,能夠說是Activiti操做的核心之一。其做用是在後續攔截器執行前檢查當前上下文環境,若是不存在CommandContext對象,則建立一個;在後續攔截器執行後,將CommandContext對象close。CommandContext包含了本次操做中涉及到全部的數據對象。

1.1.3 流程定義解析

Activiti遵循BPMN2.0規範,所以框架中少不了對BPMN2.0規範的定義文件(XML形式)的解析類。Activiti採用的STAX的拉模型進行XML解析。這裏先不分析其具體的解析類的內在聯繫,而是概念性的闡述下Activiti對解析的概念分層。

首先經過類org.activiti.bpmn.converter.BpmnXMLConverter進行XML解析,解析爲org.activiti.bpmn.model

包下面的與各個XML元素定義對應的POJO類。此時這些POJO類僅僅只是XML文件的一個Java表達。

在經過類org.activiti.engine.impl.bpmn.parser.BpmnParser聚合不一樣的解析類,將上面步驟解析出來的POJO類進一步解析爲能夠在框架中利用的org.activiti.engine.impl.pvm.process包下面的類。典型的表明就是ActivityImpl類。

三者之間的關係簡單用圖表達就是

1.2 領域模型

Activiti採起了領域中的充血模型做爲本身的實現方式。大部分業務邏輯都直接關聯在了org.activiti.engine.impl.persistence.entity.ExecutionEntity中。因爲Activiti採用了MyBatis而非Hibernate這樣的O/R Mapping產品做爲持久層,所以Activiti在具體的持久化操做上也有本身的獨特方式。

1.2.1 數據集中提交

Activiti的持久化機制簡單說來就是數據集中提交。集中提交還產生了一個額外的做用:自動提交。換句話說,在內存中的實體,若是更新了屬性可是沒有顯示的執行刷新動做,在一個調用的生命週期結束後也會被持久化其最新的狀態到數據庫。下面來看下詳細解釋下這個集中提交機制。

在Activiti全部運行期生成的對象都須要實現一個接口org.activiti.engine.impl.interceptor.Session,其定義以下

public interface Session
{
  void flush();
  void close();
}

而Session對象則是由接口org.activiti.engine.impl.interceptor.SessionFactory的方法進行生成,其定義以下

public interface SessionFactory {
  Class<?> getSessionType();
  Session openSession();
}

流程引擎內部持有各類SessionFactory實現,用戶也能夠自定義註冊本身的SessionFactory實現,若是用戶但願自定義的對象也能夠被集中提交機制處理的話。

在CommandContext中存在一個Map<Class,Session>存儲,存儲着該CommandContext生命週期內新建的全部Session對象。當一個命令執行完畢後,最終命令上下文CommandContext的close方法會被調用。當執行CommandContext.close()方法時,其內部會按照順序執行flushSessions,closeSessions方法。從名字能夠看到,第一個方法內部就是執行全部Session對象的flush方法,第二個方法內部就是執行全部的Session對象的close方法。

流程引擎內部有一個Session實現是比較特別的。也就是org.activiti.engine.impl.db.DbSqlSession實現。若是有須要更新,刪除,插入等操做,該操做是須要經過DbSqlSession來實現的,而實際上該實現會將這些操做緩存在內部。只有在執行flush方法時纔會真正的提交到數據庫去執行。正是由於如此,全部的數據操做,實際上最終都是要等到CommandContext執行close方法時,纔會真正提到到數據庫。

理論上,充血模型是在聚合根這個層面上完成的持久化。可是因爲Activiti沒有采用O/R Mapping框架,因而本身完成了一個相似功能的模塊。

1.2.2 PersistentObject

要明白工做流中的數據插入機制,首先要看下實體類的接口org.activiti.engine.impl.db.PersistentObject,以下

public interface PersistentObject {
  String getId();
  void setId(String id);
  Object getPersistentState();
}

Activiti的數據庫表都是單字符串主鍵,每個實體類都須要實現該接口。在對實體類進行保存的時候,DbSqlSession會調用getId方法判斷是否存在ID,若是不存在,則使用ID生成器(該生成器根據策略有幾種不一樣實現,這裏不表)生成一個ID而且設置。

而方法getPersistentState是用來返回一個持久化狀態對象。則方法的使用場合在下一個章節說明

1.2.3 DbSqlSession

該Session內部存在三個重要屬性,以下

//該屬性存儲着全部使用insert方法放入的對象
protected Map<Class<? extends PersistentObject>, List<PersistentObject>> insertedObjects = new HashMap<Class<? extends PersistentObject>, List<PersistentObject>>();
//該Map結構內存儲全部經過該DbSqlSession查詢出來的結果,以及update方法放入的對象
protected Map<Class<?>, Map<String, CachedObject>> cachedObjects = new HashMap<Class<?>, Map<String,CachedObject>>();
//該屬性內存儲着全部將要執行的刪除操做
protected List<DeleteOperation> deleteOperations = new ArrayList<DeleteOperation>();

刪除和新增都比較容易理解,就是要此類操做緩存起來,一次性提交到數據庫,上文曾提到的數據集中提交就體如今這個地方。而cachedObjects就有些不一樣了。要解析這個Map結構,首先來看下類org.activiti.engine.impl.db.DbSqlSession.CachedObject的結構屬性,以下

public static class CachedObject {
    protected PersistentObject persistentObject;
    protected Object persistentObjectState;
}
public CachedObject(PersistentObject persistentObject, boolean storeState) {
      this.persistentObject = persistentObject;
      if (storeState) {
        this.persistentObjectState = persistentObject.getPersistentState();
      }
    }

經過構造方法能夠明白,在新建該對象的時候,經過storeState參數決定是否保存當時的持久化狀態。

該Map的數據來源有2處

  • 全部經過該DbSqlSession對象執行查詢的結果對象都會生成一個對應的CachedObject對象,而且storeState參數爲true
  • 執行該DbSqlSession對象的update類方法,會將參數用CachedObject對象包裝起來,storeState參數爲false。

當DbSqlSession執行flush方法時,主要來講是作了數據提交動做

  1. 將insertObjects列表中的元素插入到數據庫
  2. 將deleteOperations列表中的元素遍歷執行
  3. 執行方法getUpdatedObjects得到要更新的實體對象

方法getUpdatedObjects的邏輯就是遍歷全部的CachedObject,同時知足如下條件者則放入要更新的實體集合中

  1. 實體的getPersistentState方法不爲空
  2. 實體的getPersistentState方法返回對象與CachedObject存儲的persistentObjectState執行equal判斷,結果爲false

經過上面能夠得知,若是一個實體類在DbSqlSession的生命週期被查詢出來,而且其數據內容有了修改,則DbSqlSession刷新時會自動刷新到數據庫

2、Activiti設計解析-PVM執行樹

2.1 核心理念

任何框架都是核心理念上發展細化而來。Activiti的核心理念就是流程虛擬機(Process Virtual Machine,如下簡稱PVM)。PVM試圖提供一組API,經過API自己來描述工做流方面的各類可能性。沒有了具體實現,也使得PVM自己能夠較好的適應各類不一樣的工做流領域語言,而Activiti自己也是在PVM上的一種實現。

2.1.1 PVM對流程定義期的描述

首先來看下流程定義自己。在工做流中,流程定義能夠圖形化的表達爲一組節點和鏈接構成的集合。好比下圖

即便沒有任何知識也能大概明白這張圖表達的是一個流程以及執行順序的意圖。流程定義的表達方式不限,可使用圖形的方式表達,可使用領域語言,也能夠傳統的XML(好比Activiti用的就是BPMN2.0 Schema下的XML)。特別的,當前已經有了標準化的BPMN2.0規範。

PVM將流程定義描述爲流程元素的集合。再將流程元素細分爲2個子類:流程節點和連線。

  • 流程節點是某一種動做表達的抽象描述。節點自己是能夠嵌套的,也就是節點能夠擁有子節點。
  • 連線表達是不一樣節點之間的轉移關係。一個連線只能有一個源頭節點和一個目標節點。而節點自己能夠有任意多的進入連線和外出連線。

從類圖的角度也能很好的看出這種關係,流程節點PvmActivity和連線PvmTransition都是流程元素PvmProcessElement。

從類圖能夠看到PvmActivity繼承於PvmScope。這種繼承關係代表流程節點自己有其歸於的做用域(PvmScope),節點自己也多是另一些節點的做用域,這也符合節點可能擁有子節點的原則。關於做用域自己,後文還會再次詳細講解,這裏先按下不表。

2.1.2 PVM對流程運行期的描述

經過流程節點和連線,PVM完成了對流程定義的表達。流程定義是一個流程的靜態表達,流程執行則是依照流程定義啓動的一個運行期表達,每個流程執行都具有本身惟一的生命週期。流程執行須要具有如下要素:

  1. 流程節點的具體執行動做。
  2. 流程執行當前處於哪個流程節點。
  3. 流程執行是如何從一個節點運行至下一個節點。
  4. 流程執行如何執行流程節點定義的執行動做

針對要素1,Activiti提供了接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior。該接口內部僅有一個execute方法。該接口的實現即爲不一樣PvmActivity節點提供了具體動做。ActivityBehavior有豐富的不一樣實現,對應了流程中豐富的不一樣功能的節點。每個PvmActivity對象都會持有一個ActivityBehavior對象。

針對要素2,Activiti提供了接口org.activiti.engine.impl.pvm.PvmExecution。該接口有一個方法PvmActivity getActivity()。用以返回當前流程執行所處的流程節點。

針對要素3,Activiti提供了接口org.activiti.engine.impl.pvm.runtime.InterpretableExecution。接口方法不少,這裏取和流程執行運轉最重要的2個方法展開,以下

public interface InterpretableExecution extends ActivityExecution, ExecutionListenerExecution, PvmProcessInstance {
  void take(PvmTransition transition);
  void take(PvmTransition transition, boolean fireActivityCompletedEvent);

執行方法take,以連線對象做爲入參,這會使得流程執行該連線定義的路線。其實現邏輯應該爲讓流程執行定位於連線源頭的活動節點,經由連線對象,到達連線目的地的活動節點。

針對要素4,實際上也是由接口org.activiti.engine.impl.pvm.runtime.AtomicOperation來完成的。經過該接口的調用類,此種狀況的實現者須要獲取當前流程執行所處的活動節點的ActivityBehavior對象,執行其execute方法來執行節點動做。結合要素3和4,能夠看出AtomicOperation接口用於執行流程運轉中的單一指令,例如根據連線移動,執行節點指令等。分解成單一指令的好處是易於編碼和理解。這也契合接口命名中的原子一意。

2.1.3PVM綜述

從上面對PVM定義期和運行期的解釋能夠看出,整個概念體系並不複雜。涉及到的類也很少。正是由於PVM只對工做流中最基礎的部分作了抽象和接口定義,使得PVM的實現上有了不少的可能性。

然而也正是因爲定義的簡單性,實際上這套PVM在轉化爲實際實現的時候須要額外附加不少的特性才能真正完成框架需求。

2.2 ActivitiImpl與做用域

在解析完成後,一個流程定義中的全部節點都會被解析爲ActivityImpl對象。ActivityImpl對象自己能夠持有事件訂閱(根據BPMN2.0規範,目前有定時,消息,信號三種事件訂閱類型)。由於ActivityImpl自己能夠嵌套而且能夠持有訂閱,所以引入做用域概念(Scope)。

一個ActivityImpl在如下兩種狀況下會被定義爲做用域ActivityImpl。

  1. 該ActivityImpl是可變範圍,則它是一個做用域。可變範圍能夠理解爲該節點的內容定義是可變的。好比流程定義、子流程,其內部內容是可變的。根據BPMN定義,可變範圍有:流程定義,子流程,多實例,調用活動。
  2. 該ActivityImpl定義了一個上下文用於接收事件。好比:具有邊界事件的ActivityImpl,具有事件子流程的ActivityImpl,事件驅動網關,中間事件捕獲ActivityImpl。

做用域是一個很重要的概念,狀況1中做用域定義的是複雜節點的生命週期,狀況2中做用域定義的是事件的捕獲範圍。

2.3 ExecutionEntity

ExecutionEntity的含義是一個流程定義被啓動後的執行實例,表明着流程的運行期狀態。在Activiti的設計中,事件訂閱,流程變量等都是與一個具體的ExecutionEntity相關的。其自己有幾個重要的屬性:

  • isScope:該屬性爲真時,意味該執行實例在執行一個具有做用域的ActivityImpl節點或者執行一個流程定義。更簡單一些,意味着該實例正在執行一個做用域活動。
  • isConcurrent:該屬性爲真時,意味與該執行實例正在執行的活動節點同屬相同做用域的節點可能正併發被其餘執行實例執行(好比並行網關後面的2個並行任務)。
  • isActive:該屬性爲真時,意味該執行實例正在執行一個簡單ActivityImpl(不包含其餘ActivityImpl的ActivityImpl)
  • isEventScope:該屬性爲真時,意味該執行實例是爲了後期補償而進行的變量保存所建立的執行實例。因爲流程執行中的變量都須要與ExecutionEntity掛鉤,而補償是須要原始變量的快照。爲了知足這個需求,建立出一個專用於此的ExecutionEntity。
  • activityId:該ExecutionEntity正在執行的ActivityImpl的id。正在執行意味着幾種狀況:進入該節點,執行該節點動做,離開該節點。若是是等待子流程的完成,則該屬性爲null。

上面對ExecutionEntity的解釋仍然抽象。若是直觀的看,能夠認爲ExecutionEntity是某一種生命週期的體現,其內部屬性隨着不一樣的狀況而變化。以下圖所示:

隨着流程的啓動,會建立一個ExecutionEntity對象。該ExecutionEntity生命週期與整個流程相同,而其中的isScopeisConcurrent在建立之初固定,而且不會改變。而isActiveactivityId隨着流程的推動則會不斷變化。

ExecutionEntity是用來反映流程的推動狀況的,實際上,每每一個ExecutionEntity不足以支撐所有的BPMN功能。所以實現上,Activiti是經過一個樹狀結構的ExecutionEntity結構來反映流程推動狀況。建立之初的ExecutionEntity對象隨着流程的推動會不斷的分裂和合並,ExecutionEntity樹也會不斷的生長和修剪。在流程的推動過程當中會遇到4種基本狀況

  1. 單獨的非做用域節點
  2. 單獨的做用域節點
  3. 併發的非做用域節點
  4. 併發的做用域節點

2.3.1 單獨的非做用域節點

此種狀況能夠以下圖所示

在流程前進的構成中,遇到單獨的非做用域節點,ExecutionEntity一直處於激活狀態,只不過隨着流程前進,其activityId指向會不斷變化。如上圖所示,會經歷:開始節點、組員工做、領導審批、結束節點4個不一樣的值。

實際上,這些節點都是在做用域<流程定義>之下,而ExecutionEntity表明的正是該做用域,所以其isScope屬性爲true。

2.3.2 單獨的做用域節點

若是流程推動中遇到單獨的做用域節點,則當前執行對象ExecutionEntity應該建立一個做用域子執行(isScope爲true的ExecutionEntity)。整個變化過程能夠以下圖所示

當準備進入節點<組員工做>時,ExecutionEntity1凍結,而且建立出子執行ExecutionEntity2ExecutionEntity2的isScope屬性也爲true。

ExecutionEntity1的isScope爲true,是由於該執行實例負責整個流程定義的事件訂閱,ExecutionEntity2的isScope爲true,是由於該執行實例負責節點<組員工做>的事件訂閱。

前面提到過,事件訂閱與某一個具體的執行實例相關。當節點<組員工做>完成時,也就是事件訂閱所在的做用域要被摧毀時,對應的事件訂閱也要被刪除。此時額外的ExecutionEntity就特別方便,只要刪除該ExecutionEntity,順便刪除相關的事件訂閱便可,在這裏就是刪除ExecutionEntity2

刪除ExecutionEntity2,而且激活父執行ExecutionEntity1。隨着流程推動,ExecutionEntity1更換指向的activityId。

2.3.3 併發的非做用域節點

流程推動中節點存在多個外出連線,則能夠根據須要建立多個併發的子執行,每個子執行對應一個連線。以下圖所示

當流程節點A、B被激活時,ExecutionEntity1會有2個併發的子執行ExecutionEntity2ExecutionEntity3。這兩個子執行的isConcurrent屬性均爲true,由於節點A和B都是在相同的做用域(流程定義)下被併發的執行。

當節點A、B執行完畢後,併發的子執行被刪除,父執行從新被激活,繼續後面的節點。

2.3.4 併發的做用域節點

流程推動中遇到併發節點,而且節點爲做用域節點,狀況就會以下所示

當流程運行至P1節點,P1節點有多個出線。根據出線數目建立2個子執行,此時2個子執行均爲併發的,且從P1做爲出線的源頭節點,所以2個子執行的activityId均爲P1。

當運行到A、B節點時,因爲2個節點均爲做用域節點,所以還會再建立2個子執行。此時ExecutionEntity3ExecutionEntity3凍結。ExecutionEntity4ExecutionEntity5所執行的節點在各自的做用域下均無併發操做,所以其isScope屬性爲true,isConcurrent屬性爲false。這5個執行實例構成的執行樹以下

當A、B節點完成時,首先是各自的做用域被刪除,所以ExecutionEntity4ExecutionEntity5首先被刪除,ExecutionEntity3ExecutionEntity4激活。然後匯聚於P2節點,所以ExecutionEntity3ExecutionEntity4刪除,ExecutionEntity1被激活,繼續執行剩下的節點。

3、代碼解析-流程啓動

3.1 流程說明

流程啓動依靠的是命令類:org.activiti.engine.impl.cmd.StartProcessInstanceCmd

該命令的總體流程以下

部署管理器的查詢和運行期關係不大,先忽略。兩個流程着重展開:

  1. 流程定義實體建立流程實例
  2. 流程實例執行啓動

3.1.1 流程定義實體建立流程實例

流程以下

其中指定初始滑動節點建立實例自己展開後的流程以下

ExecutionEntity的初始化須要單獨說下,流程以下

在建立流程的邏輯的尾部是一個循環流程。該循環的目的是爲了建立正確的ExecutionImpl樹(如下簡稱執行樹)。本質上該方法是建立一個流程實例,而且將流程當前運行節點定位到指定的節點。而工做流的正確執行依賴於執行樹的正確分裂和整合。所以就須要爲指定的節點建立其上游的執行樹實例。使得在效果上看起來就和流程自動執行到當前節點相似(執行樹相似,節點運行歷史則無類似,實際上也無歷史節點)。

而若是指定的初始節點就是流程定義的初始節點,則循環就不存在乎義了。

3.1.2 流程實例啓動

流程實例的啓動的內容,就是執行原子操做:org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStart。關於原子操做單獨闡述。

3.2 額外補充

3.2.1 ActivityImpl的parent屬性

org.activiti.engine.impl.pvm.process.ActivityImpl類中有一個屬性parent,類型爲org.activiti.engine.impl.pvm.process.ScopeImpl。在解析的時候,該屬性爲當前節點的做用域節點。根據做用域節點的定義,該屬性的取值有兩種可能的類型,一種是org.activiti.engine.impl.pvm.process.ActivityImpl,另一種是org.activiti.engine.impl.pvm.process.ProcessDefinitionImpl

第二種狀況意味着該節點是直屬於流程定義的節點了。

4、代碼解析-原子操做

4.1 說明

原子操做是一個接口org.activiti.engine.impl.pvm.runtime.AtomicOperation。從名字也能夠看出,該接口的做用就是執行流程實例中的一個單步操做。下面分階段說明

4.2 AbstractEventAtomicOperation

該抽象類是衆多實現類的基類。其代碼以下

public abstract class AbstractEventAtomicOperation implements AtomicOperation {
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  public void execute(InterpretableExecution execution) {
      //獲取當前執行對象的做用域對象。具體由子類提供。
    ScopeImpl scope = getScope(execution);
      //從做用域對象中獲取指定事件的監聽器。事件名稱由子類提供。
    List<ExecutionListener> exectionListeners = scope.getExecutionListeners(getEventName());
    int executionListenerIndex = execution.getExecutionListenerIndex();
 
    if (exectionListeners.size()>executionListenerIndex) {
      execution.setEventName(getEventName());
      execution.setEventSource(scope);
      ExecutionListener listener = exectionListeners.get(executionListenerIndex);
      try {
        listener.notify(execution);
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
      }
      execution.setExecutionListenerIndex(executionListenerIndex+1);
      execution.performOperation(this);
 
    } else {
      execution.setExecutionListenerIndex(0);
      execution.setEventName(null);
      execution.setEventSource(null);
 
      eventNotificationsCompleted(execution);
    }
  }
 
  protected abstract ScopeImpl getScope(InterpretableExecution execution);
  protected abstract String getEventName();
  protected abstract void eventNotificationsCompleted(InterpretableExecution execution);
}

整個抽象類的邏輯歸納而言,就是將獲取當前執行實例的做用域對象(具體由子類提供),執行其中特定事件(事件名由子類提供)的監聽器。

在所有的監聽器執行完畢後,執行子類的特定邏輯。

4.3 AtomicOperationProcessStart

該操做用於流程啓動。可是並不執行真正的啓動動做。只是設置了當前執行實例的活動節點爲org.activiti.engine.impl.pvm.runtime.StartingExecution中存儲的活動節點。而後執行原子操做org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStartInitial

本質上來講,只是執行了一個設置的動做。

public class AtomicOperationProcessStart extends AbstractEventAtomicOperation {
 
  @Override
  protected ScopeImpl getScope(InterpretableExecution execution) {
    return execution.getProcessDefinition();
  }
 
  @Override
  protected String getEventName() {
    return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START;
  }
 
  @Override
  protected void eventNotificationsCompleted(InterpretableExecution execution) {
      if (Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
        Map<String, Object> variablesMap = null;
        try {
          variablesMap = execution.getVariables();
        } catch (Throwable t) {
          // In some rare cases getting the execution variables can fail (JPA entity load failure for example)
          // We ignore the exception here, because it's only meant to include variables in the initialized event.
        }
        Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
                ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED,
                    execution, variablesMap, false));
      Context.getProcessEngineConfiguration().getEventDispatcher()
              .dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variablesMap, false));
    }
 
    ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
    StartingExecution startingExecution = execution.getStartingExecution();
    List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());
    execution.setActivity(initialActivityStack.get(0));
    execution.performOperation(PROCESS_START_INITIAL);
  }
}

4.4 AtomicOperationProcessStartInitial

代碼以下

public class AtomicOperationProcessStartInitial extends AbstractEventAtomicOperation {
 
  @Override
  protected ScopeImpl getScope(InterpretableExecution execution) {
    return (ScopeImpl) execution.getActivity();
  }
 
  @Override
  protected String getEventName() {
    return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START;
  }
 
  @Override
  protected void eventNotificationsCompleted(InterpretableExecution execution) {
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
    StartingExecution startingExecution = execution.getStartingExecution();
     //從開始節點開始的,該判斷均爲真。
    if (activity==startingExecution.getInitial()) {
      execution.disposeStartingExecution();
      execution.performOperation(ACTIVITY_EXECUTE);
    } else {
      List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());
      int index = initialActivityStack.indexOf(activity);
      activity = initialActivityStack.get(index+1);
 
      InterpretableExecution executionToUse = null;
      if (activity.isScope()) {
        executionToUse = (InterpretableExecution) execution.getExecutions().get(0);
      } else {
        executionToUse = execution;
      }
      executionToUse.setActivity(activity);
      executionToUse.performOperation(PROCESS_START_INITIAL);
    }
  }
}

4.5 AtomicOperationTransitionNotifyListenerEnd

該原子操做的目的僅是爲了執行節點上的end事件監聽器。監聽器執行完畢後,就執行下一個原子操做AtomicOperationTransitionDestroyScope

4.6 AtomicOperationTransitionNotifyListenerStart

該原子操做的目的是爲了執行節點上start事件監聽器。在執行完畢後,會判斷執行實例的當前節點是否能夠執行。判斷的依據該節點和鏈接線節點

4.3 AtomicOperationActivityExecute

該原子操做的做用實際上就是取出該執行實例當前的活動節點,而且執行該活動節點的行爲定義。行爲定義經過接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior定義。不一樣的節點行爲由不一樣的子類完成

public class AtomicOperationActivityExecute implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationActivityExecute.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  public void execute(InterpretableExecution execution) {
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
 
    ActivityBehavior activityBehavior = activity.getActivityBehavior();
    if (activityBehavior==null) {
      throw new PvmException("no behavior specified in "+activity);
    }
 
    log.debug("{} executes {}: {}", execution, activity, activityBehavior.getClass().getName());
 
    try {
        if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
          Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
                  ActivitiEventBuilder.createActivityEvent(ActivitiEventType.ACTIVITY_STARTED,
                          execution.getActivity().getId(),
                          (String) execution.getActivity().getProperty("name"),
                          execution.getId(),
                          execution.getProcessInstanceId(),
                          execution.getProcessDefinitionId(),
                          (String) activity.getProperties().get("type"),
                          activity.getActivityBehavior().getClass().getCanonicalName()));
      }
 
      activityBehavior.execute(execution);
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      LogMDC.putMDCExecution(execution);
      throw new PvmException("couldn't execute activity <"+activity.getProperty("type")+" id=\""+activity.getId()+"\" ...>: "+e.getMessage(), e);
    }
  }
}

4.4 AtomicOperationTransitionDestroyScope

public class AtomicOperationTransitionDestroyScope implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionDestroyScope.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  @SuppressWarnings("unchecked")
  public void execute(InterpretableExecution execution) {
    InterpretableExecution propagatingExecution = null;
 
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    /**
    * 若是當前的活動節點具有做用域。這就意味着最初的時候,有一個處於激活狀態的執行實例在執行該節點,如下稱初始執行實例。
    * 從這樣的節點退出要考慮幾種狀況:
    * 1、單獨的做用域節點。此時的執行樹狀況是:非激活的非做用域執行實例-激活的做用域執行實例(初始執行實例)。此時要離開做用域節點,首先是銷燬激活的做用域執行實例(初始執行實例),激活初始執行實例的父實例,使用該父實例執行後續的出線動做。
    * 2、並行的做用域節點。此時的執行樹狀況是:非激活的非做用域併發執行實例-非激活的
    */
    if (activity.isScope()) {
 
      InterpretableExecution parentScopeInstance = null;
      // if this is a concurrent execution crossing a scope boundary
      if (execution.isConcurrent() && !execution.isScope()) {
        // first remove the execution from the current root
        InterpretableExecution concurrentRoot = (InterpretableExecution) execution.getParent();
        parentScopeInstance = (InterpretableExecution) execution.getParent().getParent();
 
        log.debug("moving concurrent {} one scope up under {}", execution, parentScopeInstance);
        List<InterpretableExecution> parentScopeInstanceExecutions = (List<InterpretableExecution>) parentScopeInstance.getExecutions();
        List<InterpretableExecution> concurrentRootExecutions = (List<InterpretableExecution>) concurrentRoot.getExecutions();
        // if the parent scope had only one single scope child
        if (parentScopeInstanceExecutions.size()==1) {
          // it now becomes a concurrent execution
          parentScopeInstanceExecutions.get(0).setConcurrent(true);
        }
 
        concurrentRootExecutions.remove(execution);
        parentScopeInstanceExecutions.add(execution);
        execution.setParent(parentScopeInstance);
        execution.setActivity(activity);
        propagatingExecution = execution;
 
        // if there is only a single concurrent execution left
        // in the concurrent root, auto-prune it.  meaning, the
        // last concurrent child execution data should be cloned into
        // the concurrent root. 
        if (concurrentRootExecutions.size()==1) {
          InterpretableExecution lastConcurrent = concurrentRootExecutions.get(0);
          if (lastConcurrent.isScope()) {
            lastConcurrent.setConcurrent(false);
 
          } else {
            log.debug("merging last concurrent {} into concurrent root {}", lastConcurrent, concurrentRoot);
 
            // We can't just merge the data of the lastConcurrent into the concurrentRoot.
            // This is because the concurrent root might be in a takeAll-loop.  So the
            // concurrent execution is the one that will be receiving the take
            concurrentRoot.setActivity((ActivityImpl) lastConcurrent.getActivity());
            concurrentRoot.setActive(lastConcurrent.isActive());
            lastConcurrent.setReplacedBy(concurrentRoot);
            lastConcurrent.remove();
          }
        }
 
      } else if (execution.isConcurrent() && execution.isScope()) {
        /**
        * 根據算法,這種狀況不會出現。源代碼中,這部分也屬於todo的內容。
        */
      }
        else {
        /**
        * 這個條件是執行實例的scope屬性爲真。此時銷燬當前的執行實例,使用其父執行實例繼續後面的流程
        */
        propagatingExecution = (InterpretableExecution) execution.getParent();
        propagatingExecution.setActivity((ActivityImpl) execution.getActivity());
        propagatingExecution.setTransition(execution.getTransition());
        propagatingExecution.setActive(true);
        log.debug("destroy scope: scoped {} continues as parent scope {}", execution, propagatingExecution);
        execution.destroy();
        //刪除與該執行實例相關的一切,包括:定時工做,各類任務,事件訂閱,用戶流程關係,最後刪除自身。
        execution.remove();
      }
    } else {
      //若是離開的是一個非做用域節點,則仍然使用當前的執行實例做爲下一個節點的執行實例
      propagatingExecution = execution;
    }
 
    // if there is another scope element that is ended
    ScopeImpl nextOuterScopeElement = activity.getParent();
    TransitionImpl transition = propagatingExecution.getTransition();
    ActivityImpl destination = transition.getDestination();
    /**
    * 考慮當前的節點多是子流程或者活動調用中的節點。那麼就須要離開當前的做用域範圍,回到更上層的做用域下。所以須要判斷目的地是否和源頭節點處於同一個做用域。若是不是同一個做用域,則不斷向上回溯
    */
    if (transitionLeavesNextOuterScope(nextOuterScopeElement, destination)) {
      propagatingExecution.setActivity((ActivityImpl) nextOuterScopeElement);
      propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_END);
    } else {
      propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_TAKE);
    }
  }
 
  public boolean transitionLeavesNextOuterScope(ScopeImpl nextScopeElement, ActivityImpl destination) {
    return !nextScopeElement.contains(destination);
  }
}

4.5 AtomicOperationTransitionNotifyListenerTake

該原子操做的目的就是爲了執行在鏈接線上的監聽器。在執行完畢後,就準備執行目標活動節點。這裏關於目標節點還存在一個選擇的問題。並非直接執行鏈接線上的目標活動節點。而是從目標活動節點出發,選擇和執行實例當前活動節點同屬同一個做用域的目標活動節點或其父節點。

public class AtomicOperationTransitionNotifyListenerTake implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionNotifyListenerTake.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  public void execute(InterpretableExecution execution) {
    TransitionImpl transition = execution.getTransition();
 
    List<ExecutionListener> executionListeners = transition.getExecutionListeners();
    int executionListenerIndex = execution.getExecutionListenerIndex();
    /**
    * 整個if的功能就是不斷判斷監聽器是否被執行完畢。都執行完畢後走入到else的部分。
    */
    if (executionListeners.size()>executionListenerIndex) {
      execution.setEventName(org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_TAKE);
      execution.setEventSource(transition);
      ExecutionListener listener = executionListeners.get(executionListenerIndex);
      try {
        listener.notify(execution);
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
      }
      execution.setExecutionListenerIndex(executionListenerIndex+1);
      execution.performOperation(this);
 
    } else {
        if (log.isDebugEnabled()) {
            log.debug("{} takes transition {}", execution, transition);
        }
      execution.setExecutionListenerIndex(0);
      execution.setEventName(null);
      execution.setEventSource(null);
 
      ActivityImpl activity = (ActivityImpl) execution.getActivity();
      ActivityImpl nextScope = findNextScope(activity.getParent(), transition.getDestination());
      execution.setActivity(nextScope);
 
      // Firing event that transition is being taken       
      if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
          Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
                ActivitiEventBuilder.createSequenceFlowTakenEvent(ActivitiEventType.SEQUENCEFLOW_TAKEN, transition.getId(),
                        activity.getId(), (String) activity.getProperties().get("name") ,(String) activity.getProperties().get("type"), activity.getActivityBehavior().getClass().getCanonicalName(),
                        nextScope.getId(), (String) nextScope.getProperties().get("name"), (String) nextScope.getProperties().get("type"), nextScope.getActivityBehavior().getClass().getCanonicalName()));
      }
 
      execution.performOperation(TRANSITION_CREATE_SCOPE);
    }
  }
 
  /** finds the next scope to enter.  the most outer scope is found first */
  public static ActivityImpl findNextScope(ScopeImpl outerScopeElement, ActivityImpl destination) {
    ActivityImpl nextScope = destination;
    while( (nextScope.getParent() instanceof ActivityImpl)
           && (nextScope.getParent() != outerScopeElement)
         ) {
      nextScope = (ActivityImpl) nextScope.getParent();
    }
    return nextScope;
  }
}

4.6 AtomicOperationTransitionCreateScope

該原子操做是爲了確認進入的節點是否具有做用域。若是具有做用域,則將目前的執行實例凍結。而且建立出新的執行實例,用於執行做用域節點;若是不具有做用域,則無效果。

在確認完畢後,執行原子操做AtomicOperationTransitionNotifyListenerStart

public class AtomicOperationTransitionCreateScope implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionCreateScope.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    return activity.isAsync();
  }
 
  public void execute(InterpretableExecution execution) {
    InterpretableExecution propagatingExecution = null;
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    if (activity.isScope()) {
      //爲做用域活動建立一個新的執行實例,是該原子操做的主要目的
      propagatingExecution = (InterpretableExecution) execution.createExecution();
      propagatingExecution.setActivity(activity);
      propagatingExecution.setTransition(execution.getTransition());
      execution.setTransition(null);
      execution.setActivity(null);
      execution.setActive(false);
      log.debug("create scope: parent {} continues as execution {}", execution, propagatingExecution);
      //這裏是另一個重點。在一個流程實例初始化的時候,會對當前流程所處的做用域對象(多是流程定義或者是做用域活動進行處理,具體表現是爲該做用域對象上的定時事件,消息事件,信號事件執行註冊動做。分別是放入定時調度器,在數據庫新增事件訂閱)
      propagatingExecution.initialize();
 
    } else {
      propagatingExecution = execution;
    }
 
    propagatingExecution.performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_START);
  }
}
相關文章
相關標籤/搜索