上一篇《Spring Cloud Stream消費失敗後的處理策略(一):自動重試》介紹了默認就會生效的消息重試功能。對於一些因環境緣由、網絡抖動等不穩定因素引起的問題能夠起到比較好的做用。可是對於諸如代碼自己存在的邏輯錯誤等,不管重試多少次都不可能成功的問題,是沒法修復的。對於這樣的狀況,前文中說了能夠利用日誌記錄消息內容,配合告警來作補救,可是很顯然,這樣作很是原始,而且太過笨拙,處理複雜度太高。因此,咱們須要需求更好的辦法,本文將介紹針對該類問題的一種處理方法:自定義錯誤處理邏輯。java
準備一個會消費失敗的例子,能夠直接沿用前文的工程,也能夠新建一個,而後建立以下代碼的邏輯:git
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
/** * 消息生產接口 * * @param message * @return */
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
/** * 消息消費邏輯 */
@Slf4j
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}
複製代碼
內容很簡單,既包含了消息的生產,也包含了消息消費。消息消費的時候主動拋出了一個異常來模擬消息的消費失敗。github
在啓動應用以前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名)、並設置一下分組,好比:spring
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
複製代碼
完成了上面配置以後,啓動應用並訪問localhost:8080/sendMessage?message=hello
接口來發送一個消息到MQ中了,此時能夠看到消費失敗後拋出了異常,跟上一篇文章的結果同樣,消息消費失敗,記錄了日誌,消息信息丟棄。bash
下面,針對消息消費失敗,在TestListener
中針對消息消費邏輯建立一段錯誤處理邏輯,好比:網絡
@Slf4j
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
/** * 消息消費失敗的降級處理邏輯 * * @param message */
@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
public void error(Message<?> message) {
log.info("Message consumer failed, call fallback!");
}
}
複製代碼
經過使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
指定了某個通道的錯誤處理映射。其中,inputChannel的配置中對應關係以下:app
test-topic
:消息通道對應的目標(destination,即:spring.cloud.stream.bindings.example-topic-input.destination
的配置)stream-exception-handler
:消息通道對應的消費組(group,即:spring.cloud.stream.bindings.example-topic-input.group
的配置)再啓動應用並訪問localhost:8080/sendMessage?message=hello
接口來發送一個消息到MQ中,此時能夠看到日誌以下:函數
2018-12-11 12:00:35.500 INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512 INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Received: hello,
2018-12-11 12:00:38.541 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Message consumer failed, call fallback!
複製代碼
雖然消費邏輯中輸出了消息內容以後拋出了異常,可是會進入到error函數中,執行錯誤處理邏輯(這裏只是答應了一句話),用戶能夠根據須要讀取消息內容以及異常詳情作更進一步的細化處理。ui
因爲error邏輯是經過編碼方式來實現的,因此這段邏輯相對來講比較死。一般,只有業務上有明確的錯誤處理邏輯的時候,這種方法才能夠比較好的被應用到。否則能作的可能也只是將消息記錄下來,而後具體的分析緣由後再去作補救措施。因此這種方法也不是萬能的,主要適用於有明確錯誤處理方案的方式來使用(這種場景並很少),另外。。。編碼
注意:有坑! 這個方案在目前版本(2.0.x)其實還有一個坑,這種方式並不能很好的處理異常消息,會有部分消息得不到正確的處理,因爲應用場景也很少,因此目前不推薦使用這種方法來作(徹底能夠用原始的異常捕獲機制來處理,只是沒有這種方式那麼優雅)。目前看官方issue是在Spring Cloud Stream的2.1.0版本中會修復,後續發佈以後可使用該功能,具體點擊查看:Issue #1357。
而對於沒有特定的錯誤處理方案的,也只能經過記錄和後續處理來解決,可能這樣的方式也只是比從日誌中抓去簡單那麼一些,並無獲得很大的提高。可是,沒關係,由於下一篇咱們將繼續介紹其餘更好的處理方案。
本文示例讀者能夠經過查看下面倉庫的中的stream-exception-handler-2
項目:
若是您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!