Hystrix熔斷器執行機制

本篇假設你們對Hystrix的執行過程及源碼有必定的瞭解,這裏介紹Hystrix的熔斷器執行機制。app

1.Hystrix 熔斷器類結構


HystrixCircuitBreaker做爲接口定義,具體的實現有NoOpCircuitBreakerHystrixCircuitBreakerImpl,其中NoOpCircuitBreaker只是個空殼沒有具體的實現,至關於不熔斷。HystrixCircuitBreakerImpl是主要的熔斷邏輯實現。ide

2.Hystrix 熔斷器狀態

熔斷器有三個狀態 CLOSED OPENHALF_OPEN 熔斷器默認關閉狀態,當觸發熔斷後狀態變動爲 OPEN,在等待到指定的時間,Hystrix會放請求檢測服務是否開啓,這期間熔斷器會變爲HALF_OPEN 半開啓狀態,熔斷探測服務可用則繼續變動爲 CLOSED關閉熔斷器。
狀態變動示意圖ui

3.代碼視角

ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

Hystrix爲每一個commandKey都維護了一個熔斷器,保持着對應的熔斷器,因此當new XXXHystrixCommand()的時候依然可以保持着原來熔斷器的狀態。this

3.1 如何斷定開啓熔斷
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;

    //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
    Subscription s = subscribeToStream();
    activeSubscription.set(s);
}

private Subscription subscribeToStream() {
    /*
     * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
     */
    return metrics.getHealthCountsStream()
            .observe()
            .subscribe(new Subscriber<HealthCounts>() {
                
                 //.....................省略干擾代碼......................
                @Override
                public void onNext(HealthCounts hc) {
                    // check if we are past the statisticalWindowVolumeThreshold
                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                        
                    } else {
                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                            
                        } else {
                            
                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                circuitOpened.set(System.currentTimeMillis());
                            }
                        }
                    }
                }
            });
}

這裏面HystrixBreaker啓動的時候會訂閱HystrixCommandMetricsHealthCountsStream,每當HealthCountsStream蒐集到數據,都會觸發上面的 onNext方法,而後該方法作下面幾個判斷
1.當前請求量是否達到設定水位(請求量過小不作閥值控制)
2.當前的請求錯誤量是否達到閥值,達到後會將熔斷器狀態置爲 OPEN, circuitOpened設置爲當前時間戳表示開啓的時間。spa

3.2 attemptExecution

先看下HystrixCommand 的執行Observable
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics.net

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
··········省略代碼··········

這裏,每次HystrixCommand執行都會調用 circuitBreaker.attemptExecution()code

public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }

這裏代碼判斷邏輯
1.判斷是否強制開啓熔斷器,是則return false,command不能執行
2.判斷是否強制關閉熔斷器,是則return true, command可執行
3.判斷熔斷器是否開啓 circuitOpened.get() == -1表示沒有開啓,則return true,command可執行。
4.到這步證實已經開啓了熔斷器,那麼判斷是否可嘗試請求,若是能夠同時會把熔斷器的狀態改成HALF_OPENblog

3.3 markSuccess&markNonSuccess

com.netflix.hystrix.AbstractCommand#executeCommandAndObserve接口

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    ......省略干擾代碼.......

    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                ......省略干擾代碼.......
                circuitBreaker.markSuccess();
            }
        }
    };

    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                ......省略干擾代碼.......
                circuitBreaker.markSuccess();
            }
        }
    };

    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            ......省略干擾代碼.......
        }
    };

    ......省略干擾代碼.......

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

此處表示HystrixCommand執行的過程當中對應的熔斷器狀態變動,上面代碼不難看出,當error的時候會觸發circuitBreaker.markNonSuccess();,執行成功或者執行完成觸發 circuitBreaker.markSuccess();ip

markNonSuccess

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}

若是能執行到markNonSuccess,說明此時熔斷器是關閉狀態,或者嘗試放流階段。關閉狀態的話不作處理(未觸發熔斷),嘗試放流時,發現依然執行失敗,這裏講熔斷器狀態從新置爲開啓狀態,並把circuitOpened設置爲當前的時間戳。

markSuccess

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        circuitOpened.set(-1L);
    }
}

能走到markSuccess說明熔斷器此時關閉或者放流階段,嘗試放流階段則講熔斷器關閉,設置circuitOpened=-1,並重置指標統計。

4.結束了

到這裏熔斷器的介紹就結束了,回顧下主要有熔斷器如何開啓、如何關閉、幾個狀態的變動。一個完整的熔斷器就此呈如今你們的面前。

相關文章
相關標籤/搜索