ChuanGoing 2019-08-06 git
前言github
開篇以前,簡單說明下隨筆緣由。在園子裏遊蕩了很久,期間也起過要寫一些關於.NET的隨筆,因各類緣由未能付諸實現。web
前段時間拜讀daxnet的系列文章,感覺頗深,猶豫很久,終於決定開始記錄本人的學習點滴。api
系列說明架構
本系列目的是構建一套基於領域驅動設計(DDD)的基礎架構,漸進式實現CQRS/消息事件驅動型業務基礎框架,中間會夾雜着其餘的中間件的學習介紹,僅供學習交流用(.NET CORE/Standard 2.0)。框架
由於接觸領域驅動設計時間不長,現實上述目標可能會比較曲折,有不規範的地方望讀者指正。async
構建開始前,簡單介紹下本篇的學習曲線:ide
1.引入Ioc/DI模塊化
2.簡單型事件驅動總線(EventBus)實現(事件定義/訂閱及派發,事件處理器等)函數
注:篇尾我會附上Github源碼地址(開發工具是VS2017/19,.NET CORE 2.2)
Asp.net Core 自帶的Ioc容器用起來不大方便,本系列引入Autofac做爲Ioc/DI容器,先簡單介紹下幾個常規用法
首先建立一個Asp.net core web api應用程序,新建一個.Net Standard項目Base.Ioc用於管理Ioc/DI操做,並添加下圖的Nuget依賴
添加擴展類AutofacExtensions,添加以下方法(這裏引入了AspectCore動態代理後續實現Aop會用到)
public static IServiceProvider UseAutofac<TModule>(this IServiceCollection services) where TModule : Module, new() { ContainerBuilder builder = new ContainerBuilder(); builder.Populate(services); builder.RegisterModule<TModule>();
//引入AspectCore.Extensions.Autofac builder.RegisterDynamicProxy(); IContainer container = builder.Build(); return new AutofacServiceProvider(container); }
在Startup.cs文件的ConfigureServices中替換Asp.net Core自帶Ioc容器:
// This method gets called by the runtime. Use this method to add services to the container.
public IServiceProvider ConfigureServices(IServiceCollection services)
{
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
//替換Ioc容器,並擴展Autofac模塊註冊
return services.UseAutofac<WebModule>(); }
上面替換Ioc容器的時候,引入了Autofac的模塊自動注入功能
public class WebModule : Module { protected override void Load(ContainerBuilder builder) { //builder.RegisterType<AutoFacManager>(); //builder.RegisterType<Worker>().As<IPerson>();
//掃描程序集自動註冊
builder.RegisterAssembly(ThisAssembly);
}
}
在Base.Ioc項目中添加AutofacInjectionExtensions.cs文件
/// <summary> /// Autofac自動注入 /// </summary> /// <param name="builder"></param> /// <param name="assembly"></param> public static void RegisterAssembly(this ContainerBuilder builder, Assembly assembly) { foreach (var type in assembly.ExportedTypes) { if (type.IsPublic && !type.IsAbstract && type.IsClass) { var interfaces = type.GetInterfaces(); IList<Type> transientList = new List<Type>(); IList<Type> scopeList = new List<Type>(); IList<Type> singletonList = new List<Type>(); foreach (var intrType in interfaces) { if (intrType.IsGenericType) { if (intrType.IsAssignableTo<IDependencyInstance>()) { transientList.Add(intrType); } else if (intrType.IsAssignableTo<IScopeInstance>()) { scopeList.Add(intrType); } else if (intrType.IsAssignableTo<ISingletonInstance>()) { singletonList.Add(intrType); } } else { if (intrType.IsAssignableTo<IDependencyInstance>()) { transientList.Add(intrType); } else if (intrType.IsAssignableTo<IScopeInstance>()) { scopeList.Add(intrType); } else if (intrType.IsAssignableTo<ISingletonInstance>()) { singletonList.Add(intrType); } } } if (type.IsGenericType) { if (transientList.Count > 0) { builder.RegisterGeneric(type).As(transientList.ToArray()).InstancePerDependency(); } if (scopeList.Count > 0) { builder.RegisterGeneric(type).As(scopeList.ToArray()).InstancePerLifetimeScope(); } if (singletonList.Count > 0) { builder.RegisterGeneric(type).As(singletonList.ToArray()).SingleInstance(); } //泛型 if (type.IsAssignableTo<IDependencyInstance>()) { builder.RegisterGeneric(type).AsSelf().InstancePerDependency(); } else if (type.IsAssignableTo<IScopeInstance>()) { builder.RegisterGeneric(type).AsSelf().InstancePerLifetimeScope(); } else if (type.IsAssignableTo<ISingletonInstance>()) { builder.RegisterGeneric(type).AsSelf().SingleInstance(); } } else { if (transientList.Count > 0) { builder.RegisterType(type).As(transientList.ToArray()).InstancePerDependency(); } if (scopeList.Count > 0) { builder.RegisterType(type).As(scopeList.ToArray()).InstancePerLifetimeScope(); } if (singletonList.Count > 0) { builder.RegisterType(type).As(singletonList.ToArray()).SingleInstance(); } // if (type.IsAssignableTo<IDependencyInstance>()) { builder.RegisterType(type).AsSelf().InstancePerDependency(); } else if (type.IsAssignableTo<IScopeInstance>()) { builder.RegisterType(type).AsSelf().InstancePerLifetimeScope(); } else if (type.IsAssignableTo<ISingletonInstance>()) { builder.RegisterType(type).AsSelf().SingleInstance(); } } } } }
上面一段代碼用到了IDependencyInstance/IScopeInstance/ISingletonInstance三個接口,分別用於瞬時/Scope/單例的服務標識。
大概說明下這段代碼的做用:經過掃描傳入的程序集獲取外部可見的Public類型的Type(這裏咱們指的是類),掃描該類的全部繼承了服務標識接口,並註冊爲對應的服務
至此,Ioc容器已替換爲Autofac
EventBus
事件總線實現發佈/訂閱功能,首先定義IEvent/IEventHandler,IEventHandler 定義了事件處理方法
public interface IEvent { Guid Id { get; } /// <summary> /// 時間戳 /// </summary> long Timestamp { get; } }
public interface IEventHandler { /// <summary> /// 處理事件 /// </summary> /// <param name="event"></param> /// <param name="cancellationToken"></param> /// <returns></returns> Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// 能否處理
/// </summary>
/// <param name="event"></param>
/// <returns></returns>
bool CanHandle(IEvent @event);
}
接着定義發佈/訂閱及事件總線接口
public interface IEventPublisher { /// <summary> /// 發佈事件 /// </summary> /// <typeparam name="TEvent"></typeparam> /// <param name="event"></param> /// <param name="cancellationToken"></param> /// <returns></returns> Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken)) where TEvent : IEvent; }
public interface IEventSubscriber { /// <summary> /// 事件訂閱 /// </summary> void Subscribe(); }
public interface IEventBus : IEventSubscriber, IEventPublisher { }
EventBus實現發佈/訂閱器,並在事件發佈的同時通知相應的事件處理器進行相關處理。
這裏引入了"消息隊列"的概念(固然目前咱們只是用來模擬消息隊列,後續會引入RabbitMQ來實現)
相關代碼以下:
internal sealed class EventQueue { public event EventHandler<EventProcessedEventArgs> EventPushed; public EventQueue() { } public void Push(IEvent @event) { OnMessagePushed(new EventProcessedEventArgs(@event)); } private void OnMessagePushed(EventProcessedEventArgs e) { this.EventPushed?.Invoke(this, e); } }
/// <summary> /// 消息事件參數 /// </summary> public class EventProcessedEventArgs : EventArgs { public IEvent Event { get; } public EventProcessedEventArgs(IEvent @event) { Event = @event; } }
public class EventBus : IEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly IEnumerable<IEventHandler> eventHandlers; public EventBus(IEnumerable<IEventHandler> eventHandlers) { this.eventHandlers = eventHandlers; } /// <summary> /// 發佈事件到隊列時觸發處理事件 /// </summary> /// <param name="sendere"></param> /// <param name="e"></param> private void EventQueue_EventPushed(object sendere, EventProcessedEventArgs e) { (from eh in this.eventHandlers where eh.CanHandle(e.Event) select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event)); } public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken)) where TEvent : IEvent => Task.Factory.StartNew(() => eventQueue.Push(@event)); /// <summary> /// 事件訂閱(訂閱隊列上的事件) /// </summary> public void Subscribe() { eventQueue.EventPushed += EventQueue_EventPushed; }
上面的代碼中EventQueue的Push方法被調用時,會觸發EventPushed事件,在EventBus中,咱們註冊了EventQueue的EventPushed事件,即最終會觸發EventBus的EventQueue_EventPushed事件,進而經過事件處理器來處理(這塊詳細說明,請閱讀DaxNet-事件驅動型架構實現一)
到此,消息總線機制處理完成,接下來咱們建立一個Web API應用程序來演示消息發佈/訂閱及處理
上面咱們定義了IEvent/IEventHandler,這裏咱們先在WebAPI 項目中來實現
public class CustomerCreatedEvent : IEvent { public CustomerCreatedEvent(string customerName) { this.Id = Guid.NewGuid(); this.Timestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(); this.CustomerName = customerName; } public Guid Id { get; } public long Timestamp { get; } public string CustomerName { get; } }
public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent> { public bool CanHandle(IEvent @event) => @event.GetType().Equals(typeof(CustomerCreatedEvent)); public Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default(CancellationToken)) { return Task.FromResult(true); } public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken)) => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false); }
值得說明下的是,在實現IEventHandler的時候,利用了泛型接口IEventHandler<T>來創建IEventHandler對於IEvent的依賴,由於事件處理器最終處理的必然是某個事件。
OK,如今新建一個Controller
[Route("api/[controller]")] public class CustomersController : Controller {private readonly IEventBus _eventBus; public CustomersController(IEventBus eventBus) { _eventBus = eventBus; } // 建立新的客戶信息 [HttpPost] public async Task<IActionResult> Create([FromBody] CustomerDto model) { var name = model.Name; if (string.IsNullOrEmpty(name)) { return BadRequest(); }
//這裏其餘業務處理...
await _eventBus.PublishAsync(new CustomerCreatedEvent(name));
} }
上面的CustomersController構造函數中由Ioc注入了eventBus,須要注意的是,引用eventBus前,須要在Startup.cs中註冊對應的服務,咱們這裏用到的是Autofac的模塊化註冊。
利用Web API下的WebModule.cs引用SimpleEventBus中的EventBusModule
public class WebModule : Module { protected override void Load(ContainerBuilder builder) {
//掃描程序集自動註冊 builder.RegisterAssembly(ThisAssembly);
//註冊模塊化EventBusModule builder.RegisterModule<EventBusModule>(); } }
public class EventBusModule : Module { protected override void Load(ContainerBuilder builder) {
//掃描程序集自動註冊 builder.RegisterAssembly(ThisAssembly); } }
關於掃描程序集自動註冊,上面Ioc段落有詳細說明,這裏就再也不囉嗦。
到此爲止,編碼工做告一段落,運行Web API,利用Postman或PowerShell 自帶的命令 Invoke-WebRequest模擬http請求,結果以下圖:
請求進來,數據加載到Dto中
EventBus的事件發佈時調用EventQueue的Push函數,同時會觸發EventPushed事件,經過對應的時間處理器處理事件
最終,消息在事件處理器中進行相關處理
回顧
回顧一下,本篇開頭介紹了Autofac替換Asp.Net Core自帶Ioc容器,Autofac的模塊註冊/泛型/服務註冊等;而後介紹了事件總線的工做流程:事件發佈到總線中,經過消息隊列觸發註冊到總線中的事件處理器處理事件消息;最後,咱們利用Web API 展現了程序的運行過程。
由於篇幅有限,代碼中關於Storage的部分,涉及到了倉儲的概念,我想到時放到領域設計部分一塊兒介紹。
源碼
本篇涉及的源碼在Github的https://github.com/ChuanGoing/Start.git 的SimpleEventBus分支能夠找到。