【RabbitMQ】一文帶你搞定RabbitMQ死信隊列

本文口味:爆炒魷魚   預計閱讀:15分鐘java

1、說明

RabbitMQ是流行的開源消息隊列系統,使用erlang語言開發,因爲其社區活躍度高,維護更新較快,性能穩定,深得不少企業的歡心(固然,也包括我如今所在公司【手動滑稽】)。git

爲了保證訂單業務的消息數據不丟失,須要使用到RabbitMQ的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中。但因爲對死信隊列的概念及配置不熟悉,致使曾一度陷入百度的汪洋大海,沒法自拔,不少文章都看起來可行,可是實際上卻並不能幫我解決實際問題。最終,在官網文檔中找到了我想要的答案,經過官網文檔的學習,才發現對於死信隊列存在一些誤解,致使配置死信隊列之路困難重重。github

因而本着記錄和分享的精神,將死信隊列的概念和配置完整的寫下來,以便幫助遇到一樣問題的朋友。web

2、本文大綱

如下是本文大綱:spring

AG4T332}NEUNPUAU(A)U6.png

本文閱讀前,須要對RabbitMQ有一個簡單的瞭解,偏向實戰配置講解。數組

3、死信隊列是什麼

死信,在官網中對應的單詞爲「Dead Letter」,能夠看出翻譯確實很是的簡單粗暴。那麼死信是個什麼東西呢?網絡

「死信」是RabbitMQ中的一種消息機制,當你在消費消息時,若是隊列裏的消息出現如下狀況:app

  1. 消息被否認確認,使用 channel.basicNackchannel.basicReject ,而且此時requeue 屬性被設置爲false
  2. 消息在隊列的存活時間超過設置的TTL時間。
  3. 消息隊列的消息數量已經超過最大隊列長度。

那麼該消息將成爲「死信」。運維

「死信」消息會被RabbitMQ進行特殊處理,若是配置了死信隊列信息,那麼該消息將會被丟進死信隊列中,若是沒有配置,則該消息將會被丟棄。spring-boot

4、如何配置死信隊列

這一部分將是本文的關鍵,如何配置死信隊列呢?其實很簡單,大概能夠分爲如下步驟:

  1. 配置業務隊列,綁定到業務交換機上
  2. 爲業務隊列配置死信交換機和路由key
  3. 爲死信交換機配置死信隊列

注意,並非直接聲明一個公共的死信隊列,而後因此死信消息就本身跑到死信隊列裏去了。而是爲每一個須要使用死信的業務隊列配置一個死信交換機,這裏同一個項目的死信交換機能夠共用一個,而後爲每一個業務隊列分配一個單獨的路由key。

有了死信交換機和路由key後,接下來,就像配置業務隊列同樣,配置死信隊列,而後綁定在死信交換機上。也就是說,死信隊列並非什麼特殊的隊列,只不過是綁定在死信交換機上的隊列。死信交換機也不是什麼特殊的交換機,只不過是用來接受死信的交換機,因此能夠爲任何類型【Direct、Fanout、Topic】。通常來講,會爲每一個業務隊列分配一個獨有的路由key,並對應的配置一個死信隊列進行監聽,也就是說,通常會爲每一個重要的業務隊列配置一個死信隊列。

有了前文這些陳述後,接下來就是驚險刺激的實戰環節,這裏省略了RabbitMQ環境的部署和搭建環節。

先建立一個Springboot項目。而後在pom文件中添加 spring-boot-starter-amqpspring-boot-starter-web 的依賴,接下來建立一個Config類,這裏是關鍵:

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

    // 聲明業務Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 聲明業務隊列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這裏聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這裏聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }

    // 聲明業務隊列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這裏聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這裏聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }

    // 聲明死信隊列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // 聲明死信隊列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // 聲明業務隊列A綁定關係
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 聲明業務隊列B綁定關係
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 聲明死信隊列A綁定關係
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // 聲明死信隊列B綁定關係
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

這裏聲明瞭兩個Exchange,一個是業務Exchange,另外一個是死信Exchange,業務Exchange下綁定了兩個業務隊列,業務隊列都配置了同一個死信Exchange,並分別配置了路由key,在死信Exchange下綁定了兩個死信隊列,設置的路由key分別爲業務隊列裏配置的路由key。

下面是配置文件application.yml:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
          default-requeue-rejected: false
          acknowledge-mode: manual

這裏記得將default-requeue-rejected屬性設置爲false。

接下來,是業務隊列的消費代碼:

@Slf4j
@Component
public class BusinessMessageReceiver {

    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到業務消息A:{}", msg);
        boolean ack = true;
        Exception exception = null;
        try {
            if (msg.contains("deadletter")){
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }
        if (!ack){
            log.error("消息消費發生異常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到業務消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

而後配置死信隊列的消費者:

@Component
public class DeadLetterMessageReceiver {


    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

最後,爲了方便測試,寫一個簡單的消息生產者,並經過controller層來生產消息。

@Component
public class BusinessMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private BusinessMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg){
        sender.sendMsg(msg);
    }
}

一切準備就緒,啓動!

能夠從RabbitMQ的管理後臺中看到一共有四個隊列,除默認的Exchange外還有聲明的兩個Exchange。

8(%(3A_Y`_N8XX8W5XHZMWY.png

123.png

接下來,訪問一下url,來測試一下:

http://localhost:8080/rabbitmq/sendmsg?msg=msg

日誌:

收到業務消息A:msg
收到業務消息B:msg

表示兩個Consumer都正常收到了消息。這表明正常消費的消息,ack後正常返回。而後咱們再來測試nck的消息。

http://localhost:8080/rabbitmq/sendmsg?msg=deadletter

這將會觸發業務隊列A的NCK,按照預期,消息被NCK後,會拋到死信隊列中,所以死信隊列將會出現這個消息,日誌以下:

收到業務消息A:deadletter
消息消費發生異常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
...

收到死信消息A:deadletter

能夠看到,死信隊列的Consumer接受到了這個消息,因此流程到此爲止就打通了。

5、死信消息的變化

那麼「死信」被丟到死信隊列中後,會發生什麼變化呢?

若是隊列配置了參數 x-dead-letter-routing-key 的話,「死信」的路由key將會被替換成該參數對應的值。若是沒有設置,則保留該消息原有的路由key。

舉個栗子:

若是原有消息的路由key是testA,被髮送到業務Exchage中,而後被投遞到業務隊列QueueA中,若是該隊列沒有配置參數x-dead-letter-routing-key,則該消息成爲死信後,將保留原有的路由keytestA,若是配置了該參數,而且值設置爲testB,那麼該消息成爲死信後,路由key將會被替換爲testB,而後被拋到死信交換機中。

另外,因爲被拋到了死信交換機,因此消息的Exchange Name也會被替換爲死信交換機的名稱。

消息的Header中,也會添加不少奇奇怪怪的字段,修改一下上面的代碼,在死信隊列的消費者中添加一行日誌輸出:

log.info("死信消息properties:{}", message.getMessageProperties());

而後從新運行一次,便可獲得死信消息Header中被添加的信息:

死信消息properties:MessageProperties [headers={x-first-death-exchange=dead.letter.demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=dead.letter.demo.simple.business.exchange, time=Sun Jul 14 16:48:16 CST 2019, routing-keys=[], queue=dead.letter.demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=dead.letter.demo.simple.business.queuea}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAREVTS1RPUC1DUlZGUzBOAAAPQAAAAAAB.bLbsdR1DnuRSwiKKmtdOGw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.deadletter.exchange, receivedRoutingKey=dead.letter.demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-NSp18SUPoCNvQcoYoS2lPg, consumerQueue=dead.letter.demo.simple.deadletter.queuea]

Header中看起來有不少信息,實際上並很少,只是值比較長而已。下面就簡單說明一下Header中的值:

字段名 含義
x-first-death-exchange 第一次被拋入的死信交換機的名稱
x-first-death-reason 第一次成爲死信的緣由,rejected:消息在從新進入隊列時被隊列拒絕,因爲default-requeue-rejected 參數被設置爲falseexpired :消息過時。maxlen : 隊列內消息數量超過隊列最大容量
x-first-death-queue 第一次成爲死信前所在隊列名稱
x-death 歷次被投入死信交換機的信息列表,同一個消息每次進入一個死信交換機,這個數組的信息就會被更新

6、死信隊列應用場景

經過上面的信息,咱們已經知道如何使用死信隊列了,那麼死信隊列通常在什麼場景下使用呢?

通常用在較爲重要的業務隊列中,確保未被正確消費的消息不被丟棄,通常發生消費異常可能緣由主要有因爲消息信息自己存在錯誤致使處理異常,處理過程當中參數校驗異常,或者因網絡波動致使的查詢異常等等,當發生異常時,固然不能每次經過日誌來獲取原消息,而後讓運維幫忙從新投遞消息(沒錯,之前就是這麼幹的= =)。經過配置死信隊列,可讓未正確處理的消息暫存到另外一個隊列中,待後續排查清楚問題後,編寫相應的處理代碼來處理死信消息,這樣比手工恢復數據要好太多了。

7、總結

死信隊列其實並無什麼神祕的地方,不過是綁定在死信交換機上的普通隊列,而死信交換機也只是一個普通的交換機,不過是用來專門處理死信的交換機。

總結一下死信消息的生命週期:

  1. 業務消息被投入業務隊列
  2. 消費者消費業務隊列的消息,因爲處理過程當中發生異常,因而進行了nck或者reject操做
  3. 被nck或reject的消息由RabbitMQ投遞到死信交換機中
  4. 死信交換機將消息投入相應的死信隊列
  5. 死信隊列的消費者消費死信消息

死信消息是RabbitMQ爲咱們作的一層保證,其實咱們也能夠不使用死信隊列,而是在消息消費異常時,將消息主動投遞到另外一個交換機中,當你明白了這些以後,這些Exchange和Queue想怎樣配合就能怎麼配合。好比從死信隊列拉取消息,而後發送郵件、短信、釘釘通知來通知開發人員關注。或者將消息從新投遞到一個隊列而後設置過時時間,來進行延時消費。

本篇文章中的demo項目已上傳至github,有須要的朋友能夠自行下載查閱​。https://github.com/MFrank2016/dead-letter-demo​

若是本文對你有幫助,記得點個贊,也但願能分享給更多的朋友。也歡迎關注個人公衆號進行留言交流。

TIM圖片20190714173105.png

相關文章
相關標籤/搜索