使用NServiceBus開發分佈式應用

前言

NServiceBus是.NET平臺下的開源消息服務框架,已經支持.NET Core,目前穩定版本為7.1。企業開發需要購買License,開發者可在線下載開發者License。
官方網站:https://particular.net/
官方示例:https://docs.particular.net/get-started/

NServiceBus入門

屏幕快照 2019-10-21 21.17.54.png
如圖所示,項目一共包括4個端點(Endpoint),也就是四個單獨的項目。端點是NServiceBus中的核心概念,發送消息和事件的發佈訂閱都以Endpoint為基礎。這個項目同時示範了發送消息和事件的發佈訂閱。

完整的項目結構如圖所示:
屏幕快照 2019-10-21 21.22.11.png

ClientUI

/// <summary>
/// Console host for the ClientUI endpoint. Starts the endpoint, sends a
/// <c>PlaceOrder</c> command to the "Sales" endpoint each time the user
/// presses 'P', and stops the endpoint when the user presses 'Q'.
/// </summary>
class Program
{
    // Assigned once at type initialization; readonly guards against accidental reassignment.
    private static readonly ILog log = LogManager.GetLogger<Program>();

    /// <summary>
    /// Application entry point. Awaits <see cref="MainAsync"/> instead of
    /// blocking on it, matching the async entry points of the other endpoints.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        await MainAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Interactive loop: reads one key at a time and either sends a new
    /// <c>PlaceOrder</c> command, quits, or asks the user to try again.
    /// </summary>
    /// <param name="endpointInstance">The started endpoint used to send commands.</param>
    /// <returns>A task that completes when the user presses 'Q'.</returns>
    static async Task RunAsync(IEndpointInstance endpointInstance)
    {
        log.Info("Press 'P' to place an order,press 'Q' to quit");

        while (true)
        {
            var key = Console.ReadKey();
            // Move to a new line after the echoed key character.
            Console.WriteLine();

            switch (key.Key)
            {
                case ConsoleKey.P:
                {
                    var command = new PlaceOrder
                    {
                        OrderId = Guid.NewGuid().ToString()
                    };

                    log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");
                    // Send the command to the "Sales" endpoint.
                    await endpointInstance.Send("Sales", command).ConfigureAwait(false);
                    break;
                }

                case ConsoleKey.Q:
                    return;

                default:
                    log.Info("Please try again");
                    break;
            }
        }
    }

    /// <summary>
    /// Configures and starts the "ClientUI" endpoint, runs the interactive
    /// loop, and stops the endpoint afterwards.
    /// </summary>
    /// <returns>A task that completes once the endpoint has been stopped.</returns>
    static async Task MainAsync()
    {
        Console.Title = "Client-UI";

        // The constructor argument is the endpoint name.
        var config = new EndpointConfiguration("ClientUI");
        // LearningTransport is meant for learning only; use with caution in production.
        config.UseTransport<LearningTransport>();
        // Persistence used by the endpoint.
        config.UsePersistence<LearningPersistence>();

        var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

        try
        {
            await RunAsync(endpointInstance).ConfigureAwait(false);
        }
        finally
        {
            // Stop the endpoint even when the interactive loop throws
            // (for example, if sending a command fails).
            await endpointInstance.Stop().ConfigureAwait(false);
        }
    }
}

Sales

/// <summary>
/// Console host for the Sales endpoint.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the "Sales" endpoint, keeps it running until the user presses
    /// Enter, then stops it.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        Console.Title = "Sales";

        var runningEndpoint = await Endpoint.Start(BuildConfiguration()).ConfigureAwait(false);

        Console.WriteLine("Press Enter to quit...");
        Console.ReadLine();

        await runningEndpoint.Stop().ConfigureAwait(false);
    }

    /// <summary>
    /// Builds the configuration for the "Sales" endpoint using the learning
    /// transport and learning persistence.
    /// </summary>
    private static EndpointConfiguration BuildConfiguration()
    {
        var endpointConfiguration = new EndpointConfiguration("Sales");
        endpointConfiguration.UseTransport<LearningTransport>();
        endpointConfiguration.UsePersistence<LearningPersistence>();
        return endpointConfiguration;
    }
}

/// <summary>
/// Handles <c>PlaceOrder</c> messages: logs the received order id and
/// publishes an <c>OrderPlaced</c> event carrying the same id.
/// </summary>
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    private static ILog logger = LogManager.GetLogger<PlaceOrderHandler>();

    /// <summary>
    /// Logs the incoming message and publishes the follow-up event.
    /// </summary>
    /// <param name="message">The received <c>PlaceOrder</c> message.</param>
    /// <param name="context">Handler context used to publish the event.</param>
    /// <returns>The task returned by <c>context.Publish</c>.</returns>
    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        // Message received by this endpoint.
        logger.Info($"Received PlaceOrder ,OrderId:{message.OrderId}");

        // Publish the OrderPlaced event.
        var orderPlaced = new OrderPlaced
        {
            OrderId = message.OrderId
        };

        return context.Publish(orderPlaced);
    }
}

Billing

/// <summary>
/// Entry point of the Billing endpoint host. Starts the "Billing" endpoint,
/// keeps it running until the user presses Enter, then stops it.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static async Task Main(string[] args)
{
    // The window title names this endpoint so the four console windows
    // can be told apart (it was previously copy-pasted as "Sales").
    Console.Title = "Billing";

    var config = new EndpointConfiguration("Billing");
    config.UseTransport<LearningTransport>();
    config.UsePersistence<LearningPersistence>();

    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();

    await endpointInstance.Stop().ConfigureAwait(false);
}

 /// <summary>
 /// Billing-side handler for <c>OrderPlaced</c> events: logs the order id
 /// and publishes an <c>OrderBilled</c> event carrying the same id.
 /// </summary>
 public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
 {
     private static ILog logger = LogManager.GetLogger<OrderPlacedHandler>();

     /// <summary>
     /// Logs the incoming event and publishes the follow-up event.
     /// </summary>
     /// <param name="message">The received <c>OrderPlaced</c> event.</param>
     /// <param name="context">Handler context used to publish the event.</param>
     /// <returns>The task returned by <c>context.Publish</c>.</returns>
     public Task Handle(OrderPlaced message, IMessageHandlerContext context)
     {
         // Handle the subscribed OrderPlaced event.
         logger.Info($"Received OrderPlaced,OrderId {message.OrderId} - Charging credit card");

         // Publish the OrderBilled event.
         var orderBilled = new OrderBilled
         {
             OrderId = message.OrderId
         };

         return context.Publish(orderBilled);
     }
 }

Shipping

/// <summary>
/// Entry point of the Shipping endpoint host. Starts the "Shipping" endpoint,
/// keeps it running until the user presses Enter, then stops it.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static async Task Main(string[] args)
{
    // The window title names this endpoint so the four console windows
    // can be told apart (it was previously copy-pasted as "Sales").
    Console.Title = "Shipping";

    var config = new EndpointConfiguration("Shipping");
    config.UseTransport<LearningTransport>();
    config.UsePersistence<LearningPersistence>();

    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();

    await endpointInstance.Stop().ConfigureAwait(false);
}

/// <summary>
/// Shipping-side handler for <c>OrderBilled</c> events: logs the order id
/// and completes without publishing or sending anything.
/// </summary>
public class OrderBilledHandler : IHandleMessages<OrderBilled>
{
    private static ILog logger = LogManager.GetLogger<OrderBilledHandler>();

    /// <summary>
    /// Handles the subscribed <c>OrderBilled</c> event by logging it.
    /// </summary>
    /// <param name="message">The received <c>OrderBilled</c> event.</param>
    /// <param name="context">Handler context (unused).</param>
    /// <returns>An already-completed task.</returns>
    public Task Handle(OrderBilled message, IMessageHandlerContext context)
    {
        var logLine = $"Received OrderBilled,OrderId={message.OrderId} Should we ship now?";
        logger.Info(logLine);

        return Task.CompletedTask;
    }
}

/// <summary>
/// Shipping-side handler for <c>OrderPlaced</c> events: logs the order id
/// and completes without publishing or sending anything.
/// </summary>
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    private static ILog logger = LogManager.GetLogger<OrderPlacedHandler>();

    /// <summary>
    /// Handles the subscribed <c>OrderPlaced</c> event by logging it.
    /// </summary>
    /// <param name="message">The received <c>OrderPlaced</c> event.</param>
    /// <param name="context">Handler context (unused).</param>
    /// <returns>An already-completed task.</returns>
    public Task Handle(OrderPlaced message, IMessageHandlerContext context)
    {
        var logLine = $"Received OrderPlaced,OrderId={message.OrderId} Should we ship now?";
        logger.Info(logLine);

        return Task.CompletedTask;
    }
}

運行結果

屏幕快照 2019-10-21 21.07.43.png

屏幕快照 2019-10-21 21.07.51.png

屏幕快照 2019-10-21 21.07.57.png

屏幕快照 2019-10-21 21.08.03.png

總結

      NServiceBus的核心是端點之間的通訊:作為命令發送的消息類需要實現ICommand接口,作為事件發佈的消息類需要實現IEvent接口,NServiceBus會掃描實現這兩個接口的類。每個端點的關鍵配置就是EndpointConfiguration。

相關文章
相關標籤/搜索