RabbitMQ 在 Spring Boot 進階之交換器 Direct Exchange

上篇文章中咱們只看到了的生產者的消息發送與消費者的消息消費,實際上它隱藏了rabbitMQ中一個重要的環節。 上篇文章中,咱們在生產者中直接定義了消息送達隊列的名字java

@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

}

咱們指定了將消息發送到< hello >這個隊列中。可是這樣在實際使用時,每每難以知足某些業務需求。 因此RabbitMQ給咱們提供了四種交換器來知足不一樣的需求。 分別是:Direct、Fanout、Topic、Headersspring

####假如咱們目前有一個業務,它負責資料的上傳。分佈式

第一個需求是,不一樣的用戶上傳後,後續的處理不一樣。ide

  • 好比說咱們的普通用戶上傳資料後,能夠得到必定的積分。
  • 內部人員上傳資料後,不須要積分,但它會增長必定的業績。

按照上一篇文章的作法,咱們是沒法用一個隊列來完成這部分操做的。spring-boot

這時候咱們可使用Direct交換器,將業績隊列與積分隊列綁定到咱們的交換器上。測試


  • 首先定義一個交換器,名字隨便起。
static final String ExchangeName = "spring-boot-exchange";

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(ExchangeName);
    }
  • 而後定義兩個Queue,一個是pointQueue,積分隊列。另外一個是performanceQueue,業績隊列。
static final String pointQueueName = "point";

    static final String performanceQueueName = "performance";
    @Bean
    Queue performanceQueue() {
        return new Queue(performanceQueueName, false);
    }

    @Bean
    Queue pointQueue() {
        return new Queue(pointQueueName, false);
    }
  • 將兩個Queue綁定到交換器上。這裏BindingBuilder的with方法,即routingKey是咱們的路由鍵,交換器將根據這裏定義的路由鍵規則進行隊列消息的分發。
@Bean
    Binding bindingPerformance(Queue performanceQueue, DirectExchange exchange) {
        return BindingBuilder.bind(performanceQueue)
                             .to(exchange)
                             .with(performanceQueueName);
    }

    @Bean
    Binding bindingPointQueue(Queue pointQueue, DirectExchange exchange) {
        return BindingBuilder.bind(pointQueue)
                             .to(exchange)
                             .with(pointQueueName);
    }
  • 定義兩個消費者
@Component
public class PointReceiver {

    public void receivePointMessage(String message) {
        System.out.println("接收到來自積分隊列的信息 <" + message + ">");
    }

}

@Component
public class PerformanceReceiver {

    public void receivePerformanceMessage(String message) {
        System.out.println("接收到來自業績隊列的信息 <" + message + ">");
    }
}
  • 爲消費者配置監聽器
@Bean
    MessageListenerAdapter pointListenerAdapter(PointReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receivePointMessage");
    }

    @Bean
    MessageListenerAdapter performanceListenerAdapter(PerformanceReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receivePerformanceMessage");
    }
  • 將監聽器配置到 Container 中
@Bean
    SimpleMessageListenerContainer pointContainer(ConnectionFactory connectionFactory,
        MessageListenerAdapter pointListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(pointQueueName);
        container.setMessageListener(pointListenerAdapter);
        return container;
    }

    @Bean
    SimpleMessageListenerContainer performanceContainer(ConnectionFactory connectionFactory,
        MessageListenerAdapter performanceListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(performanceQueueName);
        container.setMessageListener(performanceListenerAdapter);
        return container;
    }
  • 測試類以下,在這裏,咱們使用rabbitTemplate,在ExchangeName這個以前定義的交換器上,配置分發的規則,也就是第二個參數 routingKey 路由鍵。
@Component
public class Runner implements CommandLineRunner {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message>>>");
        rabbitTemplate.convertAndSend(ExchangeName, pointQueueName, "增長積分~");
        rabbitTemplate.convertAndSend(ExchangeName, performanceQueueName, "增長業績~");
    }
}
  • 程序運行,控制檯輸出
Sending message>>>
接收到來自業績隊列的信息 <增長業績~>
接收到來自積分隊列的信息 <增長積分~>

這說明分發時,咱們使用的交換器已經成功地經過路由鍵進行了消息的分發。 而且監聽器也根據隊列的不一樣,將消息送達到了不一樣的消費者中。ui

這就是rabbitMQ的Direct Exchange。

Direct Exchange – 處理路由鍵。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵 「dog」,則只有被標記爲「dog」的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。this

這裏寫圖片描述

參考文章: http://blog.csdn.net/rainday0310/article/details/22082503 https://spring.io/guides/gs/messaging-rabbitmq/ 以及書籍《RabbitMQ實戰 高效部署分佈式消息隊列》.net

相關文章
相關標籤/搜索