咱們先來看看CQRS架構,你對下圖的架構還有印象嗎?每一個組件的功能都還清楚嗎?若是有疑問,請查考文章《微服務實戰(五):落地微服務架構到直銷系統(構建高性能大併發系統)》。html
前一篇文章已經實現了Event Store的基礎功能部分,本篇文章咱們經過C端的標準方式,實現一個下單的高併發命令端,來看看須要實現的具體流程:前端
1.前端用戶調用一個下單Command WebApi,傳遞下單命令;下單Command WebApi接受到下單命令後,將下單命令數據投遞到一個命令隊列中,向前端用戶返回一個信息。web
2.下單Command Handler WebApi偵聽命令隊列中的下單命令,而後調用領域對象邏輯,將執行的結果也就是Order對象的當前狀態持久化到Event Store中。json
3.下單Command Handler WebApi將下單相關信息經過事件的方式發佈到一個事件隊列中。(用戶事件處理器最終將訂單信息更新到業務庫中)api
下面經過代碼簡單體現下過程:微信
1.定義建立訂單命令:架構
public class CreateOrderCommand:BaseEvent { public OrderDTO orderdto { get; set; } public CreateOrderCommand() { } public CreateOrderCommand(OrderDTO orderdto) { this.orderdto = orderdto; this.AggregationRootId = Guid.NewGuid(); this.AssemblyQualifiedAggreateRooType = typeof(Orders).AssemblyQualifiedName; this.AssemblyQualifiedCommandAndEventType = this.GetType().AssemblyQualifiedName; } }
2.訂單 Command WebApi接受前端用戶傳遞的訂單命令: 併發
[Produces("application/json")] [Route("api/Order")] public class OrderController : Controller { private readonly IEventBus commandbus; public OrderController(IEventBus commandbus) { this.commandbus = commandbus; } [HttpPost] [Route("CreateOrderCmd")] public ResultEntity<bool> CreateOrderCmd([FromBody] OrderDTO orderdto) { var result = new ResultEntity<bool>(); try { var createordercommand = new CreateOrderCommand(orderdto); //發佈命令到命令總線 commandbus.Publish(createordercommand); result.IsSuccess = true; result.Msg = "下單處理中!"; } catch(Exception error) { result.ErrorCode = 200; result.Msg = error.Message; } return result; } }
固然須要定義要注入的命令總線:app
public void ConfigureServices(IServiceCollection services) { services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); //定義要發佈命令的命令總線 services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services)); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory, sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange1", "direct", "ordercreatecommandqueue", 1)); }
3.訂單Command Handler Web Api偵聽訂單命令,並訂閱須要處理訂單的訂單命令處理器:ide
//偵聽訂單建立命令隊列裏的消息 services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services)); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory, sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange1", "direct", "ordercreatecommandqueue", 2));
//訂閱建立訂單命令 var commandbuss = app.ApplicationServices.GetServices<IEventBus>(); var commandbus = commandbuss.ToList()[0]; commandbus.Subscribe<CreateOrderCommand, OrderCreateCommandHandler>();
4.實現訂單命令處理器:
public class OrderCreateCommandHandler : IEventHandler { private readonly IServiceProvider iserviceprovider; public OrderCreateCommandHandler() { var iwebhost = FindIWebHost.GetWwebHost("OrderCommandHandler.WebApi"); iserviceprovider = iwebhost.Services; } public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent { var orderdtocommand = @event as CreateOrderCommand; var orderdto = orderdtocommand.orderdto; var orderid = orderdtocommand.AggregationRootId; Orders order = new Orders(); var productskus = new List<ProductSKU>(); for (int i = 0; i < orderdto.ProductSPUNames.Count; i++) { var productsku = new ProductSKU(); productsku.ProductSPUName = orderdto.ProductSPUNames[i]; productsku.DealerPrice = orderdto.ProductDealerPrices[i]; productsku.PV = orderdto.ProductPVS[i]; productsku.Id = orderdto.ProductSKUIds[i]; productsku.Spec = orderdto.ProductSepcs[i]; productskus.Add(productsku); } var contact = new Contact(); contact.ContactName = orderdto.ContactName; contact.ContactTel = orderdto.ContactTel; contact.Province = orderdto.Privence; contact.City = orderdto.City; contact.Zero = orderdto.Zero; contact.Street = orderdto.Street; //完成業務邏輯 var orders = order.CreateOrders(orderid, orderdto.DealerId, productskus, orderdto.Counts, contact); var ordercreateevent = new OrderCreateEvent(); ordercreateevent.AggregationRootId = orders.Id; ordercreateevent.AssemblyQualifiedAggreateRooType = orderdtocommand.AssemblyQualifiedAggreateRooType; ordercreateevent.AssemblyQualifiedCommandAndEventType = orderdtocommand.AssemblyQualifiedCommandAndEventType; ordercreateevent.CreateDate = orders.OrderDateTime; ordercreateevent.Id = orders.Id; ordercreateevent.OrderDateTime = orders.OrderDateTime; ordercreateevent.OrderDealerId = orders.OrderDealerId; ordercreateevent.OrderItems = orders.OrderItems; ordercreateevent.OrderStreet = orders.OrderStreet; ordercreateevent.OrderTotalPrice = orders.OrderTotalPrice; ordercreateevent.OrderTotalPV = orders.OrderTotalPV; ordercreateevent.Code = orders.Code; ordercreateevent.Telephone = orders.Telephone; ordercreateevent.Version = 0; //對建立訂單事件持久化事件存儲 try { new DomainAndEventStorage().SaveEvent(ordercreateevent); var eventbuss = iserviceprovider.GetServices(typeof(IEventBus)) as IEnumerable<IEventBus>; var eventbusls = eventbuss.ToList(); var eventbus = eventbusls[1]; //發佈到事件隊列,用於將來持久化到業務庫中 eventbus.Publish(ordercreateevent); } catch(Exception error) { throw error; } return Task.FromResult(true); } }
QQ討論羣:309287205
微服務實戰視頻請關注微信公衆號: