.NET Core RabbitMQ探索(2)——RabbitMQ的Exchange

  實際上,RabbitMQ的生產者並不會直接把消息發送給隊列,甚至生產者都不知道消息是否會被髮送給一個隊列。對於生產者而言,它們只能把消息發送到Exchange,一個Exchange所完成的工做至關簡單,一方面,它從生產者那裏接收消息;另外一方面它將消息存入隊列中。一個Exchange須要準確的知道它要如何處理它接收到的消息,例如,它須要把消息轉發到特定的隊列,仍是進行廣播處理,或者直接將它丟棄。能夠經過exchange type來定義Exchange處理消息的規則。
  整個框架結構圖如圖所示。html

  Exchange types有如下幾種:direct、topic、headers和fanout。若是咱們沒有定義Exchange,那麼系統就會默認使用一個默認的Exchange,名爲:"",就像咱們入門篇裏的同樣,它會本身建立一個""的默認Exchange,而後將消息轉發給特定routingKey的隊列。框架

  • Direct Exchange

  使用direct exchange時,會將exchange與特定的隊列進行綁定,轉發時由routingkey進行隊列的匹配,如圖所示。spa

  在direct類型的exchange中,只有這兩個routingkey徹底相同,exchange纔會選擇對應的binding進行消息路由,代碼示例以下所示:設計

  1. 首先咱們須要將exchange和queue進行binding
channel.QueueBind(queue: "create_pdf_queue",
                    exchange: "pdf_events",
                    routingKey: "pdf_create",
                    arguments: null);

  綁定時須要設置:隊列名、exchange名和它們的routingkey。3d

  1. 在發送消息到exchange時會設置對應的routingkey
channel.BasicPublish(exchange: "pdf_events",
                        routingKey: "pdf_create",
                        basicProperties: properties,
                        body: body);

  生產者發佈消息時,須要設置exchange名和routingKey,若是exchange名和routingKey都與上述綁定的徹底一致,那麼該exchange就會將這條消息路由到隊列。rest

  • Topic Exchange

  此類exchange與direct相似,惟一不一樣的地方是,direct類型要求routingKey徹底一致,而這裏能夠可使用通配符進行模糊匹配,符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。所以「JiangYuZhou.#」可以匹配到「JiangYuZhou.pets.cat」,可是「JiangYuZhou.*」 只會匹配到「JiangYuZhou.money」。
  因此,Topic Exchange 使用很是靈活,topic exchange如圖所示。code

  例如,咱們首先聲明一個topic exchange,它的名稱爲"agreements":orm

// Topic類型的exchange, 名稱 agreements
    channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Topic,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

  而後,咱們聲明三個隊列,它們分別以下:htm

// 建立berlin_agreements隊列
    channel.QueueDeclare(queue: "berlin_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //建立 all_agreements 隊列
    channel.QueueDeclare(queue: "all_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //建立 headstore_agreements 隊列
    channel.QueueDeclare(queue: "headstore_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

  最後,咱們將agreements exchange分別與上面的三個隊列以不一樣通配符的routingKey進行綁定:blog

//綁定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#
    channel.QueueBind(queue: "berlin_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.berlin.#",
                        arguments: null);

    //綁定 agreements --> all_agreements 使用routingkey:agreements.#
    channel.QueueBind(queue: "all_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.#",
                        arguments: null);

    //綁定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore
    channel.QueueBind(queue: "headstore_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.*.headstore",
                        arguments: null);

  這時咱們若是發送下列消息:

 var message = "hello world";
 var body = Encoding.UTF8.GetBytes(message);
 var properties = channel.CreateBasicProperties();
 properties.Persistent = true;

channel.BasicPublish(exchange: "agreements",
    routingKey: "agreements.eu.berlin",
    basicProperties: properties,
    body: body);

  該消息設置的exchange爲"agreements",routingKey爲"agreements.eu.berlin",因此它能夠匹配上面的"agreements.eu.berlin.#"和"agreements.#",消息被轉發到了"berlin_agreements"和"all_agreements"隊列。

  • Fanout Exchange

  該exchange無需對routingKey進行匹配操做,而是很簡單的直接將消息路由到全部綁定的隊列中,如圖所示。

  • Header Exchange

  此類型的路由規是根據header來判斷的,首先須要以鍵值對的形式設置header的參數,在綁定exchange的時候將header以arguments的形式傳遞進去,傳遞參數時,鍵爲"x-match"的header能夠設置它的值爲all或any,其中,all表示只有當發佈的消息匹配該header中除"x-match"之外的全部值時,消息纔會被轉發到該隊列;any表示當發佈的消息匹配該header種除"x-match"外的任意值時,該消息會被轉發到匹配隊列。

 

代碼操練

  最後咱們以header exchange爲例,演示咱們的Exchange。首先咱們建立四個項目,其中一個做爲生產者,另做三個均做爲消費者,而且使用:

dotnet add package RabbitMQ.Client

  給四個項目均安裝上RabbitMQ的.NET包,並進行restore,項目結構如圖所示:

  開始編寫Send端的代碼,其中,RabbitMQ仍是使用咱們在上一章種使用的Docker中RabbitMQ,程序以下:

using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "148.70.210.208" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //聲明Headers類型的exchange,名稱爲agreements
                    channel.ExchangeDeclare(exchange: "agreements",
                        type: ExchangeType.Headers,
                        autoDelete: false,
                        arguments: null);

                    //建立隊列queue.A
                    channel.QueueDeclare(queue: "queue.A",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    //建立隊列queue.B
                    channel.QueueDeclare(queue: "queue.B",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    //建立隊列queue.C
                    channel.QueueDeclare(queue: "queue.C",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    //綁定agreements=>queue.A,使用arguments(format=pdf、type=report、x-match=all)
                    //只有當header中同時知足format=pdf、type=report時,消息纔會被轉發到隊列A
                    Dictionary<string, object> aHeader = new Dictionary<string, object>();
                    aHeader.Add("format", "pdf");
                    aHeader.Add("type", "report");
                    aHeader.Add("x-match", "all");
                    channel.QueueBind(queue: "queue.A",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: aHeader);

                    //綁定agreements=>queue.B,使用arguments(format=pdf、type=log、x-match=any)
                    //當header中知足format=pdf或type=log任意一個時,消息就會被轉發到隊列B
                    Dictionary<string, object> bHeader = new Dictionary<string, object>();
                    bHeader.Add("format", "pdf");
                    bHeader.Add("type", "log");
                    bHeader.Add("x-match", "any");
                    channel.QueueBind(queue: "queue.B",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: bHeader);

                    //綁定agreements=>queue.C,使用arguments(format=zip、type=report、x-match=all)
                    //當header中同時知足format=zip和type=report時,消息會被轉發到隊列C
                    Dictionary<string, object> cHeader = new Dictionary<string, object>();
                    cHeader.Add("format", "zip");
                    cHeader.Add("type", "report");
                    cHeader.Add("x-match", "all");
                    channel.QueueBind(queue: "queue.C",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: cHeader);

                    string message1 = "hello world From 1";
                    var body = Encoding.UTF8.GetBytes(message1);
                    var properties1 = channel.CreateBasicProperties();
                    properties1.Persistent = true;
                    Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
                    mHeader1.Add("format", "pdf");
                    mHeader1.Add("type", "report");
                    properties1.Headers = mHeader1;

                    //這條消息會被轉發到queue.A和queue.B
                    //queue.A 的binding (format=pdf, type=report, x-match=all)
                    //queue.B 的binding (format=pdf, type=log, x-match=any)
                    channel.BasicPublish(exchange: "agreements",
                                    routingKey: string.Empty,
                                    basicProperties: properties1,
                                    body: body);

                    string message2 = "hello world From 2";
                    body = Encoding.UTF8.GetBytes(message2);
                    var properties2 = channel.CreateBasicProperties();
                    properties2.Persistent = true;
                    Dictionary<string, object> mHeader2 = new Dictionary<string, object>();
                    mHeader2.Add("type", "log");
                    properties2.Headers = mHeader2;

                    //這條消息會被轉發到queue.B 
                    //queue.B 的binding (format = pdf, type = log, x-match = any)
                    channel.BasicPublish(exchange: "agreements",
                                    routingKey: string.Empty,
                                    basicProperties: properties2,
                                    body: body);

                    string message3 = "hello world From 3";
                    body = Encoding.UTF8.GetBytes(message3);
                    var properties3 = channel.CreateBasicProperties();
                    properties3.Persistent = true;
                    Dictionary<string, object> mHeader3 = new Dictionary<string, object>();
                    mHeader3.Add("format", "zip");
                    properties3.Headers = mHeader3;

                    //這條消息不會被路由
                    //隊列C要求同時知足兩個條件,這裏只知足了一個,沒有匹配的隊列
                    channel.BasicPublish(exchange: "agreements",
                                    routingKey: string.Empty,
                                    basicProperties: properties3,
                                    body: body);
                }
            }
        }
    }
}

  運行程序後,能夠看到,queue.A中匹配了三條消息、queue.B中匹配了兩條、queue.C中沒有匹配到任何消息。

  能夠看到,隊列A中匹配了一條信息,即Message 1,隊列B中匹配了兩條信息,即Message 1和Message2,隊列C中沒有匹配信息,符合咱們程序的編寫,下面用接收端進行接收。
  接收端分別寫了三個程序,分別接收隊列A、B、C的消息,它們除了綁定隊列名稱不一樣外,其他所有相同,下面是綁定隊列A的接收程序:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Recieve1
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "148.70.210.208" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //注意要與發送端的聲明一致
                    channel.ExchangeDeclare(exchange: "agreements",
                        type: ExchangeType.Headers,
                        autoDelete: false,
                        arguments: null);

                    //綁定了queue.C和agreements Exchange
                    channel.QueueBind(queue: "queue.A",
                        exchange: "agreements",
                        routingKey: string.Empty);

                    Console.WriteLine("Waiting for messages");

                    var consumer = new EventingBasicConsumer(channel);

                    //綁定接收完成事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"Recieve Message:{message}");
                    };

                    channel.BasicConsume(queue: "queue.A",
                        autoAck: true,
                        consumer: consumer);

                    Console.WriteLine("Press [enter] to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}

  最後,咱們分別運行這三個接收程序:

  符合程序設計。

  參考:JulyLuo——http://www.javashuo.com/article/p-cxwtfcin-eu.html

相關文章
相關標籤/搜索