年初要作一個運維自動化平臺,須要用到流程引擎,原本打算項目用golang寫的,可是golang的流程引擎功能太簡單實在是用不來,最後仍是選型java + activiti。到activiti官網一看,嘿出7.0告終果文檔是剛寫的還不全,咱們java仍是8的,7.0是匹配的java11,最終是問題太多隻好放棄用activiti6.0了。java
雖然網上教程有很多,不過要真正跑起來着實不容易,有些內容好比參數的意義仍是看5.0的手冊才弄明白的。mysql
搭建環境:java8 + springboot2.1.3 + activiti6.0 + mysqlgolang
pom依賴:spring
<dependency>
<groupId>org.activiti</groupId>
<artifactId>activiti-spring-boot-starter-basic</artifactId>
<version>${activiti.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
複製代碼
配置數據庫:sql
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/activiti?nullCatalogMeansCurrent=true&serverTimezone=Asia/Shanghai
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root
複製代碼
而後啓動,恭喜你,應用沒起來,會出現以下報錯:數據庫
Caused by: java.io.FileNotFoundException: class path resource [org/springframework/security/config/annotation/authentication/configurers/GlobalAuthenticationConfigurerAdapter.class] cannot be opened because it does not exist:
express
這個緣由是activiti6.0開發的時候springboot2.0還沒出來以至年久失修,文件路徑不對了,解決方案:springboot
啓動類前排除掉SecurityAutoConfigurationbash
@SpringBootApplication(exclude = SecurityAutoConfiguration.class)
併發
再次啓動,若是你碰到建立數據庫表卻找不到表的問題,兩種解決方法:
activiti-engine-6.0.0.jar/org/activiti/db/create
nullCatalogMeansCurrent=true
這是由於在org/activiti/engine/impl/db/DbSqlSession
中activiti使用databaseMetaData.getTables
尋找庫表是否存在,而dbSqlSessionFactory
獲取到的catalog
爲空由於mysql使用schema
標識庫名而不是catalog
,致使mysql掃描全部的庫來找表,一旦其餘庫中有同名表activiti就覺得找到了其實表並不存在。nullCatalogMeansCurrent
的意義就在於讓mysql默認當前庫,在mysql-connector-java 5.x
該參數默認爲true,但在6.x以上默認爲false。
再次啓動,仍然沒起來:
Caused by: java.io.FileNotFoundException: class path resource [processes/] cannot be resolved to URL because it does not exist
這是由於activiti會到resource/processes下面尋找流程文件,建立該目錄。
activiti提供了多種設計器來畫流程圖,我安裝了IDEA的插件和eclipse的插件,比較了下仍是eclipse的好用,因此只介紹eclipse 插件的安裝過程。
Names : Activiti BPMN 2.0 designer
Location : https://www.activiti.org/designer/update/
複製代碼
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:tns="http://www.activiti.org/test" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test" id="m1551347612734" name="">
<process id="machine_expansion" name="Machine Expansion" isExecutable="true" isClosed="false" processType="None">
<startEvent id="start" name="開始"></startEvent>
...
複製代碼
使用runtimeservice建立一個createProcessInstanceBuilder,設置processDefinitionKey爲xml中的process id,而後start就能夠了。
@Autowired
private RuntimeService runtimeService;
@Override
public String start(BaseParamDTO baseParamDTO) {
String currentUserName = CurrentUserUtil.getCurrentUserName();
identityService.setAuthenticatedUserId(currentUserName);
ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
ProcessInstance machineExpansion = runtimeService.createProcessInstanceBuilder().processDefinitionKey(WorksheetTypeEnum.EXPANSION.getActivitiDefineName())
.businessKey(expansionParamDTO.getAppName()).name(WorksheetTypeEnum.EXPANSION.name()).start();
baseParamDTO.setId(machineExpansion.getProcessInstanceId());
submit(baseParamDTO);
return machineExpansion.getProcessInstanceId();
}
複製代碼
爲了設置流程的startuser,必需要使用identityService.setAuthenticatedUserId設置,由於process獲取的是當前線程的用戶。
要熟悉一個項目,最簡單的方式就是看他的數據庫表是怎麼設計的,表裏面都存了什麼東西,activiti數據庫中一共有28張表
spring.activiti.historyLevel
配置,對應HistoryService
表名 | 描述 |
---|---|
act_evt_log | 事件日誌表 |
act_ge_bytearray | 二進制數據表,好比拋錯 |
act_ge_property | activiti屬性表 |
act_hi_actinst | 歷史的流程活動實例 |
act_hi_attachment | 歷史的流程附件 |
act_hi_comment | 歷史的描述信息 |
act_hi_detail | 歷史的流程運行中的細節信息 |
act_hi_identitylink | 歷史的用戶關係信息 |
act_hi_procinst | 歷史的流程實例 |
act_hi_taskinst | 歷史的usertask實例 |
act_hi_varinst | 歷史的流程參數 |
act_id_group | 用戶組 |
act_id_info | 用戶詳情信息 |
act_id_membership | 用戶和用戶組關聯關係 |
act_id_user | 用戶信息 |
act_procdef_info | 流程定義擴展表 |
act_re_deployment | 流程部署信息 |
act_re_model | 流程模型 |
act_re_procdef | 已部署過的流程 |
act_ru_deadletter_job | 運行時中死掉的job |
act_ru_event_subscr | 運行時的事件監聽 |
act_ru_execution | 運行時的流程執行實例 |
act_ru_identitylink | 運行時的用戶關係信息 |
act_ru_job | 運行時的job |
act_ru_suspended_job | 運行時暫停的job |
act_ru_task | 運行時的usertask實例 |
act_ru_timer_job | 運行時的定時器job |
act_ru_variable | 運行時的參數 |
在activiti中有幾個基礎定義:
eclipse設計器總共有8種task,目前我只用到其中的4種
用戶任務用來設置須要人操做才能完成的任務如審批,執行到usertask時,流程會卡住,只有用戶人工觸發了流程纔會繼續。示例以下,這裏沒有在畫bpmn時寫死用戶,而是在流程中經過taskService設置assigness,usertask完成只能是complete,後續的流程能夠用參數和排他網關控制。
@Override
public void submit(BaseParamDTO baseParamDTO) {
String currentUserName = CurrentUserUtil.getCurrentUserName();
ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
Task submit = taskService.createTaskQuery().processInstanceId(baseParamDTO.getId()).taskDefinitionKey(ExpansionDisplayEnum.SUBMIT.getFlow()).singleResult();
taskService.setAssignee(submit.getId(), currentUserName);
taskService.setVariable(submit.getId(), INPUT_PARAM, expansionParamDTO);
taskService.setVariable(submit.getId(), INPUT_PRIORITY, baseParamDTO.getPriority());
taskService.setPriority(submit.getId(), baseParamDTO.getPriority());
if (StringUtils.isNotBlank(baseParamDTO.getComment())){
taskService.addComment(submit.getId(), null, baseParamDTO.getComment());
}
taskService.complete(submit.getId());
}
複製代碼
用戶任務涉及到的幾個概念:
寫一段腳本,執行到該步驟時自動執行腳本。(沒用過)
最經常使用的task,執行到該步驟時自動執行對應的java類
@Component
public class ApplyMachineTask implements JavaDelegate {
public static final Logger LOGGER = LoggerFactory.getLogger(ApplyMachineTask.class);
public static final String MACHINE_APPLY_ID = "applyId";
public static final String MACHINE_APPLY_ID_LOCAL = "localApplyId";
public static final String MACHINE_APPLY_CHECK = "applyCheckCount";
@Autowired
private MachineService machineService;
@Override
public void execute(DelegateExecution delegateExecution){
ExpansionParamDTO param = delegateExecution.getVariable(INPUT_PARAM, ExpansionParamDTO.class);
LOGGER.debug("start apply machine, param {}", param);
param.setWorksheetId(delegateExecution.getProcessInstanceId());
ApiResponseDTO<String> stringApiResponseDTO = machineService.applyMachine(param);
if (stringApiResponseDTO.isSuccess()){
delegateExecution.setVariable(MACHINE_APPLY_ID, stringApiResponseDTO.getBody());
delegateExecution.setVariableLocal(MACHINE_APPLY_ID_LOCAL, stringApiResponseDTO.getBody());
delegateExecution.setVariable(MACHINE_APPLY_CHECK, 0);
} else {
delegateExecution.setVariableLocal(ERROR_LOCAL, stringApiResponseDTO.getError());
throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());
}
}
}
複製代碼
這裏說下參數設置,setVariable
會設置流程全局變量,也就是流程中任意一個步驟都能獲取到的參數,用來參數傳遞,而setVariableLocal
設置的是局部變量,只有當前步驟才能獲取的參數,局部變量的用處在於能夠記錄流程執行到該步驟時的狀態,參數涉及到覆蓋問題,因此不要出現全局和局部參數同名的現象(子流程的除外)。
業務規則用戶用來同步執行一個或多個規則。activiti使用drools規則引擎執行業務規則。(沒用過)
自動郵件任務,它能夠發送郵件給一個或多個參與者。(沒用過)
手工任務定義了BPM引擎外部的任務。 用來表示工做須要某人完成,而引擎不須要知道,也沒有對應的系統和UI接口。 對於引擎,手工任務是直接經過的活動, 流程到達它以後會自動向下執行。(沒用過)
接收任務是一個簡單任務,它會等待對應消息的到達。流程到達該步驟後會一直等待,直到消息觸發。以下圖所示,在一個接收任務上掛了一個定時邊界事件,定時邊界事件每隔一段時間觸發服務任務檢查狀態,當狀態經過時給接收任務發消息,接收任務完成,定時邊界網關中止。
定時邊界網關配置 消息觸發:Execution receiveTask = runtimeService.createExecutionQuery().processInstanceId(execution.getProcessInstanceId())
.activityId(ExpansionDisplayEnum.WAIT_INIT.getFlow()).singleResult();
runtimeService.trigger(receiveTask.getId());
複製代碼
activity有兩種子流程,一種是CallActivity,一種是subprocess,區別在於CallActivity是調用主流程外部的一個流程,運行時會生成新的processId,做用域徹底獨立,一般用來複用其餘流程或者須要獨立processId的狀況;而subprocess是嵌套子流程,他並不會生成新的processId,可是定義了獨立的事件域,一般用來捕捉同一類事件。
這裏設置的條件是所有完成的時候主流程繼續。
activiti有兩種事件,一種是流程事件,放在流程中的,一種是邊界事件,掛在某個步驟上因爲流程運行間接觸發的,而這兩種事件又分爲兩類,一種是捕獲一種是觸發。
這裏要說的就是邊界錯誤捕獲事件。
如圖所示,把啓動應用的全部步驟都放到了一個內嵌子流程subprocess中,而後在subprocess上掛載了一個錯誤邊界捕獲事件,當子流程中任意一個步驟拋錯時捕獲錯誤觸發重試按鈕,若是重啓則從新執行整個子流程,若是跳過這流程繼續。錯誤邊界事件中有一個errorCode參數,errorCode用來匹配捕獲的錯誤:
配置一個排他網關,在每條路徑上寫明條件
子流程每一個步驟中try catch住錯誤,並拋出
delegateExecution.setVariableLocal(ERROR_LOCAL,stringApiResponseDTO.getError());
throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());
複製代碼
須要注意的是拋錯中的錯誤文本好像獲取不到,因此仍是要用局部變量來存一下。
由於我想記錄全部的操做步驟,因此我把每一種操做都進行了分割,所以出現了下圖所示的審批流程。
第一次提交任務是在流程開始時直接執行的,而後使用了監聽器在審批步驟開始前設置審批候選人,若是當前用戶是審批候選人直接跳過審批流程,審批是否經過是經過排他網關加上參數控制的。 審批usertask上設置一個監聽器,用來設置任務的審批候選人@Component
public class ApprovalTaskListener implements TaskListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalTaskListener.class);
@Autowired
private RuntimeService runtimeService;
@Autowired
private ApprovalTask approvalTask;
@Autowired
private ApplicationService applicationService;
@Autowired
private TaskService taskService;
@Override
public void notify(DelegateTask delegateTask) {
String processInstanceId = delegateTask.getProcessInstanceId();
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult();
String appName = processInstance.getBusinessKey();
String owner = processInstance.getStartUserId();
Integer priority = delegateTask.getVariable(INPUT_PRIORITY, Integer.class);
taskService.setPriority(delegateTask.getId(), priority);
ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
if (application.isSuccess()){
List<String> bizMaintainer = application.getBody().getBizMaintainer();
delegateTask.addCandidateUsers(bizMaintainer);
//申請人是審批人時直接處理
if (bizMaintainer.contains(owner)){
taskService.setAssignee(delegateTask.getId(), owner);
}
} else {
LOGGER.error("get application info error, pId: {}, app: {}", processInstanceId, application);
throw new RuntimeException(application.getError());
}
}
}
複製代碼
在審批usertask外層設置exection的監聽器approvalSkipListener,前面說過exection是先於usertask執行的,每一個usertask外層都有一個exection,因此這個監聽器能夠控制usertask的行爲。
@Component
public class ApprovalSkipListener implements ExecutionListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalSkipListener.class);
@Autowired
private RuntimeService runtimeService;
@Autowired
private ApplicationService applicationService;
@Override
public void notify(DelegateExecution execution) {
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
String appName = processInstance.getBusinessKey();
String owner = processInstance.getStartUserId();
ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
if (application.isSuccess()){
List<String> bizMaintainer = application.getBody().getBizMaintainer();
if (bizMaintainer.contains(owner)){
execution.setVariable(SKIP_ENABLE,true);
execution.setVariable(APPROVAL_PASS, true);
}
} else {
LOGGER.error("get application info error, pId: {}, app: {}", processInstance.getId(), application);
throw new RuntimeException(application.getError());
}
}
}
複製代碼
設置跳過條件
若是駁回了,流程又會回到提交任務那,須要用戶從新提交,數據庫中會出現新的記錄。activiti有個頗有意思的功能就是補償,回滾就是用補償觸發事件和邊界補償捕獲事件完成的。當補償觸發後,activiti會從後往前依次觸發補償捕獲事件。
說到這個問題,就要說起異步任務:
借用官方的圖來簡要說明一下。activiti是經過事務的方式執行流程,當流程開始後,activit會一直推動流程直到遇到等待狀態,而後把當前狀態存到數據庫,並等待下一次觸發。以下圖中usertask是第一個等待狀態,定時器是第二個等待狀態,而完成usertask和執行服務任務在同一個工做單元中,同一個工做單元中的成功和失敗是原子的。若是服務任務成功了數據庫中看到的是完成的usertask和完成的服務任務,若是服務任務出錯了就會致使activti回滾事務,這個時候就會回到最初的狀態,現象就是數據庫中依然只有未完成的usertask的記錄,而看不到服務任務的記錄。
若是我想看到服務任務的執行狀態呢,解法就是設置async爲true,將服務任務交給後臺執行,後臺的JobExecutor會週期性的掃描數據庫的job提交給工做線程池。以下圖所示,服務任務設置了async爲true,這個時候有三個等待狀態,第一個是usertask,第二個是服務任務,第三個是定時器。如今流程開始,usertask完成後會建立一個等待狀態的服務任務job並把他保存到數據庫,而後由工做線程池執行,若是服務任務job執行失敗了,這個job仍然在數據庫中。而郵件任務若是失敗了則會和以前同樣回滾到服務任務初始狀態。
這也是常常遇到的一個坑,目前尚未解決,這涉及到排他任務。
仍是借用官方的圖,假設有以下場景,三個服務任務都是異步的,jobExecutor會把這3個job分給工做線程池來執行,若是他們同時到達併發匯聚網關那就會出現一致性問題,由於每一個分支都會檢查其餘分支是否到達就會出現一直等待的狀況。
爲了解決這個問題,activiti使用了樂觀鎖,當全部分支匯聚時他們會去更新流程實例的版本號,若是有分支更新失敗則會被鎖上一段時間而後重試。可是樂觀鎖的問題在於失敗的job默認只重試三次並且job是非事務的會致使數據重複。所以又出現了排他任務,exclusive爲true時保證activiti對於同一個流程實例不會同時執行兩個排他任務,也就是說async和exclusive同時爲true時並非真正的並行,而6.0 exclusive默認爲true。按道理只有async爲true,exclusive爲false時纔會job鎖住不跑了,可是事實並非這樣,所以我懷疑是當多臺機器同時獲取job時致使的這個問題,還待後續排查。
洋洋灑灑終於把activiti的總結寫完了,不過是點到爲止,還有不少功能沒有嘗試,bug也沒查出來。革命還沒有成功,同志仍需努力。