開源工做流引擎 Workflow Core 的研究和使用教程

開源工做流引擎 Workflow Core 的研究和使用教程

一,工做流對象和使用前說明

爲了不歧義,事先約定。github

工做流有不少節點組成,一個節點成爲步驟點(Step)。sql

1,IWorkflow / IWorkflowBuilder

Workflow Core 中,用於構建工做流的類繼承 IWorkflow,表明一條有任務規則的工做流,能夠表示工做流任務的開始或者 Do() 方法,或工做流分支獲取其它方法。數據庫

IWorkflow 有兩個同名接口:c#

public interface IWorkflow<TData>
        where TData : new()
    {
        string Id { get; }
        int Version { get; }
        void Build(IWorkflowBuilder<TData> builder);
    }

    public interface IWorkflow : IWorkflow<object>
    {
    }

Id:此工做流的惟一標識符;多線程

Version:此工做流的版本。併發

void Build:在此方法內構建工做流。dom

工做流運做過程當中,能夠傳遞數據。有兩種傳遞方法:使用泛型,從運行工做流時就要傳入;使用 object 簡單類型,由單獨的步驟產生而且傳遞給下一個節點。異步

IWorkflowBuilder 是工做流對象,構建一個具備邏輯規則的工做流。能夠構建複雜的、具備循環、判斷的工做流規則,或者並行或者異步處理工做流任務。ui

一個簡單的工做流規則:

public class DeferSampleWorkflow : IWorkflow
    {
        public string Id => "DeferSampleWorkflow";
            
        public int Version => 1;
            
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith(context =>
                {
                    // 開始工做流任務
                    Console.WriteLine("Workflow started");
                    return ExecutionResult.Next();
                })
                .Then<SleepStep>()
                    .Input(step => step.Period, data => TimeSpan.FromSeconds(20))
                .Then(context =>
                {
                    Console.WriteLine("workflow complete");
                    return ExecutionResult.Next();
                });
        }
    }

2,EndWorkflow

此對象表示當前工做流任務已經結束,能夠表示主工做流或者工做流分支任務的完成。

/// Ends the workflow and marks it as complete
        IStepBuilder<TData, TStepBody> EndWorkflow();

由於工做流是能夠出現分支的,每一個工做流各自獨立工做,每一個分支都有其生命週期。

3,容器

ForEachWhileIfWhenScheduleRecur步驟容器。都返回IContainerStepBuilder<TData, Schedule, TStepBody>

Parallel、Saga是步驟的容器,都返回 IStepBuilder<TData, Sequence>

ForEach、While、If、When、Schedule、Recur 返回類型的接口:

public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
        where TStepBody : IStepBody
        where TReturnStep : IStepBody
    {
        /// The block of steps to execute
        IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);

Parallel、Saga :

/// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

        /// Execute a sequence of steps in a container
        IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

也就是說,ForEach、While、If、When、Schedule、Recur 是真正的容器。

按照個人理解,繼承了 IContainerStepBuilder的,是一個容器,一個流程下的一個步驟/容器;由於 Workflow Core 做者對接口的命名很明顯表達了 This a container

由於裏面包含了一組操做,能夠說是一個步驟裏面包含了一個流程,這個流程由一系列操做組成,它是線性的,是順序的裏面是一條工做流(Workflow)。

而 Parllel、Saga,至關於步驟點的容器。

更直觀的理解是電路,繼承 IContainerStepBuilder 的是串聯設備的容器,是順序的;

Parllel 是並聯電路/設備的一個容器,它既是一個開關,使得一條電路變成多條並流的電路,又包含了這些電路的電器。裏面能夠產生多條工做流,是多分支的、不一樣步的、獨立的。

1

從實現接口上看,ForEach、While、If、When、Schedule、Recur、Parllel 都實現了 Do() 方法,而 Saga 沒有實現。

關於 Saga,後面說明。

4,工做流的步驟點

實現接口以下:

IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);

        IEnumerable<WorkflowStep> GetUpstreamSteps(int id);

        IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
方法名稱 說明
StartWith 任務的開始,必須調用此方法
GetUpstreamSteps 獲取上一個步驟(StepBody)的ID
UseDefaultErrorBehavior 不詳

StepBody 是一個節點,IStepBuilder 構建一個節點,只有經過 StartWith,才能開始一個工做流、一個分支、異步任務等。

UseDefaultErrorBehavior筆者沒有使用到,不敢瞎說。貌似與事務有關,當一個步驟點發生異常時,能夠終止、重試等。

二,IStepBuilder 節點

IStepBuilder 表示一個節點,或者說一個容器,裏面能夠含有其它操做,例如並行、異步、循環等。

1,設置屬性的方法

Name:設置此步驟點的名稱;
id:步驟點的惟一標識符。

/// Specifies a display name for the step
        IStepBuilder<TData, TStepBody> Name(string name);

        /// Specifies a custom Id to reference this step
        IStepBuilder<TData, TStepBody> Id(string id);

2,設置數據

前面說到,工做流每一個步驟點傳遞數據有兩種方式。

TData(泛型) 是工做流中,隨着流傳的數據,這個對象會在整個工做流程生存。

例如 Mydata

class RecurSampleWorkflow : IWorkflow<MyData>
    {
        public string Id => "recur-sample";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyData> builder)
        {
        ...
        }
    }
 public class MyData
    {
        public int Counter { get; set; }
    }

3,Input / Output

爲當前步驟點(StepBody)設置數據,亦可爲 TData 設置數據。

兩類數據:每一個步驟點均可以擁有不少字段、屬性和方法等;工做流流轉 TData。

Input、Output 是設置這些數據的具體方法。

IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);

        IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);

        IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);

        IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);

三,工做流節點的邏輯和操做

容器操做

1,Saga

用於在容器中執行一系列操做。

/// Execute a sequence of steps in a container
    IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

雖然註釋說明 「用於在容器中執行一系列操做」,但實際上它不是一個真正的」容器「。

由於它沒有繼承 IContainerStepBuilder,也沒有實現 Do()

可是它返回的 Sequence 實現了ContainerStepBody

若是說真正的容器至關於一條長河流中的一個湖泊(能夠容納和儲水),而 Saga 可能只是某一段河流的命名,而不是具體的湖泊。

或者說 static void Main(string[] args)裏面的代碼太多了,新建一個方法體,把部分代碼放進去。總不能把全部代碼寫在一個方法裏吧?那麼建立一個類,把代碼分紅多個部分,放到不一樣方法中,加強可讀性。本質仍是沒有變。

Saga 能夠用來處理事務,進行重試或回滾等操做。後面說明。

普通節點

1,Then

用於建立下一個節點,建立一個普通節點。能夠是主工做流的節點(最外層)、或者做爲循環、條件節點裏的節點、做爲節點中節點的節點。

IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;

        IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);

2,Attach

Then 做爲普通節點,按順序執行。操做對象是類型、StepBody。

Attach 也是普通節點,無特殊意義,經過 id 來指定要執行 StepBody 。能夠做爲流程控制的跳轉。

至關於 goto 語句。

/// Specify the next step in the workflow by Id
        IStepBuilder<TData, TStepBody> Attach(string id);

事件

1,WaitFor

用於定義事件,將當前節點做爲事件節點,而後在後臺掛起,工做流會接着執行下一個節點。在工做流中止前,能夠經過指定 標識符(Id) 觸發事件。在一個工做流中,每一個事件的標識符都是惟一的。

IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);


        IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);

條件體和循環體

1,End

意思應該是結束一個節點的運行。

若是在 When 中使用,至關於 break。

IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;

使用例子

builder
                .StartWith<RandomOutput>(x => x.Name("Random Step"))
                    .When(0)
                        .Then<TaskA>()
                        .Then<TaskB>()                        
                        .End<RandomOutput>("Random Step")
                    .When(1)
                        .Then<TaskC>()
                        .Then<TaskD>()
                        .End<RandomOutput>("Random Step");

2,CancelCondition

在一個條件下過早地取消此步驟的執行。

應該至關於 contiune。

/// Prematurely cancel the execution of this step on a condition
        IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);

節點的異步或多線程

1,Delay

延遲執行,使得當前節點延時執行。並不是是阻塞當前的工做流運行。Delay 跟在節點後面,使得這個節點延時運行。能夠理解成異步,工做流不會等待此節點執行完畢,會直接執行下一個節點/步驟。

/// Wait for a specified period
        IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);

2,Schedule

預約執行。將當前節點設置一個時間,將在一段時間後執行。Schedule 不會阻塞工做流。

Schedule 是非阻塞的,工做流不會等待Schedule執行完畢,會直接執行下一個節點/步驟。

/// Schedule a block of steps to execute in parallel sometime in the future
        IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);

例子

builder
                .StartWith(context => Console.WriteLine("Hello"))
                .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
                )
                .Then(context => Console.WriteLine("Doing normal tasks"));

3,Recur

用於重複執行某個節點,直至條件不符。

Recur 是非阻塞的,工做流不會等待 Rezur 執行完畢,會直接執行下一個節點/步驟。

/// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
        IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);

用於事務的操做

至關於數據庫中的事務,流程中某些步驟發生異常時的時候執行某些操做。

例如:

builder
            .StartWith(context => Console.WriteLine("Begin"))
            .Saga(saga => saga
                .StartWith<Task1>()
                    .CompensateWith<UndoTask1>()
                .Then<Task2>()
                    .CompensateWith<UndoTask2>()
                .Then<Task3>()
                    .CompensateWith<UndoTask3>()
            )
                .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
            .Then(context => Console.WriteLine("End"));

1,CompensateWith

若是此步驟引起未處理的異常,則撤消步驟;若是發生異常,則執行。

能夠做爲節點的 B計劃。當節點執行任務沒有問題時, CompensateWith 不會運行;若是節點發生錯誤,就會按必定要求執行 CompensateWith 。

/// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);

2,CompensateWithSequence

若是此步驟引起未處理的異常,則撤消步驟;若是發生異常,則執行。與 CompensateWith 的區別是,傳入參數前者是 Func,後者是 Action。

CompensateWith 的內部實現了 CompensateWith,是對 CompensateWith 的封裝。

/// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);

3,OnError

用於事務操做,表示發生錯誤時若是回滾、設置時間等。通常與 Saga 一塊兒使用。

OnError 是阻塞的。

/// Configure the behavior when this step throws an unhandled exception
        IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);

OnError 能夠捕獲一個容器內,某個節點的異常,並執行回滾操做。若是直接在節點上使用而不是容器,能夠發生回滾,而後執行下個節點。若是做用於容器,那麼可讓容器進行從新運行,等一系列操做。

OnError 能夠與 When、While 等節點容器一塊兒使用,但他們自己帶有循環功能,使用事務會讓代碼邏輯變得奇怪。

Saga 沒有條件判斷、沒有循環,自己就是一個簡單的袋子,是節點的容器。所以使用 Saga 做爲事務操做的容器,十分適合,進行回滾、重試等一系列操做。

四,條件或開關

迭代

1,ForEach

迭代,也能夠說是循環。內部使用 IEnumerable 來實現。

與 C# 中 Foreach 的區別是,C# 中是用來迭代數據;

而工做流中 ForEach 用來判斷元素個數,標識應該循環多少次。

ForEach 是阻塞的。

/// Execute a block of steps, once for each item in a collection in a parallel foreach
        IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);

示例

builder
                .StartWith<SayHello>()
                .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                    .Do(x => x
                        .StartWith<DisplayContext>()
                            .Input(step => step.Item, (data, context) => context.Item)
                        .Then<DoSomething>())
                .Then<SayGoodbye>();

最終會循環5次。

條件判斷

1,When

條件判斷,條件是否真。

When 是阻塞的。

When 能夠捕獲上一個節點流轉的數據(非 TData)。

/// Configure an outcome for this step, then wire it to another step
        [Obsolete]
        IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
        
        
        /// Configure an outcome for this step, then wire it to a sequence
        IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);

前一個方法例如

When(0),會捕獲 return ExecutionResult.Outcome(value); 的值,判斷是否相等。可是這種方式已通過時。

須要使用表達式來判斷。例如

.When(data => 1)
.When(data => data.value==1)

2,While

條件判斷,條件是否真。與When有區別,When能夠捕獲 ExecutionResult.Outcome(value);

While 是阻塞的。

/// Repeat a block of steps until a condition becomes true
        IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);

示例

builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3)
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();

3,If

條件判斷,是否符合條件。

If是阻塞的。

/// Execute a block of steps if a condition is true
        IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);

When、While、If的區別是,When、While 是條件是否爲真,If是表達式是否爲真。

實質上,是語言上的區別,與代碼邏輯無關。

真假用 When/While,條件判斷、表達式判斷用 If 。

節點併發

1,Parallel

並行任務。做爲容器,能夠在裏面設置多組任務,這些任務將會同時、併發運行。

Parallel 是阻塞的。

/// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

示例:

.StartWith<SayHello>()
                .Parallel()
                    .Do(then => 
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.2"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.2")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.3"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.2"))
                .Join()
                .Then<SayGoodbye>();

有三個 Do,表明三個並行任務。三個 Do 是並行的,Do 內的代碼,會按順序執行。

Paeallel 的 Do:

public interface IParallelStepBuilder<TData, TStepBody>
        where TStepBody : IStepBody
    {
        IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
        IStepBuilder<TData, Sequence> Join();
    }

比起 ForEach、When、While、If,除了有 Do,還有 Join 方法。

對於其它節點類型來講,Do直接構建節點。

對於Parallel來講,Do收集任務,最終須要Join來構建節點和運行任務。

五,其它

寫得長很差看,其它內容壓縮一下。

數據傳遞和依賴注入

Workflow Core 支持對每一個步驟點進行依賴注入。

1565439224(1)

支持數據持久化

Workflow Core 支持將構建的工做流存儲到數據庫中,以便之後再次調用。

支持 Sql Server、Mysql、SQLite、PostgreSQL、Redis、MongoDB、AWS、Azure、

Elasticsearch、RabbitMQ... ....

支持動態調用和動態生成工做流

你能夠經過 C# 代碼構建工做流,或者經過 Json、Yaml 動態構建工做流。

能夠利用可視化設計器,將邏輯和任務生成配置文件,而後動態傳遞,使用 Workflow Core 動態建立工做流。

篇幅有限,再也不贅述。

有興趣請關注 Workflow Core:https://github.com/danielgerlag/workflow-core

相關文章
相關標籤/搜索