上一篇【.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)】中已經完成了Ocelot + Consul的搭建,這一篇簡單說一下EventBus。html
貼一段引用:前端
事件總線是對觀察者(發佈-訂閱)模式的一種實現。它是一種集中式事件處理機制,容許不一樣的組件之間進行彼此通訊而又不須要相互依賴,達到一種解耦的目的。android
若是沒有接觸過EventBus,可能不太好理解。其實EventBus在客戶端開發中應用很是普遍(android,ios,web前端等),用於多個組件(或者界面)之間的相互通訊,懂的人都懂。。。ios
就拿當前的項目舉例,咱們有一個訂單服務,一個產品服務。客戶端有一個下單功能,當用戶下單時,調用訂單服務的下單接口,那麼下單接口須要調用產品服務的減庫存接口,這涉及到服務與服務之間的調用。那麼服務之間又怎麼調用呢?直接RESTAPI?或者效率更高的gRPC?可能這二者各有各的使用場景,可是他們都存在一個服務之間的耦合問題,或者難以作到異步調用。git
試想一下:假設咱們下單時調用訂單服務,訂單服務須要調用產品服務,產品服務又要調用物流服務,物流服務再去調用xx服務 等等。。。若是每一個服務處理時間須要2s,不使用異步的話,那這種體驗可想而知。github
若是使用EventBus的話,那麼訂單服務只須要向EventBus發一個「下單事件」就能夠了。產品服務會訂閱「下單事件」,當產品服務收到下單事件時,本身去減庫存就行了。這樣就避免了兩個服務之間直接調用的耦合性,而且真正作到了異步調用。web
既然涉及到多個服務之間的異步調用,那麼就不得不提分佈式事務。分佈式事務並非微服務獨有的問題,而是全部的分佈式系統都會存在的問題。
關於分佈式事務,能夠查一下「CAP原則」和「BASE理論」瞭解更多。當今的分佈式系統更多的會追求事務的最終一致性。sql
下面使用國人開發的優秀項目「CAP」,來演示一下EventBus的基本使用。之因此使用「CAP」是由於它既能解決分佈式系統的最終一致性,同時又是一個EventBus,它具有EventBus的全部功能!
做者介紹:https://www.cnblogs.com/savorboard/p/cap.htmldocker
在Docker中準備一下須要的環境,首先是數據庫,數據庫我使用PostgreSQL,用別的也行。CAP支持:SqlServer,MySql,PostgreSql,MongoDB。
關於在Docker中運行PostgreSQL能夠看個人另外一篇博客:http://www.javashuo.com/article/p-kiulcfug-ma.html數據庫
而後是MQ,這裏我使用RabbitMQ,Kafka也能夠。
Docker運行RabbitMQ:
docker pull rabbitmq:management docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management
默認用戶:guest,密碼:guest
環境準備就完成了,Docker就是這麼方便。。。
爲了模擬以上業務,須要修改大量代碼,下面代碼若有遺漏的直接去github找。
NuGet安裝:
Microsoft.EntityFrameworkCore Microsoft.EntityFrameworkCore.Tools Npgsql.EntityFrameworkCore.PostgreSQL
CAP相關:
DotNetCore.CAP DotNetCore.CAP.RabbitMQ DotNetCore.CAP.PostgreSql
Order.API/Controllers/OrdersController.cs增長下單接口:
[Route("[controller]")] [ApiController] public class OrdersController : ControllerBase { private readonly ILogger<OrdersController> _logger; private readonly IConfiguration _configuration; private readonly ICapPublisher _capBus; private readonly OrderContext _context; public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context) { _logger = logger; _configuration = configuration; _capBus = capPublisher; _context = context; } [HttpGet] public IActionResult Get() { string result = $"【訂單服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" + $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}"; return Ok(result); } /// <summary> /// 下單 發佈下單事件 /// </summary> /// <param name="order"></param> /// <returns></returns> [Route("Create")] [HttpPost] public async Task<IActionResult> CreateOrder(Models.Order order) { using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true)) { //業務代碼 order.CreateTime = DateTime.Now; _context.Orders.Add(order); var r = await _context.SaveChangesAsync() > 0; if (r) { //發佈下單事件 await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID }); return Ok(); } return BadRequest(); } } }
Order.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary> /// 下單事件消息 /// </summary> public class CreateOrderMessageDto { /// <summary> /// 產品ID /// </summary> public int ProductID { get; set; } /// <summary> /// 購買數量 /// </summary> public int Count { get; set; } }
Order.API/Models/Order.cs訂單實體類:
public class Order { [Key] [DatabaseGenerated(DatabaseGeneratedOption.Identity)] public int ID { get; set; } /// <summary> /// 下單時間 /// </summary> [Required] public DateTime CreateTime { get; set; } /// <summary> /// 產品ID /// </summary> [Required] public int ProductID { get; set; } /// <summary> /// 購買數量 /// </summary> [Required] public int Count { get; set; } }
Order.API/Models/OrderContext.cs數據庫Context:
public class OrderContext : DbContext { public OrderContext(DbContextOptions<OrderContext> options) : base(options) { } public DbSet<Order> Orders { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { } }
Order.API/appsettings.json增長數據庫鏈接字符串:
"ConnectionStrings": { "OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;" }
Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext"))); //CAP services.AddCap(x => { x.UseEntityFramework<OrderContext>(); x.UseRabbitMQ("host.docker.internal"); }); }
以上是訂單服務的修改。
Product.API/Controllers/ProductsController.cs增長減庫存接口:
[Route("[controller]")] [ApiController] public class ProductsController : ControllerBase { private readonly ILogger<ProductsController> _logger; private readonly IConfiguration _configuration; private readonly ICapPublisher _capBus; private readonly ProductContext _context; public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context) { _logger = logger; _configuration = configuration; _capBus = capPublisher; _context = context; } [HttpGet] public IActionResult Get() { string result = $"【產品服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" + $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}"; return Ok(result); } /// <summary> /// 減庫存 訂閱下單事件 /// </summary> /// <param name="message"></param> /// <returns></returns> [NonAction] [CapSubscribe("order.services.createorder")] public async Task ReduceStock(CreateOrderMessageDto message) { //業務代碼 var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID); product.Stock -= message.Count; await _context.SaveChangesAsync(); } }
Product.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary> /// 下單事件消息 /// </summary> public class CreateOrderMessageDto { /// <summary> /// 產品ID /// </summary> public int ProductID { get; set; } /// <summary> /// 購買數量 /// </summary> public int Count { get; set; } }
Product.API/Models/Product.cs產品實體類:
public class Product { [Key] [DatabaseGenerated(DatabaseGeneratedOption.Identity)] public int ID { get; set; } /// <summary> /// 產品名稱 /// </summary> [Required] [Column(TypeName = "VARCHAR(16)")] public string Name { get; set; } /// <summary> /// 庫存 /// </summary> [Required] public int Stock { get; set; } }
Product.API/Models/ProductContext.cs數據庫Context:
public class ProductContext : DbContext { public ProductContext(DbContextOptions<ProductContext> options) : base(options) { } public DbSet<Product> Products { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { base.OnModelCreating(modelBuilder); //初始化種子數據 modelBuilder.Entity<Product>().HasData(new Product { ID = 1, Name = "產品1", Stock = 100 }, new Product { ID = 2, Name = "產品2", Stock = 100 }); } }
Product.API/appsettings.json增長數據庫鏈接字符串:
"ConnectionStrings": { "ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;" }
Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext"))); //CAP services.AddCap(x => { x.UseEntityFramework<ProductContext>(); x.UseRabbitMQ("host.docker.internal"); }); }
以上是產品服務的修改。
訂單服務和產品服務的修改到此就完成了,看着修改不少,其實功能很簡單。就是各自增長了本身的數據庫表,而後訂單服務增長了下單接口,下單接口會發出「下單事件」。產品服務增長了減庫存接口,減庫存接口會訂閱「下單事件」。而後客戶端調用下單接口下單時,產品服務會減去相應的庫存,功能就這麼簡單。
關於EF數據庫遷移之類的基本使用就不介紹了。使用Docker從新構建鏡像,運行訂單服務,產品服務:
docker build -t orderapi:1.1 -f ./Order.API/Dockerfile . docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060" docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061" docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062" docker build -t productapi:1.1 -f ./Product.API/Dockerfile . docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050" docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051" docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"
最後 Ocelot.APIGateway/ocelot.json 增長一條路由配置:
好了,進行到這裏,整個環境就有點複雜了。確保咱們的PostgreSQL,RabbitMQ,Consul,Gateway,服務實例都正常運行。
服務實例運行成功後,數據庫應該是這樣的:
產品表種子數據:
cap.published表和cap.received表是由CAP自動生成的,它內部是使用本地消息表+MQ來實現異步確保。
此次使用Postman做爲客戶端調用下單接口(9070是以前的Ocelot網關端口):
訂單庫published表:
訂單庫order表:
產品庫received表:
產品庫product表:
再試一下:
OK,完成。雖然功能很簡單,可是咱們實現了服務的解耦,異步調用,和最終一致性。
注意,上面的例子純粹是爲了說明EventBus的使用,實際中的下單流程絕對不會這麼作的!但願你們不要較真。。。
可能有人會說若是下單成功,可是庫存不足致使減庫存失敗了怎麼辦,是否是要回滾訂單表的數據?若是產生這種想法,說明尚未真正理解最終一致性的思想。首先下單前確定會檢查一下庫存數量,既然容許下單那麼必然是庫存充足的。這裏的事務是指:訂單保存到數據庫,和下單事件保存到cap.published表(保存到cap.published表理論上就可以發送到MQ)這兩件事情,要麼一同成功,要麼一同失敗。若是這個事務成功,那麼就能夠認爲這個業務流程是成功的,至於產品服務的減庫存是否成功那就是產品服務的事情了(理論上也應該是成功的,由於消息已經確保發到了MQ,產品服務必然會收到消息),CAP也提供了失敗重試,和失敗回調機制。
若是非要數據回滾也是能實現的,CAP的ICapPublisher.Publish方法提供一個callbackName參數,當減庫存時,能夠觸發這個回調。其本質也是經過發佈訂閱完成,這是不推薦的作法,就不詳細說了,有興趣本身研究一下。
另外,CAP沒法保證消息不重複,實際使用中須要本身考慮一下消息的重複過濾和冪等性。
這一篇內容有點多,不知道有沒有表達清楚,有問題歡迎評論交流,若有不對之處還望你們指出。
下一篇計劃寫一下受權認證相關的內容。
代碼放在:https://github.com/xiajingren/NetCoreMicroserviceDemo
未完待續...