Hystrix核心熔斷器

在深刻研究熔斷器以前,咱們須要先看一下Hystrix的幾個重要的默認配置,這幾個配置在HystrixCommandPropertiesapp

//時間窗(ms)
static final Integer default_metricsRollingStatisticalWindow = 10000;
//最少請求次數
private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;
//熔斷器打開後開始嘗試半開的時間間隔
private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;
//錯誤比例
private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;

這幾個屬性共同組成了熔斷器的核心邏輯,即:ide

  1. 每10秒的窗口期內,當請求次數超過20次,且出錯比例超過50%,則觸發熔斷器打開
  2. 當熔斷器5秒後,會嘗試放過去一部分流量進行試探
熔斷器初始化

熔斷器的初始化是在HystrixCircuitBreaker.FactorygetInstance方法函數

private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // this should find it for all but the first time
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize

            // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
            // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
            // If 2 threads hit here only one will get added and the other will get a non-null response instead.
            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                // this means the putIfAbsent step just created a new one so let's retrieve and return it
                return circuitBreakersByCommand.get(key.name());
            } else {
                // this means a race occurred and while attempting to 'put' another one got there before
                // and we instead retrieved it and will now return it
                return cbForCommand;
            }
        }

由上方代碼可知,每個熔斷器都是由HystrixCircuitBreakerImpl實現的,而全部的熔斷器都維護在circuitBreakersByCommand這個ConcurrentHashMapui

熔斷器實現
構造方法
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        CLOSED, OPEN, HALF_OPEN
    }

    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }
}

先介紹一下幾個比較基礎的屬性:this

  1. HystrixCommandProperties:當前熔斷器的配置
  2. HystrixCommandMetrics: 請求統計組件
  3. Status:熔斷器狀態枚舉,一共包含三種,關閉、打開和半開
  4. status:當前熔斷器的狀態
  5. circuitOpened:當前熔斷器的打開時間
  6. activeSubscription:訂閱請求統計的處理函數
請求統計處理
private Subscription subscribeToStream() {
            /*
             * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
             */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }

直接看onNext方法裏的處理方式:atom

  1. 時間窗內的請求數量是否達標,按默認配置就是10秒鐘的請求數是否超過20次,若是不達標不能開啓熔斷器
  2. else中首先判斷錯誤比例是否達到比例,按默認就是50%
  3. 知足打開條件,使用CAS修改狀態爲打開,並記錄打開時間circuitOpened爲當前時間

當記錄了當前應用的統計數據以後,在每次請求的時候就能夠根據這些數據來判斷是否應該打開熔斷器了code

請求過濾

不知你是否還記得在系列文章第一篇中曾經提到了一個方法applyHystrixSemantics,在這個方法中就包含了判斷是否應該熔斷的邏輯,若是熔斷器打開的狀況下會直接進入降級邏輯。這個判斷的方法以下:ip

public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
  1. 第一個if,若是配置強制熔斷則返回false表示開啓熔斷器進入降級邏輯
  2. 第二個,若是配置強制關閉則返回正常不進行後續的判斷
  3. 第三個,打開時間爲空則確定沒打開過
  4. 第四個,判斷是否知足嘗試時間,默認是5秒鐘。時間計算方式以下:
private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}
  1. 當知足嘗試時則使用CAS方式修改熔斷器爲半開狀態

而當請求成功的時候則會調用以下方法清除統計數據,更改熔斷器狀態爲關閉ci

public void markSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                metrics.resetStream();
                Subscription previousSubscription = activeSubscription.get();
                if (previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }
                Subscription newSubscription = subscribeToStream();
                activeSubscription.set(newSubscription);
                circuitOpened.set(-1L);
            }
        }

請求失敗則再次打開熔斷器,並更新打開時間get

public void markNonSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                circuitOpened.set(System.currentTimeMillis());
            }
        }

1

相關文章
相關標籤/搜索