springboot rabbitmq 死信隊列應用場景和完整demo

何爲死信隊列?

死信隊列實際上就是,當咱們的業務隊列處理失敗(好比拋異常而且達到了retry的上限),就會將消息從新投遞到另外一個Exchange(Dead Letter Exchanges),該Exchange再根據routingKey重定向另外一個隊列,在這個隊列從新處理該消息。html

 

來自一個隊列的消息能夠被當作‘死信’,即被從新發布到另一個「exchange」去,這樣的狀況有:
  • 消息被拒絕 (basic.reject or basic.nack) 且帶 requeue=false不從新入隊參數或達到的retry從新入隊的上限次數
  • 消息的TTL(Time To Live)-存活時間已通過期
  • 隊列長度限制被超越(隊列滿,queue的"x-max-length"參數
 
Dead letter exchanges (DLXs) are normal exchanges.
 
For any given queue, a DLX can be defined by clients using the queue's arguments, or in the server using policies.
 
通過上面的認知,可知應用場景:重要的業務隊列若是失敗,就須要從新將消息用另外一種業務邏輯處理;若是是正常的業務邏輯故意讓消息中不合法的值失敗,就不須要死信;具體場景具體分析

配置文件

設置重試次數、間隔和投遞到死信隊列java

spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=nut
spring.rabbitmq.password=nut

# 容許消息消費失敗的重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 消息最多消費次數3次
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 消息屢次消費的間隔1秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#  設置爲false,會丟棄消息或者從新發布到死信隊列
spring.rabbitmq.listener.simple.default-requeue-rejected=false


server.port=5678

 

初始化和綁定重定向隊列配置類

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信隊列的配置
 */
@Configuration
public class RabbitDeadLetterConfig {

    public static final String DEAD_LETTER_EXCHANGE = "TDL_EXCHANGE";
    public static final String DEAD_LETTER_TEST_ROUTING_KEY = "TDL_KEY";
    public static final String DEAD_LETTER_REDIRECT_ROUTING_KEY = "TKEY_R";
    public static final String DEAD_LETTER_QUEUE = "TDL_QUEUE";
    public static final String REDIRECT_QUEUE = "TREDIRECT_QUEUE";

    /**
     * 死信隊列跟交換機類型沒有關係 不必定爲directExchange  不影響該類型交換機的特性.
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();
    }

    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    聲明  死信隊列Exchange
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key    聲明 死信隊列拋出異常重定向隊列的routingKey(TKEY_R)
        args.put("x-dead-letter-routing-key", DEAD_LETTER_REDIRECT_ROUTING_KEY);
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).withArguments(args).build();
    }

    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable(REDIRECT_QUEUE).build();
    }

    /**
     * 死信隊列綁定到死信交換器上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding(DEAD_LETTER_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_TEST_ROUTING_KEY, null);

    }

    /**
     * 將重定向隊列經過routingKey(TKEY_R)綁定到死信隊列的Exchange上
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding(REDIRECT_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_REDIRECT_ROUTING_KEY, null);
    }
}

 

生產者向業務隊列發送消息

這裏爲了方便測試沒有往業務隊列發送消息,直接往死信Exchange裏投遞消息。spring

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.RabbitDeadLetterConfig;

@Slf4j
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(int number) {
        log.warn("DeadLetterSender : {}", number);
        // 這裏的Exchange能夠是業務的Exchange,爲了方便測試這裏直接往死信Exchange裏投遞消息
        rabbitTemplate.convertAndSend(
                RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE,
                RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY,
                number);
    }
}

 

死信隊列消費者

這裏會拋異常app

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.RabbitDeadLetterConfig;

@Slf4j
@Component
@RabbitListener(queues = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE)
public class DeadLetterConsumer {

    /*@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE, durable = "true"),
            exchange = @Exchange(value = RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE, type = ExchangeTypes.DIRECT),
            key = RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY)
    )*/
    @RabbitHandler
    public void testDeadLetterQueueAndThrowsException(@Payload Integer number){
        log.warn("DeadLetterConsumer :{}/0 ", number);
        int i = number / 0;
    }
}

 

重定向隊列

隊列"死信"後,會將消息投遞到Dead Letter Exchanges,而後該Exchange會將消息投遞到重定向隊列spring-boot

此時,在重定向隊列中,作對應的業務操做。測試

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.RabbitDeadLetterConfig;

@RabbitListener(queues = RabbitDeadLetterConfig.REDIRECT_QUEUE)
@Component
@Slf4j
public class RedirectQueueConsumer {

    /**
     * 重定向隊列和死信隊列形參一致Integer number
     * @param number
     */
    @RabbitHandler
    public void fromDeadLetter(Integer number){
        log.warn("RedirectQueueConsumer : {}", number);
        // 對應的操做
        int i = number / 1;
    }
}

 

測試

先啓動項目ui

 

而後利用測試類發送一條信息spa

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import xxx.DeadLetterSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {


    @Autowired
    private DeadLetterSender deadLetterSender;


    @Test
    public void testSendDeadLetterQueue(){
        deadLetterSender.send(15);
    }

}

 

再看RabbitmqApplication控制檯日誌.net

 

重試3次後,消息再也不入隊,投遞到DL Exchange,路由到重定向隊列。3d

 

SpringBoot RabbitMQ 延遲隊列代碼實現 

 

參考:

http://www.cnblogs.com/wei-feng/p/6599419.html

https://my.oschina.net/10000000000/blog/1626278

相關文章
相關標籤/搜索