Asp.net Core 系列之--1.事件驅動初探:簡單事件總線實現(SimpleEventBus)

 ChuanGoing 2019-08-06 git

前言github

  開篇以前,簡單說明下隨筆緣由。在園子裏遊蕩了很久,期間也起過要寫一些關於.NET的隨筆,因各類緣由未能付諸實現。web

前段時間拜讀daxnet的系列文章,感覺頗深,猶豫很久,終於決定開始記錄本人的學習點滴。api

系列說明架構

  本系列目的是構建一套基於領域驅動設計(DDD)的基礎架構,漸進式實現CQRS/消息事件驅動型業務基礎框架,中間會夾雜着其餘的中間件的學習介紹,僅供學習交流用(.NET CORE/Standard 2.0)。框架

由於接觸領域驅動設計時間不長,現實上述目標可能會比較曲折,有不規範的地方望讀者指正。async

  構建開始前,簡單介紹下本篇的學習曲線:ide

1.引入Ioc/DI模塊化

2.簡單型事件驅動總線(EventBus)實現(事件定義/訂閱及派發,事件處理器等)函數

注:篇尾我會附上Github源碼地址(開發工具是VS2017/19,.NET CORE 2.2)

Ioc/DI

  Asp.net Core 自帶的Ioc容器用起來不大方便,本系列引入Autofac做爲Ioc/DI容器,先簡單介紹下幾個常規用法

首先建立一個Asp.net core web api應用程序,新建一個.Net Standard項目Base.Ioc用於管理Ioc/DI操做,並添加下圖的Nuget依賴

 

添加擴展類AutofacExtensions,添加以下方法(這裏引入了AspectCore動態代理後續實現Aop會用到)

public static IServiceProvider UseAutofac<TModule>(this IServiceCollection services)
           where TModule : Module, new()
        {
            ContainerBuilder builder = new ContainerBuilder();
            builder.Populate(services);

            builder.RegisterModule<TModule>();
       //引入AspectCore.Extensions.Autofac builder.RegisterDynamicProxy(); IContainer container
= builder.Build(); return new AutofacServiceProvider(container); }

在Startup.cs文件的ConfigureServices中替換Asp.net Core自帶Ioc容器:

// This method gets called by the runtime. Use this method to add services to the container.
        public IServiceProvider ConfigureServices(IServiceCollection services)
        {
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);

            //替換Ioc容器,並擴展Autofac模塊註冊
            return services.UseAutofac<WebModule>(); }

 上面替換Ioc容器的時候,引入了Autofac的模塊自動注入功能

 public class WebModule : Module
    {
        protected override void Load(ContainerBuilder builder)
        {
            //builder.RegisterType<AutoFacManager>();
            //builder.RegisterType<Worker>().As<IPerson>();
        //掃描程序集自動註冊
            builder.RegisterAssembly(ThisAssembly);
        }
    }    

在Base.Ioc項目中添加AutofacInjectionExtensions.cs文件

/// <summary>
        /// Autofac自動注入
        /// </summary>
        /// <param name="builder"></param>
        /// <param name="assembly"></param>
        public static void RegisterAssembly(this ContainerBuilder builder, Assembly assembly)
        {
            foreach (var type in assembly.ExportedTypes)
            {
                if (type.IsPublic && !type.IsAbstract && type.IsClass)
                {
                    var interfaces = type.GetInterfaces();
                    IList<Type> transientList = new List<Type>();
                    IList<Type> scopeList = new List<Type>();
                    IList<Type> singletonList = new List<Type>();
                    foreach (var intrType in interfaces)
                    {
                        if (intrType.IsGenericType)
                        {
                            if (intrType.IsAssignableTo<IDependencyInstance>())
                            {
                                transientList.Add(intrType);
                            }
                            else if (intrType.IsAssignableTo<IScopeInstance>())
                            {
                                scopeList.Add(intrType);
                            }
                            else if (intrType.IsAssignableTo<ISingletonInstance>())
                            {
                                singletonList.Add(intrType);
                            }
                        }
                        else
                        {
                            if (intrType.IsAssignableTo<IDependencyInstance>())
                            {
                                transientList.Add(intrType);
                            }
                            else if (intrType.IsAssignableTo<IScopeInstance>())
                            {
                                scopeList.Add(intrType);
                            }
                            else if (intrType.IsAssignableTo<ISingletonInstance>())
                            {
                                singletonList.Add(intrType);
                            }
                        }
                    }

                    if (type.IsGenericType)
                    {
                        if (transientList.Count > 0)
                        {
                            builder.RegisterGeneric(type).As(transientList.ToArray()).InstancePerDependency();
                        }
                        if (scopeList.Count > 0)
                        {
                            builder.RegisterGeneric(type).As(scopeList.ToArray()).InstancePerLifetimeScope();
                        }
                        if (singletonList.Count > 0)
                        {
                            builder.RegisterGeneric(type).As(singletonList.ToArray()).SingleInstance();
                        }

                        //泛型
                        if (type.IsAssignableTo<IDependencyInstance>())
                        {
                            builder.RegisterGeneric(type).AsSelf().InstancePerDependency();
                        }
                        else if (type.IsAssignableTo<IScopeInstance>())
                        {
                            builder.RegisterGeneric(type).AsSelf().InstancePerLifetimeScope();
                        }
                        else if (type.IsAssignableTo<ISingletonInstance>())
                        {
                            builder.RegisterGeneric(type).AsSelf().SingleInstance();
                        }
                    }
                    else
                    {
                        if (transientList.Count > 0)
                        {
                            builder.RegisterType(type).As(transientList.ToArray()).InstancePerDependency();
                        }
                        if (scopeList.Count > 0)
                        {
                            builder.RegisterType(type).As(scopeList.ToArray()).InstancePerLifetimeScope();
                        }
                        if (singletonList.Count > 0)
                        {
                            builder.RegisterType(type).As(singletonList.ToArray()).SingleInstance();
                        }
                        //
                        if (type.IsAssignableTo<IDependencyInstance>())
                        {
                            builder.RegisterType(type).AsSelf().InstancePerDependency();
                        }
                        else if (type.IsAssignableTo<IScopeInstance>())
                        {
                            builder.RegisterType(type).AsSelf().InstancePerLifetimeScope();
                        }
                        else if (type.IsAssignableTo<ISingletonInstance>())
                        {
                            builder.RegisterType(type).AsSelf().SingleInstance();
                        }
                    }
                }
            }
        }

 上面一段代碼用到了IDependencyInstance/IScopeInstance/ISingletonInstance三個接口,分別用於瞬時/Scope/單例的服務標識

大概說明下這段代碼的做用:經過掃描傳入的程序集獲取外部可見的Public類型的Type(這裏咱們指的是類),掃描該類的全部繼承了服務標識接口,並註冊爲對應的服務

至此,Ioc容器已替換爲Autofac

EventBus

  事件總線實現發佈/訂閱功能,首先定義IEvent/IEventHandler,IEventHandler 定義了事件處理方法

public interface IEvent
    {
        Guid Id { get; }
        /// <summary>
        /// 時間戳
        /// </summary>
        long Timestamp { get; }
    }
public interface IEventHandler 
    {
        /// <summary>
        /// 處理事件
        /// </summary>
        /// <param name="event"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken));

  /// <summary>
  /// 能否處理
  /// </summary>
  /// <param name="event"></param>
  /// <returns></returns>
  bool CanHandle(IEvent @event);

    }
接着定義發佈/訂閱及事件總線接口
public interface IEventPublisher
    {
        /// <summary>
        /// 發佈事件
        /// </summary>
        /// <typeparam name="TEvent"></typeparam>
        /// <param name="event"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken))
            where TEvent : IEvent;
    }
public interface IEventSubscriber
    {
        /// <summary>
        /// 事件訂閱
        /// </summary>
        void Subscribe();
    }
public interface IEventBus : IEventSubscriber, IEventPublisher
    {
    }

 

EventBus實現發佈/訂閱器,並在事件發佈的同時通知相應的事件處理器進行相關處理。

這裏引入了"消息隊列"的概念(固然目前咱們只是用來模擬消息隊列,後續會引入RabbitMQ來實現)

相關代碼以下:

 internal sealed class EventQueue
    {
        public event EventHandler<EventProcessedEventArgs> EventPushed;

        public EventQueue()
        {

        }

        public void Push(IEvent @event)
        {
            OnMessagePushed(new EventProcessedEventArgs(@event));
        }

        private void OnMessagePushed(EventProcessedEventArgs e)
        {
            this.EventPushed?.Invoke(this, e);
        }
    }
/// <summary>
    /// 消息事件參數
    /// </summary>
    public class EventProcessedEventArgs : EventArgs
    {
        public IEvent Event { get; }

        public EventProcessedEventArgs(IEvent @event)
        {
            Event = @event;
        }
    }
public class EventBus : IEventBus
    {
        private readonly EventQueue eventQueue = new EventQueue();
        private readonly IEnumerable<IEventHandler> eventHandlers;

        public EventBus(IEnumerable<IEventHandler> eventHandlers)
        {
            this.eventHandlers = eventHandlers;
        }

        /// <summary>
        /// 發佈事件到隊列時觸發處理事件
        /// </summary>
        /// <param name="sendere"></param>
        /// <param name="e"></param>
        private void EventQueue_EventPushed(object sendere, EventProcessedEventArgs e)
        {
            (from eh in this.eventHandlers
             where
                eh.CanHandle(e.Event)
             select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event));
        }

        public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken))
            where TEvent : IEvent
        => Task.Factory.StartNew(() => eventQueue.Push(@event));

        /// <summary>
        /// 事件訂閱(訂閱隊列上的事件)
        /// </summary>
        public void Subscribe()
        {
            eventQueue.EventPushed += EventQueue_EventPushed;
        }

上面的代碼中EventQueue的Push方法被調用時,會觸發EventPushed事件,在EventBus中,咱們註冊了EventQueue的EventPushed事件,即最終會觸發EventBus的EventQueue_EventPushed事件,進而經過事件處理器來處理(這塊詳細說明,請閱讀DaxNet-事件驅動型架構實現一

 

到此,消息總線機制處理完成,接下來咱們建立一個Web API應用程序來演示消息發佈/訂閱及處理

 上面咱們定義了IEvent/IEventHandler,這裏咱們先在WebAPI 項目中來實現

public class CustomerCreatedEvent : IEvent
    {
        public CustomerCreatedEvent(string customerName)
        {
            this.Id = Guid.NewGuid();
            this.Timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
            this.CustomerName = customerName;
        }

        public Guid Id { get; }

        public long Timestamp { get; }

        public string CustomerName { get; }
    }
public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
    {
        public bool CanHandle(IEvent @event)
            => @event.GetType().Equals(typeof(CustomerCreatedEvent));

        public Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default(CancellationToken))
        {
            return Task.FromResult(true);
        }

        public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken))
            => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
    }

值得說明下的是,在實現IEventHandler的時候,利用了泛型接口IEventHandler<T>來創建IEventHandler對於IEvent的依賴,由於事件處理器最終處理的必然是某個事件。

OK,如今新建一個Controller

[Route("api/[controller]")]
    public class CustomersController : Controller
    {private readonly IEventBus _eventBus;

        public CustomersController(IEventBus eventBus)
        {
            _eventBus = eventBus;
        }
// 建立新的客戶信息
        [HttpPost]
        public async Task<IActionResult> Create([FromBody] CustomerDto model)
        {
            var name = model.Name;
            if (string.IsNullOrEmpty(name))
            {
                return BadRequest();
            }
       //這裏其餘業務處理...
        await _eventBus.PublishAsync(new CustomerCreatedEvent(name));
 } }

上面的CustomersController構造函數中由Ioc注入了eventBus,須要注意的是,引用eventBus前,須要在Startup.cs中註冊對應的服務,咱們這裏用到的是Autofac的模塊化註冊。

利用Web API下的WebModule.cs引用SimpleEventBus中的EventBusModule

 public class WebModule : Module
    {
        protected override void Load(ContainerBuilder builder)
        {
       //掃描程序集自動註冊 builder.RegisterAssembly(ThisAssembly);
       //註冊模塊化EventBusModule builder.RegisterModule
<EventBusModule>(); } }
public class EventBusModule : Module
    {
        protected override void Load(ContainerBuilder builder)
        {
       //掃描程序集自動註冊 builder.RegisterAssembly(ThisAssembly); } }

關於掃描程序集自動註冊,上面Ioc段落有詳細說明,這裏就再也不囉嗦。

到此爲止,編碼工做告一段落,運行Web API,利用Postman或PowerShell 自帶的命令 Invoke-WebRequest模擬http請求,結果以下圖:

請求進來,數據加載到Dto中

EventBus的事件發佈時調用EventQueue的Push函數,同時會觸發EventPushed事件,經過對應的時間處理器處理事件

最終,消息在事件處理器中進行相關處理

回顧

  回顧一下,本篇開頭介紹了Autofac替換Asp.Net Core自帶Ioc容器,Autofac的模塊註冊/泛型/服務註冊等;而後介紹了事件總線的工做流程:事件發佈到總線中,經過消息隊列觸發註冊到總線中的事件處理器處理事件消息;最後,咱們利用Web API 展現了程序的運行過程。

  由於篇幅有限,代碼中關於Storage的部分,涉及到了倉儲的概念,我想到時放到領域設計部分一塊兒介紹。

源碼

  本篇涉及的源碼在Github的https://github.com/ChuanGoing/Start.git  的SimpleEventBus分支能夠找到。

相關文章
相關標籤/搜索