前兩天咱們已經介紹了兩種Spring Cloud Stream對消息失敗的處理策略:java
那麼若是代碼自己存在邏輯錯誤,不管重試多少次都不可能成功,也沒有具體的降級業務邏輯,以前在深刻思考中討論過,能夠經過日誌,或者降級邏輯記錄的方式把錯誤消息保存下來,而後過後分析、修復Bug再從新處理。可是很顯然,這樣作很是原始,而且太過笨拙,處理複雜度太高。因此,本文將介紹利用中間件特性來便捷地處理該問題的方案:使用RabbitMQ的DLQ隊列。git
準備一個會消費失敗的例子,能夠直接沿用前文的工程。也能夠新建一個,而後建立以下代碼的邏輯:github
@EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生產接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).build()); return "ok"; } } /** * 消息消費邏輯 */ @Slf4j @Component static class TestListener { @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received payload : " + payload); throw new RuntimeException("Message consumer failed!"); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); } }
內容很簡單,既包含了消息的生產,也包含了消息消費。消息消費的時候主動拋出了一個異常來模擬消息的消費失敗。spring
在啓動應用以前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名)、並設置一下分組,好比:網絡
spring.cloud.stream.bindings.example-topic-input.destination=test-topic spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1 spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true spring.cloud.stream.bindings.example-topic-output.destination=test-topic
這裏加入了一個重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
,用來開啓DLQ(死信隊列)。完成了上面配置以後,啓動應用並訪問localhost:8080/sendMessage?message=hello
接口來發送一個消息到MQ中了,此時能夠看到消費失敗後拋出了異常,消息消費失敗,記錄了日誌。此時,能夠查看RabbitMQ的控制檯以下:
app
其中,test-topic.stream-exception-handler.dlq
隊列就是test-topic.stream-exception-handler
的dlq(死信)隊列,當test-topic.stream-exception-handler
隊列中的消息消費時候以後,就會將這條消息原封不動的轉存到dlq隊列中。這樣這些沒有獲得妥善處理的消息就經過簡單的配置實現了存儲,以後,咱們還能夠經過簡單的操做對這些消息進行從新消費。咱們只須要在控制檯中點擊test-topic.stream-exception-handler.dlq
隊列的名字進入到詳情頁面以後,使用Move messages
功能,直接將這些消息移動回test-topic.stream-exception-handler
隊列,這樣這些消息就能從新被消費一次。ui
若是Move messages功能中是以下內容:spa
To move messages, the shovel plugin must be enabled, try: $ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
那是因爲沒有安裝對應的插件,只須要根據提示的命令安裝就能使用該命令了。插件
先來總結一下在引入了RabbitMQ的DLQ以後,對於消息異常處理更爲完整一些的基本思路:日誌
在這樣的總體思路中,可能還涉及一些微調,這裏舉幾個常見例子,幫助讀者進一步瞭解一些特殊的場景和配置使用!
場景一:有些消息在業務上存在時效性,進入死信隊列以後,過一段時間再處理已經沒有意義,這個時候如何過濾這些消息呢?
只須要配置一個參數便可:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000
該參數能夠控制DLQ隊列中消息的存活時間,當超過配置時間以後,該消息會自動的從DLQ隊列中移除。
場景二:可能進入DLQ隊列的消息存在各類不一樣的緣由(不一樣異常形成的),此時若是在作補救措施的時候,還但願根據這些異常作不一樣的處理時候,咱們如何區分這些消息進入DLQ的緣由呢?
再來看看這個參數:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true
該參數默認是false,若是設置了死信隊列的時候,會將消息原封不動的發送到死信隊列(也就是上面例子中的實現),此時你們能夠在RabbitMQ控制檯中經過Get message(s)
功能來看看隊列中的消息,應該以下圖所示:
這是一條原始消息。
若是咱們該配置設置爲true的時候,那麼該消息在進入到死信隊列的時候,會在headers中加入錯誤信息,以下圖所示:
這樣,不論咱們是經過移回原通道處理仍是新增訂閱處理這些消息的時候就能夠以此做爲依據進行分類型處理了。
關於RabbitMQ的binder中還有不少關於DLQ的配置,這裏不一一介紹了,上面幾個是目前筆者使用過的幾個,其餘一些暫時認爲採用默認配置已經夠用,除非還有其餘定製要求,或者是存量內容,須要去適配纔會去配置。讀者能夠查看官方文檔瞭解更多詳情!
本文示例讀者能夠經過查看下面倉庫的中的stream-exception-handler-3
項目:
若是您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!