在前一篇中,咱們構建了一個簡單的日誌系統,咱們已經可以廣播消息到許多的接收者。在這一篇中,咱們但願增長一個特性,讓訂閱消息的子集成爲可能。例如,咱們能夠將重要的錯誤日誌存放到日誌文件(即,磁盤上面),同時將仍然全部的日誌信息打印到控制檯。html
在前面的例子中咱們已經建立過綁定,你應該還能記得下面的代碼:算法
1 channel.QueueBind(queue: queueName, 2 exchange: "logs", 3 routingKey: "");
一個綁定是一個交換器和一個隊列的關係,能夠形象的解讀爲:這個隊列對來自這個交換器的消息感興趣。綁定能夠擁有一個額外的參數:routingKey,爲了不和BasicPublish裏面的routingKey混淆,咱們稱其爲綁定關鍵字(binding Key),下面展現瞭如何經過一個綁定關鍵字建立一個綁定:spa
1 channel.QueueBind(queue: queueName, 2 exchange: "direct_logs", 3 routingKey: "black");
綁定關鍵字的意義依賴於交換器的類型,前面咱們使用過的fanout類型的交換器,直接簡單的忽略了它的綁定關鍵字的值。3d
前一篇教程中的日誌系統廣播全部的消息到全部的消費者,咱們但願擴展它能根據嚴重程度對消息進行過濾。例如,咱們但願寫日誌到磁盤的程序可以只接收重要的錯誤,不要在「警告」、「信息」級別的日誌身上浪費磁盤空間。日誌
咱們使用fanout類型的交換器,並不能帶來很大的靈活性,它只能盲目的廣播。做爲替代,咱們將使用direct類型的交換器,這種交換器背後的路由算法很是簡單,消息發送到綁定關鍵字(binding Key)和消息的路由關鍵字(routing Key)徹底匹配的隊列。code
爲了幫助說明,設想下面的配置:htm
在這個設置中,咱們看到direct類型的交換器X綁定了兩個隊列.第一個隊列綁定關鍵字是orange;第二個隊列有兩個綁定,一個是black,另外一個是green。在這樣的配置中,發送到使用orange爲路由關鍵字的消息將被路由到Q1,使用black或green爲路由關鍵字的消息將被路由到隊列Q2,其餘消息將被銷燬。blog
使用同一個綁定關鍵字綁定多個隊列是徹底合法的,在咱們的示例中咱們能夠在X和Q1中添加一個以black爲綁定關鍵字的綁定。在這種狀況下,direct類型的交換器會展示出fanout類型交換器的行爲,而且會廣播消息到全部知足條件的隊列,發送到路由關鍵字爲black的消息會發送到Q1和Q2.教程
咱們會爲日誌系統使用這種模式,將消息發送到direct類型而非fanout類型的交換器。咱們將使用日誌的嚴重等級來做爲路由關鍵字,這樣的話消息的接收程序就能夠依據本身的須要選擇不一樣的嚴重等級。首先讓咱們把注意力集中到發送日誌上。rabbitmq
和往常同樣,咱們首先須要建立一個交換器:
1 channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
接着咱們準備好發送消息了:
1 var body = Encoding.UTF8.GetBytes(message); 2 channel.BasicPublish(exchange: "direct_logs", 3 routingKey: severity, 4 basicProperties: null, 5 body: body);
爲了簡化問題,咱們假設嚴重等級是‘info’、‘warning’、‘error’。
立刻就能夠像前面的教程中那樣接收消息了,可是有一個不一樣點,咱們要用本身感興趣的全部日誌嚴重程度分別建立一個綁定。
1 var queueName = channel.QueueDeclare().QueueName; 2 3 foreach(var severity in args) 4 { 5 channel.QueueBind(queue: queueName, 6 exchange: "direct_logs", 7 routingKey: severity); 8 }
EmitLogDirect.cs類的代碼:
1 using System; 2 using System.Linq; 3 using RabbitMQ.Client; 4 using System.Text; 5 6 class EmitLogDirect 7 { 8 public static void Main(string[] args) 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "direct_logs", 15 type: "direct"); 16 17 var severity = (args.Length > 0) ? args[0] : "info"; 18 var message = (args.Length > 1) 19 ? string.Join(" ", args.Skip( 1 ).ToArray()) 20 : "Hello World!"; 21 var body = Encoding.UTF8.GetBytes(message); 22 channel.BasicPublish(exchange: "direct_logs", 23 routingKey: severity, 24 basicProperties: null, 25 body: body); 26 Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); 27 } 28 29 Console.WriteLine(" Press [enter] to exit."); 30 Console.ReadLine(); 31 } 32 }
ReceiveLogsDirect.cs的代碼:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class ReceiveLogsDirect 7 { 8 public static void Main(string[] args) 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "direct_logs", 15 type: "direct"); 16 var queueName = channel.QueueDeclare().QueueName; 17 18 if(args.Length < 1) 19 { 20 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", 21 Environment.GetCommandLineArgs()[0]); 22 Console.WriteLine(" Press [enter] to exit."); 23 Console.ReadLine(); 24 Environment.ExitCode = 1; 25 return; 26 } 27 28 foreach(var severity in args) 29 { 30 channel.QueueBind(queue: queueName, 31 exchange: "direct_logs", 32 routingKey: severity); 33 } 34 35 Console.WriteLine(" [*] Waiting for messages."); 36 37 var consumer = new EventingBasicConsumer(channel); 38 consumer.Received += (model, ea) => 39 { 40 var body = ea.Body; 41 var message = Encoding.UTF8.GetString(body); 42 var routingKey = ea.RoutingKey; 43 Console.WriteLine(" [x] Received '{0}':'{1}'", 44 routingKey, message); 45 }; 46 channel.BasicConsume(queue: queueName, 47 noAck: true, 48 consumer: consumer); 49 50 Console.WriteLine(" Press [enter] to exit."); 51 Console.ReadLine(); 52 } 53 } 54 }
像一般那樣編譯(編譯建議見教程一)。
若是你只想保存嚴重等級爲‘warning’、‘error’的日誌到文件中,只須要打開控制檯而後輸入:
1 $ ReceiveLogsDirect.exe warning error > logs_from_rabbit.log
若是你想在屏幕上看到全部的日誌,打開另外一個終端輸入:
1 $ ReceiveLogsDirect.exe info warning error 2 [*] Waiting for logs. To exit press CTRL+C
若是想發送‘error’級別的日誌,舉例來講,你只須要輸入以下信息:
1 $ EmitLogDirect.exe error "Run. Run. Or it will explode." 2 [x] Sent 'error':'Run. Run. Or it will explode.'
進入教程五,瞭解如何基於模式(pattern)監聽消息。
原文連接:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html