當你在處理異步消息時,每一個單獨的消息處理程序都是一個單獨的handler,每一個handler之間互不影響。這時若是一個消息依賴另外一個消息的狀態呢? 這時業務邏輯怎麼處理?app
借用咱們上篇文章的業務場景,若是在Ship項目裏須要發送一個ShipOrder Command。這個ShipOrder須要依賴Sales.OrderPlaced和Bill.OrderBilled Command的狀態,目前咱們的兩個單獨的Message Handler都沒有保持任何的狀態字段,因此這時若是咱們須要完成這個業務模型,就須要跟蹤他們的狀態。框架
這個就是本篇文章要提的saga,定義在NServiceBus框架裏,他的本質是一個消息驅動模型裏的狀態機,或者也能夠理解爲一系列消息處理程序用來共享狀態的業務模型。我理解在消息隊列裏若是咱們要保證消息一致性一般會本身建立一張Event表,這裏saga維持狀態的角色有點像咱們這裏的Event表。
好的,回到正題上,若是咱們須要在Shipping Service裏發送一個ShipOrder,發送他以前須要肯定OrderPlaced和OrderBilled的狀態,確保這兩個消息都收到之後才能發送ShipOrder。異步
固然,我暫且理解Saga的目的是爲了處理在長時間運行的任務裏保證數據一致性這樣的一個角色。async
saga狀態主要是告訴NServiceBus在處理數據一致性的判斷邏輯,這裏須要繼承抽象類ContainSagaData,在咱們這個業務場景中則主要是判斷OrderPlaced和OrderBilled消息是否已經接收到並處理。ide
public class ShippingPolicyData:ContainSagaData { public string OrderId { get; set; } public bool IsOrderPlaced { get; set; } public bool IsOrderBilled { get; set; } }
有了狀態之後,咱們還須要一個「handler」來告訴NServiceBus,在這個handler裏主要用來處理消息數據一致性,我看了官方文檔後,他們建議咱們這裏的handler角色使用Policy後綴命名,固然我覺的也能夠用Saga後綴命名,好比ShippingPolicy或者ShippingSaga。
同時這裏咱們這個handler覺色還要繼承Saga
public class ShipPolicy:Saga<ShippingPolicyData>, IAmStartedByMessages<OrderPlaced>, IAmStartedByMessages<OrderBilled> //均可以建立Saga實例 { private static ILog log = LogManager.GetLogger<ShipPolicy>(); protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper) { mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId); mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId); } public Task Handle(OrderPlaced message, IMessageHandlerContext context) { log.Info("OrderPlaced message received "); this.Data.IsOrderPlaced = true; return ProcessOrder(context); } public Task Handle(OrderBilled message, IMessageHandlerContext context) { log.Info("OrderBilled message received"); this.Data.IsOrderBilled = true; return ProcessOrder(context); } private async Task ProcessOrder(IMessageHandlerContext context) { if (Data.IsOrderBilled && Data.IsOrderPlaced) { await context.SendLocal(new ShipOrder() { OrderId = Data.OrderId }); MarkAsComplete(); } } }
這個類裏你會發現還實現了接口IAmStartedByMessages
到這裏也就是咱們的OrderPlaced和OrderBIlled消息都收到了,業務邏輯符合要求,能夠發送ShipOrder消息了,也就是用戶建立了訂單,付了款,能夠發貨了。
code
新建ShipOrder類blog
public class ShipOrder:ICommand { public string OrderId { get; set; } }
新建ShipOrderHandler繼承
public class ShipOrderHandler:IHandleMessages<ShipOrder> { private static ILog log = LogManager.GetLogger<ShipOrderHandler>(); public Task Handle(ShipOrder message, IMessageHandlerContext context) { log.Info($"Order [{message.OrderId}] - Successfully shipped"); return Task.CompletedTask; } }
運行Shipping項目,看到下圖,則說明程序運行成功,咱們這個業務場景裏OrderPlaced消息確定先接受到,OrderBilled消息後接受到。
https://docs.particular.net/tutorials/nservicebus-sagas/1-getting-started/
https://docs.particular.net/nservicebus/sagas/