Spring Cloud Stream異常處理

應用處理

當消費者在處理接收到的消息時,有可能會因爲某些緣由而拋出異常。若但願對拋出來的異常進行處理的話,就須要採起一些異常處理手段,異常處理的方式可分爲三種:應用層面的處理、系統層面的處理以及經過RetryTemplate進行處理。java

本小節先來介紹較爲經常使用的應用層面的異常處理方式,該方式又細分爲局部處理和全局處理。node

局部處理spring

Stream相關的配置內容以下:ide

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        input:
          destination: stream-test-topic
          group: binder-group

所謂局部處理就是針對指定的channel進行處理,須要定義一個處理異常的方法,並在該方法上添加@ServiceActivator註解,該註解有一個inputChannel屬性,用於指定對哪一個channel進行處理,格式爲{destination}.{group}.errors。具體代碼以下:code

package com.zj.node.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

/**
 * 消費者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive1(String messageBody) {
        log.info("消費消息,messageBody = {}", messageBody);
        throw new IllegalArgumentException("參數錯誤");
    }

    /**
     * 處理局部異常的方法
     *
     * @param errorMessage 異常消息對象
     */
    @ServiceActivator(
        // 經過特定的格式指定處理哪一個channel的異常
        inputChannel = "stream-test-topic.binder-group.errors"
    )
    public void handleError(ErrorMessage errorMessage) {
        // 獲取異常對象
        Throwable errorMessagePayload = errorMessage.getPayload();
        log.error("發生異常", errorMessagePayload);

        // 獲取消息體
        Message<?> originalMessage = errorMessage.getOriginalMessage();
        if (originalMessage != null) {
            log.error("消息體: {}", originalMessage.getPayload());
        } else {
            log.error("消息體爲空");
        }
    }
}

全局處理server

全局處理則是能夠處理全部channel拋出來的異常,全部的channel拋出異常後會生成一個ErrorMessage對象,即錯誤消息。錯誤消息會被放到一個專門的channel裏,這個channel就是errorChannel。因此經過監聽errorChannel就能夠實現全局異常的處理。具體代碼以下:中間件

@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
    log.info("消費消息,messageBody = {}", messageBody);
    throw new IllegalArgumentException("參數錯誤");
}

/**
 * 處理全局異常的方法
 *
 * @param errorMessage 異常消息對象
 */
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
    log.error("發生異常. errorMessage = {}", errorMessage);
}

系統處理

系統處理方式,因消息中間件的不一樣而異。若是應用層面沒有配置錯誤處理,那麼error將會被傳播給binder,而binder則會將error回傳給消息中間件。消息中間件能夠選擇:對象

  • 丟棄消息:錯誤消息將被丟棄。雖然在某些狀況下能夠接受,但這種方式通常不適用於生產
  • requeue(從新排隊,從而從新處理)
  • 將失敗的消息發送給DLQ(死信隊列)

DLQ隊列

目前RabbitMQ對DLQ的支持比較好,這裏以RabbitMQ爲例,只須要添加DLQ相關的配置:ip

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: stream-test-topic
          group: binder-group
      rabbit:
        bindings:
          input:
            consumer:
              # 自動將失敗的消息發送給DLQ
              auto-bind-dlq: true

消息消費失敗後,就會放入死信隊列。在控制檯操做一下,便可將死信放回消息隊列,這樣,客戶端就能夠從新處理。

若是想獲取原始錯誤的異常堆棧,可添加以下配置:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              republish-to-dlq: true

requeue

Rabbit及Kafka的binder依賴RetryTemplate實現消息重試,從而提高消息處理的成功率。然而,若是設置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那麼RetryTemplate則不會再重試。此時能夠經過requeue方式來處理異常。

須要添加以下配置:

# 默認是3,設爲1則禁用重試
spring.cloud.stream.bindings.<input channel名稱>.consumer.max-attempts=1
# 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

這樣,失敗的消息將會被從新提交到同一個handler進行處理,直到handler拋出 AmqpRejectAndDontRequeueException 異常爲止。


RetryTemplate

RetryTemplate主要用於實現消息重試,也是錯誤處理的一種手段。有兩種配置方式,一種是經過配置文件進行配置,以下示例:

spring:
  cloud:
    stream:
      bindings:
        <input channel名稱>:
          consumer:
            # 最多嘗試處理幾回,默認3
            maxAttempts: 3
            # 重試時初始避退間隔,單位毫秒,默認1000
            backOffInitialInterval: 1000
            # 重試時最大避退間隔,單位毫秒,默認10000
            backOffMaxInterval: 10000
            # 避退乘數,默認2.0
            backOffMultiplier: 2.0
            # 當listen拋出retryableExceptions未列出的異常時,是否要重試
            defaultRetryable: true
            # 異常是否容許重試的map映射
            retryableExceptions:
              java.lang.RuntimeException: true
              java.lang.IllegalStateException: false

另外一種則是經過代碼配置,在多數場景下,使用配置文件定製重試行爲都是能夠知足需求的,但配置文件裏支持的配置項可能沒法知足一些複雜需求。此時可以使用代碼方式配置RetryTemplate,以下示例:

@Configuration
class RetryConfiguration {
    @StreamRetryTemplate
    public RetryTemplate sinkConsumerRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());

        return retryTemplate;
    }

    private ExceptionClassifierRetryPolicy retryPolicy() {
        BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
                Collections.singletonList(IllegalAccessException.class
                ));
        keepRetryingClassifier.setTraverseCauses(true);

        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
        AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

        ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
        retryPolicy.setExceptionClassifier(
                classifiable -> keepRetryingClassifier.classify(classifiable) ?
                        alwaysRetryPolicy : simpleRetryPolicy);

        return retryPolicy;
    }

    private FixedBackOffPolicy backOffPolicy() {
        final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2);

        return backOffPolicy;
    }
}

最後還須要添加一段配置:

spring.cloud.stream.bindings.<input channel名稱>.consumer.retry-template-name=myRetryTemplate

注:Spring Cloud Stream 2.2才支持設置retry-template-name

相關文章
相關標籤/搜索