咱們在以前的兩個章節第四十一章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息消費、第四十二章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息多消費者消費提升了RabbitMQ
消息隊列的DirectExchange
交換類型的消息消費,咱們以前的章節提到了RabbitMQ
比較經常使用的交換類型有三種,咱們今天來看看TopicExchange
主題交換類型。java
基於SpringBoot
平臺完成RabbitMQ
的TopicExchange
消息類型交換。git
專題 | 專題名稱 | 專題描述 |
---|---|---|
001 | Spring Boot 核心技術 | 講解SpringBoot一些企業級層面的核心組件 |
002 | Spring Boot 核心技術章節源碼 | Spring Boot 核心技術簡書每一篇文章碼雲對應源碼 |
003 | Spring Cloud 核心技術 | 對Spring Cloud核心技術全面講解 |
004 | Spring Cloud 核心技術章節源碼 | Spring Cloud 核心技術簡書每一篇文章對應源碼 |
005 | QueryDSL 核心技術 | 全面講解QueryDSL核心技術以及基於SpringBoot整合SpringDataJPA |
006 | SpringDataJPA 核心技術 | 全面講解SpringDataJPA核心技術 |
以前少年也遇到了一個問題,分類了多模塊後消息隊列沒法自動建立,說來也可笑,以前沒有時間去看這個問題,今天在編寫本章文章時發現緣由居然是SpringBoot
沒有掃描到common
模塊內的配置類。讓我一陣的頭大~~~,咱們在XxxApplication
啓動類上添加@ComponentScan(value = "com.hengyu.rabbitmq")
就能夠自動建立隊列了!!!web
本章構建項目時一樣採用多模塊的方式進行設計,能夠很好的看到消息處理的效果,由於是多模塊項目,咱們先來建立一個SpringBoot
項目,pom.xml
配置文件依賴配置以下所示:spring
<dependencies>
<!--rabbbitMQ相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok依賴-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--spring boot tester-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--fast json依賴-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>
</dependencies>
複製代碼
下面咱們先來構建公共RabbitMQ
模塊,由於咱們的消費者以及生產者都是須要RabbitMQ
相關的配置信息,這裏咱們能夠提取出來,使用時進行模塊之間的引用。數據庫
建立子模塊rabbitmq-topic-common
,在resources
下添加application.yml
配置文件並配置RabbitMQ
相關的依賴配置,以下所示:json
spring:
#rabbitmq消息隊列配置信息
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirms: true
複製代碼
咱們跟以前的章節一張,獨立編寫一個枚舉類型來配置消息隊列的交換信息,以下所示:bash
/**
* rabbitmq交換配置枚舉
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:13:56
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Getter
public enum ExchangeEnum
{
/**
* 用戶註冊交換配置枚舉
*/
USER_REGISTER_TOPIC_EXCHANGE("register.topic.exchange")
;
private String name;
ExchangeEnum(String name) {
this.name = name;
}
}
複製代碼
一樣消息隊列的基本信息配置也一樣採用枚舉的形式配置,以下所示:app
/**
* 隊列配置枚舉
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:14:05
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Getter
public enum QueueEnum
{
/**
* 用戶註冊
* 建立帳戶消息隊列
*/
USER_REGISTER_CREATE_ACCOUNT("register.account","register.#"),
/**
* 用戶註冊
* 發送註冊成功郵件消息隊列
*/
USER_REGISTER_SEND_MAIL("register.mail","register.#")
;
/**
* 隊列名稱
*/
private String name;
/**
* 隊列路由鍵
*/
private String routingKey;
QueueEnum(String name, String routingKey) {
this.name = name;
this.routingKey = routingKey;
}
}
複製代碼
消息隊列枚舉內添加了兩個屬性,分別對應了隊列名稱
、隊列路由
,咱們本章所講解的TopicExchange
類型消息隊列能夠根據路徑信息配置多個消息消費者,而轉發的匹配規則信息則是咱們定義的隊列的路由信息。框架
咱們在發送消息到隊列時,須要咱們傳遞一個路由相關的配置信息,RabbitMQ
會根據發送時的消息路由規則信息與定義消息隊列時的路由信息進行匹配,若是能夠匹配則調用該隊列的消費者完成消息的消費,發送消息路由信息配置以下所示:dom
/**
* 消息隊列topic交換路由key配置枚舉
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/12/11
* Time:21:58
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Getter
public enum TopicEnum {
/**
* 用戶註冊topic路由key配置
*/
USER_REGISTER("register.user")
;
private String topicRouteKey;
TopicEnum(String topicRouteKey) {
this.topicRouteKey = topicRouteKey;
}
}
複製代碼
#
咱們在QueueEnum
內配置的路由鍵
時有個特殊的符號:#
,在RabbitMQ
消息隊列內路由配置#
時表示能夠匹配零個或多個字符,咱們TopicEnum
枚舉內定義的register.user
,則是能夠匹配QueueEnum
枚舉定義register.#
隊列的路由規則。 固然發送消息時若是路由傳遞:register.user.account
也是能夠一樣匹配register.#
的路由規則。
*
除此以外比較經常使用到的特殊字符還有一個*
,在RabbitMQ
消息隊列內路由配置*
時表示能夠匹配一個字符,咱們QueueEnum
定義路由鍵若是修改爲register.*
時,發送消息時路由爲register.user
則是能夠接受到消息的。但若是發送時的路由爲register.user.account
時,則是沒法匹配該消息。
配置準備工做已經作好,接下來咱們開始配置隊列相關的內容,跟以前同樣咱們須要配置Queue
、Exchange
、Binding
將消息隊列與交換綁定。下面咱們來看看配置跟以前的章節有什麼差別的地方,代碼以下所示:
/**
* 用戶註冊消息隊列配置
* ========================
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:16:58
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Configuration
public class UserRegisterQueueConfiguration {
private Logger logger = LoggerFactory.getLogger(UserRegisterQueueConfiguration.class);
/**
* 配置用戶註冊主題交換
* @return
*/
@Bean
public TopicExchange userTopicExchange()
{
TopicExchange topicExchange = new TopicExchange(ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE.getName());
logger.info("用戶註冊交換實例化成功。");
return topicExchange;
}
/**
* 配置用戶註冊
* 發送激活郵件消息隊列
* 並設置持久化隊列
* @return
*/
@Bean
public Queue sendRegisterMailQueue()
{
Queue queue = new Queue(QueueEnum.USER_REGISTER_SEND_MAIL.getName());
logger.info("建立用戶註冊消息隊列成功");
return queue;
}
/**
* 配置用戶註冊
* 建立帳戶消息隊列
* 並設置持久化隊列
* @return
*/
@Bean
public Queue createAccountQueue()
{
Queue queue = new Queue(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getName());
logger.info("建立用戶註冊帳號隊列成功.");
return queue;
}
/**
* 綁定用戶發送註冊激活郵件隊列到用戶註冊主題交換配置
* @return
*/
@Bean
public Binding sendMailBinding(TopicExchange userTopicExchange,Queue sendRegisterMailQueue)
{
Binding binding = BindingBuilder.bind(sendRegisterMailQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_SEND_MAIL.getRoutingKey());
logger.info("綁定發送郵件到註冊交換成功");
return binding;
}
/**
* 綁定用戶建立帳戶到用戶註冊主題交換配置
* @return
*/
@Bean
public Binding createAccountBinding(TopicExchange userTopicExchange,Queue createAccountQueue)
{
Binding binding = BindingBuilder.bind(createAccountQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getRoutingKey());
logger.info("綁定建立帳號到註冊交換成功。");
return binding;
}
}
複製代碼
咱們從上面開始分析。 第一步: 首先咱們建立了TopicExchange
消息隊列對象,使用ExchangeEnum
枚舉內的USER_REGISTER_TOPIC_EXCHANGE
類型做爲交換名稱。
第二步:咱們建立了發送註冊郵件的隊列sendRegisterMailQueue
,使用QueueEnum
枚舉內的類型USER_REGISTER_SEND_MAIL
做爲隊列名稱。
第三步:與發送郵件隊列一致,用戶建立完成後須要初始化帳戶信息,而createAccountQueue
消息隊列後續邏輯就是來完成該工做,使用QueueEnum
枚舉內的USER_REGISTER_CREATE_ACCOUNT
枚舉做爲建立帳戶隊列名稱。
第四步:在上面步驟中已經將交換、隊列建立完成,下面就開始將隊列綁定到用戶註冊交換,從而實現註冊用戶消息隊列消息消費,sendMailBinding
綁定了QueueEnum.USER_REGISTER_SEND_MAIL
的RoutingKey
配置信息。
createAccountBinding
綁定了QueueEnum.USER_REGISTER_CREATE_ACCOUNT
的RoutingKey
配置信息。
到目前爲止咱們完成了rabbitmq-topic-common
模塊的全部配置信息,下面咱們開始編寫用戶註冊消息消費者模塊。
咱們首先來建立一個子模塊命名爲rabbitmq-topic-consumer
,在pom.xml
配置文件內添加rabbitmq-topic-common
模塊的引用,以下所示:
....//
<dependencies>
<!--公共模塊依賴-->
<dependency>
<groupId>com.hengyu</groupId>
<artifactId>rabbitmq-topic-common</artifactId>
<version>${parent.version}</version>
</dependency>
</dependencies>
....//
複製代碼
下面咱們來建立程序啓動類RabbitMqTopicConsumerApplication
,在這裏須要注意,手動配置下掃描路徑@ComponentScan
,啓動類代碼以下所示:
/**
* 消息消費者程序啓動入口
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/12/11
* Time:21:48
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicConsumerApplication {
/**
* logback
*/
private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicConsumerApplication.class);
/**
* 程序入口
* @param args
*/
public static void main(String[] args)
{
SpringApplication.run(RabbitMqTopicConsumerApplication.class,args);
logger.info("【【【【【Topic隊列消息Consumer啓動成功】】】】】");
}
}
複製代碼
手動配置掃描路徑在文章的開始解釋過了,主要目的是爲了掃描到RabbitMQConfiguration
配置類內的信息,讓RabbitAdmin
自動建立配置信息到server
端。
發送郵件消息費監聽register.mail
消息隊列信息,以下所示:
/**
* 發送用戶註冊成功郵件消費者
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/12/11
* Time:22:07
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Component
@RabbitListener(queues = "register.mail")
public class SendMailConsumer
{
/**
* logback
*/
Logger logger = LoggerFactory.getLogger(SendMailConsumer.class);
/**
* 處理消息
* 發送用戶註冊成功郵件
* @param userId 用戶編號
*/
@RabbitHandler
public void handler(String userId)
{
logger.info("用戶:{},註冊成功,自動發送註冊成功郵件.",userId);
//... 發送註冊成功郵件邏輯
}
}
複製代碼
在這裏我只是完成了消息的監聽,具體的業務邏輯能夠根據需求進行處理。
建立用戶帳戶信息消費者監聽隊列register.account
,代碼以下所示:
/**
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/12/11
* Time:22:04
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Component
@RabbitListener(queues = "register.account")
public class CreateAccountConsumer {
/**
* logback
*/
Logger logger = LoggerFactory.getLogger(CreateAccountConsumer.class);
/**
* 處理消息
* 建立用戶帳戶
* @param userId 用戶編號
*/
@RabbitHandler
public void handler(String userId)
{
logger.info("用戶:{},註冊成功,自動建立帳戶信息.",userId);
//... 建立帳戶邏輯
}
}
複製代碼
建立帳戶,帳戶初始化邏輯均可以在handler
方法進行處理,本章沒有作數據庫複雜的處理,因此沒有過多的邏輯處理在消費者業務內。
接下來是咱們的消息提供者的模塊編寫,咱們依然先來建立程序入口類,並添加掃描配置@ComponentScan
路徑,代碼以下所示:
/**
* 消息生產者程序啓動入口
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/12/11
* Time:21:48
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicProviderApplication {
/**
* logback
*/
private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicProviderApplication.class);
/**
* 程序入口
* @param args
*/
public static void main(String[] args)
{
SpringApplication.run(RabbitMqTopicProviderApplication.class,args);
logger.info("【【【【【Topic隊列消息Provider啓動成功】】】】】");
}
}
複製代碼
建立QueueMessageService
隊列消息發送接口並添加send
方法,以下所示:
/**
* 消息隊列業務
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:14:50
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
public interface QueueMessageService
{
/**
* 發送消息到rabbitmq消息隊列
* @param message 消息內容
* @param exchangeEnum 交換配置枚舉
* @param routingKey 路由key
* @throws Exception
*/
public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception;
}
複製代碼
send
方法內有三個參數,解析以下:
TopicEnum
枚舉內的topicRouteKey
做爲參數值。下面咱們來看看該接口的實現類QueueMessageServiceSupport
內send
方法實現,以下所示:
/**
* 消息隊列業務邏輯實現
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:14:52
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Component
public class QueueMessageServiceSupport
implements QueueMessageService
{
/**
* 消息隊列模板
*/
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception {
//發送消息到消息隊列
rabbitTemplate.convertAndSend(exchangeEnum.getName(),routingKey,message);
}
}
複製代碼
咱們經過RabbitTemplate
實例的convertAndSend
方法將對象類型轉換成JSON
字符串後發送到消息隊列服務端,RabbitMQ
接受到消息後根據註冊的消費者而且路由規則篩選後進行消息轉發,並實現消息的消費。
爲了方便測試咱們建立一個名爲UserService
的實現類,以下所示:
/**
* 用戶業務邏輯
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/12/11
* Time:22:10
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Service
public class UserService
{
/**
* 消息隊列發送業務邏輯
*/
@Autowired
private QueueMessageService queueMessageService;
/**
* 隨機建立用戶
* 隨機生成用戶uuid編號,發送到消息隊列服務端
* @return
* @throws Exception
*/
public String randomCreateUser() throws Exception
{
//用戶編號
String userId = UUID.randomUUID().toString();
//發送消息到rabbitmq服務端
queueMessageService.send(userId, ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE, TopicEnum.USER_REGISTER.getTopicRouteKey());
return userId;
}
}
複製代碼
該類內添加了一個名爲randomCreateUser
隨機建立用戶的方法,經過UUID
隨機生成字符串做爲用戶的編號進行傳遞給用戶註冊消息隊列,完成用戶的模擬建立。
接下來咱們建立RabbitMqTester
測試類來完成隨機用戶建立消息發送,測試用例完成簡單的UserService
注入,並調用randomCreateUser
方法,以下所示:
/**
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/12/11
* Time:22:10
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqTopicProviderApplication.class)
public class RabbitMqTester
{
/**
* 用戶業務邏輯
*/
@Autowired
private UserService userService;
/**
* 模擬隨機建立用戶 & 發送消息到註冊用戶消息隊列
* @throws Exception
*/
@Test
public void testTopicMessage() throws Exception
{
userService.randomCreateUser();
}
}
複製代碼
到目前爲止,咱們的編碼已經完成,下面咱們按照下面的步驟啓動測試:
- 啓動
rabbitmq-topic-consumer
消息消費者模塊,並查看控制檯輸出內容是否正常- 運行
rabbitmq-topic-provider
模塊測試用例方法testTopicMessage
- 查看
rabbitmq-topic-consumer
控制檯輸出內容
最終效果:
2017-12-30 18:39:16.819 INFO 2781 --- [ main] c.h.r.c.RabbitMqTopicConsumerApplication : 【【【【【Topic隊列消息Consumer啓動成功】】】】】
2017-12-30 18:39:29.376 INFO 2781 --- [cTaskExecutor-1] c.h.r.consumer.CreateAccountConsumer : 用戶:c6ef682d-da2e-4cac-a004-c244ff4c4503,註冊成功,自動建立帳戶信息.
2017-12-30 18:39:29.376 INFO 2781 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.SendMailConsumer : 用戶:c6ef682d-da2e-4cac-a004-c244ff4c4503,註冊成功,自動發送註冊成功郵件.
複製代碼
本章主要講解了TopicExchange
交換類型如何消費隊列消息,講解了經常使用到了的特殊字符#
、*
如何匹配,解決了多模塊下的隊列配置信息沒法自動建立問題。還有一點須要注意TopicExchange
交換類型在消息消費時不存在固定的前後順序!!!
本章源碼已經上傳到碼雲: SpringBoot配套源碼地址:gitee.com/hengboy/spr… SpringCloud配套源碼地址:gitee.com/hengboy/spr… SpringBoot相關係列文章請訪問:目錄:SpringBoot學習目錄 QueryDSL相關係列文章請訪問:QueryDSL通用查詢框架學習目錄 SpringDataJPA相關係列文章請訪問:目錄:SpringDataJPA學習目錄,感謝閱讀! 歡迎加入QQ技術交流羣,共同進步。