在2018-3-1
日SpringBoot
官方發版了2.0.0.RELEASE
最新版本,新版本徹底基於Spring5.0
來構建,JDK
最低支持也從原來的1.6
也改爲了1.8
,再也不兼容1.8
如下的版本,更多新特性請查看官方文檔。html
基於SpringBoot
整合RabbitMQ
完成消息延遲消費。git
因爲
SpringBoot
的內置掃描機制,咱們若是不自動配置掃描路徑,請保持下面rabbitmq-common
模塊內的配置能夠被SpringBoot
掃描到,不然不會自動建立隊列,控制檯會輸出404的錯誤信息。web
專題 | 專題名稱 | 專題描述 |
---|---|---|
001 | Spring Boot 核心技術 | 講解SpringBoot一些企業級層面的核心組件 |
002 | Spring Boot 核心技術章節源碼 | Spring Boot 核心技術簡書每一篇文章碼雲對應源碼 |
003 | Spring Cloud 核心技術 | 對Spring Cloud核心技術全面講解 |
004 | Spring Cloud 核心技術章節源碼 | Spring Cloud 核心技術簡書每一篇文章對應源碼 |
005 | QueryDSL 核心技術 | 全面講解QueryDSL核心技術以及基於SpringBoot整合SpringDataJPA |
006 | SpringDataJPA 核心技術 | 全面講解SpringDataJPA核心技術 |
007 | SpringBoot核心技術學習目錄 | SpringBoot系統的學習目錄,敬請關注點贊!!! |
咱們本章採用2.0.0.RELEASE
版本的SpringBoot
,添加相關的依賴以下所示:spring
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
<!--rabbbitMQ相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok依賴-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--spring boot tester-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--fast json依賴-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>
</dependencies>
......
複製代碼
咱們仍然採用多模塊的方式來測試隊列的Provider
以及Consumer
。json
咱們先來建立一個名爲rabbitmq-common
公共依賴模塊(Create New Maven Module) 在公共模塊內添加一個QueueEnum
隊列枚舉配置,該枚舉內配置隊列的Exchange
、QueueName
、RouteKey
等相關內容,以下所示:bash
package com.hengyu.rabbitmq.lazy.enums;
import lombok.Getter;
/**
* 消息隊列枚舉配置
*
* @author:於起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:33
* 簡書:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Getter
public enum QueueEnum {
/**
* 消息通知隊列
*/
MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
/**
* 消息通知ttl隊列
*/
MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
/**
* 交換名稱
*/
private String exchange;
/**
* 隊列名稱
*/
private String name;
/**
* 路由鍵
*/
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}
複製代碼
能夠看到MESSAGE_QUEUE
隊列配置跟咱們以前章節的配置同樣,而咱們另外新建立了一個後綴爲ttl
的消息隊列配置。咱們採用的這種方式是RabbitMQ
消息隊列其中一種的延遲消費模塊,經過配置隊列消息過時後轉發的形式。微信
這種模式比較簡單,咱們須要將消息先發送到
ttl
延遲隊列內,當消息到達過時時間後會自動轉發到ttl
隊列內配置的轉發Exchange
以及RouteKey
綁定的隊列內完成消息消費。app
下面咱們來模擬消息通知
的延遲消費場景,先來建立一個名爲MessageRabbitMqConfiguration
的隊列配置類,該配置類內添加消息通知隊列
配置以及消息經過延遲隊列
配置,以下所示:框架
/**
* 消息通知 - 消息隊列配置信息
*
* @author:恆宇少年 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:32
* 簡書:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Configuration
public class MessageRabbitMqConfiguration {
/**
* 消息中心實際消費隊列交換配置
*
* @return
*/
@Bean
DirectExchange messageDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
.durable(true)
.build();
}
/**
* 消息中心延遲消費交換配置
*
* @return
*/
@Bean
DirectExchange messageTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
.durable(true)
.build();
}
/**
* 消息中心實際消費隊列配置
*
* @return
*/
@Bean
public Queue messageQueue() {
return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
}
/**
* 消息中心TTL隊列
*
* @return
*/
@Bean
Queue messageTtlQueue() {
return QueueBuilder
.durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
// 配置到期後轉發的交換
.withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
// 配置到期後轉發的路由鍵
.withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
.build();
}
/**
* 消息中心實際消息交換與隊列綁定
*
* @param messageDirect 消息中心交換配置
* @param messageQueue 消息中心隊列
* @return
*/
@Bean
Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
return BindingBuilder
.bind(messageQueue)
.to(messageDirect)
.with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
}
/**
* 消息中心TTL綁定實際消息中心實際消費交換機
*
* @param messageTtlQueue
* @param messageTtlDirect
* @return
*/
@Bean
public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
return BindingBuilder
.bind(messageTtlQueue)
.to(messageTtlDirect)
.with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
}
}
複製代碼
咱們聲明瞭消息通知隊列
的相關Exchange
、Queue
、Binding
等配置,將message.center.create
隊列經過路由鍵message.center.create
綁定到了message.center.direct
交換上。ide
除此以外,咱們還添加了消息通知延遲隊列
的Exchange
、Queue
、Binding
等配置,將message.center.create.ttl
隊列經過message.center.create.ttl
路由鍵綁定到了message.center.topic.ttl
交換上。
咱們仔細來看看messageTtlQueue
延遲隊列的配置,跟messageQueue
隊列配置不一樣的地方這裏多出了x-dead-letter-exchange
、x-dead-letter-routing-key
兩個參數,而這兩個參數就是配置延遲隊列過時後轉發的Exchange
、RouteKey
,只要在建立隊列時對應添加了這兩個參數,在RabbitMQ
管理平臺看到的隊列配置就不只是單純的Direct
類型的隊列類型,以下圖所示:
在上圖內咱們能夠看到message.center.create.ttl
隊列多出了DLX
、DLK
的配置,這就是RabbitMQ
內死信交換
的標誌。 知足死信交換
的條件,在官方文檔中表示:
Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:
The message is rejected (basic.reject or basic.nack) with requeue=false, The TTL for the message expires; or The queue length limit is exceeded.
咱們須要知足上面的其中一種方式就能夠了,咱們採用知足第二個條件,採用過時的方式。
咱們再來建立一個名爲rabbitmq-lazy-provider
的模塊(Create New Maven Module),而且在pom.xml
配置文件內添加rabbitmq-common
模塊的依賴,以下所示:
<!--添加公共模塊依賴-->
<dependency>
<groupId>com.hengyu</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
複製代碼
在resource
下建立一個名爲application.yml
的配置文件,在該配置文件內添加以下配置信息:
spring:
#rabbitmq消息隊列配置信息
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /hengboy
publisher-confirms: true
複製代碼
接下來咱們來建立名爲MessageProvider
消息提供者類,用來發送消息內容到消息通知延遲隊列,代碼以下所示:
/**
* 消息通知 - 提供者
*
* @author:於起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:40
* 簡書:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Component
public class MessageProvider {
/**
* logger instance
*/
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
/**
* RabbitMQ 模版消息實現類
*/
@Autowired
private AmqpTemplate rabbitMqTemplate;
/**
* 發送延遲消息
*
* @param messageContent 消息內容
* @param exchange 隊列交換
* @param routerKey 隊列交換綁定的路由鍵
* @param delayTimes 延遲時長,單位:毫秒
*/
public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
if (!StringUtils.isEmpty(exchange)) {
logger.info("延遲:{}毫秒寫入消息隊列:{},消息內容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
// 執行發送消息到指定隊列
rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
// 設置延遲毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
} else {
logger.error("未找到隊列消息:{},所屬的交換機", exchange);
}
}
}
複製代碼
因爲咱們在 pom.xml
配置文件內添加了RabbitMQ
相關的依賴而且在上面application.yml
文件內添加了對應的配置,SpringBoot
爲咱們自動實例化了AmqpTemplate
,該實例能夠發送任何類型的消息到指定隊列。 咱們採用convertAndSend
方法,將消息內容發送到指定Exchange
、RouterKey
隊列,而且經過setExpiration
方法設置過時時間,單位:毫秒。
咱們在test
目錄下建立一個測試類,以下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
/**
* 消息隊列提供者
*/
@Autowired
private MessageProvider messageProvider;
/**
* 測試延遲消息消費
*/
@Test
public void testLazy() {
// 測試延遲10秒
messageProvider.sendMessage("測試延遲消費,寫入時間:" + new Date(),
QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
10000);
}
}
複製代碼
注意:
@SpringBootTest
註解內添加了classes
入口類的配置,由於咱們是模塊建立的項目並非默認建立的SpringBoot
項目,這裏須要配置入口程序類才能夠運行測試。
在測試類咱們注入了MessageProvider
消息提供者,調用sendMessage
方法發送消息到消息通知延遲隊列
,而且設置延遲的時間爲10秒
,這裏衡量發送到指定隊列的標準是要看MessageRabbitMqConfiguration
配置類內的相關Binding
配置,經過Exchange
、RouterKey
值進行發送到指定的隊列。
到目前爲止咱們的rabbitmq-lazy-provider
消息提供模塊已經編寫完成了,下面咱們來看看消息消費者模塊。
咱們再來建立一個名爲rabbitmq-lazy-consumer
的模塊(Create New Maven Module),一樣須要在pom.xml
配置文件內添加rabbitmq-common
模塊的依賴,以下所示:
<!--添加公共模塊依賴-->
<dependency>
<groupId>com.hengyu</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
複製代碼
固然一樣須要在resource
下建立application.yml
並添加消息隊列的相關配置,代碼就不貼出來了,能夠直接從rabbitmq-lazy-provider
模塊中複製application.yml
文件到當前模塊內。
接下來建立一個名爲MessageConsumer
的消費者類,該類須要監聽消息通知隊列
,代碼以下所示:
/**
* 消息通知 - 消費者
*
* @author:於起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午5:00
* 簡書:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
/**
* logger instance
*/
static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@RabbitHandler
public void handler(String content) {
logger.info("消費內容:{}", content);
}
}
複製代碼
在@RabbitListener
註解內配置了監聽的隊列,這裏配置內容是QueueEnum
枚舉內的queueName
屬性值,固然若是你採用常量的方式在註解屬性上是直接可使用的,枚舉不支持這種配置,這裏只能把QueueName
字符串配置到queues
屬性上了。 因爲咱們在消息發送時採用字符串的形式發送消息內容,這裏在@RabbitHandler
處理方法的參數內要保持數據類型一致!
咱們爲消費者模塊添加一個入口程序類,用於啓動消費者,代碼以下所示:
/**
* 【第四十六章:SpringBoot & RabbitMQ完成消息延遲消費】
* 隊列消費者模塊 - 入口程序類
*
* @author:於起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:55
* 簡書:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
}
}
複製代碼
咱們的代碼已經編寫完畢,下面來測試下是否完成了咱們預想的效果,步驟以下所示:
1. 啓動消費者模塊
2. 執行RabbitMqLazyProviderApplicationTests.testLazy()方法進行發送消息到通知延遲隊列
3. 查看消費者模塊控制檯輸出內容
複製代碼
咱們能夠在消費者模塊控制檯看到輸出內容:
2018-03-04 10:10:34.765 INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer : 消費內容:測試延遲消費,寫入時間:Sun Mar 04 10:10:24 CST 2018
複製代碼
咱們在提供者測試方法發送消息的時間爲10:10:24
,而真正消費的時間則爲10:10:34
,與咱們預計的同樣,消息延遲了10秒
後去執行消費。
終上所述咱們完成了消息隊列的延遲消費
,採用死信
方式,經過消息過時方式觸發,在實際項目研發過程當中,延遲消費仍是頗有必要的,能夠省去一些定時任務的配置。
本章源碼已經上傳到碼雲: SpringBoot配套源碼地址:gitee.com/hengboy/spr… SpringCloud配套源碼地址:gitee.com/hengboy/spr… SpringBoot相關係列文章請訪問:目錄:SpringBoot學習目錄 QueryDSL相關係列文章請訪問:QueryDSL通用查詢框架學習目錄 SpringDataJPA相關係列文章請訪問:目錄:SpringDataJPA學習目錄,感謝閱讀!