看eShopOnContainers學一個EventBus

最近在看微軟eShopOnContainers 項目,看到事件總線以爲不錯,和你們分享一下異步

看完此文你將得到什麼?

  1. eShop中是如何設計事件總線的
  2. 實現一個InMemory事件總線eShop中是沒有InMemory實現的,這算是一個小小小的挑戰

發佈訂閱模式

發佈訂閱模式可讓應用程序組件之間解耦,這是咱們使用這種模式最重要的理由之一,若是你徹底不知道這個東西,建議你先經過搜索引擎瞭解一下這種模式,網上的資料不少這裏就再也不贅述了。async

eShop中的EventBus就是基於這種模式的發佈/訂閱
發佈訂閱模式核心概念有三個:發佈者、訂閱者、調度中心,這些概念在消息隊列中就是生產者、消費者、MQ實例ide

在eShop中有兩個EventBus的實現:函數

  • 基於RabbitMq的EventBusRabbitMQ
  • 基於AzureServiceBus的EventBusServiceBus

IEventBus開始

先來看一看,全部EventBus的接口IEventBusui

public interface IEventBus
{
    void Publish(IntegrationEvent @event);

    void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;

    void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;
}

嗯,乍一看看是有點眼暈的,仔細看它的核心功能只有三個:搜索引擎

  1. Publish 發佈
  2. Subscribe 訂閱
  3. Unsubscribe 取消訂閱

這對應着發佈訂閱模式的基本概念,不過對於事件總線的接口添加了許多約束:線程

  1. 發佈的內容(消息)必須是IntegrationEvent及其子類
  2. 訂閱事件必須指明要訂閱事件的類型,並附帶處理器類型
  3. 處理器必須是IIntegrationEventHandler的實現類

Ok,看到這裏先不要管Dynamic相關的方法,而後記住這個兩個關鍵點:設計

  1. 事件必須繼承IntegrationEvent
  2. 處理器必須實現IIntegrationEventHandler<T>TIntegrationEvent子類

另外,看下 IntegrationEvent有什麼日誌

public class IntegrationEvent
{
    public IntegrationEvent()
    {
        Id = Guid.NewGuid();
        CreationDate = DateTime.UtcNow;
    }

    public Guid Id  { get; }
    public DateTime CreationDate { get; }
}

IEventBusSubscriptionsManager是什麼

public interface IEventBusSubscriptionsManager
{
    bool IsEmpty { get; }
    event EventHandler<string> OnEventRemoved;
    void AddDynamicSubscription<TH>(string eventName)
       where TH : IDynamicIntegrationEventHandler;

    void AddSubscription<T, TH>()
       where T : IntegrationEvent
       where TH : IIntegrationEventHandler<T>;

    void RemoveSubscription<T, TH>()
         where TH : IIntegrationEventHandler<T>
         where T : IntegrationEvent;
    void RemoveDynamicSubscription<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
    bool HasSubscriptionsForEvent(string eventName);
    Type GetEventTypeByName(string eventName);
    void Clear();
    IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
    IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
    string GetEventKey<T>();
}

這個接口看起來稍顯複雜些,咱們來簡化下看看:code

public interface IEventBusSubscriptionsManager
{
    void AddSubscription<T, TH>()
    void RemoveSubscription<T, TH>()
    IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() 
}

最終,這三個方法就是咱們要關注的,添加訂閱、移除訂閱、獲取指定事件的訂閱信息。

SubscriptionInfo是什麼?

public bool IsDynamic { get; }
public Type HandlerType{ get; }

SubscriptionInfo中只有兩個信息,這是否是一個Dynamic類型的Event以及這個Event所對應的處理器的類型。

這是你可能會有另外一個疑問:

這個和IEventBus有什麼關係?

  1. IEventBusSubscriptionsManager含有更多功能:查看是否有訂閱,獲取事件的Type,獲取事件的處理器等等
  2. IEventBusSubscriptionsManagerIEventBus使用,在RabbitMq和ServiceBus的實現中,都使用Manager去存儲事件的信息,例以下面的代碼:

    public void Subscribe<T, TH>()
         where T : IntegrationEvent
         where TH : IIntegrationEventHandler<T>
     {
         // 查詢事件的全名
         var eventName = _subsManager.GetEventKey<T>();
    
         //向mq添加註冊
         DoInternalSubscription(eventName);
    
         // 向manager添加訂閱
         _subsManager.AddSubscription<T, TH>();
     }
    
     private void DoInternalSubscription(string eventName)
     {
         var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
         if (!containsKey)
         {
             if (!_persistentConnection.IsConnected)
             {
                 _persistentConnection.TryConnect();
             }
    
             using (var channel = _persistentConnection.CreateModel())
             {
                 channel.QueueBind(queue: _queueName,
                                     exchange: BROKER_NAME,
                                     routingKey: eventName);
             }
         }
     }

    查詢事件的名字是manager作的,訂閱的時候是先向mq添加訂閱,以後又加到manager中,manager管理着訂閱的基本信息。

另一個重要功能是獲取事件的處理器信息,在rabbit mq的實現中,ProcessEvent方法中用manager獲取了事件的處理器,再用依賴注入得到處理器的實例,反射調用Handle方法處理事件信息:

private async Task ProcessEvent(string eventName, string message)
    {
        // 從manager查詢信息
        if (_subsManager.HasSubscriptionsForEvent(eventName))
        {
            using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
            {

                // 從manager獲取處理器
                var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                foreach (var subscription in subscriptions)
                {

                    // Di + 反射調用,處理事件(兩個都是,只是針對是不是dynamic作了不一樣的處理)
                    if (subscription.IsDynamic)
                    { 
                        var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                        dynamic eventData = JObject.Parse(message);
                        await handler.Handle(eventData);
                    }
                    else
                    {
                        var eventType = _subsManager.GetEventTypeByName(eventName);
                        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                        var handler = scope.ResolveOptional(subscription.HandlerType);
                        var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                        await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                    }
                }
            }
        }
    }

IEventBusSubscriptionsManager的默認實現

在eShop中只有一個實現就是InMemoryEventBusSubscriptionsManager

這個類中有兩個重要的字段

private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
    private readonly List<Type> _eventTypes;

他們分別存儲了事件列表和事件處理器信息詞典

接下來就是實現一個

基於內存的事件總線

咱們要作什麼呢?IEventBusSubscriptionsManager 已經有了InMemory的實現了,咱們能夠直接拿來用,因此咱們只須要本身實現一個EventBus就行了

先貼出最終代碼:

public class InMemoryEventBus : IEventBus
{
    private readonly IServiceProvider _provider;
    private readonly ILogger<InMemoryEventBus> _logger;
    private readonly ISubscriptionsManager _manager;
    private readonly IList<IntegrationEvent> _events;
    public InMemoryEventBus(
        IServiceProvider provider,
        ILogger<InMemoryEventBus> logger, 
        ISubscriptionsManager manager)
    {
        _provider = provider;
        _logger = logger;
        _manager = manager;
    }

    public void Publish(IntegrationEvent e)
    {

        var eventType = e.GetType();
        var handlers = _manager.GetHandlersForEvent(eventType.FullName);

        foreach (var handlerInfo in handlers)
        {
            var handler = _provider.GetService(handlerInfo.HandlerType);

            var method = handlerInfo.HandlerType.GetMethod("Handle");

            method.Invoke(handler, new object[] { e });
        }
    }

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {

        _manager.AddSubscription<T, TH>();

    }

    public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
    {
        throw new NotImplementedException();
    }

    public void Unsubscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        _manager.RemoveSubscription<T, TH>();
    }

    public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
    {
        throw new NotImplementedException();
    }
}

首先構造函數中聲明咱們要使用的東西:

public InMemoryEventBus(
    IServiceProvider provider,
    ILogger<InMemoryEventBus> logger, 
    ISubscriptionsManager manager)
{
    _provider = provider;
    _logger = logger;
    _manager = manager;
}

這裏要注意的就是IServiceProvider provider這是 DI容器,當咱們在切實處理事件的時候咱們選擇從DI獲取處理器的實例,而不是反射建立,這要作的好處在於,處理器能夠依賴於其它東西,而且能夠是單例的

public void Subscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{

    _manager.AddSubscription<T, TH>();

}

public void Unsubscribe<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    _manager.RemoveSubscription<T, TH>();
}

訂閱和取消訂閱很簡單,由於咱們是InMemory的因此只調用了manager的方法。

接下來就是最重要的Publish方法,實現Publish有兩種方式:

  1. 使用額外的線程和Queue讓發佈和處理異步
  2. 爲了簡單起見,咱們先寫個簡單易懂的同步的

    public void Publish(IntegrationEvent e)
     {
         // 首先要拿到集成事件的Type信息
         var eventType = e.GetType();
    
         // 獲取屬於這個事件的處理器列表,可能有不少,注意得到的是SubscriptionInfo
         var handlers = _manager.GetHandlersForEvent(eventType.FullName);
    
         // 不解釋循環
         foreach (var handlerInfo in handlers)
         {
             // 從DI中獲取類型的實例
             var handler = _provider.GetService(handlerInfo.HandlerType);
    
             // 拿到Handle方法
             var method = handlerInfo.HandlerType.GetMethod("Handle");
    
             // 調用方法
             method.Invoke(handler, new object[] { e });
         }
     }

OK,咱們的InMemoryEventBus就寫好了!

要實踐這個InMemoryEventBus,那麼還須要一個IntegrationEvent的子類,和一個IIntegrationEventHandler<T>的實現類,這些都不難,例如咱們作一個添加用戶的事件,A在添加用戶後,發起一個事件並將新用戶的名字做爲事件數據,B去訂閱事件,並在本身的處理器中處理名字信息。

思路是這樣的:

  1. 寫一個 AddUserEvent:IntegrationEvent,裏面有一個UserId和一個UserName
  2. 寫一個AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>,在Handle方法中輸出UserId和Name到日誌。
  3. 註冊DI,你要註冊下面這些服務:

    IEventBus=>InMemoryEventBus
     ISubscriptionsManager=>InMemorySubscriptionsManager
     AddUserEventHandler=>AddUserEventHandler
  4. 在Startup中爲剛剛寫的事件和處理器添加訂閱(在這裏已經能夠獲取到IEventBus實例了)
  5. 寫一個Api接口或是什麼,調用IEventBus的Publish方法,new 一個新的AddUserEvent做爲參數傳進去。

OK!到這裏一個切實可用的InMemoryEventBus就可使用了。

相關文章
相關標籤/搜索