微服務實戰(四):落地微服務架構到直銷系統(將生產者與消費者接入消息總線)

前一篇文章咱們已經完成了基於RabbitMq實現的的消息總線,這篇文章就來看看生產者(訂單微服務)與消費者(經銷商微服務)如何接入消息總線實現消息的發送與消息的接收處理。服務器

 

定義須要發送的消息:微信

下單消息要被髮送到消息總線,並被經銷商微服務的處理器處理。經銷商微服務處理時,須要知道要對哪一個經銷商處理多少的PV值與電子幣餘額。這些信息就是事件消息須要承載的重要信息。app

public class OrderCreatedProcessDealerEvent:BaseEvent
    {
        public decimal OrderTotalPrice { get; set; }
        public decimal OrderTotalPV { get; set; }
        public Guid DealerId { get; set; }
        public Guid OrderId { get; set; }
        public OrderCreatedProcessDealerEvent(Guid dealerid,Guid orderid,decimal ordertotalprice,decimal
            ordertotalpv)
        {
            this.OrderTotalPrice = ordertotalprice;
            this.OrderTotalPV = ordertotalpv;
            this.DealerId = dealerid;
            this.OrderId = orderid;
        }
    }

 

生產者(訂單微服務)鏈接到消息總線:ide

生產者-訂單微服務經過Asp.net core WebApi自帶的依賴注入,鏈接到RabbitMq消息總線。函數

            services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
            var connectionFactory = new ConnectionFactory { HostName = "localhost" };
            services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
                sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 1));

從上面代碼能夠看出,生產者鏈接到了localhost的Rabbit服務器,並經過調用消息總線的構造函數,定義了發送消息的通道。構造函數具體內容能夠查看上一篇文章。微服務

 

生產者(訂單微服務)發送消息到消息總線:ui

ieventbus.Publish(new OrderCreatedProcessDealerEvent(orderdto.DealerId,
                        orderid, order.OrderTotalPrice.TotalPrice, order.OrderTotalPV.TotalPV));

ieventbus是注入到訂單微服務的構造函數中,並傳遞到訂單建立的用例中。 this

 

實現消費者(經銷商微服務)的消息處理器:idea

消費者會鏈接到消息總線,接收到特定類型的消息(這裏是OrderCreatedProcessDealerEvent),會交給特定的處理器進行處理,因此須要先定義並實現消息處理器。spa

public class OrderCreatedEventHandler : IEventHandler
    {
        ServiceLocator servicelocator = new ServiceLocator();
        public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
        {
            var idealercontext = servicelocator.GetService<IDealerContext>();
            var irepository =
                servicelocator.GetService<IRepository>(new ParameterOverrides { { "context", idealercontext } });
            var idealerrepository = servicelocator.GetService<IDealerRepository>(new ParameterOverrides { { "context", idealercontext } });
  //先將接收到的消息轉換成特定類型          
var ordercreatedevent = @event as OrderCreatedProcessDealerEvent;
            using (irepository)
            {
                try
                {
                  //根據消息內容,處理本身的邏輯與持久化
                    idealerrepository.SubParentEleMoney(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPrice);
                    idealerrepository.AddDealerPV(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPV);
                    irepository.Commit();
                }
                catch (EleMoneyNotEnoughException)
                {
                     //先不處理電子幣餘額不足的狀況                   

                }
            }
            return Task.FromResult(true);
        }
    }

 

消費者(經銷商微服務)鏈接到消息總線:

須要在經銷商微服務指定須要鏈接到的消息總線,並訂閱哪一個類型的消息交給哪一個事件處理器進行處理。

            //用於偵聽訂單上下文傳遞的消息
            services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
            var connectionFactory = new ConnectionFactory { HostName = "localhost" };
            services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
                sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 2));
            var eventbus = app.ApplicationServices.GetService<IEventBus>();
//訂閱消息
                        eventbus.Subscribe<OrderCreatedProcessDealerEvent, OrderCreatedEventHandler>();

這樣,兩個微服務直接就能經過RabbitMq消息總線進行消息的發送、消息的接收與處理了,實現瞭解耦。

 

QQ討論羣:309287205 

微服務實戰視頻請關注微信公衆號:

相關文章
相關標籤/搜索