在上篇博文 譯:3.RabbitMQ 之Publish/Subscribe(發佈和訂閱) 咱們構建了一個簡單的日誌系統 咱們可以向許多接收者廣播日誌消息。html
在本篇博文中,咱們將爲其添加一個功能 - 咱們將只能訂閱一部分消息。 例如,咱們只能將關鍵錯誤消息定向到日誌文件(以節省磁盤空間),同時仍然可以在控制檯上打印全部日誌消息。java
本文是譯文,英文原文請移步:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
算法
在前面的例子中,咱們已經在建立綁定。您可能會記得如下代碼:app
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定是交換和隊列之間的關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。ide
綁定能夠採用額外的routingKey參數。爲了不與basic_publish參數混淆,咱們將其稱爲 綁定密鑰。這就是咱們如何使用鍵建立綁定:ui
channel.queueBind(queueName,EXCHANGE_NAME,「black」);
綁定密鑰的含義取決於交換類型。咱們以前使用的 扇出交換隻是忽略了它的價值。spa
咱們上一個教程中的日誌記錄系統向全部消費者廣播全部消息。咱們但願擴展它以容許根據消息的嚴重性過濾消息。例如,咱們可能須要一個程序將日誌消息寫入磁盤以僅接收嚴重錯誤,而不是在警告或信息日誌消息上浪費磁盤空間。日誌
咱們使用的是扇出交換,它沒有給咱們太大的靈活性 - 它只能進行無心識的廣播。code
咱們將使用直接交換。直接交換背後的路由算法很簡單 - 消息進入隊列,其 綁定密鑰與消息的路由密鑰徹底匹配。htm
咱們上一個教程中的日誌記錄系統向全部消費者廣播全部消息。咱們但願擴展它以容許根據消息的嚴重性過濾消息。例如,咱們可能須要一個程序將日誌消息寫入磁盤以僅接收嚴重錯誤,而不是在警告或信息日誌消息上浪費磁盤空間。
咱們使用的是扇出交換,它沒有給咱們太大的靈活性 - 它只能進行無心識的廣播。
咱們將使用直接交換。直接交換背後的路由算法很簡單 - 消息進入隊列,其 綁定密鑰與消息的路由密鑰徹底匹配。
爲了說明這一點,請考慮如下設置:
在此設置中,咱們能夠看到直接交換X與兩個綁定到它的隊列。第一個隊列綁定orange 綁定,第二個綁定有兩個綁定,一個綁定密鑰爲black,另外一個綁定爲green。
在這樣的設置中,使用路由密鑰orange發佈到交換機的消息 將被路由到隊列Q1。路由鍵爲black 或green的消息將轉到Q2。全部其餘消息將被丟棄。
使用相同的綁定密鑰綁定多個隊列是徹底合法的。
在咱們的例子中,咱們能夠在X和Q1之間添加綁定鍵黑色的綁定。
在這種狀況下, direct 直接交換將表現得像 fanout同樣,並將消息廣播到全部匹配的隊列。路由密鑰爲black的消息將傳送到 Q1和Q2。
咱們將此模型用於咱們的日誌系統。咱們會將消息發送給直接交換,而不是扇出。
咱們將提供日誌嚴重性做爲路由密鑰。這樣接收程序將可以選擇它想要接收的嚴重性。
讓咱們首先關注發送日誌。一如既往,咱們須要先建立一個交換:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
咱們已準備好發送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
爲簡化起見,咱們假設「嚴重性」能夠是「信息」,「警告」,「錯誤」之一。
接收消息將像上一個教程同樣工做,但有一個例外 - 咱們將爲咱們感興趣的每一個嚴重性建立一個新的綁定。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
EmitLogDirect.java:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } private static String getSeverity(String[] strings) { if (strings.length < 1) return "info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
ReceiveLogsDirect.java
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }