上一節,是廣播日誌message到不少的receivers.html
這節,咱們講訂閱其中的一個子集。例如,咱們想能夠把危機的error message導到log file。而仍然能夠打印全部的log messages到控制檯。算法
這裏使用到Direct exchagespa
在使用fanout exchange時,沒有不少的靈活性,它只是廣播。日誌
這節,咱們將使用direct exchange . 在direct exchange背後的路由算法是簡單的,即message會發送到一個binding key 正好匹配message的routing key的queue.code
如圖server
咱們能夠看到,有兩個queue綁定到exchange了。第一個queue是和binding key爲orange的綁定的。而且第二個有兩個bindings.一個是black,另外一個是green.htm
帶有routing key 爲orange的發送到exchange的 message將會發送到queue Q1;blog
而routing key爲black 和green的messages將會發送到Q2. 其餘的messages會被丟棄。rabbitmq
如圖,多重綁定,即一個binding key爲black綁定到兩個queue.ip
咱們會把日誌嚴重級別(log severity)做爲routing key. 那樣,接收腳本將會選擇它想要接收的嚴重級別進行接收。
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body);
嚴重級別(Severity)
info, warning, error
接收程序跟以前大體同樣,除了一個例外,咱們將會爲咱們感興趣的嚴重級別(serverity)建立一個新的binding.
var queueName = channel.QueueDeclare().QueueName; foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); }
EmitLogDirect.cs
using System;using System.Linq;using RabbitMQ.Client;using System.Text; class EmitLogDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", //聲明direct類型exchange type: "direct"); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip( 1 ).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", //發送routingkey 爲severity的message routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
運行示例:
cd EmitLogDirect dotnet run error "Run. Run. Or it will explode."# => [x] Sent 'error':'Run. Run. Or it will explode.'
ReceiveLogsDirect.cs
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text; class ReceiveLogsDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", //聲明direct類型exchange type: "direct"); var queueName = channel.QueueDeclare().QueueName; //聲明帶隨機queue name的queue if(args.Length < 1) { Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach(var severity in args) { channel.QueueBind(queue: queueName, //綁定queue和exchange和特定值的routingkey exchange: "direct_logs", routingKey: severity); } Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; //接收的message的routing key Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
使用示例:
cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log
示例2
cd ReceiveLogsDirect dotnet run info warning error# => [*] Waiting for logs. To exit press CTRL+C
參考網址:
https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html