1、問題背景html
最近離職來到了一家新的公司,原先是在乙方工做,這回到了甲方,在這一個月中,發現目前的業務很大一部分是靠輪詢實現的,例如:經過輪詢判斷數據處於B狀態了,則輪詢到數據後執行某種動做,這個實際上是很是浪費的,而且對於數據的實時性也會不怎麼友好,基於以上的狀況,在某天開車堵車時候,想到了以前偶然瞭解過的事件總線(EventBus),對比了公司當前的場景後,以爲事件總線應該是能夠知足需求的(PS:只是我以爲這個有問題,不少人不以爲有問題),那既然想到了,那就想本身是否能夠作個事件總線的輪子git
2、什麼是事件總線數據庫
咱們知道事件是由一個Publisher跟一個或多個的Subsriber組成,可是在實際的使用過程當中,咱們會發現,Subsriber必須知道Publisher是誰才能夠註冊事件,進而達到目的,那這其實就是一種耦合,爲了解決這個問題,就出現了事件總線的模式,事件總線容許不一樣的模塊之間進行彼此通訊而又不須要相互依賴,以下圖所示,經過EventBus,讓Publisher以及Subsriber都只須要對事件源(EventData)進行關注,不用管Publisher是誰,那麼EventBus主要是作了一些什麼事呢?異步
3、EventBus作了什麼事?async
一、EventBus實現了對於事件的註冊以及取消註冊的管理函數
二、EventBus內部維護了一份事件源與事件處理程序的對應關係,而且經過這個對應關係在事件發佈的時候能夠找到對應的處理程序去執行微服務
三、EventBus應該要支持默認就註冊事件源與處理程序的關係,而不須要開發人員手動去註冊(這裏可讓開發人員去控制自動仍是手動)測試
4、具體實現思路spa
首先在事件總線中,存在註冊、取消註冊以及觸發事件這三種行爲,因此咱們能夠將這三種行爲抽象一個接口出來,最終的接口代碼以下:orm
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public interface IEventBus { #region 接口註冊 void Register<TEventData>(Type handlerType) where TEventData : IEventData; void Register(Type eventType, Type handlerType); void Register(string eventType, Type handlerType); #endregion #region 接口取消註冊 void Unregister<TEventData>(Type handler) where TEventData : IEventData; void Unregister(Type eventType, Type handlerType); void Unregister(string eventType, Type handlerType); #endregion void Trigger(string pubKey, IEventData eventData); Task TriggerAsync(string pubKey, IEventData eventData); Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData; void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData; } }
在以上代碼中發現有些方法是有IEventData約束的,這邊IEventData就是約束入參行爲,原則上規定,每次觸發的EventData都須要繼承IEventData,而註冊的行爲也是直接跟入參類型相關,具體代碼以下:
using System; using System.Collections.Generic; using System.Text; namespace MEventBus.Core { public interface IEventData { string Id { get; set; } DateTime EventTime { get; set; } object EventSource { get; set; } } }
接下來咱們看下具體的實現代碼
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public class EventBus : IEventBus { private static ConcurrentDictionary<string, List<Type>> dicEvent = new ConcurrentDictionary<string, List<Type>>(); private IResolve _iresolve { get; set; } public EventBus(IResolve resolve) { _iresolve = resolve; InitRegister(); } public void InitRegister() { if (dicEvent.Count > 0) { return; } //_iresolve = ioc_container; dicEvent = new ConcurrentDictionary<string, List<Type>>(); //自動掃描類型而且註冊 foreach (var file in Directory.GetFiles(AppDomain.CurrentDomain.BaseDirectory, "*.dll")) { var ass = Assembly.LoadFrom(file); foreach (var item in ass.GetTypes().Where(p => p.GetInterfaces().Contains(typeof(IEventHandler)))) { if (item.IsClass) { foreach (var item1 in item.GetInterfaces()) { foreach (var item2 in item1.GetGenericArguments()) { if (item2.GetInterfaces().Contains(typeof(IEventData))) { Register(item2, item); } } } } } } } //註冊以及取消註冊的時候須要加鎖處理 private static readonly object obj = new object(); #region 註冊事件 public void Register<TEventData>(Type handlerType) where TEventData : IEventData { //將數據存儲到mapDic var dataType = typeof(TEventData).FullName; Register(dataType, handlerType); } public void Register(Type eventType, Type handlerType) { var dataType = eventType.FullName; Register(dataType, handlerType); } public void Register(string pubKey, Type handlerType) { lock (obj) { //將數據存儲到mapDic if (dicEvent.Keys.Contains(pubKey) == false) { dicEvent[pubKey] = new List<Type>(); } if (dicEvent[pubKey].Exists(p => p.GetType() == handlerType) == false) { //IEventHandler obj = Activator.CreateInstance(handlerType) as IEventHandler; dicEvent[pubKey].Add(handlerType); } } } #endregion #region 取消事件註冊 public void Unregister<TEventData>(Type handler) where TEventData : IEventData { var dataType = typeof(TEventData); Unregister(dataType, handler); } public void Unregister(Type eventType, Type handlerType) { string _key = eventType.FullName; Unregister(_key, handlerType); } public void Unregister(string eventType, Type handlerType) { lock (obj) { if (dicEvent.Keys.Contains(eventType)) { if (dicEvent[eventType].Exists(p => p.GetType() == handlerType)) { dicEvent[eventType].Remove(dicEvent[eventType].Find(p => p.GetType() == handlerType)); } } } } #endregion #region Trigger觸發 //trigger時候須要記錄到數據庫 public void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData { var dataType = eventData.GetType().FullName; //獲取當前的EventData綁定的全部Handler Notify(dataType, eventData); } public void Trigger(string pubKey, IEventData eventData) { //獲取當前的EventData綁定的全部Handler Notify(pubKey, eventData); } public async Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData { await Task.Factory.StartNew(new Action(()=> { var dataType = eventData.GetType().FullName; Notify(dataType, eventData); })); } public async Task TriggerAsync(string pubKey, IEventData eventData) { await Task.Factory.StartNew(new Action(() => { var dataType = eventData.GetType().FullName; Notify(pubKey, eventData); })); } //通知每成功執行一個就須要記錄到數據庫 private void Notify<TEventData>(string eventType, TEventData eventData) where TEventData : IEventData { //獲取當前的EventData綁定的全部Handler var handlerTypes = dicEvent[eventType]; foreach (var handlerType in handlerTypes) { var resolveObj = _iresolve.Resolve(handlerType); IEventHandler<TEventData> handler = resolveObj as IEventHandler<TEventData>; handler.Handle(eventData); } } #endregion } }
代碼說明:
一、如上的EventBus是繼承了IEventBus後的具體實現,小夥伴可能看到在構造函數裏,有一個接口參數IResolve,這個主要是爲了將解析的過程進行解耦,因爲在一些WebApi的項目中,更加多的是使用IOC的機制進行對象的建立,那基於IResolve就能夠實現不一樣的對象建立方式(內置的是經過反射實現)
二、InitRegister方法經過遍歷當前目錄下的dll文件,去尋找全部實現了IEventHandler<IEventData>接口的信息,而且自動註冊到EventBus中,因此在實際使用過程當中,應該是沒有機會去適用register註冊的
三、觸發機制實現了同步以及異步的調用,這個從方法命名中就能夠看出來
5、程序Demo
TestHandler2(繼承IEventHandler)
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Windows.Forms; using MEventBus.Core; namespace MEventBusHandler.Test { public class TestHandler2 : IEventHandler<TestEventData> { public void Handle(TestEventData eventData) { Thread.Sleep(2000); MessageBox.Show(eventData.EventTime.ToString()); } } }
TestEventData(繼承EventData,EventData是繼承了IEventData的代碼)
using MEventBus.Core; using System; using System.Collections.Generic; using System.Text; namespace MEventBusHandler.Test { public class TestEventData : EventData { } }
調用代碼
using MEventBus.Core; using MEventBusHandler.Test; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespace MEventBus.Test { public partial class Form1 : Form { public Form1() { InitializeComponent(); TestHandler.OnOut += TestHandler_OnOut; } private void TestHandler_OnOut(object sender, EventArgs e) { MessageBox.Show("Hello World"); } private void button1_Click(object sender, EventArgs e) { var task = new MEventBus.Core.EventBus(new ReflectResolve()).TriggerAsync(new TestEventData()); task.ContinueWith((obj) => { MessageBox.Show("事情所有作完"); }); } private void button2_Click(object sender, EventArgs e) { new EventBus(new ReflectResolve()).Trigger(new TestEventData()); } } }
執行結果
我在真正的Demo中,實際上是註冊了2個handler,能夠在後續公佈的項目地址裏看到
6、總結
從有這個想法開始,到最終實現這個事件總線,大概總共花了2,3天的時間(PS:晚上回家獨自默默幹活),目前只能說是有一個初步可使用的版本,而且還存在着一些問題:
一、在.NetFrameWork下(目前公司還不想升級到.NetCore,吐血。。),若是使用AutoFac建立EventBus(單例模式下),若是Handler也使用AutoFac進行建立,會出現要麼對象建立失敗,要麼handler裏的對象與調用方的對象不是同一個實例,爲了解決這個問題,我讓EventBus再也不是單例模式,將dicEvent變成了靜態,暫時表面解決
二、未考慮跨進程的實現(感受用savorboard大佬的CAP就能夠了)
三、目前這個東西在一個小的新項目裏使用,暫時在測試環境還算沒啥問題,各位小夥伴若是有相似需求,能夠作個參考
因爲我的緣由,在測試上可能會有所不夠,若是有什麼bug的話,還請站內信告知,感謝(ps:文字表達弱雞,技術渣渣,各位多多包涵)
最後:附上項目地址:https://gitee.com/OneMango/MEventBus
做者: Mango
出處: http://www.cnblogs.com/OMango/
關於本身:專一.Net桌面開發以及Web後臺開發,對.NetCore、微服務、DevOps,K8S等感興趣,最近到了個甲方公司準備休養一段時間
本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文連接,若有問題, 可站內信告知.