大話RabbitMQ 基礎入門

----------寫在前面----------

近些年微服務愈來愈火,讓我也忍不住想去一窺微服務究竟,講到微服務,就離不開分佈式,而分佈式,也離不開消息隊列,在消息隊列中,RabbitMQ能夠說是比較具備表明性的一款。網絡

 

這裏是一篇介紹消息隊列以及各類消息隊列產品對比的文章,講得很好,有興趣的能夠看一看。分佈式

https://cloud.tencent.com/developer/article/1006035微服務

 

在講RabbitMQ以前,首先須要在電腦上安裝和配置RabbitMQ,網絡上已經有不少這類文章,若是懶得去搜索,能夠看看這篇介紹如何安裝配置RabbitMQ的文章。ui

 

http://www.javashuo.com/article/p-uznunwyh-ho.htmlspa

 

其中,在安裝RabbitMQ的過程當中,遇到了一個坑,在啓用RabbltMQ的管理界面執行.net

rabbitmq-plugins enable rabbitmq_managementcode

命令時,出現瞭如下這樣的報錯blog

 

能夠在該指令前加上 .\ 即rabbitmq

.\rabbitmq-plugins enable rabbitmq_management隊列

 

祝安裝順利 !!

 -------正文------

基本概念

下面是在.Net中使用RabbitMQ要明白的一些名詞概念。

 

 

綜上所訴,他們之間的關係能夠用我下面的 醜圖 表示。

 

在圖中,沒有吧Routing key畫出。Producer每一次發送消息,除了發出消息自己,還會隨着消息帶上一個routingKey,並且每一次將Exchange和Queue綁定,大致須要三個參數,

string queueName, string exchangeName, string routingKey

其中也有一個routingKey,但此RoutingKey非彼Routingkey。

大白話

對這個過程,咱們能夠理解爲國家給災區發送救災物資,國家給當地政府劃撥物資的時候,會規定,誰才能拿到這批物資,如(房子倒了的.家裏有人受傷了的.家庭經濟困難的)。

 

而當地政府在分配這批物資以前,爲了方便物資的分配,會給每一個家庭貼上一個標籤,如

 

家庭A 經濟困難

家庭B 房子倒了.經濟困難

家庭C 家庭富有.房子倒了

家庭D 房子倒了的.家裏有人受傷了的.家庭經濟困難的

 

因此,發送消息時候的routingKey就是國家規定的那批物質分配規則。

而Exchange和Queue綁定時的RoutingKey能夠理解爲當地政府給每一個家庭貼上的一個標籤。

 

 

Exchange(交換機)轉發消息的規則也有不少種:direct, topic, headers(不經常使用) 和 fanout,咱們稱之爲交換類型。

 

咱們能夠把Exchange理解爲分配這批物質的政府,如今國家規定了宏觀的分配方向(發送消息時的routingKey),每一個家庭也有了家庭狀況的標籤(綁定Exchange時的routingKey),可是這個物資具體怎麼分,仍是當地政府說了算。

 

Direct 嚴格按照國家規定來,只有房子倒了的,家裏有人受傷了的並且家庭經濟困難的才能分到救災物資。    家庭D能分到

 

Fanout 只要是災區的居民都能分到, 無論家庭狀況如何。 家庭A\B\C\D都能分到

 

Topic 主題匹配: 只要家庭狀況在國家規定分配規則內的,都能分到物資,可是家庭C分不到,由於他家太有錢了,這個條件不在國家的分配規則裏。家庭A\B\D能分到

 

因此,咱們在聲明一個Exchange(交換機)的同時,還要指定該交換機的類型,即(當地政府怎麼來分救災物資)

 

其實,用這個例子,我是想說,生產者和消費者之間,就像國家與難民之間同樣,國家只知道,我要幫助難民,可是難民有誰,物資能不能分到難民手裏,還得當地政府說了算,你就說我這個例子恰不恰當吧!哈哈😄

 好了,懂了概念,咱們再來結合具體例子看看。

Fanout

Producer.cs的代碼

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //建立鏈接工廠
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //建立鏈接
            using (var connection = factory.CreateConnection())
            {
                //建立會話
                using (var chancel = connection.CreateModel())
                {
                    //生命交換機
                    chancel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //給交換機發送消息
                        chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body);
                        Console.WriteLine($"成功發送消息{readMsg}");
                        Console.WriteLine("請輸入要發送的內容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

Customer.cs代碼

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //建立鏈接工廠
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //建立鏈接
            using (var connection = factory.CreateConnection())
            {
                //建立會話
                using (var channel = connection.CreateModel())
                {
                    //聲明一個Fanout類型的交換機
                    channel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout);

                    //聲明一個消息隊列並獲取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    //把消息隊列和交換機綁定
                    channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: "");

                    //建立消費者
                    var consume = new EventingBasicConsumer(channel);

                    //把消費者和隊列綁定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

 

在上面的代碼中,不管是在生產者的發送消息裏

                         //給交換機發送消息

                        chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body);

 

仍是消費者所在的Queue的綁定裏

                 //把消息隊列和交換機綁定

                    channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: "");

 

咱們都沒有制定routingKey,由於沒我的都能獲取消息,因此此處,聲明routingKey就沒有意義了。

咱們看看運行效果。

運行了三個消費者,當生產者發出消息時,三個消費者都收到了相同的消息。能夠理解爲廣播模式。(Customer單詞拼寫錯了,圖片修改不方便,就不改了,你們將就一下)

Direct

Direct時嚴格匹配的,只有隊列綁定的RoutingKey與生產者發送消息時指定的RoutingKey徹底相同,才能接收成功。

Producer.cs

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //建立鏈接工廠
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //建立鏈接
            using (var connection = factory.CreateConnection())
            {
                //建立會話
                using (var chancel = connection.CreateModel())
                {
                    //生命交換機
                    chancel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //給交換機發送消息
                        chancel.BasicPublish(exchange: "DirectDemo", routingKey: "Direct.Key", body: body);
                        Console.WriteLine($"成功發送消息{readMsg}");
                        Console.WriteLine("請輸入要發送的內容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

我把Exchange的類型更改成Direct類型,而且發送消息的routingKey設置爲Direct.Key。

而後咱們來定義消費者

Customer.CS

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //建立鏈接工廠
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //建立鏈接
            using (var connection = factory.CreateConnection())
            {
                //建立會話
                using (var channel = connection.CreateModel())
                {
                    //聲明一個Fanout類型的交換機
                    channel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct);

                    //聲明一個消息隊列並獲取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    Console.WriteLine("請輸入RoutingKey!");
                    var routingKey = Console.ReadLine();
                    //把消息隊列和交換機綁定
                    channel.QueueBind(exchange: "DirectDemo", queue: queueName, routingKey: routingKey);

                    //建立消費者
                    var consume = new EventingBasicConsumer(channel);

                    //把消費者和隊列綁定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    Console.WriteLine("開始監聽消息");

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

消費者的RoutingKey再控制檯輸入,

運行效果以下:

能夠看到,只有RoutingKey爲Direct.Key的消費者才收到了生產者發出的消息。

Topic

RabbitMQ 中的 RouteKey 支持綁定鍵表達式寫法,有兩種主要的綁定鍵:

*(星號)能夠代替一個單詞.

# (井號) 能夠代替0個或多個單詞.

好比在下面這個圖中(P爲發送者,X爲RabbitMQ中的Exchange,C爲消費者,Q爲隊列)

在這個示例中,咱們將發送一條關於動物描述的消息,也就是說 Name(routeKey) 字段中的內容包含 3 個單詞。第一個單詞是描述速度的(celerity),第二個單詞是描述顏色的(colour),第三個是描述哪一種動物的(species),它們組合起來相似:「..」。

而後在使用 CapSubscribe 綁定的時候,Q1綁定爲 CapSubscribe["*.orange.*"], Q2 綁定爲CapSubscribe["*.*.rabbit"] 和 [CapSubscribe["lazy.#]

那麼,當發送一個名爲 "quick.orange.rabbit" 消息的時候,這兩個隊列將會同時收到該消息。一樣名爲 lazy.orange.elephant的消息也會被同時收到。另外,名爲 "quick.orange.fox" 的消息將僅會被髮送到Q1隊列,名爲 "lazy.brown.fox" 的消息僅會被髮送到Q2。"lazy.pink.rabbit" 僅會被髮送到Q2一次,即便它被綁定了2次。"quick.brown.fox" 沒有匹配到任何綁定的隊列,因此它將會被丟棄。

另一種狀況,若是你違反約定,好比使用 4個單詞進行組合,例如 "quick.orange.male.rabbit",那麼它將匹配不到任何的隊列,消息將會被丟棄。

可是,假如你的消息名爲 "lazy.orange.male.rabbit",那麼他們將會被髮送到Q2,由於 #(井號)能夠匹配 0 或者多個單詞。

咱們結合代碼來看一看。

Producer.cs

using System;
using System.Text;
using RabbitMQ.Client;

namespace _2_Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //建立鏈接工廠
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //建立鏈接
            using (var connection = factory.CreateConnection())
            {
                //建立會話
                using (var chancel = connection.CreateModel())
                {
                    //生命交換機
                    chancel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic);

                    string readMsg = "helloWorld";
                    while (readMsg.ToLower() != "exit")
                    {
                        var body = Encoding.UTF8.GetBytes(readMsg);

                        //給交換機發送消息
                        chancel.BasicPublish(exchange: "TopicDemo", routingKey: "Topic.Demo.Key", body: body);
                        Console.WriteLine($"成功發送消息{readMsg}");
                        Console.WriteLine("請輸入要發送的內容!");
                        readMsg = Console.ReadLine();
                    }
                }
            }
        }
    }
}

我給發送消息的routingKey指定爲Topic.Demo.Key

再來看看消費者

Cuustomer.cs

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace _2_Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            //建立鏈接工廠
            var factory = new ConnectionFactory() { HostName = "localhost" };

            //建立鏈接
            using (var connection = factory.CreateConnection())
            {
                //建立會話
                using (var channel = connection.CreateModel())
                {
                    //聲明一個Fanout類型的交換機
                    channel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic);

                    //聲明一個消息隊列並獲取它的名字
                    var queueName = channel.QueueDeclare().QueueName;

                    Console.WriteLine("請輸入RoutingKey!");
                    var routingKey = Console.ReadLine();
                    //把消息隊列和交換機綁定
                    channel.QueueBind(exchange: "TopicDemo", queue: queueName, routingKey: routingKey);

                    //建立消費者
                    var consume = new EventingBasicConsumer(channel);

                    //把消費者和隊列綁定
                    channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume);

                    Console.WriteLine("開始監聽消息");

                    consume.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"收到消息{message}");
                    };

                    Console.ReadLine();
                }
            }
        }
    }
}

其RoutingKey也是在外部輸入。

咱們看看運行效果

由於Producer發佈消息的RoutingKey是Topic.Demo.Key

又由於#能夠表明0個或者多個單詞 ,*能表明一個單詞

因此*.*.Key    Topic.#與Topic.Demo.Key匹配,而其餘兩個*.Key和test.1.2固然是不匹配的,因此沒有收到消息。

總結

對於上面的例子,咱們能夠總結出,編寫一個生產者的過程以下:

建立鏈接工廠-》建立鏈接-》建立會話(Chanel)-》建立交換機(Exchange)-》發送消息

編寫一個生產者的過程以下:

建立鏈接工廠-》建立鏈接-》建立會話(Chanel)-》建立交換機(Exchange)-》建立隊列-》綁定隊列和交換機-》建立消費者-》把消費者和隊列綁定-》監聽消息

 

掌握這個大的方向,無論交換機怎麼分配,代碼應該都會寫了。

爲何在生產者中和消費者中都要建立交換機呢? 由於咱們不肯定是生產者先執行仍是消費者先執行,因此提早建立一下,避免鏈接時發現沒有建立交換機,出現錯誤,若是交換機已經建立了,那麼默認不會再次建立的。

另外,交換機建立後,同一名稱的交換機使用完不會自動刪除,可是第二次若是建立的名稱和上次同樣,可是交換機類型不同了,那麼便會出現報錯。

 

這裏總結的是一些RabbitMQ的基礎知識,後面還會繼續寫一些更深刻的使用技巧,若是不想錯過精彩信息,點擊關注一下吧(๑¯◡¯๑)!

相關文章
相關標籤/搜索