Spring Cloud Circuit Breaker

支持實現
<!--SpringCloud依賴-->
org.springframework.cloud:spring-cloud-starter-netflix-hystrix
<!-- SpringCloud依賴(Resilience4j的方式) -->
 org.springframework.cloud:spring-cloud-starter-circuitbreaker-resilience4j

<!-- SpringCloud依賴(Reactive Resilience4J的方式) --->
org.springframework.cloud:spring-cloud-starter-circuitbreaker-reator-resilience4j

官方介紹:java

Resilience4j是一款輕量級,易於使用的容錯庫,其靈感來自於Netflix Hystrix,專爲Java8和函數式編程而設計。輕量級,由於庫只使用了Vavr,它沒有任何其餘外部依賴下。相比之下,Netflix HystrixArchaius具備編譯依賴性,Archaius具備更多的外部庫依賴性,例如GuavaApache Commons Configurationreact

多種支持:git

提供對Spring Boot2.XSpring Cloud以及 RXJAVA2的支持等github

環境要求:spring

JDK1.8+編程

  • 提供的組件(版本均爲1.1.0服務器

    • Circuit Breaker多線程

      <!-- 須要添加的依賴 -->
      <dependency>
      	<groupId>io.github.resilience4j</groupId>
      	<artifactId>resilience4j-circuitbreaker</artifactId>
      	<version>${resilience4j.version}</version>
      </dependency>

      斷路器三種狀態併發

      • CLOSED - 一切正常,不涉及短路
      • OPEN - 遠程服務器已關閉,全部請求都被短路
      • HALF_OPEN - 從進入開放狀態到如今已經通過了一段時間,斷路器容許請求檢查遠程服務是否從新上線

      配置項:app

配置屬性 默認值 描述
failureRateThreshold 50(%) 當故障率等於或大於閾值時,CircuitBreaker轉換爲打開狀態並開始短路呼叫。
slowCallRateThreshold 100 配置百分比閾值。當通話時長大於10分鐘時,CircuitBreaker會認爲通話緩慢slowCallDurationThreshold當慢速呼叫的百分比等於或大於閾值時,CircuitBreaker轉換爲打開並開始短路呼叫。
slowCallDurationThreshold 60(s) 配置持續時間閾值,在該閾值以上,呼叫將被視爲慢速並增長慢速呼叫的速率。
permittedNumberOfCallsInHalfOpenState 10 配置當CircuitBreaker半開時容許的呼叫數。
slideWindowType COUNT_BASED 配置滑動窗口的類型,該窗口用於在CircuitBreaker關閉時記錄呼叫結果。 滑動窗口能夠基於計數或基於時間。若是滑動窗口爲COUNT_BASEDslidingWindowSize則記錄並彙總最近的調用。 若是滑動窗口是TIME_BASED,則slidingWindowSize記錄並彙總最近幾秒的調用。
slideWindowSize 100 配置滑動窗口的大小,該窗口用於記錄CircuitBreaker關閉時的呼叫結果。
minimumNumberOfCalls 10 配置CircuitBreaker能夠計算錯誤率以前所需的最小呼叫數(每一個滑動窗口時段)。例如,若是minimumNumberOfCalls10,則在計算失敗率以前,必須至少記錄10個呼叫。若是僅記錄了9個呼叫,則即便全部9個呼叫均失敗,CircuitBreaker也不會轉換爲打開。
waitDurationInOpenState 60(s) 從打開狀態轉爲半開狀態等待的時間
recordExceptions empty 須要記錄的異常列表
ignoreExceptions empty 須要忽略的異常列表
recordException throwable-> true默認狀況下,全部異常都記錄爲失敗。 用於評估是否應將異常記錄爲失敗。若是異常應計爲失敗,則必須返回true。若是異常應被視爲成功,則必須返回false,除非該異常被顯式忽略ignoreExceptions
ignoreException throwable->false默認狀況下,不會忽略任何異常。 用於評估是否應忽略異常,而且該異常既不算做失敗也不算成功。若是應忽略異常,則必須返回true。不然必須返回false
automaticTransitionFromOpenToHalfOpenEnabled false 若是置爲true,當等待時間結束會自動由打開變爲半開,若置爲false,則須要一個請求進入來觸發熔斷器狀態轉換
CircuitBreakerConfig cfg = CircuitBreakerConfig
                .custom()
              .failureRateThreshold(50) // 錯誤率,這個是根據滑動窗口大小決定的,e.g. windowSize = 2,failureRate=50% 那麼,當出現一個錯誤的時候即爲失敗
                .waitDurationInOpenState(Duration.ofMillis(1000))
              .permittedNumberOfCallsInHalfOpenState(2)
                .slidingWindowSize(2) // 滑動窗口大小
              .recordExceptions(RuntimeException.class) // 當出現列表中的異常類型時記錄
                .build();
  • RateLimiter

    • 默認實現 - io.github.resilience4j.ratelimiter.internal.AtomicRateLimiter
      • State -io.github.resilience4j.ratelimiter.internal.AtomicRateLimiter.State
        • activeCycle - 上一次調用使用的週期號
        • activePermissions- 上次調用後的可用權限計數。 若是保留某些權限,則能夠爲負
        • nanosToWait- 等待上一次呼叫的等待許可的納秒數
<!-- 須要添加的依賴 -->
  <dependency>
      <groupId>io.github.resilience4j</groupId>
      <artifactId>resilience4j-ratelimiter</artifactId>
      <version>${resilience4j.version}</version>
  </dependency>
配置項
配置屬性 默認值 描述
timeoutDuration 5 [s] 默認等待權限持續時間
limitRefreshPeriod 500 [ns] 限制刷新的時間段。在每一個時間段以後,速率限制器將其權限計數從新設置爲limitForPeriod
limitForPeriod 50 限制刷新期間段可用的權限數
  • Bulkhead
<!-- 須要添加的依賴 -->
<dependency>
     <groupId>io.github.resilience4j</groupId>
     <artifactId>resilience4j-bulkhead</artifactId>
     <version>${resilience4j.version}</version>
</dependency>

實現方式

  • SemaphoreBulkhead - 使用信號量

應該應用於多線程環境或者I/O密集型的場景下,基於semaphore,但與histrix不一樣,他不提供'shadow'線程池選項, 客戶端應確保正確的線程池大小與bulkhead 配置保持一致。

配置項:
配置屬性 默認值 描述
maxConcurrentCalls 25 容許的最大並行執行量
maxWaitDuration 0 嘗試進入飽和的bulkhead時,應阻塞線程的最長時間。
BulkheadConfig config = BulkheadConfig.custom()
                .maxConcurrentCalls(5)
                .maxWaitDuration(Duration.ofMillis(1))
                .build();
        BulkheadRegistry registry = BulkheadRegistry.of(config);
        Bulkhead bulkhead = registry.bulkhead("Bulkhead");
        for (int i = 0; i<10;i++){
            Supplier<String> decoratedSupplier = Bulkhead
                    .decorateSupplier(bulkhead, CircuitBreakerService::say);
            String result = Try.ofSupplier(decoratedSupplier)
                    .recover(throwable -> "bulkhead").get();
            System.out.println(result);
        }
  • FixedThreadPoolBulkhead - 使用有界隊列和固定大小的線程池

配置項

配置屬性 默認值 描述
maxThreadPoolSize Runtime.getRuntime() .availableProcessors() 最大線程池大小
coreThreadPoolSize Runtime.getRuntime() .availableProcessors() - 1 核心線程池大小
queueCapacity 100 隊列的容量
keepAliveDuration 20(ms) 當線程總數大於核心線程數時,空閒線程存活的最大時間
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
                .maxThreadPoolSize(2)
                .coreThreadPoolSize(1)
                .queueCapacity(1)
                .build();
        ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
        ThreadPoolBulkhead bulkhead = registry.bulkhead("ThreadPoolBulkhead");
        ThreadPoolBulkhead bulkhead2 = registry.bulkhead("ThreadPoolBulkhead2");
        ThreadPoolBulkhead bulkhead3 = registry.bulkhead("ThreadPoolBulkhead3");
        Callable<CompletionStage<String>> call =
                ThreadPoolBulkhead.decorateCallable(bulkhead, () -> CircuitBreakerService.say());

        Callable<CompletionStage<String>> call2 =
                ThreadPoolBulkhead.decorateCallable(bulkhead2, () -> CircuitBreakerService.say());

        Callable<CompletionStage<String>> call3 =
                ThreadPoolBulkhead.decorateCallable(bulkhead3, () -> CircuitBreakerService.say());


        ExecutorService es = Executors.newFixedThreadPool(2);
        es.submit(call);
        es.submit(call2);
        es.submit(call3);
  • Retry
<!-- 須要添加的依賴 -->
<dependency>
     <groupId>io.github.resilience4j</groupId>
     <artifactId>resilience4j-retry</artifactId>
     <version>${resilience4j.version}</version>
</dependency>

配置項 | 配置屬性 | 默認值 | 描述 | | --------------------------- | ------------------------------- | ------------------------------------------------------------ | | maxAttempts | 3 | 最大重試次數 | | waitDuration | 500 [ms] | 重試嘗試過程等待時間 | | intervalFunction | numOfAttempts -> waitDuration | 發生故障後修改等待間隔的功能。默認狀況下,等待時間保持不變。 | | retryOnResultPredicate | result -> false | 是否須要重試,若是須要則必須置位true, 不然應置位爲false | | retryOnExceptionPredicate | throwable -> true | 發生故障後是否須要重試,若是須要則必須置位true, 不然應置位爲false | | retryExceptions | empty | 須要重試的異常類型列表 | | ignoreExceptions | empty | 忽略重試的異常類型列表 |

RetryConfig config = RetryConfig.custom()
                .maxAttempts(2) // 最大重試次數
                .waitDuration(Duration.ofMillis(100)) // 等待時間
                .retryOnException(e -> e instanceof WebServiceException)
                .retryExceptions(IOException.class, TimeoutException.class,RuntimeException.class)
//                .ignoreExceptions(TimeoutException.class)
                .build();
        RetryRegistry registry = RetryRegistry.of(config);
        Retry retry = registry.retry("Retry");
 Function<Void , String> decorated
                = Retry.decorateFunction(retry, (s) -> CircuitBreakerService.say());
        decorated.apply(null);
  • Cache
<!-- 須要添加的依賴 -->
<dependency>
     <groupId>io.github.resilience4j</groupId>
     <artifactId>resilience4j-cache</artifactId>
     <version>${resilience4j.version}</version>
</dependency>

注意:

不建議在生產中使用參考JCache 的實現,由於它會致使一些併發問題。使用EhcacheCaffeineRedissonHazelcastIgnite或其餘實現。

Spring Cloud 配置 Resilience4J

1.1. 啓動器

  • org.springframework.cloud:spring-cloud-starter-circuitbreaker-resilience4j -非反應式應用
  • org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j -反應式應用

1.2. 禁用自動裝配

spring.cloud.circuitbreaker.resilience4j.enabled=false

1.3. 默認配置

  • Resilience4JCircuitBreakerFactory
@Bean
public Customizer<Resilience4JCircuitBreakerFactory> defaultCustomizer() {
    return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
            .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(4)).build())
            .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
            .build());
}
  • ReactiveResilience4JCircuitBreakerFactory
@Bean
public Customizer<Resilience4JCircuitBreakerFactory> defaultCustomizer() {
    return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
            .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(4)).build())
            .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
            .build());
}

1.4. 特殊熔斷配置

@Bean
public Customizer<Resilience4JCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.configure(builder -> builder.circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
            .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(2)).build()), "slow");
}

@Bean
public Customizer<Resilience4JCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.addCircuitBreakerCustomizer(circuitBreaker -> circuitBreaker.getEventPublisher()
    .onError(normalFluxErrorConsumer).onSuccess(normalFluxSuccessConsumer), "normalflux");
}

// Reactive方式
@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> slowCusomtizer() {
    return factory -> {
        factory.configure(builder -> builder
        .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(2)).build())
        .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()), "slow", "slowflux");
        factory.addCircuitBreakerCustomizer(circuitBreaker -> circuitBreaker.getEventPublisher()
            .onError(normalFluxErrorConsumer).onSuccess(normalFluxSuccessConsumer), "normalflux");
     };
}

1.5. 收集指標

加入依賴: org.springframework.boot:spring-boot-starter-actuator io.github.resilience4j:resilience4j-micrometer.

注意: 不須要額外的加入 micrometer-core ,由於已經存在於 spring-boot-starter-actuator

<!-- SpringCloud依賴 -->
org.springframework.cloud:spring-cloud-starter-circuitbreaker-sentinal

該項目爲Spring應用程序提供了聲明式重試支持。

環境:

JDK1.7+ , MAVEN 3.0.5 +

<!-- SpringCloud依賴 -->
org.springframework.cloud:spring-cloud-starter-circuitbreaker-spring-retry

Spring Retry經過使用CircuitBreakerRetryPolicy有狀態重試. 來提供斷路器實現 。

1.1. 默認配置

@Bean
public Customizer<SpringRetryCircuitBreakerFactory> defaultCustomizer() {
    return factory -> factory.configureDefault(id -> new SpringRetryConfigBuilder(id)
        .retryPolicy(new TimeoutRetryPolicy()).build());
}

1.2. 特殊配置

@Bean
public Customizer<SpringRetryCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.configure(builder -> builder.retryPolicy(new SimpleRetryPolicy(1)).build(), "slow");
}
@Bean
public Customizer<SpringRetryCircuitBreakerFactory> slowCustomizer() {
    return factory -> factory.addRetryTemplateCustomizers(retryTemplate -> retryTemplate.registerListener(new RetryListener() {

        @Override
        public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
            return false;
        }
        @Override
        public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {

        }
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {

        }
    }));
}
相關文章
相關標籤/搜索