【譯】RabbitMQ系列(四) - 路由模式

路由模式

在以前的文章中咱們創建了一個簡單的日誌系統。咱們能夠經過這個系統將日誌message廣播給不少接收者。java

在本篇文章中,咱們在這之上,添加一個新的功能,即容許接收者訂閱message的一個子集。舉個例子,咱們將日誌分紅多個級別,一個接收者接收錯誤日誌將之保存到磁盤,另外一個接收者接收全部日誌將之打印到控制檯。算法

Bindings

在前面的章節中,咱們已經接觸過binding了,像下面的代碼這樣:bash

channel.queueBind(queueName,EXCHANGE_NAME,"");

binding將exchange和queue關聯在了一塊兒。更形象的表示,如:queue對exchange中的message感興趣。spa

bindings能夠攜帶一個routingKey參數。爲了不和basic_publish的參數弄混,咱們稱之它爲binding_key.咱們像下面這樣建立一個binding日誌

channel.queueBind(queueName,EXCHANGE_NAME,"black");

binding key的做用要看exchange的類型,對於fanout類型的exchange,binding key是直接忽略的。code

Direct Exchange

在以前的日誌系統中,message會推送到全部的消費者去。咱們想讓系統依據message的日誌級別進行過濾。好比一個消費者只接收嚴重級別的日誌。rabbitmq

fanout沒法幫咱們實現這樣的功能,它只是無腦的進行廣播。圖片

咱們使用direct類型的exchange,它的路由算法是很是簡單的 - 只要message的routing_key和bind的binding_key相同即進行轉發。ip

爲了進行說明,像下圖這麼來設置
圖片描述
如圖,能夠看到有兩個queue綁到了類型爲direct的exchange上。第一個queue綁定用了orange這個binding key,第二個則用了black和green兩個binding key。路由

那麼結果就是有routing key爲orange的message路由到了Q1.而routing key爲black和green的message則路由到了Q2,其餘的消息則被丟棄了。

Multiple Bindings

圖片描述
若使用相同的binding key將多個queue綁定到exchange上,就和fanout的行爲同樣了,message會廣播到binding key相同的queue去。如圖的設置中,一個routing key爲black的message就會同時發送到Q1和Q2。

Emitting logs

咱們將在咱們的日誌系統上應用這個模型,使用direct類型的exchange去替代fanout類型的exchange。提供日誌的嚴重性做爲routing key。接收程序能夠選擇要接收日誌的嚴重性級別。
首先咱們建立exchange

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

而後就是發送message

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

咱們先假設severity取值 info | warning | error

Subscribing

接收message和上一章沒什麼區別,只是須要給各個severity建立新的binding。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

開始執行

圖片描述

EmitLogDirect.java代碼以下

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsDirect.java代碼以下:

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

編譯代碼

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

若是想把warning和error的日誌保存到文件去,那麼

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

若是想把全部的日誌打印到控制檯,那麼

java -cp $CP ReceiveLogsDirect info warning error

發送error日誌

java -cp $CP EmitLogDirect error "Run.Run. Or it will explode"
相關文章
相關標籤/搜索