《RabbitMQ Tutorial》譯文 第 4 章 路由

原文來自 RabbitMQ 英文官網教程(4.Routing),其示例代碼採用了 .NET C# 語言。html

Markdown

In the previous tutorial we built a simple logging system. We were able to broadcast log messages to many receivers.git

在以前的教程中,咱們構建了一個簡單的日誌系統,咱們能夠廣播日誌消息給衆多的接收人。github

In this tutorial we're going to add a feature to it - we're going to make it possible to subscribe only to a subset of the messages. For example, we will be able to direct only critical error messages to the log file (to save disk space), while still being able to print all of the log messages on the console.算法

在本教程中咱們即將爲日誌系統添加一個特性 - 使其能夠只訂閱消息的一個子集。好比,咱們只會直接將嚴重錯誤消息寫入到日誌文件(即保存到磁盤空間),與此同時還會把全部的日誌消息打印到控制檯。less

Bindings

綁定

In previous examples we were already creating bindings. You may recall code like:ide

在以前的示例中咱們已經建立過綁定,你可能會回憶起相似的代碼:flex

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.ui

綁定就是交換機和隊列之間的一種關係,能夠簡單地理解爲:某隊列對來自該交換機的消息感興趣。this

Bindings can take an extra routingKey parameter. To avoid the confusion with a BasicPublish parameter we're going to call it a binding key. This is how we could create a binding with a key:spa

綁定能夠使用一個額外的 routingKey 參數,爲避免與 BasicPublish 參數相混淆咱們將其稱呼爲「綁定鍵」,以下即是如何採用該鍵來建立一個綁定:

channel.QueueBind(queue: queueName,
                  exchange: "direct_logs",
                  routingKey: "black");

The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.

「綁定鍵」的含義取決於交換機類型,像咱們以前使用過的 fanout 型交換機,簡單起鍵其值在這裏就忽略了。(fanout 型交換機是無差異廣播到全部隊列,即便爲 routingKey 命名也沒有意義)

Direct exchange

直接型交換機

Our logging system from the previous tutorial broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their severity.
For example we may want the script which is writing log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.

在咱們以前的教程中,日誌系統會廣播全部的消息給全部的消費者。咱們但願基於系統所面臨的壓力來作一些諸如容許過濾消息這樣的擴充。
好比咱們可能但願只接收嚴重錯誤的日誌消息腳本,而後將其寫入磁盤,而不要在警告型或提示型日誌消息方面浪費磁盤空間。

We were using a fanout exchange, which doesn't give us much flexibility - it's only capable of mindless broadcasting.

咱們一直在使用 fanout 型交換機,但它給予不了咱們足夠的靈活性 - 它僅僅適用於無差異(無腦式)的廣播。

We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.

好在咱們能夠使用 direct 型交換機,其背後的路由算法很簡單 - 即消息會流向其綁定鍵剛好與路由鍵相匹配的隊列。

To illustrate that, consider the following setup:

爲說明這個,能夠考慮接下來的設置:

Markdown

In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key black and the other one with green.

在這個設置中,咱們能夠看到符號爲 X 的 direct 型交換機有兩個隊列關聯 到它。第一個隊列經過「綁定鍵」 orange 來關聯,第二個隊列則有兩個綁定,一個基於 black 「綁定鍵」,另外一個基於 green 「綁定鍵」。

In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.

在這樣一番設置中,發佈到基於 orange 路由鍵綁定的消息將會被路由到 Q1 隊列,而基於 black 或者 green 路由鍵的消息則去往 Q2 隊列,而其餘全部的消息將會被丟棄。

Multiple bindings

多重綁定

Markdown

It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.

採用相同的「綁定鍵」來綁定多個隊列是徹底合法的。在咱們的示例中,能夠基於 black 「綁定鍵」來添加一個 X 和 Q1 之間的綁定。如此,direct 型交換機將表現得與 fanout 型相像,而且會把消息廣播給全部匹配的隊列。如此,基於 black 「路由鍵」的消息將會被遞送到 Q1 和 Q2 兩個隊列。

Emitting logs

發出日誌

We'll use this model for our logging system. Instead of fanout we'll send messages to a direct exchange. We will supply the log severity as a routing key. That way the receiving script will be able to select the severity it wants to receive. Let's focus on emitting logs first.

咱們將爲日誌系統使用以上模型,咱們會在發送消息時使用 direct 型交換機,而不是 fanout 型。咱們會基於日誌的嚴重性做爲路由鍵,這樣的話接收端腳本將能夠選擇它指望接收的嚴重性。讓咱們首先聚焦在發送日誌方面。

As always, we need to create an exchange first:

一如既往,咱們須要首先建立一個交換機:

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

And we're ready to send a message:

接着咱們準備好發送消息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

To simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.

爲簡單起鍵,咱們假定「severity」變量指的就是「info」、「warning」以及「error」中的一種。

Subscribing

訂閱

Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.

就像以前教程同樣,接收消息這一塊都運行得很好,惟獨有一處不一樣 -- 那就是咱們 爲本身所感興趣的(日誌的)每一種嚴重性建立一個新的綁定。

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

Putting it all together

融合一塊兒

Markdown

The code for EmitLogDirect.cs class:

EmitLogDirect.cs 類文件代碼:

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");

            var severity = (args.Length > 0) ? args[0] : "info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip( 1 ).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs",
                                 routingKey: severity,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

The code for ReceiveLogsDirect.cs:

ReceiveLogsDirect.cs 類文件代碼:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var severity in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs",
                                  routingKey: severity);
            }

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

Create projects as usual (see tutorial one for advice).

像往常同樣建立工程(看看教程第一章的建議)

If you want to save only 'warning' and 'error' (and not 'info') log messages to a file, just open a console and type:

若是你只想保存「warning」和「error」(不包括「info」)類型的日誌消息到文件,只需打開控制檯並輸入:

cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log

If you'd like to see all the log messages on your screen, open a new terminal and do:

若是你想在顯示屏上看到全部的日誌消息,能夠打開新的終端並輸入:

cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C

And, for example, to emit an error log message just type:

好比,爲了產生一個 error 級別的日誌消息只需輸入:

cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(Full source code for (EmitLogDirect.cs source) and (ReceiveLogsDirect.cs source))

EmitLogDirect.cs 完整源代碼)和(ReceiveLogsDirect.cs 完整源代碼

相關文章
相關標籤/搜索