EventBus InMemory 的實踐基於eShopOnContainers (二)

前言

最近在工做中遇到了一個需求,會用到EventBus,正好看到eShopOnContainers上有相關的實例,去研究了研究。下面來分享一下用EventBus 來改造一下咱們上篇Event發佈與實踐 中所用的Eventhtml

上一篇中講到Event在發佈與訂閱模式中的一些實例,接下來實踐一下經過把上面的例子改形成EventBus來加深理解。也感謝參考資料中大佬前輩們的思想和精華。git

理解事件及其本質

咱們所構建的一個場景是這樣的:有個CarManager類,其中有個發車了的事件名爲CarNotification, 有司機Driver和乘客Passenger這兩個類。分別訂閱了 發車事件, 這裏司機和乘客收到通知後,而後簡單的處理,僅僅打印出 司機和乘客的姓名信息。github

在上面的場景中,咱們能夠知道,事件源,事件處理各是什麼。c#

事件源:司機 或者 乘客 類。async

事件處理: 打印出 司機或 乘客的信息。ide

大概花了一張巨醜的圖,下面:ui

開始抽象事件源

接下來,咱們能夠開始抽象起來了。首先是 事件源:spa

定義一個全部事件源的父類,名爲 EventData: 全部的事件源都須要繼承該類.net

public class EventData{
    public Guid Id{ get; }
    public DateTime CreationDate{ get; }
    
    public EventData(){
        Id = Guid.NewGuid();
        CreationDate = DateTime.Now;
    }
}

而後咱們就能夠把咱們以前的DriverPassenger 用咱們定義的事件源來改造一下:code

public class CarNotificationEventData : EventData{
    private string _driverName;
    private string _passengerName;
    
    public CarNotification(string driverName, string passengerName){
        _driverName = driverName;
        _passengerName = passengerName;
    }
    
    public string Driver{ get{ return _driverName; } }
    
    public string Passenger{ get{return _passengerName; } }
}

接下來就是抽象 事件處理 了,咱們在此定義一個名爲 IEventDataHandler 的接口:

public interface IEventHandler<in TEventData> : IEventHandler where TEventData : EventData{
    Task Handle(TEventData eventData);
}

public interface IEventHandler{
    
}
  • 定義了一個泛型接口,參數必須繼承自 EventData 類型。
  • 一個方法 Handle 方法,接收的參數也爲 EventData 類型。

查看eShopOnContainer的源碼時,上面那個爲啥定義,繼承一個空接口IEventHandler ,當時有點沒搞明白,後來繼續去看了看源碼,發現了下面這段:

var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);

大膽猜想一下,應該是反射時會用到,因此定義了一個空接口

那麼咱們根據上面的接口,簡單改造一下咱們的事件處理代碼:

public class DriverHandler : IEventHandler<CarNotificationEventData>{
    public void Handle (CarNotificationEventData carNotificationEventData) {
            Console.WriteLine ("Driver Hanlder---------");
            Console.WriteLine (carNotificationEventData.Driver + "\n" + carNotificationEventData.Passenger +
                "\n" + carNotificationEventData.EventDate);
        }
}

偷了個懶,簡單了打印瞭如下事件源中的信息,固然生產環境中,你就根據本身的業務邏輯來進行處理。

固然,改了事件源和事件處理,固然也須要從新對 咱們當初 事件 以及 委託的定義。

//修改委託的定義:

public delegate void CarEventDataHandler(CarNotificationEventData eventData);

//修改事件的定義:
public event CarEventDataHandler CarNotification;

實現到上面那步,其實咱們還只是剛剛開始,由於你會發現,咱們只僅僅把那些事件源和事件處理抽線了出來,在每一個事件處理程序中,咱們可能還須要經過

但並無真正上的作到用事件總線來實現。

爲了更好的實現下面的事件總線,咱們把委託也單獨定義到一個class 中:

namespace EventDemo{
    public delegate void CarNotificationDelegate(CatNotificationEventData eventData);
}

實現事件總線

根據 eShopOnContainers上的源碼 IEventBusSubscription,咱們來實現一個基於InMemory(存在Dictionary 中)的事件總線。

首先想到的 發佈與訂閱模式,因此呢,這個事件總線裏面必定要有能夠訂閱和移除訂閱的方法,還有來個額外的判斷當前事件總線是否爲空,固然還有一個就是 事件,接下來咱們就定義一個 IEventBusSubscription:

public interface IEventBusSubscriptionsManager {
        bool IsEmpty { get; }

        event CarNotificationDelegate OnEventRemoved;

        void AddSubscription<T, TH> () where T : CarNotificationEventData where TH : IEventHandler<T>;

        void RemoveSubscription<T, TH> (T eventBusData) where T : CarNotificationEventData where TH : IEventHandler<T>;

        bool HasSubscriptionsForEvent<T> () where T : CarNotificationEventData;

        bool HasSubscriptionsForEvent (string eventName);

    }

就簡單點,一個訂閱方法,一個移除訂閱方法。還有一個事件 OnEventRemoved

接下來就是實現了,eShopOnContainer 上面有好多個版本,我在實踐中,試了試 InMemory 和 RabbitMQ 的。由於十分貼合個人業務場景,本篇先介紹一下InMemory的實現,由於對RabbitMQ理解的還不是很深刻, RabbitMQ版本的後續博文中跟進。

而後咱們創建一個InMemoryEventBusSubscriptionsManager 繼承至IEventBusSubscripionsManager

public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager {
        private readonly Dictionary<string, List<Type>> _handler; //Type is HandlerType
        private readonly List<Type> _eventTypes;
        public event CarNotificationDelegate OnEventRemoved;
        private readonly IServiceProvider _service;

        public InMemoryEventBusSubscriptionsManager (IServiceCollection service) {
            _handler = new Dictionary<string, List<Type>> ();
            _eventTypes = new List<Type> ();
            _service = service.BuildServiceProvider ();
            OnEventRemoved += BeiginProcess;
        }

        public bool IsEmpty => !_handler.Keys.Any ();

        public void AddSubscription<T, TH> ()
        where T : CarNotificationEventData
        where TH : IEventHandler<T> {
            var eventName = GetEventKey<T> ();

            if (!HasSubscriptionsForEvent<T> ()) {
                _handler.Add (eventName, new List<Type> ());
            }

            if (_handler[eventName].Any (t => t == typeof (TH))) {
                throw new ArgumentException (
                    $"Handler Type {typeof(TH).Name} already registered");
            }
            _handler[eventName].Add (typeof (TH));

            _eventTypes.Add (typeof (T));
        }

        public void RemoveSubscription<T, TH> (T eventData)
        where T : CarNotificationEventData
        where TH : IEventHandler<T> {
            var handlerToRemove = FindSubscriptionToRemove<T, TH> ();
            DoRemoveHandler (eventData, handlerToRemove);
        }

        private void DoRemoveHandler (CarNotificationEventData eventData, Type subsToRemove) {
            if (subsToRemove != null) {

                var eventName = eventData.GetType ().Name;

                _handler[eventName].Remove (subsToRemove);
                if (!_handler[eventName].Any ()) {
                    _handler.Remove (eventName);
                    var eventType = _eventTypes.SingleOrDefault (e => e == eventName.GetType ());
                    if (eventType != null) {
                        _eventTypes.Remove (eventType);
                    }
                    RaiseOnEventRemoved (eventData);
                }

            }
        }

        private void RaiseOnEventRemoved (CarNotificationEventData eventData) {
            var handler = OnEventRemoved;
            if (handler != null) {
                OnEventRemoved (eventData);
            }
        }

        private Type FindSubscriptionToRemove<T, TH> ()
        where T : CarNotificationEventData
        where TH : IEventHandler<T> {
            var eventName = GetEventKey<T> ();
            return DoFindSubscriptionToRemove (eventName, typeof (TH));
        }

        private Type DoFindSubscriptionToRemove (string eventName, Type handlerType) {
            if (!HasSubscriptionsForEvent (eventName)) {
                return null;
            }

            return _handler[eventName].SingleOrDefault (s => s == handlerType);

        }
        public bool HasSubscriptionsForEvent<T> () where T : CarNotificationEventData {
            var keyName = GetEventKey<T> ();
            return _handler.ContainsKey (keyName);
        }

        public bool HasSubscriptionsForEvent (string eventName) => _handler.ContainsKey (eventName);

        public string GetEventKey<T> () {
            return typeof (T).Name;
        }

        public Type GetEventTypeByName (string eventName) => _eventTypes.SingleOrDefault (t => t.Name == eventName);

        public async void BeiginProcess (CarNotificationEventData eventData) {
            await Process (eventData);
        }

        private async Task Process (CarNotificationEventData eventBusData) {
            var eventName = eventBusData.GetType ().Name;
            if (HasSubscriptionsForEvent (eventName)) {
                var subscriptions = _handler[eventName];

                foreach (var subscription in subscriptions) {
                    var eventType = GetEventTypeByName (eventName);
                    var handler = _service.GetService (subscription);
                    var concreteType = typeof (IEventHandler<>).MakeGenericType (eventType);

                    await (Task) concreteType.GetMethod ("EventHandle").Invoke (handler, new object[] { eventBusData });
                }
            }
        }
    }

我稍微改動了如下地方的代碼,使個人實例更加符合場景上的運行,

  • eShopOnContainer 裏面使用了 Autofac 來運用DI,我直接使用了 .net -core 中自帶的 IServiceProvider來替代,感受更加方便
  • 直接對外顯示一個public 的BeginProcess 來引起事件,主要爲了演示方便。

既然EventBus 都寫好,咱們能夠開始運行了,

//... 
public static void Main (string[] args) {
            #region EventBusRegister Demo

            var serviceProvider = new ServiceCollection ()
                .AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager> ()
                .AddTransient<DriverHandler> ();

            var eventBus = new InMemoryEventBusSubscriptionsManager (serviceProvider);

            RegisterEventBus (eventBus);

            CarNotificationEventData carNotificationEventData = new CarNotificationEventData ("Robert 1", "Passenger 1");

            eventBus.BeiginProcess (carNotificationEventData);
            #endregion
        }
//...

dotnet run 運行一下,獲得以下結果:

這樣就算大功告成了。接下來會寫一篇結合RabbitMQ 的EventBus ,就更加符合生產環境的情景了。文中若是解釋的不到位處,歡迎評論中指出,一塊兒探討。

參考資料

相關文章
相關標籤/搜索