在上一章中,咱們完善了咱們的日誌系統,用direct交換器替換了fanout交換器,使得咱們能夠有選擇性地接收消息。儘管如此,仍然還有限制:不能基於多個標準進行路由。在咱們的日誌系統中,咱們可能不只但願根據日誌等級訂閱日誌,還但願根據日誌來源訂閱日誌。這個概念來自於unix工具syslog,它不只能夠根據日誌等級(info/warn/crit...)來路由日誌,同時還能夠根據設備(auth/cron/kern...)來路由日誌。這將更加靈活,咱們可能但願只監聽來自'cron'的error級別日誌,同時又要接收來自'kern'的全部級別的日誌。咱們的日誌系統若是要實現這個功能,就須要使用到另一種交換器:主題交換器(Topic Exchange)。java
發送到主題交換器的消息不能有任意的routing key,必須是由點號分開的一串單詞,這些單詞能夠是任意的,但一般是與消息相關的一些特徵。好比如下是幾個有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的單詞能夠有不少,最大限制是255 bytes。程序員
binding key必須與routing key模式同樣。Topic交換器的邏輯與direct交換器有點類似:使用特定路由鍵發送的消息將被髮送到全部使用匹配綁定鍵綁定的隊列,然而,綁定鍵有兩個特殊的狀況,以下:spring
下圖很好地表示這這兩個通配符的用法: app
在這個例子中,咱們將發送全部跟動物有關的消息,這些消息將會發送到由三個單詞,兩個點號組成的routing key,第一個單詞了表示的是速度,第二個單詞表示顏色,第三個單詞表示種類:ide
"<speed>.<colour>.<species>"。工具
咱們建立三個綁定關係:隊列Q1綁定到綁定鍵*.orange.* ,隊列Q2綁定到*.*.rabbit和lazy.#。ui
總結下來就是:unix
一個路由爲 "quick.orange.rabbit"的消息,將會被轉發到這兩個隊列,路由爲"lazy.orange.elephant"的消息也被轉發給這兩個隊列,路由爲 "quick.orange.fox"的消息將只被轉發到Q1隊列,路由爲 "lazy.brown.fox"的消息將只被轉發到Q2隊列。"lazy.pink.rabbit" 只被轉發到Q2隊列一次(雖然它匹配綁定鍵*.*.rabbit和lazy.#),路由爲 "quick.brown.fox"的消息與任何一個綁定鍵都不匹配,所以將會被丟棄。日誌
若是咱們發送的消息的的路由是由一個單詞「orangle"或4個單詞」quick.orangle.male.rabbit「將會怎樣?會由於與任何一個綁定鍵不匹配而被丟棄。code
另外一方面,路由爲 "lazy.orange.male.rabbit"的消息,由於匹配"lazy.#"綁定鍵,於是會被轉發到Q2隊列。
Topic交換器很是強大,能夠像其餘類型的交換器同樣工做:
當一個隊列的綁定鍵是"#"是,它將會接收全部的消息,而再也不考慮所接收消息的路由鍵,就像是fanout交換器同樣;
當一個隊列的綁定鍵沒有用到」#「和」*「時,它又像direct交換同樣工做。
下面是在咱們日誌系統中採用Topic交換器的完整代碼,咱們要發送的日誌消息的路由由兩個單詞組成:"<facility>.<severity>"。
EmitLogTopic.java
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private final static String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost); try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String message = "A critical kernel error"; String routingKey = "kern.critical"; channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } }
ReceiveLogsTopic.java
import com.rabbitmq.client.*; public class ReceiveLogsTopic { private final static String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); if (args.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : args) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
啓動4個接收者,分別傳入綁定鍵:#、kern.、.critical、kern.* *.critical。
啓動生產者:發送一條路由爲「kern.critical」的消息,消息內容爲:「A critical kernel error」,分別查看接收狀況:
能夠看到,全部綁定鍵的隊列都正常接收到了消息。
工程以下圖:
1、生產者
application.properties
#RabbitMq spring.rabbitmq.host=localhost rabbitmq.exchange.topic=topic_logs2 rabbitmq.exchange.topic.routing.key=kern.critical
EmitLogTopic.java
1 import org.springframework.amqp.core.AmqpTemplate; 2 import org.springframework.beans.factory.annotation.Autowired; 3 import org.springframework.beans.factory.annotation.Value; 4 import org.springframework.stereotype.Component; 5 6 @Component 7 public class EmitLogTopic { 8 9 @Value("${rabbitmq.exchange.topic}") 10 private String exchangeName; 11 12 @Value("${rabbitmq.exchange.topic.routing.key}") 13 private String routingKey; 14 15 @Autowired 16 private AmqpTemplate template; 17 18 public void sendMessage(Object message) { 19 System.out.println("發送消息:" + message); 20 template.convertAndSend(exchangeName,routingKey,message); 21 } 22 }
EmitLogTopicRunner.java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; @Component public class EmitLogTopicRunner implements ApplicationRunner { @Autowired private EmitLogTopic emitLogTopic; @Override public void run(ApplicationArguments args) throws Exception { emitLogTopic.sendMessage("A critical kernel error"); } }
2、消費者
application.properties
#RabbitMq spring.rabbitmq.host=localhost rabbitmq.exchange.topic=topic_logs2 rabbitmq.topic.queue=topic_queue rabbitmq.exchange.topic.binding.key=kern.critical server.port=8081
ReceiveLogsTopic.java
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${rabbitmq.topic.queue}",autoDelete = "true"), exchange = @Exchange(value = "${rabbitmq.exchange.topic}",type = ExchangeTypes.TOPIC), key = {"#","kern.*","*.critical"} ) ) public class ReceiveLogsTopic { @RabbitHandler public void reeive(Object message) { System.out.println("接收到消息:" + message); } }
啓動查看控制檯輸出:
生者產輸出:
消費者輸出: