rabbitMQ結合springboot使用

rabbitMQ結合springboot使用

導入依賴

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/>
    </parent>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

簡單使用

消息生產

建立發送配置java

/**
 * @Author: bdsbdg
 * @Date: 2021/1/30 15:15
 */
@Configuration
public class ApplicationConfig {

    // 建立隊列
    @Bean
    public Queue confirmQueue(){
        return new Queue("confirm-queue", true);
    }

    // 建立交換機
    @Bean
    public Exchange confirmExchange(){
        return new DirectExchange("confirm-exchange", true, false);
    }

    // 隊列綁定到交換機
    @Bean
    public Binding queueBindToExchangeByConfirm(Queue confirmQueue, Exchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm-routing-key").noargs();
    }
}

發送消息spring

/**
 * @Author: bdsbdg
 * @Date: 2021/1/30 15:40
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class demo {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirm() {
        // 將消息發送到指定交換機的指定路由
        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key", "測試消息發送。。。");
    }
}

消息接收

建立監聽器監聽指定隊列的消息緩存

@Component
public class ConsumerListener {
    @RabbitListener(queues = "confirm-queue")
    public void myListener1(String data) throws Exception {
        System.out.println("消費者接收到的消息data爲:" + data);
    }
}

消息生產的可靠性投遞

在使用RabbitMQ的時候,做爲消息的發送方但願杜絕任何消息丟失或者投遞失敗的場景。若是消息投遞失敗,RabbitMQ爲咱們提供了兩種模式用來控制消息的可靠投遞。springboot

  • confirm模式:ide

    • 首先須要開啓confirm模式
    • 消息從producer到達exchange後,會執行一個confirmCallback回調函數
    • 該回調函數的方法中有個ack參數
      • ack = true,則發送成功
      • ack = false,則發送失敗
  • return模式:函數

    • 首先須要開啓return模式
    • 消息從exchange路由到queue後
      • 若是投遞成功,不會執行一個returnCallback回調函數
      • 若是投遞失敗,則會執行一個returnCallback回調函數

消息生產者可靠性投遞實現

修改配置文件spring-boot

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /day2
	publisher-confirms: true	# 開啓confirm模式
    publisher-returns: true		# 開啓return模式

RabbitTemplate設置回調post

/**
 * @Author: bdsbdg
 * @Date: 2021/1/30 15:40
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class demo {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirm() {
        // confirm
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    System.out.println("發送消息成功");
                }else {
                    System.out.println("發送消息失敗!!!"+cause);
                }
            }
        });
        
        // return
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//                System.out.println("消息路由失敗會執行該方法。。。");
//                System.out.println("發送的消息體:" + new String(message.getBody()));
                System.out.println("響應碼:" + replyCode);
//                System.out.println("響應信息:" + replyText);
            }
        });
        
        
        // 將消息發送到指定交換機的指定路由	
        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key", "測試消息發送。。。");

        rabbitTemplate.convertAndSend("confirm-exchange-1", "confirm-routing-key", "測試消息發送confirm模式失敗。。。");

        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key-1", "測試消息發送return模式失敗。。。");
    }
}

消費端ack機制

若是在處理消息的過程當中,消費者的服務在處理消息的時候出現異常,那麼可能這條正在處理的消息就沒有完成消息消費,數據就會丟失。爲了確保數據不會丟失,RabbitMQ支持確認機制ACK (Acknowledge)。測試

消費端接收到消息後有三種ack方式:fetch

  • 不確認:ack = "none"
  • 手動確認:ack = "manual"
  • 自動確認:ack = "auto"

自動確認是指,消息一旦被consumer接收到則自動確認收到,並將相應的message從RabbitMQ的消息緩存中移除。可是在實際的業務處理中,極可能是消息被接收到了,可是業務處理出現了異常,那麼消息從緩存中移除即該消息就被丟棄了。若是設置了手動確認,則須要在業務處理成功後,調用channel.basicAck()方法手動簽收,若是出現了異常,則調用channel.basicNack()方法,讓其自動重發消息。

ack功能實現

修改消費端配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 消息確認方式
    listener:
      simple:
        acknowledge-mode: manual  # 手動確認

修改消費端消息處理代碼

@Component
public class ConsumerListener {
    @RabbitListener(queues = "confirm-queue")
    public void myListener1(String data, Message message, Channel channel) throws Exception {
//        System.out.println("消費者接收到的消息data爲:" + data);
//        Thread.sleep(5000);
        byte[] body = message.getBody();
        System.out.println("消費者接收到的消息body爲:" + new String(body));

        // 消息id
        long id = message.getMessageProperties().getDeliveryTag();
        System.out.println("id:"+id);

        try {
            if (id%2==0){
                int a = 1/0;
            }
            System.out.println("處理業務");
            // 業務處理成功:手動簽收
            channel.basicAck(id,true);
            System.out.println("處理業務成功!!!");
        }catch (Exception e){
            System.out.println("處理業務失敗!!!");
            // 業務處理失敗:拒收,而且讓消息重回隊列
            channel.basicNack(id,true,true);
        }
    }
}

消費端限流

未設置前

設置後

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 確認方式
    listener:
      simple:
        acknowledge-mode: manual
        # 每次最多處理消息的個數
        prefetch: 5

TTL與DLX

TTL

  • TTL:Time To Live(存活時間/過時時間)
  • 當消息到達存活時間後,該消息尚未被消費,會自動被清除
  • RabbitMQ能夠對消息設置過時時間也能夠對整個隊列設置過時時間
    • 若是都設置了,哪一個時間先到則生效
      在隊列上設置過時時間
// 修改隊列建立代碼便可
// 建立隊列
    @Bean
    public Queue confirmTtlQueue(){
        // 建立隊列 設置裏面內容十秒過時
        return QueueBuilder.durable("confirm-ttl-queue").withArgument("x-message-ttl",10000) .build();
    }
    // 當該隊列中的消息進入十秒後將自動銷燬

在消息上設置過時時間

@Test
public void testTTL(){
    // 能夠設置消息的屬性消息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 設置消息的過時時間 5s
            message.getMessageProperties().setExpiration("5000");
            return message;
        }
    };
    rabbitTemplate.convertAndSend("confirm-ttl-exchange","confirm-routing-key", "ttl消息", messagePostProcessor);
}

DLX

一、生成者將消息發送到交換機後,由交換機路由到指定的隊列

二、當該消息成爲了死信後而且將該消息發送給DLX。

成爲死信的三種狀況

  • 隊列消息長度達到限制
  • 消費者拒籤消息
  • 原隊列中存在消息過時設置,消息到達超時時間未被消費

三、DLX再將這個消息路由給專門處理死信的隊列,而且由對應的消費者消費

建立死信隊列

// 建立死信交換機
    @Bean
    public Exchange dlxExhange(){
        return new DirectExchange("dlx-exchange");
    }

    // 建立死信隊列
    @Bean
    public Queue dlxQueue(){
        return new Queue("dlx-queue");
    }

    // 將死信隊列綁定到死信交換機上
    @Bean
    public Binding dlxBinding(Queue dlxQueue, Exchange dlxExhange){
        return BindingBuilder.bind(dlxQueue).to(dlxExhange).with("dlx-routing-key").noargs();
    }

建立普通消息隊列時綁定死信隊列

// 建立普通隊列
    @Bean
    public Queue delayQueue(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 20000);           // 隊列過時時間
        args.put("x-max-length", 10000000);         // 隊列中消息數量
        args.put("x-dead-letter-exchange", "dlx-exchange");       // 綁定死信交換機
        args.put("x-dead-letter-routing-key", "dlx-routing-key");    // 綁定死信路由器
        return QueueBuilder.durable("delay-queue").withArguments(args).build();
    }

發送消息

消息過時

實現延時隊列

  • 建立指定過時時間且無消費端處理消息的普通隊列
  • 監聽該普通隊列過時消息存放的死信隊列
相關文章
相關標籤/搜索