NServiceBus+Saga開發分佈式應用

前言

      當你在處理異步消息時,每一個單獨的消息處理程序都是一個單獨的handler,每一個handler之間互不影響。這時若是一個消息依賴另外一個消息的狀態呢? 這時業務邏輯怎麼處理?app

image.png

     借用咱們上篇文章的業務場景,若是在Ship項目裏須要發送一個ShipOrder Command。這個ShipOrder須要依賴Sales.OrderPlaced和Bill.OrderBilled Command的狀態,目前咱們的兩個單獨的Message Handler都沒有保持任何的狀態字段,因此這時若是咱們須要完成這個業務模型,就須要跟蹤他們的狀態。框架

什麼是Saga

    這個就是本篇文章要提的saga,定義在NServiceBus框架裏,他的本質是一個消息驅動模型裏的狀態機,或者也能夠理解爲一系列消息處理程序用來共享狀態的業務模型。我理解在消息隊列裏若是咱們要保證消息一致性一般會本身建立一張Event表,這裏saga維持狀態的角色有點像咱們這裏的Event表。
   
屏幕快照 2019-10-23 20.46.15.png
   好的,回到正題上,若是咱們須要在Shipping Service裏發送一個ShipOrder,發送他以前須要肯定OrderPlaced和OrderBilled的狀態,確保這兩個消息都收到之後才能發送ShipOrder。異步

如何使用Saga

   固然,我暫且理解Saga的目的是爲了處理在長時間運行的任務裏保證數據一致性這樣的一個角色。async

Saga狀態

  saga狀態主要是告訴NServiceBus在處理數據一致性的判斷邏輯,這裏須要繼承抽象類ContainSagaData,在咱們這個業務場景中則主要是判斷OrderPlaced和OrderBilled消息是否已經接收到並處理。ide

public class ShippingPolicyData:ContainSagaData
{
   public string OrderId { get; set; }
   public bool IsOrderPlaced { get; set; }
   public bool IsOrderBilled { get; set; }
}

Saga如何工做

   有了狀態之後,咱們還須要一個「handler」來告訴NServiceBus,在這個handler裏主要用來處理消息數據一致性,我看了官方文檔後,他們建議咱們這裏的handler角色使用Policy後綴命名,固然我覺的也能夠用Saga後綴命名,好比ShippingPolicy或者ShippingSaga。
   同時這裏咱們這個handler覺色還要繼承Saga 類,Saga類主要重寫方法ConfigureHowToFindSaga,這個方法的做用主要是在接受的消息和咱們的Saga實體之間創建映射關係。 this

public class ShipPolicy:Saga<ShippingPolicyData>,
        IAmStartedByMessages<OrderPlaced>,
        IAmStartedByMessages<OrderBilled> //均可以建立Saga實例
    {
        private static ILog log = LogManager.GetLogger<ShipPolicy>();

        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper)
        {
            mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
            mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
            
        }

        public Task Handle(OrderPlaced message, IMessageHandlerContext context)
        {
            log.Info("OrderPlaced message received ");
            this.Data.IsOrderPlaced = true;
            return ProcessOrder(context);
        }

        public Task Handle(OrderBilled message, IMessageHandlerContext context)
        {
            
            log.Info("OrderBilled message received");
            this.Data.IsOrderBilled = true;
            return ProcessOrder(context);
        }

        private async Task ProcessOrder(IMessageHandlerContext context)
        {
            if (Data.IsOrderBilled && Data.IsOrderPlaced)
            {
                await context.SendLocal(new ShipOrder()
                {
                    OrderId = Data.OrderId
                });
                
                MarkAsComplete();
            }
        }
    }

    
    這個類裏你會發現還實現了接口IAmStartedByMessages , 這個接口主要是告訴Saga,不管是那種消息類型先進來,均可以建立一個Saga實例,就好比是Event表,無論那個消息進來,都須要先插入一條數據,後續消息再進來時要更新數據狀態,固然,這裏的Saga實例也好,Event表也好,關鍵問題就是有效標識,或者叫主鍵,咱們這個業務模型裏,OrderPlaced和OrderBilled都包含一個屬性OrderId, 這裏Saga實例則使用這個OrderId作關鍵屬性。.net

發送ShipOrder Command

    到這裏也就是咱們的OrderPlaced和OrderBIlled消息都收到了,業務邏輯符合要求,能夠發送ShipOrder消息了,也就是用戶建立了訂單,付了款,能夠發貨了。
image.pngcode

新建ShipOrder類blog

public class ShipOrder:ICommand
{
    public string OrderId { get; set; }
}

新建ShipOrderHandler繼承

public class ShipOrderHandler:IHandleMessages<ShipOrder>
{
   private static ILog log = LogManager.GetLogger<ShipOrderHandler>();
   public Task Handle(ShipOrder message, IMessageHandlerContext context)
   {
       log.Info($"Order [{message.OrderId}] - Successfully shipped");
       return Task.CompletedTask;
   }
}

運行Shipping項目,看到下圖,則說明程序運行成功,咱們這個業務場景裏OrderPlaced消息確定先接受到,OrderBilled消息後接受到。
屏幕快照 2019-10-23 20.29.05.png

參考連接

https://docs.particular.net/tutorials/nservicebus-sagas/1-getting-started/
https://docs.particular.net/nservicebus/sagas/

相關文章
相關標籤/搜索