譯:4.RabbitMQ Java Client 之 Routing(路由)

在上篇博文 譯:3.RabbitMQ 之Publish/Subscribe(發佈和訂閱)  咱們構建了一個簡單的日誌系統 咱們可以向許多接收者廣播日誌消息。html

在本篇博文中,咱們將爲其添加一個功能 - 咱們將只能訂閱一部分消息。 例如,咱們只能將關鍵錯誤消息定向到日誌文件(以節省磁盤空間),同時仍然可以在控制檯上打印全部日誌消息。java

本文是譯文,英文原文請移步:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
算法


Bindings 綁定

在前面的例子中,咱們已經在建立綁定。您可能會記得如下代碼:app

channel.queueBind(queueName, EXCHANGE_NAME, "");

 綁定是交換和隊列之間的關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。ide

綁定能夠採用額外的routingKey參數。爲了不與basic_publish參數混淆,咱們將其稱爲 綁定密鑰這就是咱們如何使用鍵建立綁定:ui

channel.queueBind(queueName,EXCHANGE_NAME,「black」);

綁定密鑰的含義取決於交換類型。咱們以前使用的 扇出交換隻是忽略了它的價值。spa

Direct exchange 直接交換

咱們上一個教程中的日誌記錄系統向全部消費者廣播全部消息。咱們但願擴展它以容許根據消息的嚴重性過濾消息。例如,咱們可能須要一個程序將日誌消息寫入磁盤以僅接收嚴重錯誤,而不是在警告或信息日誌消息上浪費磁盤空間。日誌

咱們使用的是扇出交換,它沒有給咱們太大的靈活性 - 它只能進行無心識的廣播。code

咱們將使用直接交換。直接交換背後的路由算法很簡單 - 消息進入隊列,其 綁定密鑰消息路由密鑰徹底匹配htm

咱們上一個教程中的日誌記錄系統向全部消費者廣播全部消息。咱們但願擴展它以容許根據消息的嚴重性過濾消息。例如,咱們可能須要一個程序將日誌消息寫入磁盤以僅接收嚴重錯誤,而不是在警告或信息日誌消息上浪費磁盤空間。

咱們使用的是扇出交換,它沒有給咱們太大的靈活性 - 它只能進行無心識的廣播。

咱們將使用直接交換。直接交換背後的路由算法很簡單 - 消息進入隊列,其 綁定密鑰消息路由密鑰徹底匹配

爲了說明這一點,請考慮如下設置:

 

 在此設置中,咱們能夠看到直接交換X與兩個綁定到它的隊列。第一個隊列綁定orange 綁定,第二個綁定有兩個綁定,一個綁定密鑰爲black,另外一個綁定爲green

在這樣的設置中,使用路由密鑰orange發佈到交換機的消息 將被路由到隊列Q1路由鍵爲black 或green的消息將轉到Q2全部其餘消息將被丟棄。

Multiple bindings 多個綁定

使用相同的綁定密鑰綁定多個隊列是徹底合法的。

在咱們的例子中,咱們能夠在XQ1之間添加綁定鍵黑色的綁定

在這種狀況下, direct 直接交換將表現得像 fanout同樣,並將消息廣播到全部匹配的隊列。路由密鑰爲black消息將傳送到 Q1Q2

Emitting logs 發送日誌

咱們將此模型用於咱們的日誌系統。咱們會將消息發送給直接交換,而不是扇出

咱們將提供日誌嚴重性做爲路由密鑰這樣接收程序將可以選擇它想要接收的嚴重性。

讓咱們首先關注發送日誌。一如既往,咱們須要先建立一個交換:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

咱們已準備好發送消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

爲簡化起見,咱們假設「嚴重性」能夠是「信息」,「警告」,「錯誤」之一。

Subscribing 訂閱

接收消息將像上一個教程同樣工做,但有一個例外 - 咱們將爲咱們感興趣的每一個嚴重性建立一個新的綁定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

Putting it all together 放到一塊兒來看

 

EmitLogDirect.java:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }

    private static String getSeverity(String[] strings) {
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0)
            return "";
        if (length < startIndex)
            return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

ReceiveLogsDirect.java

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for (String severity : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
相關文章
相關標籤/搜索