在上文中,咱們討論了事件處理器中對象生命週期的問題,在進入新的討論以前,首先讓咱們總結一下,咱們已經實現了哪些內容。下面的類圖描述了咱們已經實現的組件及其之間的關係,貌似系統已經變得愈來愈複雜了。html
其中綠色的部分就是上文中新實現的部分,包括一個簡單的Event Store,一個事件處理器執行上下文的接口,以及一個基於ASP.NET Core依賴注入框架的執行上下文的實現。接下來,咱們打算淘汰PassThroughEventBus,而後基於RabbitMQ實現一套新的事件總線。git
根據前面的結論,事件總線的執行須要依賴於事件處理器執行上下文,也就是上面類圖中PassThroughEventBus對於IEventHandlerExecutionContext的引用。更具體些,是在事件總線訂閱某種類型的事件時,須要將事件處理器註冊到IEventHandlerExecutionContext中。那麼在實現RabbitMQ時,也會有着相似的設計需求,即RabbitMQEventBus也須要依賴IEventHandlerExecutionContext接口,以保證事件處理器生命週期的合理性。github
爲此,咱們新建一個基類:BaseEventBus,並將這部分公共的代碼提取出來,須要注意如下幾點:sql
BaseEventBus的代碼以下:shell
public abstract class BaseEventBus : IEventBus { protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext; protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext) { this.eventHandlerExecutionContext = eventHandlerExecutionContext; } public abstract Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent; public abstract void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler<TEvent>; // Disposable接口實現代碼省略 }
在上面的代碼中,PublishAsync和Subscribe方法是抽象方法,以便子類根據不一樣的須要來實現。數據庫
接下來就是調整PassThroughEventBus,使其繼承於BaseEventBus:json
public sealed class PassThroughEventBus : BaseEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly ILogger logger; public PassThroughEventBus(IEventHandlerExecutionContext context, ILogger<PassThroughEventBus> logger) : base(context) { this.logger = logger; logger.LogInformation($"PassThroughEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}."); eventQueue.EventPushed += EventQueue_EventPushed; } private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e) => await this.eventHandlerExecutionContext.HandleEventAsync(e.Event); public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) { return Task.Factory.StartNew(() => eventQueue.Push(@event)); } public override void Subscribe<TEvent, TEventHandler>() { if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>()) { this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>(); } } // Disposable接口實現代碼省略 }
代碼都很簡單,也就很少作說明了,接下來,咱們開始實現RabbitMQEventBus。架構
首先須要新建一個.NET Standard 2.0的項目,使用.NET Standard 2.0的項目模板所建立的項目,能夠同時被.NET Framework 4.6.1或者.NET Core 2.0的應用程序所引用。建立新的類庫項目的目的,是由於RabbitMQEventBus的實現須要依賴RabbitMQ C#開發庫這個外部引用。所以,爲了保證框架核心的純淨和穩定,須要在新的類庫項目中實現RabbitMQEventBus。app
Note:對於RabbitMQ及其C#庫的介紹,本文就再也不涉及了,網上有不少資料和文檔,博客園有不少朋友在這方面都有使用經驗分享,RabbitMQ官方文檔也寫得很是詳細,固然是英文版的,若是英語比較好的話,建議參考官方文檔。框架
如下就是在EdaSample案例中,RabbitMQEventBus的實現,咱們先讀一讀代碼,再對這部分代碼作些分析。
public class RabbitMQEventBus : BaseEventBus { private readonly IConnectionFactory connectionFactory; private readonly IConnection connection; private readonly IModel channel; private readonly string exchangeName; private readonly string exchangeType; private readonly string queueName; private readonly bool autoAck; private readonly ILogger logger; private bool disposed; public RabbitMQEventBus(IConnectionFactory connectionFactory, ILogger<RabbitMQEventBus> logger, IEventHandlerExecutionContext context, string exchangeName, string exchangeType = ExchangeType.Fanout, string queueName = null, bool autoAck = false) : base(context) { this.connectionFactory = connectionFactory; this.logger = logger; this.connection = this.connectionFactory.CreateConnection(); this.channel = this.connection.CreateModel(); this.exchangeType = exchangeType; this.exchangeName = exchangeName; this.autoAck = autoAck; this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType); this.queueName = this.InitializeEventConsumer(queueName); logger.LogInformation($"RabbitMQEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}."); } public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken)) { var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); var eventBody = Encoding.UTF8.GetBytes(json); channel.BasicPublish(this.exchangeName, @event.GetType().FullName, null, eventBody); return Task.CompletedTask; } public override void Subscribe<TEvent, TEventHandler>() { if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>()) { this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>(); this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName); } } protected override void Dispose(bool disposing) { if (!disposed) { if (disposing) { this.channel.Dispose(); this.connection.Dispose(); logger.LogInformation($"RabbitMQEventBus已經被Dispose。Hash Code:{this.GetHashCode()}."); } disposed = true; base.Dispose(disposing); } } private string InitializeEventConsumer(string queue) { var localQueueName = queue; if (string.IsNullOrEmpty(localQueueName)) { localQueueName = this.channel.QueueDeclare().QueueName; } else { this.channel.QueueDeclare(localQueueName, true, false, false, null); } var consumer = new EventingBasicConsumer(this.channel); consumer.Received += async (model, eventArgument) => { var eventBody = eventArgument.Body; var json = Encoding.UTF8.GetString(eventBody); var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); await this.eventHandlerExecutionContext.HandleEventAsync(@event); if (!autoAck) { channel.BasicAck(eventArgument.DeliveryTag, false); } }; this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer); return localQueueName; } }
閱讀上面的代碼,須要注意如下幾點:
在Customer服務中,使用RabbitMQEventBus就很是簡單了,只須要引用RabbitMQEventBus的程序集,而後在Startup.cs文件的ConfigureServices方法中,替換PassThroughEventBus的使用便可:
public void ConfigureServices(IServiceCollection services) { this.logger.LogInformation("正在對服務進行配置..."); services.AddMvc(); services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"], serviceProvider.GetRequiredService<ILogger<DapperEventStore>>())); var eventHandlerExecutionContext = new EventHandlerExecutionContext(services, sc => sc.BuildServiceProvider()); services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext); // services.AddSingleton<IEventBus, PassThroughEventBus>(); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMQEventBus(connectionFactory, sp.GetRequiredService<ILogger<RabbitMQEventBus>>(), sp.GetRequiredService<IEventHandlerExecutionContext>(), RMQ_EXCHANGE, queueName: RMQ_QUEUE)); this.logger.LogInformation("服務配置完成,已註冊到IoC容器!"); }
Note:一種更好的作法是經過配置文件來配置IoC容器,在曾經的Microsoft Patterns and Practices Enterprise Library Unity Container中,使用配置文件是很方便的。這樣只須要Customer服務可以經過配置文件來配置IoC容器,同時只須要讓Customer服務依賴(注意,不是程序集引用)於不一樣的事件總線的實現便可,無需對Customer服務從新編譯。
下面來驗證一下效果。首先確保RabbitMQ已經配置並啓動穩當,我是安裝在本地機器上,使用默認安裝。首先啓動ASP.NET Core Web API,而後經過Powershell發起兩次建立Customer的請求:
查看一下數據庫是否更新正常:
並檢查一下日誌信息:
RabbitMQ中Exchange的信息:
本文提供了一種RabbitMQEventBus的實現,目前來講是夠用的,並且這種實現是可使用在實際項目當中的。在實際使用中,或許也會碰到一些與RabbitMQ自己有關的問題,這就須要具體問題具體分析了。此外,本文沒有涉及事件消息丟失、重發而後保證最終一致性的問題,這些內容會在後面討論。從下文開始,咱們着手逐步實現CQRS架構的領域事件和事件存儲部分。
本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo裏,經過不一樣的release tag來區分針對不一樣章節的源代碼。本文的源代碼請參考chapter_3這個tag,以下:
歡迎訪問個人博客新站:http://sunnycoding.net。