CAP-微服務間通訊實踐

微服務間通訊常見的兩種方式

因爲微服務架構慢慢被更多人使用後,迎面而來的問題是如何作好微服務間通訊的方案。咱們先分析下目前最經常使用的兩種服務間通訊方案。git

gRPC(rpc遠程調用)

gRPC-微服務間通訊實踐github

  • 場景:A服務主動發起請求到B服務,同步方式
  • 範圍:只在微服務間通訊應用

EventBus(基於消息隊列的集成事件)

  • 技術:NotNetCore.Cap + Rabbitmq + Database
  • 場景:A服務要在B服務作某件事情後響應,異步方式
  • 實現:B服務在完成某件事情後發佈消息,A服務訂閱此消息
  • 範圍:只在微服務間通訊應用

經過對比,兩種方式徹底不同。rpc是相似於http請求的及時響應機制,可是比http更輕量、快捷,它更像之前的微軟的WCF,能夠自動生成客戶端代碼,充分體現了面向實體對象的遠程調用的思想;Eventbus是異步的消息機制,基於cap的思想,不關心下游訂閱方服務是否消費成功,保障了主服務業務的流暢性,同時也是一款分佈式事務的實現方案,能夠保障分佈式架構中的數據的最終一致性。web

咱們今天主要介紹CAP在微服務中的實踐案例。sql

搭建框架介紹

新建項目

  1. 新建解決方案 DotNetCore.Cap.Demo
  2. 新建項目 DotNetCore.Cap.Demo.Publisher 消息發佈端
  3. 新建項目 DotNetCore.Cap.Demo.Subscriber 消息訂閱端

主要sdk

  • 項目框架 netcoreapp 3.1
  • 消息隊列選用RabbitMQ
  • 數據庫存儲選用PostgreSql

根據實際狀況選擇合適的消息隊列和數據庫存儲docker

CAP 支持 Kafka、RabbitMQ、AzureServiceBus 消息隊列:數據庫

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus

CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 做爲數據庫存儲:api

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB

本次demo使用的nuget包以下所示架構

$ dotnet list package
項目「DotNetCore.Cap.Demo.Publisher」具備如下包引用
   [netcoreapp3.1]: 
   頂級包                                                          已請求      已解決   
   > DotNetCore.CAP                                             3.1.1    3.1.1
   > DotNetCore.CAP.PostgreSql                                  3.1.1    3.1.1
   > DotNetCore.CAP.RabbitMQ                                    3.1.1    3.1.1
   > Microsoft.VisualStudio.Azure.Containers.Tools.Targets      1.10.9   1.10.9
   > Npgsql.EntityFrameworkCore.PostgreSQL                      3.1.4    3.1.4
   > Npgsql.EntityFrameworkCore.PostgreSQL.Design               1.1.0    1.1.0

項目「DotNetCore.Cap.Demo.Subscriber」具備如下包引用
   [netcoreapp3.1]:
   頂級包                                                          已請求      已解決
   > DotNetCore.CAP                                             3.1.1    3.1.1
   > DotNetCore.CAP.PostgreSql                                  3.1.1    3.1.1
   > DotNetCore.CAP.RabbitMQ                                    3.1.1    3.1.1
   > Microsoft.VisualStudio.Azure.Containers.Tools.Targets      1.10.9   1.10.9
   > Npgsql.EntityFrameworkCore.PostgreSQL                      3.1.4    3.1.4
   > Npgsql.EntityFrameworkCore.PostgreSQL.Design               1.1.0    1.1.0

DotNetCore.Cap.Demo.Publisher 消息發佈端

修改Startup文件

注入dotnetcore.cap組件及數據庫上下文app

services.AddDbContext<PgDbContext>(p => p.UseNpgsql("數據庫鏈接字符串"));

    services.AddCap(x =>
            {
                //rabbitmq在docker運行時,須要映射兩個端口:5672和15672
                //5672供程序集訪問
                //15672供web訪問
                //默認用戶名和密碼爲:guest/guest
                x.UseRabbitMQ(p =>
                {
                    p.HostName = "localhost";
                    p.Port = 5672;
                    p.UserName = "guest";
                    p.Password = "guest";
                });
                x.UseEntityFramework<PgDbContext>();
                //x.FailedRetryCount = 1;
                //x.UseDashboard();
            });

新建Controller做爲Publisher

  • 構造函數注入DotNetCore.CAP.ICapPublisher

提供了同步和異步的消息發佈方法框架

  • 發佈字符串消息

await _capPublisher.PublishAsync("消息名稱", "消息內容");

[HttpGet("string")]
        public async Task<IActionResult> PublishString()
        {
            await _capPublisher.PublishAsync("sample.rabbitmq.demo.string", "this is text!");
            return Ok();
        }
  • 發佈對象

await _capPublisher.PublishAsync("消息名稱", "消息對象");

[HttpGet("dynamic")]
        public async Task<IActionResult> PublishDynamic()
        {
            await _capPublisher.PublishAsync("sample.rabbitmq.demo.dynamic", new
            {
                Name = "xiao gou",
                Age = 18
            });
            return Ok();
        }
  • 分佈式事務場景

當須要在數據庫操做後,發佈消息出去,DotNetCore.Cap也提供了分佈式事務的解決方案。它擴展的transcation能保證只有在數據庫操做和消息發送都完成後,才提交Commit

[HttpGet("transcation")]
        public async Task<IActionResult> PublishWithTranscation()
        {
            using (var trans = _pgDbContext.Database.BeginTransaction(_capPublisher))
            {
               
                    var apiConfig = new ApiConfig
                    {
                        ApiName = "111122",
                        ApiDesc = "223",
                        ReturnType = "1",
                        ReturnExpect = "1",
                        IsAsync = true,
                        OperCode = "999",
                        OperTime = DateTime.Now
                    };

                    await _pgDbContext.ApiConfig.AddAsync(apiConfig);
                    await _pgDbContext.SaveChangesAsync();

                    _capPublisher.Publish("sample.rabbitmq.demo.transcation", apiConfig);

                    trans.Commit();
               
                    return Ok();
            }
        }

DotNetCore.Cap.Demo.Subscriber 消息訂閱端

修改Startup文件

注入dotnetcore.cap組件及數據庫上下文

services.AddDbContext<PgDbContext>(p => p.UseNpgsql("數據庫鏈接字符串"));

    services.AddCap(x =>
            {
                //rabbitmq在docker運行時,須要映射兩個端口:5672和15672
                //5672供程序集訪問
                //15672供web訪問
                //默認用戶名和密碼爲:guest/guest
                x.UseRabbitMQ(p =>
                {
                    p.HostName = "localhost";
                    p.Port = 5672;
                    p.UserName = "guest";
                    p.Password = "guest";
                });
                x.UseEntityFramework<PgDbContext>();
                //x.FailedRetryCount = 1;
                //x.UseDashboard();
            });

新建Controller做爲Subscriber

  • 訂閱字符串消息

[NonAction] 標籤:Indicates that a controller method is not an action method.

[CapSubscribe] 標籤:標誌此方法爲訂閱方法,並以消息名稱匹配發布端的消息事件

[NonAction]
        [CapSubscribe("sample.rabbitmq.demo.string")]
        public void SubscriberString(string text)
        {
            Console.WriteLine($"【SubscriberString】Subscriber invoked, Info: {text}");
        }
  • 訂閱對象消息

方法入參person即爲消息體

[NonAction]
        [CapSubscribe("sample.rabbitmq.demo.dynamic")]
        public void SubscriberDynamic(dynamic person)
        {
            Console.WriteLine($"【SubscriberDynamic】Subscriber invoked, Info: {person.Name} {person.Age}");
        }

新建Service做爲Subscriber

  • 除了Controller能夠做爲消息訂閱端外,也能夠用繼承自DotNetCore.CAP.ICapSubscribe接口的Service做爲訂閱端
  • Controller做爲訂閱者時,不用繼承ICapSubscribe;Service做爲訂閱者時,必須繼承ICapSubscribe
  • Controller和Service同時訂閱了一個消息時,只觸發了Service的消費;若要多個消費,須要在不一樣的Group下
using DotNetCore.CAP;
using System;

namespace DotNetCore.Cap.Demo.Subscriber.Services
{
    /// <summary>
    /// 消費訂閱服務
    /// </summary>
    public class SubscriberService : ICapSubscribe
    {
        
        [CapSubscribe("sample.rabbitmq.demo.string")]
        public void SubscriberString(string text)
        {
            Console.WriteLine($"【SubscriberString】Subscriber invoked, Info: {text}");
        }

        [CapSubscribe("sample.rabbitmq.demo.dynamic")]
        public void SubscriberDynamic(dynamic person)
        {
            Console.WriteLine($"【SubscriberDynamic】Subscriber invoked, Info: {person.Name} {person.Age}");
        }

        [CapSubscribe("sample.rabbitmq.demo.object")]
        public void SubscriberObject(Person person)
        {
            Console.WriteLine($"【SubscriberObject】Subscriber invoked, Info: {person.Name} {person.Age}");
        }

        [CapSubscribe("sample.rabbitmq.demo.trans")]
        public void SubscriberTrans(ApiConfig apiConfig)
        {
            Console.WriteLine($"【SubscriberTrans】Subscriber invoked, Info: {apiConfig.Id}");
        }
    }
}

總結

  • DotNetCore.Cap 是一種異步消息的通訊,能夠做爲微服務間通訊的一種方式
  • DotNetCore.Cap 爲微服務架構提供了分佈式事務的解決方案,保障了數據的最終一致性
  • 開發中,應根據實際業務需求和場景,選擇合適可靠的微服務間通訊方案

參考

https://github.com/dotnetcore/CAP/blob/master/README.md
https://book.douban.com/subject/33425123/

Demo 代碼

ReadMe Card

相關文章
相關標籤/搜索