熔斷器 Hystrix 源碼解析 —— 斷路器 HystrixCircuitBreaker

摘要: 原創出處 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!html

本文主要基於 Hystrix 1.5.X 版本java


1. 概述

本文主要分享 斷路器 HystrixCircuitBreakergit

HystrixCircuitBreaker 有三種狀態 :github

  • CLOSED :關閉
  • OPEN :打開
  • HALF_OPEN :半開

其中,斷路器處於 OPEN 狀態時,鏈路處於非健康狀態,命令執行時,直接調用回退邏輯,跳過正常邏輯。架構

HystrixCircuitBreaker 狀態變遷以下圖 :併發


  • 紅線 :初始時,斷路器處於 CLOSED 狀態,鏈路處於健康狀態。當知足以下條件,斷路器從 CLOSED 變成 OPEN 狀態:app

    • 週期( 可配,HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms )內,總請求數超過必定( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 。
    • 錯誤請求佔總請求數超過必定比例( 可配,HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50% ) 。
  • 綠線 :斷路器處於 OPEN 狀態,命令執行時,若當前時間超過斷路器開啓時間必定時間( HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms ),斷路器變成 HALF_OPEN 狀態,嘗試調用正常邏輯,根據執行是否成功,打開或關閉熔斷器【藍線】。ide


推薦 Spring Cloud 書籍微服務

2. HystrixCircuitBreaker

com.netflix.hystrix.HystrixCircuitBreaker ,Hystrix 斷路器接口。定義接口以下代碼 :ui

public interface HystrixCircuitBreaker {

    /** * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does * not modify any internal state, and takes into account the half-open logic which allows some requests through * after the circuit has been opened * * @return boolean whether a request should be permitted */
    boolean allowRequest();

    /** * Whether the circuit is currently open (tripped). * * @return boolean state of circuit breaker */
    boolean isOpen();

    /** * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. */
    void markSuccess();

    /** * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. */
    void markNonSuccess();

    /** * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal * state. */
    boolean attemptExecution();
}
複製代碼
  • #allowRequest()#attemptExecution() 方法,方法目的基本相似,差異在於當斷路器知足嘗試關閉條件時,前者不會將斷路器不會修改狀態( CLOSE => HALF-OPEN ),然後者會。

HystrixCircuitBreaker 有兩個子類實現 :

  • NoOpCircuitBreaker :的斷路器實現,用於不開啓斷路器功能的狀況。
  • HystrixCircuitBreakerImpl :完整的斷路器實現。

在 AbstractCommand 建立時,初始化 HystrixCircuitBreaker ,代碼以下 :

/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    
    /** * 斷路器 */
    protected final HystrixCircuitBreaker circuitBreaker;

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

        // ... 省略無關代碼
        
        // 初始化 斷路器
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        // ... 省略無關代碼
    }

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // get the default implementation of HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }

}
複製代碼
  • HystrixCommandProperties.circuitBreakerEnabled = true 時,即斷路器功能開啓,使用 Factory 得到 HystrixCircuitBreakerImpl 對象。在 「3. HystrixCircuitBreaker.Factory」 詳細解析。
  • HystrixCommandProperties.circuitBreakerEnabled = false 時,即斷路器功能關閉,建立 NoOpCircuitBreaker 對象。另外,NoOpCircuitBreaker 代碼簡單到腦殘,點擊 連接 查看實現。

3. HystrixCircuitBreaker.Factory

com.netflix.hystrix.HystrixCircuitBreaker.Factory ,HystrixCircuitBreaker 工廠,主要用於:

  • 建立 HystrixCircuitBreaker 對象,目前只建立 HystrixCircuitBreakerImpl 。

  • HystrixCircuitBreaker 容器,基於 HystrixCommandKey 維護了 HystrixCircuitBreaker 單例對象 的映射。代碼以下 :

    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
    複製代碼

總體代碼灰常清晰,點擊 連接 查看代碼。

4. HystrixCircuitBreakerImpl

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl完整的斷路器實現。

咱們來逐個方法看看 HystrixCircuitBreakerImpl 的具體實現。

4.1 構造方法

構造方法,代碼以下 :

/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        CLOSED, OPEN, HALF_OPEN
    }

    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }
}    
複製代碼
  • Status 枚舉類,斷路器的三種狀態。
  • status 屬性,斷路器的狀態。
  • circuitOpened 屬性,斷路器打開,即狀態變成 OPEN 的時間。
  • activeSubscription 屬性,基於 Hystrix Metrics 對請求量統計 Observable 的訂閱,在 「4.2 #subscribeToStream()」 詳細解析。

4.2 #subscribeToStream()

#subscribeToStream() 方法,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。代碼以下 :

private Subscription subscribeToStream() {
  1: private Subscription subscribeToStream() {
  2:     /* 3: * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream 4: */
  5:     return metrics.getHealthCountsStream()
  6:             .observe()
  7:             .subscribe(new Subscriber<HealthCounts>() {
  8:                 @Override
  9:                 public void onCompleted() {
 10: 
 11:                 }
 12: 
 13:                 @Override
 14:                 public void onError(Throwable e) {
 15: 
 16:                 }
 17: 
 18:                 @Override
 19:                 public void onNext(HealthCounts hc) {
 20:                     System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用於調試
 21:                     // check if we are past the statisticalWindowVolumeThreshold
 22:                     if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
 23:                         // we are not past the minimum volume threshold for the stat window,
 24:                         // so no change to circuit status.
 25:                         // if it was CLOSED, it stays CLOSED
 26:                         // if it was half-open, we need to wait for a successful command execution
 27:                         // if it was open, we need to wait for sleep window to elapse
 28:                     } else {
 29:                         if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
 30:                             //we are not past the minimum error threshold for the stat window,
 31:                             // so no change to circuit status.
 32:                             // if it was CLOSED, it stays CLOSED
 33:                             // if it was half-open, we need to wait for a successful command execution
 34:                             // if it was open, we need to wait for sleep window to elapse
 35:                         } else {
 36:                             // our failure rate is too high, we need to set the state to OPEN
 37:                             if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
 38:                                 circuitOpened.set(System.currentTimeMillis());
 39:                             }
 40:                         }
 41:                     }
 42:                 }
 43:             });
 44: }
複製代碼
  • 第 5 至 7 行 :向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。這裏的 Observable 基於 RxJava Window 操做符。

    FROM 《ReactiveX文檔中文翻譯》「Window」
    按期未來自原始 Observable 的數據分解爲一個 Observable 窗口,發射這些窗口,而不是每次發射一項數據

    • 簡單來講,固定間隔,#onNext() 方法將不斷被調用,每次計算斷路器的狀態。
  • 第 22 行 :判斷週期( 可配,HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms )內,總請求數超過必定( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 。

    • 這裏要注意下,請求次數統計的是週期內,超過週期的不計算在內。例如說,00:00 內發起了 N 個請求,00:11 不計算這 N 個請求。
  • 第 29 行 :錯誤請求佔總請求數超過必定比例( 可配,HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50% ) 。

  • 第 37 至 39 行 :知足斷路器打開條件,CAS 修改狀態( CLOSED => OPEN ),並設置打開時間( circuitOpened ) 。

  • 補充】第 5 至 7 行 :😈 怕寫在上面,你們有壓力。Hystrix Metrics 對請求量統計 Observable 使用了兩種 RxJava Window 操做符 :

    • Observable#window(timespan, unit) 方法,固定週期( 可配,HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds = 500 ms ),發射 Observable 窗口。點擊 BucketedCounterStream 構造方法 查看調用處的代碼。
    • Observable#window(count, skip) 方法,每發射一次(skip) Observable 忽略 count ( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20 ) 個數據項。爲何?答案在第 22 行的代碼,週期內達到必定請求量是斷路器打開的一個條件。點擊 BucketedRollingCounterStream 構造方法 查看調用處的代碼。

目前該方法有兩處調用 :

  • 「4.1 構造方法」,在建立 HystrixCircuitBreakerImpl 時,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。固定間隔,計算斷路器是否要關閉( CLOSE )。
  • 「4.4 #markSuccess()」,清空 Hystrix Metrics 對請求量統計 Observable 的統計信息,取消原有訂閱,併發起新的訂閱。

4.3 #attemptExecution()

以下是 AbstractCommand#applyHystrixSemantics(_cmd) 方法,對 HystrixCircuitBreakerImpl#attemptExecution 方法的調用的代碼 :

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

    // ... 省略無關代碼

   /* determine if we're allowed to execute */
   if (circuitBreaker.attemptExecution()) {
        // 執行【正常邏輯】
   } else {
        // 執行【回退邏輯】
   }
}
複製代碼
  • 使用 HystrixCircuitBreakerImpl#attemptExecution 方法,判斷是否能夠執行正常邏輯

#attemptExecution 方法,代碼以下 :

1: @Override
  2: public boolean attemptExecution() {
  3:     // 強制 打開
  4:     if (properties.circuitBreakerForceOpen().get()) {
  5:         return false;
  6:     }
  7:     // 強制 關閉
  8:     if (properties.circuitBreakerForceClosed().get()) {
  9:         return true;
 10:     }
 11:     // 打開時間爲空
 12:     if (circuitOpened.get() == -1) {
 13:         return true;
 14:     } else {
 15:         // 知足間隔嘗試斷路器時間
 16:         if (isAfterSleepWindow()) {
 17:             //only the first request after sleep window should execute
 18:             //if the executing command succeeds, the status will transition to CLOSED
 19:             //if the executing command fails, the status will transition to OPEN
 20:             //if the executing command gets unsubscribed, the status will transition to OPEN
 21:             if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
 22:                 return true;
 23:             } else {
 24:                 return false;
 25:             }
 26:         } else {
 27:             return false;
 28:         }
 29:     }
 30: }
複製代碼
  • 第 4 至 6 行 :當 HystrixCommandProperties.circuitBreakerForceOpen = true ( 默認值 :false) 時,即斷路器強制打開,返回 false 。當該配置接入配置中心後,能夠動態實現打開熔斷。爲何會有該配置?當 HystrixCircuitBreaker 建立完成後,沒法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,經過該配置以實現相似效果。
  • 第 8 至 10 行 :當 HystrixCommandProperties.circuitBreakerForceClose = true ( 默認值 :false) 時,即斷路器強制關閉,返回 true 。當該配置接入配置中心後,能夠動態實現關閉熔斷。爲何會有該配置?當 HystrixCircuitBreaker 建立完成後,沒法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,經過該配置以實現相似效果。
  • 第 12 至 13 行 :斷路器打開時間( circuitOpened ) 爲"空",返回 true
  • 第 16 至 28 行 :調用 #isAfterSleepWindow() 方法,判斷是否知足嘗試調用正常邏輯的間隔時間。當知足,使用 CAS 方式修改斷路器狀態( OPEN => HALF_OPEN ),從而保證有且僅有一個線程可以嘗試調用正常邏輯。

#isAfterSleepWindow() 方法,代碼以下 :

private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}
複製代碼
  • 當前時間超過斷路器打開時間 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds ( 默認值,5000 ms ),返回 true

4.4 #markSuccess()

當嘗試調用正常邏輯成功時,調用 #markSuccess() 方法,關閉斷路器。代碼以下 :

1: @Override
  2: public void markSuccess() {
  3:     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
  4:         // 清空 Hystrix Metrics 對請求量統計 Observable 的**統計信息**
  5:         //This thread wins the race to close the circuit - it resets the stream to start it over from 0
  6:         metrics.resetStream();
  7:         // 取消原有訂閱
  8:         Subscription previousSubscription = activeSubscription.get();
  9:         if (previousSubscription != null) {
 10:             previousSubscription.unsubscribe();
 11:         }
 12:         // 發起新的訂閱
 13:         Subscription newSubscription = subscribeToStream();
 14:         activeSubscription.set(newSubscription);
 15:         // 設置斷路器打開時間爲空
 16:         circuitOpened.set(-1L);
 17:     }
 18: }
複製代碼
  • 第 3 行 :使用 CAS 方式,修改斷路器狀態( HALF_OPEN => CLOSED )。
  • 第 6 行 :清空 Hystrix Metrics 對請求量統計 Observable 的統計信息
  • 第 8 至 14 行 :取消原有訂閱,發起新的訂閱。
  • 第 16 行 :設置斷路器打開時間爲"空" 。

以下兩處調用了 #markNonSuccess() 方法 :

4.5 #markNonSuccess()

當嘗試調用正常邏輯失敗時,調用 #markNonSuccess() 方法,從新打開斷路器。代碼以下 :

1: @Override
  2: public void markNonSuccess() {
  3:     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
  4:         //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
  5:         circuitOpened.set(System.currentTimeMillis());
  6:     }
  7: }
複製代碼
  • 第 3 行 :使用 CAS 方式,修改斷路器狀態( HALF_OPEN => OPEN )。
  • 第 5 行 :設置設置斷路器打開時間爲當前時間。這樣,#attemptExecution() 過一段時間,能夠再次嘗試執行正常邏輯。

以下兩處調用了 #markNonSuccess() 方法 :

4.6 #allowRequest()

#allowRequest()#attemptExecution() 方法,方法目的基本相似,差異在於當斷路器知足嘗試關閉條件時,前者不會將斷路器不會修改狀態( CLOSE => HALF-OPEN ),然後者會。點擊 連接 查看代碼實現。

4.7 #isOpen()

#isOpen() 方法,比較簡單,點擊 連接 查看代碼實現。

666. 彩蛋

呼呼,相對比較乾淨的一篇文章,知足。

胖友,分享一波朋友圈可好!

相關文章
相關標籤/搜索