本篇假設你們對Hystrix的執行過程及源碼有必定的瞭解,這裏介紹Hystrix的熔斷器執行機制。app
HystrixCircuitBreaker
做爲接口定義,具體的實現有NoOpCircuitBreaker
和HystrixCircuitBreakerImpl
,其中NoOpCircuitBreaker
只是個空殼沒有具體的實現,至關於不熔斷。HystrixCircuitBreakerImpl
是主要的熔斷邏輯實現。ide
熔斷器有三個狀態 CLOSED
、 OPEN
、HALF_OPEN
熔斷器默認關閉狀態,當觸發熔斷後狀態變動爲 OPEN
,在等待到指定的時間,Hystrix會放請求檢測服務是否開啓,這期間熔斷器會變爲HALF_OPEN
半開啓狀態,熔斷探測服務可用則繼續變動爲 CLOSED
關閉熔斷器。
ui
ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
Hystrix爲每一個commandKey都維護了一個熔斷器,保持着對應的熔斷器,因此當new XXXHystrixCommand()的時候依然可以保持着原來熔斷器的狀態。this
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur Subscription s = subscribeToStream(); activeSubscription.set(s); } private Subscription subscribeToStream() { /* * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream */ return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { //.....................省略干擾代碼...................... @Override public void onNext(HealthCounts hc) { // check if we are past the statisticalWindowVolumeThreshold if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { } else { if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); }
這裏面HystrixBreaker啓動的時候會訂閱HystrixCommandMetrics
的 HealthCountsStream
,每當HealthCountsStream
蒐集到數據,都會觸發上面的 onNext
方法,而後該方法作下面幾個判斷
1.當前請求量是否達到設定水位(請求量過小不作閥值控制)
2.當前的請求錯誤量是否達到閥值,達到後會將熔斷器狀態置爲 OPEN
, circuitOpened設置爲當前時間戳表示開啓的時間。spa
先看下HystrixCommand 的執行Observable
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics.net
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.attemptExecution()) { ··········省略代碼··········
這裏,每次HystrixCommand執行都會調用 circuitBreaker.attemptExecution()
code
public boolean attemptExecution() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; } else { if (isAfterSleepWindow()) { if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { //only the first request after sleep window should execute return true; } else { return false; } } else { return false; } } }
這裏代碼判斷邏輯
1.判斷是否強制開啓熔斷器,是則return false,command不能執行
2.判斷是否強制關閉熔斷器,是則return true, command可執行
3.判斷熔斷器是否開啓 circuitOpened.get() == -1
表示沒有開啓,則return true,command可執行。
4.到這步證實已經開啓了熔斷器,那麼判斷是否可嘗試請求,若是能夠同時會把熔斷器的狀態改成HALF_OPEN
blog
com.netflix.hystrix.AbstractCommand#executeCommandAndObserve接口
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { ......省略干擾代碼....... final Action1<R> markEmits = new Action1<R>() { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.EMIT); eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } if (commandIsScalar()) { ......省略干擾代碼....... circuitBreaker.markSuccess(); } } }; final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { ......省略干擾代碼....... circuitBreaker.markSuccess(); } } }; final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { circuitBreaker.markNonSuccess(); ......省略干擾代碼....... } }; ......省略干擾代碼....... return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
此處表示HystrixCommand執行的過程當中對應的熔斷器狀態變動,上面代碼不難看出,當error的時候會觸發circuitBreaker.markNonSuccess();
,執行成功或者執行完成觸發 circuitBreaker.markSuccess();
ip
markNonSuccess
@Override public void markNonSuccess() { if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { //This thread wins the race to re-open the circuit - it resets the start time for the sleep window circuitOpened.set(System.currentTimeMillis()); } }
若是能執行到markNonSuccess,說明此時熔斷器是關閉狀態,或者嘗試放流階段。關閉狀態的話不作處理(未觸發熔斷),嘗試放流時,發現依然執行失敗,這裏講熔斷器狀態從新置爲開啓狀態,並把circuitOpened設置爲當前的時間戳。
markSuccess
@Override public void markSuccess() { if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { //This thread wins the race to close the circuit - it resets the stream to start it over from 0 metrics.resetStream(); Subscription previousSubscription = activeSubscription.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = subscribeToStream(); activeSubscription.set(newSubscription); circuitOpened.set(-1L); } }
能走到markSuccess說明熔斷器此時關閉或者放流階段,嘗試放流階段則講熔斷器關閉,設置circuitOpened=-1,並重置指標統計。
到這裏熔斷器的介紹就結束了,回顧下主要有熔斷器如何開啓、如何關閉、幾個狀態的變動。一個完整的熔斷器就此呈如今你們的面前。