在 上篇博文 譯:4.RabbitMQ 之Routing(路由) 中,咱們改進了日誌系統。html
咱們使用的是direct(直接交換),而不是使用只能進行虛擬廣播的 fanout(扇出交換) ,而且有可能選擇性地接收日誌。java
雖然使用direct(直接交換)改進了咱們的系統,但它仍然有侷限性 - 它不能基於多個標準進行路由。併發
在咱們的日誌系統中,咱們可能不只要根據嚴重性訂閱日誌,還要根據發出日誌的源來訂閱日誌。您可能從syslog unix工具中瞭解這個概念,該 工具根據嚴重性(info / warn / crit ...)和facility(auth / cron / kern ...)來路由日誌。app
這會給咱們帶來很大的靈活性 - 咱們可能想聽聽來自'cron'的關鍵錯誤以及來自'kern'的全部日誌。ide
要在咱們的日誌系統中實現這一點,咱們須要瞭解更復雜的topic (主題交換)。工具
本篇爲譯文,英文原文請移步:https://www.rabbitmq.com/tutorials/tutorial-five-java.htmlpost
發送到主題交換的消息不能具備任意 routing_key - 它必須是由點分隔的單詞列表。單詞能夠是任何內容,但一般它們指定與消息相關的一些功能。一些有效的路由密鑰示例:「 stock.usd.nyse 」,「 nyse.vmw 」,「 quick.orange.rabbit 」。路由密鑰中能夠包含任意數量的單詞,最多可達255個字節。ui
綁定密鑰也必須採用相同的形式。主題交換背後的邏輯 相似於直接交換- 使用特定路由密鑰發送的消息將被傳遞到與匹配綁定密鑰綁定的全部隊列。可是,綁定鍵有兩個重要的特殊狀況:url
在一個例子中解釋這個是最容易的:spa
在這個例子中,咱們將發送全部描述動物的消息。消息將與包含三個單詞(兩個點)的路由鍵一塊兒發送。
路由鍵中的第一個單詞將描述速度,第二個是顏色,第三個是物種:「 <speed>。<color>。<species> 」。
咱們建立了三個綁定:Q1綁定了綁定鍵「 * .orange。* 」,Q2 綁定了「 *。*。rabbit 」和「 lazy。# 」。
這些綁定能夠歸納爲:
路由密鑰設置爲「 quick.orange.rabbit 」的消息將傳遞到兩個隊列。消息「 lazy.orange.elephant 」也將同時發送給他們。另外一方面,「 quick.orange.fox 」只會進入第一個隊列,而「 lazy.brown.fox 」只會進入第二個隊列。「 lazy.pink.rabbit 」將僅傳遞到第二個隊列一次,即便它匹配兩個綁定。「 quick.brown.fox 」與任何綁定都不匹配,所以它將被丟棄。
若是咱們違反合同併發送帶有一個或四個單詞的消息,例如「 orange 」或「 quick.orange.male.rabbit」,會發生什麼?好吧,這些消息將不匹配任何綁定,將丟失。
另外一方面,「 lazy.orange.male.rabbit 」,即便它有四個單詞,也會匹配最後一個綁定,並將被傳遞到第二個隊列。
主題交換功能強大,能夠像其餘交易所同樣。
當隊列與「 # 」(哈希)綁定密鑰綁定時 - 它將接收全部消息,而無論路由密鑰 - 如扇出交換。
當特殊字符「 * 」(星號)和「 # 」(哈希)未在綁定中使用時,主題交換的行爲就像直接交換同樣
咱們將在日誌記錄系統中使用主題交換。咱們將首先假設日誌的路由鍵有兩個詞:「 <facility>。<severity> 」。
EmitLogTopic.java
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } private static String getRouting(String[] strings) { if (strings.length < 1) return "anonymous.info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
ReceiveLogsTopic.java
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }