在前一篇教程中,咱們建立了一個工做隊列,咱們假設在工做隊列後的每個任務都只被調度給一個消費者。在這一部分,咱們將作一些徹底不同的事情,調度同一條消息給多個消費者,也就是有名的「發佈-訂閱」模式。爲了闡述這種模式,咱們將構建一個簡單的日誌系統。該系統將由兩部分組成:一部分發送日誌消息,另外一部分接收而且打印日誌消息,在這個日誌系統中,每一份運行着的接收程序都將會收到消息。這樣咱們能夠運行一個接收者把日誌寫入到磁盤中,同時能夠運行另外一個接收者將日誌打印到顯示器上面。也就是說,發佈的日誌消息會被廣播到全部的接收者。html
在前面的教程中,咱們發送消息到隊列,而後從隊列中接收消息。如今開始介紹RabbitMQ完整的消息模式。緩存
讓咱們快速的複習一下在前面的教程中講過的內容:安全
RabbitMQ消息模式的核心是生產者從不直接發送消息到隊列。事實上,生產者每每不知道他產生的消息會被分發到哪些隊列,它只能將消息發送到一個交換器。交換器很是簡單,它一方面從生產者接收消息,另外一方面又將消息壓入隊列中。交換器必須清楚的知道要用接收到的消息作什麼,是應當追加到某個指定的隊列?或者追加到不少隊列?或者應當丟棄?要完成這些的規則都被定義在交換器的類型中。服務器
有幾種可用的交換器類型:direct、topic、headers和fanout。本文主要關注最後一種類型:fanout,讓咱們建立一個這種類型的交換器,命名爲logs:函數
1 channel.ExchangeDeclare("logs", "fanout");
類型爲fanout的交換器很是簡單,顧名思義,它會廣播全部收到的消息到它知道的全部的隊列,而這也正是咱們的日誌系統所須要的。spa
交換器清單日誌
爲了展現服務器上交換器的清單,你能夠運行在任什麼時候候都特別有用的rabbitmqctl:code
1 $ sudo rabbitmqctl list_exchanges 2 Listing exchanges ... 3 direct 4 amq.direct direct 5 amq.fanout fanout 6 amq.headers headers 7 amq.match headers 8 amq.rabbitmq.log topic 9 amq.rabbitmq.trace topic 10 amq.topic topic 11 logs fanout 12 ...done.
在清單裏,有一些amp.*樣式的交換器和一個默認(未命名)的交換器,這些都是默認建立的,但並非說你立刻就須要使用它們。htm
匿名交換器blog
在前面的教程中咱們並不知曉交換器的任何信息,可是任然能夠將消息發送到隊列中,那是由於咱們使用了默認的交換器,使用空字符串表示("")。
回憶一下以前是如何發佈消息的:
1 var message = GetMessage(args); 2 var body = Encoding.UTF8.GetBytes(message); 3 channel.BasicPublish(exchange: "", 4 routingKey: "hello", 5 basicProperties: null, 6 body: body);
第一個參數就是交換器的名稱,空字符串指代的是默認交換器或者是匿名交換器,若是隊列存在,消息將經過指定的routingKey路由到隊列。
如今咱們能夠將消息發佈到上面定義的命名交換器了:
1 var message = GetMessage(args); 2 var body = Encoding.UTF8.GetBytes(message); 3 channel.BasicPublish(exchange: "logs", 4 routingKey: "", 5 basicProperties: null, 6 body: body);
你或許還記得咱們以前使用的有指定名稱的隊列(還記得hello和task_queue麼?)。能爲隊列命名對咱們來講是相當重要的,咱們須要指定給消費者相同的隊列。當你想在生產者和消費者間共享隊列時,給隊列指定一個名字就顯得特別重要了。
可是這並非咱們日誌系統的問題。咱們但願能監聽到全部消息,而不只僅是其中一個子集;咱們對當前流入的消息感興趣而不是以前的舊信息。爲了解決這個問題,咱們須要作兩件事:第1、不管什麼時候鏈接到RabbitMQ,咱們須要一個新的空隊列,爲此咱們能夠建立一個擁有隨機名稱的隊列或者更好的是直接讓RabbitMQ服務替咱們生成一個隨機名稱;第2、一旦消費者斷開鏈接,隊列應當被自動刪除。
在.NET 客戶端,咱們經過提供無參數的QueueDeclare()函數能夠建立一個不持久化、獨佔的、自動刪除的擁有隨機名稱的隊列:
1 var queueName = channel.QueueDeclare().QueueName;
這樣queueName就是一個隨機的隊列名稱,看起來會是這樣的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
咱們已經建立了一個fanout類型的交換器和一個隊列,如今須要告訴交換器把消息發送到咱們的隊列。交換器和隊列的關係就叫作綁定。
1 channel.QueueBind(queue: queueName, 2 exchange: "logs", 3 routingKey: "");
到目前爲止,交換器logs將能添加消息到咱們的隊列中了。
綁定清單
你能夠經過rabbitmqctl list_bingdings命令查看綁定清單。
發送日誌的生產者程序和以前教程裏面的沒有太多不一樣,最重要的改變是如今咱們但願將消息發送到logs交換器,而不是以前的匿名交換器。當發送消息的時候,咱們須要指定一個routingKey,可是在使用fanout類型交換器的時候,它的值將被忽略。下面是EmitLog.cs文件裏面的代碼:
1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class EmitLog 6 { 7 public static void Main(string[] args) 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" }; 10 using(var connection = factory.CreateConnection()) 11 using(var channel = connection.CreateModel()) 12 { 13 channel.ExchangeDeclare(exchange: "logs", type: "fanout"); 14 15 var message = GetMessage(args); 16 var body = Encoding.UTF8.GetBytes(message); 17 channel.BasicPublish(exchange: "logs", 18 routingKey: "", 19 basicProperties: null, 20 body: body); 21 Console.WriteLine(" [x] Sent {0}", message); 22 } 23 24 Console.WriteLine(" Press [enter] to exit."); 25 Console.ReadLine(); 26 } 27 28 private static string GetMessage(string[] args) 29 { 30 return ((args.Length > 0) 31 ? string.Join(" ", args) 32 : "info: Hello World!"); 33 } 34 }
如你所見,在建立連接以後咱們申明瞭交換器,這一步用於禁止發佈到不存在的交換器是頗有必要的。若是沒有隊列綁定到交換器發佈的消息將會丟失,這是沒有問題的;若是沒有消費者監聽消息,咱們能夠安全的銷燬它。
ReceiveLog.cs中的代碼:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class ReceiveLogs 7 { 8 public static void Main() 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "logs", type: "fanout"); 15 16 var queueName = channel.QueueDeclare().QueueName; 17 channel.QueueBind(queue: queueName, 18 exchange: "logs", 19 routingKey: ""); 20 21 Console.WriteLine(" [*] Waiting for logs."); 22 23 var consumer = new EventingBasicConsumer(channel); 24 consumer.Received += (model, ea) => 25 { 26 var body = ea.Body; 27 var message = Encoding.UTF8.GetString(body); 28 Console.WriteLine(" [x] {0}", message); 29 }; 30 channel.BasicConsume(queue: queueName, 31 noAck: true, 32 consumer: consumer); 33 34 Console.WriteLine(" Press [enter] to exit."); 35 Console.ReadLine(); 36 } 37 } 38 }
像以前那樣編譯,工做就完成了。
1 $ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs 2 $ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs
若是你想將日誌保存到文件中,打開控制檯而後輸入:
1 $ ReceiveLogs.exe > logs_from_rabbit.log
若是你想在屏幕上看到日誌,打開一個新的終端,執行下面的代碼:
1 $ ReceiveLogs.exe
固然,發送日誌輸入:
1 $ EmitLog.exe
使用rabbitmqctl list_bindings命令,能夠看到代碼確如咱們但願的那樣建立了綁定和隊列。若是同時運行兩個消費者(ReceiveLogs.cs)你將能看到下面這樣的信息:
1 $ sudo rabbitmqctl list_bindings 2 Listing bindings ... 3 logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] 4 logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] 5 ...done.
結果很是的直觀:數據從交換器logs發送到兩個服務自動指定名稱的隊列,這正是咱們以前預期的。
要了解如何監聽消息的子集,讓咱們進入下一篇。
原文連接:http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html