目錄git
爲了不歧義,事先約定。github
工做流有不少節點組成,一個節點成爲步驟點(Step)。sql
Workflow Core 中,用於構建工做流的類繼承 IWorkflow
,表明一條有任務規則的工做流,能夠表示工做流任務的開始或者 Do() 方法,或工做流分支獲取其它方法。數據庫
IWorkflow 有兩個同名接口:c#
public interface IWorkflow<TData> where TData : new() { string Id { get; } int Version { get; } void Build(IWorkflowBuilder<TData> builder); } public interface IWorkflow : IWorkflow<object> { }
Id:此工做流的惟一標識符;多線程
Version:此工做流的版本。併發
void Build
:在此方法內構建工做流。dom
工做流運做過程當中,能夠傳遞數據。有兩種傳遞方法:使用泛型,從運行工做流時就要傳入;使用 object 簡單類型,由單獨的步驟產生而且傳遞給下一個節點。異步
IWorkflowBuilder 是工做流對象,構建一個具備邏輯規則的工做流。能夠構建複雜的、具備循環、判斷的工做流規則,或者並行或者異步處理工做流任務。ui
一個簡單的工做流規則:
public class DeferSampleWorkflow : IWorkflow { public string Id => "DeferSampleWorkflow"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder) { builder .StartWith(context => { // 開始工做流任務 Console.WriteLine("Workflow started"); return ExecutionResult.Next(); }) .Then<SleepStep>() .Input(step => step.Period, data => TimeSpan.FromSeconds(20)) .Then(context => { Console.WriteLine("workflow complete"); return ExecutionResult.Next(); }); } }
此對象表示當前工做流任務已經結束,能夠表示主工做流或者工做流分支任務的完成。
/// Ends the workflow and marks it as complete IStepBuilder<TData, TStepBody> EndWorkflow();
由於工做流是能夠出現分支的,每一個工做流各自獨立工做,每一個分支都有其生命週期。
ForEach
、While
、If
、When
、Schedule
、Recur
是步驟容器。都返回IContainerStepBuilder<TData, Schedule, TStepBody>
Parallel、Saga是步驟的容器,都返回 IStepBuilder<TData, Sequence>
。
ForEach、While、If、When、Schedule、Recur 返回類型的接口:
public interface IContainerStepBuilder<TData, TStepBody, TReturnStep> where TStepBody : IStepBody where TReturnStep : IStepBody { /// The block of steps to execute IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);
Parallel、Saga :
/// Execute multiple blocks of steps in parallel IParallelStepBuilder<TData, Sequence> Parallel(); /// Execute a sequence of steps in a container IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
也就是說,ForEach、While、If、When、Schedule、Recur 是真正的容器。
按照個人理解,繼承了 IContainerStepBuilder
的,是一個容器,一個流程下的一個步驟/容器;由於 Workflow Core 做者對接口的命名很明顯表達了 This a container
。
由於裏面包含了一組操做,能夠說是一個步驟裏面包含了一個流程,這個流程由一系列操做組成,它是線性的,是順序的。裏面是一條工做流(Workflow)。
而 Parllel、Saga,至關於步驟點的容器。
更直觀的理解是電路,繼承 IContainerStepBuilder 的是串聯設備的容器,是順序的;
Parllel 是並聯電路/設備的一個容器,它既是一個開關,使得一條電路變成多條並流的電路,又包含了這些電路的電器。裏面能夠產生多條工做流,是多分支的、不一樣步的、獨立的。
從實現接口上看,ForEach、While、If、When、Schedule、Recur、Parllel 都實現了 Do()
方法,而 Saga 沒有實現。
關於 Saga,後面說明。
實現接口以下:
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody; IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body); IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body); IEnumerable<WorkflowStep> GetUpstreamSteps(int id); IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
方法名稱 | 說明 |
---|---|
StartWith | 任務的開始,必須調用此方法 |
GetUpstreamSteps | 獲取上一個步驟(StepBody)的ID |
UseDefaultErrorBehavior | 不詳 |
StepBody 是一個節點,IStepBuilder 構建一個節點,只有經過 StartWith,才能開始一個工做流、一個分支、異步任務等。
UseDefaultErrorBehavior
筆者沒有使用到,不敢瞎說。貌似與事務有關,當一個步驟點發生異常時,能夠終止、重試等。
IStepBuilder 表示一個節點,或者說一個容器,裏面能夠含有其它操做,例如並行、異步、循環等。
Name:設置此步驟點的名稱;
id:步驟點的惟一標識符。
/// Specifies a display name for the step IStepBuilder<TData, TStepBody> Name(string name); /// Specifies a custom Id to reference this step IStepBuilder<TData, TStepBody> Id(string id);
前面說到,工做流每一個步驟點傳遞數據有兩種方式。
TData(泛型) 是工做流中,隨着流傳的數據,這個對象會在整個工做流程生存。
例如 Mydata
class RecurSampleWorkflow : IWorkflow<MyData> { public string Id => "recur-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { ... } } public class MyData { public int Counter { get; set; } }
爲當前步驟點(StepBody)設置數據,亦可爲 TData 設置數據。
兩類數據:每一個步驟點均可以擁有不少字段、屬性和方法等;工做流流轉 TData。
Input、Output 是設置這些數據的具體方法。
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value); IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value); IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action); IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);
用於在容器中執行一系列操做。
/// Execute a sequence of steps in a container IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
雖然註釋說明 「用於在容器中執行一系列操做」,但實際上它不是一個真正的」容器「。
由於它沒有繼承 IContainerStepBuilder
,也沒有實現 Do()
。
可是它返回的 Sequence
實現了ContainerStepBody
。
若是說真正的容器至關於一條長河流中的一個湖泊(能夠容納和儲水),而 Saga 可能只是某一段河流的命名,而不是具體的湖泊。
或者說 static void Main(string[] args)
裏面的代碼太多了,新建一個方法體,把部分代碼放進去。總不能把全部代碼寫在一個方法裏吧?那麼建立一個類,把代碼分紅多個部分,放到不一樣方法中,加強可讀性。本質仍是沒有變。
Saga 能夠用來處理事務,進行重試或回滾等操做。後面說明。
用於建立下一個節點,建立一個普通節點。能夠是主工做流的節點(最外層)、或者做爲循環、條件節點裏的節點、做爲節點中節點的節點。
IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody; IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody; IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body); IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);
Then 做爲普通節點,按順序執行。操做對象是類型、StepBody。
Attach 也是普通節點,無特殊意義,經過 id 來指定要執行 StepBody 。能夠做爲流程控制的跳轉。
至關於 goto 語句。
/// Specify the next step in the workflow by Id IStepBuilder<TData, TStepBody> Attach(string id);
用於定義事件,將當前節點做爲事件節點,而後在後臺掛起,工做流會接着執行下一個節點。在工做流中止前,能夠經過指定 標識符(Id) 觸發事件。在一個工做流中,每一個事件的標識符都是惟一的。
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null); IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
意思應該是結束一個節點的運行。
若是在 When 中使用,至關於 break。
IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;
使用例子
builder .StartWith<RandomOutput>(x => x.Name("Random Step")) .When(0) .Then<TaskA>() .Then<TaskB>() .End<RandomOutput>("Random Step") .When(1) .Then<TaskC>() .Then<TaskD>() .End<RandomOutput>("Random Step");
在一個條件下過早地取消此步驟的執行。
應該至關於 contiune。
/// Prematurely cancel the execution of this step on a condition IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);
延遲執行,使得當前節點延時執行。並不是是阻塞當前的工做流運行。Delay 跟在節點後面,使得這個節點延時運行。能夠理解成異步,工做流不會等待此節點執行完畢,會直接執行下一個節點/步驟。
/// Wait for a specified period IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);
預約執行。將當前節點設置一個時間,將在一段時間後執行。Schedule 不會阻塞工做流。
Schedule 是非阻塞的,工做流不會等待Schedule執行完畢,會直接執行下一個節點/步驟。
/// Schedule a block of steps to execute in parallel sometime in the future IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);
例子
builder .StartWith(context => Console.WriteLine("Hello")) .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule .StartWith(context => Console.WriteLine("Doing scheduled tasks")) ) .Then(context => Console.WriteLine("Doing normal tasks"));
用於重複執行某個節點,直至條件不符。
Recur 是非阻塞的,工做流不會等待 Rezur 執行完畢,會直接執行下一個節點/步驟。
/// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);
至關於數據庫中的事務,流程中某些步驟發生異常時的時候執行某些操做。
例如:
builder .StartWith(context => Console.WriteLine("Begin")) .Saga(saga => saga .StartWith<Task1>() .CompensateWith<UndoTask1>() .Then<Task2>() .CompensateWith<UndoTask2>() .Then<Task3>() .CompensateWith<UndoTask3>() ) .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5)) .Then(context => Console.WriteLine("End"));
若是此步驟引起未處理的異常,則撤消步驟;若是發生異常,則執行。
能夠做爲節點的 B計劃。當節點執行任務沒有問題時, CompensateWith 不會運行;若是節點發生錯誤,就會按必定要求執行 CompensateWith 。
/// Undo step if unhandled exception is thrown by this step IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody; IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body); IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);
若是此步驟引起未處理的異常,則撤消步驟;若是發生異常,則執行。與 CompensateWith 的區別是,傳入參數前者是 Func,後者是 Action。
CompensateWith
的內部實現了 CompensateWith
,是對 CompensateWith
的封裝。
/// Undo step if unhandled exception is thrown by this step IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);
用於事務操做,表示發生錯誤時若是回滾、設置時間等。通常與 Saga 一塊兒使用。
OnError 是阻塞的。
/// Configure the behavior when this step throws an unhandled exception IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
OnError 能夠捕獲一個容器內,某個節點的異常,並執行回滾操做。若是直接在節點上使用而不是容器,能夠發生回滾,而後執行下個節點。若是做用於容器,那麼可讓容器進行從新運行,等一系列操做。
OnError 能夠與 When、While 等節點容器一塊兒使用,但他們自己帶有循環功能,使用事務會讓代碼邏輯變得奇怪。
Saga 沒有條件判斷、沒有循環,自己就是一個簡單的袋子,是節點的容器。所以使用 Saga 做爲事務操做的容器,十分適合,進行回滾、重試等一系列操做。
迭代,也能夠說是循環。內部使用 IEnumerable 來實現。
與 C# 中 Foreach 的區別是,C# 中是用來迭代數據;
而工做流中 ForEach 用來判斷元素個數,標識應該循環多少次。
ForEach 是阻塞的。
/// Execute a block of steps, once for each item in a collection in a parallel foreach IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);
示例
builder .StartWith<SayHello>() .ForEach(data => new List<int>() { 1, 2, 3, 4 }) .Do(x => x .StartWith<DisplayContext>() .Input(step => step.Item, (data, context) => context.Item) .Then<DoSomething>()) .Then<SayGoodbye>();
最終會循環5次。
條件判斷,條件是否真。
When 是阻塞的。
When 能夠捕獲上一個節點流轉的數據(非 TData)。
/// Configure an outcome for this step, then wire it to another step [Obsolete] IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null); /// Configure an outcome for this step, then wire it to a sequence IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);
前一個方法例如
When(0)
,會捕獲 return ExecutionResult.Outcome(value);
的值,判斷是否相等。可是這種方式已通過時。
須要使用表達式來判斷。例如
.When(data => 1)
.When(data => data.value==1)
條件判斷,條件是否真。與When有區別,When能夠捕獲 ExecutionResult.Outcome(value);
。
While 是阻塞的。
/// Repeat a block of steps until a condition becomes true IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);
示例
builder .StartWith<SayHello>() .While(data => data.Counter < 3) .Do(x => x .StartWith<DoSomething>() .Then<IncrementStep>() .Input(step => step.Value1, data => data.Counter) .Output(data => data.Counter, step => step.Value2)) .Then<SayGoodbye>();
條件判斷,是否符合條件。
If是阻塞的。
/// Execute a block of steps if a condition is true IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);
When、While、If的區別是,When、While 是條件是否爲真,If是表達式是否爲真。
實質上,是語言上的區別,與代碼邏輯無關。
真假用 When/While,條件判斷、表達式判斷用 If 。
並行任務。做爲容器,能夠在裏面設置多組任務,這些任務將會同時、併發運行。
Parallel 是阻塞的。
/// Execute multiple blocks of steps in parallel IParallelStepBuilder<TData, Sequence> Parallel();
示例:
.StartWith<SayHello>() .Parallel() .Do(then => then.StartWith<PrintMessage>() .Input(step => step.Message, data => "Item 1.1") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 1.2")) .Do(then => then.StartWith<PrintMessage>() .Input(step => step.Message, data => "Item 2.1") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 2.2") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 2.3")) .Do(then => then.StartWith<PrintMessage>() .Input(step => step.Message, data => "Item 3.1") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 3.2")) .Join() .Then<SayGoodbye>();
有三個 Do,表明三個並行任務。三個 Do 是並行的,Do 內的代碼,會按順序執行。
Paeallel 的 Do:
public interface IParallelStepBuilder<TData, TStepBody> where TStepBody : IStepBody { IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder); IStepBuilder<TData, Sequence> Join(); }
比起 ForEach、When、While、If,除了有 Do,還有 Join 方法。
對於其它節點類型來講,Do直接構建節點。
對於Parallel來講,Do收集任務,最終須要Join來構建節點和運行任務。
寫得長很差看,其它內容壓縮一下。
數據傳遞和依賴注入
Workflow Core 支持對每一個步驟點進行依賴注入。
支持數據持久化
Workflow Core 支持將構建的工做流存儲到數據庫中,以便之後再次調用。
支持 Sql Server、Mysql、SQLite、PostgreSQL、Redis、MongoDB、AWS、Azure、
Elasticsearch、RabbitMQ... ....
支持動態調用和動態生成工做流
你能夠經過 C# 代碼構建工做流,或者經過 Json、Yaml 動態構建工做流。
能夠利用可視化設計器,將邏輯和任務生成配置文件,而後動態傳遞,使用 Workflow Core 動態建立工做流。
篇幅有限,再也不贅述。
有興趣請關注 Workflow Core:https://github.com/danielgerlag/workflow-core