spring-boot-route(十三)整合RabbitMQ

這篇是SpringBoot整合消息隊列的第一篇文章,咱們詳細介紹下消息隊列的相關內容。java

消息隊列簡介

1. 什麼是消息隊列

MQ(Message Quene):經過典型的生產者和消費者模型,生產者不斷向消息隊列中產生消息,消費者不斷的從隊列中獲取消息。由於生產者和消費者都是異步的,並且生產者只關心消息的發送,消費者只關心消息的接收,沒有業務邏輯的侵入,輕鬆實現業務解耦。python

2. 消息隊列有什麼用

  • 異步處理

場景描述:某商場具備註冊功能,註冊的時候須要發送短信驗證碼。git

傳統的作法是用戶提交信息到用戶服務,用戶服務調用短信服務發送短信,而後給用戶返回響應,這種是同步的處理方式,耗時較長。加入消息隊列後,用戶直接提交信息到用戶服務,將信息寫入消息隊列,直接給用戶返回響應,短信服務從消息隊列中讀取消息進行發送短信。github

  • 應用解耦

場景描述:某商場下單流程。redis

傳統作法是用戶下單,訂單系統去查詢庫存系統,若是庫存系統宕機了,則下單失敗,損失訂單量。加入消息隊列後,用戶下單,訂單系統記錄訂單,將訂單信息寫入消息隊列,下單成功,而後庫存系統恢復正常後去操做數據庫庫存(不考慮庫存爲0的狀況)。這樣訂單系統和庫存系統就達到鬆耦合的目的了spring

  • 流量削峯

場景描述:秒殺活動。數據庫

流量過大確定會致使響應超時或系統宕機,加入消息隊列,用戶秒殺請求寫入消息隊列,設置消息隊列的長度等屬性,達到消息隊列最大長度後,直接返回秒殺失敗,而後再去消費消息隊列的數據,完成秒殺。緩存

RabbitMQ簡介

RabbitMQ是用Erlang語言編寫的,實現了高級消息隊列協議(AMQP)的消息中間件。服務器

1. AMQP協議概念

AMQPAMQP是一種連接協議,直接定義網絡交換的數據格式,這使得實現了AMQPprovider自己就是跨平臺的。如下是AMQP協議模型:微信

  • server - 又稱broker,接收客戶端的連接,實現amqp實體服務。
  • Connection - 連接,應用程序跟broker的網絡連接。
  • channel - 網絡信道,幾乎全部的操做都是在channel中進行,數據的流轉都要在channel上進行。channel是進行消息讀寫的通道。客戶端能夠創建多個channel,每一個channel表明一個會話任務。
  • message - 消息,服務器與應用程序之間傳送的數據。由properties和body組成。properties能夠對消息進行修飾,好比消息的升級,延遲等高級特性。body就是消息體的內容。
  • virtual host - 虛擬主機,用於進行邏輯隔離,最上層的消息路由,一個虛擬地址裏面能夠有多個交換機。exchange和消息隊列message quene。
  • exchange - 交換機,接收消息,根據路由器轉發消息到綁定的隊列。
  • binding - 綁定,交換機和隊列之間的虛擬連接,綁定中能夠包含routing key。
  • routing key - 一個路由規則,虛擬機能夠用它來肯定jiekyi如何路由一個特定消息。
  • quene - 消息隊列,保存消息並將它們轉發給消費者。

2. RabbitMQ的消息模型

1. 簡單模型

img

在上圖中:

  • p:生成者
  • C:消費者
  • 紅色部分:quene,消息隊列

2. 工做模型

img

在上圖中:

  • p:生成者
  • C一、C2:消費者
  • 紅色部分:quene,消息隊列

當消息處理比較耗時時,就會出現生產消息的速度遠遠大於消費消息的速度,這樣就會出現消息堆積,沒法及時處理。這時就可讓多個消費者綁定一個隊列,去消費消息,隊列中的消息一旦消費就會丟失,所以任務不會重複執行。

3. 廣播模型(fanout)

img

這種模型中生產者發送的消息全部消費者均可以消費。

在上圖中:

  • p:生成者
  • X:交換機
  • C一、C2:消費者
  • 紅色部分:quene,消息隊列

4. 路由模型(routing)

python-four.png (423×171)

這種模型消費者發送的消息,不一樣類型的消息能夠由不一樣的消費者去消費。

在上圖中:

  • p:生成者
  • X:交換機,接收到生產者的消息後將消息投遞給與routing key徹底匹配的隊列
  • C一、C2:消費者
  • 紅色部分:quene,消息隊列

5. 訂閱模型(topic)

img

這種模型和direct模型同樣,都是能夠根據routing key將消息路由到不一樣的隊列,只不過這種模型可讓隊列綁定routing key 的時候使用通配符。這種類型的routing key都是由一個或多個單詞組成,多個單詞之間用.分割。

通配符介紹:

*:只匹配一個單詞

#:匹配一個或多個單詞

6. RPC模型

img

這種模式須要通知遠程計算機運行功能並等待返回運行結果。這個過程是阻塞的。

當客戶端啓動時,它建立一個匿名獨佔回調隊列。並提供名字爲call的函數,這個call會發送RPC請求而且阻塞直到收到RPC運算的結果。

Spring Boot整合RabbitMQ

第一步:引入pom依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:增長RabbitMQ服務配置信息

spring:
  rabbitmq:
    virtual-host: javatrip
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest

這裏咱們用廣播模型來舉例使用,廣播模型(fanout)比較好理解,就像公衆號同樣,我天天推文章後,會推送給每一個關注用戶,他們均可以看到這條消息。

廣播模型注意點:

  1. 能夠有多個隊列
  2. 每一個隊列都須要綁定交換機
  3. 每一個消費者有本身的隊列
  4. 交換機把消息發送給綁定過的全部隊列

1. 定義兩個隊列

@Configuration
public class RabbitConfig {

    final static String queueNameA = "first-queue";
    final static String queueNameB = "second-queue";

    /***
     * 定義一個隊列,設置隊列屬性
     * @return
     */
    @Bean("queueA")
    public Queue queueA(){

        Map<String,Object> map = new HashMap<>();
        // 消息過時時長,10秒過時
        map.put("x-message-ttl",10000);
        // 隊列中最大消息條數,10條
        map.put("x-max-length",10);
        // 第一個參數,隊列名稱
        // 第二個參數,durable:持久化
        // 第三個參數,exclusive:排外的,
        // 第四個參數,autoDelete:自動刪除
        Queue queue = new Queue(queueNameA,true,false,false,map);
        return queue;
    }
    
    @Bean("queueB")
    public Queue queueB(){

        Map<String,Object> map = new HashMap<>();
        // 消息過時時長,10秒過時
        map.put("x-message-ttl",10000);
        // 隊列中最大消息條數,10條
        map.put("x-max-length",10);
        // 第一個參數,隊列名稱
        // 第二個參數,durable:持久化
        // 第三個參數,exclusive:排外的,
        // 第四個參數,autoDelete:自動刪除
        Queue queue = new Queue(queueNameB,true,false,false,map);
        return queue;
    }
}

2. 定義扇形交換機

@Bean
public FanoutExchange fanoutExchange(){

    // 第一個參數,交換機名稱
    // 第二個參數,durable,是否持久化
    // 第三個參數,autoDelete,是否自動刪除
    FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
    return fanoutExchange;
}

3. 交換機和隊列綁定

@Bean
public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
    Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
    return binding;
}

@Bean
public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
    Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
    return binding;
}

4. 建立兩個消費者分別監聽兩個隊列

@RabbitListener(queues = RabbitConfig.queueNameA)
@Component
@Slf4j
public class ConsumerA {

    @RabbitHandler
    public void receive(String message){
        log.info("消費者A接收到的消息:"+message);
    }
}
@RabbitListener(queues = RabbitConfig.queueNameB)
@Component
@Slf4j
public class ConsumerB {

    @RabbitHandler
    public void receive(String message){
        log.info("消費者B接收到的消息:"+message);
    }
}

5. 建立生產者生產消息

@RestController
public class provider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("send")
    public void sendMessage(){

        String message = "你好,我是Java旅途";
        rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message);
    }
}

這樣生產者發送一條消息後,兩個消費者就能同時消費到消息了。


本文示例代碼已上傳至github,點個star支持一下!

Spring Boot系列教程目錄

spring-boot-route(一)Controller接收參數的幾種方式

spring-boot-route(二)讀取配置文件的幾種方式

spring-boot-route(三)實現多文件上傳

spring-boot-route(四)全局異常處理

spring-boot-route(五)整合Swagger生成接口文檔

spring-boot-route(六)整合JApiDocs生成接口文檔

spring-boot-route(七)整合jdbcTemplate操做數據庫

spring-boot-route(八)整合mybatis操做數據庫

spring-boot-route(九)整合JPA操做數據庫

spring-boot-route(十)多數據源切換

spring-boot-route(十一)數據庫配置信息加密

spring-boot-route(十二)整合redis作爲緩存

spring-boot-route(十三)整合RabbitMQ

spring-boot-route(十四)整合Kafka

spring-boot-route(十五)整合RocketMQ

spring-boot-route(十六)使用logback生產日誌文件

spring-boot-route(十七)使用aop記錄操做日誌

spring-boot-route(十八)spring-boot-adtuator監控應用

spring-boot-route(十九)spring-boot-admin監控服務

spring-boot-route(二十)Spring Task實現簡單定時任務

spring-boot-route(二十一)quartz實現動態定時任務

spring-boot-route(二十二)實現郵件發送功能

spring-boot-route(二十三)開發微信公衆號

這個系列的文章都是工做中頻繁用到的知識,學完這個系列,應付平常開發綽綽有餘。若是還想了解其餘內容,掃面下方二維碼告訴我,我會進一步完善這個系列的文章!

相關文章
相關標籤/搜索