[Abp 源碼分析]9、事件總線

0.簡介

事件總線就是訂閱/發佈模式的一種實現,本質上事件總線的存在是爲了下降耦合而存在的。html

從上圖能夠看到事件由發佈者發佈到事件總線處理器當中,而後經由事件總線處理器調用訂閱者的處理方法,而發佈者和訂閱者之間並無耦合關係。數據庫

像 Windows 自己的設計也是基於事件驅動,當用戶點擊了某個按鈕,那麼就會觸發相應的按鈕點擊事件,而程序只須要監聽這個按鈕點擊事件便可進行相應的處理,而事件被觸發的時候每每都會附帶相應的事件源,事件所產生的數據等。框架

仍是以按鈕被點擊爲例,該事件被觸發的時候會裝填上觸發時間,被誰觸發的數據放在一個 EventArgs 內部,而後將其存放到事件處理器中,而後處理器根據事件的類型去查找訂閱了該事件類型的對象,附帶上事件數據去調用這些訂閱者對象的處理方法。異步

Abp 自己也實現了事件總線,而且在框架內部也實現了豐富的事件類型,例如實體更新事件、異常事件等等。async

注意:在下文當中處理器的含義等同於訂閱者,請閱讀的時候自行切換。ide

0.1.使用方法

在引用了 Abp 框架的項目當中使用事件總線至關簡單,只須要直接注入 IEventBus 便可觸發相應的事件。若是你想要監聽某個事件,而且你也想在事件被觸發的時候進行處理,那麼直接繼承自 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 實現其接口方法 HandleEvent() 便可。測試

好比說,咱們首先定義了一個 TestEventData 的事件,以下:this

/// <summary>
/// 測試事件
/// </summary>
public class TestEventData : EventData
{
    public TestEventData(string code)
    {
        Code = code;
    }

    /// <summary>
    /// 待驗證的編碼
    /// </summary>
    public string Code { get; }
}

很簡單,這個事件觸發的時候會傳遞一個 string 類型的 Code 參數。編碼

以後咱們使用 TestEventHandler 訂閱這個事件,固然訂閱的方式很簡單,實現接口便可。線程

public class TestEventHandler : IAsyncEventHandler<TestEventData>, IEventHandler<TestEventData>
{
    public Task HandleEventAsync(TestEventData eventData)
    {
        if (eventData.Code == "1") Console.WriteLine("# 異步測試,編碼正確");

        Console.WriteLine("# 異步測試,編碼錯誤");
        return Task.FromResult(0);
    }

    public void HandleEvent(TestEventData eventData)
    {
        if (eventData.Code == "1") Console.WriteLine("# 同步測試,編碼正確");

        Console.WriteLine("# 同步測試,編碼錯誤");
    }
}

Abp 在底層會掃描實現了 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 這兩個接口的類型,將其自動註冊爲訂閱者。

固然你也能夠手動訂閱:

public class TestAppService : ApplicationService
{
    private readonly IEventBus _eventBus;

    public TestAppService(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public async Task TestMethod()
    {
        // 同步觸發
        _eventBus.Trigger(new TestEventData("Code1"));
        // 異步觸發,3.6.x 版本新增的
        await _eventBus.TriggerAsync(new TestEventData("Code1"));

        // 手動註冊事件範例
        _eventBus.Register<TestEventData, TestEventHandler>();
    }
}

這裏的 Register() 方法會讓你傳入一個事件數據類型,以及該事件對應的處理器。

同一個事件能夠被多個對象所訂閱,只要該對象實現 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 接口或者是顯式地被 IEventBus.Register() 註冊,他們都會在事件被觸發的時候調用。

2.啓動流程

按照慣例咱們來分析一下 Abp 針對事件總線的實現,看一下它的總體啓動流程,何時被注入,被初始化。

事件總線比起其餘 Abp 基礎設施來講他的註冊點就一個,在 EventBusInstaller 裏面,包含了針對 IEventBus 的註冊以及對實現了 IEventHandler 處理器的註冊。
EventBusInstaller 在 Abp 框架的核心模塊 AbpKernelModuleInitialize 被註冊調用:

public override void Initialize()
{
    // ...其餘代碼

    IocManager.IocContainer.Install(new EventBusInstaller(IocManager));

    // ... 其餘代碼
}

裏面的 Install() 方法作了兩個動做,第一是根據事件總線配置來決定 IEventBus 的註冊方式,第二則是將訂閱者(事件處理器)經過 IEventBus.Register() 方法自動放到事件總線管理器裏面。

public void Install(IWindsorContainer container, IConfigurationStore store)
{
    // 這裏是注入的配置類
    if (_eventBusConfiguration.UseDefaultEventBus)
    {
        container.Register(
            Component.For<IEventBus>().UsingFactoryMethod(() => EventBus.Default).LifestyleSingleton()
            );
    }
    else
    {
        container.Register(
            Component.For<IEventBus>().ImplementedBy<EventBus>().LifestyleSingleton()
            );
    }

    // 解析事件總線管理器
    _eventBus = container.Resolve<IEventBus>();

    // 註冊訂閱者對象
    container.Kernel.ComponentRegistered += Kernel_ComponentRegistered;
}

Emmmm,具體的代碼分析請看下面:

private void Kernel_ComponentRegistered(string key, IHandler handler)
{
    // 判斷當前注入的對象是否實現了 IEventHandler 接口,沒有實現則跳過
    if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(handler.ComponentModel.Implementation))
    {
        return;
    }

    // 得到當前注入對象實現的全部接口
    var interfaces = handler.ComponentModel.Implementation.GetTypeInfo().GetInterfaces();
    // 遍歷獲取到的全部接口
    foreach (var @interface in interfaces)
    {
        // 若是當前被遍歷的接口類型不是 IEventHandler 或者不是從 IEventHandler 繼承的,則跳過
        if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
        {
            continue;
        }

        // 到這裏獲取這個 IEventHandler 處理器的泛型參數
        var genericArgs = @interface.GetGenericArguments();
        // 而且它的泛型參數應該只有一個
        if (genericArgs.Length == 1)
        {
            // 根據 IEventHandler 的定義,拿到的泛型參數確定就是事件數據類型啦
            // 第二個參數就是一個 Handler 的工廠咯,每次觸發事件的時候都會從這個
            // 工廠解析出具體的事件處理器來響應事件的操做。
            _eventBus.Register(genericArgs[0], new IocHandlerFactory(_iocResolver, handler.ComponentModel.Implementation));
        }
    }
}

目前看來仍是十分簡單的。

3.代碼分析

3.1 事件總線管理器

整個事件總線的核心就是這個管理器(IEventBus/EventBus),事件的註冊,事件的觸發,全部這些東西都是由它來提供的,其實嘛事件總線沒你想象得那麼複雜。

它的基本原理很簡單,就是用戶向事件總線管理器註冊我想要觸發的事件,還有響應我事件的訂閱者,將其放在一個字典裏面。當 A 對象在數據庫 斷開鏈接 的時候,經過事件總線管理器觸發 斷開鏈接事件,事件總線管理器就會從以前註冊的字典,根據觸發時候傳遞的類型拿到響應的處理器集合,遍歷這個集合調用對應的方法。

說這麼多,咱們來看一下代碼吧,首先看看事件總線管理器的定義(固然接口太多,這裏是精簡過的):

public interface IEventBus
{
    // 註冊並訂閱事件
    IDisposable Register(Type eventType, IEventHandlerFactory factory);
    // 這裏沒有異步註冊的緣由是它最後仍是會調用上面這個方法

    // 取消事件的訂閱,這裏傳入的是一個 Action
    void Unregister<TEventData>(Action<TEventData> action) where TEventData : IEventData;
    // 這裏傳入的是一個 IEventHandler
    void Unregister<TEventData>(IEventHandler<TEventData> handler) where TEventData : IEventData;
    
    // 異步取消事件訂閱
    void AsyncUnregister<TEventData>(Func<TEventData, Task> action) where TEventData : IEventData;

    // 一樣是取消事件訂閱
    void Unregister(Type eventType, IEventHandler handler);
    void Unregister(Type eventType, IEventHandlerFactory factory);
    void UnregisterAll(Type eventType);

    // 觸發事件
    void Trigger(Type eventType, object eventSource, IEventData eventData);
    // 異步觸發事件
    Task TriggerAsync(Type eventType, object eventSource, IEventData eventData);
}

Emm,看了一下,大概就分爲三類,註冊事件取消事件的訂閱觸發事件,其餘定義的接口大多都是不一樣形式的重載,本質上仍是會調用到上述方法的。

首先在事件總線管理器內部有一個字典,這個字典就是咱們剛纔所提到的事件總線維護的事件字典,大概長這個樣子:

private readonly ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories;

能夠看到,這個字典的 Key 是一個 Type 類型,其實就是咱們所註冊的事件類型罷了,後面那個呢就是事件處理器的工廠。那爲何這個工廠會用一個 List 來存儲呢?

緣由有兩點:

  1. 由於咱們對應的事件處理器的生命週期與生成方式都有所不一樣,好比說 Abp 它本身自己就提供了IocHandlerFactoryTransientEventHandlerFactorySingleInstanceHandlerFactory 這三種實現。
  2. 由於一個事件可能會被多個處理器所訂閱,那麼一個處理器擁有一個工廠,因此會是一個集合。

3.1.1 註冊事件

在默認的 Register() 方法裏面就是使用的 IocHandlerFactory 來進行註冊事件的,若是你須要手動註冊事件呢,可使用簽名爲:

public IDisposable Register(Type eventType, IEventHandlerFactory factory);

的方法,來傳入本身實現的處理器工廠或者是 Abp 提供的事件處理器工廠。

看了它的定義以後,咱們來看一下它的具體實現,首先來看看註冊事件的 Register() 方法:

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{
    // 得到指定事件類型的工廠集合,而後往這個集合添加一個事件處理器工廠
    GetOrCreateHandlerFactories(eventType)
        .Locking(factories => factories.Add(factory));

    // Emm,這裏面就是一個 Dispose 方法,用於釋放建立好的工廠對象,裏面的 Dispose 方法
    // 最終會調用 IEventBus 的 UnRegister 方法來卸載工廠
    return new FactoryUnregistrar(this, eventType, factory);
}

private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
    // 根據事件類型建立/獲取一個事件處理器工廠集合
    return _handlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>());
}

能夠看到調用了註冊方法以後,它返回了一個 FactoryUnregistrar ,查看它的定義以下:

internal class FactoryUnregistrar : IDisposable
{
    private readonly IEventBus _eventBus;
    private readonly Type _eventType;
    private readonly IEventHandlerFactory _factory;

    public FactoryUnregistrar(IEventBus eventBus, Type eventType, IEventHandlerFactory factory)
    {
        _eventBus = eventBus;
        _eventType = eventType;
        _factory = factory;
    }

    public void Dispose()
    {
        _eventBus.Unregister(_eventType, _factory);
    }
}

很簡單的一個類,重點就是在 Dispose() 內部調用了 IEventBusUnregister() 方法,下面就會講解這東西。

3.1.2 取消事件的訂閱

接着是 UnRegister() 方法,UnRegister 方法有不少個,通常分爲兩類,第一是取消訂閱,第二就是卸載工廠。

public void Unregister<TEventData>(Action<TEventData> action) where TEventData : IEventData
{
    // 確保不爲空
    Check.NotNull(action, nameof(action));

    // 根據類型獲得該類型全部的事件處理器集合
    GetOrCreateHandlerFactories(typeof(TEventData))
        // 使用 lock 加鎖,防止線程同步問題
        .Locking(factories =>
        {
            // 調用 List 的 RemoveAll() 方法清除指定條件的工廠
            factories.RemoveAll(
                factory =>
                {
                    // 判斷工廠是否爲單例工廠
                    var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
                    // 若是不是單例工廠則不移除
                    if (singleInstanceFactory == null)
                    {
                        return false;
                    }
                    
                    // 若是是單例工廠,拿到其內部的具體事件處理器,並強制換爲 ActionEventHandler
                    var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEventData>;
                    // 爲空的話,不移除
                    if (actionHandler == null)
                    {
                        return false;
                    }

                   // 判斷傳入的處理邏輯是否與事件處理器邏輯相等,相等則移除
                    return actionHandler.Action == action;
                });
        });
}

// 取消訂閱的另外一種實現,只是針對 SingleInstanceHandlerFactory 進行了處理
public void Unregister(Type eventType, IEventHandler handler)
{
    GetOrCreateHandlerFactories(eventType)
        .Locking(factories =>
                    {
                        factories.RemoveAll(
                            factory =>
                            factory is SingleInstanceHandlerFactory &&
                            (factory as SingleInstanceHandlerFactory).HandlerInstance == handler
                        );
                    });
}

// 第二種狀況,卸載工廠,也就是 Register() 以後返回的 FactoryUnregistrar 釋放時調用的方法
public void Unregister(Type eventType, IEventHandlerFactory factory)
{
    // 根據傳入的類型,得到事件處理器工廠集合,移除相應工廠
    GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
}

在上方代碼能夠看到,彷佛 Unregister() 方法只針對 SingleInstanceHandlerFactory 工廠進行了處理,而沒有處理 IocHandlerFactoryTransientEventHandlerFactory

這是由於在 IEventBus 當中實現了這兩個方法:

IDisposable Register(Type eventType, IEventHandler handler);
IDisposable Register<TEventData>(Action<TEventData> action) where TEventData : IEventData;

能夠看到這兩個方法都沒有傳入工廠,第一個容許你傳入一個事件處理器對象,第二個則是讓你傳入一個 Action 做爲其事件訂閱者。

看看實現:

public IDisposable Register<TEventData>(Action<TEventData> action) where TEventData : IEventData
{
    // new 了一個 ActionEventHandler 做爲處理器
    return Register(typeof(TEventData), new ActionEventHandler<TEventData>(action));
}

public IDisposable Register<TEventData>(IEventHandler<TEventData> handler) where TEventData : IEventData
{
    // 傳入具體的處理器對象進行註冊
    return Register(typeof(TEventData), handler);
}

public IDisposable Register(Type eventType, IEventHandler handler)
{
    // 使用 SingleInstanceHandlerFactory 工廠進行註冊。
    return Register(eventType, new SingleInstanceHandlerFactory(handler));
}

由於單例工廠與其餘兩個工廠不同,單例工廠的生命週期貫穿整個程序的生命週期,也就是說除非程序被結束,那麼單例工廠內部的事件處理器就會一直存在,因此在 UnRegister() 方法內部只會針對 SingleInstanceHandlerFactory 工廠進行處理。

TransientEventHandlerFactory

IocHandlerFactory 工廠產生的對象的生命週期是隨着具體類型在被注入時候的生命週期所決定,有多是瞬時對象,也有多是單例對象,下文會詳細解說。

3.1.3 觸發事件

當事件的發佈者須要發佈(觸發)一個事件的時候,會調用 IEventBus 提供的 Trigger()/TriggerAsync() 方法。

而後事件總線管理器從本身的字典內匹配對應的事件,獲得對應的事件處理器工廠集合,而後呢使用工廠產生具體的處理器對象,調用其 HandleEvent / HandleEventAsync 方法,執行完成以後釋放對象。

public void Trigger(Type eventType, object eventSource, IEventData eventData)
{
    // 異常集合
    var exceptions = new List<Exception>();

    eventData.EventSource = eventSource;

    // 得到全部須要觸發的處理器工廠,遍歷傳入的事件類型以及他的子類事件
    foreach (var handlerFactories in GetHandlerFactories(eventType))
    {
        // 遍歷事件類型綁定的工廠集合
        foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
        {
            // 得到處理器類型
            var handlerType = handlerFactory.GetHandlerType();

            // 若是是異步處理器,以同步方式運行
            if (IsAsyncEventHandler(handlerType))
            {
                AsyncHelper.RunSync(() => TriggerAsyncHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions));
            }
            else if (IsEventHandler(handlerType))
            {
                // 調用處理器的處理方法,並回收異常信息
                TriggerHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions);
            }
            else
            {
                // 說明這個事件沒有對應的處理器實現,拋出異常
                var message = $"Event handler to register for event type {eventType.Name} does not implement IEventHandler<{eventType.Name}> or IAsyncEventHandler<{eventType.Name}> interface!";
                exceptions.Add(new AbpException(message));
            }
        }
    }

    // 處理繼承事件的狀況
    if (eventType.GetTypeInfo().IsGenericType &&
        eventType.GetGenericArguments().Length == 1 &&
        typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
    {
        var genericArg = eventType.GetGenericArguments()[0];
        var baseArg = genericArg.GetTypeInfo().BaseType;
        if (baseArg != null)
        {
            var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
            var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
            var baseEventData = (IEventData)Activator.CreateInstance(baseEventType, constructorArgs);
            baseEventData.EventTime = eventData.EventTime;
            Trigger(baseEventType, eventData.EventSource, baseEventData);
        }
    }

    if (exceptions.Any())
    {
        // 若是產生的異常數量爲 1 個的話,從新拋出具體的異常信息
        if (exceptions.Count == 1)
        {
            exceptions[0].ReThrow();
        }

        // 若是在執行過程當中產生了多個異常,將異常集合放在內部異常當中並拋出
        throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
    }
}

// 篩選全部須要觸發的事件類型,並將其封裝爲 EventTypeWithEventHandlerFactories
private IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
    var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();

    foreach (var handlerFactory in _handlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
    {
        handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
    }

    return handlerFactoryList.ToArray();
}

// 判斷傳入的類型是不是事件類型的子類
private static bool ShouldTriggerEventForHandler(Type eventType, Type handlerType)
{
    if (handlerType == eventType)
    {
        return true;
    }

    if (handlerType.IsAssignableFrom(eventType))
    {
        return true;
    }

    return false;
}

// 拿着具體的處理器工廠去執行處理器的處理方法
private void TriggerHandlingException(IEventHandlerFactory handlerFactory, Type eventType, IEventData eventData, List<Exception> exceptions)
{
    // 得到一個新鮮的處理器對象
    var eventHandler = handlerFactory.GetHandler();
    try
    {
        if (eventHandler == null)
        {
            throw new ArgumentNullException($"Registered event handler for event type {eventType.Name} is null!");
        }

        var handlerType = typeof(IEventHandler<>).MakeGenericType(eventType);

        // 從這個處理器獲取處處理方法
        var method = handlerType.GetMethod(
            "HandleEvent",
            new[] { eventType }
        );

        // 調用處理方法,並傳入事件數據
        method.Invoke(eventHandler, new object[] { eventData });
    }
    // 產生異常進行處理
    catch (TargetInvocationException ex)
    {
        exceptions.Add(ex.InnerException);
    }
    catch (Exception ex)
    {
        exceptions.Add(ex);
    }
    finally
    {
        // 釋放資源
        handlerFactory.ReleaseHandler(eventHandler);
    }
}

3.2 處理器工廠

全部事件所對應的處理器對象都是由工廠所建立的,當一個事件被觸發,事件總線管理器就會從事件類型所對應的工廠產生一個相應的處理器對象執行調用。

簡而言之,每一個事件處理器都擁有一個單獨的工廠。

其接口定義以下:

public interface IEventHandlerFactory
{
    // 得到一個事件處理器對象
    IEventHandler GetHandler();

    // 得到當前工廠所產生的處理器類型
    Type GetHandlerType();

    // 釋放指定的處理器對象
    void ReleaseHandler(IEventHandler handler);
}
具體實現 生命週期 描述
TransientEventHandlerFactory 瞬時 工廠產生的事件處理器生命週期是瞬時的,是一個標準
的能夠被 GC 回收的對象。
SingleInstanceHandlerFactory 單例 該工廠產生的對象都會被保存在一個 Instance 內部,每
次生成對象的時候都會使用該 Instance 的值。
IocHandlerFactory 由類型註冊時決定 在使用 IocHandlerFactory 的時候,會傳入事件處理
器,該工廠在建立事件處理器對象的時候會從 Ioc 容器當中
解析對應的對象出來,而該對象的生命週期取決於註冊時
的定義。

4.擴展

4.1 實體更新事件

Abp 在倉儲每次執行 CRUD 操做的時候都會自動觸發響應的實體更新事件,這些事件的觸發都存放在 EntityChangeEventHelper 類當中,一共有如下幾個事件,你訂閱該這些事件以後就會在實體產生更改的時候被觸發。

  • EntityChangedEventData<TEntity> 實體被更改的時候觸發。

  • EntityCreatedEventData<TEntity> 實體建立完成後觸發。
  • EntityCreatingEventData<TEntity> 實體建立時被觸發。
  • EntityDeletedEventData<TEntity> 實體刪除完成後觸發。
  • EntityDeletingEventData<TEntity> 實體刪除時被觸發。
  • EntityUpdatedEventData<TEntity> 實體更新後觸發。
  • EntityUpdatingEventData<TEntity> 實體更新時被觸發。

public class TestHandler : IEventHandler<EntityChangedEventData<TestEntity>>
{
    public void HandleEvent(EntityChangedEventData<TestEntity> eventData)
    {
        Console.WriteLine($"測試實體,ID爲 {eventDate.Entity.Id} 被更改了");
    }
}

4.2 異常事件

Abp 在運行期間遇到的異常也會自動觸發異常事件,其類型爲 AbpHandledExceptionDataExceptionData

5.點此跳轉到總目錄

相關文章
相關標籤/搜索