RabbitMQ Message Queue (8): Distributing Messages with the Topic Pattern (.NET Core Version)

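The previous two chapters covered RabbitMQ's direct and fanout modes; this chapter introduces the topic mode. If you already understand how direct mode matches messages by routing key, topic mode is easy to follow. Put simply, direct matches the routing key exactly, while topic matches it against a pattern.
Topic mode supports two special wildcard characters.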

* (star) stands for exactly one word
# (hash) stands for zero or more words

Note: the unit here is a word, not a character. Words are the segments of the routing key separated by ".".
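To make the word-versus-character distinction concrete, here is a minimal local sketch of the matching rule. It is only an illustration of the semantics described above, not how the broker implements matching, and the `TopicMatcher` class and its `Matches` method are hypothetical names introduced for this example.

```csharp
using System;

static class TopicMatcher
{
    // Returns true when routingKey satisfies the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        // Both sides are compared word by word, where words are separated by '.'.
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = j; next <= k.Length; next++)
                if (Match(p, i + 1, k, next)) return true;
            return false;
        }

        // Any other pattern word needs exactly one key word to consume.
        if (j == k.Length) return false;
        if (p[i] == "*" || p[i] == k[j]) return Match(p, i + 1, k, j + 1);
        return false;
    }
}

class Program
{
    static void Main()
    {
        Console.WriteLine(TopicMatcher.Matches("white.critical.*", "white.critical.high")); // True
        Console.WriteLine(TopicMatcher.Matches("white.*", "white.critical.high"));          // False: '*' is one word only
        Console.WriteLine(TopicMatcher.Matches("white.#", "white.critical.high"));          // True
        Console.WriteLine(TopicMatcher.Matches("white.#", "white"));                        // True: '#' may match zero words
        Console.WriteLine(TopicMatcher.Matches("w*.critical.high", "white.critical.high")); // False: wildcards stand for whole words
    }
}
```

The last call is the case the note warns about: "w*" is not a prefix pattern, because a wildcard only counts when it occupies a whole word.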

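As the figure below shows, RabbitMQ's direct mode matches on the RoutingKey exactly: messages with RoutingKey red are delivered to Queue1, and messages with RoutingKey black or white are delivered to Queue2.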

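Consider a scenario: we want to build a logging module that collects and processes different kinds of logs, classified along three dimensions: module, urgency, and importance. The modules are red, black, and white; the urgency levels are critical and normal; the importance levels are medium, low, and high. In the RoutingKey field the three dimensions are joined by two "." characters, in the order module.urgency.importance.
Now we want logs from the black module with urgency critical and importance high to go to queue1 and be printed to the screen, and logs of importance high from all modules, plus logs of urgency critical from the white module, to go to queue2 and be persisted to disk. For example: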

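  • A log with RoutingKey "black.critical.high" is delivered to both queue1 and queue2.

  • A log with RoutingKey "red.critical.high" is delivered only to queue2.

  • A log with RoutingKey "white.critical.high" is delivered to queue2. Although it satisfies both of queue2's matching rules, only one copy is delivered to queue2.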
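The three cases above can be checked locally with a small sketch. It assumes the scenario translates into the bindings "black.critical.high" for queue1 and "*.*.high" plus "white.critical.*" for queue2; those pattern strings are my reading of the description, not bindings taken from the sample programs, and the `Matches` helper is a hypothetical stand-in for the broker's matching.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class Program
{
    // Word-by-word topic match: '*' is exactly one word, '#' is zero or more words.
    static bool Matches(string[] p, int i, string[] k, int j)
    {
        if (i == p.Length) return j == k.Length;
        if (p[i] == "#")
            return Enumerable.Range(j, k.Length - j + 1).Any(next => Matches(p, i + 1, k, next));
        return j < k.Length && (p[i] == "*" || p[i] == k[j]) && Matches(p, i + 1, k, j + 1);
    }

    static void Main()
    {
        // Assumed bindings derived from the scenario text.
        var bindings = new Dictionary<string, string[]>
        {
            ["queue1"] = new[] { "black.critical.high" },
            ["queue2"] = new[] { "*.*.high", "white.critical.*" },
        };

        foreach (string key in new[] { "black.critical.high", "red.critical.high", "white.critical.high" })
        {
            // A queue is selected once if ANY of its bindings matches,
            // which is why a key matching two bindings still yields one copy.
            var targets = bindings
                .Where(b => b.Value.Any(pattern => Matches(pattern.Split('.'), 0, key.Split('.'), 0)))
                .Select(b => b.Key)
                .OrderBy(name => name);

            Console.WriteLine($"{key} -> {string.Join(", ", targets)}");
        }
    }
}
```

Under these assumed bindings the sketch prints:

    black.critical.high -> queue1, queue2
    red.critical.high -> queue2
    white.critical.high -> queue2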

Create TopicProduct to publish messages with the three routing keys.

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicProduct
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeTopic";
            String routeKeyName1 = "black.critical.high";
            String routeKeyName2 = "red.critical.high";
            String routeKeyName3 = "white.critical.high";
            
            String message1 = "black-critical-high!";
            String message2 = "red-critical-high!";
            String message3 = "white-critical-high!";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);

                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    Byte[] body1 = Encoding.UTF8.GetBytes(message1);
                    Byte[] body2 = Encoding.UTF8.GetBytes(message2);
                    Byte[] body3 = Encoding.UTF8.GetBytes(message3);

                    // Publish the three messages, each with its own routing key
                    channel.BasicPublish(exchange: exchangeName, routingKey:routeKeyName1,basicProperties: properties, body: body1);
                    channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName2, basicProperties: properties, body: body2);
                    channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName3, basicProperties: properties, body: body3);

                    Console.WriteLine(" [x] Sent {0}", message1);
                    Console.WriteLine(" [x] Sent {0}", message2);
                    Console.WriteLine(" [x] Sent {0}", message3);
                }
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

  


Create TopicCustomerA, which receives one kind of message.

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerA
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeTopic";
            String routeKeyName1 = "black.critical.high";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);

                    String queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        // With RabbitMQ.Client 6.x, ea.Body is ReadOnlyMemory<byte>; use ea.Body.ToArray() there.
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);

                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}

  


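Create TopicCustomerB, which receives two kinds of messages. Its queue is bound with the patterns "red.critical.*" and "white.critical.*". These bindings are narrower than the scenario described earlier, which routes every high-importance log to queue2, so with this code "black.critical.high" reaches only TopicCustomerA.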

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerB
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeTopic";
            String routeKeyName1 = "red.critical.*";
            String routeKeyName2 = "white.critical.*";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);

                    String queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null);
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName2, arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        // With RabbitMQ.Client 6.x, ea.Body is ReadOnlyMemory<byte>; use ea.Body.ToArray() there.
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);

                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}

  


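Run TopicCustomerA and TopicCustomerB first so that both stay subscribed, then run TopicProduct to publish the messages. Given the bindings in the code above, TopicCustomerA should print:

 [x] Received 'black.critical.high':'black-critical-high!'

and TopicCustomerB should print:

 [x] Received 'red.critical.high':'red-critical-high!'
 [x] Received 'white.critical.high':'white-critical-high!'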
