CAP是由咱們園子裏的楊曉東大神開發出來的一套分佈式事務的決絕方案,是.Net Core Community中的第一個千星項目(目前已經1656 Star),具備輕量級、易使用、高性能等特色。html
https://github.com/dotnetcore/CAPgit
本博客主要針對易用性這一點,展開敘述,一塊兒看看CAP如何結合EF Core和RabbitMQ帶領小白輕鬆走入分佈式消息隊列的世界。github
首先,你須要搭建一套RabbitMQ系統,搭建過程在此再也不敘述,若是你們以爲麻煩,能夠用我搭好的。sql
HostName: coderayu.cn UserName:guest Password:guest (僅僅可用做實驗,數據丟失不負責)數據庫
建立Asp.Net Core 項目,並引入Nuget包服務器
你能夠運行如下下命令在你的項目中安裝 CAP。網絡
PM> Install-Package DotNetCore.CAP
若是你的消息隊列使用的是 Kafka 的話,你能夠:app
PM> Install-Package DotNetCore.CAP.Kafka
若是你的消息隊列使用的是 RabbitMQ 的話,你能夠:分佈式
PM> Install-Package DotNetCore.CAP.RabbitMQ
CAP 提供了 Sql Server, MySql, PostgreSQL 的擴展做爲數據庫存儲:函數
// 按需選擇安裝你正在使用的數據庫 PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql PM> Install-Package DotNetCore.CAP.PostgreSql
由於我採用的是EF Core,因此首先要建立一個DbContext上下文,代碼以下:
public class CapDbContext:DbContext { public CapDbContext(DbContextOptions options) : base(options) { } }
首先須要在ConfigureServices函數中進行相關服務的注入,對應的操做和功能解釋以下:
public void ConfigureServices(IServiceCollection services) { //注入DbContext上下文,若是用的是Mysql可能還須要添加Pomelo.EntityFrameworkCore.MySql這個Nuget包 services.AddDbContext<CapDbContext>(options => options.UseMySql("Server=127.0.0.1;Database=testcap;UserId=root;Password=123456;")); //配置CAP services.AddCap(x => { x.UseEntityFramework<CapDbContext>(); //啓用操做面板 x.UseDashboard(); //使用RabbitMQ x.UseRabbitMQ(rb => { //rabbitmq服務器地址 rb.HostName = "coderayu.cn"; rb.UserName = "guest"; rb.Password = "guest"; //指定Topic exchange名稱,不指定的話會用默認的 rb.ExchangeName = "cap.text.exchange"; }); //設置處理成功的數據在數據庫中保存的時間(秒),爲保證系統新能,數據會按期清理。 x.SucceedMessageExpiredAfter = 24 * 3600; //設置失敗重試次數 x.FailedRetryCount = 5; }); services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); }
最後還要再Congiure中啓用CAP中間件
public void Configure(IApplicationBuilder app, IHostingEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } //啓用cap中間件 app.UseCap(); app.UseMvc(); }
再程序包管理控制檯中依此輸入如下命令行
PM> Add-Migration Init
PM> update-database
若是成成功執行,那麼打開數據庫,就能夠看到用來存儲CAP發送和接收數據的表格了。
表格中每列的含義以下:
咱們直接在ValuesController的基礎上進行改造。
在 Controller 中注入 ICapPublisher
而後使用 ICapPublisher
進行消息發送
private readonly ICapPublisher _publisher; public ValuesController(ICapPublisher publisher) { _publisher = publisher; }
發送消息
[HttpGet] public string Get(string message) { //"cap.test.queue"是發送的消息RouteKey,能夠理解爲消息管道的名稱 _publisher.Publish("cap.test.queue",message); return "發送成功"; }
訂閱消息
//"cap.test.queue"爲發送消息時的RauteKey,也能夠模糊匹配 //詳情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html [CapSubscribe("cap.test.queue")] public void HandleMessage(string message) { Console.Write(DateTime.Now.ToString()+"收到消息:"+message); }
啓動程序後,首先看到CAP啓動成功
緊隨其後,消費者也就是咱們的訂閱方法在RabbitMQ服務器上註冊成功。
發送消息,發送成功,以下
發送後,當即在控制檯看到了訂閱方法輸出的結果。
在訂閱方法中,若是拋出異常,那麼CAP就會認爲該條消息處理失敗,會自動進行重試,重試次數在前方已經進行了配置。
咱們把訂閱方法作一個改動,打印接收的信息到控制檯中,並拋出異常
//"cap.test.queue"爲發送消息時的RauteKey,也能夠模糊匹配 //詳情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html [CapSubscribe("cap.test.queue")] public void HandleMessage(string message) { Console.WriteLine(DateTime.Now.ToString()+"收到消息:"+message); throw new Exception("測試失敗重試"); }
能夠看到,當即進行了三次重試
但是在前面,咱們設置的失敗重試次數是5次,爲何這裏只重試三次嗎?是否是要叫曉東過來改BUG了呢?固然不是。
觀察發現,CAP重試的前三次是當即進行的,然後面的重試,是每隔一段時間進行的,當在分佈式通信的過程當中,可能出現了問題確實不會當即修復解決,可能過了必定時間,系統就自動恢復了,如網絡抖動。
發送成功了五條消息,成功接收處理了三條,兩條處理失敗,處理失敗的任務,咱們能夠直接在面板中進行從新消費,可謂很是方便。
同時,處理失敗的消息,點擊消息的編號後,能夠查看到消息的內容和異常緣由。
CAP如此強大,讓消息隊列這種高大上產品操做So Easy,學會了CAP,也能夠吹牛說,我也懂分佈式任務處理啦。
感謝曉東開發出如此強大的項目,同時感謝.Net Core Community。
參考 CAP Github wiki
https://github.com/dotnetcore/CAP/wiki
本博客Demo代碼