第四十一章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息消費

消息隊列目前流行的有KafKa、RabbitMQ、ActiveMQ等,它們的誕生無非不是爲了解決消息的分佈式消費,完成項目、服務之間的解耦動做。消息隊列提供者與消費者之間徹底採用異步通訊方式,極力的提升了系統的響應能力,從而提升系統的網絡請求吞吐量。 每一種的消息隊列都有它在設計上的獨一無二的優點,在實際的項目技術選型時根據項目的需求來肯定。html

本章目標

基於SpringBoot項目整合RabbitMQ消息隊列,完成DirectExchange(路由鍵)分佈式消息消費。java

Exchange

RabbitMQ中有三種轉發方式,分別是:mysql

DirectExchange:路由鍵方式轉發消息。 FanoutExchange:廣播方式轉發消息。 TopicExchange:主題匹配方式轉發消息。git

咱們本章先來說解DirectExchange路由鍵方式,根據設置的路由鍵的值進行徹底匹配時轉發,下面咱們來看一張圖,形象的介紹了轉發消息匹配流程,以下圖所示: web

DirectExchange

咱們能夠看到上圖,當消息被提供者發送到RabbitMQ後,會根據配置隊列的交換以及綁定實例進行轉發消息,上圖只會將消息轉發路由鍵爲KEY的隊列消費者對應的實現方法邏輯中,從而完成消息的消費過程。spring

安裝RabbitMQ

由於RabbitMQ是跨平臺的分佈式消息隊列服務,能夠部署在任意的操做系統上,下面咱們分別介紹在不一樣的系統下該怎麼去安裝RabbitMQ服務。sql

咱們本章採用的環境版本以下:數據庫

  • RabbitMQ Server 3.6.14
  • Erlang/OTP_X64 20.1

Windows下安裝

咱們先去RabbitMQ官方網站下載最新版的安裝包,下載地址:https://www.rabbitmq.com/download.html,能夠根據不一樣的操做系統選擇下載。 咱們在安裝RabbitMQ服務端時須要Erlang環境的支持,因此咱們須要先安裝Erlangjson

  1. 咱們經過Erlang官方網站http://www.erlang.org/downloads下載最新的安裝包,由於是國外的網站因此下載比較慢,不過沒有關係,我再本章源碼的resource目錄下存放了安裝包,本章源碼在文章底部。bash

  2. 咱們訪問RabiitmQ官方下載地址https://www.rabbitmq.com/download.html下載最新安裝包,該安裝包一樣存放在resource目錄下。

  3. 運行安裝Erlang

  4. 運行安裝RabbitMQ

5.檢查服務是否安裝完成,RabbitMQ安裝完成後會以服務的形式建立,而且隨着開機啓動,以下所示:

Rabbit服務

Mac OS X 安裝

在Mac OS X中咱們使用brew工具能夠很簡單的安裝RabbitMQ服務端,步驟以下:

  1. brew更新到最新版本,執行:brew update
  2. 接下來咱們安裝Erlang,執行:brew install erlang
  3. 最後安裝RabbitMQ,執行:brew install rabbitmq

咱們經過上面的步驟安裝後,RabbitMQ會被自動安裝到/usr/local/sbin目錄下,下面咱們須要手動設置環境變量,來支持服務運行,修改.profile配置文件並添加以下配置:

PATH=$PATH:/usr/local/sbin
複製代碼

配置完成後,能夠直接經過rabbitmq-server命令來操做RabbitMQ服務。

Ubuntu 安裝

Ubuntu操做系統中,咱們能夠直接使用APT倉庫進行安裝,我使用的系統版本是16.04,系統版本並不影響安裝。

  1. 安裝Erlang,執行命令:sudo apt-get install erlang
  2. 下面咱們須要將RabbitMQ的安裝源配置信息寫入到系統的/etc/apt/sources.list.d配置文件內,執行以下命令:
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
複製代碼
  1. 下面咱們更新APT本地倉庫的安裝包列表,執行命令:sudo apt-get update
  2. 最後安裝RabbitMQ服務,執行命令:sudo apt-get install rabbitmq-server

啓用界面管理插件

RabbitMQ提供了界面管理的web插件,咱們只須要啓用指定的插件就能夠了,下面咱們來看看Windows操做系統下該怎麼啓動界面管理插件。 咱們使用CMD進入RabbitMQ安裝目錄C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.14,而後咱們進入sbin目錄,能夠看到目錄內存在不少個bat腳本程序,咱們找到rabbitmq-plugins.bat,這個腳本程序能夠控制RabbitMQ插件啓用禁用,咱們執行以下腳本命令來啓用界面管理插件:

rabbitmq-plugins.bat enable rabbitmq_management
複製代碼

命令行輸出內容以下所示:

The following plugins have been enabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... started 6 plugins.
複製代碼

能夠看到輸出的內容RabbitMQ自動啓動了6個插件,咱們如今訪問http://127.0.0.1:15672地址能夠直接打開RabbitMQ的界面管理平臺,而默認的用戶名/密碼分別爲:guest/guest,經過該用戶能夠直接登陸管理平臺。

禁用界面管理插件

咱們一樣能夠禁用RabbitMQ指定插件,執行以下命令:

rabbitmq-plugins.bat disable rabbitmq_management
複製代碼

命令建立輸出內容則是相關中止插件的日誌,以下:

The following plugins have been disabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... stopped 6 plugins.
複製代碼

這樣咱們再訪問http://127.0.0.1:15672就會發現咱們沒法訪問到界面。

構建項目

咱們使用idea開發工具建立一個SpringBoot項目,添加依賴,pom.xml配置文件以下所示:

<dependencies>
		<!--rabbitmq依賴-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!--web依賴-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!--lombok依賴-->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<!--fastjson依賴-->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.40</version>
		</dependency>
		<!--測試依賴-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
複製代碼

咱們本章來模擬用戶註冊完成後,將註冊用戶的編號經過Provider模塊發送到RabbitMQ,而後RabbitMQ根據配置的DirectExchange的路由鍵進行異步轉發。

初始化用戶表

下面咱們先來建立所須要的用戶基本信息表,建表SQL以下所示:

CREATE TABLE `user_info` (
  `UI_ID` int(11) DEFAULT NULL COMMENT '用戶編號',
  `UI_USER_NAME` varchar(20) DEFAULT NULL COMMENT '用戶名稱',
  `UI_NAME` varchar(20) DEFAULT NULL COMMENT '真實姓名',
  `UI_AGE` int(11) DEFAULT NULL COMMENT '用戶年齡',
  `UI_BALANCE` decimal(10,0) DEFAULT NULL COMMENT '用戶餘額'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用戶基本信息表';
複製代碼

構建 rabbitmq-provider 項目

基於咱們上述的項目建立一個Maven子模塊,命名爲:rabbitmq-provider,由於是直接建立的Module項目,IDEA並無給我建立SpringApplication啓用類。

建立入口類

下面咱們自行建立一個Provider項目啓動入口程序,以下所示:

/**
 * 消息隊列消息提供者啓動入口
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:14
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqProviderApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqProviderApplication.class);

    /**
     * 消息隊列提供者啓動入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqProviderApplication.class,args);

        logger.info("【【【【【消息隊列-消息提供者啓動成功.】】】】】");
    }
}
複製代碼
application.properties配置文件

下面咱們在src/main/resource目錄下建立application.properties並將對應RabbitMQ以及Druid的配置加入,以下所示:

#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#服務器ip
spring.rabbitmq.host=localhost
#虛擬空間地址
spring.rabbitmq.virtual-host=/
#端口號
spring.rabbitmq.port=5672
#配置發佈消息確認回調
spring.rabbitmq.publisher-confirms=true

#數據源配置
spring.datasource.druid.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.druid.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
spring.datasource.druid.username=root
spring.datasource.druid.password=123456
複製代碼

RabbitMQ內有個virtual-host即虛擬主機的概念,一個RabbitMQ服務能夠配置多個虛擬主機,每個虛擬機主機之間是相互隔離,相互獨立的,受權用戶到指定的virtual-host就能夠發送消息到指定隊列。

用戶實體

本章數據庫操做採用spring-data-jpa,相關文章請訪問:第十三章:SpringBoot實戰SpringDataJPA,咱們基於user_info數據表對應建立實體,以下所示:

@Data
@Table(name = "user_info")
@Entity
public class UserEntity
    implements Serializable
{
    /**
     * 用戶編號
     */
    @Id
    @GeneratedValue
    @Column(name = "UI_ID")
    private Long id;
    /**
     * 用戶名稱
     */
    @Column(name = "UI_USER_NAME")
    private String userName;
    /**
     * 姓名
     */
    @Column(name = "UI_NAME")
    private String name;
    /**
     * 年齡
     */
    @Column(name = "UI_AGE")
    private int age;
    /**
     * 餘額
     */
    @Column(name = "UI_BALANCE")
    private BigDecimal balance;
}
複製代碼
用戶數據接口

建立UserRepository用戶數據操做接口,並繼承JpaRepository得到spring-data-jpa相關的接口定義方法。以下所示:

/**
 * 用戶數據接口定義
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:35
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
public interface UserRepository
    extends JpaRepository<UserEntity,Long>
{
}
複製代碼
用戶業務邏輯實現

本章只是簡單完成了數據的添加,代碼以下所示:

/**
 * 用戶業務邏輯實現類
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:37
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Service
@Transactional(rollbackFor = Exception.class)
public class UserService
{
    @Autowired
    private UserRepository userRepository;
    /**
     * 消息隊列業務邏輯實現
     */
    @Autowired
    private QueueMessageService queueMessageService;

    /**
     * 保存用戶
     * 並寫入消息隊列
     * @param userEntity
     * @return
     */
    public Long save(UserEntity userEntity) throws Exception
    {
        /**
         * 保存用戶
         */
        userRepository.save(userEntity);
        /**
         * 將消息寫入消息隊列
         */
        queueMessageService.send(userEntity.getId(), ExchangeEnum.USER_REGISTER, QueueEnum.USER_REGISTER);

        return userEntity.getId();
    }
複製代碼

在上面業務邏輯實現類內出現了一個名爲QueueMessageService消息隊列實現類,該類是咱們定義的用於發送消息到消息隊列的統一入口,在下面咱們會詳細講解。

用戶控制器

建立一個名爲UserController的控制器類,對應編寫一個添加用戶的請求方法,以下所示:

/**
 * 用戶控制器
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:41
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@RestController
@RequestMapping(value = "/user")
public class UserController
{
    /**
     * 用戶業務邏輯
     */
    @Autowired
    private UserService userService;

    /**
     * 保存用戶基本信息
     * @param userEntity
     * @return
     */
    @RequestMapping(value = "/save")
    public UserEntity save(UserEntity userEntity) throws Exception
    {
        userService.save(userEntity);
        return userEntity;
    }
}
複製代碼

到這咱們添加用戶的流程已經編寫完成了,那麼咱們就來看下消息隊列QueueMessageService接口的定義以及實現類的定義。

消息隊列方法定義接口

建立一個名爲QueueMessageService的接口而且繼承了RabbitTemplate.ConfirmCallback接口,而RabbitTemplate.ConfirmCallback接口是用來回調消息發送成功後的方法,當一個消息被成功寫入到RabbitMQ服務端時,就會自動的回調RabbitTemplate.ConfirmCallback接口內的confirm方法完成通知,QueueMessageService接口以下所示:

/**
 * 消息隊列業務
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:50
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
public interface QueueMessageService
    extends RabbitTemplate.ConfirmCallback
{
    /**
     * 發送消息到rabbitmq消息隊列
     * @param message 消息內容
     * @param exchangeEnum 交換配置枚舉
     * @param queueEnum 隊列配置枚舉
     * @throws Exception
     */
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception;
}
複製代碼

接下來咱們須要實現該接口內的全部方法,並作出一些業務邏輯的處理。

消息隊列業務實現

建立名爲QueueMessageServiceSupport實體類實現QueueMessageService接口,並實現接口內的全部方法,以下所示:

/**
 * 消息隊列業務邏輯實現
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:52
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
public class QueueMessageServiceSupport
    implements QueueMessageService
{
    /**
     * 消息隊列模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception {
        //設置回調爲當前類對象
        rabbitTemplate.setConfirmCallback(this);
        //構建回調id爲uuid
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //發送消息到消息隊列
        rabbitTemplate.convertAndSend(exchangeEnum.getValue(),queueEnum.getRoutingKey(),message,correlationId);
    }

    /**
     * 消息回調確認方法
     * @param correlationData 請求數據對象
     * @param ack 是否發送成功
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回調id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息發送成功");
        } else {
            System.out.println("消息發送失敗:" + cause);
        }
    }
}
複製代碼

convertAndSend方法用於將Object類型的消息轉換後發送到RabbitMQ服務端,發送是的消息類型要與消息消費者方法參數保持一致。

confirm方法內,咱們僅僅打印了消息發送時的id,根據ack參數輸出消息發送狀態。

在上面代碼中咱們注入了RabbitTemplate消息隊列模板實例,而經過該實例咱們能夠將消息發送到RabbitMQ服務端。那麼這個實例具體在什麼地方定義的呢?咱們帶着這個疑問來建立下面的模塊,咱們須要將RabbitMQ相關的配置抽取出來做爲一個單獨的Module存在。

構建 rabbitmq-common 項目

該模塊項目很簡單,只是添加RabbitMQ相關的配置信息,因爲Module是一個子模塊因此繼承了parent全部的依賴,固然咱們用到的RabbitMQ相關依賴也不例外。

配置rabbitmq

在建立配置類以前,咱們先來定義兩個枚舉,分別存放了隊列的交換信息、隊列路由信息,

  • ExchangeEnum (存放了隊列交換配置信息)
/**
 * rabbitmq交換配置枚舉
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:13:56
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum ExchangeEnum
{
    /**
     * 用戶註冊交換配置枚舉
     */
    USER_REGISTER("user.register.topic.exchange")
    ;
    private String value;

    ExchangeEnum(String value) {
        this.value = value;
    }
}
複製代碼
  • QueueEnum (存放了隊列信息以及隊列的路由配置信息)
/**
 * 隊列配置枚舉
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:05
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum QueueEnum
{
    /**
     * 用戶註冊枚舉
     */
    USER_REGISTER("user.register.queue","user.register")
    ;
    /**
     * 隊列名稱
     */
    private String name;
    /**
     * 隊列路由鍵
     */
    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}
複製代碼

建立名爲UserRegisterQueueConfiguration的實體類用於配置本章用到的用戶註冊隊列信息,若是你得項目中使用多個隊列,建議每個業務邏輯建立一個配置類,分開維護,這樣不容易出錯。配置信息以下:

/**
 * 用戶註冊消息隊列配置
 * ========================
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:16:58
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Configuration
public class UserRegisterQueueConfiguration {
    /**
     * 配置路由交換對象實例
     * @return
     */
    @Bean
    public DirectExchange userRegisterDirectExchange()
    {
        return new DirectExchange(ExchangeEnum.USER_REGISTER.getValue());
    }

    /**
     * 配置用戶註冊隊列對象實例
     * 並設置持久化隊列
     * @return
     */
    @Bean
    public Queue userRegisterQueue()
    {
        return new Queue(QueueEnum.USER_REGISTER.getName(),true);
    }

    /**
     * 將用戶註冊隊列綁定到路由交換配置上並設置指定路由鍵進行轉發
     * @return
     */
    @Bean
    public Binding userRegisterBinding()
    {
        return BindingBuilder.bind(userRegisterQueue()).to(userRegisterDirectExchange()).with(QueueEnum.USER_REGISTER.getRoutingKey());
    }
}
複製代碼

該配置類大體分爲以下三部分:

  • 配置交換實例 配置DirectExchange實例對象,爲交換設置一個名稱,引用ExchangeEnum枚舉配置的交換名稱,消息提供者與消息消費者的交換名稱必須一致才具有的第一步的通信基礎。

  • 配置隊列實例 配置Queue實例對象,爲消息隊列設置一個名稱,引用QueueEnum枚舉配置的隊列名稱,固然隊列的名稱一樣也是提供者與消費者之間的通信基礎。

  • 綁定隊列實例到交換實例 配置Binding實例對象,消息綁定的目的就是將Queue實例綁定到Exchange上,而且經過設置的路由Key進行消息轉發,配置了路由Key後,只有符合該路由配置的消息纔會被轉發到綁定交換上的消息隊列。

咱們的rabbitmq-common模塊已經編寫完成。

添加 rabbitmq-provider 依賴 rabbitmq-common

下面咱們回到rabbitmq-provider模塊,修改pom.xml配置文件,以下所示:

<dependencies>
        <!--添加common模塊依賴-->
        <dependency>
            <groupId>com.hengyu</groupId>
            <artifactId>rabbitmq-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <!--mysql依賴-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--druid數據源依賴-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.5</version>
        </dependency>
        <!--data jpa依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
    </dependencies>
複製代碼

能夠看到咱們將rabbitmq-common模塊添加到了rabbitmq-provider模塊的pom配置文件內,完成了模塊之間的相互依賴,這樣咱們rabbitmq-provider就自動添加了對應的消息隊列配置。

構建rabbitmq-consumer

咱們再來建立一個rabbitmq-consumer隊列消息消費者模塊,用於接受消費用戶註冊消息。

建立入口類

一樣咱們先來建立一個SpringApplication入口啓動類,以下所示:

/**
 * 消息隊列消息消費者入口
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerApplication.class);

    /**
     * rabbitmq消費者啓動入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerApplication.class,args);

        logger.info("【【【【【消息隊列-消息消費者啓動成功.】】】】】");
    }
}
複製代碼
application.properties配置文件

配置文件的消息隊列配置信息要與rabbitmq-provider配置文件一致,以下所示:

spring.application.name=rabbitmq-consumer
#啓動端口
server.port=1111
#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#服務器ip
spring.rabbitmq.host=localhost
#虛擬空間地址
spring.rabbitmq.virtual-host=/
#端口號
spring.rabbitmq.port=5672
#配置發佈消息確認回調
spring.rabbitmq.publisher-confirms=true
複製代碼

咱們修改了程序啓動的端口號,爲了咱們下面進行測試的時候不出現端口占用的狀況。

若是RabbitMQ配置信息與rabbitmq-provider不一致,就不會收到消費消息。

用戶註冊消息消費者

建立名爲UserConsumer類,用於完成消息監聽,而且實現消息消費,以下所示:

/**
 * 用戶註冊消息消費者
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    @RabbitHandler
    public void execute(Long userId)
    {
        System.out.println("用戶:" + userId+",完成了註冊");

        //...//自行業務邏輯處理
    }
}
複製代碼

在消息消費者類內,有兩個陌生的註解:

  • @RabbitListener RabbitMQ隊列消息監聽註解,該註解配置監聽queues內的隊列名稱列表,能夠配置多個。隊列名稱對應本章rabbitmq-common模塊內QueueEnum枚舉name屬性。
  • @RabbitHandler RabbitMQ消息處理方法,該方法的參數要與rabbitmq-provider發送消息時的類型保持一致,不然沒法自動調用消費方法,也就沒法完成消息的消費。

#運行測試 咱們接下來在rabbitmq-provider模塊src/test/java下建立一個測試用例,訪問用戶註冊控制器請求路徑,以下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class UserTester
{
    /**
     * 模擬mvc測試對象
     */
    private MockMvc mockMvc;

    /**
     * web項目上下文
     */
    @Autowired
    private WebApplicationContext webApplicationContext;

    /**
     * 全部測試方法執行以前執行該方法
     */
    @Before
    public void before() {
        //獲取mockmvc對象實例
        mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
    }

    /**
     * 測試添加用戶
     * @throws Exception
     */
    @Test
    public void testUserAdd() throws Exception
    {
        mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                .param("userName","yuqiyu")
                .param("name","恆宇少年")
                .param("age","23")
        )
                .andDo(MockMvcResultHandlers.log())
                .andReturn();
    }
}
複製代碼

調用測試用例時會自動將參數保存到數據庫,而且將用戶編號發送到RabbitMQ服務端,而RabbitMQ根據交換配置以及隊列配置轉發消息到消費者實例。

啓動 rabbitmq-consumer

咱們先來把rabbitmq-consumer項目啓動,控制檯輸出啓動日誌以下所示:

.....
51.194  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'rabbitConnectionFactory' has been autodetected for JMX exposure
2017-12-03 16:58:51.196  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'rabbitConnectionFactory': registering with JMX server as MBean [org.springframework.amqp.rabbit.connection:name=rabbitConnectionFactory,type=CachingConnectionFactory]
2017-12-03 16:58:51.216  INFO 2340 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2017-12-03 16:58:51.237  INFO 2340 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#443ff8ef:0/SimpleConnection@4369ac5c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
2017-12-03 16:58:51.287  INFO 2340 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.354 seconds (JVM running for 3.026)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息隊列-消息消費者啓動成功.】】】】】
複製代碼

該部分啓動日誌就是咱們配置的RabbitMQ初始化信息,咱們能夠看到項目啓動時會自動與配置的RabbitMQ進行關聯:

[delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
複製代碼
運行測試用例

接下來咱們執行rabbitmq-provider項目的測試用例,來查看控制檯的輸出內容以下所示:

......
 回調id:e08f6d82-57bc-4c3f-9899-31c4b990c5be
消息發送成功
......
複製代碼

已經能夠正常的將消息發送到RabbitMQ服務端,而且接收到了回調通知,那麼咱們的rabbitmq-consumer項目是否是已經執行了消息的消費呢?咱們打開rabbitmq-consumer控制檯查看輸出內容以下所示:

用戶:2,完成了註冊
複製代碼

看以看到已經能夠成功的執行UserConsumer消息監聽類內的監聽方法邏輯,到這裏消息隊列路由一對一的方式已經講解完了。

總結

本章主要講解了RabbitMQ在不一樣操做系統下的安裝方式,以及經過三個子模塊形象的展現了消息的分佈式處理,總體流程:rabbitmq-provider -> RabbitMQ服務端 -> rabbitmq-consumer,消息的轉發是很是快的,RabbitMQ在收到消息後就會檢索當前服務端是否存在該消息的消費者,若是存在將會立刻將消息轉發。

本章源碼已經上傳到碼雲: SpringBoot配套源碼地址:gitee.com/hengboy/spr… SpringCloud配套源碼地址:gitee.com/hengboy/spr… SpringBoot相關係列文章請訪問:目錄:SpringBoot學習目錄 QueryDSL相關係列文章請訪問:QueryDSL通用查詢框架學習目錄 SpringDataJPA相關係列文章請訪問:目錄:SpringDataJPA學習目錄 SpringBoot相關文章請訪問:目錄:SpringBoot學習目錄,感謝閱讀! 歡迎加入QQ技術交流羣,共同進步。

QQ技術交流羣
相關文章
相關標籤/搜索