.NET Core微服務之開源項目CAP的初步使用

Tip: 此篇已加入.NET Core微服務基礎系列文章索引html

1、CAP簡介

  下面的文字來自CAP的Wiki文檔:https://github.com/dotnetcore/CAP/wikigit

  CAP 是一個在分佈式系統中(SOA,MicroService)實現事件總線及最終一致性(分佈式事務)的一個開源的 C# 庫,她具備輕量級,高性能,易使用等特色。咱們能夠輕鬆的在基於 .NET Core 技術的分佈式系統中引入CAP,包括但限於 ASP.NET Core 和 ASP.NET Core on .NET Framework。github

  CAP 的應用場景主要有如下兩個:sql

  • 分佈式事務中的最終一致性(異步確保)的方案
  • 具備高可用性的 EventBus

  CAP 同時支持使用 RabbitMQ 或 Kafka 進行底層之間的消息發送,咱們不須要具有 RabbitMQ 或者 Kafka 的使用經驗,仍然能夠輕鬆的將CAP集成到項目中。數據庫

  CAP 目前支持使用 Sql Server,MySql,PostgreSql 數據庫的項目;api

  CAP 同時支持使用 EntityFrameworkCore 和 Dapper 的項目,能夠根據須要選擇不一樣的配置方式;網絡

  CAP的做者爲園友savorboard(楊曉東),成都地區的.NET社區領導者,棒棒噠!app

2、案例結構

  這次試驗仍然和上一篇基於MassTransit的案例同樣(實際上是我懶得再改,直接拿來複用),共有四個MicroService應用程序,當用戶下訂單時會經過CAP做爲事件總線發佈消息,做爲訂閱者的庫存和配送服務會接收到消息並消費消息。這次試驗會採用RabbitMQ做爲消息隊列,採用MSSQL做爲關係型數據庫(同時CAP也是支持MSSQL的)。異步

  準備工做:爲全部服務經過NuGet安裝CAP及其相關包async

PM> Install-Package DotNetCore.CAP

 下面是RabbitMQ的支持包

PM> Install-Package DotNetCore.CAP.RabbitMQ

 下面是MSSQL的支持包

PM> Install-Package DotNetCore.CAP.SqlServer

3、具體實現

3.1 OrderService

  (1)啓動配置:這裏主要須要給CAP指定數據庫(它會在這個數據庫中建立本地消息表Published和Received)以及使用到的消息隊列(這裏是RabbitMQ)

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // Repository
        services.AddScoped<IOrderRepository, OrderRepository>();

        // EF DbContext
        services.AddDbContext<OrderDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:OrderDB"]);

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<OrderDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:OrderDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"]; 
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)Controller:這裏會調用Repository去實現業務邏輯和發送消息

    [Route("api/Order")]
    public class OrderController : Controller
    {
        public IOrderRepository OrderRepository { get; }

        public OrderController(IOrderRepository OrderRepository)
        {
            this.OrderRepository = OrderRepository;
        }

        [HttpPost]
        public string Post([FromBody]OrderDTO orderDTO)
        {
            var result = OrderRepository.CreateOrderByDapper(orderDTO).GetAwaiter().GetResult();

            return result ? "Post Order Success" : "Post Order Failed";
        }
    }

  (3)Repository:這裏實現了兩種方式:EF和Dapper(基於ADO.NET),其中EF方式中不須要傳transaction(當CAP檢測到 Publish 是在EF事務區域內的時候,將使用當前的事務上下文進行消息的存儲),而基於ADO.NET方式中須要傳transaction(因爲不能獲取到事務上下文,因此須要用戶手動的傳遞事務上下文到CAP中)。

    public class OrderRepository : IOrderRepository
    {
        public OrderDbContext DbContext { get; }
        public ICapPublisher CapPublisher { get; }
        public string ConnStr { get; } // For Dapper use

        public OrderRepository(OrderDbContext DbContext, ICapPublisher CapPublisher, string ConnStr)
        {
            this.DbContext = DbContext;
            this.CapPublisher = CapPublisher;
            this.ConnStr = ConnStr;
        }

        public async Task<bool> CreateOrderByEF(IOrder order)
        {
            using (var trans = DbContext.Database.BeginTransaction())
            {
                var orderEntity = new Order()
                {
                    ID = GenerateOrderID(),
                    OrderUserID = order.OrderUserID,
                    OrderTime = order.OrderTime,
                    OrderItems = null,
                    ProductID = order.ProductID // For demo use
                };

                DbContext.Orders.Add(orderEntity);
                await DbContext.SaveChangesAsync();

                // When using EF, no need to pass transaction
                var orderMessage = new OrderMessage()
                {
                    ID = orderEntity.ID,
                    OrderUserID = orderEntity.OrderUserID,
                    OrderTime = orderEntity.OrderTime,
                    OrderItems = null,
                    ProductID = orderEntity.ProductID // For demo use
                };
                
                await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage);

                trans.Commit();
            }

            return true;
        }

        public async Task<bool> CreateOrderByDapper(IOrder order)
        {
            using (var conn = new SqlConnection(ConnStr))
            {
                conn.Open();
                using (var trans = conn.BeginTransaction())
                {
                    // business code here
                    string sqlCommand = @"INSERT INTO [dbo].[Orders](OrderID, OrderTime, OrderUserID, ProductID)
                                                                VALUES(@OrderID, @OrderTime, @OrderUserID, @ProductID)";

                    order.ID = GenerateOrderID();
                    await conn.ExecuteAsync(sqlCommand, param: new
                    {
                        OrderID = order.ID,
                        OrderTime = DateTime.Now,
                        OrderUserID = order.OrderUserID,
                        ProductID = order.ProductID
                    }, transaction: trans);

                    // For Dapper/ADO.NET, need to pass transaction
                    var orderMessage = new OrderMessage()
                    {
                        ID = order.ID,
                        OrderUserID = order.OrderUserID,
                        OrderTime = order.OrderTime,
                        OrderItems = null,
                        MessageTime = DateTime.Now,
                        ProductID = order.ProductID // For demo use
                    };

                    await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage, trans);

                    trans.Commit();
                }
            }

            return true;
        }

        private string GenerateOrderID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }

        private string GenerateEventID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }
    }

  這裏摘抄一段CAP wiki中關於事務的一段介紹:

  事務在 CAP 具備重要做用,它是保證消息可靠性的一個基石。 在發送一條消息到消息隊列的過程當中,若是不使用事務,咱們是沒有辦法保證咱們的業務代碼在執行成功後消息已經成功的發送到了消息隊列,或者是消息成功的發送到了消息隊列,可是業務代碼確執行失敗。

  這裏的失敗緣由多是多種多樣的,好比鏈接異常,網絡故障等等。

  只有業務代碼和CAP的Publish代碼必須在同一個事務中,纔可以保證業務代碼和消息代碼同時成功或者失敗

  換句話說,CAP會確保咱們這段邏輯中業務代碼和消息代碼都成功了,纔會真正讓事務commit。

3.2 StorageService

  (1)啓動配置:這裏主要是指定Subscriber

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // EF DbContext
        services.AddDbContext<StorageDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:StorageDB"]);

        // Subscriber
        services.AddTransient<IOrderSubscriberService, OrderSubscriberService>();

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<StorageDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:StorageDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"];
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)實現Subscriber

  首先定義一個接口,建議放到公共類庫中

    public interface IOrderSubscriberService
    {
        Task ConsumeOrderMessage(OrderMessage message);
    }

  而後實現這個接口,記得讓其實現ICapSubscribe接口,而後咱們就可使用 CapSubscribeAttribute 來訂閱 CAP 發佈出來的消息。

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;
        
        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[StorageService] Received message : {JsonHelper.SerializeObject(message)}");
            await UpdateStorageNumberAsync(message);
        }

        private async Task<bool> UpdateStorageNumberAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"UPDATE [dbo].[Storages] SET StorageNumber = StorageNumber - 1
                                                                WHERE StorageID = @ProductID";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    ProductID = order.ProductID
                });

                return count > 0;
            }
        }
    }

  *.CAP約定消息端在方法實現的過程當中須要實現冪等性,所謂冪等性就是指用戶對於同一操做發起的一次請求或者屢次請求的結果是一致的,不會由於屢次點擊而產生了反作用。這裏我沒有考慮,實際中須要首先進行驗證,避免二次更新

3.3 DeliveryService

  (1)啓動配置:與StorageService高度相似,只是使用的不是同一個數據庫

  (2)實現Subscriber

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;

        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[DeliveryService] Received message : {JsonHelper.SerializeObject(message)}");
            await AddDeliveryRecordAsync(message);
        }

        private async Task<bool> AddDeliveryRecordAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"INSERT INTO [dbo].[Deliveries] (DeliveryID, OrderID, ProductID, OrderUserID, CreatedTime)
                                                            VALUES (@DeliveryID, @OrderID, @ProductID, @OrderUserID, @CreatedTime)";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    DeliveryID = Guid.NewGuid().ToString(),
                    OrderID = order.ID,
                    OrderUserID = order.OrderUserID,
                    ProductID = order.ProductID,
                    CreatedTime = DateTime.Now
                });

                return count > 0;
            }
        }
    }

3.4 快速測試

  (1)啓動3個微服務,Check 數據庫表狀態

  

  首先會看到在各個數據庫中均建立了本地消息表,這兩個表的含義以下:

  Cap.Published:這個表主要是用來存儲 CAP 發送到MQ(Message Queue)的客戶端消息,也就是說你使用 ICapPublisher 接口 Publish 的消息內容。

  Cap.Received:這個表主要是用來存儲 CAP 接收到 MQ(Message Queue) 的客戶端訂閱的消息,也就是使用 CapSubscribe[] 訂閱的那些消息。

  

  而後看看各個表的數據,目前只有庫存表有數據,由於咱們要作的只是更新。

  (2)經過Postman發一個Post請求

  

  (3)Check控制檯輸出的日誌信息

  

  

  (4)Check數據庫中的業務表和消息表數據:能夠看到發送者和接收者都執行成功了,若是其中任何一個參與者發生了異常或者鏈接不上,CAP會有默認的重試機制(默認是50次最大重試次數,每次重試間隔60s),當失敗總次數達到默認失敗總次數後,就不會進行重試了,咱們能夠在 Dashboard 中查看消息失敗的緣由,而後進行人工重試處理。

  

  

  另外,因爲CAP會在數據庫中建立消息表,所以不免會考慮到其性能。CAP提供了一個數據清理的機制,默認狀況下會每隔一個小時將消息表的數據進行清理刪除,避免數據量過多致使性能的下降。清理規則爲 ExpiresAt (字段名)不爲空而且小於當前時間的數據。

4、小結

  本篇首先簡單介紹了一下CAP這個開源項目,而後基於上一篇中的下訂單的小案例來進行了基於CAP的改造,並經過一個實例的運行來看到告終果。固然,這個實例並不完美,不少點都沒有考慮(好比消息端消費時的冪等性)和失敗重試的場景實踐等等等等。因爲時間和精力的關係,目前只使用到這兒,之後有機會可以應用上會研究下CAP的源碼,最後感謝楊曉東爲.NET社區帶來了一個優秀的開源項目!

示例代碼

  Click Here => 點我點我

參考資料

  CAP - GitHub : https://github.com/dotnetcore/CAP

  CAP - Wiki : https://github.com/dotnetcore/CAP/wiki

  楊曉東,《BASE:一種ACID的替代方案

 

相關文章
相關標籤/搜索