SpringBoot RabbitMQ 七種工做模式入門

RabbitMQ 工做模式

簡單模式
  • 簡單:一個生產者、一個隊列和一個消費者,生產者發送消息至隊列,消費者監聽隊列並消費消息

Work 模式
  • Work:一個生產者、一個隊列和多個消費者,生產者發送消息至隊列,多個消費者監聽同一隊列消費消息

發佈/訂閱模式
  • 發佈/訂閱:publish/subscribe 模式包含一個生產者、一個交換機、多個隊列及多個消費者,交換機(Exchange)和隊列直接綁定,生產者經過交換機(Exchange)將消息存儲在與交換機綁定的隊列中,消費者監聽隊列並進行消費

路由模式
  • 路由:routing 模式能夠根據 routing key 將消息發送給指定隊列,交換機(Exchange)和隊列經過routing key 進行綁定,生產者經過交換機(Exchange)和 routing key 將消息精準發送至隊列,消費者監聽隊列並消費消息

主題模式
  • 主題:Topics 模式在路由模式的基礎上支持通配符操做,交換機會根據通配符將消息存儲在匹配成功的隊列中,消費者監聽隊列並進行消費

Header 模式
  • Header:header 模式取消了 routing key,而是使用 header 中的 key/value 鍵值對來進行匹配,匹配成功後消息會經過交換機發送給隊列,消息者才能獲取到消息並消費

RPC 模式
  • RPC:RPC 模式主要針對須要獲取消費者處理結果的狀況,一般是生產者將消息發送給了消費者,消費者接收到消息並進行消費後返回給生產者處理結果

SpringBoot 集成 RabbitMQ

  • 首先建立一個SpringBoot 項目,pom.xml 文件加入如下依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
  • 配置文件修改,加入如下 RabbitMQ 配置
server:
  port: 8888  # 設置端口號

spring:
  rabbitmq:
    host: 127.0.0.1  # 設置RabbitMQ所在主機
    port: 5672       # 設置RabbitMQ服務端口
    username: guest  # 設置RabbitMQ用戶名
    password: guest  # 設置RabbitMQ密碼
  • 新增公共常量類
public interface RabbitConstant {

    /**
     * 簡單模式
     */
    String SIMPLE_QUEUE_NAME = "simple_queue";

    /**
     * work 模式
     */
    String WORK_QUEUE_NAME = "work_queue";

    /**
     * 發佈/訂閱(publish/subscribe)模式
     */
    String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange";
    String PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue";
    String PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue";

    /**
     * 路由(routing)模式
     */
    String ROUTING_EXCHANGE_NAME = "routing_exchange";
    String ROUTING_FIRST_QUEUE_NAME = "routing_first_queue";
    String ROUTING_SECOND_QUEUE_NAME = "routing_second_queue";
    String ROUTING_THIRD_QUEUE_NAME = "routing_third_queue";
    String ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key";
    String ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key";
    String ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key";

    /**
     * 主題(topics)模式
     */
    String TOPICS_EXCHANGE_NAME = "topics_exchange";
    String TOPICS_FIRST_QUEUE_NAME = "topics_first_queue";
    String TOPICS_SECOND_QUEUE_NAME = "topics_second_queue";
    String TOPICS_THIRD_QUEUE_NAME = "topics_third_queue";
    String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key";
    String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key";
    String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key";
    String TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#";
    String TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#";
    String TOPICS_ROUTING_KEY_THRID_WILDCARD = "*.third.*";

    /**
     * header 模式
     */
    String HEADER_EXCHANGE_NAME = "header_exchange";
    String HEADER_FIRST_QUEUE_NAME = "header_first_queue";
    String HEADER_SECOND_QUEUE_NAME = "header_second_queue";

    /**
     * rpc 模式
     */
    String RPC_QUEUE_NAME = "rpc_queue";

}
  • 新增 Controller 請求類(用於驗證結果,可最後新增)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RestController
public class RabbitController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/simple")
    public void simple() {
        rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
    }

    @GetMapping(value = "/work")
    public void work() {
        rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
    }

    @GetMapping(value = "/pubsub")
    public void pubsub() {
        rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello");
    }

    @GetMapping(value = "/routing")
    public void routing() {
        // 給第一個隊列發送消息
        rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
    }

    @GetMapping(value = "/topics")
    public void topics() {
        // 給第一個隊列發送消息,此時隊列能接受到消息,由於隊列通配符爲 #.first.#,而 routing_key 爲 topics.first.routing.key,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
        // 給第二個隊列發送消息,此時隊列也能接受到消息,由於隊列通配符爲 *.second.#,而 routing_key 爲 topics.second.routing.key,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
        // 給第三個隊列發送消息,此時隊列沒法接受到消息,由於隊列通配符爲 *.third.*,而 routing_key 爲 topics.third.routing.key,匹配失敗
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
    }

    @GetMapping(value = "/header")
    public void header() {
        // 這條消息應該能被兩個隊列都接收到,第一個隊列 all 匹配成功,第二個隊列 hello-value any匹配成功
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("matchAll", "YES");
        messageProperties.setHeader("hello", "world");
        Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);

        // 這條消息應該只被第二個隊列接受,第一個隊列 all 匹配失敗,第二個隊列 matchAll-NO any匹配成功
        MessageProperties messagePropertiesSecond = new MessageProperties();
        messagePropertiesSecond.setHeader("matchAll", "NO");
        Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
    }

    @GetMapping(value = "/rpc")
    public void rpc() {
        Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
        System.out.println("rabbit rpc response message: " + responseMsg);
    }

}
SpringBoot RabbitMQ 簡單模式
  • 生產者聲明隊列,並向隊列發送消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitSimpleProvider {

    @Bean
    public Queue simpleQueue() {
        return new Queue(RabbitConstant.SIMPLE_QUEUE_NAME);
    }

}
  • 消費者監聽隊列,並消費消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitSimpleConsumer {

    @RabbitHandler
    @RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME)
    public void simpleListener(String context) {
        System.out.println("rabbit receiver: " + context);
    }

}
  • 單元測試
@Test
public void simple() {
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
}
  • 響應結果

SpringBoot RabbitMQ Work 模式
  • 生產者聲明隊列,並向隊列生產消息
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitWorkProvider {

    @Bean
    public Queue workQueue() {
        return new Queue(RabbitConstant.WORK_QUEUE_NAME);
    }

}
  • 消費者監聽隊列,並消費消息(這裏有兩個消費者監聽同一隊列)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitWorkConsumer {

    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    @RabbitHandler
    public void workQueueListenerFirst(String context) {
        System.out.println("rabbit workQueue listener first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
    @RabbitHandler
    public void workQueueListenerSecond(String context) {
        System.out.println("rabbit workQueue listener second receiver: " + context);
    }

}
  • 單元測試
@Test
public void work() {
    rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
}
  • 響應結果(因爲有兩個消費者監聽同一隊列,消息只能被其中一者進行消費,默認是負載均衡的將消息發送給全部消費者)

SpringBoot RabbitMQ 發佈/訂閱模式
  • 生產者聲明兩個隊列和一個 fanout 交換機,並將這兩個隊列和交換機進行綁定
  • 交換機種類一共有四種 fanout、direct、topic、header(文末介紹)
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitPublishSubscribeProvider {

    @Bean
    public Queue pubsubQueueFirst() {
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue pubsubQueueSecond() {
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 建立fanout類型交換機,表示與此交換機會將消息發送給全部綁定的隊列
        return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME);
    }

    @Bean
    public Binding pubsubQueueFirstBindFanoutExchange() {
        // 隊列一綁定交換機
        return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange());
    }

    @Bean
    public Binding pubsubQueueSecondBindFanoutExchange() {
        // 隊列二綁定交換機
        return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange());
    }

}
  • 消費者監聽隊列,並進行消費
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitPublishSubscribeConsumer {

    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void pubsubQueueFirst(String context) {
        System.out.println("rabbit pubsub queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void pubsubQueueSecond(String context) {
        System.out.println("rabbit pubsub queue second receiver: " + context);
    }

}
  • 單元測試
@Test
public void pubsub() {
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "publish/subscribe hello");
}
  • 響應結果

SpringBoot RabbitMQ 路由模式
  • 生產者聲明三個隊列和一個 direct 交換機,將這三個隊列和交換機進行綁定並設定交換機與隊列之間的路由
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRoutingProvider {

    @Bean
    public Queue rabbitRoutingFirstQueue() {
        return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue rabbitRoutingSecondQueue() {
        return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME);
    }

    @Bean
    public Queue rabbitRoutingThirdQueue() {
        return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME);
    }

    @Bean
    public DirectExchange directExchange() {
        // 建立direct類型交換機,表示與此交換機會將消息發送給 routing_key 徹底相同的隊列
        return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME);
    }

    @Bean
    public Binding routingFirstQueueBindDirectExchange() {
        // 隊列一綁定direct交換機,並設置 routing_key 爲 routing_first_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding routingSecondQueueBindDirectExchange() {
        // 隊列二綁定direct交換機,並設置 routing_key 爲 routing_second_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding routingThirdQueueBindDirectExchange() {
        // 隊列三綁定direct交換機,並設置 routing_key 爲 routing_third_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME);
    }

}
  • 消費者監聽隊列,並進行消費
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitRoutingConsumer {

    @RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void routingFirstQueueListener(String context) {
        System.out.println("rabbit routing queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void routingSecondQueueListener(String context) {
        System.out.println("rabbit pubsub queue second receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void routingThirdQueueListener(String context) {
        System.out.println("rabbit pubsub queue third receiver: " + context);
    }

}
  • 單元測試
@Test
public void routing() {
    // 給第一個隊列發送消息
    rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
}
  • 響應結果

SpringBoot RabbitMQ 主題模式
  • 生產者聲明三個隊列和一個 topic 交換機,隊列分別與 topic 交換機綁定並設置 routing key 統配符,若 routing key 知足交換機與隊列間通配符要求則將消息存儲至隊列
  • \# 通配符能夠匹配一個或多個單詞,* 通配符能夠匹配一個單詞;假如交換機(Exchange)與隊列之間的 routing key 通配符爲 #.hello.#,則表明 routing key 中間帶有 hello 單詞的都知足條件,消息將存儲至隊列
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicProvider {

    @Bean
    public Queue topicFirstQueue() {
        return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue topicSecondQueue() {
        return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME);
    }

    @Bean
    public Queue topicThirdQueue() {
        return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME);
    }

    @Bean
    public TopicExchange topicExchange() {
        // 建立topic類型交換機,表示與此交換機會將消息發送給 routing_key 通配符匹配成功的隊列
        return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME);
    }

    @Bean
    public Binding topicFirstQueueBindExchange() {
        // 隊列一綁定topic類型交換機,並設置 routing_key 通配符爲 #.first.#
        return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD);
    }

    @Bean
    public Binding topicSecondQueueBindExchange() {
        // 隊列二綁定topic類型交換機,並設置 routing_key 通配符爲 *.second.#
        return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD);
    }

    @Bean
    public Binding topicThirdQueueBindExchange() {
        // 隊列三綁定topic類型交換機,並設置 routing_key 通配符爲 *.third.*
        return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD);
    }

}
  • 消費者監聽隊列,並進行消費
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitTopicsConsumer {

    @RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void topicFirstQueue(String context) {
        System.out.println("rabbit topics queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void topicSecondQueue(String context) {
        System.out.println("rabbit topics queue second receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME)
    @RabbitHandler
    public void topicThirdQueue(String context) {
        System.out.println("rabbit topics queue third receiver: " + context);
    }

}
  • 單元測試
@Test
public void topics() {
    // 給第一個隊列發送消息,此時隊列能接受到消息,由於隊列通配符爲 #.first.#,而 routing_key 爲 topics.first.routing.key,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
    // 給第二個隊列發送消息,此時隊列也能接受到消息,由於隊列通配符爲 *.second.#,而 routing_key 爲 topics.second.routing.key,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
    // 給第三個隊列發送消息,此時隊列沒法接受到消息,由於隊列通配符爲 *.third.*,而 routing_key 爲 topics.third.routing.key,匹配失敗
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
    }
  • 響應結果

SpringBoot RabbitMQ header模式
  • 生產者聲明隊列並建立 HeaderExchange 交換機,將隊列分別與交換機經過 header 進行綁定
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitHeaderProvider {

    @Bean
    public Queue headerFirstQueue() {
        return new Queue(RabbitConstant.HEADER_FIRST_QUEUE_NAME);
    }

    @Bean
    public Queue headerSecondQueue() {
        return new Queue(RabbitConstant.HEADER_SECOND_QUEUE_NAME);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(RabbitConstant.HEADER_EXCHANGE_NAME);
    }

    @Bean
    public Binding headerFirstQueueBindExchange() {
        Map<String, Object> headersMap = new HashMap<>(8);
        headersMap.put("matchAll", "YES");
        headersMap.put("hello", "world");

        return BindingBuilder.bind(headerFirstQueue()).to(headersExchange()).whereAll(headersMap).match();
    }

    @Bean
    public Binding headerSecondQueueBindExchange() {
        Map<String, Object> headersMap = new HashMap<>(8);
        headersMap.put("matchAll", "NO");
        headersMap.put("hello", "world");

        return BindingBuilder.bind(headerSecondQueue()).to(headersExchange()).whereAny(headersMap).match();
    }

}
  • 交換機與隊列綁定詳情

  • 消費者監聽隊列,並進行消費
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitHeaderConsumer {

    @RabbitListener(queues = RabbitConstant.HEADER_FIRST_QUEUE_NAME)
    @RabbitHandler
    public void headerFirstQueue(String context) {
        System.out.println("rabbit header queue first receiver: " + context);
    }

    @RabbitListener(queues = RabbitConstant.HEADER_SECOND_QUEUE_NAME)
    @RabbitHandler
    public void headerSecondQueue(String context) {
        System.out.println("rabbit header queue second receiver: " + context);
    }

}
  • 單元測試
@Test
public void header() {
    // 這條消息應該能被兩個隊列都接收到,第一個隊列 all 匹配成功,第二個隊列 hello-value any匹配成功
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader("matchAll", "YES");
    messageProperties.setHeader("hello", "world");
    Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
    rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);

    // 這條消息應該只被第二個隊列接受,第一個隊列 all 匹配失敗,第二個隊列 matchAll-NO any匹配成功
    MessageProperties messagePropertiesSecond = new MessageProperties();
    messagePropertiesSecond.setHeader("matchAll", "NO");
    Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
    rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
}
  • 響應結果

SpringBoot RabbitMQ RPC模式
  • 生產者聲明隊列
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRpcProvider {

    @Bean
    public Queue rpcQueue() {
        return new Queue(RabbitConstant.RPC_QUEUE_NAME);
    }

}
  • 消費者監聽隊列,並進行消費
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitRpcConsumer {

    @RabbitListener(queues = RabbitConstant.RPC_QUEUE_NAME)
    @RabbitHandler
    public String rpcQueue(String context) {
        System.out.println("rabbit rpc queue receiver: " + context);
        return "copy that!";
    }

}
  • 單元測試
@Test
public void rpc() {
    Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
    System.out.println("rabbit rpc response message: " + responseMsg);
}
  • 響應結果

Exchange 交換機

  • fanout 交換機:經過fanout交換機發送消息,則與fanout交換機綁定的全部隊列都能接收到該消息

  • direct 交換機:經過direct交換機發送消息,則與direct交換機綁定隊列中routing key徹底一致的隊列能接收到消息

  • topic 交換機:經過topic交換機發送消息,則與topic交換機綁定隊列中routing key通過通配符匹配成功的隊列能接收到消息

文章還有不少不足之處,歡迎各位兄弟姐妹批評指正,代碼倉庫已存放至gitee [SpringBoot RabbitMQ 工做模式倉庫連接](https://gitee.com/BarryMan/spring-boot-rabbitmq.git)java

相關文章
相關標籤/搜索