微服務實戰(二):落地微服務架構到直銷系統(構建消息總線框架接口)前端
從上一篇文章你們能夠看出,實現一個本身的消息總線框架是很是重要的內容,消息總線能夠將界限上下文之間進行解耦,也能夠爲大併發訪問提供必要的支持。
消息總線的做用:
1.界限上下文解耦:在DDD第一波文章中,當更新了訂單信息後,咱們經過調用經銷商界限上下文的領域模型和倉儲,進行了經銷商信息的更新,這形成了耦合。經過一個消息總線,能夠在訂單界限上下文的WebApi服務(來源微服務-生產者)更新了訂單信息後,發佈一個事件消息到消息總線的某個隊列中,經銷商界限上下文的WebApi服務(消費者)訂閱這個事件消息,而後交給本身的Handler進行消息處理,更新本身的經銷商信息。這樣就實現了訂單界限上下文與經銷商界限上下文解耦。數據庫
2.大併發支持:能夠經過消息總線進一步提高下單的性能。咱們能夠將用戶下單的操做直接交給一個下單命令WebApi接收,下單命令WebApi接收到命令後,直接丟給一個消息總線的隊列,而後當即給前端返回下單結果。這樣用戶就不用等待後續的複雜訂單業務邏輯,加快速度。後續訂單的一系列處理交給消息的Handler進行後續的處理與消息的進一步投遞。微信
消息總線設計重點:
1.定義消息(事件)的接口:全部須要投遞與處理的消息,都從這個消息接口繼承,由於須要約束消息中必須包含的內容,好比消息的ID、消息產生的時間等。
public interface IEvent架構
{ Guid Id { get; set; } DateTime CreateDate { get; set; } }
2.定義消息(事件)處理器接口:當消息投遞到消息總線隊列中後,必定有消費者WebApi接收並處理這個消息,具體的處理方法邏輯在訂閱方處理器中實現,這裏先須要定義處理器的接口,便於在消息總線框架中使用。
public interface IEventHandler併發
{ Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent; }
從上面代碼能夠看出,消息(事件)處理器處理的類型就是從IEvent接口繼承的消息類。框架
3.定義消息(事件)與消息(事件)處理器關聯接口:一種類型的消息被投遞後,必定要在訂閱方找到這種消息的處理器進行處理,因此必定要定義兩者的關聯接口,這樣才能將消息與消息處理器對應起來,才能實現消息被訂閱後的處理。ssh
public interface IEventHandlerExecutionContextasync
{ void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent; }
RegisterEventHandler方法就是創建消息與消息處理器的關聯,這個方法實際上是在訂閱方使用,訂閱方告訴消息總線,什麼樣的消息應該交給個人哪一個處理器進行處理。
IsRegisterEventHandler方法是判斷消息與處理器之間是否已經存在關聯。
HandleAsync方法是經過查找到消息對應的處理器後,而後調用處理器本身的Handle方法進行消息的處理.ide
4.定義消息發佈、訂閱與消息總線接口:消息總線至少要支持兩個功能,一個是生產者可以發佈消息到個人消息總線,另外一個是訂閱方須要可以從我這個消息總線訂閱消息。
public interface IEventPublisher微服務
{ void Publish<TEvent>(TEvent @event) where TEvent : IEvent; }
從上面代碼能夠看出,生產者發佈的消息仍然要從IEvent繼承的類型。
public interface IEventSubscriber
{ void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; }
上面代碼是訂閱方用於從消息總線訂閱消息,從代碼中能夠看出,它的最終的實現其實就是創建消息與處理器之間的關聯。
public interface IEventBus:IEventPublisher,IEventSubscriber
{ }
消息(事件)總線從兩個接口繼承下來,同時支持消息的發佈與消息的訂閱。
5.實現事件基類:上面已經訂閱了消息(事件)的接口,這裏來實現事件的基類,其實就是實現消息ID與產生的時間:
public class BaseEvent : IEvent
{ public Guid Id { get; set; } public DateTime CreateDate { get; set; } public BaseEvent() { this.Id = Guid.NewGuid(); this.CreateDate = DateTime.Now; } }
6.實現消息總線基類:消息總線底層的依賴能夠是各類消息代理產品,好比RabbitMq、Kafaka或第三方雲平臺提供的消息代理產品,一般咱們要封裝這些消息代理產品。在封裝以前,咱們須要定義頂層的消息總線基類實現,主要的目的是將來依賴於它的具體實現可替換,另外也將消息與消息處理器的關聯接口傳遞進來,便於訂閱方使用。
public abstract class BaseEventBus : IEventBus
{ protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext; protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext) { this.eventHandlerExecutionContext = eventHandlerExecutionContext; } public abstract void Publish<TEvent>(TEvent @event) where TEvent : IEvent; public abstract void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; }
7.實現消息與處理器關聯:消息必須與處理器關聯,訂閱方收到特定類型的消息後,才知道交給哪一個處理器處理。
public class EventHandlerExecutionContext : IEventHandlerExecutionContext
{ private readonly IServiceCollection registry; private readonly IServiceProvider serviceprovider; private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>(); public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null) { this.registry = registry; this.serviceprovider = this.registry.BuildServiceProvider(); } //查找消息關聯的處理器,而後調用處理器的處理方法 public async Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent { var eventtype = @event.GetType(); if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0) { using(var childscope = this.serviceprovider.CreateScope()) { foreach(var handlertype in handlertypes) { var handler = Activator.CreateInstance(handlertype) as IEventHandler; await handler.HandleAsync(@event); } } } } //判斷消息與處理器之間是否有關聯 public bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler { if(registrations.TryGetValue(typeof(TEvent),out List<Type> handlertypelist)) { return handlertypelist != null && handlertypelist.Contains(typeof(IEventHandler)); } return false; } //將消息與處理器關聯起來,能夠在內存中創建關聯,也能夠創建在數據庫單獨表中 public void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler { Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations); } }
上面咱們基本上就將消息總線的架子搭建起來了,也實現了基本的功能,下一章咱們基於它來實現RabbitMq的消息總線。
QQ討論羣:309287205 微服務實戰視頻請關注微信公衆號:msshcj