java框架之SpringBoot(12)-消息及整合RabbitMQ

前言

概述

大多數應用中,可經過消息服務中間件來提高系統異步通訊、擴展解耦的能力。html

消息服務中兩個重要概念:消息代理(message broker)目的地(destination)。當消息發送者發送消息後,將由消息代理接管,消息代理保證消息傳遞到指定目的地。前端

消息隊列主要有兩種形式的目的地:java

  • 隊列(queue):點對點消息通訊(point-to-point)。
  • 主題(topic):發佈(publish)/訂閱(subscribe)消息通訊。

點對點:web

  • 消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取後被移出隊列。
  • 消息只有惟一的發送者和接收者,但並非只能有一個接收者。

發佈/訂閱:spring

  • 發送者(發佈者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那麼就會在消息到達時同時接收到消息。

兩種規範

JMS(Java Message Service):數據庫

  • Java 消息服務,基於 JVM 消息代理的規範。ActiveMQ、HornetMQ 是 JMS 的實現。

AMQP(Advanced Message Queuing Protocol):springboot

  • 高級消息隊列協議,也是一個消息代理的規範,兼容 JMS。
  • RabbitMQ 是 AMQP 的實現。
JMS 與 AMQP 對比:
  JMS AMQP
定義 Java API 網絡級協議
跨語言
跨平臺
Model 提供 2 種消息模型:
  1. Peer-2-Peer
  2. Pub/Sub
提供了 5 種消息模型:
  1. direct exchange
  2. fanout exchange
  3. topic exchange
  4. headers exchange
  5. system exchange
本質來說,後四種和 JMS 的 Pub/Sub 模型沒有太大區別,僅是在路由機制上作了更詳細的區分。
支持消息類型 多種消息類型:
  1. TextMessage
  2. MapMessage
  3. BytesMessage
  4. StreamMessage
  5. ObjectMessage
  6. Message(只有消息頭和屬性)
因其要支持跨語言跨平臺,因此僅支持 byte[],當實際應用中有複雜的消息時,能夠將消息序列化後發送。
綜合 HMS 定義了 Java API 層面的標準,在 Java 體系中,多個 client 都可經過 JMS 進行交互,不須要修改應用代碼,可是其對跨平臺支持較差。 AMQP 定義了 wire-level 層的協議標準,自然具備跨平臺、跨語言特性。
Spring 支持:
  • spring-jms 提供了對 JMS 的支持。
  • spring-rabbit 提供了對 AMQP 的支持。
  • 需使用 ConnectionFactory 的實現來鏈接消息代理。
  • 提供 JmsTemplate、RabbitTemplate 來操做消息。
  • @JmsListener(JMS)和 @RabbitListener(AMQP)註解標註在方法上可監聽消息代理髮布的消息。
  • @EnableJms、@EnableRabbit 開啓支持。
SpringBoot 自動配置類:
  • JMS 的自動配置類爲 JmsAutoConfiguration。
  • AMQP 的自動配置類爲 RabbitAutoConfiguration。

幾種場景

異步處理

場景說明:用戶註冊後,須要發註冊郵件和註冊短信。傳統的作法有兩種:串行方式、並行方式。服務器

一、串行方式:將註冊信息寫入數據庫成功後,發送註冊郵件,再發送註冊短信。以上三個任務所有完成後,返回給客戶端。網絡

二、並行方式:將註冊信息寫入數據庫成功後,發送註冊郵件的同時,發送註冊短信。以上三個任務完成後,返回給客戶端。與串行的差異是,並行的方式能夠提升處理的時間。架構

假設三個業務節點每一個使用50毫秒鐘,不考慮網絡等其餘開銷,則串行方式的時間是 150 毫秒,並行的時間多是 100 毫秒。

由於 CPU 在單位時間內處理的請求數是必定的,假設 CPU1 秒內吞吐量是 100 次。則串行方式 1 秒內 CPU 可處理的請求量是 7 次(1000/150)。並行方式處理的請求量是 10 次(1000/100)。

如以上案例描述,傳統的方式系統的性能(併發量,吞吐量,響應時間)很容易達到瓶頸。

三、引入消息隊列,將不是必須的業務邏輯,異步處理。改造後的架構以下:

按照以上約定,用戶的響應時間至關因而註冊信息寫入數據庫的時間,也就是50毫秒。註冊郵件,發送短信寫入消息隊列後,直接返回,所以寫入消息隊列的速度很快,基本能夠忽略,所以用戶的響應時間多是50毫秒。所以架構改變後,系統的吞吐量提升到每秒 20 QPS。比串行提升了 3 倍,比並行提升了 2 倍。

應用解耦

場景說明:用戶下單後,訂單系統須要通知庫存系統。傳統的作法是,訂單系統調用庫存系統的接口。

傳統模式:

傳統模式的缺點:

  • 假如庫存系統沒法訪問,則訂單減庫存將失敗,從而致使訂單失敗。
  • 訂單系統與庫存系統耦合。

引入消息隊列:

  • 訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。

  • 庫存系統:訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。

  • 假如:在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。實現訂單系統與庫存系統的應用解耦。

流量削鋒

場景說明:秒殺活動,通常會由於流量過大,致使流量暴增,應用掛掉。爲解決這個問題,通常須要在應用前端加入消息隊列。

  • 能夠控制活動的人數。

  • 能夠緩解短期內高流量壓垮應用。

  • 用戶的請求,服務器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面。

  • 秒殺業務根據消息隊列中的請求信息,再作後續處理。

該部份內容摘自 https://blog.csdn.net/cws1214/article/details/52922267

RabbitMQ介紹

簡介

RabbitMQ 採用 Erlang 語言開發,是 AMQP 的開源實現。Erlang 語言由 Ericson 設計,專門爲開發 concurrent 和 distribution 系統的一種語言,在電信領域使用普遍。OTP(Open Telecom Platform)做爲 Erlang 語言的一部分,包含了不少基於 Erlang 開發的中間件/庫/工具,如 mnesia/SASL,極大方便了 Erlang 應用的開發。OTP 就相似於 Python 語言中衆多的 module,用戶藉助這些 module 能夠很方便的開發應用。

核心概念

Message

消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(相對於其餘消息的優先級)、delivery-mode(標識指定消息是否須要持久性存儲)等。

Publisher

消息的生產者,也是一個向交換機發布消息的客戶端應用程序。

Exchange

交換器,用來接收生產者發送的消息並將這些消息路由到服務器中的隊列,也是消息到達 Broker 的第一站,根據分發規則,匹配查詢表中的路由鍵,分發消息到隊列中去。經常使用的類型有:direct (point-to-point), topic (publish-subscribe) 和 fanout (multicast)。

Queue

消息隊列,用來保存信息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息能夠投入到一個或多個隊列。消息一致在隊列中,等待消費者鏈接到這個隊列將其取走。

Binding

綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。路由器和隊列的綁定能夠是多對多的關係。

Connection

鏈接,Publisher/Consumer 和 Broker 之間的 TCP 鏈接。斷開鏈接的操做只會在 client  端進行,Broker 不會斷開鏈接,除非出現網絡故障或 Broker 服務出現問題。

Channel

信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的 TCP 鏈接內的虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接受消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念以複用一條 TCP 鏈接。

Consumer

消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

Virtual Host

虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 '/'。

Broker

表示消息隊列服務器實體,即接收和分發消息的應用,RabbitMQ Server 就是 Message Broker。

運行機制

AMQP中的消息路由

AMQP 中的消息路由與 JMS 存在一些差異,AMQP 中增長了 Exchange 和 Binding 的角色。生產者須要把消息發佈到 Exchange,最終由 Exchange 轉發到隊列並被消費者接收,而 Binding 就決定了交換器會將消息轉發到哪一個隊列。

Exchange類型

Exchange 分發消息時根據類型的不一樣分發策略有區別,目前共有四種類型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由鍵,headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三中類型:

  • Direct Exchange

    消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致,交換器就將消息發送到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲"dog",則只轉發 routing key 標記爲 "dog" 的消息,不會轉發 "dog.puppy",也不會轉發"dog.guard"等。它是徹底匹配、單播的模式。

  • Fanout Exchange:每一個發到 fanout 類型交換器的消息都會分發到全部綁定的隊列中。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息時最快的。

  • Topic Exchange:topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將由路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:"#"和"*"。"#" 匹配 0 個或多個單詞,"*" 匹配一個單詞。簡單說就是根據 routing key 及通配規則將消息分發到目標隊列中。

安裝

參考【Docker 安裝RabbitMQ】。

使用

準備

一、進入 RabbitMQ 的 web 可視化頁,用 guest 用戶登陸,密碼也爲 guest。

二、新建以下測試隊列:

三、新建以下測試交換器:

四、給新建的 direct 和 fanout 交換器新建以下綁定:

五、給新建的 topic 交換器新建以下綁定:

direct交換機測試

一、給「張三.msg」這個隊列發送消息:

二、「張三.msg」接收消息:

fanout交換機測試

一、給全部綁定的隊列發送消息:

二、全部隊列都接收到消息:

topic交換器測試

一、給全部「姓張」的隊列發送消息:

 

二、全部「姓張」的隊列都接收到消息:

整合RabbitMQ

準備

一、使用 maven 新建 SpringBoot 項目,引入 Rabbit 、Web 場景啓動器。

二、配置 RabbitMQ 鏈接信息:

spring.rabbitmq.host=192.168.202.136
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
application.properties

三、註解配置啓用 RabbitMQ:

package com.springboot.config;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit // 啓用 Rabbit
public class MyAmqpConfig {

}
com.springboot.config.MyAmqpConfig

四、新建測試 JavaBean:

package com.springboot.bean;

import java.io.Serializable;
import java.util.Date;

public class User implements Serializable {

    private Integer id;
    private String name;
    private Date birthday;
    private String city;

    public User() {
    }

    public User(Integer id, String name, Date birthday, String city) {
        this.id = id;
        this.name = name;
        this.birthday = birthday;
        this.city = city;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getBirthday() {
        return birthday;
    }

    public void setBirthday(Date birthday) {
        this.birthday = birthday;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", birthday=" + birthday +
                ", city='" + city + '\'' +
                '}';
    }
}
com.springboot.bean.User

RabbitTemplate使用

下面經過 RabbitTemplate 來完成上述 RabbitMQ 在可視化界面中的幾個測試操做:

package com.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTests {

    // org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration 自動配置類中註冊了 RabbitTemplate 的 bean
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        // 經過 direct 交換器給 「張三.msg」 隊列發送消息

        // send 方法的 message 參數中須要本身定義消息頭和消息體
        // rabbitTemplate.send(exchange,routingkey,message);

        rabbitTemplate.convertAndSend("my.direct","zhangsan.msg","你好 張三");
    }

    @Test
    public void test2(){
        // 接收 「張三.msg」 隊列的消息
        Object o = rabbitTemplate.receiveAndConvert("張三.msg");
        System.out.println(o.toString());

        /*
        你好 張三
         */
    }

    @Test
    public void test3(){
        // 經過 fanout 交換器給全部隊列發送消息

        rabbitTemplate.convertAndSend("my.fanout", "zhangsan.msg", "你們好");
    }

    @Test
    public void test4(){
        // 全部隊列接收消息
        Object msg1 = rabbitTemplate.receiveAndConvert("張三.msg");
        System.out.println(msg1.toString());
        Object msg2 = rabbitTemplate.receiveAndConvert("張四.msg");
        System.out.println(msg2.toString());
        Object msg3 = rabbitTemplate.receiveAndConvert("李三.msg");
        System.out.println(msg3.toString());
        Object msg4 = rabbitTemplate.receiveAndConvert("李四.msg");
        System.out.println(msg4.toString());

        /*
        你們好
        你們好
        你們好
        你們好
         */
    }

    @Test
    public void test5(){
        // 經過 topic 交換器給全部「姓張」的隊列發送消息
        rabbitTemplate.convertAndSend("my.topic", "zhang.hello", "張先生 你好");
    }

    @Test
    public void test6(){
        // 全部「姓張」的隊列接收消息
        Object msg1 = rabbitTemplate.receiveAndConvert("張三.msg");
        Object msg2 = rabbitTemplate.receiveAndConvert("張四.msg");

        System.out.println(msg1);
        System.out.println(msg2);

        /*
        張先生 你好
        張先生 你好
         */
    }
}
test

在上述的操做中操做的都是字符串,而經過 RabbitTemplate 是能夠直接操做對象的,RabbitTemplate 內部的 Converter 會自動幫咱們完成對象的序列化與反序列化:

package com.springboot;

import com.springboot.bean.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.ParseException;
import java.text.SimpleDateFormat;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1() throws ParseException {
        // 直接發送一個對象
        User user = new User(1, "張三", new SimpleDateFormat("yyyy-MM-dd").parse("1998-6-5"), "深圳");
        rabbitTemplate.convertAndSend("my.direct","zhangsan.msg",user);
     
    }

    @Test
    public void test2(){
        Object o = rabbitTemplate.receiveAndConvert("張三.msg");
        System.out.println(o.getClass());
        System.out.println(o);
        
        /*
        class com.springboot.bean.User
        User{id=1, name='張三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}
         */
        
        // 根據輸出結果能夠看到,獲取的消息自動完成了反序列化轉換爲 java 對象
    }
}
test

查看 RabbitMQ 服務器中存儲的對象,會發現存儲的值爲 RabbitMQ 以默認消息轉換器 org.springframework.amqp.support.converter.SimpleMessageConverter 序列化後的值,若是咱們須要存儲的消息爲 Json 格式,只須要本身註冊一個 Json 格式消息轉換器到容器便可,而 Spring 已經給咱們提供了這個轉換器:

package com.springboot.config;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class MyAmqpConfig {
    @Bean
    public MessageConverter messageConverter(){
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        return jackson2JsonMessageConverter;
    }
}
com.springboot.config.MyAmqpConfig

此時再次執行上述操做,查看服務器中存儲消息:

消息以轉換爲 Json 格式。

監聽隊列-@RabbitListener

Spring 也爲咱們提供了監聽隊列支持的註解 @RabbitListener,它可以幫咱們很簡便的建立一個監聽服務,只須要標註在一個存放在 IoC 容器中實例的方法上。看以下示例:

一、建立一個服務類,註冊到 IoC 容器,使用 @RabbitListener 註解標註在方法上:

package com.springboot.service;

import com.springboot.bean.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    @RabbitListener(queues = {"張三.msg"})  // 監聽指定隊列消息
    public void receiveUserMsg(User user) {
        // 接收自動反序列化後的對象
        System.out.println(user);
    }

    @RabbitListener(queues = {"李四.msg"})
    public void receiveMessage(Message message){
        // 接收源消息信息
        
        // 得到消息體
        System.out.println(message.getBody());
        // 得到消息屬性信息
        System.out.println(message.getMessageProperties());
    }
}
com.springboot.service.UserService

二、啓動程序,運行單元測試中發送 User 對象方法,監聽程序輸出以下:

User{id=1, name='張三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}

AmqpAdmin組件

Spring 自動註冊了一個 AmqpAdmin 組件,它的做用相似於數據庫中的 DDL 語句,能夠用來幫咱們定義(建立)交換器、隊列。以下:

package com.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpAdminTests {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void testDeclareExchange(){
        // 建立一個交換器
        Exchange exchange = new DirectExchange("my.directNew");
        amqpAdmin.declareExchange(exchange);

                  


    }

    @Test
    public void testDeclareQueue(){
        // 建立 Queue
        Queue queue = new Queue("myQueue");
        amqpAdmin.declareQueue(queue);

                  


    }

    @Test
    public void testBinding(){
        // 建立一個 binding ,綁定交換器與隊列
        amqpAdmin.declareBinding(new Binding("myQueue", Binding.DestinationType.QUEUE,"my.directNew","myQueue",null));

                  


    }
}
test
相關文章
相關標籤/搜索