最近在看微軟eShopOnContainers 項目,看到事件總線以爲不錯,和你們分享一下異步
發佈訂閱模式可讓應用程序組件之間解耦,這是咱們使用這種模式最重要的理由之一,若是你徹底不知道這個東西,建議你先經過搜索引擎瞭解一下這種模式,網上的資料不少這裏就再也不贅述了。async
eShop中的EventBus就是基於這種模式的發佈/訂閱。
發佈訂閱模式核心概念有三個:發佈者、訂閱者、調度中心,這些概念在消息隊列中就是生產者、消費者、MQ實例。ide
在eShop中有兩個EventBus的實現:函數
EventBusRabbitMQ
EventBusServiceBus
。IEventBus
開始先來看一看,全部EventBus的接口IEventBus
ui
public interface IEventBus { void Publish(IntegrationEvent @event); void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; }
嗯,乍一看看是有點眼暈的,仔細看它的核心功能只有三個:搜索引擎
這對應着發佈訂閱模式的基本概念,不過對於事件總線的接口添加了許多約束:線程
IntegrationEvent
及其子類IIntegrationEventHandler
的實現類Ok,看到這裏先不要管Dynamic
相關的方法,而後記住這個兩個關鍵點:設計
IntegrationEvent
IIntegrationEventHandler<T>
且T
是IntegrationEvent
子類另外,看下 IntegrationEvent
有什麼日誌
public class IntegrationEvent { public IntegrationEvent() { Id = Guid.NewGuid(); CreationDate = DateTime.UtcNow; } public Guid Id { get; } public DateTime CreationDate { get; } }
public interface IEventBusSubscriptionsManager { bool IsEmpty { get; } event EventHandler<string> OnEventRemoved; void AddDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void AddSubscription<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void RemoveSubscription<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; void RemoveDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent; bool HasSubscriptionsForEvent(string eventName); Type GetEventTypeByName(string eventName); void Clear(); IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent; IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName); string GetEventKey<T>(); }
這個接口看起來稍顯複雜些,咱們來簡化下看看:code
public interface IEventBusSubscriptionsManager { void AddSubscription<T, TH>() void RemoveSubscription<T, TH>() IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() }
最終,這三個方法就是咱們要關注的,添加訂閱、移除訂閱、獲取指定事件的訂閱信息。
SubscriptionInfo
是什麼?public bool IsDynamic { get; } public Type HandlerType{ get; }
SubscriptionInfo
中只有兩個信息,這是否是一個Dynamic類型的Event以及這個Event所對應的處理器的類型。
這是你可能會有另外一個疑問:
IEventBus
有什麼關係?IEventBusSubscriptionsManager
含有更多功能:查看是否有訂閱,獲取事件的Type,獲取事件的處理器等等IEventBusSubscriptionsManager
由IEventBus
使用,在RabbitMq和ServiceBus的實現中,都使用Manager去存儲事件的信息,例以下面的代碼:
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { // 查詢事件的全名 var eventName = _subsManager.GetEventKey<T>(); //向mq添加註冊 DoInternalSubscription(eventName); // 向manager添加訂閱 _subsManager.AddSubscription<T, TH>(); } private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } }
查詢事件的名字是manager作的,訂閱的時候是先向mq添加訂閱,以後又加到manager中,manager管理着訂閱的基本信息。
另一個重要功能是獲取事件的處理器信息,在rabbit mq的實現中,ProcessEvent方法中用manager獲取了事件的處理器,再用依賴注入得到處理器的實例,反射調用Handle
方法處理事件信息:
private async Task ProcessEvent(string eventName, string message) { // 從manager查詢信息 if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { // 從manager獲取處理器 var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { // Di + 反射調用,處理事件(兩個都是,只是針對是不是dynamic作了不一樣的處理) if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } }
在eShop中只有一個實現就是InMemoryEventBusSubscriptionsManager
類
這個類中有兩個重要的字段
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; private readonly List<Type> _eventTypes;
他們分別存儲了事件列表和事件處理器信息詞典
接下來就是實現一個
了
咱們要作什麼呢?IEventBusSubscriptionsManager 已經有了InMemory的實現了,咱們能夠直接拿來用,因此咱們只須要本身實現一個EventBus就行了
先貼出最終代碼:
public class InMemoryEventBus : IEventBus { private readonly IServiceProvider _provider; private readonly ILogger<InMemoryEventBus> _logger; private readonly ISubscriptionsManager _manager; private readonly IList<IntegrationEvent> _events; public InMemoryEventBus( IServiceProvider provider, ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager) { _provider = provider; _logger = logger; _manager = manager; } public void Publish(IntegrationEvent e) { var eventType = e.GetType(); var handlers = _manager.GetHandlersForEvent(eventType.FullName); foreach (var handlerInfo in handlers) { var handler = _provider.GetService(handlerInfo.HandlerType); var method = handlerInfo.HandlerType.GetMethod("Handle"); method.Invoke(handler, new object[] { e }); } } public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.AddSubscription<T, TH>(); } public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { throw new NotImplementedException(); } public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.RemoveSubscription<T, TH>(); } public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { throw new NotImplementedException(); } }
首先構造函數中聲明咱們要使用的東西:
public InMemoryEventBus( IServiceProvider provider, ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager) { _provider = provider; _logger = logger; _manager = manager; }
這裏要注意的就是IServiceProvider provider
這是 DI容器,當咱們在切實處理事件的時候咱們選擇從DI獲取處理器的實例,而不是反射建立,這要作的好處在於,處理器能夠依賴於其它東西,而且能夠是單例的
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.AddSubscription<T, TH>(); } public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.RemoveSubscription<T, TH>(); }
訂閱和取消訂閱很簡單,由於咱們是InMemory的因此只調用了manager的方法。
接下來就是最重要的Publish方法,實現Publish有兩種方式:
爲了簡單起見,咱們先寫個簡單易懂的同步的
public void Publish(IntegrationEvent e) { // 首先要拿到集成事件的Type信息 var eventType = e.GetType(); // 獲取屬於這個事件的處理器列表,可能有不少,注意得到的是SubscriptionInfo var handlers = _manager.GetHandlersForEvent(eventType.FullName); // 不解釋循環 foreach (var handlerInfo in handlers) { // 從DI中獲取類型的實例 var handler = _provider.GetService(handlerInfo.HandlerType); // 拿到Handle方法 var method = handlerInfo.HandlerType.GetMethod("Handle"); // 調用方法 method.Invoke(handler, new object[] { e }); } }
OK,咱們的InMemoryEventBus就寫好了!
要實踐這個InMemoryEventBus,那麼還須要一個IntegrationEvent
的子類,和一個IIntegrationEventHandler<T>
的實現類,這些都不難,例如咱們作一個添加用戶的事件,A在添加用戶後,發起一個事件並將新用戶的名字做爲事件數據,B去訂閱事件,並在本身的處理器中處理名字信息。
思路是這樣的:
AddUserEvent:IntegrationEvent
,裏面有一個UserId和一個UserName
。AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>
,在Handle
方法中輸出UserId和Name到日誌。註冊DI,你要註冊下面這些服務:
IEventBus=>InMemoryEventBus ISubscriptionsManager=>InMemorySubscriptionsManager AddUserEventHandler=>AddUserEventHandler
寫一個Api接口或是什麼,調用IEventBus的Publish方法,new 一個新的AddUserEvent
做爲參數傳進去。
OK!到這裏一個切實可用的InMemoryEventBus就可使用了。