在上一章咱們改進了咱們的日誌系統,若是使用fanout咱們只能簡單進行廣播,而使用direct則容許消費者能夠進行必定程度的選擇。可是direct仍是有其侷限性,其路由不支持多個條件。java
在咱們的日誌系統中,消費者程序可能不止是基於日誌的severity,同時也想基於發送日誌的源系統。你可能知道linux的syslog工具,它就是同時基於severity(info/warn/crit...)和功能(auth/cron/kern...).linux
這就提供了很大的靈活性-咱們想接收來自cron的嚴重錯誤日誌和kern的全部日誌。bash
下面咱們就使用更復雜的topic來改進咱們的日誌系統。工具
發送到topic類型exchange的message不能夠具備模糊的routing_key,它必須具備以冒號分割的詞。就像"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"等,限制長度255字節。ui
binding key也採用類似的形勢。topic exchange的邏輯和direct類似,經過比較message的routing key和bind的binding key,來匹配轉發的queue。可是topic的binding支持通配符:spa
經過上面圖示的場景來解釋會比較好理解。日誌
例子中咱們將發送描述動物的message。message會攜帶routing key(包含三個詞),第一個詞表示speed,第二個表示color,第三個表示species"<speed>.<colour>.<species>".code
建立了三個綁定:Q1的binding key是」*.orange.*" Q2的binding key是「*.*.rabbit」和 "lazy.#".rabbitmq
以文字表述即是:圖片
routing key爲「quick.orange.rabbit"的message會同時發佈到這兩個queue。
routing key爲"lazy.orange.elephant"的message會同時發佈到這兩個queue。
routing key爲」quick.orange.fox「只會發佈到第一個queue.
routing key爲」lazy.brown.fox"的message只會發佈到第二個queue.
routing key爲"lazy.pink.rabbit"的message雖然知足Q2的兩個條件,但也只會發佈到Q2一次。
routing key爲"quick.brown.fox"的message沒有任何匹配,就會被丟失。
若是咱們發送的message只有一個word或者多餘三個word,如"orange"或者"quick.orange.male.rabbit"會發生什麼呢?這些message不會匹配任何binding key,均會被丟棄掉。
另外"lazy.orange.male.rabbit"雖然具備四個詞,可是會匹配最後的binding key,而被髮送到第二個queue。
Topic exhange很是強大,同時能夠模仿其餘兩種類型的exchange。當binding key爲 # 時,queue會接收全部的message。當binding key中沒有使用通配符(* 和 #)時,topic的行爲和direct一致。
咱們將在日誌系統中使用topic exchange。咱們的routding key採用兩個詞 "<facility>.<severity>".
EmitLogTopic.java的代碼以下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } //.. }
ReceiveLogsTopic.java的代碼以下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
編譯
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收全部日誌
java -cp $CP ReceiveLogsTopic "#"
接收功能"kern"的日誌
java -cp $CP ReceiveLogsTopic "kern.*"
接收嚴重級別日誌
java -cp $CP ReceiveLogsTopic "*.critical"
接收者使用兩個綁定條件
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
發送日誌message
java -cp $CP EmitLogTopic "kern.critical" "A critical kernal error"