RabbitMQ教程C#版 - 發佈訂閱

先決條件
本教程假定 RabbitMQ 已經安裝,並運行在localhost標準端口(5672)。若是你使用不一樣的主機、端口或證書,則須要調整鏈接設置。javascript

從哪裏得到幫助
若是您在閱讀本教程時遇到困難,能夠經過郵件列表 聯繫咱們html

發佈/訂閱#

(使用 .NET Client)java

在 教程[2] 中,咱們建立了一個工做隊列,假設在工做隊列中的每個任務都只被分發給一個 Worker。那麼在這一章節,咱們要作與之徹底不一樣的事,那就是咱們將要把一條消息分發給多個消費者。這種模式被稱爲「發佈/訂閱」。nginx

爲了說明、體現這種模式,咱們將會建一個簡單的日誌系統。它將會包含兩個程序 - 第一個用來發送日誌消息,第二個用來接收並打印它們。git

在咱們創建的日誌系統中,每一個接收程序的運行副本都會收到消息。這樣咱們就能夠運行一個接收程序接收消息並將日誌寫入磁盤;同時運行另一個接收程序接收消息並將日誌打印到屏幕上。github

實質上,發佈的日誌消息將會被廣播給全部的接收者。docker

交換器#

在教程的前幾部分,咱們是發送消息到隊列並從隊列中接收消息。如今是時候介紹 Rabbit 中完整的消息傳遞模型了。shell

讓咱們快速回顧一下前面教程中的內容:bash

  • 生產者是發送消息的用戶應用程序。
  • 隊列是存儲消息的緩衝區。
  • 消費者是接收消息的用戶應用程序。

在 RabbitMQ 中,消息傳遞模型的核心理念是生產者歷來不會把任何消息直接發送到隊列,其實,一般生產者甚至不知道消息是否會被分發到任何隊列中。服務器

然而,生產者只能把消息發送給交換器。交換器很是簡單,一方面它接收來自生產者的消息,另外一方面又會把接收的消息推送到隊列中。交換器必須明確知道該如何處理收到的消息,應該追加到一個特定隊列中?仍是應該追加到多個隊列中?或者應該把它丟棄?這些規則都被定義在交換器類型中。

Exchanges

目前交換器類型有這幾種:directtopicheadersfanout。咱們先重點關注最後一個fanout,咱們建立一個這種類型的交換器,將其命名爲logs

Copy
channel.ExchangeDeclare("logs", "fanout");

fanout類型交換器很是簡單,正如您可能從名字中猜出的那樣,它會把收到的全部消息廣播到它已知的全部隊列中。這恰巧是咱們的日誌系統目前所須要的。

列舉交換器
要列舉出服務器上的交換器,您可使用很是有用的rabbitmqctl命令行工具:

Copy
sudo rabbitmqctl list_exchanges

執行上述命令後,出現的列表中將會有一些amq.*交換器和默認(未命名)交換器。這些是默認建立的,不過目前您可能用不到它們。

默認交換器
在教程的前些部分,咱們對交換器這一律念還一無所知,但仍然能夠把消息發送到隊列。之因此這樣,是由於咱們使用了一個用空字符串("")標識的默認交換器。

回顧一下咱們以前如何發佈消息:

Copy
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

第一個參數就是交換器的名稱,空字符串表示默認或匿名交換器:將消息路由到routingKey指定的隊列(若是存在)中。

如今,咱們能夠把消息發佈到咱們指定的交換器:

Copy
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);

臨時隊列#

您是否還記得以前咱們使用過的隊列,它們都有一個特定的名稱(記得應該是hellotask_queue吧)。給隊列命名對咱們來講是相當重要的 -- 由於咱們可能須要多個 Worker 指向同一個隊列;當您想要在生產者和消費者之間共享隊列時,給隊列一個名稱也是很是重要的。

可是,咱們建立的日誌系統並不但願如此。咱們但願監聽全部的日誌消息,而不只僅是其中一部分。咱們也只對目前流動的消息感興趣,而不是舊消息。爲解決這個問題,咱們須要作好兩件事。

首先,咱們不管什麼時候鏈接 Rabbit,都須要一個新的、空的隊列。要作到這一點,咱們可使用隨機名稱來建立隊列,或許,甚至更好的方案是讓服務器爲咱們選擇一個隨機隊列名稱。

其次,一旦咱們與消費者斷開鏈接,與之相關的隊列應該被自動刪除。

在 .NET 客戶端中,若是不向QueueDeclare()方法提供任何參數,實際上就是建立了一個非持久化、獨佔、且自動刪除的隨機命名隊列:

Copy
var queueName = channel.QueueDeclare().QueueName;

您能夠在 隊列指南 中瞭解更多關於exclusive參數和其餘隊列屬性的信息。

此時,queueName包含一個隨機隊列名稱。例如,它看起來可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg

綁定#

Bindings

咱們已經建立好了一個fanout交換器和一個隊列。如今咱們須要告訴交換器把消息發送到咱們的隊列。而交換器和隊列之間的關係就稱之爲綁定

Copy
// 把一個隊列綁定到指定交換器。 channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

從如今起,logs交換器會把消息追加到咱們的隊列中。

列舉綁定
您可使用(您或許已經猜到了),列舉出現有的綁定。

Copy
sudo rabbitmqctl list_bindings

組合在一塊兒#

生產者程序負責分發消息,這與以前的教程看起來沒有太大區別。

最重要的變化是咱們如今想把消息發佈到咱們的logs交換器,而不是匿名交換器。在發送時咱們須要提供一個路由鍵routingKey,可是對於fanout交換器,它的值能夠被忽略。這裏是EmitLog.cs文件的代碼:

Copy
using System; using RabbitMQ.Client; using System.Text; class EmitLog { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); } }

EmitLog.cs 源碼)

如你所見,在創建鏈接後,咱們聲明瞭交換器。這一步很是有必要,由於發佈消息到一個不存在的交換器,這種狀況是被禁止的。

若是沒有隊列綁定到交換器上,消息將會丟失,但這對咱們來講並無什麼沒問題;若是沒有消費者正在監聽,咱們是能夠放心地把消息丟棄的。

ReceiveLogs.cs的代碼:

Copy
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class ReceiveLogs { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }

ReceiveLogs.cs 源碼)

按照 教程[1]中的設置說明生成EmitLogsReceiveLogs 項目。

若是您想把日誌保存到文件中,只需打開一個控制檯並輸入:

Copy
cd ReceiveLogs
dotnet run > logs_from_rabbit.log

若是你想在屏幕上看到日誌,我能夠新開一個終端並運行:

Copy
cd ReceiveLogs
dotnet run

固然,分發日誌須要輸入:

Copy
cd EmitLog
dotnet run

使用rabbitmqctl list_bindings命令,您能夠驗證代碼是否真正建立了咱們想要的綁定和隊列。當有兩個ReceiveLogs.cs程序運行時,您應該看到以下所示的內容:

Copy
sudo rabbitmqctl list_bindings
# => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.

對執行結果的解釋簡潔明瞭:來自logs交換器的數據轉發到了兩個由服務器隨機分配名稱的隊列。這正是咱們期待的結果。

想要了解如何監聽消息的這一塊內容,讓咱們繼續閱讀 教程[4]

寫在最後#

本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容爲準。水平有限,翻譯的很差請見諒,若有翻譯錯誤還請指正。

做者:Esofar

出處:https://www.cnblogs.com/esofar/p/rabbitmq-publish-subscribe.html

本站使用「CC BY 4.0」創做共享協議,轉載請在文章明顯位置註明做者及出處。

相關文章
相關標籤/搜索