第四十六章:SpringBoot & RabbitMQ完成消息延遲消費

2018-3-1SpringBoot官方發版了2.0.0.RELEASE最新版本,新版本徹底基於Spring5.0來構建,JDK最低支持也從原來的1.6也改爲了1.8,再也不兼容1.8如下的版本,更多新特性請查看官方文檔html

本章目標

基於SpringBoot整合RabbitMQ完成消息延遲消費。git

構建項目

注意前言

因爲SpringBoot的內置掃描機制,咱們若是不自動配置掃描路徑,請保持下面rabbitmq-common模塊內的配置能夠被SpringBoot掃描到,不然不會自動建立隊列,控制檯會輸出404的錯誤信息。web

SpringBoot 企業級核心技術學習專題


專題 專題名稱 專題描述
001 Spring Boot 核心技術 講解SpringBoot一些企業級層面的核心組件
002 Spring Boot 核心技術章節源碼 Spring Boot 核心技術簡書每一篇文章碼雲對應源碼
003 Spring Cloud 核心技術 對Spring Cloud核心技術全面講解
004 Spring Cloud 核心技術章節源碼 Spring Cloud 核心技術簡書每一篇文章對應源碼
005 QueryDSL 核心技術 全面講解QueryDSL核心技術以及基於SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技術 全面講解SpringDataJPA核心技術
007 SpringBoot核心技術學習目錄 SpringBoot系統的學習目錄,敬請關注點贊!!!

咱們本章採用2.0.0.RELEASE版本的SpringBoot,添加相關的依賴以下所示:spring

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
        <!--rabbbitMQ相關依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web相關依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依賴-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--spring boot tester-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--fast json依賴-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
    </dependencies>
......
複製代碼

咱們仍然採用多模塊的方式來測試隊列的Provider以及Consumerjson

隊列公共模塊

咱們先來建立一個名爲rabbitmq-common公共依賴模塊(Create New Maven Module) 在公共模塊內添加一個QueueEnum隊列枚舉配置,該枚舉內配置隊列的ExchangeQueueNameRouteKey等相關內容,以下所示:bash

package com.hengyu.rabbitmq.lazy.enums;

import lombok.Getter;

/**
 * 消息隊列枚舉配置
 *
 * @author:於起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:33
 * 簡書:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知隊列
     */
    MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
    /**
     * 消息通知ttl隊列
     */
    MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
    /**
     * 交換名稱
     */
    private String exchange;
    /**
     * 隊列名稱
     */
    private String name;
    /**
     * 路由鍵
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}
複製代碼

能夠看到MESSAGE_QUEUE隊列配置跟咱們以前章節的配置同樣,而咱們另外新建立了一個後綴爲ttl的消息隊列配置。咱們採用的這種方式是RabbitMQ消息隊列其中一種的延遲消費模塊,經過配置隊列消息過時後轉發的形式。微信

這種模式比較簡單,咱們須要將消息先發送到ttl延遲隊列內,當消息到達過時時間後會自動轉發到ttl隊列內配置的轉發Exchange以及RouteKey綁定的隊列內完成消息消費。app

下面咱們來模擬消息通知的延遲消費場景,先來建立一個名爲MessageRabbitMqConfiguration的隊列配置類,該配置類內添加消息通知隊列配置以及消息經過延遲隊列配置,以下所示:框架

/**
 * 消息通知 - 消息隊列配置信息
 *
 * @author:恆宇少年 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:32
 * 簡書:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Configuration
public class MessageRabbitMqConfiguration {
    /**
     * 消息中心實際消費隊列交換配置
     *
     * @return
     */
    @Bean
    DirectExchange messageDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心延遲消費交換配置
     *
     * @return
     */
    @Bean
    DirectExchange messageTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心實際消費隊列配置
     *
     * @return
     */
    @Bean
    public Queue messageQueue() {
        return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
    }


    /**
     * 消息中心TTL隊列
     *
     * @return
     */
    @Bean
    Queue messageTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
                // 配置到期後轉發的交換
                .withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
                // 配置到期後轉發的路由鍵
                .withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
                .build();
    }

    /**
     * 消息中心實際消息交換與隊列綁定
     *
     * @param messageDirect 消息中心交換配置
     * @param messageQueue  消息中心隊列
     * @return
     */
    @Bean
    Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
        return BindingBuilder
                .bind(messageQueue)
                .to(messageDirect)
                .with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
    }

    /**
     * 消息中心TTL綁定實際消息中心實際消費交換機
     *
     * @param messageTtlQueue
     * @param messageTtlDirect
     * @return
     */
    @Bean
    public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
        return BindingBuilder
                .bind(messageTtlQueue)
                .to(messageTtlDirect)
                .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
    }
}
複製代碼

咱們聲明瞭消息通知隊列的相關ExchangeQueueBinding等配置,將message.center.create隊列經過路由鍵message.center.create綁定到了message.center.direct交換上。ide

除此以外,咱們還添加了消息通知延遲隊列ExchangeQueueBinding等配置,將message.center.create.ttl隊列經過message.center.create.ttl路由鍵綁定到了message.center.topic.ttl交換上。

咱們仔細來看看messageTtlQueue延遲隊列的配置,跟messageQueue隊列配置不一樣的地方這裏多出了x-dead-letter-exchangex-dead-letter-routing-key兩個參數,而這兩個參數就是配置延遲隊列過時後轉發的ExchangeRouteKey,只要在建立隊列時對應添加了這兩個參數,在RabbitMQ管理平臺看到的隊列配置就不只是單純的Direct類型的隊列類型,以下圖所示:

隊列類型差別

在上圖內咱們能夠看到message.center.create.ttl隊列多出了DLXDLK的配置,這就是RabbitMQ死信交換的標誌。 知足死信交換的條件,在官方文檔中表示:

Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false, The TTL for the message expires; or The queue length limit is exceeded.

  • 該消息被拒絕(basic.reject或 basic.nack),requeue = false
  • 消息的TTL過時
  • 隊列長度限制已超出 官方文檔地址

咱們須要知足上面的其中一種方式就能夠了,咱們採用知足第二個條件,採用過時的方式。

隊列消息提供者

咱們再來建立一個名爲rabbitmq-lazy-provider的模塊(Create New Maven Module),而且在pom.xml配置文件內添加rabbitmq-common模塊的依賴,以下所示:

<!--添加公共模塊依賴-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>
複製代碼

配置隊列

resource下建立一個名爲application.yml的配置文件,在該配置文件內添加以下配置信息:

spring:
  #rabbitmq消息隊列配置信息
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /hengboy
    publisher-confirms: true
複製代碼

消息提供者類

接下來咱們來建立名爲MessageProvider消息提供者類,用來發送消息內容到消息通知延遲隊列,代碼以下所示:

/**
 * 消息通知 - 提供者
 *
 * @author:於起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:40
 * 簡書:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
public class MessageProvider {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
    /**
     * RabbitMQ 模版消息實現類
     */
    @Autowired
    private AmqpTemplate rabbitMqTemplate;

    /**
     * 發送延遲消息
     *
     * @param messageContent 消息內容
     * @param exchange       隊列交換
     * @param routerKey      隊列交換綁定的路由鍵
     * @param delayTimes     延遲時長,單位:毫秒
     */
    public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
        if (!StringUtils.isEmpty(exchange)) {
            logger.info("延遲:{}毫秒寫入消息隊列:{},消息內容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
            // 執行發送消息到指定隊列
            rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
                // 設置延遲毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            });
        } else {
            logger.error("未找到隊列消息:{},所屬的交換機", exchange);
        }
    }
}
複製代碼

因爲咱們在 pom.xml配置文件內添加了RabbitMQ相關的依賴而且在上面application.yml文件內添加了對應的配置,SpringBoot爲咱們自動實例化了AmqpTemplate,該實例能夠發送任何類型的消息到指定隊列。 咱們採用convertAndSend方法,將消息內容發送到指定ExchangeRouterKey隊列,而且經過setExpiration方法設置過時時間,單位:毫秒。

編寫發送測試

咱們在test目錄下建立一個測試類,以下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
    /**
     * 消息隊列提供者
     */
    @Autowired
    private MessageProvider messageProvider;

    /**
     * 測試延遲消息消費
     */
    @Test
    public void testLazy() {
        // 測試延遲10秒
        messageProvider.sendMessage("測試延遲消費,寫入時間:" + new Date(),
                QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
                QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
                10000);
    }
}
複製代碼

注意:@SpringBootTest註解內添加了classes入口類的配置,由於咱們是模塊建立的項目並非默認建立的SpringBoot項目,這裏須要配置入口程序類才能夠運行測試。

在測試類咱們注入了MessageProvider消息提供者,調用sendMessage方法發送消息到消息通知延遲隊列,而且設置延遲的時間爲10秒,這裏衡量發送到指定隊列的標準是要看MessageRabbitMqConfiguration配置類內的相關Binding配置,經過ExchangeRouterKey值進行發送到指定的隊列。

到目前爲止咱們的rabbitmq-lazy-provider消息提供模塊已經編寫完成了,下面咱們來看看消息消費者模塊。

隊列消息消費者

咱們再來建立一個名爲rabbitmq-lazy-consumer的模塊(Create New Maven Module),一樣須要在pom.xml配置文件內添加rabbitmq-common模塊的依賴,以下所示:

<!--添加公共模塊依賴-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>
複製代碼

固然一樣須要在resource下建立application.yml並添加消息隊列的相關配置,代碼就不貼出來了,能夠直接從rabbitmq-lazy-provider模塊中複製application.yml文件到當前模塊內。

消息消費者類

接下來建立一個名爲MessageConsumer的消費者類,該類須要監聽消息通知隊列,代碼以下所示:

/**
 * 消息通知 - 消費者
 *
 * @author:於起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午5:00
 * 簡書:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @RabbitHandler
    public void handler(String content) {
        logger.info("消費內容:{}", content);
    }
}
複製代碼

@RabbitListener註解內配置了監聽的隊列,這裏配置內容是QueueEnum枚舉內的queueName屬性值,固然若是你採用常量的方式在註解屬性上是直接可使用的,枚舉不支持這種配置,這裏只能把QueueName字符串配置到queues屬性上了。 因爲咱們在消息發送時採用字符串的形式發送消息內容,這裏在@RabbitHandler處理方法的參數內要保持數據類型一致!

消費者入口類

咱們爲消費者模塊添加一個入口程序類,用於啓動消費者,代碼以下所示:

/**
 * 【第四十六章:SpringBoot & RabbitMQ完成消息延遲消費】
 * 隊列消費者模塊 - 入口程序類
 *
 * @author:於起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:55
 * 簡書:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
    }
}
複製代碼

測試

咱們的代碼已經編寫完畢,下面來測試下是否完成了咱們預想的效果,步驟以下所示:

1. 啓動消費者模塊
2. 執行RabbitMqLazyProviderApplicationTests.testLazy()方法進行發送消息到通知延遲隊列
3. 查看消費者模塊控制檯輸出內容
複製代碼

咱們能夠在消費者模塊控制檯看到輸出內容:

2018-03-04 10:10:34.765  INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer      : 消費內容:測試延遲消費,寫入時間:Sun Mar 04 10:10:24 CST 2018
複製代碼

咱們在提供者測試方法發送消息的時間爲10:10:24,而真正消費的時間則爲10:10:34,與咱們預計的同樣,消息延遲了10秒後去執行消費。

總結

終上所述咱們完成了消息隊列的延遲消費,採用死信方式,經過消息過時方式觸發,在實際項目研發過程當中,延遲消費仍是頗有必要的,能夠省去一些定時任務的配置。

本章源碼已經上傳到碼雲: SpringBoot配套源碼地址:gitee.com/hengboy/spr… SpringCloud配套源碼地址:gitee.com/hengboy/spr… SpringBoot相關係列文章請訪問:目錄:SpringBoot學習目錄 QueryDSL相關係列文章請訪問:QueryDSL通用查詢框架學習目錄 SpringDataJPA相關係列文章請訪問:目錄:SpringDataJPA學習目錄,感謝閱讀!

微信掃碼關注 - 專一分享
相關文章
相關標籤/搜索