【譯】RabbitMQ系列(五) - 主題模式

主題模式

在上一章咱們改進了咱們的日誌系統,若是使用fanout咱們只能簡單進行廣播,而使用direct則容許消費者能夠進行必定程度的選擇。可是direct仍是有其侷限性,其路由不支持多個條件。java

在咱們的日誌系統中,消費者程序可能不止是基於日誌的severity,同時也想基於發送日誌的源系統。你可能知道linux的syslog工具,它就是同時基於severity(info/warn/crit...)和功能(auth/cron/kern...).linux

這就提供了很大的靈活性-咱們想接收來自cron的嚴重錯誤日誌和kern的全部日誌。bash

下面咱們就使用更復雜的topic來改進咱們的日誌系統。工具

Topic exchange

發送到topic類型exchange的message不能夠具備模糊的routing_key,它必須具備以冒號分割的詞。就像"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"等,限制長度255字節。ui

binding key也採用類似的形勢。topic exchange的邏輯和direct類似,經過比較message的routing key和bind的binding key,來匹配轉發的queue。可是topic的binding支持通配符:spa

  • 」 * 「表示任何一個詞
  • 」 # 「 表示0或1個詞

圖片描述
經過上面圖示的場景來解釋會比較好理解。日誌

例子中咱們將發送描述動物的message。message會攜帶routing key(包含三個詞),第一個詞表示speed,第二個表示color,第三個表示species"<speed>.<colour>.<species>".code

建立了三個綁定:Q1的binding key是」*.orange.*" Q2的binding key是「*.*.rabbit」和 "lazy.#".rabbitmq

以文字表述即是:圖片

  • Q1 關心全部橘色的動物
  • Q2 關心全部的rabbit和全部的lazy動物

routing key爲「quick.orange.rabbit"的message會同時發佈到這兩個queue。
routing key爲"lazy.orange.elephant"的message會同時發佈到這兩個queue。
routing key爲」quick.orange.fox「只會發佈到第一個queue.
routing key爲」lazy.brown.fox"的message只會發佈到第二個queue.
routing key爲"lazy.pink.rabbit"的message雖然知足Q2的兩個條件,但也只會發佈到Q2一次。
routing key爲"quick.brown.fox"的message沒有任何匹配,就會被丟失。

若是咱們發送的message只有一個word或者多餘三個word,如"orange"或者"quick.orange.male.rabbit"會發生什麼呢?這些message不會匹配任何binding key,均會被丟棄掉。

另外"lazy.orange.male.rabbit"雖然具備四個詞,可是會匹配最後的binding key,而被髮送到第二個queue。

Topic exhange很是強大,同時能夠模仿其餘兩種類型的exchange。當binding key爲 # 時,queue會接收全部的message。當binding key中沒有使用通配符(* 和 #)時,topic的行爲和direct一致。

開始執行

咱們將在日誌系統中使用topic exchange。咱們的routding key採用兩個詞 "<facility>.<severity>".
EmitLogTopic.java的代碼以下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsTopic.java的代碼以下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    for (String bindingKey : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

編譯

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接收全部日誌

java -cp $CP ReceiveLogsTopic "#"

接收功能"kern"的日誌

java -cp $CP ReceiveLogsTopic "kern.*"

接收嚴重級別日誌

java -cp $CP ReceiveLogsTopic "*.critical"

接收者使用兩個綁定條件

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

發送日誌message

java -cp $CP EmitLogTopic "kern.critical" "A critical kernal error"
相關文章
相關標籤/搜索