Spring Cloud Stream消費失敗後的處理策略(四):從新入隊(RabbitMQ)

應用場景

以前咱們已經經過《Spring Cloud Stream消費失敗後的處理策略(一):自動重試》一文介紹了Spring Cloud Stream默認的消息重試功能。本文將介紹RabbitMQ的binder提供的另一種重試功能:從新入隊。java

動手試試

準備一個會消費失敗的例子,能夠直接沿用前文的工程,也能夠新建一個,而後建立以下代碼的邏輯:git

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /** * 消息生產接口 * * @param message * @return */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /** * 消息消費邏輯 */
    @Slf4j
    @Component
    static class TestListener {

        private int count = 1;

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload + ", " + count);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}
複製代碼

內容很簡單,既包含了消息的生產,也包含了消息消費。消息消費的時候主動拋出了一個異常來模擬消息的消費失敗。github

在啓動應用以前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名)、並設置一下分組,好比:spring

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true

spring.cloud.stream.bindings.example-topic-output.destination=test-topic
複製代碼

完成了上面配置以後,啓動應用並訪問localhost:8080/sendMessage?message=hello接口來發送一個消息到MQ中了,此時能夠看到程序不斷的拋出了消息消費異常。這是因爲這裏咱們多加了一個配置:spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true。在該配置做用之下,消息消費失敗以後,並不會將該消息拋棄,而是將消息從新放入隊列,因此消息的消費邏輯會被重複執行,直到這條消息消費成功爲止。app

深刻思考

在完成了上面的這個例子以後,可能讀者會有下面兩個常見問題:ui

問題一:以前介紹的Spring Cloud Stream默認提供的默認功能(spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts)與本文所說的重入隊列實現的重試有什麼區別?spa

Spring Cloud Stream默認提供的默認功能只是對處理邏輯的重試,它們的處理邏輯是由同一條消息觸發的。而本文所介紹的從新入隊史經過從新將消息放入隊列而觸發的,因此其實是收到了屢次消息而實現的重試。code

問題二:如上面的例子那樣,消費一直不成功,這些不成功的消息會被不斷堆積起來,如何解決這個問題?blog

對於這個問題,咱們能夠聯合前文介紹的DLQ隊列來完善消息的異常處理。教程

咱們只須要增長以下配置,自動綁定dlq隊列:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
複製代碼

而後改造一下消息處理程序,能夠根據業務狀況,爲進入dlq隊列增長一個條件,好比下面的例子:

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
    log.info("Received payload : " + payload + ", " + count);
    if (count == 3) {
        count = 1;
        throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
    } else {
        count ++;
        throw new RuntimeException("Message consumer failed!");
    }
}
複製代碼

設定了計數器count,當count爲3的時候拋出AmqpRejectAndDontRequeueException這個特定的異常。此時,當只有當拋出這個異常的時候,纔會將消息放入DLQ隊列,從而不會形成嚴重的堆積問題。

代碼示例

本文示例讀者能夠經過查看下面倉庫的中的stream-exception-handler-4項目:

若是您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!

如下專題教程也許您會有興趣

本文首發:blog.didispace.com/spring-clou…

相關文章
相關標籤/搜索