RabbitMQ學習之Routing(4)

上一節,是廣播日誌message到不少的receivers.html

這節,咱們講訂閱其中的一個子集。例如,咱們想能夠把危機的error message導到log file。而仍然能夠打印全部的log messages到控制檯。算法

這裏使用到Direct exchagespa

Direct exchange

在使用fanout exchange時,沒有不少的靈活性,它只是廣播。日誌

這節,咱們將使用direct exchange . 在direct exchange背後的路由算法是簡單的,即message會發送到一個binding key 正好匹配messagerouting keyqueue.code

如圖server

咱們能夠看到,有兩個queue綁定到exchange了。第一個queue是和binding keyorange的綁定的。而且第二個有兩個bindings.一個是black,另外一個是green.htm

帶有routing key 爲orange的發送到exchangemessage將會發送到queue Q1blog

routing keyblack greenmessages將會發送到Q2. 其餘的messages會被丟棄。rabbitmq

Multiple binding(多重綁定)

如圖,多重綁定,即一個binding key爲black綁定到兩個queue.ip

Emitting logs

咱們會把日誌嚴重級別(log severity)做爲routing key. 那樣,接收腳本將會選擇它想要接收的嚴重級別進行接收。

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

嚴重級別(Severity

info, warning, error

Subscribing

接收程序跟以前大體同樣,除了一個例外,咱們將會爲咱們感興趣的嚴重級別(serverity)建立一個新的binding. 

var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

代碼

EmitLogDirect.cs

using System;using System.Linq;using RabbitMQ.Client;using System.Text;
class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs", //聲明direct類型exchange
                                    type: "direct");

            var severity = (args.Length > 0) ? args[0] : "info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip( 1 ).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs", //發送routingkey 爲severity的message
                                 routingKey: severity,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
        }
  }
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
}

運行示例:

cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."# => [x] Sent 'error':'Run. Run. Or it will explode.'

ReceiveLogsDirect.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",  //聲明direct類型exchange
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName; //聲明帶隨機queue name的queue

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var severity in args)
            {
                channel.QueueBind(queue: queueName,  //綁定queue和exchange和特定值的routingkey
                                  exchange: "direct_logs",
                                  routingKey: severity);
            }

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey; //接收的message的routing key
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

使用示例:

cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log

示例2

cd ReceiveLogsDirect
dotnet run info warning error# => [*] Waiting for logs. To exit press CTRL+C

 參考網址:

https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

相關文章
相關標籤/搜索