微服務實戰(四):落地微服務架構到直銷系統(將生產者與消費者接入消息總線)

微服務實戰(四):落地微服務架構到直銷系統(將生產者與消費者接入消息總線)服務器

前一篇文章咱們已經完成了基於RabbitMq實現的的消息總線,這篇文章就來看看生產者(訂單微服務)與消費者(經銷商微服務)如何接入消息總線實現消息的發送與消息的接收處理。微信

定義須要發送的消息:架構

下單消息要被髮送到消息總線,並被經銷商微服務的處理器處理。經銷商微服務處理時,須要知道要對哪一個經銷商處理多少的PV值與電子幣餘額。這些信息就是事件消息須要承載的重要信息。app

public class OrderCreatedProcessDealerEvent:BaseEventssh

{
    public decimal OrderTotalPrice { get; set; }
    public decimal OrderTotalPV { get; set; }
    public Guid DealerId { get; set; }
    public Guid OrderId { get; set; }
    public OrderCreatedProcessDealerEvent(Guid dealerid,Guid orderid,decimal ordertotalprice,decimal
        ordertotalpv)
    {
        this.OrderTotalPrice = ordertotalprice;
        this.OrderTotalPV = ordertotalpv;
        this.DealerId = dealerid;
        this.OrderId = orderid;
    }
}

生產者(訂單微服務)鏈接到消息總線:ide

生產者-訂單微服務經過Asp.net core WebApi自帶的依賴注入,鏈接到RabbitMq消息總線。函數

services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
        var connectionFactory = new ConnectionFactory { HostName = "localhost" };
        services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
            sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 1));

從上面代碼能夠看出,生產者鏈接到了localhost的Rabbit服務器,並經過調用消息總線的構造函數,定義了發送消息的通道。構造函數具體內容能夠查看上一篇文章。微服務

生產者(訂單微服務)發送消息到消息總線:
ieventbus.Publish(new OrderCreatedProcessDealerEvent(orderdto.DealerId,ui

orderid, order.OrderTotalPrice.TotalPrice, order.OrderTotalPV.TotalPV));

ieventbus是注入到訂單微服務的構造函數中,並傳遞到訂單建立的用例中。 this

實現消費者(經銷商微服務)的消息處理器:

消費者會鏈接到消息總線,接收到特定類型的消息(這裏是OrderCreatedProcessDealerEvent),會交給特定的處理器進行處理,因此須要先定義並實現消息處理器。

public class OrderCreatedEventHandler : IEventHandler

{
    ServiceLocator servicelocator = new ServiceLocator();
    public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        var idealercontext = servicelocator.GetService<IDealerContext>();
        var irepository =
            servicelocator.GetService<IRepository>(new ParameterOverrides { { "context", idealercontext } });
        var idealerrepository = servicelocator.GetService<IDealerRepository>(new ParameterOverrides { { "context", idealercontext } });

//先將接收到的消息轉換成特定類型
var ordercreatedevent = @event as OrderCreatedProcessDealerEvent;

using (irepository)
        {
            try
            {
              //根據消息內容,處理本身的邏輯與持久化
                idealerrepository.SubParentEleMoney(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPrice);
                idealerrepository.AddDealerPV(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPV);
                irepository.Commit();
            }
            catch (EleMoneyNotEnoughException)
            {
                 //先不處理電子幣餘額不足的狀況                   

            }
        }
        return Task.FromResult(true);
    }
}

消費者(經銷商微服務)鏈接到消息總線:

須要在經銷商微服務指定須要鏈接到的消息總線,並訂閱哪一個類型的消息交給哪一個事件處理器進行處理。

//用於偵聽訂單上下文傳遞的消息
        services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
        var connectionFactory = new ConnectionFactory { HostName = "localhost" };
        services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
            sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 2));
        var eventbus = app.ApplicationServices.GetService<IEventBus>();

//訂閱消息

eventbus.Subscribe<OrderCreatedProcessDealerEvent, OrderCreatedEventHandler>();

這樣,兩個微服務直接就能經過RabbitMq消息總線進行消息的發送、消息的接收與處理了,實現瞭解耦。

QQ討論羣:309287205 微服務實戰視頻請關注微信公衆號:msshcj

相關文章
相關標籤/搜索