Before digging into the circuit breaker, we need to look at several important Hystrix defaults. They are defined in HystrixCommandProperties:

    // rolling statistical window (ms)
    static final Integer default_metricsRollingStatisticalWindow = 10000;
    // minimum number of requests within the window
    private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;
    // delay after the breaker opens before a half-open trial is attempted (ms)
    private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;
    // error percentage threshold
    private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;

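Together these properties make up the circuit breaker's core logic, namely:

Within each 10-second statistical window, if at least 20 requests were made and the error percentage reaches 50%, the circuit breaker opens.
Once the breaker has been open for more than 5 seconds, it goes half-open and lets a single trial request through.
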
The circuit breaker is initialized in the getInstance method of HystrixCircuitBreaker.Factory:

    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        // this should find it for all but the first time
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // if we get here this is the first time so we need to initialize

        // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
        // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
        // If 2 threads hit here only one will get added and the other will get a non-null response instead.
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            // this means the putIfAbsent step just created a new one so let's retrieve and return it
            return circuitBreakersByCommand.get(key.name());
        } else {
            // this means a race occurred and while attempting to 'put' another one got there before
            // and we instead retrieved it and will now return it
            return cbForCommand;
        }
    }

As the code above shows, every circuit breaker is an instance of HystrixCircuitBreakerImpl, and all circuit breakers are kept in the circuitBreakersByCommand ConcurrentHashMap, keyed by command name.
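The get-then-putIfAbsent pattern used by getInstance can be tried out in isolation. The following is a minimal sketch, not Hystrix code: BreakerRegistry and Breaker are hypothetical stand-ins, and the key is a plain String instead of a HystrixCommandKey. Unlike the original, it returns the freshly created instance directly instead of reading it back from the map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class BreakerRegistry {
    /** Hypothetical stand-in for a circuit breaker; it only remembers its key. */
    static final class Breaker {
        final String key;
        Breaker(String key) { this.key = key; }
    }

    private static final ConcurrentMap<String, Breaker> BREAKERS = new ConcurrentHashMap<>();

    static Breaker getInstance(String key) {
        // Fast path: every call except the first one for a key ends here.
        Breaker cached = BREAKERS.get(key);
        if (cached != null) {
            return cached;
        }
        // Slow path: putIfAbsent is atomic, so if two threads race,
        // exactly one instance is stored and both threads return it.
        Breaker created = new Breaker(key);
        Breaker raced = BREAKERS.putIfAbsent(key, created);
        return raced == null ? created : raced;
    }

    public static void main(String[] args) {
        Breaker a = getInstance("orderService");
        Breaker b = getInstance("orderService");
        System.out.println(a == b);                          // same key, same instance
        System.out.println(a == getInstance("userService")); // different key, different instance
    }
}
```

The cost of this pattern is that a losing thread may construct an instance that is immediately discarded; in exchange, the common read path never takes a lock.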

    class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        enum Status {
            CLOSED, OPEN, HALF_OPEN
        }

        private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
        private final AtomicLong circuitOpened = new AtomicLong(-1);
        private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;

            //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
            Subscription s = subscribeToStream();
            activeSubscription.set(s);
        }
    }

First, a quick look at the basic fields:

HystrixCommandProperties: the configuration of the current circuit breaker
HystrixCommandMetrics: the request statistics component
Status: the circuit breaker state enum, with three values: closed, open and half-open
status: the current state of the circuit breaker
circuitOpened: the time at which the circuit breaker was opened (-1 while it is closed)
activeSubscription: the subscription that processes the request statistics

    private Subscription subscribeToStream() {
        /*
         * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
         */
        return metrics.getHealthCountsStream()
                .observe()
                .subscribe(new Subscriber<HealthCounts>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(HealthCounts hc) {
                        // check if we are past the statisticalWindowVolumeThreshold
                        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                            // we are not past the minimum volume threshold for the stat window,
                            // so no change to circuit status.
                            // if it was CLOSED, it stays CLOSED
                            // if it was half-open, we need to wait for a successful command execution
                            // if it was open, we need to wait for sleep window to elapse
                        } else {
                            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                //we are not past the minimum error threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                // our failure rate is too high, we need to set the state to OPEN
                                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                    circuitOpened.set(System.currentTimeMillis());
                                }
                            }
                        }
                    }
                });
    }

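Look directly at how the onNext method handles each HealthCounts update:

If the total number of requests in the window is below circuitBreakerRequestVolumeThreshold, the status is left unchanged.
If the error percentage is below circuitBreakerErrorThresholdPercentage, the status is also left unchanged.
Otherwise the failure rate is too high: the status is switched from CLOSED to OPEN and circuitOpened is set to the current time.

Once the application's statistics have been recorded this way, every request can use them to decide whether the circuit breaker should be open.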
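The two threshold checks in onNext boil down to a small pure function, which makes the boundary cases easy to verify. This is a sketch under stated assumptions: TripRule, shouldTrip and errorPercentage are hypothetical names, the thresholds are hard-coded to the defaults quoted earlier, and the integer rounding of the percentage is an assumption rather than a copy of Hystrix's HealthCounts.

```java
final class TripRule {
    // Defaults mirrored from the HystrixCommandProperties constants quoted earlier.
    static final int REQUEST_VOLUME_THRESHOLD = 20;
    static final int ERROR_THRESHOLD_PERCENTAGE = 50;

    /** Hypothetical helper: truncated integer error percentage for one window. */
    static int errorPercentage(long totalRequests, long errorCount) {
        if (totalRequests <= 0) {
            return 0;
        }
        return (int) (errorCount * 100 / totalRequests);
    }

    /** True when a CLOSED breaker should be switched to OPEN for this window. */
    static boolean shouldTrip(long totalRequests, long errorCount) {
        if (totalRequests < REQUEST_VOLUME_THRESHOLD) {
            return false; // too little traffic to judge, even if every call failed
        }
        return errorPercentage(totalRequests, errorCount) >= ERROR_THRESHOLD_PERCENTAGE;
    }

    public static void main(String[] args) {
        System.out.println(shouldTrip(19, 19)); // false: below the volume threshold
        System.out.println(shouldTrip(20, 9));  // false: 45% errors
        System.out.println(shouldTrip(20, 10)); // true: 50% errors
    }
}
```

Note that both comparisons in the original use `<` for the "do nothing" branch, so hitting a threshold exactly (20 requests, 50% errors) is enough to open the breaker.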
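You may remember the applyHystrixSemantics method mentioned in the first article of this series. It contains the logic that decides whether a request should be short-circuited: if the circuit breaker is open, the request goes straight to the fallback logic. The check looks like this: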

    public boolean attemptExecution() {
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            if (isAfterSleepWindow()) {
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    //only the first request after sleep window should execute
                    return true;
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        return currentTime > circuitOpenTime + sleepWindowTime;
    }

When the trial request succeeds, the following method is called to reset the statistics and switch the circuit breaker back to closed:

    public void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            //This thread wins the race to close the circuit - it resets the stream to start it over from 0
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            circuitOpened.set(-1L);
        }
    }

If the trial request fails, the circuit breaker is opened again and its open time is updated:

    public void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
            circuitOpened.set(System.currentTimeMillis());
        }
    }
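
To see the whole CLOSED, OPEN, HALF_OPEN cycle in one place, the three methods above can be condensed into a small stand-alone state machine. This is a simplified sketch, not the Hystrix implementation: MiniBreaker, trip() and currentStatus() are hypothetical names, the metrics stream and the forceOpen/forceClosed switches are left out, and the clock is injected (an assumption made purely so the demo does not have to sleep).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

final class MiniBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final long sleepWindowMs;
    private final LongSupplier clock; // injected so tests can control time

    MiniBreaker(long sleepWindowMs, LongSupplier clock) {
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    /** Stand-in for the health-stream callback deciding the window is unhealthy. */
    void trip() {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }

    boolean attemptExecution() {
        if (circuitOpened.get() == -1) {
            return true; // breaker is closed
        }
        if (clock.getAsLong() > circuitOpened.get() + sleepWindowMs) {
            // only the first caller after the sleep window wins the CAS
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;
    }

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1L);
        }
    }

    void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong()); // restart the sleep window
        }
    }

    Status currentStatus() {
        return status.get();
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        MiniBreaker breaker = new MiniBreaker(5000, now::get);
        breaker.trip();                                 // opened at t=0
        System.out.println(breaker.attemptExecution()); // false: still inside the sleep window
        now.set(5001);
        System.out.println(breaker.attemptExecution()); // true: the single trial request
        System.out.println(breaker.attemptExecution()); // false: a trial is already in flight
        breaker.markNonSuccess();                       // trial failed, reopened at t=5001
        now.set(10002);
        System.out.println(breaker.attemptExecution()); // true: next trial
        breaker.markSuccess();
        System.out.println(breaker.currentStatus());    // CLOSED
    }
}
```

Every transition goes through compareAndSet, so when several threads observe the same condition only one of them performs the state change and its side effects.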