Activiti6.0的探索紀要

前言

年初要作一個運維自動化平臺,須要用到流程引擎,原本打算項目用golang寫的,可是golang的流程引擎功能太簡單實在是用不來,最後仍是選型java + activiti。到activiti官網一看,嘿出7.0告終果文檔是剛寫的還不全,咱們java仍是8的,7.0是匹配的java11,最終是問題太多隻好放棄用activiti6.0了。java

摸石頭過河

雖然網上教程有很多,不過要真正跑起來着實不容易,有些內容好比參數的意義仍是看5.0的手冊才弄明白的。mysql

安裝

解決依賴先跑起來

搭建環境:java8 + springboot2.1.3 + activiti6.0 + mysqlgolang

pom依賴:spring

<dependency>
    <groupId>org.activiti</groupId>
    <artifactId>activiti-spring-boot-starter-basic</artifactId>
	<version>${activiti.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
複製代碼

配置數據庫:sql

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/activiti?nullCatalogMeansCurrent=true&serverTimezone=Asia/Shanghai
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root
複製代碼

而後啓動,恭喜你,應用沒起來,會出現以下報錯:數據庫

Caused by: java.io.FileNotFoundException: class path resource [org/springframework/security/config/annotation/authentication/configurers/GlobalAuthenticationConfigurerAdapter.class] cannot be opened because it does not exist:express

這個緣由是activiti6.0開發的時候springboot2.0還沒出來以至年久失修,文件路徑不對了,解決方案:springboot

啓動類前排除掉SecurityAutoConfigurationbash

@SpringBootApplication(exclude = SecurityAutoConfiguration.class)併發

再次啓動,若是你碰到建立數據庫表卻找不到表的問題,兩種解決方法:

  • 手動建立表,表的路徑在activiti-engine-6.0.0.jar/org/activiti/db/create
  • 數據庫配置中加上nullCatalogMeansCurrent=true

這是由於在org/activiti/engine/impl/db/DbSqlSession中activiti使用databaseMetaData.getTables尋找庫表是否存在,而dbSqlSessionFactory獲取到的catalog爲空由於mysql使用schema標識庫名而不是catalog,致使mysql掃描全部的庫來找表,一旦其餘庫中有同名表activiti就覺得找到了其實表並不存在。nullCatalogMeansCurrent的意義就在於讓mysql默認當前庫,在mysql-connector-java 5.x該參數默認爲true,但在6.x以上默認爲false。

再次啓動,仍然沒起來:

Caused by: java.io.FileNotFoundException: class path resource [processes/] cannot be resolved to URL because it does not exist

這是由於activiti會到resource/processes下面尋找流程文件,建立該目錄。

配置畫圖插件

activiti提供了多種設計器來畫流程圖,我安裝了IDEA的插件和eclipse的插件,比較了下仍是eclipse的好用,因此只介紹eclipse 插件的安裝過程。

  1. 下個eclipse
  2. 下載插件 Help --> Install New Software -->Add:
Names : Activiti BPMN 2.0 designer
Location : https://www.activiti.org/designer/update/
複製代碼
  1. 下完了New一個diagram

4. 畫流程圖,一個標準的流程圖有一個開始事件和一個結束事件,畫完了保存爲.bpmn文件或者.bpmn20.xml文件,並放到resource/processes目錄下,一個xml示例文件以下:

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:tns="http://www.activiti.org/test" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test" id="m1551347612734" name="">
  <process id="machine_expansion" name="Machine Expansion" isExecutable="true" isClosed="false" processType="None">
    <startEvent id="start" name="開始"></startEvent>
    ...
複製代碼

運行

使用runtimeservice建立一個createProcessInstanceBuilder,設置processDefinitionKey爲xml中的process id,而後start就能夠了。

@Autowired
    private RuntimeService runtimeService;
    @Override
    public String start(BaseParamDTO baseParamDTO) {
        String currentUserName = CurrentUserUtil.getCurrentUserName();
        identityService.setAuthenticatedUserId(currentUserName);
        ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
        ProcessInstance machineExpansion = runtimeService.createProcessInstanceBuilder().processDefinitionKey(WorksheetTypeEnum.EXPANSION.getActivitiDefineName())
                .businessKey(expansionParamDTO.getAppName()).name(WorksheetTypeEnum.EXPANSION.name()).start();
        baseParamDTO.setId(machineExpansion.getProcessInstanceId());
        submit(baseParamDTO);
        return machineExpansion.getProcessInstanceId();
    }
複製代碼

爲了設置流程的startuser,必需要使用identityService.setAuthenticatedUserId設置,由於process獲取的是當前線程的用戶。

數據庫表

要熟悉一個項目,最簡單的方式就是看他的數據庫表是怎麼設計的,表裏面都存了什麼東西,activiti數據庫中一共有28張表

  • ACT_GE_*:通用表,通常不用關注,若是其餘表中出現二進制的數據id,好比拋錯,就到act_ge_bytearray中查詢
  • ACT_HI_*:歷史信息,activiti歷史信息存儲時分4個級別,能夠用spring.activiti.historyLevel配置,對應HistoryService
    • none:什麼也不保存
    • activity:保存全部的流程實例、任務、活動信息;
    • audit:默認級別,保存全部的流程實例、任務、活動、表單屬性;
    • full:流程過程當中所有保存,比audit多詳情和變量;
  • ACT_ID_*:用戶認證信息,對應IdentityService;
  • ACT_RE_*:流程中的部署和定義,對應RepositoryService;
  • ACT_RU_*:運行時信息,流程級別相關的表對應RuntimeService,task級別表對應TaskService,job級別表對應ManagementService。
表名 描述
act_evt_log 事件日誌表
act_ge_bytearray 二進制數據表,好比拋錯
act_ge_property activiti屬性表
act_hi_actinst 歷史的流程活動實例
act_hi_attachment 歷史的流程附件
act_hi_comment 歷史的描述信息
act_hi_detail 歷史的流程運行中的細節信息
act_hi_identitylink 歷史的用戶關係信息
act_hi_procinst 歷史的流程實例
act_hi_taskinst 歷史的usertask實例
act_hi_varinst 歷史的流程參數
act_id_group 用戶組
act_id_info 用戶詳情信息
act_id_membership 用戶和用戶組關聯關係
act_id_user 用戶信息
act_procdef_info 流程定義擴展表
act_re_deployment 流程部署信息
act_re_model 流程模型
act_re_procdef 已部署過的流程
act_ru_deadletter_job 運行時中死掉的job
act_ru_event_subscr 運行時的事件監聽
act_ru_execution 運行時的流程執行實例
act_ru_identitylink 運行時的用戶關係信息
act_ru_job 運行時的job
act_ru_suspended_job 運行時暫停的job
act_ru_task 運行時的usertask實例
act_ru_timer_job 運行時的定時器job
act_ru_variable 運行時的參數

描述東西要舉例

基礎定義

在activiti中有幾個基礎定義:

  • process:一個流程實例,好比定義了一個審批流程bpmn,如今run起來了,這就是一個流程實例。
  • exection:流程執行實例,一個流程中有不少步驟,每個步驟都會觸發一個流程執行實例。
  • task:專門指usertask,當建立了usertask時,activiti會在exection中建立一個task。
  • job:執行步驟時,每一個exection會生成job給jobExecutor去執行,若是是通常job就在act_ru_job中,定時任務就在act_ru_timer_job中,job暫停了就會進入act_ru_suspended_job,跑失敗了就到act_ru_deadletter_job。
  • activity:這個詞在歷史表中出現,理解上和exection差很少,區別在於exection是一個執行實例,一個執行實例能夠執行多個activity,而activity是指流程中的一個步驟,會對應到bpmn中某一步。

幾種經常使用的task的區別

eclipse設計器總共有8種task,目前我只用到其中的4種

用戶任務(User Task)

用戶任務用來設置須要人操做才能完成的任務如審批,執行到usertask時,流程會卡住,只有用戶人工觸發了流程纔會繼續。示例以下,這裏沒有在畫bpmn時寫死用戶,而是在流程中經過taskService設置assigness,usertask完成只能是complete,後續的流程能夠用參數和排他網關控制。

@Override
    public void submit(BaseParamDTO baseParamDTO) {
        String currentUserName = CurrentUserUtil.getCurrentUserName();
        ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
        Task submit = taskService.createTaskQuery().processInstanceId(baseParamDTO.getId()).taskDefinitionKey(ExpansionDisplayEnum.SUBMIT.getFlow()).singleResult();
        taskService.setAssignee(submit.getId(), currentUserName);
        taskService.setVariable(submit.getId(), INPUT_PARAM, expansionParamDTO);
        taskService.setVariable(submit.getId(), INPUT_PRIORITY, baseParamDTO.getPriority());
        taskService.setPriority(submit.getId(), baseParamDTO.getPriority());
        if (StringUtils.isNotBlank(baseParamDTO.getComment())){
            taskService.addComment(submit.getId(), null, baseParamDTO.getComment());
        }
        taskService.complete(submit.getId());
    }
複製代碼

用戶任務涉及到的幾個概念:

  • assigness:該任務的執行人,誰執行的就寫誰
  • candidateUsers:該任務的候選人,須要哪些人執行的就寫誰
  • candidateGroups:該任務的候選組,須要哪一個組的人執行的就寫哪些組
  • comment:備註,這個信息會寫到act_hi_comment表中,只能和usertask或者流程相關聯

腳本任務(Script Task)

寫一段腳本,執行到該步驟時自動執行腳本。(沒用過)

服務任務(Service Task)

最經常使用的task,執行到該步驟時自動執行對應的java類

@Component
public class ApplyMachineTask implements JavaDelegate {

    public static final Logger LOGGER = LoggerFactory.getLogger(ApplyMachineTask.class);

    public static final String MACHINE_APPLY_ID = "applyId";
    public static final String MACHINE_APPLY_ID_LOCAL = "localApplyId";
    public static final String MACHINE_APPLY_CHECK = "applyCheckCount";

    @Autowired
    private MachineService machineService;

    @Override
    public void execute(DelegateExecution delegateExecution){
        ExpansionParamDTO param = delegateExecution.getVariable(INPUT_PARAM, ExpansionParamDTO.class);
        LOGGER.debug("start apply machine, param {}", param);
        param.setWorksheetId(delegateExecution.getProcessInstanceId());
        ApiResponseDTO<String> stringApiResponseDTO = machineService.applyMachine(param);
        if (stringApiResponseDTO.isSuccess()){
            delegateExecution.setVariable(MACHINE_APPLY_ID, stringApiResponseDTO.getBody());
            delegateExecution.setVariableLocal(MACHINE_APPLY_ID_LOCAL, stringApiResponseDTO.getBody());
            delegateExecution.setVariable(MACHINE_APPLY_CHECK, 0);
        } else {
            delegateExecution.setVariableLocal(ERROR_LOCAL, stringApiResponseDTO.getError());
            throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());
        }
    }
}
複製代碼

這裏說下參數設置,setVariable會設置流程全局變量,也就是流程中任意一個步驟都能獲取到的參數,用來參數傳遞,而setVariableLocal設置的是局部變量,只有當前步驟才能獲取的參數,局部變量的用處在於能夠記錄流程執行到該步驟時的狀態,參數涉及到覆蓋問題,因此不要出現全局和局部參數同名的現象(子流程的除外)。

業務規則任務(BusinessRule Task)

業務規則用戶用來同步執行一個或多個規則。activiti使用drools規則引擎執行業務規則。(沒用過)

郵件任務(Mail Task)

自動郵件任務,它能夠發送郵件給一個或多個參與者。(沒用過)

手工任務(Manual Task)

手工任務定義了BPM引擎外部的任務。 用來表示工做須要某人完成,而引擎不須要知道,也沒有對應的系統和UI接口。 對於引擎,手工任務是直接經過的活動, 流程到達它以後會自動向下執行。(沒用過)

接收任務(Receive Task)

接收任務是一個簡單任務,它會等待對應消息的到達。流程到達該步驟後會一直等待,直到消息觸發。以下圖所示,在一個接收任務上掛了一個定時邊界事件,定時邊界事件每隔一段時間觸發服務任務檢查狀態,當狀態經過時給接收任務發消息,接收任務完成,定時邊界網關中止。

定時邊界網關配置
消息觸發:

Execution receiveTask = runtimeService.createExecutionQuery().processInstanceId(execution.getProcessInstanceId())
    .activityId(ExpansionDisplayEnum.WAIT_INIT.getFlow()).singleResult();
runtimeService.trigger(receiveTask.getId());
複製代碼

調用子流程(CallActivity)

activity有兩種子流程,一種是CallActivity,一種是subprocess,區別在於CallActivity是調用主流程外部的一個流程,運行時會生成新的processId,做用域徹底獨立,一般用來複用其餘流程或者須要獨立processId的狀況;而subprocess是嵌套子流程,他並不會生成新的processId,可是定義了獨立的事件域,一般用來捕捉同一類事件。

  1. 畫一個子流程,這裏不能選async,選了就跑不起來了,個人理解是調用CallActivity和執行新的子流程是不一樣的線程執行的,調用CallActivity只是爲了生成建立子流程的job,而子流程是否異步執行是在multi instance頁面配置的。
  2. main config頁面配置要調用的子流程的process id和須要傳的參數,這裏參數的意思是主流程的參數名privilege傳過去後叫參數名param
  3. multi instance多實例頁配置,sequential就是配置是同步仍是異步的地方,loop 能夠直接指定實例個數,這裏經過傳參解決,定義collection表示會有一個列表名字叫privilegeList的全局參數,列表中每一個參數實例的名字叫privilege,completion condition設置完成條件。當建立了多實例後activiti會自動建立多個參數:
  • nrOfInstances:實例總數
  • nrOfActiveInstances:當前活動的實例
  • nrOfCompletedInstances:已經完成的實例

這裏設置的條件是所有完成的時候主流程繼續。

媽媽,我想用try catch

activiti有兩種事件,一種是流程事件,放在流程中的,一種是邊界事件,掛在某個步驟上因爲流程運行間接觸發的,而這兩種事件又分爲兩類,一種是捕獲一種是觸發。

  • 捕獲(Catching):當流程執行到事件, 它會等待被觸發。
  • 觸發(Throwing):當流程執行到事件, 會觸發一個事件。

這裏要說的就是邊界錯誤捕獲事件。

如圖所示,把啓動應用的全部步驟都放到了一個內嵌子流程subprocess中,而後在subprocess上掛載了一個錯誤邊界捕獲事件,當子流程中任意一個步驟拋錯時捕獲錯誤觸發重試按鈕,若是重啓則從新執行整個子流程,若是跳過這流程繼續。

錯誤邊界事件中有一個errorCode參數,errorCode用來匹配捕獲的錯誤:

  • 若是沒有設置errorRef,邊界錯誤事件會捕獲 全部錯誤事件,不管錯誤的errorCode是什麼。
  • 若是設置了errorRef,並引用了一個已存的錯誤, 邊界事件就只捕獲錯誤代碼與之相同的錯誤。
  • 若是設置了errorRef,可是BPMN 2.0中沒有定義錯誤, errorRef就會當作errorCode使用 (和錯誤結束事件的用法相似)。

配置一個排他網關,在每條路徑上寫明條件

子流程每一個步驟中try catch住錯誤,並拋出

delegateExecution.setVariableLocal(ERROR_LOCAL,stringApiResponseDTO.getError());
throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());
複製代碼

須要注意的是拋錯中的錯誤文本好像獲取不到,因此仍是要用局部變量來存一下。

如何作審批和駁回

由於我想記錄全部的操做步驟,因此我把每一種操做都進行了分割,所以出現了下圖所示的審批流程。

第一次提交任務是在流程開始時直接執行的,而後使用了監聽器在審批步驟開始前設置審批候選人,若是當前用戶是審批候選人直接跳過審批流程,審批是否經過是經過排他網關加上參數控制的。

審批usertask上設置一個監聽器,用來設置任務的審批候選人

@Component
public class ApprovalTaskListener implements TaskListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalTaskListener.class);

    @Autowired
    private RuntimeService runtimeService;

    @Autowired
    private ApprovalTask approvalTask;

    @Autowired
    private ApplicationService applicationService;

    @Autowired
    private TaskService taskService;

    @Override
    public void notify(DelegateTask delegateTask) {
        String processInstanceId = delegateTask.getProcessInstanceId();
        ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult();
        String appName = processInstance.getBusinessKey();
        String owner = processInstance.getStartUserId();
        Integer priority = delegateTask.getVariable(INPUT_PRIORITY, Integer.class);
        taskService.setPriority(delegateTask.getId(), priority);
        ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
        if (application.isSuccess()){
            List<String> bizMaintainer = application.getBody().getBizMaintainer();
            delegateTask.addCandidateUsers(bizMaintainer);
            //申請人是審批人時直接處理
            if (bizMaintainer.contains(owner)){
                taskService.setAssignee(delegateTask.getId(), owner);
            }
        } else {
            LOGGER.error("get application info error, pId: {}, app: {}", processInstanceId, application);
            throw new RuntimeException(application.getError());
        }
    }
}
複製代碼

在審批usertask外層設置exection的監聽器approvalSkipListener,前面說過exection是先於usertask執行的,每一個usertask外層都有一個exection,因此這個監聽器能夠控制usertask的行爲。

@Component
public class ApprovalSkipListener implements ExecutionListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalSkipListener.class);

    @Autowired
    private RuntimeService runtimeService;

    @Autowired
    private ApplicationService applicationService;

    @Override
    public void notify(DelegateExecution execution) {
        ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
        String appName = processInstance.getBusinessKey();
        String owner = processInstance.getStartUserId();
        ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
        if (application.isSuccess()){
            List<String> bizMaintainer = application.getBody().getBizMaintainer();
            if (bizMaintainer.contains(owner)){
                execution.setVariable(SKIP_ENABLE,true);
                execution.setVariable(APPROVAL_PASS, true);
            }
        } else {
            LOGGER.error("get application info error, pId: {}, app: {}", processInstance.getId(), application);
            throw new RuntimeException(application.getError());
        }
    }
}
複製代碼

設置跳過條件

若是駁回了,流程又會回到提交任務那,須要用戶從新提交,數據庫中會出現新的記錄。

說回滾就回滾

activiti有個頗有意思的功能就是補償,回滾就是用補償觸發事件和邊界補償捕獲事件完成的。當補償觸發後,activiti會從後往前依次觸發補償捕獲事件。

  1. 畫一個補償觸發事件
  2. 畫一個邊界補償捕獲事件
    上圖的意思是當流程走到靜默期這個接收任務的時候會卡住並觸發一個邊界定時事件,若是定時到了就會走機器下線流程結束,若是定時沒到卻觸發了靜默期消息就會走補償觸發事件,觸發應用啓動,回滾成功。

問題不少慢慢解

任務怎麼沒記錄

說到這個問題,就要說起異步任務:

  • async:標識任務是否是異步執行的,若是爲true表示任務不是jobExecutor直接執行,而是生成額外的job放到工做線程池由工做線程執行。

借用官方的圖來簡要說明一下。activiti是經過事務的方式執行流程,當流程開始後,activit會一直推動流程直到遇到等待狀態,而後把當前狀態存到數據庫,並等待下一次觸發。以下圖中usertask是第一個等待狀態,定時器是第二個等待狀態,而完成usertask和執行服務任務在同一個工做單元中,同一個工做單元中的成功和失敗是原子的。若是服務任務成功了數據庫中看到的是完成的usertask和完成的服務任務,若是服務任務出錯了就會致使activti回滾事務,這個時候就會回到最初的狀態,現象就是數據庫中依然只有未完成的usertask的記錄,而看不到服務任務的記錄。

若是我想看到服務任務的執行狀態呢,解法就是設置async爲true,將服務任務交給後臺執行,後臺的JobExecutor會週期性的掃描數據庫的job提交給工做線程池。以下圖所示,服務任務設置了async爲true,這個時候有三個等待狀態,第一個是usertask,第二個是服務任務,第三個是定時器。如今流程開始,usertask完成後會建立一個等待狀態的服務任務job並把他保存到數據庫,而後由工做線程池執行,若是服務任務job執行失敗了,這個job仍然在數據庫中。而郵件任務若是失敗了則會和以前同樣回滾到服務任務初始狀態。

爲啥job不跑了

這也是常常遇到的一個坑,目前尚未解決,這涉及到排他任務。

  • exclusive:表示任務是否是排他任務,若是爲true,工做線程會把同一個流程實例的排他任務一次所有拿出來順序執行。

仍是借用官方的圖,假設有以下場景,三個服務任務都是異步的,jobExecutor會把這3個job分給工做線程池來執行,若是他們同時到達併發匯聚網關那就會出現一致性問題,由於每一個分支都會檢查其餘分支是否到達就會出現一直等待的狀況。

爲了解決這個問題,activiti使用了樂觀鎖,當全部分支匯聚時他們會去更新流程實例的版本號,若是有分支更新失敗則會被鎖上一段時間而後重試。可是樂觀鎖的問題在於失敗的job默認只重試三次並且job是非事務的會致使數據重複。所以又出現了排他任務,exclusive爲true時保證activiti對於同一個流程實例不會同時執行兩個排他任務,也就是說async和exclusive同時爲true時並非真正的並行,而6.0 exclusive默認爲true。

按道理只有async爲true,exclusive爲false時纔會job鎖住不跑了,可是事實並非這樣,所以我懷疑是當多臺機器同時獲取job時致使的這個問題,還待後續排查。

後記

洋洋灑灑終於把activiti的總結寫完了,不過是點到爲止,還有不少功能沒有嘗試,bug也沒查出來。革命還沒有成功,同志仍需努力。

相關文章
相關標籤/搜索