HystrixCircuitBreaker

// 熔斷器的建立
 class Factory {

        // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)

        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();


        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {

            // 先從緩存中獲取,key是commandkey,因此說熔斷器是否共用取決於commandkey是不是一個
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            //使用concurrentHashMap 保證線程安全
            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                return circuitBreakersByCommand.get(key.name());
            } else {
                return cbForCommand;
            }
        }

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return circuitBreakersByCommand.get(key.name());
        }

        /* package */static void reset() {

            circuitBreakersByCommand.clear();
        }
    }
 
//熔斷器主要提供的功能
public interface HystrixCircuitBreaker {
    //判斷某個請求是否放行,而且半開的邏輯也在這裏處理(當熔斷器開的時候,放行一部分請求)
    boolean allowRequest();

    boolean isOpen();


    // 熔斷器半開狀態時,請求成功會調用該方法
    void markSuccess();
    // 熔斷器半開狀態時,請求失敗會調用該方法
    void markNonSuccess();
    // 嘗試請求的時候,會調用該方法
    boolean attemptExecution();
}

熔斷器的具體實現緩存

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {

        private final HystrixCommandProperties properties;

        private final HystrixCommandMetrics metrics;

        // 熔斷器的三種狀態:關,開,半開
        enum Status {
            CLOSED, OPEN, HALF_OPEN;
        }


        // 初始是關
        private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
        private final AtomicLong circuitOpened = new AtomicLong(-1);  //熔斷器打開時的系統時間
        //  Suscription 是RxJava中用來取消訂閱關係的,該接口提供兩個方法:一個是取消訂閱,一個是查看當前是否還在訂閱中
        private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
            //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur

            Subscription s = subscribeToStream();
            activeSubscription.set(s);
        }
        //訂閱請求統計的Observable
        // Observable.suscribe 該方法會返回一個Subscription,即傳參中的Subscriber訂閱了Observable
        private Subscription subscribeToStream() {
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {
                        }

                        @Override
                        public void onError(Throwable e) {
                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

                                // we are not past the minimum volume threshold for the stat window,

                                // so no change to circuit status.

                                // if it was CLOSED, it stays CLOSED

                                // if it was half-open, we need to wait for a successful command execution

                                // if it was open, we need to wait for sleep window to elapse

                            } else {

                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

                                    //we are not past the minimum error threshold for the stat window,

                                    // so no change to circuit status.

                                    // if it was CLOSED, it stays CLOSED

                                    // if it was half-open, we need to wait for a successful command execution

                                    // if it was open, we need to wait for sleep window to elapse

                                } else {

                                    // our failure rate is too high, we need to set the state to OPEN

                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {

                                        circuitOpened.set(System.currentTimeMillis());

                                    }

                                }

                            }

                        }

                    });

        }




        @Override
        public void markSuccess() {
            //當前若是是半開狀態,在請求成功後改爲關閉狀態,關閉熔斷器
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                metrics.resetStream();

                Subscription previousSubscription = activeSubscription.get();

                if (previousSubscription != null) {

                    previousSubscription.unsubscribe();

                }

                Subscription newSubscription = subscribeToStream();

                activeSubscription.set(newSubscription);

                circuitOpened.set(-1L);

            }

        }

        @Override
        public void markNonSuccess() {
            // 請求失敗,且當前狀態是半開,狀態更新爲打開
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //從新記錄打開的時間
                circuitOpened.set(System.currentTimeMillis());
            }
        }


        @Override
        public boolean isOpen() {
            //強制打開熔斷
            if (properties.circuitBreakerForceOpen().get()) {
                return true;
            }
            //強制關閉熔斷
            if (properties.circuitBreakerForceClosed().get()) {
                return false;
            }
            //circuitOpened大於等於0,熔斷器打開,小於0,熔斷器關閉
            //初始值-1,關閉熔斷器
            //打開的時機:1上面方法markNonSuccess(半開-開),2 初始化HystrixCircuirBreakerImpl中,上面的subcribeToStream()
            return circuitOpened.get() >= 0;
        }

        @Override
        public boolean allowRequest() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            //初始爲-1的時候,熔斷器關,容許請求進來
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                //半開的時候拒絕請求?
                if (status.get().equals(Status.HALF_OPEN)) {
                    return false;
                } else {
                    // 熔斷器當前是開,且若是當前時間已經到了能夠再次嘗試的時間,容許請求過來
                    return isAfterSleepWindow();
                }
            }
        }

        private boolean isAfterSleepWindow() {
            final long circuitOpenTime = circuitOpened.get();
            final long currentTime = System.currentTimeMillis();
            // 熔斷器打開後,拒絕請求多長時間後再開始嘗試執行,可配置,默認5000ms
            final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
            //判斷當前時間是否已通過了窗口睡眠時間:是否能夠開始嘗試執行請求
            return currentTime > circuitOpenTime + sleepWindowTime;

        }

        @Override
        public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
    }
相關文章
相關標籤/搜索