RabbitMQ指南之五:主題交換器(Topic Exchange)

在上一章中,咱們完善了咱們的日誌系統,用direct交換器替換了fanout交換器,使得咱們能夠有選擇性地接收消息。儘管如此,仍然還有限制:不能基於多個標準進行路由。在咱們的日誌系統中,咱們可能不只但願根據日誌等級訂閱日誌,還但願根據日誌來源訂閱日誌。這個概念來自於unix工具syslog,它不只能夠根據日誌等級(info/warn/crit...)來路由日誌,同時還能夠根據設備(auth/cron/kern...)來路由日誌。這將更加靈活,咱們可能但願只監聽來自'cron'的error級別日誌,同時又要接收來自'kern'的全部級別的日誌。咱們的日誌系統若是要實現這個功能,就須要使用到另一種交換器:主題交換器(Topic Exchange)。java

一、主題交換器(Topic Exchange)

  發送到主題交換器的消息不能有任意的routing key,必須是由點號分開的一串單詞,這些單詞能夠是任意的,但一般是與消息相關的一些特徵。好比如下是幾個有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的單詞能夠有不少,最大限制是255 bytes。程序員

  binding key必須與routing key模式同樣。Topic交換器的邏輯與direct交換器有點類似:使用特定路由鍵發送的消息將被髮送到全部使用匹配綁定鍵綁定的隊列,然而,綁定鍵有兩個特殊的狀況,以下:spring

  • "*" 表示匹配任意一個單詞
  • "#" 表示匹配任意一個或多個單詞

  下圖很好地表示這這兩個通配符的用法: app

  在這個例子中,咱們將發送全部跟動物有關的消息,這些消息將會發送到由三個單詞,兩個點號組成的routing key,第一個單詞了表示的是速度,第二個單詞表示顏色,第三個單詞表示種類:ide

  "<speed>.<colour>.<species>"。工具

  咱們建立三個綁定關係:隊列Q1綁定到綁定鍵*.orange.* ,隊列Q2綁定到*.*.rabbit和lazy.#。ui

  總結下來就是:unix

  • 隊列Q1對橘黃色(orange)顏色的全部動物感興趣;
  • 隊列Q2對全部的兔子(rabbit)和全部慢吞吞(lazy)的動物感興趣。

  一個路由爲 "quick.orange.rabbit"的消息,將會被轉發到這兩個隊列,路由爲"lazy.orange.elephant"的消息也被轉發給這兩個隊列,路由爲 "quick.orange.fox"的消息將只被轉發到Q1隊列,路由爲 "lazy.brown.fox"的消息將只被轉發到Q2隊列。"lazy.pink.rabbit" 只被轉發到Q2隊列一次(雖然它匹配綁定鍵*.*.rabbit和lazy.#),路由爲 "quick.brown.fox"的消息與任何一個綁定鍵都不匹配,所以將會被丟棄。日誌

  若是咱們發送的消息的的路由是由一個單詞「orangle"或4個單詞」quick.orangle.male.rabbit「將會怎樣?會由於與任何一個綁定鍵不匹配而被丟棄。code

  另外一方面,路由爲 "lazy.orange.male.rabbit"的消息,由於匹配"lazy.#"綁定鍵,於是會被轉發到Q2隊列。

  Topic交換器很是強大,能夠像其餘類型的交換器同樣工做:

  當一個隊列的綁定鍵是"#"是,它將會接收全部的消息,而再也不考慮所接收消息的路由鍵,就像是fanout交換器同樣;

  當一個隊列的綁定鍵沒有用到」#「和」*「時,它又像direct交換同樣工做。

二、完整的代碼

  下面是在咱們日誌系統中採用Topic交換器的完整代碼,咱們要發送的日誌消息的路由由兩個單詞組成:"<facility>.<severity>"。

  EmitLogTopic.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost);

        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String message = "A critical kernel error";
            String routingKey = "kern.critical";

            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8"));

            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
}

 ReceiveLogsTopic.java

import com.rabbitmq.client.*;

public class ReceiveLogsTopic {

    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String queueName = channel.queueDeclare().getQueue();

        if (args.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

  啓動4個接收者,分別傳入綁定鍵:#、kern..critical、kern.* *.critical。

  啓動生產者:發送一條路由爲「kern.critical」的消息,消息內容爲:「A critical kernel error」,分別查看接收狀況:

  能夠看到,全部綁定鍵的隊列都正常接收到了消息。 

三、SpringBoot實現

   工程以下圖:

  1、生產者

application.properties

#RabbitMq
spring.rabbitmq.host=localhost
rabbitmq.exchange.topic=topic_logs2
rabbitmq.exchange.topic.routing.key=kern.critical

  EmitLogTopic.java

1 import org.springframework.amqp.core.AmqpTemplate;
 2 import org.springframework.beans.factory.annotation.Autowired;
 3 import org.springframework.beans.factory.annotation.Value;
 4 import org.springframework.stereotype.Component;
 5 
 6 @Component
 7 public class EmitLogTopic {
 8 
 9     @Value("${rabbitmq.exchange.topic}")
10     private String exchangeName;
11 
12     @Value("${rabbitmq.exchange.topic.routing.key}")
13     private String routingKey;
14 
15     @Autowired
16     private AmqpTemplate template;
17 
18     public void sendMessage(Object message) {
19         System.out.println("發送消息:" + message);
20         template.convertAndSend(exchangeName,routingKey,message);
21     }
22 }

  EmitLogTopicRunner.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class EmitLogTopicRunner implements ApplicationRunner {

    @Autowired
    private EmitLogTopic emitLogTopic;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        emitLogTopic.sendMessage("A critical kernel error");
    }
}

2、消費者

  application.properties

#RabbitMq
spring.rabbitmq.host=localhost
rabbitmq.exchange.topic=topic_logs2
rabbitmq.topic.queue=topic_queue
rabbitmq.exchange.topic.binding.key=kern.critical

server.port=8081

  ReceiveLogsTopic.java

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${rabbitmq.topic.queue}",autoDelete = "true"),
                exchange = @Exchange(value = "${rabbitmq.exchange.topic}",type = ExchangeTypes.TOPIC),
                key = {"#","kern.*","*.critical"}
        )
)
public class ReceiveLogsTopic {

        @RabbitHandler
        public void reeive(Object message) {
                System.out.println("接收到消息:" + message);
        }
}

  啓動查看控制檯輸出:

  生者產輸出:

  消費者輸出:

點關注,不迷路,這是一個程序員都想關注的公衆號

相關文章
相關標籤/搜索