【譯】RabbitMQ:Topics

在前面的教程中,咱們對日誌系統進行了功能強化。咱們使用direct類型的交換器而且爲之提供了能夠選擇接收日誌的能力,替換了只能傻乎乎的廣播消息的fanout類型的交換器。儘管使用direct類型的交換器強化了系統,可是它依然有一些限制,不能基於條件的進行路由。html

在日誌系統中,咱們或許但願不只能根據嚴重等級,還能基於日誌的發送源來訂閱日誌日誌。你可能已經從Unixsyslog工具中知道了這個概念,該工具路由日誌的時候既基於嚴重等級(info/warn/crit...)又基於設備(auth/cron/kern...)。這將帶來極大的靈活性,咱們可能僅僅但願監聽來自‘cron’的極其重要的錯誤,同時監聽來自‘kern’的全部日誌。爲了在日誌系統中實現它,咱們須要學習更復雜的topic交換器。工具

Topic交換器

發送消息到topic交換器不能爲所欲爲的指定路由關鍵字(routing key),它必須是一個由點(.)分割的單詞列表。這些單詞能夠是任意的,可是一般會在其中指定一些發送消息的特性,如「stock.usd.nyse」,「nyse.vmw」,「quick.orange.rabbit」等即是一些合法的路由關鍵字。你能夠在路由關鍵字中指定任意數量的單詞,可是不能超過上限255字節。學習

綁定關鍵字必須是一樣的樣式,topic交換器的內部邏輯和direct交換器的內部邏輯很類似。使用特定路由關鍵字發送的消息會被轉發到匹配綁定關鍵字的全部隊列。關於綁定關鍵字,有兩點須要特別注意:ui

  • *(星號)能夠表明一個單詞
  • #(井號)能夠表明零個或多個單詞

下圖示例是對此最簡單的說明:spa

                                         

在這個示例中,咱們打算髮送的全是用來描述動物的消息,這些消息會使用由三個單詞(兩個點)組成的路由關鍵字來發送。在路由關鍵字中,第一個單詞描述速度,第二個單詞描述顏色,第三個單詞描述物種。即:日誌

"<speed>.<colour>.<species>"。code

咱們將建立三個綁定,隊列Q1綁定到關鍵字「*.orange.*」,隊列Q2綁定到關鍵字「*.*.rabbit」和lazy.#」。這些綁定能夠被總結以下:htm

  1. 隊列Q1對全部橙色(orange)的動物感興趣
  2. 隊列Q2對兔子(rabbit)的全部事情和全部懶惰(lazy)動物的事情感興趣

一個被設置路由關鍵字被設置爲「quick.orange.rabbit」的消息將會被同時發佈到兩個隊列,路由關鍵字爲「lazy.orange.elephant」的消息也會被同時發佈到兩個隊列。反之,路由關鍵字爲「quick.orange.fox」的消息將會被髮布到隊列Q1,路由關鍵字爲「lazy.brown.fox」的消息將被髮布到隊列Q2。而路由關鍵字爲「lazy.pink.rabbit」的消息只會被髮布到隊列Q2一次,儘管它匹配兩個綁定。路由關鍵字爲「quick.brown.fox」的消息不匹配任意一個隊列,故該消息會被銷燬。blog

若是咱們打破約定,使用一個單詞或者四個單詞做爲路由關鍵字(例如:「orange」或「quick.orange.male.rabbit」)會發生什麼呢?這類消息由於不匹配任意一個綁定,因此將會丟失。相反的,「lazy.orange.male.rabbit」雖然有四個單詞,可是它匹配最後一個綁定,消息將會被髮布到隊列Q2中。教程

 

Topic交換器

Topic交換器很是強大,能表現出其餘交換器的行爲。當一個隊列用井號(#)做爲綁定,它將接收全部的消息,忽略路由關鍵字,像fanout交換器同樣。當在綁定中不使用特殊字符星號(*)和井號(#)時,它看起來就像direct交換器了。

 

組合在一塊兒

咱們將要在日誌系統中使用topic交換器,從假設日誌的路由關鍵字由兩個單詞組成(<facility>.<severity>|<設備>.<嚴重程度>)。

代碼和上一個教程中是同樣的,EmitLogTopic.cs中的代碼:

 1 using System;
 2 using System.Linq;
 3 using RabbitMQ.Client;
 4 using System.Text;
 5 
 6 class EmitLogTopic
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "topic_logs",
15                                     type: "topic");
16 
17             var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
18             var message = (args.Length > 1)
19                           ? string.Join(" ", args.Skip( 1 ).ToArray())
20                           : "Hello World!";
21             var body = Encoding.UTF8.GetBytes(message);
22             channel.BasicPublish(exchange: "topic_logs",
23                                  routingKey: routingKey,
24                                  basicProperties: null,
25                                  body: body);
26             Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
27         }
28     }
29 }

ReceiveLogsTopics.cs中的代碼:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class ReceiveLogsTopic
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
15             var queueName = channel.QueueDeclare().QueueName;
16 
17             if(args.Length < 1)
18             {
19                 Console.Error.WriteLine("Usage: {0} [binding_key...]",
20                                         Environment.GetCommandLineArgs()[0]);
21                 Console.WriteLine(" Press [enter] to exit.");
22                 Console.ReadLine();
23                 Environment.ExitCode = 1;
24                 return;
25             }
26 
27             foreach(var bindingKey in args)
28             {
29                 channel.QueueBind(queue: queueName,
30                                   exchange: "topic_logs",
31                                   routingKey: bindingKey);
32             }
33 
34             Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
35 
36             var consumer = new EventingBasicConsumer(channel);
37             consumer.Received += (model, ea) =>
38             {
39                 var body = ea.Body;
40                 var message = Encoding.UTF8.GetString(body);
41                 var routingKey = ea.RoutingKey;
42                 Console.WriteLine(" [x] Received '{0}':'{1}'",
43                                   routingKey,
44                                   message);
45             };
46             channel.BasicConsume(queue: queueName,
47                                  noAck: true,
48                                  consumer: consumer);
49 
50             Console.WriteLine(" Press [enter] to exit.");
51             Console.ReadLine();
52         }
53     }
54 }

運行下面的示例:

接收全部的日誌:

1 $ ReceiveLogsTopic.exe "#"

接收來自設備「kern」的全部日誌:

1 $ ReceiveLogsTopic.exe "kern.*"

或者你只想關注「嚴重(critical)」等級的日誌:

1 $ ReceiveLogsTopic.exe "*.critical"

你也能夠建立多個綁定:

1 $ ReceiveLogsTopic.exe "kern.*" "*.critical"

使用路由關鍵字「kern.critical」發送一條日誌:

1 $ EmitLogTopic.exe "kern.critical" "A critical kernel error"

但願這些程序能讓你玩的開心。注意,這些代碼並無對路由或綁定關鍵字作任何假設,你也能夠嘗試超過兩個參數的路由關鍵字。

下面是一些棘手的問題:

  1. 使用星號(*)的綁定是否能捕獲到空路由關鍵字的消息?
  2. 使用「#.*」的綁定是否會將「..」做爲一個關鍵字?是否會捕獲僅使用單個單詞做爲關鍵字的消息?
  3. a.*.#」和「a.#」有什麼不一樣?

下一步,在教程六種將瞭解如何在遠程過程調用中使用往返消息。

 

原文連接:http://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html

相關文章
相關標籤/搜索