第四十三章: 基於SpringBoot & RabbitMQ完成TopicExchange分佈式消息消費

咱們在以前的兩個章節第四十一章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息消費第四十二章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息多消費者消費提升了RabbitMQ消息隊列的DirectExchange交換類型的消息消費,咱們以前的章節提到了RabbitMQ比較經常使用的交換類型有三種,咱們今天來看看TopicExchange主題交換類型。java

本章目標

基於SpringBoot平臺完成RabbitMQTopicExchange消息類型交換。git

SpringBoot 企業級核心技術學習專題

專題 專題名稱 專題描述
001 Spring Boot 核心技術 講解SpringBoot一些企業級層面的核心組件
002 Spring Boot 核心技術章節源碼 Spring Boot 核心技術簡書每一篇文章碼雲對應源碼
003 Spring Cloud 核心技術 對Spring Cloud核心技術全面講解
004 Spring Cloud 核心技術章節源碼 Spring Cloud 核心技術簡書每一篇文章對應源碼
005 QueryDSL 核心技術 全面講解QueryDSL核心技術以及基於SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技術 全面講解SpringDataJPA核心技術

解決問題

以前少年也遇到了一個問題,分類了多模塊後消息隊列沒法自動建立,說來也可笑,以前沒有時間去看這個問題,今天在編寫本章文章時發現緣由居然是SpringBoot沒有掃描到common模塊內的配置類。讓我一陣的頭大~~~,咱們在XxxApplication啓動類上添加@ComponentScan(value = "com.hengyu.rabbitmq")就能夠自動建立隊列了!!!web

構建項目

本章構建項目時一樣採用多模塊的方式進行設計,能夠很好的看到消息處理的效果,由於是多模塊項目,咱們先來建立一個SpringBoot項目,pom.xml配置文件依賴配置以下所示:spring

<dependencies>
		<!--rabbbitMQ相關依賴-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!--web相關依賴-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!--lombok依賴-->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<!--spring boot tester-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!--fast json依賴-->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.40</version>
		</dependency>
	</dependencies>
複製代碼

下面咱們先來構建公共RabbitMQ模塊,由於咱們的消費者以及生產者都是須要RabbitMQ相關的配置信息,這裏咱們能夠提取出來,使用時進行模塊之間的引用。數據庫

rabbitmq-topic-common

建立子模塊rabbitmq-topic-common,在resources下添加application.yml配置文件並配置RabbitMQ相關的依賴配置,以下所示:json

spring:
  #rabbitmq消息隊列配置信息
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirms: true
複製代碼
定義交換配置信息

咱們跟以前的章節一張,獨立編寫一個枚舉類型來配置消息隊列的交換信息,以下所示:bash

/**
 * rabbitmq交換配置枚舉
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:13:56
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum ExchangeEnum
{
    /**
     * 用戶註冊交換配置枚舉
     */
    USER_REGISTER_TOPIC_EXCHANGE("register.topic.exchange")
    ;
    private String name;

    ExchangeEnum(String name) {
        this.name = name;
    }
}
複製代碼
定義隊列配置信息

一樣消息隊列的基本信息配置也一樣採用枚舉的形式配置,以下所示:app

/**
 * 隊列配置枚舉
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:05
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum QueueEnum
{
    /**
     * 用戶註冊
     * 建立帳戶消息隊列
     */
    USER_REGISTER_CREATE_ACCOUNT("register.account","register.#"),
    /**
     * 用戶註冊
     * 發送註冊成功郵件消息隊列
     */
    USER_REGISTER_SEND_MAIL("register.mail","register.#")
    ;
    /**
     * 隊列名稱
     */
    private String name;
    /**
     * 隊列路由鍵
     */
    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}
複製代碼

消息隊列枚舉內添加了兩個屬性,分別對應了隊列名稱隊列路由,咱們本章所講解的TopicExchange類型消息隊列能夠根據路徑信息配置多個消息消費者,而轉發的匹配規則信息則是咱們定義的隊列的路由信息。框架

定義發送消息路由信息

咱們在發送消息到隊列時,須要咱們傳遞一個路由相關的配置信息,RabbitMQ會根據發送時的消息路由規則信息與定義消息隊列時的路由信息進行匹配,若是能夠匹配則調用該隊列的消費者完成消息的消費,發送消息路由信息配置以下所示:dom

/**
 * 消息隊列topic交換路由key配置枚舉
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:58
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum TopicEnum {
    /**
     * 用戶註冊topic路由key配置
     */
    USER_REGISTER("register.user")
    ;

    private String topicRouteKey;

    TopicEnum(String topicRouteKey) {
        this.topicRouteKey = topicRouteKey;
    }
}
複製代碼
路由特殊字符 #

咱們在QueueEnum內配置的路由鍵時有個特殊的符號:#,在RabbitMQ消息隊列內路由配置#時表示能夠匹配零個或多個字符,咱們TopicEnum枚舉內定義的register.user,則是能夠匹配QueueEnum枚舉定義register.#隊列的路由規則。 固然發送消息時若是路由傳遞:register.user.account也是能夠一樣匹配register.#的路由規則。

路由特殊字符 *

除此以外比較經常使用到的特殊字符還有一個*,在RabbitMQ消息隊列內路由配置*時表示能夠匹配一個字符,咱們QueueEnum定義路由鍵若是修改爲register.*時,發送消息時路由爲register.user則是能夠接受到消息的。但若是發送時的路由爲register.user.account時,則是沒法匹配該消息。

消息隊列配置

配置準備工做已經作好,接下來咱們開始配置隊列相關的內容,跟以前同樣咱們須要配置QueueExchangeBinding將消息隊列與交換綁定。下面咱們來看看配置跟以前的章節有什麼差別的地方,代碼以下所示:

/**
 * 用戶註冊消息隊列配置
 * ========================
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:16:58
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Configuration
public class UserRegisterQueueConfiguration {

    private Logger logger = LoggerFactory.getLogger(UserRegisterQueueConfiguration.class);
    /**
     * 配置用戶註冊主題交換
     * @return
     */
    @Bean
    public TopicExchange userTopicExchange()
    {
        TopicExchange topicExchange = new TopicExchange(ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE.getName());
        logger.info("用戶註冊交換實例化成功。");
        return topicExchange;
    }

    /**
     * 配置用戶註冊
     * 發送激活郵件消息隊列
     * 並設置持久化隊列
     * @return
     */
    @Bean
    public Queue sendRegisterMailQueue()
    {
        Queue queue = new Queue(QueueEnum.USER_REGISTER_SEND_MAIL.getName());
        logger.info("建立用戶註冊消息隊列成功");
        return queue;
    }

    /**
     * 配置用戶註冊
     * 建立帳戶消息隊列
     * 並設置持久化隊列
     * @return
     */
    @Bean
    public Queue createAccountQueue()
    {
        Queue queue = new Queue(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getName());
        logger.info("建立用戶註冊帳號隊列成功.");
        return queue;
    }

    /**
     * 綁定用戶發送註冊激活郵件隊列到用戶註冊主題交換配置
     * @return
     */
    @Bean
    public Binding sendMailBinding(TopicExchange userTopicExchange,Queue sendRegisterMailQueue)
    {
        Binding binding = BindingBuilder.bind(sendRegisterMailQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_SEND_MAIL.getRoutingKey());
        logger.info("綁定發送郵件到註冊交換成功");
        return binding;
    }

    /**
     * 綁定用戶建立帳戶到用戶註冊主題交換配置
     * @return
     */
    @Bean
    public Binding createAccountBinding(TopicExchange userTopicExchange,Queue createAccountQueue)
    {
        Binding binding = BindingBuilder.bind(createAccountQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getRoutingKey());
        logger.info("綁定建立帳號到註冊交換成功。");
        return binding;
    }
}
複製代碼

咱們從上面開始分析。 第一步: 首先咱們建立了TopicExchange消息隊列對象,使用ExchangeEnum枚舉內的USER_REGISTER_TOPIC_EXCHANGE類型做爲交換名稱。

第二步:咱們建立了發送註冊郵件的隊列sendRegisterMailQueue,使用QueueEnum枚舉內的類型USER_REGISTER_SEND_MAIL做爲隊列名稱。

第三步:與發送郵件隊列一致,用戶建立完成後須要初始化帳戶信息,而createAccountQueue消息隊列後續邏輯就是來完成該工做,使用QueueEnum枚舉內的USER_REGISTER_CREATE_ACCOUNT枚舉做爲建立帳戶隊列名稱。

第四步:在上面步驟中已經將交換、隊列建立完成,下面就開始將隊列綁定到用戶註冊交換,從而實現註冊用戶消息隊列消息消費,sendMailBinding綁定了QueueEnum.USER_REGISTER_SEND_MAILRoutingKey配置信息。

createAccountBinding綁定了QueueEnum.USER_REGISTER_CREATE_ACCOUNTRoutingKey配置信息。

到目前爲止咱們完成了rabbitmq-topic-common模塊的全部配置信息,下面咱們開始編寫用戶註冊消息消費者模塊。

rabbitmq-topic-consumer

咱們首先來建立一個子模塊命名爲rabbitmq-topic-consumer,在pom.xml配置文件內添加rabbitmq-topic-common模塊的引用,以下所示:

....//
<dependencies>
        <!--公共模塊依賴-->
        <dependency>
            <groupId>com.hengyu</groupId>
            <artifactId>rabbitmq-topic-common</artifactId>
            <version>${parent.version}</version>
        </dependency>
    </dependencies>
....//
複製代碼
消費者程序入口

下面咱們來建立程序啓動類RabbitMqTopicConsumerApplication,在這裏須要注意,手動配置下掃描路徑@ComponentScan,啓動類代碼以下所示:

/**
 * 消息消費者程序啓動入口
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:48
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicConsumerApplication {

    /**
     * logback
     */
    private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicConsumerApplication.class);

    /**
     * 程序入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitMqTopicConsumerApplication.class,args);

        logger.info("【【【【【Topic隊列消息Consumer啓動成功】】】】】");
    }
}
複製代碼

手動配置掃描路徑在文章的開始解釋過了,主要目的是爲了掃描到RabbitMQConfiguration配置類內的信息,讓RabbitAdmin自動建立配置信息到server端。

發送郵件消費者

發送郵件消息費監聽register.mail消息隊列信息,以下所示:

/**
 * 發送用戶註冊成功郵件消費者
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:07
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "register.mail")
public class SendMailConsumer
{

    /**
     * logback
     */
    Logger logger = LoggerFactory.getLogger(SendMailConsumer.class);

    /**
     * 處理消息
     * 發送用戶註冊成功郵件
     * @param userId 用戶編號
     */
    @RabbitHandler
    public void handler(String userId)
    {

        logger.info("用戶:{},註冊成功,自動發送註冊成功郵件.",userId);

        //... 發送註冊成功郵件邏輯
    }
}
複製代碼

在這裏我只是完成了消息的監聽,具體的業務邏輯能夠根據需求進行處理。

建立帳戶消費者

建立用戶帳戶信息消費者監聽隊列register.account,代碼以下所示:

/**
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:04
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "register.account")
public class CreateAccountConsumer {

    /**
     * logback
     */
    Logger logger = LoggerFactory.getLogger(CreateAccountConsumer.class);

    /**
     * 處理消息
     * 建立用戶帳戶
     * @param userId 用戶編號
     */
    @RabbitHandler
    public void handler(String userId)
    {
        logger.info("用戶:{},註冊成功,自動建立帳戶信息.",userId);

        //... 建立帳戶邏輯
    }
}
複製代碼

建立帳戶,帳戶初始化邏輯均可以在handler方法進行處理,本章沒有作數據庫複雜的處理,因此沒有過多的邏輯處理在消費者業務內。

rabbitmq-topic-provider

接下來是咱們的消息提供者的模塊編寫,咱們依然先來建立程序入口類,並添加掃描配置@ComponentScan路徑,代碼以下所示:

/**
 * 消息生產者程序啓動入口
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:48
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicProviderApplication {

    /**
     * logback
     */
    private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicProviderApplication.class);

    /**
     * 程序入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitMqTopicProviderApplication.class,args);

        logger.info("【【【【【Topic隊列消息Provider啓動成功】】】】】");
    }
}
複製代碼
定義消息發送接口

建立QueueMessageService隊列消息發送接口並添加send方法,以下所示:

/**
 * 消息隊列業務
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:50
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
public interface QueueMessageService
{
    /**
     * 發送消息到rabbitmq消息隊列
     * @param message 消息內容
     * @param exchangeEnum 交換配置枚舉
     * @param routingKey 路由key
     * @throws Exception
     */
    public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception;
}
複製代碼

send方法內有三個參數,解析以下:

  • message:發送消息內容,能夠爲任意類型,固然本章內僅僅是java.lang.String。
  • exchangeEnum:咱們自定義的交換枚舉類型,方便發送消息到指定交換。
  • routingKey:發送消息時的路由鍵內容,該值採用TopicEnum枚舉內的topicRouteKey做爲參數值。

下面咱們來看看該接口的實現類QueueMessageServiceSupportsend方法實現,以下所示:

/**
 * 消息隊列業務邏輯實現
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:52
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
public class QueueMessageServiceSupport
    implements QueueMessageService
{
    /**
     * 消息隊列模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception {
        //發送消息到消息隊列
        rabbitTemplate.convertAndSend(exchangeEnum.getName(),routingKey,message);
    }
}
複製代碼

咱們經過RabbitTemplate實例的convertAndSend方法將對象類型轉換成JSON字符串後發送到消息隊列服務端,RabbitMQ接受到消息後根據註冊的消費者而且路由規則篩選後進行消息轉發,並實現消息的消費。

運行測試

爲了方便測試咱們建立一個名爲UserService的實現類,以下所示:

/**
 * 用戶業務邏輯
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:10
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Service
public class UserService
{
    /**
     * 消息隊列發送業務邏輯
     */
    @Autowired
    private QueueMessageService queueMessageService;

    /**
     * 隨機建立用戶
     * 隨機生成用戶uuid編號,發送到消息隊列服務端
     * @return
     * @throws Exception
     */
    public String randomCreateUser() throws Exception
    {
        //用戶編號
        String userId = UUID.randomUUID().toString();
        //發送消息到rabbitmq服務端
        queueMessageService.send(userId, ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE, TopicEnum.USER_REGISTER.getTopicRouteKey());
        return userId;
    }
}
複製代碼

該類內添加了一個名爲randomCreateUser隨機建立用戶的方法,經過UUID隨機生成字符串做爲用戶的編號進行傳遞給用戶註冊消息隊列,完成用戶的模擬建立。

編寫測試用例

接下來咱們建立RabbitMqTester測試類來完成隨機用戶建立消息發送,測試用例完成簡單的UserService注入,並調用randomCreateUser方法,以下所示:

/**
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:10
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqTopicProviderApplication.class)
public class RabbitMqTester
{
    /**
     * 用戶業務邏輯
     */
    @Autowired
   private UserService userService;

    /**
     * 模擬隨機建立用戶 & 發送消息到註冊用戶消息隊列
     * @throws Exception
     */
    @Test
    public void testTopicMessage() throws Exception
    {
        userService.randomCreateUser();
    }
}
複製代碼

到目前爲止,咱們的編碼已經完成,下面咱們按照下面的步驟啓動測試:

  1. 啓動rabbitmq-topic-consumer消息消費者模塊,並查看控制檯輸出內容是否正常
  2. 運行rabbitmq-topic-provider模塊測試用例方法testTopicMessage
  3. 查看rabbitmq-topic-consumer控制檯輸出內容

最終效果:

2017-12-30 18:39:16.819  INFO 2781 --- [           main] c.h.r.c.RabbitMqTopicConsumerApplication : 【【【【【Topic隊列消息Consumer啓動成功】】】】】
2017-12-30 18:39:29.376  INFO 2781 --- [cTaskExecutor-1] c.h.r.consumer.CreateAccountConsumer     : 用戶:c6ef682d-da2e-4cac-a004-c244ff4c4503,註冊成功,自動建立帳戶信息.
2017-12-30 18:39:29.376  INFO 2781 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.SendMailConsumer   : 用戶:c6ef682d-da2e-4cac-a004-c244ff4c4503,註冊成功,自動發送註冊成功郵件.
複製代碼

總結

本章主要講解了TopicExchange交換類型如何消費隊列消息,講解了經常使用到了的特殊字符#*如何匹配,解決了多模塊下的隊列配置信息沒法自動建立問題。還有一點須要注意TopicExchange交換類型在消息消費時不存在固定的前後順序!!!

本章源碼已經上傳到碼雲: SpringBoot配套源碼地址:gitee.com/hengboy/spr… SpringCloud配套源碼地址:gitee.com/hengboy/spr… SpringBoot相關係列文章請訪問:目錄:SpringBoot學習目錄 QueryDSL相關係列文章請訪問:QueryDSL通用查詢框架學習目錄 SpringDataJPA相關係列文章請訪問:目錄:SpringDataJPA學習目錄,感謝閱讀! 歡迎加入QQ技術交流羣,共同進步。

QQ技術交流羣
相關文章
相關標籤/搜索