事件總線這個概念對你來講可能很陌生,但提到觀察者(發佈-訂閱)模式,你也許就很熟悉。事件總線是對發佈-訂閱模式的一種實現。它是一種集中式事件處理機制,容許不一樣的組件之間進行彼此通訊而又不須要相互依賴,達到一種解耦的目的。
從上圖可知,核心就4個角色:html
實現事件總線的關鍵是:算法
以上源於我在事件總線知多少(1)中對於EventBus的分析和簡單總結。基於以上的簡單認知,咱們來梳理下eShopOnContainers中EventBus的實現機制·。數據庫
咱們直接以上帝視角,來看下其實現機制,上類圖。
json
咱們知道事件的本質是:事件源+事件處理。
針對事件源,其定義了IntegrationEvent
基類來處理。默認僅包含一個guid和一個建立日期,具體的事件能夠經過繼承該類,來完善事件的描述信息。服務器
這裏有必要解釋下Integration Event(集成事件)。由於在微服務中事件的消費再也不侷限於當前領域內,而是多個微服務可能共享同一個事件,因此這裏要和DDD中的領域事件區分開來。集成事件可用於跨多個微服務或外部系統同步領域狀態,這是經過在微服務以外發布集成事件來實現的。app
針對事件處理,其本質是對事件的反應,一個事件可引發多個反應,因此,它們之間是一對多的關係。
eShopOnContainers中抽象了兩個事件處理的接口:異步
兩者都定義了一個Handle
方法用於響應事件。不一樣之處在於方法參數的類型:
第一個接受的是一個強類型的IntegrationEvent
。第二個接收的是一個動態類型dynamic
。
爲何要單獨提供一個事件源爲dynamic
類型的接口呢?
不是每個事件源都須要詳細的事件信息,因此一個強類型的參數約束就沒有必要,經過dynamic
能夠簡化事件源的構建,更趨於靈活。async
有了事件源和事件處理,接下來就是事件的註冊和訂閱了。爲了方便進行訂閱管理,系統提供了額外的一層抽象IEventBusSubscriptionsManager
,其用於維護事件的訂閱和註銷,以及訂閱信息的持久化。其默認的實現InMemoryEventBusSubscriptionsManager
就是使用內存進行存儲事件源和事件處理的映射字典。
從類圖中看InMemoryEventBusSubscriptionsManager
中定義了一個內部類SubscriptionInfo
,其主要用於表示事件訂閱方的訂閱類型和事件處理的類型。分佈式
咱們來近距離看下InMemoryEventBusSubscriptionsManager
的定義:函數
//InMemoryEventBusSubscriptionsManager.cs //定義的事件名稱和事件訂閱的字典映射(1:N) private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; //保存全部的事件處理類型 private readonly List<Type> _eventTypes; //定義事件移除後事件 public event EventHandler<string> OnEventRemoved; //構造函數初始化 public InMemoryEventBusSubscriptionsManager() { _handlers = new Dictionary<string, List<SubscriptionInfo>>(); _eventTypes = new List<Type>(); } //添加動態類型事件訂閱(須要手動指定事件名稱) public void AddDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { DoAddSubscription(typeof(TH), eventName, isDynamic: true); } //添增強類型事件訂閱(事件名稱爲事件源類型) public void AddSubscription<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { var eventName = GetEventKey<T>(); DoAddSubscription(typeof(TH), eventName, isDynamic: false); if (!_eventTypes.Contains(typeof(T))) { _eventTypes.Add(typeof(T)); } } //移除動態類型事件訂閱 public void RemoveDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName); DoRemoveHandler(eventName, handlerToRemove); } //移除強類型事件訂閱 public void RemoveSubscription<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent { var handlerToRemove = FindSubscriptionToRemove<T, TH>(); var eventName = GetEventKey<T>(); DoRemoveHandler(eventName, handlerToRemove); }
添加了這麼一層抽象,即符合了單一職責原則,又完成了代碼重用。IEventBus
的具體實現經過注入對IEventBusSubscriptionsManager
的依賴,便可完成訂閱管理。
你這裏可能會好奇,爲何要暴露一個OnEventRemoved
事件?這裏先按住不表,留給你們思考。
微服務的一大特色就是分佈式。若須要作到動一發而牽全身,就須要一個持久化的集中式的EventBus。這就要求各個微服務內部雖然分別持有一個對EventBus的引用,但它們背後都必須鏈接着同一個用於持久化的數據源。
那你可能會說:那這個很好實現,使用同一個數據庫就行了。爲何非要用個什麼RabbitMQ?問的好!這就要去探討下RabbitMQ是爲了解決什麼問題了。
RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集羣和分佈式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。
而關於RabbitMQ的具體使用,這裏再也不展開,可參考RabbitMQ知多少。
集成RabbitMQ的關鍵在於理解其對消息的處理機制:
基於以上的認知,咱們再與EventBusRabbitMQ
源碼親密接觸。
public class EventBusRabbitMQ : IEventBus, IDisposable { const string BROKER_NAME = "eshop_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger<EventBusRabbitMQ> _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly ILifetimeScope _autofac; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly int _retryCount; private IModel _consumerChannel; private string _queueName; public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _retryCount = retryCount; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; } private void SubsManager_OnEventRemoved(object sender, string eventName) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); if (_subsManager.IsEmpty) { _queueName = string.Empty; _consumerChannel.Close(); } } } //.... }
構造函數主要作了如下幾件事:
IRabbitMQPersistentConnection
以便鏈接到對應的Broke。IEventBusSubscriptionsManager
,進行訂閱管理。OnEventRemoved
事件,取消隊列的綁定。(這也就回答了上面遺留的問題)private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } }
從上面咱們能夠看到事件的訂閱主要是進行rabbitmq隊列的綁定。以eventName爲routingKey進行路由。
public void Publish(IntegrationEvent @event) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var policy = RetryPolicy.Handle<BrokerUnreachableException>() .Or<SocketException>() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); using (var channel = _persistentConnection.CreateModel()) { var eventName = @event.GetType() .Name; channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); policy.Execute(() => { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; // persistent channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true, basicProperties: properties, body: body); }); } }
這裏面有如下幾個知識點:
DeliveryMode = 2
進行消息持久化mandatory: true
告知服務器當根據指定的routingKey和消息找不到對應的隊列時,直接返回消息給生產者。private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false,autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var eventName = ea.RoutingKey; var message = Encoding.UTF8.GetString(ea.Body); await ProcessEvent(eventName, message); channel.BasicAck(ea.DeliveryTag, multiple:false); }; channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); }; return channel; }
以上代碼演示瞭如建立消費信道進行消息處理的步驟:
Received
事件委託處理消息接收事件channel.BasicConsume
啓動監聽private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } }
以上代碼主要包括如下知識點:
以上介紹了EventBus的實現要點,那各個微服務是如何集成呢?
1. 註冊IRabbitMQPersistentConnection
服務用於設置RabbitMQ鏈接
services.AddSingleton<IRabbitMQPersistentConnection>(sp => { var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); //... return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); });
2. 註冊單例模式的IEventBusSubscriptionsManager
用於訂閱管理
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
3. 註冊單例模式的EventBusRabbitMQ
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => { var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var retryCount = 5; if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { retryCount = int.Parse(Configuration["EventBusRetryCount"]); } return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); });
完成了以上集成,就能夠在代碼中使用事件總線,進行事件的發佈和訂閱。
4. 發佈事件
若要發佈事件,須要根據是否須要事件源(參數傳遞)來決定是否須要申明相應的集成事件,須要則繼承自IntegrationEvent
進行申明。而後在須要發佈事件的地方進行實例化,並經過調用IEventBus
的實例的Publish
方法進行發佈。
//事件源的聲明 public class ProductPriceChangedIntegrationEvent : IntegrationEvent { public int ProductId { get; private set; } public decimal NewPrice { get; private set; } public decimal OldPrice { get; private set; } public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice) { ProductId = productId; NewPrice = newPrice; OldPrice = oldPrice; } }
//聲明事件源 var priceChangedEvent = new ProductPriceChangedIntegrationEvent(1001, 200.00, 169.00) //發佈事件 _eventBus.Publish(priceChangedEvent)
5. 訂閱事件
若要訂閱事件,須要根據須要處理的事件類型,申明對應的事件處理類,繼承自IIntegrationEventHandler
或IDynamicIntegrationEventHandler
,並註冊到IOC容器。而後建立IEventBus
的實例調用Subscribe
方法進行顯式訂閱。
//定義事件處理 public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent> { public async Task Handle(ProductPriceChangedIntegrationEvent @event) { //do something } }
//事件訂閱 var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
6. 跨服務事件消費
在微服務中跨服務事件消費很廣泛,這裏有一點須要說明的是若是訂閱的強類型事件非當前微服務中訂閱的事件,須要複製定義訂閱的事件類型。換句話說,好比在A服務發佈的TestEvent
事件,B服務訂閱該事件,一樣須要在B服務複製定義一個TestEvent
。
這也是微服務的一個通病,重複代碼。
經過一步一步的源碼梳理,咱們發現eShopOnContainers中事件總線的整體實現思路與引言部分的介紹十分契合。因此對於事件總線,不要以爲高深,明確參與的幾個角色以及基本的實現步驟,那麼不論是基於RabbitMQ實現也好仍是基於Azure Service Bus也好,萬變不離其宗!