微服務實戰(二):落地微服務架構到直銷系統(構建消息總線框架接口)

微服務實戰(二):落地微服務架構到直銷系統(構建消息總線框架接口)前端

從上一篇文章你們能夠看出,實現一個本身的消息總線框架是很是重要的內容,消息總線能夠將界限上下文之間進行解耦,也能夠爲大併發訪問提供必要的支持。
圖片描述
消息總線的做用:
1.界限上下文解耦:在DDD第一波文章中,當更新了訂單信息後,咱們經過調用經銷商界限上下文的領域模型和倉儲,進行了經銷商信息的更新,這形成了耦合。經過一個消息總線,能夠在訂單界限上下文的WebApi服務(來源微服務-生產者)更新了訂單信息後,發佈一個事件消息到消息總線的某個隊列中,經銷商界限上下文的WebApi服務(消費者)訂閱這個事件消息,而後交給本身的Handler進行消息處理,更新本身的經銷商信息。這樣就實現了訂單界限上下文與經銷商界限上下文解耦。數據庫

2.大併發支持:能夠經過消息總線進一步提高下單的性能。咱們能夠將用戶下單的操做直接交給一個下單命令WebApi接收,下單命令WebApi接收到命令後,直接丟給一個消息總線的隊列,而後當即給前端返回下單結果。這樣用戶就不用等待後續的複雜訂單業務邏輯,加快速度。後續訂單的一系列處理交給消息的Handler進行後續的處理與消息的進一步投遞。微信

消息總線設計重點:
1.定義消息(事件)的接口:全部須要投遞與處理的消息,都從這個消息接口繼承,由於須要約束消息中必須包含的內容,好比消息的ID、消息產生的時間等。
public interface IEvent架構

{
    Guid Id { get; set; }
    DateTime CreateDate { get; set; }
}

2.定義消息(事件)處理器接口:當消息投遞到消息總線隊列中後,必定有消費者WebApi接收並處理這個消息,具體的處理方法邏輯在訂閱方處理器中實現,這裏先須要定義處理器的接口,便於在消息總線框架中使用。
public interface IEventHandler併發

{
    Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
}

從上面代碼能夠看出,消息(事件)處理器處理的類型就是從IEvent接口繼承的消息類。框架

3.定義消息(事件)與消息(事件)處理器關聯接口:一種類型的消息被投遞後,必定要在訂閱方找到這種消息的處理器進行處理,因此必定要定義兩者的關聯接口,這樣才能將消息與消息處理器對應起來,才能實現消息被訂閱後的處理。ssh

public interface IEventHandlerExecutionContextasync

{
    void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
    bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
    Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
}

RegisterEventHandler方法就是創建消息與消息處理器的關聯,這個方法實際上是在訂閱方使用,訂閱方告訴消息總線,什麼樣的消息應該交給個人哪一個處理器進行處理。
IsRegisterEventHandler方法是判斷消息與處理器之間是否已經存在關聯。
HandleAsync方法是經過查找到消息對應的處理器後,而後調用處理器本身的Handle方法進行消息的處理.ide

4.定義消息發佈、訂閱與消息總線接口:消息總線至少要支持兩個功能,一個是生產者可以發佈消息到個人消息總線,另外一個是訂閱方須要可以從我這個消息總線訂閱消息。
public interface IEventPublisher微服務

{
    void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
}

從上面代碼能夠看出,生產者發佈的消息仍然要從IEvent繼承的類型。
public interface IEventSubscriber

{
    void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
}

上面代碼是訂閱方用於從消息總線訂閱消息,從代碼中能夠看出,它的最終的實現其實就是創建消息與處理器之間的關聯。
public interface IEventBus:IEventPublisher,IEventSubscriber

{
}

消息(事件)總線從兩個接口繼承下來,同時支持消息的發佈與消息的訂閱。

5.實現事件基類:上面已經訂閱了消息(事件)的接口,這裏來實現事件的基類,其實就是實現消息ID與產生的時間:

public class BaseEvent : IEvent

{
    public Guid Id { get; set; }
    public DateTime CreateDate { get; set; }
    public BaseEvent()
    {
        this.Id = Guid.NewGuid();
        this.CreateDate = DateTime.Now;
    }
}

6.實現消息總線基類:消息總線底層的依賴能夠是各類消息代理產品,好比RabbitMq、Kafaka或第三方雲平臺提供的消息代理產品,一般咱們要封裝這些消息代理產品。在封裝以前,咱們須要定義頂層的消息總線基類實現,主要的目的是將來依賴於它的具體實現可替換,另外也將消息與消息處理器的關聯接口傳遞進來,便於訂閱方使用。

public abstract class BaseEventBus : IEventBus

{
    protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext;
    protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
    {
        this.eventHandlerExecutionContext = eventHandlerExecutionContext;
    }
    public abstract void Publish<TEvent>(TEvent @event)
        where TEvent : IEvent;
    public abstract void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler;
}

7.實現消息與處理器關聯:消息必須與處理器關聯,訂閱方收到特定類型的消息後,才知道交給哪一個處理器處理。

public class EventHandlerExecutionContext : IEventHandlerExecutionContext

{
    private readonly IServiceCollection registry;
    private readonly IServiceProvider serviceprovider;
    private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>();
    public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection,
        IServiceProvider> serviceProviderFactory = null)
    {
        this.registry = registry;
        this.serviceprovider = this.registry.BuildServiceProvider();
    }

   //查找消息關聯的處理器,而後調用處理器的處理方法
    public async Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        var eventtype = @event.GetType();
        if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0)
        {
            using(var childscope = this.serviceprovider.CreateScope())
            {
                foreach(var handlertype in handlertypes)
                {
                    var handler = Activator.CreateInstance(handlertype) as IEventHandler;
                    await handler.HandleAsync(@event);
                }
            }
        }
    }

   //判斷消息與處理器之間是否有關聯
    public bool IsRegisterEventHandler<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler
    {
        if(registrations.TryGetValue(typeof(TEvent),out List<Type> handlertypelist))
        {
            return handlertypelist != null && handlertypelist.Contains(typeof(IEventHandler));
        }
        return false;
    }
   
  //將消息與處理器關聯起來,能夠在內存中創建關聯,也能夠創建在數據庫單獨表中
    public void RegisterEventHandler<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler
    {
        Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations);
    }
}

上面咱們基本上就將消息總線的架子搭建起來了,也實現了基本的功能,下一章咱們基於它來實現RabbitMq的消息總線。

QQ討論羣:309287205 微服務實戰視頻請關注微信公衆號:msshcj

相關文章
相關標籤/搜索