RabbitMQ跟CAP簡單入門

以前待了7年的公司倒閉,終於找了一份真正的程序員工做,總算體驗了996的感受,如今項目接近尾聲了,總算有點時間下寫博客了。找工做時看到中高級工程師都要求熟練\精通掌握RabbitMQ跟CAP,而作爲中級開發工程師的我意識到,不得不學,這幾天找了時間學習了下。程序員

如下個人理解說法不知規不規範,只是用我最通俗的理解寫出來數據庫

RabbitMQ是一種底層隊列的實現(Kafka也是一種隊列),CAP提供了一種通用隊列發佈、訂閱使用方法。服務器

能夠理解爲SqlServer、MySql跟EF的關係吧(EF比做爲CAP),你不經過EF,也能夠用SqlClient相關類來使用SqlServer,但EF提供一種通用的代碼使用方法,一樣的代碼能夠同時用於SqlServer、MySql,代碼使用者不用關心我底層是用了SqlServer仍是MySql。分佈式

RabbitMQ簡單介紹微服務

我先說下RabbitMQ原生的使用方法,而後再說下怎麼跟CAP結合性能

說到隊列,先理解下如下幾個概念學習

  • 生產者:也能夠說是發佈者,主要是發佈消息,發送給交換器;
  • 消費者:也能夠說是訂閱者,從隊列中訂閱消息進行處理並返回應答;
  • 交換器:可能鏈接多個隊列,將生產者發佈的消息發送到隊列中;
  • 隊列:存放生產者產生的消息,供消費者進行訂閱處理;

消息從發佈到訂閱的流程步驟:測試

生產者發佈消息給交換器(傳遞一個key值),交換器在它綁定的隊列中根據key值及交換器模式找到匹配的隊列發送消息,訂閱了此隊列的消費者就能夠獲取消息進行處理,並返回應答;spa

交換器模式code

  • direct 消息發送到RouteKey徹底匹配的隊列中
  • fanout 消息轉發到交換器綁定的全部隊列中
  • topic 消息發送到RouteKey模糊匹配的隊列中
  • header 會用headers屬性來進行匹配,性能最差(實際使用中不多)

topic匹配規則:隊列的key爲TestRouteKey.#,能夠匹配到 TestRouteKey.A.B,隊列的key爲TestRouteKey.*,能夠匹配到TestRouteKey.A

如下是須要注意理解的點

  • 生產者也能夠直接發佈消息到隊列中;
  • 若是交換器沒有綁定任何隊列,那發佈的消息將直接丟棄;
  • 一個消息只能被一個消費者獲取,要實現消息同時被多個消費者獲取,要使用交換器綁定多個隊列;
  • 消費者獲取消息後,若是處理過程當中失敗了沒有返回應答,那消息會在隊列中從新發送;

如下演示,能夠建立兩個控制檯程序,而後在Main裏面寫相關代碼進行測試

生產者發佈消息代碼

安裝包 RabbitMQ.Client

//鏈接工廠
var factory = new ConnectionFactory(){
    UserName="",
    Password="",
    HostName="",
    Port=0
};
//建立鏈接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();
//聲明交換器,模式爲direct
channel.ExchangeDeclare("exchangeName","direct");
//聲明隊列
channel.QueueDeclare("queueName",durable:true);
//將交換器跟隊列進行綁定
channel.QueueBind("queueName","exchangeName","routeKey",null);
//發佈消息
channel.BasicPublish("exchangeName","routeKey",null,Encoding.UTF8.GetBytes("hello world"));

channel.Close();
connection.Close();

 

消費者訂閱消息

var factory = new ConnectionFactory
{
    UserName = "",
    Password = "",
    HostName = "",
};
//建立個鏈接
var connection = factory.CreateConnection();
//建立個通道
var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
//定義事件消費者,及消費接收事件(返回應答)
consumer.Received += (o, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine($"收到消息:{message}");
    channel.BasicAck(e.DeliveryTag, false);

};
//啓動消費者,第二個參數是表明是否自動應答,false就得手動調用BasicAck方法
channel.BasicConsume("hello", false, consumer);

Console.WriteLine("消費者已啓動");
Console.ReadKey();
channel.Close();
connection.Close();

 

CAP簡單介紹

上面簡單的介紹完RabbitMQ使用方法,下面再來簡單說下CAP是幹什麼的

CAP可用於微服務分佈式事務解決方案,就是能夠搭建不一樣站點,使用CAP,鏈接同一個RabbitMQ,部署在不一樣的服務器上,實現分佈式部署。

那要實現CAP,須要一個數據庫來記錄事件,須要一個隊列來存放事件消息。

CAP更詳情的文檔可查看它的官網,重點有中文的 http://cap.dotnetcore.xyz/

建立一個WebApi初始項目來演示一下。

安裝包 DotNetCore.CAP

安裝包DotNetCore.CAP.SqlServer,這是提供Sqlserver來記錄事件的包

安裝包 DotNetCore.CAP.RabbitMQ,這是提供RabbitMQ來存放事件消息的包

安裝包 DotNetCore.CAP.Dashboard,這是提供一個Web管理後臺可查看發佈、訂閱消息狀況

在Startup.cs的ConfigureServices方法中注入

services.AddCap(o=>{
    o.UseSqlServer("");
    o.UseRabbitMQ(mq => {
        mq.HostName = "";//RabbitMQ服務器地址
        mq.Port=5672;
        mq.UserName = "admin";
        mq.Password = "admin";
    });
    o.UseDashboard(); //添加監控儀表盤,經過http://localhost/cap訪問

    o.FailedRetryInterval = 30;//失敗後的重拾間隔,默認60秒
    o.FailedRetryCount = 10;//失敗後的重試次數,默認50次;在FailedRetryInterval默認60秒的狀況下,即默認重試50*60秒(50分鐘)以後放棄失敗重試
    o.SucceedMessageExpiredAfter = 60 * 60; //設置成功信息的刪除時間默認24*3600秒
});

 

而後在Controllers目錄下建立一個測試控制器

[ApiController]
[Route("[controller]/[action]")]
public class TestController : ControllerBase
{
    private readonly ICapPublisher _capPublisher;
    public TestController(ICapPublisher capPublisher)
    {

        _capPublisher = capPublisher;
    }
    [HttpPost]
    public void Test1()
    {
        //發佈消息,消息被訂閱處理後,會回調到Test.Callback
        _capPublisher.Publish<string>("Test.Event", "Hello,World","Test.Callback");
    }
    [NonAction]
    [CapSubscribe("Test.Event")] //訂閱Test.Event事件
    public string Test2(string message)
    {
        //進行訂閱消息處理
        Console.WriteLine(message);
        return "OK";
    }

    [NonAction]
    [CapSubscribe("Test.Callback")]
    public void TestCallback(string result)
    {
        //發佈消息完成後的回調
        Console.WriteLine(result);
    }
}

 

好了,上面就簡單的介紹了RabbitMQ跟CAP的使用方法,原本還在想這些東西適用於哪些場景,而後今天項目上線後出現問題了,裏面涉及到兩個系統的調用,一個系統A由於接口被頻繁地調用超時,致使另外一個系統B一直顯示出錯,我就發現這個場景就很適合用這個CAP了。

系統A的崩潰不該影響到系統B,而系統A崩潰時也能夠自動進行重試,當系統B發佈消息後,也不用等待系統A,顯示處理中,等系統A處理成功後再通知系統B,B再顯示成功就能夠了。

相關文章
相關標籤/搜索