Spring Boot:使用Rabbit MQ消息隊列

綜合概述

消息隊列

消息隊列就是一個消息的鏈表,能夠把消息看做一個記錄,具備特定的格式以及特定的優先級。對消息隊列有寫權限的進程能夠向消息隊列中按照必定的規則添加新消息,對消息隊列有讀權限的進程則能夠從消息隊列中讀走消息,而消息隊列就是在消息的傳輸過程當中保存消息的容器,你能夠簡單的把消息隊列理解爲相似快遞櫃,快遞員(消息發佈者)往快遞櫃(消息隊列)投遞物件(消息),接受者(消息訂閱者)從快遞櫃(消息隊列)接收物件(消息),固然消息隊列每每還包含一些特定的消息傳遞和接收機制。

消息隊列做爲分佈式系統中重要的組件,能夠有效解決應用耦合,異步消息,流量削鋒等系列問題,有利於實現高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,各類消息隊列也都各有特色,好比Kafka提供高性能、高吞吐量,但可靠性有所欠缺,因此比較適合像日誌處理這類對性能要求高但對可靠性要求沒那麼嚴格的業務,再好比RabbitMQ支持了各類協議,實現較爲臃腫,性能和吞吐量都通常,但卻提供了很好的可靠性,比較適合像銀行金融一類對可靠性要求較高的業務。html

應用場景

如下簡單介紹幾個消息隊列在實際應用中的使用場景(如下場景資料引用自網絡)。前端

1 異步處理

場景說明:用戶註冊後,須要發註冊郵件和註冊短信。傳統的作法有兩種 1.串行的方式;2.並行方式java

(1)串行方式:將註冊信息寫入數據庫成功後,發送註冊郵件,再發送註冊短信。以上三個任務所有完成後,返回給客戶端git

 

(2)並行方式:將註冊信息寫入數據庫成功後,發送註冊郵件的同時,發送註冊短信。以上三個任務完成後,返回給客戶端。與串行的差異是,並行的方式能夠提升處理的時間web

 

假設三個業務節點每一個使用50毫秒鐘,不考慮網絡等其餘開銷,則串行方式的時間是150毫秒,並行的時間多是100毫秒。ajax

由於CPU在單位時間內處理的請求數是必定的,假設CPU1秒內吞吐量是100次。則串行方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)spring

小結:如以上案例描述,傳統的方式系統的性能(併發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?數據庫

引入消息隊列,將不是必須的業務邏輯,異步處理。改造後的架構以下:apache

 

按照以上約定,用戶的響應時間至關因而註冊信息寫入數據庫的時間,也就是50毫秒。註冊郵件,發送短信寫入消息隊列後,直接返回,所以寫入消息隊列的速度很快,基本能夠忽略,所以用戶的響應時間多是50毫秒。所以架構改變後,系統的吞吐量提升到每秒20 QPS。比串行提升了3倍,比並行提升了兩倍api

2 應用解耦

場景說明:用戶下單後,訂單系統須要通知庫存系統。傳統的作法是,訂單系統調用庫存系統的接口。以下圖

 

傳統模式的缺點:

  • 假如庫存系統沒法訪問,則訂單減庫存將失敗,從而致使訂單失敗

  • 訂單系統與庫存系統耦合

如何解決以上問題呢?引入應用消息隊列後的方案,以下圖:

 

  • 訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功

  • 庫存系統:訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做

  • 假如:在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。實現訂單系統與庫存系統的應用解耦

3 流量削鋒

流量削鋒也是消息隊列中的經常使用場景,通常在秒殺或團搶活動中使用普遍

應用場景:秒殺活動,通常會由於流量過大,致使流量暴增,應用掛掉。爲解決這個問題,通常須要在應用前端加入消息隊列。

  • 能夠控制活動的人數

  • 能夠緩解短期內高流量壓垮應用

 

  • 用戶的請求,服務器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面

  • 秒殺業務根據消息隊列中的請求信息,再作後續處理

4 日誌處理

日誌處理是指將消息隊列用在日誌處理中,好比Kafka的應用,解決大量日誌傳輸的問題。架構簡化以下

 

  • 日誌採集客戶端,負責日誌數據採集,定時寫受寫入Kafka隊列

  • Kafka消息隊列,負責日誌數據的接收,存儲和轉發

  • 日誌處理應用:訂閱並消費kafka隊列中的日誌數據

如下是新浪kafka日誌處理應用案例:

 

(1)Kafka:接收用戶日誌的消息隊列

(2)Logstash:作日誌解析,統一成JSON輸出給Elasticsearch

(3)Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的數據存儲服務,經過index組織數據,兼具強大的搜索和統計功能

(4)Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是衆多公司選擇ELK stack的重要緣由

5 消息通信

消息通信是指,消息隊列通常都內置了高效的通訊機制,所以也能夠用在純的消息通信。好比實現點對點消息隊列,或者聊天室等

點對點通信:

 

客戶端A和客戶端B使用同一隊列,進行消息通信。

聊天室通信:

 

客戶端A,客戶端B,客戶端N訂閱同一主題,進行消息發佈和接收。實現相似聊天室效果。

以上實際是消息隊列的兩種消息模式,點對點或發佈訂閱模式。模型爲示意圖,供參考。

Rabbit MQ

AMQP,即 Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦和通信。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性和安全。
RabbitMQ是一個開源的AMQP實現,服務器端用 Erlang 語言編寫,支持多種客戶端,如:Java、Python、Ruby、.NET、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,具備很高的易用性和可用性。
 
接下來,咱們先來了解幾個相關概念(如下相關介紹資料引用自網絡)。

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket連接,它封裝了socket協議相關部分邏輯。ConnectionFactory爲Connection的製造工廠。 Channel是咱們與RabbitMQ打交道的最重要的一個接口,咱們大部分的業務操做是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發佈消息等。

Queue

Queue(隊列)是RabbitMQ的內部對象,用於存儲消息。

RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)能夠從Queue中獲取消息並消費。

生產者Send Message 「A」被傳送到Queue中,消費者發現消息隊列Queue中有訂閱的消息,就會將這條消息A讀取出來進行一些列的業務操做。這裏只是一個消費正對應一個隊列Queue,也能夠多個消費者訂閱同一個隊列Queue,固然這裏就會將Queue裏面的消息平分給其餘的消費者,可是會存在一個一個問題就是若是每一個消息的處理時間不一樣,就會致使某些消費者一直在忙碌中,而有的消費者處理完了消息後一直處於空閒狀態,由於前面已經說起到了Queue會平分這些消息給相應的消費者。這裏咱們就可使用prefetchCount來限制每次發送給消費者消息的個數。詳情見下圖所示:

這裏的prefetchCount=1是指每次從Queue中發送一條消息來。等消費者處理完這條消息後Queue會再發送一條消息給消費者。

Message acknowledgment

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除;若是RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。這裏不存在timeout概念,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。 這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多;消費者重啓後會重複消費這些消息並重復執行業務邏輯…

另外pub message是沒有ack的。

Message durability

若是咱們但願即便在RabbitMQ服務重啓的狀況下,也不會丟失消息,咱們能夠將Queue與Message都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是咱們須要對這種小几率事件也要管理起來,那麼咱們要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此這裏將不講解RabbitMQ相關的事務。

Exchange

首先明確一點就是生產者產生的消息並非直接發送給消息隊列Queue的,而是要通過Exchange(交換器),由Exchange再將消息路由到一個或多個Queue,固然這裏還會對不符合路由規則的消息進行丟棄掉,這裏指的是後續要談到的Exchange Type。那麼Exchange是怎樣將消息準確的推送到對應的Queue的呢?那麼這裏的功勞最大的當屬Binding,RabbitMQ是經過Binding將Exchange和Queue連接在一塊兒,這樣Exchange就知道如何將消息準確的推送到Queue中去。簡單示意圖以下所示:

        

在綁定(Binding)Exchange和Queue的同時,通常會指定一個Binding Key,生產者將消息發送給Exchange的時候,通常會產生一個Routing Key,當Routing Key和Binding Key對應上的時候,消息就會發送到對應的Queue中去。那麼Exchange有四種類型,不一樣的類型有着不一樣的策略。也就是代表不一樣的類型將決定綁定的Queue不一樣,換言之就是說生產者發送了一個消息,Routing Key的規則是A,那麼生產者會將Routing Key=A的消息推送到Exchange中,這時候Exchange中會有本身的規則,對應的規則去篩選生產者發來的消息,若是可以對應上Exchange的內部規則就將消息推送到對應的Queue中去。那麼接下來就來詳細講解下Exchange裏面類型。

Exchange Types

  • fanout

        fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中。

       

    上圖所示,生產者(P)生產消息1將消息1推送到Exchange,因爲Exchange Type=fanout這時候會遵循fanout的規則將消息推送到全部與它綁定Queue,也就是圖上的兩個Queue最後兩個消費者消費。

  • direct

        direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中

         

     當生產者(P)發送消息時Rotuing key=booking時,這時候將消息傳送給Exchange,Exchange獲取到生產者發送過來消息後,會根據自身的規則進行與匹配相應的Queue,這時發現Queue1和Queue2都符合,就會將消息傳送給這兩個隊列,若是咱們以Rotuing key=create和Rotuing key=confirm發送消息時,這時消息只會被推送到Queue2隊列中,其餘Routing Key的消息將會被丟棄。

  • topic

      前面提到的direct規則是嚴格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時候纔將消息傳送給Queue,那麼topic這個規則就是模糊匹配,能夠經過通配符知足一部分規則就能夠傳送。它的約定是:

  1. routing key爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」
  2. binding key與routing key同樣也是句點號「. 」分隔的字符串
  3. binding key中能夠存在兩種特殊字符「*」與「#」,用於作模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)

      

  當生產者發送消息Routing Key=F.C.E的時候,這時候只知足Queue1,因此會被路由到Queue中,若是Routing Key=A.C.E這時候會被同是路由到Queue1和Queue2中,若是Routing Key=A.F.B時,這裏只會發送一條消息到Queue2中。

  • headers

        headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對;若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。

實現案例

首先,須要安裝Rabbit MQ,能夠直接安裝,也能夠用Docker安裝,這個網上教程不少,這裏就再也不贅述了。

生成項目模板

爲方便咱們初始化項目,Spring Boot給咱們提供一個項目模板生成網站。

1.  打開瀏覽器,訪問:https://start.spring.io/

2.  根據頁面提示,選擇構建工具,開發語言,項目信息等。

3.  點擊 Generate the project,生成項目模板,生成以後會將壓縮包下載到本地。

4.  使用IDE導入項目,我這裏使用Eclipse,經過導入Maven項目的方式導入。

添加相關依賴

清理掉不須要的測試類及測試依賴,添加 rabbitmq相關依賴。

<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

下面給出完整的POM文件。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.louis.springboot</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- web -->
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
        <!-- swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

添加相關配置

添加一個swagger 配置類,在工程下新建 config 包並添加一個 SwaggerConfig 配置類。

SwaggerConfig.java

package com.louis.springboot.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi(){
        return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.any())
                .paths(PathSelectors.any()).build();
    }

    private ApiInfo apiInfo(){
        return new ApiInfoBuilder()
                .title("Swagger API Doc")
                .description("This is a restful api document of Swagger.")
                .version("1.0")
                .build();
    }

}

修改application.properties文件名爲application.yml,在其中添加RabbitMQ配置信息,根據本身安裝的RabbitMQ配置。

application.yml

# rabbitmq配置
spring:
    rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest

普通隊列模式

新建一個RabbitMQ配置類,並添加一個demoQueue隊列。

RabbitConfig.java

package com.louis.springboot.demo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * 定義demoQueue隊列
     * @return
     */
    @Bean
    public Queue demoString() {
        return new Queue("demoQueue");
    }
    
}

編寫一個消息發佈者,並編寫一個發送方法,經過AmqpTemplate往"demoQueue"發送消息。

RabbitProducer.java

package com.louis.springboot.demo.mq;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendDemoQueue() {
        Date date = new Date();
        String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
        System.out.println("[demoQueue] send msg: " + dateString);  
        // 第一個參數爲剛剛定義的隊列名稱
        this.rabbitTemplate.convertAndSend("demoQueue", dateString);
    }
}

編寫一個消息消費者,經過@RabbitListener(queues = "demoQueue")註解監聽"demoQueue"隊列,並用@RabbitHandler註解相關方法,這樣在在隊列收到消息以後,交友@RabbitHandler註解的方法進行處理。

DemoQueueConsumer.java

package com.louis.springboot.demo.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "demoQueue")
public class DemoQueueConsumer {

    /**
     * 消息消費
     * @RabbitHandler 表明此方法爲接受到消息後的處理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[demoQueue] recieved message: " + msg);
    }

}

編寫一個控制器,注入RabbitProducer調用相關消息發送方法,方便經過接口觸發消息發送。

RabbitMqController.java

package com.louis.springboot.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.louis.springboot.demo.mq.RabbitProducer;

@RestController
public class RabbitMqController {

    @Autowired
    private RabbitProducer rabbitProducer;

    @GetMapping("/sendDemoQueue")
    public Object sendDemoQueue() {
        rabbitProducer.sendDemoQueue();
        return "success";
    }
}

編譯並啓動應用,打開瀏覽器,訪問:http://localhost:8080/swagger-ui.html,進入swagger接口文檔界面。

調用兩次sendDemoQueue接口,在控制檯能夠看到咱們輸出的信息,說明消息已經成功發送並被消費。

[demoQueue] send msg: 2019-58-183 04:07:38
[demoQueue] recieved message: 2019-58-183 04:07:38
[demoQueue] send msg: 2019-01-183 05:07:05
[demoQueue] recieved message: 2019-01-183 05:07:05

Fanout廣播模式

Fanout其實就是廣播模式,只要跟它綁定的隊列都會通知而且接受到消息。修改配置類,在RabbitConfig中添加以下fanout模式的隊列跟交換機信息。在代碼中咱們配置了三個隊列名、一個fanout交換機,而且將這三個隊列綁定到了fanout交換器上。只要咱們往這個交換機生產新的消息,那麼這三個隊列都會收到。

RabbitConfig.java

    //=================== fanout廣播模式  ====================

    @Bean
    public Queue fanoutA() {
        return new Queue("fanout.a");
    }

    @Bean
    public Queue fanoutB() {
        return new Queue("fanout.b");
    }

    @Bean
    public Queue fanoutC() {
        return new Queue("fanout.c");
    }

    /**
     * 定義個fanout交換器
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // 定義一個名爲fanoutExchange的fanout交換器
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 將定義的fanoutA隊列與fanoutExchange交換機綁定
     * @return
     */
    @Bean
    public Binding bindingExchangeWithA() {
        return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
    }

    /**
     * 將定義的fanoutB隊列與fanoutExchange交換機綁定
     * @return
     */
    @Bean
    public Binding bindingExchangeWithB() {
        return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
    }

    /**
     * 將定義的fanoutC隊列與fanoutExchange交換機綁定
     * @return
     */
    @Bean
    public Binding bindingExchangeWithC() {
        return BindingBuilder.bind(fanoutC()).to(fanoutExchange());
    }

而後咱們在RabbitProducer中添加一個sendFanout方法,用來向fanout隊列發送消息。

RabbitProducer.java

public void sendFanout() {
    Date date = new Date();
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
    System.out.println("[fanout] send msg:" + dateString);
    // 注意 第一個參數是咱們交換機的名稱 ,第二個參數是routerKey 咱們不用管空着就能夠,第三個是你要發送的消息
    this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString);
}

一樣的,在控制器裏添加一個訪問接口。

RabbitMqController.java

@GetMapping("/sendFanout")
public Object sendFanout() {
    rabbitProducer.sendFanout();
    return "success";
}

接着針對三個廣播隊列分別編寫一個消息消費者,指定隊列和處理函數。

FanoutAConsumer.java

package com.louis.springboot.demo.mq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.a")
public class FanoutAConsumer {

    /**
     * 消息消費
     * @RabbitHandler 表明此方法爲接受到消息後的處理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[fanout.a] recieved message: " + msg);
    }
}

FanoutBConsumer.java

package com.louis.springboot.demo.mq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.b")
public class FanoutBConsumer {

    /**
     * 消息消費
     * @RabbitHandler 表明此方法爲接受到消息後的處理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[fanout.b] recieved message: " + msg);
    }
}

FanoutCConsumer.java

package com.louis.springboot.demo.mq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.c")
public class FanoutCConsumer {

    /**
     * 消息消費
     * @RabbitHandler 表明此方法爲接受到消息後的處理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[fanout.c] recieved message: " + msg);
    }
}

從新啓動應用,調用sendFanout接口,經過控制檯能夠看到消息發送以後,a, b, c三個隊列都收到了消息。

[fanout] send msg:2019-47-183 05:07:12
[fanout.c] recieved message: 2019-47-183 05:07:12
[fanout.b] recieved message: 2019-47-183 05:07:12
[fanout.a] recieved message: 2019-47-183 05:07:12

Topic主題模式

利用topic模式能夠實現模糊匹配,一樣的,在RabbitConfig中配置topic隊列跟交換器,注意的是這裏須要多配置一個bindingKey。

RabbitConfig.java

    //=================== topic主題模式  ====================

    @Bean
    public Queue topiocA() {
        return new Queue("topic.a");
    }

    @Bean
    public Queue topicB() {
        return new Queue("topic.b");
    }

    @Bean
    public Queue topicC() {
        return new Queue("topic.c");
    }

    /**
     * 定義個topic交換器
     * @return
     */
    @Bean
    TopicExchange topicExchange() {
        // 定義一個名爲fanoutExchange的fanout交換器
        return new TopicExchange("topicExchange");
    }

    /**
     * 將定義的topicA隊列與topicExchange交換機綁定
     * @return
     */
    @Bean
    public Binding bindingTopicExchangeWithA() {
        return BindingBuilder.bind(topiocA()).to(topicExchange()).with("topic.msg");
    }

    /**
     * 將定義的topicB隊列與topicExchange交換機綁定
     * @return
     */
    @Bean
    public Binding bindingTopicExchangeWithB() {
        return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#");
    }

    /**
     * 將定義的topicC隊列與topicExchange交換機綁定
     * @return
     */
    @Bean
    public Binding bindingTopicExchangeWithC() {
        return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.*.z");
    }

上述配置中:

topicA的key爲topic.msg 那麼他只會接收包含topic.msg的消息

topicB的key爲topic.#那麼他只會接收topic開頭的消息

topicC的key爲topic.*.z那麼他只會接收topic.x.z這樣格式的消息

而後修改RabbitProducer,在其中添加以下三個方法,如方法名所示,分別根據匹配規則發送到A\B,B,B\C隊列。

RabbitProducer.java

public void sendTopicTopicAB() {
    Date date = new Date();
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
    dateString = "[topic.msg] send msg:" + dateString;
    System.out.println(dateString);
    // 注意 第一個參數是咱們交換機的名稱 ,第二個參數是routerKey topic.msg,第三個是你要發送的消息
    // 這條信息將會被 topic.a  topic.b接收
    this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString);
}

public void sendTopicTopicB() {
    Date date = new Date();
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
    dateString = "[topic.good.msg] send msg:" + dateString;
    System.out.println(dateString);
    // 注意 第一個參數是咱們交換機的名稱 ,第二個參數是routerKey ,第三個是你要發送的消息
    // 這條信息將會被topic.b接收
    this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString);
}

public void sendTopicTopicBC() {
    Date date = new Date();
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
    dateString = "[topic.m.z] send msg:" + dateString;
    System.out.println(dateString);
    // 注意 第一個參數是咱們交換機的名稱 ,第二個參數是routerKey ,第三個是你要發送的消息
    // 這條信息將會被topic.b、topic.c接收
    this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString);
}

一樣的,在控制器裏面添加發送服務對應的接口。

RabbitMqController.java

@GetMapping("/sendTopicTopicAB")
public Object sendTopicTopicAB() {
    rabbitProducer.sendTopicTopicAB();
    return "success";
}

@GetMapping("/sendTopicTopicB")
public Object sendTopicTopicB() {
    rabbitProducer.sendTopicTopicB();
    return "success";
}

@GetMapping("/sendTopicTopicBC")
public Object sendTopicTopicBC() {
    rabbitProducer.sendTopicTopicBC();
    return "success";
}

接着針對三個主題隊列編寫對應的消息消費者。

TopicAConsumer.java

package com.louis.springboot.demo.mq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.a")
public class TopicAConsumer {

    /**
     * 消息消費
     * @RabbitHandler 表明此方法爲接受到消息後的處理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[topic.a] recieved message:" + msg);
    }
}

TopicBConsumer.java

package com.louis.springboot.demo.mq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.b")
public class TopicBConsumer {

    /**
     * 消息消費
     * @RabbitHandler 表明此方法爲接受到消息後的處理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[topic.b] recieved message:" + msg);
    }
}

TopicCConsumer.java

package com.louis.springboot.demo.mq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.c")
public class TopicCConsumer {

    /**
     * 消息消費
     * @RabbitHandler 表明此方法爲接受到消息後的處理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[topic.c] recieved message:" + msg);
    }
}

重啓應用,調用sendTopicTopicAB接口,通過匹配,route key爲「topic.msg」的消息被髮送到了topic.a和topic.b。

[topic.msg] send msg:2019-12-183 06:07:22
[topic.b] recieved message:[topic.msg] send msg:2019-12-183 06:07:22
[topic.a] recieved message:[topic.msg] send msg:2019-12-183 06:07:22

調用sendTopicTopicB接口,通過匹配,route key爲「topic.good.msg」的消息被髮送到了topic.b。

[topic.good.msg] send msg:2019-15-183 06:07:23
[topic.b] recieved message:[topic.good.msg] send msg:2019-15-183 06:07:23

調用sendTopicTopicBC接口,通過匹配,route key爲「topic.m.z」的消息被髮送到了topic.b和topic.c。

[topic.m.z] send msg:2019-16-183 06:07:09
[topic.b] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09
[topic.c] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09

 

 

參考資料

官方網站:https://www.rabbitmq.com/

百度百科:https://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin

中文教程:http://rabbitmq.mr-ping.com/description.html

相關導航

Spring Boot 系列教程目錄導航

Spring Boot:快速入門教程

Spring Boot:整合Swagger文檔

Spring Boot:整合MyBatis框架

Spring Boot:實現MyBatis分頁

源碼下載

碼雲:https://gitee.com/liuge1988/spring-boot-demo.git


做者:朝雨憶輕塵
出處:https://www.cnblogs.com/xifengxiaoma/ 
版權全部,歡迎轉載,轉載請註明原文做者及出處。

相關文章
相關標籤/搜索