【譯】RabbitMQ:發佈-訂閱(Publish/Subscribe)

在前一篇教程中,咱們建立了一個工做隊列,咱們假設在工做隊列後的每個任務都只被調度給一個消費者。在這一部分,咱們將作一些徹底不同的事情,調度同一條消息給多個消費者,也就是有名的「發佈-訂閱」模式。爲了闡述這種模式,咱們將構建一個簡單的日誌系統。該系統將由兩部分組成:一部分發送日誌消息,另外一部分接收而且打印日誌消息,在這個日誌系統中,每一份運行着的接收程序都將會收到消息。這樣咱們能夠運行一個接收者把日誌寫入到磁盤中,同時能夠運行另外一個接收者將日誌打印到顯示器上面。也就是說,發佈的日誌消息會被廣播到全部的接收者。html

 

交換器

在前面的教程中,咱們發送消息到隊列,而後從隊列中接收消息。如今開始介紹RabbitMQ完整的消息模式。緩存

讓咱們快速的複習一下在前面的教程中講過的內容:安全

  • 生產者是一個發送消息的應用程序。
  • 隊列是存儲消息的緩存。
  • 消費者是一個接收消息的應用程序。

 

RabbitMQ消息模式的核心是生產者從不直接發送消息到隊列。事實上,生產者每每不知道他產生的消息會被分發到哪些隊列,它只能將消息發送到一個交換器。交換器很是簡單,它一方面從生產者接收消息,另外一方面又將消息壓入隊列中。交換器必須清楚的知道要用接收到的消息作什麼,是應當追加到某個指定的隊列?或者追加到不少隊列?或者應當丟棄?要完成這些的規則都被定義在交換器的類型中。服務器

                                                 

有幾種可用的交換器類型:directtopicheadersfanout。本文主要關注最後一種類型:fanout,讓咱們建立一個這種類型的交換器,命名爲logs:函數

1 channel.ExchangeDeclare("logs", "fanout");

類型爲fanout的交換器很是簡單,顧名思義,它會廣播全部收到的消息到它知道的全部的隊列,而這也正是咱們的日誌系統所須要的。spa

 

交換器清單日誌

爲了展現服務器上交換器的清單,你能夠運行在任什麼時候候都特別有用的rabbitmqctl:code

 1 $ sudo rabbitmqctl list_exchanges
 2 Listing exchanges ...
 3         direct
 4 amq.direct      direct
 5 amq.fanout      fanout
 6 amq.headers     headers
 7 amq.match       headers
 8 amq.rabbitmq.log        topic
 9 amq.rabbitmq.trace      topic
10 amq.topic       topic
11 logs    fanout
12 ...done.

在清單裏,有一些amp.*樣式的交換器和一個默認(未命名)的交換器,這些都是默認建立的,但並非說你立刻就須要使用它們。htm

 

匿名交換器blog

在前面的教程中咱們並不知曉交換器的任何信息,可是任然能夠將消息發送到隊列中,那是由於咱們使用了默認的交換器,使用空字符串表示("")。

回憶一下以前是如何發佈消息的:

1 var message = GetMessage(args);
2 var body = Encoding.UTF8.GetBytes(message);
3 channel.BasicPublish(exchange: "",
4                      routingKey: "hello",
5                      basicProperties: null,
6                      body: body);

第一個參數就是交換器的名稱,空字符串指代的是默認交換器或者是匿名交換器,若是隊列存在,消息將經過指定的routingKey路由到隊列。

 

如今咱們能夠將消息發佈到上面定義的命名交換器了:

1 var message = GetMessage(args);
2 var body = Encoding.UTF8.GetBytes(message);
3 channel.BasicPublish(exchange: "logs",
4                      routingKey: "",
5                      basicProperties: null,
6                      body: body);

 

臨時隊列

你或許還記得咱們以前使用的有指定名稱的隊列(還記得hellotask_queue麼?)。能爲隊列命名對咱們來講是相當重要的,咱們須要指定給消費者相同的隊列。當你想在生產者和消費者間共享隊列時,給隊列指定一個名字就顯得特別重要了。

可是這並非咱們日誌系統的問題。咱們但願能監聽到全部消息,而不只僅是其中一個子集;咱們對當前流入的消息感興趣而不是以前的舊信息。爲了解決這個問題,咱們須要作兩件事:第1、不管什麼時候鏈接到RabbitMQ,咱們須要一個新的空隊列,爲此咱們能夠建立一個擁有隨機名稱的隊列或者更好的是直接讓RabbitMQ服務替咱們生成一個隨機名稱;第2、一旦消費者斷開鏈接,隊列應當被自動刪除。

.NET 客戶端,咱們經過提供無參數的QueueDeclare()函數能夠建立一個不持久化、獨佔的、自動刪除的擁有隨機名稱的隊列:

1 var queueName = channel.QueueDeclare().QueueName;

這樣queueName就是一個隨機的隊列名稱,看起來會是這樣的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

 

綁定

                                                 

咱們已經建立了一個fanout類型的交換器和一個隊列,如今須要告訴交換器把消息發送到咱們的隊列。交換器和隊列的關係就叫作綁定。

1 channel.QueueBind(queue: queueName,
2                   exchange: "logs",
3                   routingKey: "");

到目前爲止,交換器logs將能添加消息到咱們的隊列中了。

 

綁定清單

你能夠經過rabbitmqctl list_bingdings命令查看綁定清單。

 

組合在一塊兒

                                                 

發送日誌的生產者程序和以前教程裏面的沒有太多不一樣,最重要的改變是如今咱們但願將消息發送到logs交換器,而不是以前的匿名交換器。當發送消息的時候,咱們須要指定一個routingKey,可是在使用fanout類型交換器的時候,它的值將被忽略。下面是EmitLog.cs文件裏面的代碼:

 1 using System;
 2 using RabbitMQ.Client;
 3 using System.Text;
 4 
 5 class EmitLog
 6 {
 7     public static void Main(string[] args)
 8     {
 9         var factory = new ConnectionFactory() { HostName = "localhost" };
10         using(var connection = factory.CreateConnection())
11         using(var channel = connection.CreateModel())
12         {
13             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
14 
15             var message = GetMessage(args);
16             var body = Encoding.UTF8.GetBytes(message);
17             channel.BasicPublish(exchange: "logs",
18                                  routingKey: "",
19                                  basicProperties: null,
20                                  body: body);
21             Console.WriteLine(" [x] Sent {0}", message);
22         }
23 
24         Console.WriteLine(" Press [enter] to exit.");
25         Console.ReadLine();
26     }
27 
28     private static string GetMessage(string[] args)
29     {
30         return ((args.Length > 0)
31                ? string.Join(" ", args)
32                : "info: Hello World!");
33     }
34 }

如你所見,在建立連接以後咱們申明瞭交換器,這一步用於禁止發佈到不存在的交換器是頗有必要的。若是沒有隊列綁定到交換器發佈的消息將會丟失,這是沒有問題的;若是沒有消費者監聽消息,咱們能夠安全的銷燬它。

ReceiveLog.cs中的代碼:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class ReceiveLogs
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
15 
16             var queueName = channel.QueueDeclare().QueueName;
17             channel.QueueBind(queue: queueName,
18                               exchange: "logs",
19                               routingKey: "");
20 
21             Console.WriteLine(" [*] Waiting for logs.");
22 
23             var consumer = new EventingBasicConsumer(channel);
24             consumer.Received += (model, ea) =>
25             {
26                 var body = ea.Body;
27                 var message = Encoding.UTF8.GetString(body);
28                 Console.WriteLine(" [x] {0}", message);
29             };
30             channel.BasicConsume(queue: queueName,
31                                  noAck: true,
32                                  consumer: consumer);
33 
34             Console.WriteLine(" Press [enter] to exit.");
35             Console.ReadLine();
36         }
37     }
38 }

像以前那樣編譯,工做就完成了。

1 $ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs
2 $ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs

若是你想將日誌保存到文件中,打開控制檯而後輸入:

1 $ ReceiveLogs.exe > logs_from_rabbit.log

若是你想在屏幕上看到日誌,打開一個新的終端,執行下面的代碼:

1 $ ReceiveLogs.exe

固然,發送日誌輸入:

1 $ EmitLog.exe

使用rabbitmqctl list_bindings命令,能夠看到代碼確如咱們但願的那樣建立了綁定和隊列。若是同時運行兩個消費者(ReceiveLogs.cs)你將能看到下面這樣的信息:

1 $ sudo rabbitmqctl list_bindings
2 Listing bindings ...
3 logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
4 logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
5 ...done.

結果很是的直觀:數據從交換器logs發送到兩個服務自動指定名稱的隊列,這正是咱們以前預期的。

 

要了解如何監聽消息的子集,讓咱們進入下一篇。

 

原文連接:http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

相關文章
相關標籤/搜索