在上文中,我介紹了事件驅動型架構的一種簡單的實現,並演示了一個完整的事件派發、訂閱和處理的流程。這種實現太簡單了,百十行代碼就展現了一個基本工做原理。然而,要將這樣的解決方案運用到實際生產環境,還有很長的路要走。今天,咱們就研究一下在事件處理器中,對象生命週期的管理問題。html
事實上,不只僅是在事件處理器中,咱們須要關心對象的生命週期,在整個ASP.NET Core Web API的應用程序裏,咱們須要理解並仔細推敲被註冊到IoC容器中的服務,它們的生命週期應該是個怎樣的情形,這也是服務端應用程序設計必須認真考慮的內容。由於若是生命週期管理不合理,程序申請的資源沒法合理釋放,最後便會帶來內存泄漏、程序崩潰等各類問題,然而這樣的問題對於服務端應用程序來講,是很是嚴重的。git
記得在上一篇文章的結束部分,我給你們留下一個練習,就是讓你們在CustomerCreatedEventHandler事件處理器的HandleAsync方法中,填入本身的代碼,以便對得到的事件消息作進一步的處理。做爲本文的引子,咱們首先將這部分工做作完,而後再進一步分析生命週期的問題。github
Event Store是CQRS體系結構模式中最爲重要的一個組成部分,它的主要職責就是保存發生於領域模型中的領域事件,並對事件數據進行歸檔。當倉儲須要獲取領域模型對象時,Event Store也會配合快照數據庫一塊兒,根據領域事件的發生順序,逐步回放並重塑領域模型對象。事實上,Event Store的實現是很是複雜的,雖然從它的職責上來看並不算太複雜,然而它所須要解決的事件同步、快照、性能、消息派發等問題,使得CQRS體系結構的實現變得很是複雜。在實際應用中,已經有一些比較成熟的框架和工具集,可以幫助咱們在CQRS中很方便地實現Event Store,好比GetEventStore就是一個很好的開源Event Store框架,它是基於.NET開發的,在微軟官方的eShopOnContainers說明文檔中,也提到了這個框架,推薦你們上他們的官網(https://eventstore.org/)瞭解一下。在這裏咱們就先不深刻研究Event Store應該如何實現,咱們先作一個簡單的Event Store,以便展現咱們須要討論的問題。sql
延續着上一版的代碼庫(https://github.com/daxnet/edasample/tree/chapter_1),咱們首先在EdaSample.Common.Events命名空間下,定義一個IEventStore的接口,這個接口很是簡單,僅僅包含一個保存事件的方法,代碼以下:shell
public interface IEventStore : IDisposable { Task SaveEventAsync<TEvent>(TEvent @event) where TEvent : IEvent; }
SaveEventAsync方法僅有一個參數:由泛型類型TEvent綁定的@event對象。泛型約束表示SaveEventAsync方法僅能接受IEvent接口及其實現類型的對象做爲參數傳入。接口定義好了,下一步就是實現這個接口,對傳入的事件對象進行保存。爲了實現過程的簡單,咱們使用Dapper,將事件數據保存到SQL Server數據庫中,來模擬Event Store對事件的保存操做。數據庫
Note:爲何IEventStore接口的SaveEventAsync方法簽名中,沒有CancellationToken參數?嚴格來講,支持async/await異步編程模型的方法定義上,是須要帶上CancellationToken參數的,以便調用方請求取消操做的時候,方法內部能夠根據狀況對操做進行取消。然而有些狀況下取消操做並非那麼合理,或者方法內部所使用的API並無提供更深層的取消支持,所以也就沒有必要在方法定義上增長CancellationToken參數。在此處,爲了保證接口的簡單,沒有引入CancellationToken的參數。編程
接下來,咱們實現這個接口,並用Dapper將事件數據保存到SQL Server中。出於框架設計的考慮,咱們新建一個Net Standard Class Library項目,在這個新的項目中實現IEventStore接口,這麼作的緣由已經在上文中介紹過了。代碼以下:架構
public class DapperEventStore : IEventStore { private readonly string connectionString; public DapperEventStore(string connectionString) { this.connectionString = connectionString; } public async Task SaveEventAsync<TEvent>(TEvent @event) where TEvent : IEvent { const string sql = @"INSERT INTO [dbo].[Events] ([EventId], [EventPayload], [EventTimestamp]) VALUES (@eventId, @eventPayload, @eventTimestamp)"; using (var connection = new SqlConnection(this.connectionString)) { await connection.ExecuteAsync(sql, new { eventId = @event.Id, eventPayload = JsonConvert.SerializeObject(@event), eventTimestamp = @event.Timestamp }); } } #region IDisposable Support // 此處省略 #endregion }
IDisposable接口的實現部分暫且省略,能夠看到,實現仍是很是簡單的:經過構造函數傳入數據庫的鏈接字符串,在SaveEventAsyc方法中,基於SqlConnection對象執行Dapper的擴展方法來完成事件數據的保存。併發
Note: 此處使用了JsonConvert.SerializeObject方法來序列化事件對象,也就意味着DapperEventStore程序集須要依賴Newtonsoft.Json程序集。雖然在咱們此處的案例中不會有什麼影響,但這樣作會形成DapperEventStore對Newtonsoft.Json的強依賴,這樣的依賴關係不只讓DapperEventStore變得不可測試,並且Newtonsoft.Json未來未知的變化,也會影響到DapperEventStore,帶來一些不肯定性和維護性問題。更好的作法是,引入一個IMessageSerializer接口,在另外一個新的程序集中使用Newtonsoft.Json來實現這個接口,同時僅讓DapperEventStore依賴IMessageSerializer,並在應用程序啓動時,將Newtonsoft.Json的實現註冊到IoC容器中。此時,IMessageSerializer能夠被Mock,DapperEventStore就變得可測試了;另外一方面,因爲只有那個新的程序集會依賴Newtonsoft.Json,所以,Newtonsoft.Json的變化也僅僅會影響那個新的程序集,不會對框架主體的其它部分形成任何影響。app
EventStore實現好了,接下來,咱們將其用在CustomerCreatedEventHandler中,以便將訂閱的CustomerCreatedEvent保存下來。
保存事件數據的第一步,就是在ASP.NET Core Web API的IoC容器中,將DapperEventStore註冊進去。這一步是很是簡單的,只須要在Startup.cs的ConfigureServices方法中完成便可。代碼以下:
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); services.AddTransient<IEventHandler, CustomerCreatedEventHandler>(); services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"])); services.AddSingleton<IEventBus, PassThroughEventBus>(); }
注意咱們使用的是services.AddTransient方法來註冊DapperEventStore,咱們但願應用程序在每次請求IEventStore實例時,都能得到一個新的DapperEventStore的實例。
接下來,打開CustomerCreatedEventHandler.cs文件,在構造函數中加入對IEventStore的依賴,而後修改HandleAsync方法,在該方法中使用IEventStore的實例來完成事件數據的保存。代碼以下:
public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent> { private readonly IEventStore eventStore; public CustomerCreatedEventHandler(IEventStore eventStore) { this.eventStore = eventStore; } public bool CanHandle(IEvent @event) => @event.GetType().Equals(typeof(CustomerCreatedEvent)); public async Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default) { await this.eventStore.SaveEventAsync(@event); return true; } public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default) => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false); }
OK,代碼修改完畢,測試一下。
看看數據庫中客戶信息是否已經建立:
看看數據庫中事件數據是否已經保存成功:
OK,數據所有保存成功。
然而,事情真的就這麼簡單麼?No。在追蹤了IEventStore實例(也就是DapperEventStore)的生命週期後,你會發現,問題沒有想象的那麼簡單。
在使用services.AddTransient/AddScoped/AddSingleton/AddScoped這些方法對服務進行註冊時,使用不一樣的方法也就意味着選擇了不一樣的對象生命週期。在此咱們也再也不深刻討論每種方法之間的差別,微軟官方有詳細的文檔和demo(抱歉我沒有貼出中文連接,由於機器翻譯的緣故,實在有點不堪入目),若是對ASP.NET Core的IoC容器不熟悉的話,建議先了解一下官網文章的內容。在上面我稍微提了一下,咱們是用AddTransient方法來註冊DapperEventStore的,由於咱們但願在每次使用IEventStore的時候,都會有一個新的DapperEventStore被建立。如今,讓咱們來驗證一下,看狀況是否果然如此。
追蹤程序執行的最有效的方式就是使用日誌。在咱們的場景中,使用基於文件的日誌會更合適,由於這樣咱們能夠更清楚地看到程序的執行過程以及對象的變化過程。一樣,我不打算詳細介紹如何在ASP.NET Core Web API中使用日誌,微軟官網一樣有着很是詳盡的文檔來介紹這些內容。在這裏,我簡要地將相關代碼列出來,以介紹如何啓用基於文件的日誌系統。
首先,在Web API服務的項目上,添加對Serilog.Extensions.Logging.File的nuget包,使用它可以很是方便地啓用基於文件的日誌。而後,打開Program.cs文件,添加ConfigureLogging的調用:
public static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .ConfigureLogging((context, lb) => { lb.AddFile(LogFileName); }) .UseStartup<Startup>() .Build();
此處LogFileName爲本地文件系統中的日誌文件文件名,爲了不權限問題,我將日誌寫入C:\Users\<user>\appdata\local目錄下,由於個人Web API進程是由當前登陸用戶啓動的,因此寫在這個目錄下不會有權限問題。若是從此咱們把Web API host在IIS中,那麼啓動IIS服務的用戶須要對日誌所在的目錄具備寫入的權限,日誌文件才能被正確寫入,這一點是須要注意的。
好了,如今可使用日誌了,先試試看。在Startup類的構造函數中,加入ILoggerFactory參數,並在構造函數執行時獲取ILogger實例,而後在ConfigureServices調用中輸出一些內容:
public class Startup { private readonly ILogger logger; public Startup(IConfiguration configuration, ILoggerFactory loggerFactory) { Configuration = configuration; this.logger = loggerFactory.CreateLogger<Startup>(); } public IConfiguration Configuration { get; } public void ConfigureServices(IServiceCollection services) { this.logger.LogInformation("正在對服務進行配置..."); services.AddMvc(); services.AddTransient<IEventHandler, CustomerCreatedEventHandler>(); services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"])); services.AddSingleton<IEventBus, PassThroughEventBus>(); this.logger.LogInformation("服務配置完成,已註冊到IoC容器!"); } // 其它方法暫時省略 }
如今從新啓動服務,而後查看日誌文件,發現日誌能夠被正確輸出:
接下來,使用相似的方式,向PassThroughEventBus的構造函數和Dispose方法中加入一些日誌輸出,在CustomersController的Create方法中、CustomerCreatedEventHandler的構造函數和HandleAsync方法中、DapperEventStore的構造函數和Dispose方法中也加入一些日誌輸出,以便可以觀察當新的客戶信息被建立時,Web API的執行過程。限於文章篇幅,就不在此一一貼出各方法中加入日誌輸出的代碼了,你們能夠根據本文最後所提供的源代碼連接來獲取源代碼。簡單地舉個例子吧,好比對於DapperEventStore,咱們經過構造函數注入ILogger的實例:
public class DapperEventStore : IEventStore { private readonly string connectionString; private readonly ILogger logger; public DapperEventStore(string connectionString, ILogger<DapperEventStore> logger) { this.connectionString = connectionString; this.logger = logger; logger.LogInformation($"DapperEventStore構造函數調用完成。Hash Code:{this.GetHashCode()}."); } // 其它函數省略 }
這樣一來,在DapperEventStore的其它方法中,就能夠經過logger來輸出日誌了。
一樣,再次運行Web API,並經過Powershell發起一次建立客戶信息的請求,而後打開日誌文件,整個程序的執行過程基本上就一目瞭然了:
從上面的日誌內容能夠得知,當應用程序正常退出時,由IoC容器託管的PassThroughEventBus和DapperEventStore都可以被正常Dispose,目前看來沒什麼問題,由於資源能夠正常釋放。如今讓咱們從新啓動Web API,連續發送兩次建立客戶信息的請求,再次查看日誌,咱們獲得了下面的內容:
從上面的日誌內容能夠看到,在Web API的整個運行期間,CustomerCreatedEventHandler僅被構造了一次,並且在每次處理CustomerCreatedEvent事件的時候,都是使用同一個DapperEventStore實例來保存事件數據。也就是說,CustomerCreatedEventHandler和DapperEventStore在整個Web API服務的生命週期中,有且僅有一個實例,它們是Singleton的!然而,在進行系統架構的時候,咱們應該儘可能保證較短的對象生命週期,以避免由於狀態的不一致性致使不可回滾的錯誤出現,這也是架構設計中的一種最佳實踐。雖然目前咱們的DapperEventStore在程序正常退出的時候可以被Dispose掉,但若是DapperEventStore使用了非託管資源,而且非託管資源並無很好地管理本身的內存呢?長此以往,DapperEventStore就產生了內存泄漏點,慢慢地,Web API就會出現內存泄漏,系統資源將被耗盡。假如Web API被部署在雲中,應用程序監控裝置(好比AWS的Cloud Watch)就會持續報警,並強制服務斷線,整個系統的可用性就沒法獲得保障。因此,咱們更指望DapperEventStore可以正確地實現C#的Dispose模式,在Dispose方法中合理地釋放資源,而且僅在須要使用DapperEventStore時候纔去構建它,用完就及時Dispose,以保證資源的合理使用。這也就是爲何咱們使用services.AddTransient方法來註冊CustomerCreatedEventHandler以及DapperEventStore的緣由。
然而,事實卻並不是如此。究其緣由,就是由於PassThroughEventBus是單例實例,它的生命週期是整個Web API服務。而在PassThroughEventBus的構造函數中,CustomerCreatedEventHandler被做爲參數傳入,因而,PassThroughEventBus產生了對CustomerCreatedEventHandler的依賴,而連帶地也產生了對DapperEventStore的依賴。換句話說,在整個應用程序運行的過程當中,IoC框架徹底沒有理由再去建立新的CustomerCreatedEventHandler以及DapperEventStore的實例,由於事件處理器做爲強引用被註冊到PassThroughEventBus中,而PassThroughEventBus至始至終沒有變過!
Note:爲何PassThroughEventBus能夠做爲單例註冊到IoC容器中?由於它提供了無狀態的全局性的基礎結構層服務:事件總線。在PassThroughEventBus的實現中,這種全局性體現得不明顯,咱們固然能夠每一次HTTP請求都建立一個新的PassThroughEventBus來轉發事件消息並做處理。然而,在從此咱們要實現的基於RabbitMQ的事件總線中,若是咱們仍是每次HTTP請求都建立一個新的消息隊列,不只性能得不到保證,並且消息並不能路由到新建立的channel上。注意:咱們將其註冊成單例,一個很重要的依據是因爲它是無狀態的,但即便如此,咱們也要注意在應用程序退出的時候,合理Dispose掉它所佔用的資源。固然,在這裏,ASP.NET Core的IoC機制會幫咱們解決這個問題(由於我註冊了PassThroughEventBus,但我沒有顯式調用Dispose方法,我仍然能從日誌中看到「PassThroughEventBus已經被Dispose」的字樣),然而有些狀況下,ASP.NET Core不會幫咱們作這些,就須要咱們本身手工完成。
OMG!因爲構造函數注入,使得對象之間產生了依賴關係,從而影響到了它們的生命週期,這可怎麼辦?既然問題是由依賴引發的,那麼就須要想辦法解耦。
通過分析,咱們須要解除PassThroughEventBus對各類EventHandler的直接依賴。由於PassThroughEventBus是單例的,那麼由它引用的全部組件也只可能具備相同的生命週期。然而,這樣的解耦又該如何作呢?將EventHandler封裝到另外一個類中?結果仍是同樣,PassThroughEventBus總會經過某種對象關係,來間接引用到EventHandler上,形成EventHandler全局惟一。
或許,應該要有另外一套生命週期管理體系來管理EventHandler的生命週期,使得每當PassThroughEventBus須要使用EventHandler對所訂閱的事件進行處理的時候,都會經過這套體系來請求新的EventHandler實例,這樣一來,PassThroughEventBus也就再也不依賴於某個特定的實例了,而僅僅是引用了各類EventHandler在新的生命週期管理體系中的註冊信息。每當須要的時候,PassThroughEventBus都會將事件處理器的註冊信息傳給新的管理體系,而後由這套新的體系來維護事件處理器的生命週期。
經過閱讀微軟官方的eShopOnContainers案例代碼後,證明了這一想法。在案例中,有以下代碼:
// namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } }
能夠看到,高亮的這一行,經過Autofac建立了一個新的LifetimeScope,在這個Scope中,經過eventName來得到一個subscription對象(也就是EventHandler的註冊信息),進而經過scope的ResolveOptional調用來得到新的EventHandler實例。基本過程就是這樣,目前也不須要糾結IDynamicIntegrationEventHandler是幹什麼用的,也不須要糾結爲何要使用dynamic來保存事件數據。重點是,autofac的BeginLifetimeScope方法調用建立了一個新的IoC Scope,在這個Scope中解析(resolve)了新的EventHandler實例。在eShopOnContainer案例中,EventBusRabbitMQ的設計是特定的,必須依賴於Autofac做爲依賴注入框架。或許這部分設計能夠進一步改善,使得EventBusRabbitMQ不會強依賴於Autofac。
接下來,咱們會引入一個新的概念:事件處理器執行上下文,使用相似的方式來解決對象生命週期問題。
事件處理器執行上下文(Event Handler Execution Context, EHEC)爲事件處理器提供了一個完整的生命週期管理機制,在這套機制中,事件處理器及其引用的對象資源能夠被正常建立和正常銷燬。如今讓咱們一塊兒看看,如何在EdaSample的案例代碼中使用事件處理器執行上下文。
事件處理器執行上下文的接口定義以下,固然,這部分接口是放在EdaSample.Common.Events目錄下,做爲消息系統的框架代碼提供給調用方:
public interface IEventHandlerExecutionContext { void RegisterHandler<TEvent, THandler>() where TEvent : IEvent where THandler : IEventHandler<TEvent>; void RegisterHandler(Type eventType, Type handlerType); bool HandlerRegistered<TEvent, THandler>() where TEvent : IEvent where THandler : IEventHandler<TEvent>; bool HandlerRegistered(Type eventType, Type handlerType); Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default); }
這個接口主要包含三種方法:註冊事件處理器、判斷事件處理器是否已經註冊,以及對接收到的事件消息進行處理。整個結構仍是很是清晰簡單的。如今須要實現這個接口。根據上面的分析,這個接口的實現是須要依賴於IoC容器的,目前簡單起見,咱們僅使用微軟ASP.NET Core標準的Dependency Injection框架來實現,固然,也可使用Autofac,取決於你怎樣去實現上面這個接口。須要注意的是,因爲該接口的實現是須要依賴於第三方組件的(在這裏是微軟的Dependency Injection框架),所以,最佳作法是新建一個類庫,並引用EdaSample.Common程序集,並在這個新的類庫中,依賴Dependency Injection框架來實現這個接口。
如下是基於Microsoft.Extensions.DependencyInjection框架來實現的事件處理器執行上下文完整代碼,這裏有個兼容性問題,就是構造函數的第二個參數:serviceProviderFactory。在Microsoft.Extensions.DependencyInjection框架2.0版本以前,IServiceCollection.BuildServiceProvider方法的返回類型是IServiceProvider,但從2.0開始,它的返回類型已經從IServiceProvider接口,變成了ServiceProvider類。這裏引出了框架設計的另外一個原則,就是依賴較低版本的.NET Core,以便得到更好的兼容性。若是咱們的EdaSample是使用.NET Core 1.1開發的,那麼當下面這個類被直接用在ASP.NET Core 2.0的項目中時,若是不經過構造函數參數傳入ServiceProvider建立委託,而是直接在代碼中使用registry.BuildServiceProvider調用,就會出現異常。
public class EventHandlerExecutionContext : IEventHandlerExecutionContext { private readonly IServiceCollection registry; private readonly Func<IServiceCollection, IServiceProvider> serviceProviderFactory; private readonly ConcurrentDictionary<Type, List<Type>> registrations = new ConcurrentDictionary<Type, List<Type>>(); public EventHandlerExecutionContext(IServiceCollection registry, Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null) { this.registry = registry; this.serviceProviderFactory = serviceProviderFactory ?? (sc => registry.BuildServiceProvider()); } public async Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken)) { var eventType = @event.GetType(); if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypes) && handlerTypes?.Count > 0) { var serviceProvider = this.serviceProviderFactory(this.registry); using (var childScope = serviceProvider.CreateScope()) { foreach(var handlerType in handlerTypes) { var handler = (IEventHandler)childScope.ServiceProvider.GetService(handlerType); if (handler.CanHandle(@event)) { await handler.HandleAsync(@event, cancellationToken); } } } } } public bool HandlerRegistered<TEvent, THandler>() where TEvent : IEvent where THandler : IEventHandler<TEvent> => this.HandlerRegistered(typeof(TEvent), typeof(THandler)); public bool HandlerRegistered(Type eventType, Type handlerType) { if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypeList)) { return handlerTypeList != null && handlerTypeList.Contains(handlerType); } return false; } public void RegisterHandler<TEvent, THandler>() where TEvent : IEvent where THandler : IEventHandler<TEvent> => this.RegisterHandler(typeof(TEvent), typeof(THandler)); public void RegisterHandler(Type eventType, Type handlerType) { Utils.ConcurrentDictionarySafeRegister(eventType, handlerType, this.registrations); this.registry.AddTransient(handlerType); } }
好了,事件處理器執行上下文就定義好了,接下來就是在咱們的ASP.NET Core Web API中使用。爲了使用IEventHandlerExecutionContext,咱們須要修改事件訂閱器的接口定義,並相應地修改PassThroughEventBus以及Startup.cs。代碼以下:
// IEventSubscriber public interface IEventSubscriber : IDisposable { void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler<TEvent>; } // PassThroughEventBus public sealed class PassThroughEventBus : IEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly ILogger logger; private readonly IEventHandlerExecutionContext context; public PassThroughEventBus(IEventHandlerExecutionContext context, ILogger<PassThroughEventBus> logger) { this.context = context; this.logger = logger; logger.LogInformation($"PassThroughEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}."); eventQueue.EventPushed += EventQueue_EventPushed; } private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e) => await this.context.HandleEventAsync(e.Event); public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent => Task.Factory.StartNew(() => eventQueue.Push(@event)); public void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler<TEvent> { if (!this.context.HandlerRegistered<TEvent, TEventHandler>()) { this.context.RegisterHandler<TEvent, TEventHandler>(); } } #region IDisposable Support private bool disposedValue = false; // To detect redundant calls void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { this.eventQueue.EventPushed -= EventQueue_EventPushed; logger.LogInformation($"PassThroughEventBus已經被Dispose。Hash Code:{this.GetHashCode()}."); } disposedValue = true; } } public void Dispose() => Dispose(true); #endregion } // Startup.cs public void ConfigureServices(IServiceCollection services) { this.logger.LogInformation("正在對服務進行配置..."); services.AddMvc(); services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"], serviceProvider.GetRequiredService<ILogger<DapperEventStore>>())); var eventHandlerExecutionContext = new EventHandlerExecutionContext(services, sc => sc.BuildServiceProvider()); services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext); services.AddSingleton<IEventBus, PassThroughEventBus>(); this.logger.LogInformation("服務配置完成,已註冊到IoC容器!"); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env) { var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>(); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseMvc(); }
代碼修改完成後,再次執行Web API,併發送兩次(或屢次)建立客戶的請求,而後查看日誌,咱們發現,每次請求都會使用新的事件處理器去處理接收到的消息,在保存消息數據時,會使用新的DapperEventStore來保存數據,而保存完成後,會及時將DapperEventStore dispose掉:
本文篇幅比較長,或許你沒有太多耐心將文章讀完。但我儘可能將問題分析清楚,但願提供給讀者的內容是詳細的、有理有據的。文章中黑體部分是在設計過程當中的一些思考和須要注意的地方,但願可以給讀者在工做和學習之中帶來啓發和收穫。總而言之,對象生命週期的管理,在服務端應用程序中是很是重要的,須要引發足夠的重視。在下文中,咱們打算逐步擺脫PassThroughEventBus,基於RabbitMQ來實現消息總線的基礎結構。
本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo裏,經過不一樣的release tag來區分針對不一樣章節的源代碼。本文的源代碼請參考chapter_2這個tag,以下: