以前待了7年的公司倒閉,終於找了一份真正的程序員工做,總算體驗了996的感受,如今項目接近尾聲了,總算有點時間下寫博客了。找工做時看到中高級工程師都要求熟練\精通掌握RabbitMQ跟CAP,而作爲中級開發工程師的我意識到,不得不學,這幾天找了時間學習了下。程序員
如下個人理解說法不知規不規範,只是用我最通俗的理解寫出來數據庫
RabbitMQ是一種底層隊列的實現(Kafka也是一種隊列),CAP提供了一種通用隊列發佈、訂閱使用方法。服務器
能夠理解爲SqlServer、MySql跟EF的關係吧(EF比做爲CAP),你不經過EF,也能夠用SqlClient相關類來使用SqlServer,但EF提供一種通用的代碼使用方法,一樣的代碼能夠同時用於SqlServer、MySql,代碼使用者不用關心我底層是用了SqlServer仍是MySql。分佈式
RabbitMQ簡單介紹微服務
我先說下RabbitMQ原生的使用方法,而後再說下怎麼跟CAP結合性能
說到隊列,先理解下如下幾個概念學習
消息從發佈到訂閱的流程步驟:測試
生產者發佈消息給交換器(傳遞一個key值),交換器在它綁定的隊列中根據key值及交換器模式找到匹配的隊列發送消息,訂閱了此隊列的消費者就能夠獲取消息進行處理,並返回應答;spa
交換器模式code
topic匹配規則:隊列的key爲TestRouteKey.#,能夠匹配到 TestRouteKey.A.B,隊列的key爲TestRouteKey.*,能夠匹配到TestRouteKey.A
如下是須要注意理解的點
如下演示,能夠建立兩個控制檯程序,而後在Main裏面寫相關代碼進行測試
生產者發佈消息代碼
安裝包 RabbitMQ.Client
//鏈接工廠 var factory = new ConnectionFactory(){ UserName="", Password="", HostName="", Port=0 }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //聲明交換器,模式爲direct channel.ExchangeDeclare("exchangeName","direct"); //聲明隊列 channel.QueueDeclare("queueName",durable:true); //將交換器跟隊列進行綁定 channel.QueueBind("queueName","exchangeName","routeKey",null); //發佈消息 channel.BasicPublish("exchangeName","routeKey",null,Encoding.UTF8.GetBytes("hello world")); channel.Close(); connection.Close();
消費者訂閱消息
var factory = new ConnectionFactory { UserName = "", Password = "", HostName = "", }; //建立個鏈接 var connection = factory.CreateConnection(); //建立個通道 var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); //定義事件消費者,及消費接收事件(返回應答) consumer.Received += (o, e) => { var message = Encoding.UTF8.GetString(e.Body.ToArray()); Console.WriteLine($"收到消息:{message}"); channel.BasicAck(e.DeliveryTag, false); }; //啓動消費者,第二個參數是表明是否自動應答,false就得手動調用BasicAck方法 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消費者已啓動"); Console.ReadKey(); channel.Close(); connection.Close();
CAP簡單介紹
上面簡單的介紹完RabbitMQ使用方法,下面再來簡單說下CAP是幹什麼的
CAP可用於微服務分佈式事務解決方案,就是能夠搭建不一樣站點,使用CAP,鏈接同一個RabbitMQ,部署在不一樣的服務器上,實現分佈式部署。
那要實現CAP,須要一個數據庫來記錄事件,須要一個隊列來存放事件消息。
CAP更詳情的文檔可查看它的官網,重點有中文的 http://cap.dotnetcore.xyz/
建立一個WebApi初始項目來演示一下。
安裝包 DotNetCore.CAP
安裝包DotNetCore.CAP.SqlServer,這是提供Sqlserver來記錄事件的包
安裝包 DotNetCore.CAP.RabbitMQ,這是提供RabbitMQ來存放事件消息的包
安裝包 DotNetCore.CAP.Dashboard,這是提供一個Web管理後臺可查看發佈、訂閱消息狀況
在Startup.cs的ConfigureServices方法中注入
services.AddCap(o=>{ o.UseSqlServer(""); o.UseRabbitMQ(mq => { mq.HostName = "";//RabbitMQ服務器地址 mq.Port=5672; mq.UserName = "admin"; mq.Password = "admin"; }); o.UseDashboard(); //添加監控儀表盤,經過http://localhost/cap訪問 o.FailedRetryInterval = 30;//失敗後的重拾間隔,默認60秒 o.FailedRetryCount = 10;//失敗後的重試次數,默認50次;在FailedRetryInterval默認60秒的狀況下,即默認重試50*60秒(50分鐘)以後放棄失敗重試 o.SucceedMessageExpiredAfter = 60 * 60; //設置成功信息的刪除時間默認24*3600秒 });
而後在Controllers目錄下建立一個測試控制器
[ApiController] [Route("[controller]/[action]")] public class TestController : ControllerBase { private readonly ICapPublisher _capPublisher; public TestController(ICapPublisher capPublisher) { _capPublisher = capPublisher; } [HttpPost] public void Test1() { //發佈消息,消息被訂閱處理後,會回調到Test.Callback _capPublisher.Publish<string>("Test.Event", "Hello,World","Test.Callback"); } [NonAction] [CapSubscribe("Test.Event")] //訂閱Test.Event事件 public string Test2(string message) { //進行訂閱消息處理 Console.WriteLine(message); return "OK"; } [NonAction] [CapSubscribe("Test.Callback")] public void TestCallback(string result) { //發佈消息完成後的回調 Console.WriteLine(result); } }
好了,上面就簡單的介紹了RabbitMQ跟CAP的使用方法,原本還在想這些東西適用於哪些場景,而後今天項目上線後出現問題了,裏面涉及到兩個系統的調用,一個系統A由於接口被頻繁地調用超時,致使另外一個系統B一直顯示出錯,我就發現這個場景就很適合用這個CAP了。
系統A的崩潰不該影響到系統B,而系統A崩潰時也能夠自動進行重試,當系統B發佈消息後,也不用等待系統A,顯示處理中,等系統A處理成功後再通知系統B,B再顯示成功就能夠了。