前一篇文章咱們已經完成了基於RabbitMq實現的的消息總線,這篇文章就來看看生產者(訂單微服務)與消費者(經銷商微服務)如何接入消息總線實現消息的發送與消息的接收處理。服務器
定義須要發送的消息:微信
下單消息要被髮送到消息總線,並被經銷商微服務的處理器處理。經銷商微服務處理時,須要知道要對哪一個經銷商處理多少的PV值與電子幣餘額。這些信息就是事件消息須要承載的重要信息。app
public class OrderCreatedProcessDealerEvent:BaseEvent { public decimal OrderTotalPrice { get; set; } public decimal OrderTotalPV { get; set; } public Guid DealerId { get; set; } public Guid OrderId { get; set; } public OrderCreatedProcessDealerEvent(Guid dealerid,Guid orderid,decimal ordertotalprice,decimal ordertotalpv) { this.OrderTotalPrice = ordertotalprice; this.OrderTotalPV = ordertotalpv; this.DealerId = dealerid; this.OrderId = orderid; } }
生產者(訂單微服務)鏈接到消息總線:ide
生產者-訂單微服務經過Asp.net core WebApi自帶的依賴注入,鏈接到RabbitMq消息總線。函數
services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services)); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory, sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 1));
從上面代碼能夠看出,生產者鏈接到了localhost的Rabbit服務器,並經過調用消息總線的構造函數,定義了發送消息的通道。構造函數具體內容能夠查看上一篇文章。微服務
生產者(訂單微服務)發送消息到消息總線:ui
ieventbus.Publish(new OrderCreatedProcessDealerEvent(orderdto.DealerId, orderid, order.OrderTotalPrice.TotalPrice, order.OrderTotalPV.TotalPV));
ieventbus是注入到訂單微服務的構造函數中,並傳遞到訂單建立的用例中。 this
實現消費者(經銷商微服務)的消息處理器:idea
消費者會鏈接到消息總線,接收到特定類型的消息(這裏是OrderCreatedProcessDealerEvent),會交給特定的處理器進行處理,因此須要先定義並實現消息處理器。spa
public class OrderCreatedEventHandler : IEventHandler { ServiceLocator servicelocator = new ServiceLocator(); public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent { var idealercontext = servicelocator.GetService<IDealerContext>(); var irepository = servicelocator.GetService<IRepository>(new ParameterOverrides { { "context", idealercontext } }); var idealerrepository = servicelocator.GetService<IDealerRepository>(new ParameterOverrides { { "context", idealercontext } }); //先將接收到的消息轉換成特定類型 var ordercreatedevent = @event as OrderCreatedProcessDealerEvent; using (irepository) { try { //根據消息內容,處理本身的邏輯與持久化 idealerrepository.SubParentEleMoney(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPrice); idealerrepository.AddDealerPV(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPV); irepository.Commit(); } catch (EleMoneyNotEnoughException) { //先不處理電子幣餘額不足的狀況 } } return Task.FromResult(true); } }
消費者(經銷商微服務)鏈接到消息總線:
須要在經銷商微服務指定須要鏈接到的消息總線,並訂閱哪一個類型的消息交給哪一個事件處理器進行處理。
//用於偵聽訂單上下文傳遞的消息 services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services)); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory, sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 2));
var eventbus = app.ApplicationServices.GetService<IEventBus>(); //訂閱消息 eventbus.Subscribe<OrderCreatedProcessDealerEvent, OrderCreatedEventHandler>();
這樣,兩個微服務直接就能經過RabbitMq消息總線進行消息的發送、消息的接收與處理了,實現瞭解耦。
QQ討論羣:309287205
微服務實戰視頻請關注微信公衆號: