Spring Boot (十三): Spring Boot 整合 RabbitMQ

1. 前言

RabbitMQ 是一個消息隊列,說到消息隊列,你們可能多多少少有聽過,它主要的功能是用來實現應用服務的異步與解耦,同時也能起到削峯填谷、消息分發的做用。html

消息隊列在比較主要的一個做用是用來作應用服務的解耦,消息從消息的生產者傳遞到消息隊列,消費者從消息隊列中獲取消息並進行消費,生產者不須要管是誰在消費消息,消費者也無需關注消息是由誰來生產的。在分佈式的系統中,消息隊列也會被用在其餘地方,好比分佈式事務的支持,表明如阿里開源的 RocketMQ 。java

固然,咱們本篇文章的主角仍是 RabbitMQ 。git

2. RabbitMQ 介紹

RabbitMQ 是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 RabbitMQ 主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。github

AMQP,即 Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP 的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。spring

RabbitMQ 是一個開源的 AMQP 實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。安全

3. 概念介紹

在普通的消息隊列的設計中,通常會有這麼幾個概念:生產者、消費者和咱們的隊列。可是在 RabbitMQ 中,中間增長了一層,叫交換機(Exchange),這樣,消息的投遞就不禁生產者來決定投遞至哪一個隊列,而消息是直接投遞至交換機的,由交換機根據調度策略來決定這個消息須要投遞到哪一個隊列。如圖:springboot

  • 左側的 P 表明消息的生產者
  • 紫色的 X 表明交換機
  • 右側紅色的表明隊列

4. 交換機(Exchange)

那麼爲何咱們須要 Exchange 而不是直接將消息發送至隊列呢?服務器

AMQP 協議中的核心思想就是生產者和消費者的解耦,生產者從不直接將消息發送給隊列。生產者一般不知道是否一個消息會被髮送到隊列中,只是將消息發送到一個交換機。先由 Exchange 來接收,而後 Exchange 按照特定的策略轉發到 Queue 進行存儲。app

Exchange 收到消息時,他是如何知道須要發送至哪些 Queue 呢?這裏就須要瞭解 Binding 和 RoutingKey 的概念:異步

Binding 表示 Exchange 與 Queue 之間的關係,咱們也能夠簡單的認爲隊列對該交換機上的消息感興趣,綁定能夠附帶一個額外的參數 RoutingKey。Exchange 就是根據這個 RoutingKey 和當前 Exchange 全部綁定的 Binding 作匹配,若是知足匹配,就往 Exchange 所綁定的 Queue 發送消息,這樣就解決了咱們向 RabbitMQ 發送一次消息,能夠分發到不一樣的 Queue。RoutingKey 的意義依賴於交換機的類型。

下面就來了解一下 Exchange 的三種主要類型:Fanout、Direct 和 Topic。

4.1 Direct Exchange

Direct Exchange 是 RabbitMQ 默認的 Exchange,徹底根據 RoutingKey 來路由消息。設置 Exchange 和 Queue 的 Binding 時需指定 RoutingKey(通常爲 Queue Name),發消息時也指定同樣的 RoutingKey,消息就會被路由到對應的Queue。

4.2 Topic Exchange

Topic Exchange 和 Direct Exchange 相似,也須要經過 RoutingKey 來路由消息,區別在於Direct Exchange 對 RoutingKey 是精確匹配,而 Topic Exchange 支持模糊匹配。分別支持 *# 通配符,* 表示匹配一個單詞, # 則表示匹配沒有或者多個單詞。

4.3 Headers Exchange

Headers Exchange 會忽略 RoutingKey 而根據消息中的 Headers 和建立綁定關係時指定的 Arguments 來匹配決定路由到哪些 Queue。

Headers Exchange 的性能比較差,並且 Direct Exchange 徹底能夠代替它,因此不建議使用。

4.4 Default Exchange

Default Exchange 是一種特殊的 Direct Exchange。當你手動建立一個隊列時,後臺會自動將這個隊列綁定到一個名稱爲空的 Direct Exchange 上,綁定 RoutingKey 與隊列名稱相同。有了這個默認的交換機和綁定,使咱們只關心隊列這一層便可,這個比較適合作一些簡單的應用。

5. Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ 很是簡單,若是隻是簡單的使用配置很是少,Spring Boot 提供了 spring-boot-starter-amqp 項目對消息各類支持。

5.1 簡單使用

引入依賴

代碼清單:spring-boot-rabbitmq/pom.xml***

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>複製代碼

配置文件 application.yml 以下:

代碼清單:spring-boot-rabbitmq/src/main/resources/application.yml***

server:
  port: 8080
spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin複製代碼

隊列配置

代碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/QueueConfig.java***

@Configuration
public class QueueConfig {
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple");
    }

    @Bean
    public Queue simpleOneToMany() {
        return new Queue("simpleOneToMany");
    }
}複製代碼

消息提供者

代碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleSend.java***

@Component
public class SimpleSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
  
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = "Hello Spring Boot " + simpleDateFormat.format(new Date());
        amqpTemplate.convertAndSend("simple", message);
        logger.info("消息推送成功!");
    }
}複製代碼

消息消費者

代碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleReceive.java***

@Component
@RabbitListener(queues = "simple")
public class SimpleReceive {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String message) {
        logger.info("Receive :{}", message);
    }

}複製代碼

測試

代碼清單:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java***

@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    SimpleSend simpleSend;

    @Test
    public void simpleSend() {
        simpleSend.send();
    }

}複製代碼

5.2 一對多使用

若是有一個消息的生產者,有 N 個消息的消費者,會發生什麼呢?

對上面的代碼稍做改動,增長一個消息的消費者。

測試代碼以下:

@Test
public void simpleOneSend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
    }
}複製代碼

測試能夠看到結果是兩個消費者平均的消費了生產者生產的消息。

5.3 多對多使用

咱們再增長一個消息的生產者,測試代碼以下:

@Test
public void simpleManySend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
        simpleManySend1.send(i);
    }
}複製代碼

測試能夠看到結果是兩個消費者平均的消費了兩個生產者生產的消息。

5.4 Topic Exchange

首先仍是先配置 Topic ,配置代碼以下:

代碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/TopicConfig.java***

@Configuration
public class TopicConfig {

    private final String message = "topic.message";
    private final String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(this.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(this.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}複製代碼

這裏隊列 queueMessages 能夠同時匹配兩個 route_key ,而隊列 queueMessage 只能匹配 topic.message 。

消息的生產者代碼以下:

代碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/topic/TopicSend.java***

@Component
public class TopicSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String message = "message 1";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
    }

    public void send2() {
        String message = "message 2";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.messages", message);
    }
}複製代碼

調用 send1() 消息會由 Exchange 同時轉發到兩個隊列, 而調用 send2() 則只會轉發至 receive2 。

5.5 Fanout Exchange

Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給 Fanout 交換機發送消息,綁定了這個交換機的全部隊列都收到這個消息。

Fanout 配置以下:

代碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/FanoutConfig.java***

@Configuration
public class FanoutConfig {
    @Bean
    public Queue MessageA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue MessageB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue MessageC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageB).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageC).to(fanoutExchange);
    }
}複製代碼

消息生產者代碼以下:

代碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/fanout/FanoutSend.java***

@Component
public class FanoutSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String message = "Hello FanoutSend.";
        logger.info("send:{}", message);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", message);
    }
}複製代碼

測試代碼以下:

代碼清單:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java***

@Test
public void fanoutSend() {
    fanoutSend.send();
}複製代碼

測試結果爲綁定到 fanout 交換機上面的隊列都收到了消息。

6. 示例代碼

示例代碼-Github

示例代碼-Gitee

7. 參考

http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html

https://blog.csdn.net/y4x5M0nivSrJaY3X92c/article/details/80416996

若是個人文章對您有幫助,請掃碼關注下做者的公衆號:獲取最新干貨推送:)
相關文章
相關標籤/搜索