Spring Cloud Stream消費失敗後的處理策略(三):使用DLQ隊列(RabbitMQ)

應用場景

前兩天咱們已經介紹了兩種Spring Cloud Stream對消息失敗的處理策略:java

  • 自動重試:對於一些因環境緣由(如:網絡抖動等不穩定因素)引起的問題能夠起到比較好的做用,提升消息處理的成功率。
  • 自定義錯誤處理邏輯:若是業務上,消息處理失敗以後有明確的降級邏輯能夠彌補的,能夠採用這種方式,可是2.0.x版本有Bug,2.1.x版本修復。

那麼若是代碼自己存在邏輯錯誤,不管重試多少次都不可能成功,也沒有具體的降級業務邏輯,以前在深刻思考中討論過,能夠經過日誌,或者降級邏輯記錄的方式把錯誤消息保存下來,而後過後分析、修復Bug再從新處理。可是很顯然,這樣作很是原始,而且太過笨拙,處理複雜度太高。因此,本文將介紹利用中間件特性來便捷地處理該問題的方案:使用RabbitMQ的DLQ隊列。git

動手試試

準備一個會消費失敗的例子,能夠直接沿用前文的工程。也能夠新建一個,而後建立以下代碼的邏輯:github

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生產接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /**
     * 消息消費邏輯
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

內容很簡單,既包含了消息的生產,也包含了消息消費。消息消費的時候主動拋出了一個異常來模擬消息的消費失敗。spring

在啓動應用以前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名)、並設置一下分組,好比:網絡

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

這裏加入了一個重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true,用來開啓DLQ(死信隊列)。完成了上面配置以後,啓動應用並訪問localhost:8080/sendMessage?message=hello接口來發送一個消息到MQ中了,此時能夠看到消費失敗後拋出了異常,消息消費失敗,記錄了日誌。此時,能夠查看RabbitMQ的控制檯以下:
pasted-129.pngapp

其中,test-topic.stream-exception-handler.dlq隊列就是test-topic.stream-exception-handler的dlq(死信)隊列,當test-topic.stream-exception-handler隊列中的消息消費時候以後,就會將這條消息原封不動的轉存到dlq隊列中。這樣這些沒有獲得妥善處理的消息就經過簡單的配置實現了存儲,以後,咱們還能夠經過簡單的操做對這些消息進行從新消費。咱們只須要在控制檯中點擊test-topic.stream-exception-handler.dlq隊列的名字進入到詳情頁面以後,使用Move messages功能,直接將這些消息移動回test-topic.stream-exception-handler隊列,這樣這些消息就能從新被消費一次。ui

pasted-130.png

若是Move messages功能中是以下內容:spa

To move messages, the shovel plugin must be enabled, try:

$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

那是因爲沒有安裝對應的插件,只須要根據提示的命令安裝就能使用該命令了。插件

深刻思考

先來總結一下在引入了RabbitMQ的DLQ以後,對於消息異常處理更爲完整一些的基本思路:日誌

  1. 瞬時的環境抖動引發的異常,利用重試功能提升處理成功率
  2. 若是重試依然失敗的,日誌報錯,並進入DLQ隊列
  3. 日誌告警通知相關開發人員,分析問題緣由
  4. 解決問題(修復程序Bug、擴容等措施)以後,DLQ隊列中的消息移回從新處理

在這樣的總體思路中,可能還涉及一些微調,這裏舉幾個常見例子,幫助讀者進一步瞭解一些特殊的場景和配置使用!

場景一:有些消息在業務上存在時效性,進入死信隊列以後,過一段時間再處理已經沒有意義,這個時候如何過濾這些消息呢?

只須要配置一個參數便可:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000

該參數能夠控制DLQ隊列中消息的存活時間,當超過配置時間以後,該消息會自動的從DLQ隊列中移除。

場景二:可能進入DLQ隊列的消息存在各類不一樣的緣由(不一樣異常形成的),此時若是在作補救措施的時候,還但願根據這些異常作不一樣的處理時候,咱們如何區分這些消息進入DLQ的緣由呢?

再來看看這個參數:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true

該參數默認是false,若是設置了死信隊列的時候,會將消息原封不動的發送到死信隊列(也就是上面例子中的實現),此時你們能夠在RabbitMQ控制檯中經過Get message(s)功能來看看隊列中的消息,應該以下圖所示:

pasted-131.png

這是一條原始消息。

若是咱們該配置設置爲true的時候,那麼該消息在進入到死信隊列的時候,會在headers中加入錯誤信息,以下圖所示:
pasted-132.png

這樣,不論咱們是經過移回原通道處理仍是新增訂閱處理這些消息的時候就能夠以此做爲依據進行分類型處理了。

關於RabbitMQ的binder中還有不少關於DLQ的配置,這裏不一一介紹了,上面幾個是目前筆者使用過的幾個,其餘一些暫時認爲採用默認配置已經夠用,除非還有其餘定製要求,或者是存量內容,須要去適配纔會去配置。讀者能夠查看官方文檔瞭解更多詳情!

代碼示例

本文示例讀者能夠經過查看下面倉庫的中的stream-exception-handler-3項目:

若是您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!

如下專題教程也許您會有興趣

本文首發:http://blog.didispace.com/spr...

相關文章
相關標籤/搜索