死信隊列實際上就是,當咱們的業務隊列處理失敗(好比拋異常而且達到了retry的上限),就會將消息從新投遞到另外一個Exchange(Dead Letter Exchanges),該Exchange再根據routingKey重定向到另外一個隊列,在這個隊列從新處理該消息。html
設置重試次數、間隔和投遞到死信隊列java
spring.application.name=spring-boot-rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=nut spring.rabbitmq.password=nut # 容許消息消費失敗的重試 spring.rabbitmq.listener.simple.retry.enabled=true # 消息最多消費次數3次 spring.rabbitmq.listener.simple.retry.max-attempts=3 # 消息屢次消費的間隔1秒 spring.rabbitmq.listener.simple.retry.initial-interval=1000 # 設置爲false,會丟棄消息或者從新發布到死信隊列 spring.rabbitmq.listener.simple.default-requeue-rejected=false server.port=5678
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * 死信隊列的配置 */ @Configuration public class RabbitDeadLetterConfig { public static final String DEAD_LETTER_EXCHANGE = "TDL_EXCHANGE"; public static final String DEAD_LETTER_TEST_ROUTING_KEY = "TDL_KEY"; public static final String DEAD_LETTER_REDIRECT_ROUTING_KEY = "TKEY_R"; public static final String DEAD_LETTER_QUEUE = "TDL_QUEUE"; public static final String REDIRECT_QUEUE = "TREDIRECT_QUEUE"; /** * 死信隊列跟交換機類型沒有關係 不必定爲directExchange 不影響該類型交換機的特性. */ @Bean("deadLetterExchange") public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build(); } @Bean("deadLetterQueue") public Queue deadLetterQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 聲明 死信隊列Exchange args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 聲明 死信隊列拋出異常重定向隊列的routingKey(TKEY_R) args.put("x-dead-letter-routing-key", DEAD_LETTER_REDIRECT_ROUTING_KEY); return QueueBuilder.durable(DEAD_LETTER_QUEUE).withArguments(args).build(); } @Bean("redirectQueue") public Queue redirectQueue() { return QueueBuilder.durable(REDIRECT_QUEUE).build(); } /** * 死信隊列綁定到死信交換器上. * * @return the binding */ @Bean public Binding deadLetterBinding() { return new Binding(DEAD_LETTER_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_TEST_ROUTING_KEY, null); } /** * 將重定向隊列經過routingKey(TKEY_R)綁定到死信隊列的Exchange上 * * @return the binding */ @Bean public Binding redirectBinding() { return new Binding(REDIRECT_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_REDIRECT_ROUTING_KEY, null); } }
這裏爲了方便測試沒有往業務隊列發送消息,直接往死信Exchange裏投遞消息。spring
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.RabbitDeadLetterConfig; @Slf4j @Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(int number) { log.warn("DeadLetterSender : {}", number); // 這裏的Exchange能夠是業務的Exchange,爲了方便測試這裏直接往死信Exchange裏投遞消息 rabbitTemplate.convertAndSend( RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE, RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY, number); } }
這裏會拋異常app
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.RabbitDeadLetterConfig; @Slf4j @Component @RabbitListener(queues = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE) public class DeadLetterConsumer { /*@RabbitListener(bindings = @QueueBinding( value = @Queue(value = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE, type = ExchangeTypes.DIRECT), key = RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY) )*/ @RabbitHandler public void testDeadLetterQueueAndThrowsException(@Payload Integer number){ log.warn("DeadLetterConsumer :{}/0 ", number); int i = number / 0; } }
隊列"死信"後,會將消息投遞到Dead Letter Exchanges,而後該Exchange會將消息投遞到重定向隊列。spring-boot
此時,在重定向隊列中,作對應的業務操做。測試
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.RabbitDeadLetterConfig; @RabbitListener(queues = RabbitDeadLetterConfig.REDIRECT_QUEUE) @Component @Slf4j public class RedirectQueueConsumer { /** * 重定向隊列和死信隊列形參一致Integer number * @param number */ @RabbitHandler public void fromDeadLetter(Integer number){ log.warn("RedirectQueueConsumer : {}", number); // 對應的操做 int i = number / 1; } }
先啓動項目ui
而後利用測試類發送一條信息spa
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import xxx.DeadLetterSender; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private DeadLetterSender deadLetterSender; @Test public void testSendDeadLetterQueue(){ deadLetterSender.send(15); } }
再看RabbitmqApplication控制檯日誌.net
重試3次後,消息再也不入隊,投遞到DL Exchange,路由到重定向隊列。3d
參考: