C#多線程(16):手把手教你擼一個工做流

前言

前面學習了不少多線程和任務的基礎知識,這裏要來實踐一下啦。經過本篇教程,你能夠寫出一個簡單的工做流引擎。html

本篇教程內容完成是基於任務的,只須要看過筆者的三篇關於異步的文章,掌握 C# 基礎,便可輕鬆完成。git

因爲本篇文章編寫的工做流程序,主要使用任務,有些邏輯過程會比較難理解,多測試一下就好。代碼主要仍是 C# 基礎,爲何說簡單?github

  • 不包含 async 、await
  • 幾乎不含包含多線程(有個讀寫鎖)
  • 不包含表達式樹
  • 幾乎不含反射(有個小地方須要反射一下,可是很是簡單)
  • 沒有複雜的算法

由於是基於任務(Task)的,因此能夠輕鬆設計組合流程,組成複雜的工做流。算法

因爲只是講述基礎,因此不會包含不少種流程控制,這裏只實現一些簡單的。微信

先說明,別用到業務上。。。這個工做流很是簡單,就幾個功能,這個工做流是基於筆者的多線程系列文章的知識點。寫這個東西是爲了講解任務操做,讓讀者更加深刻理解任務。多線程

代碼地址:https://github.com/whuanle/CZGL.FLow併發

這兩天忙着搬東西,今天沒認真寫文章,代碼不明白的地方,能夠到微信羣找我。微信名稱:癡者工良,dotnet 的羣基本我都在。異步

節點

在開始前,咱們來設計幾種流程控制的東西。async

將一個 步驟/流程/節點 稱爲 step。ide

Then

一個普通的節點,包含一個任務。

多個 Then 節點,能夠組成一條連續的工做流。

Parallel

並行節點,能夠設置多個並行節點放到 Parallel 中,以及在裏面爲任一個節點建立新的分支。

Schedule

定時節點,建立後會在必定時間後執行節點中的任務。

Delay

讓當前任務阻塞一段時間。

試用一下

順序節點

打開你的 VS ,建立項目,Nuget 引用 CZGL.DoFlow ,版本 1.0.2 。

建立一個類 MyFlow1,繼承 IDoFlow

public class MyFlow1 : IDoFlow
    {
        public int Id => 1;

        public string Name => "隨便起個名字";

        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }

你能夠建立多個工做流任務,每一個工做流的 Id 必須惟一。Name 和 Version 隨便填,由於這裏筆者沒有對這幾個字段作邏輯。

IDoFlowBuilder 是構建工做流的一個接口。

咱們來寫一個工做流測試一下。

/// <summary>
/// 普通節點 Then 使用方法
/// </summary>
public class MyFlow1 : IDoFlow
{
    public int Id => 1;
    public string Name => "test";
    public int Version => 1;

    public IDoFlowBuilder Build(IDoFlowBuilder builder)
    {
        builder.StartWith(() =>
        {
            Console.WriteLine("工做流開始");
        }).Then(() =>
        {
            Console.WriteLine("下一個節點");
        }).Then(() =>
         {
             Console.WriteLine("最後一個節點");
         });
        return builder;
    }
}

Main 方法中:

static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }

.StartWith() 方法開始一個工做流;

FlowCore.RegisterWorkflow<T>() 註冊一個工做流;

FlowCore.Start();執行一個工做流;

並行任務

其代碼以下:

/// <summary>
    /// 並行節點 Parallel 使用方法
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // 每一個並行任務也能夠設計後面繼續執行其它任務
                    steps.Do(() =>
                    {
                        Console.WriteLine("並行1");
                    }).Do(() =>
                    {
                        Console.WriteLine("並行2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("並行3");
                    });

                    // 並行任務設計完成後,必須調用此方法
                    // 此方法必須放在全部並行任務 .Do() 的最後
                    steps.EndParallel();

                    // 若是 .Do() 在 EndParallel() 後,那麼不會等待此任務
                    steps.Do(() => { Console.WriteLine("並行異步"); });

                    // 開啓新的分支
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("新的分支" + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });

                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111 ");
                });

            return builder;
        }
    }

Main 方法中:

static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }

經過以上示例,能夠大概瞭解本篇文章中咱們要寫的程序。

編寫工做流

創建一個類庫項目,名爲 DoFlow

創建 ExtensionsInterfacesServices 三個目錄。

接口構建器

新建 IStepBuilder 接口文件到 Interfaces 目錄,其內容以下:

using System;

namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// 普通節點
        /// </summary>
        /// <param name="stepBuilder"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);

        /// <summary>
        /// 多個節點
        /// <para>默認下,須要等待全部的任務完成,這個step纔算完成</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">任意一個任務完成便可跳轉到下一個step</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);

        /// <summary>
        /// 節點將在某個時間間隔後執行
        /// <para>異步,不會阻塞當前工做流的運行,計劃任務將在一段時間後觸發</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);

        /// <summary>
        /// 阻塞一段時間
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}

新建 IStepParallel 文件到 Interfaces 目錄。

using System;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 並行任務
    ///  <para>默認狀況下,只有這個節點的全部並行任務都完成後,這個節點纔算完成</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// 一個並行任務
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);

        /// <summary>
        /// 開始一個分支
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);

        /// <summary>
        /// 必須使用此方法結束一個並行任務
        /// </summary>
        void EndParallel();
    }

    /// <summary>
    /// 並行任務
    /// <para>任意一個任務完成後,就能夠跳轉到下一個 step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {

    }
}

工做流構建器

新建 IDoFlowBuilder 接口文件到 Interfaces 目錄。

using System;
using System.Threading.Tasks;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 構建工做流任務
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// 開始一個 step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);

        Task ThatTask { get; }
    }
}

新建 IDoFlow 接口文件到 Interfaces 目錄。

namespace DoFlow.Interfaces
{

    /// <summary>
    /// 工做流
    /// <para>無參數傳遞</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// 全局惟一標識
        /// </summary>
        int Id { get; }

        /// <summary>
        /// 標識此工做流的名稱
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 標識此工做流的版本
        /// </summary>
        int Version { get; }

        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}

依賴注入

新建 DependencyInjectionService 文件到 Services 目錄。

用於實現依賴注入和解耦。

using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace DoFlow.Services
{
    /// <summary>
    /// 依賴注入服務
    /// </summary>
    public static class DependencyInjectionService
    {
        private static IServiceCollection _servicesList;
        private static IServiceProvider _services;
        static DependencyInjectionService()
        {
            IServiceCollection services = new ServiceCollection();
            _servicesList = services;
            // 注入引擎須要的服務
            InitExtension.StartInitExtension();
            var serviceProvider = services.BuildServiceProvider();
            _services = serviceProvider;
        }

        /// <summary>
        /// 添加一個注入到容器服務
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        /// <typeparam name="TImplementation"></typeparam>
        public static void AddService<TService, TImplementation>()
            where TService : class
            where TImplementation : class, TService
        {
            _servicesList.AddTransient<TService, TImplementation>();
        }

        /// <summary>
        /// 獲取須要的服務
        /// </summary>
        /// <typeparam name="TIResult"></typeparam>
        /// <returns></returns>
        public static TIResult GetService<TIResult>()
        {
            TIResult Tservice = _services.GetService<TIResult>();
            return Tservice;
        }
    }
}

添加一個 InitExtension 文件到 Extensions 目錄。

using DoFlow.Interfaces;
using DoFlow.Services;

namespace DoFlow.Extensions
{
    public static class InitExtension
    {
        private static bool IsInit = false;
        public static void StartInitExtension()
        {
            if (IsInit) return;
            IsInit = true;
            DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
            DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
            DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
            DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
        }
    }
}

實現工做流解析

如下文件均在 Services 目錄創建。

新建 StepBuilder 文件,用於解析節點,構建任務。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{

    /// <summary>
    /// 節點工做引擎
    /// </summary>
    public class StepBuilder : IStepBuilder
    {
        private Task _task;

        /// <summary>
        /// 延遲執行
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Delay(TimeSpan time)
        {
            Task.Delay(time).Wait();
            return this;
        }

        /// <summary>
        /// 並行 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
        {
            IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
            Task task = new Task(() =>
            {
                action.Invoke(parallel);
            });

            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
            });
            _task = task;
            return this;
        }

        /// <summary>
        /// 計劃任務
        /// </summary>
        /// <param name="action"></param>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Schedule(Action action, TimeSpan time)
        {
            Task.Factory.StartNew(() =>
            {
                Task.Delay(time).Wait();
                action.Invoke();
            });
            return this;
        }

        /// <summary>
        /// 普通 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Then(Action action)
        {
            Task task = new Task(action);
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
                task.Wait();
            });
            _task = task;
            return this;
        }

        public void SetTask(Task task)
        {
            _task = task;
        }
    }
}

新建 StepParallel 文件,裏面有兩個類,用於實現同步任務。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    /// <summary>
    /// 第一層全部任務結束後才能跳轉下一個 step
    /// </summary>
    public class StepParallelWhenAll : IStepParallel
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAll()
        {
            _task = new Task(() => { },TaskCreationOptions.AttachedToParent);
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAll(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }

    /// <summary>
    /// 完成任意一個任務便可跳轉到下一個 step
    /// </summary>
    public class StepParallelWhenAny : IStepParallelAny
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAny()
        {
            _task = Task.Run(() => { });
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAny(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }
}

新建 DoFlowBuilder 文件,用於構建工做流。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    public class DoFlowBuilder : IDoFlowBuilder
    {
        private Task _task;
        public Task ThatTask => _task;

        public void EndWith(Action action)
        {
            _task.Start();
        }

        public IStepBuilder StartWith(Action action = null)
        {
            if (action is null)
                _task = new Task(() => { });
            else _task = new Task(action);

            IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            ((StepBuilder)_stepBuilder).SetTask(_task);
            return _stepBuilder;
        }
    }
}

新建 FlowEngine 文件,用於執行工做流。

using DoFlow.Interfaces;

namespace DoFlow.Services
{
    /// <summary>
    /// 工做流引擎
    /// </summary>
    public class FlowEngine
    {
        private readonly IDoFlow _flow;
        public FlowEngine(IDoFlow flow)
        {
            _flow = flow;
        }

        /// <summary>
        /// 開始一個工做流
        /// </summary>
        public void Start()
        {
            IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
            _flow.Build(builder).ThatTask.Start();
        }
    }
}

新建 FlowCore 文件,用於存儲和索引工做流。使用讀寫鎖解決併發字典問題。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();

        // 讀寫鎖
        private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();

        /// <summary>
        /// 註冊工做流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 註冊工做流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow<TDoFlow>()
        {

            Type type = typeof(TDoFlow);
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 要啓動的工做流
        /// </summary>
        /// <param name="id"></param>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // 讀寫鎖
            try
            {
                readerWriterLockSlim.EnterUpgradeableReadLock();

                if (!flowEngines.ContainsKey(id))
                    return default;
                try
                {
                    readerWriterLockSlim.EnterWriteLock();
                    engine = flowEngines[id];
                }
                catch { return default; }
                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }
            }
            catch { return default; }
            finally
            {
                readerWriterLockSlim.ExitUpgradeableReadLock();
            }

            engine.Start();
            return true;
        }
    }
}

就這樣程序寫完了。

忙去了。

相關文章
相關標籤/搜索