RabbitMQ(二)--RabbitMQ整合SpringBoot

新建項目

新建項目
選擇依賴

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.tedu</groupId>
    <artifactId>rabbitmq-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-springboot</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

spring:
  rabbitmq:
    host: 192.168.64.140
    username: admin
    password: admin

主程序

刪除自動建立的主程序java

咱們爲每種模式建立一個包,在每一個包中建立各自的主程序,單獨測試.spring

簡單模式

主程序

Spring提供的Queue類,是隊列的封裝對象,它封裝了隊列的參數信息.apache

RabbitMQ的自動配置類,會發現這些Queue實例,並在RabbitMQ服務器中定義這些隊列.api

package cn.tedu.rabbitmqspringboot.m1;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    @Autowired
    private Producer producer;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    @Bean
    public Queue helloworldQueue(){
        /**
         * 可用如下形式:
         * new Queue("helloworld") - 默認屬性:持久(true),非排他(false),非自動刪除(false)
         * new Queue("helloworld",false,false,false,null)
         */
        return new Queue("helloworld",false);//返回一個非持久隊列
    }

    /**
     *@PostConstruct 方法會被自動執行,spring掃描建立了全部對象,並完成全部注入操做後會執行
     */
    @PostConstruct
    public void test(){
        producer.send();
        System.out.println("消息已經發送");
    }
}

生產者

AmqpTemplate是rabbitmq客戶端API的一個封裝工具,提供了簡便的方法來執行消息操做.springboot

AmqpTemplate由自動配置類自動建立服務器

package cn.tedu.rabbitmqspringboot.m1;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    // 在RabbtiAutoConfiguration 自動配置類中建立的工具對象
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){
        amqpTemplate.convertAndSend("helloworld", "Hello world!");
    }
}

消費者

經過@RabbitListener從指定的隊列接收消息
使用@RebbitHandler註解的方法來處理消息app

package cn.tedu.rabbitmqspringboot.m1;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
//@RabbitListener(queues = "helloworld")//這樣使用須要用@RabbitHandler配合使用
public class Consumer {
    //@RabbitHandler//這樣只能接收一個隊列的消息,簡單模式
 @RabbitListener(queues = "helloworld")//這種用法能夠寫多個方法接收多個隊列,工廠模式
 public void receive(String msg){
        System.out.println("收到: " + msg);
 }
}

工做模式

主程序

在主程序中建立名爲task_queue持久隊列dom

package cn.tedu.rabbitmqspringboot.m2;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

/**
 * 合理分發
 *      1.手動ack - springboot整合後默認就是手動ack模式
 *                  消費者方法執行成功後,springboot會幫助發送回執
 *      2.qos=1 - yml中配置prefetch
 * 持久化
 *      1.隊列持久化
 *      2.消息持久化 - 默認是持久消息
 */
@SpringBootApplication
public class Main {
    @Autowired
    private Producer producer;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    @Bean
    public Queue taskQueue(){
        /**
         * 可用如下形式:
         * new Queue("helloworld") - 默認屬性:持久(true),非排他(false),非自動刪除(false)
         * new Queue("helloworld",false,false,false,null)
         */
        return new Queue("task_queue",true);//返回一個持久隊列
    }

    /**
     *@PostConstruct 方法會被自動執行,spring掃描建立了全部對象,並完成全部注入操做後會執行
     */
    @PostConstruct
    public void test(){
        producer.send();
    }
}

生產者

package cn.tedu.rabbitmqspringboot.m2;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {
    // 在RabbtiAutoConfiguration 自動配置類中建立的工具對象
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){
//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//
//            }
//        }).start();

        // lambda.匿名內部類的簡寫
        new Thread(() ->{
            while (true){//用單獨的線程執行,不要影響主線程
                System.out.println("輸入消息: ");
                String msg = new Scanner(System.in).nextLine();
                amqpTemplate.convertAndSend("task_queue",msg);
            }
        }).start();


    }
}

spring boot封裝的 rabbitmq api 中, 發送的消息默認是持久化消息.
若是但願發送非持久化消息, 須要在發送消息時作如下設置:異步

  • 使用 MessagePostProcessor 前置處理器參數
  • 從消息中獲取消息的屬性對象
  • 在屬性中把 DeliveryMode 設置爲非持久化
//若是須要設置消息爲非持久化,能夠取得消息的屬性對象,修改它的deliveryMode屬性
    t.convertAndSend("task_queue", (Object) s, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            MessageProperties props = message.getMessageProperties();
            props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    });

消費者

package cn.tedu.rabbitmqspringboot.m2;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @RabbitListener(queues = "task_queue")
    public void receive1(String msg){
        System.out.println("消費者1-收到: " + msg);
    }

    @RabbitListener(queues = "task_queue")
    public void receive2(String msg){
        System.out.println("消費者2-收到: " + msg);
    }
}

ack模式

在 spring boot 中提供了三種確認模式:maven

  • NONE - 使用rabbitmq的自動確認
  • AUTO - 使用rabbitmq的手動確認, springboot會自動發送確認回執 (默認)
  • MANUAL - 使用rabbitmq的手動確認, 且必須手動執行確認操做

默認的 AUTO 模式中, 處理消息的方法拋出異常, 則表示消息沒有被正確處理, 該消息會被從新發送.

設置 ack 模式

spring:
  rabbitmq:
    listener:
      simple:
        # acknowledgeMode: NONE # rabbitmq的自動確認
        acknowledgeMode: AUTO # rabbitmq的手動確認, springboot會自動發送確認回執 (默認)
        # acknowledgeMode: MANUAL # rabbitmq的手動確認, springboot不發送回執, 必須本身編碼

手動執行確認操做

若是設置爲 MANUAL 模式,必須手動執行確認操做

@RabbitListener(queues="task_queue")
    public void receive1(String s, Channel c, @Header(name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("receiver1 - 收到: "+s);
        // 手動發送確認回執
        c.basicAck(tag, false);
    }

抓取數量

工做模式中, 爲了合理地分發數據, 須要將 qos 設置成 1, 每次只接收一條消息, 處理完成後才接收下一條消息.

spring boot 中是經過 prefetch 屬性進行設置, 改屬性的默認值是 250.

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # qos=1, 默認250

發佈和訂閱模式

主程序

建立 FanoutExcnahge 實例, 封裝 fanout 類型交換機定義信息.

spring boot 的自動配置類會自動發現交換機實例, 並在 RabbitMQ 服務器中定義該交換機.

package cn.tedu.rabbitmqspringboot.m3;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;


@SpringBootApplication
public class Main {
    @Autowired
    private Producer producer;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    @Bean
    public FanoutExchange logsExchange(){
        return new FanoutExchange("logs",false,false);//非持久,不自動刪除
    }

    @PostConstruct
    public void test(){
        producer.send();
    }

}

生產者

生產者向指定的交換機 logs 發送數據.

不須要指定隊列名或路由鍵, 即便指定也無效, 由於 fanout 交換機會向全部綁定的隊列發送數據, 而不是有選擇的發送.

package cn.tedu.rabbitmqspringboot.m3;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){

        new Thread(() ->{
            while (true){
                System.out.println("輸入消息: ");
                String msg = new Scanner(System.in).nextLine();
                amqpTemplate.convertAndSend("logs","",msg);//向交換機發送消息
            }
        }).start();
    }
}

消費者

消費者須要執行如下操做:

  1. 定義隨機隊列(隨機命名,非持久,排他,自動刪除)
  2. 定義交換機(能夠省略, 已在主程序中定義)
  3. 將隊列綁定到交換機

spring boot 經過註解完成以上操做:

@RabbitListener(bindings = @QueueBinding( //這裏進行綁定設置
    value = @Queue, //這裏定義隨機隊列,默認屬性: 隨機命名,非持久,排他,自動刪除
    exchange = @Exchange(name = "logs", declare = "false") //指定 logs 交換機,由於主程序中已經定義,這裏不進行定義
))
package cn.tedu.rabbitmqspringboot.m3;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    //1.建立隨機隊列 2,指定交換機logs 3.綁定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列
            exchange = @Exchange(name = "logs",declare = "false")//交換機,declare表示不定義交換機,只是使用
    ))
    public void receive1(String msg){
        System.out.println("消費者1-收到: " + msg);
    }

    //1.建立隨機隊列 2,指定交換機logs 3.綁定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列
            exchange = @Exchange(name = "logs",declare = "false")//交換機,declare(false)表示不定義交換機,只是使用
    ))
    public void receive2(String msg){
        System.out.println("消費者2-收到: " + msg);
    }
}

路由模式

與發佈和訂閱模式代碼相似, 只是作如下三點調整:

  1. 使用 direct 交換機
  2. 隊列和交換機綁定時, 設置綁定鍵
  3. 發送消息時, 指定路由鍵

主程序

主程序中使用 DirectExcnahge 對象封裝交換機信息, spring boot 自動配置類會自動發現這個對象, 並在 RabbitMQ 服務器上定義這個交換機.

package cn.tedu.rabbitmqspringboot.m4;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;


@SpringBootApplication
public class Main {
    @Autowired
    private Producer producer;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    @Bean
    public DirectExchange directExchange(){

        return new DirectExchange("direct_logs",false,false);//非持久,不自動刪除
    }

    @PostConstruct
    public void test(){
        producer.send();
    }
}

生產者

生產者向指定的交換機發送消息, 並指定路由鍵.

package cn.tedu.rabbitmqspringboot.m4;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){

        new Thread(() ->{
            while (true){
                System.out.println("輸入消息: ");
                String msg = new Scanner(System.in).nextLine();
                System.out.println("輸入路由鍵: ");
                String key = new Scanner(System.in).nextLine();
                amqpTemplate.convertAndSend("direct_logs",key,msg);
            }
        }).start();
    }
}

消費者

消費者經過註解來定義隨機隊列, 綁定到交換機, 並指定綁定鍵:

@RabbitListener(bindings = @QueueBinding( // 這裏作綁定設置
    value = @Queue, // 定義隊列, 隨機命名,非持久,排他,自動刪除
    exchange = @Exchange(name = "direct_logs", declare = "false"), // 指定綁定的交換機,主程序中已經定義過隊列,這裏不進行定義
    key = {"error","info","warning"} // 設置綁定鍵
))
package cn.tedu.rabbitmqspringboot.m4;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    //1.建立隨機隊列 2,指定交換機logs 3.綁定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列
            exchange = @Exchange(name = "direct_logs",declare = "false"),//交換機,declare表示不定義交換機,只是使用
            key = {"error"} //設置綁定鍵
    ))
    public void receive1(String msg){
        System.out.println("消費者1-收到: " + msg);
    }

    //1.建立隨機隊列 2,指定交換機logs 3.綁定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列
            exchange = @Exchange(name = "direct_logs",declare = "false"),//交換機,declare(false)表示不定義交換機,只是使用
            key = {"error","info","warning"} //設置綁定鍵
    ))
    public void receive2(String msg){
        System.out.println("消費者2-收到: " + msg);
    }
}

主題模式

主題模式不過是具備特殊規則的路由模式, 代碼與路由模式基本相同, 只作以下調整:

  1. 使用 topic 交換機
  2. 使用特殊的綁定鍵和路由鍵規則

主程序

package cn.tedu.rabbitmqspringboot.m5;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;


@SpringBootApplication
public class Main {
    @Autowired
    private Producer producer;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    @Bean
    public TopicExchange directExchange(){

        return new TopicExchange("topic_logs",false,false);//非持久,不自動刪除
    }

    @PostConstruct
    public void test(){
        producer.send();
    }
}

生產者

package cn.tedu.rabbitmqspringboot.m5;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){

        new Thread(() ->{
            while (true){
                System.out.println("輸入消息: ");
                String msg = new Scanner(System.in).nextLine();
                System.out.println("輸入路由鍵: ");
                String key = new Scanner(System.in).nextLine();
                amqpTemplate.convertAndSend("topic_logs",key,msg);
            }
        }).start();
    }
}

消費者

package cn.tedu.rabbitmqspringboot.m5;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    //1.建立隨機隊列 2,指定交換機logs 3.綁定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列
            exchange = @Exchange(name = "topic_logs",declare = "false"),//交換機,declare表示不定義交換機,只是使用
            key = {"*.orange.*"} //設置綁定鍵
    ))
    public void receive1(String msg){
        System.out.println("消費者1-收到: " + msg);
    }

    //1.建立隨機隊列 2,指定交換機logs 3.綁定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(),//隊列,隨即名,非持久,獨佔,自動刪除隊列
            exchange = @Exchange(name = "topic_logs",declare = "false"),//交換機,declare(false)表示不定義交換機,只是使用
            key = {"*.*.rabbit","lazy.#"} //設置綁定鍵
    ))
    public void receive2(String msg){
        System.out.println("消費者2-收到: " + msg);
    }
}

RPC異步調用

主程序

主程序中定義兩個隊列

  • 發送調用信息的隊列: rpc_queue
  • 返回結果的隊列: 隨機命名
package cn.tedu.m6;

import java.util.UUID;

import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
    @Bean
    public Queue sendQueue() {
        return new Queue("rpc_queue",false);
    }
    @Bean
    public Queue rndQueue() {
        return new Queue(UUID.randomUUID().toString(), false);
    }
}

服務端

rpc_queue接收調用數據, 執行運算求斐波那契數,並返回計算結果.
@Rabbitlistener註解對於具備返回值的方法:

  • 會自動獲取 replyTo 屬性
  • 自動獲取 correlationId 屬性
  • replyTo 屬性指定的隊列發送計算結果, 並攜帶 correlationId 屬性
package cn.tedu.m6;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RpcServer {
    @RabbitListener(queues = "rpc_queue")
    public long getFbnq(int n) {
        return f(n);
    }

    private long f(int n) {
        if (n==1 || n==2) {
            return 1;
        }
        return f(n-1) + f(n-2);
    }
}

客戶端

使用 SPEL 表達式獲取隨機隊列名: "#{rndQueue.name}"

發送調用數據時, 攜帶隨機隊列名和correlationId

從隨機隊列接收調用結果, 並獲取correlationId

package cn.tedu.m6;

import java.util.UUID;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class RpcClient {
    @Autowired
    AmqpTemplate t;
    
    @Value("#{rndQueue.name}")
    String rndQueue;
    
    public void send(int n) {
        // 發送調用信息時, 經過前置消息處理器, 對消息屬性進行設置, 添加返回隊列名和關聯id
        t.convertAndSend("rpc_queue", (Object)n, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties p = message.getMessageProperties();
                p.setReplyTo(rndQueue);
                p.setCorrelationId(UUID.randomUUID().toString());
                return message;
            }
        });
    }
    
    //從隨機隊列接收計算結果
    @RabbitListener(queues = "#{rndQueue.name}")
    public void receive(long r, @Header(name=AmqpHeaders.CORRELATION_ID) String correlationId) {
        System.out.println("nn"+correlationId+" - 收到: "+r);
    }
    
}
相關文章
相關標籤/搜索