因爲微服務架構慢慢被更多人使用後,迎面而來的問題是如何作好微服務間通訊的方案。咱們先分析下目前最經常使用的兩種服務間通訊方案。git
gRPC-微服務間通訊實踐github
經過對比,兩種方式徹底不同。rpc是相似於http請求的及時響應機制,可是比http更輕量、快捷,它更像之前的微軟的WCF,能夠自動生成客戶端代碼,充分體現了面向實體對象的遠程調用的思想;Eventbus是異步的消息機制,基於cap的思想,不關心下游訂閱方服務是否消費成功,保障了主服務業務的流暢性,同時也是一款分佈式事務的實現方案,能夠保障分佈式架構中的數據的最終一致性。web
咱們今天主要介紹CAP在微服務中的實踐案例。sql
根據實際狀況選擇合適的消息隊列和數據庫存儲docker
CAP 支持 Kafka、RabbitMQ、AzureServiceBus 消息隊列:數據庫
PM> Install-Package DotNetCore.CAP.Kafka PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.AzureServiceBus
CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 做爲數據庫存儲:api
PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql PM> Install-Package DotNetCore.CAP.PostgreSql PM> Install-Package DotNetCore.CAP.MongoDB
本次demo使用的nuget包以下所示架構
$ dotnet list package 項目「DotNetCore.Cap.Demo.Publisher」具備如下包引用 [netcoreapp3.1]: 頂級包 已請求 已解決 > DotNetCore.CAP 3.1.1 3.1.1 > DotNetCore.CAP.PostgreSql 3.1.1 3.1.1 > DotNetCore.CAP.RabbitMQ 3.1.1 3.1.1 > Microsoft.VisualStudio.Azure.Containers.Tools.Targets 1.10.9 1.10.9 > Npgsql.EntityFrameworkCore.PostgreSQL 3.1.4 3.1.4 > Npgsql.EntityFrameworkCore.PostgreSQL.Design 1.1.0 1.1.0 項目「DotNetCore.Cap.Demo.Subscriber」具備如下包引用 [netcoreapp3.1]: 頂級包 已請求 已解決 > DotNetCore.CAP 3.1.1 3.1.1 > DotNetCore.CAP.PostgreSql 3.1.1 3.1.1 > DotNetCore.CAP.RabbitMQ 3.1.1 3.1.1 > Microsoft.VisualStudio.Azure.Containers.Tools.Targets 1.10.9 1.10.9 > Npgsql.EntityFrameworkCore.PostgreSQL 3.1.4 3.1.4 > Npgsql.EntityFrameworkCore.PostgreSQL.Design 1.1.0 1.1.0
注入dotnetcore.cap組件及數據庫上下文app
services.AddDbContext<PgDbContext>(p => p.UseNpgsql("數據庫鏈接字符串")); services.AddCap(x => { //rabbitmq在docker運行時,須要映射兩個端口:5672和15672 //5672供程序集訪問 //15672供web訪問 //默認用戶名和密碼爲:guest/guest x.UseRabbitMQ(p => { p.HostName = "localhost"; p.Port = 5672; p.UserName = "guest"; p.Password = "guest"; }); x.UseEntityFramework<PgDbContext>(); //x.FailedRetryCount = 1; //x.UseDashboard(); });
提供了同步和異步的消息發佈方法框架
await _capPublisher.PublishAsync("消息名稱", "消息內容");
[HttpGet("string")] public async Task<IActionResult> PublishString() { await _capPublisher.PublishAsync("sample.rabbitmq.demo.string", "this is text!"); return Ok(); }
await _capPublisher.PublishAsync("消息名稱", "消息對象");
[HttpGet("dynamic")] public async Task<IActionResult> PublishDynamic() { await _capPublisher.PublishAsync("sample.rabbitmq.demo.dynamic", new { Name = "xiao gou", Age = 18 }); return Ok(); }
當須要在數據庫操做後,發佈消息出去,DotNetCore.Cap也提供了分佈式事務的解決方案。它擴展的transcation能保證只有在數據庫操做和消息發送都完成後,才提交Commit
[HttpGet("transcation")] public async Task<IActionResult> PublishWithTranscation() { using (var trans = _pgDbContext.Database.BeginTransaction(_capPublisher)) { var apiConfig = new ApiConfig { ApiName = "111122", ApiDesc = "223", ReturnType = "1", ReturnExpect = "1", IsAsync = true, OperCode = "999", OperTime = DateTime.Now }; await _pgDbContext.ApiConfig.AddAsync(apiConfig); await _pgDbContext.SaveChangesAsync(); _capPublisher.Publish("sample.rabbitmq.demo.transcation", apiConfig); trans.Commit(); return Ok(); } }
注入dotnetcore.cap組件及數據庫上下文
services.AddDbContext<PgDbContext>(p => p.UseNpgsql("數據庫鏈接字符串")); services.AddCap(x => { //rabbitmq在docker運行時,須要映射兩個端口:5672和15672 //5672供程序集訪問 //15672供web訪問 //默認用戶名和密碼爲:guest/guest x.UseRabbitMQ(p => { p.HostName = "localhost"; p.Port = 5672; p.UserName = "guest"; p.Password = "guest"; }); x.UseEntityFramework<PgDbContext>(); //x.FailedRetryCount = 1; //x.UseDashboard(); });
[NonAction] 標籤:Indicates that a controller method is not an action method.
[CapSubscribe] 標籤:標誌此方法爲訂閱方法,並以消息名稱匹配發布端的消息事件
[NonAction] [CapSubscribe("sample.rabbitmq.demo.string")] public void SubscriberString(string text) { Console.WriteLine($"【SubscriberString】Subscriber invoked, Info: {text}"); }
方法入參person即爲消息體
[NonAction] [CapSubscribe("sample.rabbitmq.demo.dynamic")] public void SubscriberDynamic(dynamic person) { Console.WriteLine($"【SubscriberDynamic】Subscriber invoked, Info: {person.Name} {person.Age}"); }
using DotNetCore.CAP; using System; namespace DotNetCore.Cap.Demo.Subscriber.Services { /// <summary> /// 消費訂閱服務 /// </summary> public class SubscriberService : ICapSubscribe { [CapSubscribe("sample.rabbitmq.demo.string")] public void SubscriberString(string text) { Console.WriteLine($"【SubscriberString】Subscriber invoked, Info: {text}"); } [CapSubscribe("sample.rabbitmq.demo.dynamic")] public void SubscriberDynamic(dynamic person) { Console.WriteLine($"【SubscriberDynamic】Subscriber invoked, Info: {person.Name} {person.Age}"); } [CapSubscribe("sample.rabbitmq.demo.object")] public void SubscriberObject(Person person) { Console.WriteLine($"【SubscriberObject】Subscriber invoked, Info: {person.Name} {person.Age}"); } [CapSubscribe("sample.rabbitmq.demo.trans")] public void SubscriberTrans(ApiConfig apiConfig) { Console.WriteLine($"【SubscriberTrans】Subscriber invoked, Info: {apiConfig.Id}"); } } }
https://github.com/dotnetcore/CAP/blob/master/README.md
https://book.douban.com/subject/33425123/