The circuit breaker plays a central role in the execution of HystrixCommand and HystrixObservableCommand. Let's look at its core component, HystrixCircuitBreaker:
    package com.netflix.hystrix;

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReference;

    import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
    import rx.Subscriber;
    import rx.Subscription;

    public interface HystrixCircuitBreaker {

        boolean allowRequest();

        boolean isOpen();

        void markSuccess();

        void markNonSuccess();

        boolean attemptExecution();

        class Factory {
            // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
            private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
        }

        class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        }

        static class NoOpCircuitBreaker implements HystrixCircuitBreaker {
        }
    }
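First, the interface's abstract methods, as declared above:

1. allowRequest(): asks whether a command request is allowed to proceed.
2. isOpen(): reports whether the circuit is currently open (tripped).
3. markSuccess(): called after a successful execution, as feedback while the circuit is half-open.
4. markNonSuccess(): called after an unsuccessful execution, as feedback while the circuit is half-open.
5. attemptExecution(): called at the start of a command execution; unlike the query methods, it may change the breaker's internal state.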
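Next, the classes nested in the interface:

1. Factory: keeps a ConcurrentHashMap of circuit breakers keyed by HystrixCommandKey.name(), so each command key maps to one breaker.
2. HystrixCircuitBreakerImpl: the default implementation of the circuit breaker.
3. NoOpCircuitBreaker: an implementation that does nothing and never trips.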
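HystrixCircuitBreakerImpl defines the circuit breaker's five core objects, all of which appear in the methods below: properties, metrics, status, circuitOpened, and activeSubscription.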
    @Override
    public boolean isOpen() {
        if (properties.circuitBreakerForceOpen().get()) {
            return true;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return false;
        }
        return circuitOpened.get() >= 0;
    }
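isOpen() reports whether the circuit breaker is open or closed. The main steps are:

1. If circuitBreakerForceOpen is set, return true: the breaker is forced open.
2. Otherwise, if circuitBreakerForceClosed is set, return false: the breaker is forced closed.
3. Otherwise, return whether circuitOpened is greater than or equal to 0. circuitOpened is -1 while the circuit is closed and holds the time the circuit opened otherwise.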
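The precedence inside isOpen() can be tried out in isolation. The following is a minimal sketch, not the Hystrix class: IsOpenSketch, its two boolean flags (stand-ins for the forceOpen and forceClosed properties), and the trip helper are all hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the breaker's state; not the Hystrix implementation.
class IsOpenSketch {
    // Hypothetical plain flags standing in for the two "force" properties.
    private final boolean forceOpen;
    private final boolean forceClosed;
    // -1 means the circuit is closed; a value >= 0 is the time it opened.
    private final AtomicLong circuitOpened = new AtomicLong(-1L);

    IsOpenSketch(boolean forceOpen, boolean forceClosed) {
        this.forceOpen = forceOpen;
        this.forceClosed = forceClosed;
    }

    // Hypothetical helper: records the time at which the circuit opened.
    void trip(long nowMillis) {
        circuitOpened.set(nowMillis);
    }

    boolean isOpen() {
        if (forceOpen) {
            return true;   // checked first, so it wins even if forceClosed is also set
        }
        if (forceClosed) {
            return false;  // overrides a tripped circuit
        }
        return circuitOpened.get() >= 0;
    }

    public static void main(String[] args) {
        IsOpenSketch normal = new IsOpenSketch(false, false);
        System.out.println(normal.isOpen());       // false: never tripped
        normal.trip(System.currentTimeMillis());
        System.out.println(normal.isOpen());       // true: tripped

        IsOpenSketch forcedClosed = new IsOpenSketch(false, true);
        forcedClosed.trip(System.currentTimeMillis());
        System.out.println(forcedClosed.isOpen()); // false: forced closed

        IsOpenSketch both = new IsOpenSketch(true, true);
        System.out.println(both.isOpen());         // true: forceOpen is checked first
    }
}
```

The order of the checks is what matters here: because the forced-open test comes first, setting both flags yields an open circuit.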
    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        return currentTime > circuitOpenTime + sleepWindowTime;
    }

    @Override
    public boolean attemptExecution() {
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            if (isAfterSleepWindow()) {
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    //only the first request after sleep window should execute
                    return true;
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }
    }
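The main logic of attemptExecution() is as follows:

1. If circuitBreakerForceOpen is set, return false: no request may execute.
2. If circuitBreakerForceClosed is set, return true: every request may execute.
3. If circuitOpened is -1, the circuit is closed, so return true.
4. Otherwise the circuit is open. If the sleep window has elapsed (the current time is later than the open time plus circuitBreakerSleepWindowInMilliseconds), try to move the status from OPEN to HALF_OPEN with compareAndSet. Only the request that wins this race returns true and executes as the trial request.
5. In every other case, return false.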
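To see why only one request gets through after the sleep window, here is a minimal sketch of the same compare-and-set idea. It is an illustration under assumptions, not the Hystrix code: HalfOpenSketch and trip are hypothetical names, the force flags are left out, and the current time is passed in as a parameter so that the behaviour is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the OPEN -> HALF_OPEN transition; not the Hystrix class.
class HalfOpenSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    // -1 means closed; otherwise the time (in ms) at which the circuit opened.
    private final AtomicLong circuitOpened = new AtomicLong(-1L);
    private final long sleepWindowMillis;

    HalfOpenSketch(long sleepWindowMillis) {
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Hypothetical helper: opens the circuit at the given time.
    void trip(long nowMillis) {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(nowMillis);
        }
    }

    // The clock is a parameter here (an assumption made for testability).
    boolean attemptExecution(long nowMillis) {
        long openedAt = circuitOpened.get();
        if (openedAt == -1L) {
            return true; // circuit closed: every request may run
        }
        if (nowMillis > openedAt + sleepWindowMillis) {
            // Only the caller that wins this compare-and-set becomes the trial request.
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false; // still inside the sleep window
    }

    Status status() {
        return status.get();
    }

    public static void main(String[] args) {
        HalfOpenSketch breaker = new HalfOpenSketch(5000L);
        System.out.println(breaker.attemptExecution(0L));    // true: closed
        breaker.trip(1000L);
        System.out.println(breaker.attemptExecution(2000L)); // false: inside sleep window
        System.out.println(breaker.attemptExecution(6001L)); // true: first request after window
        System.out.println(breaker.attemptExecution(6002L)); // false: already HALF_OPEN
    }
}
```

The second call after the window fails because the status is no longer OPEN, so its compareAndSet returns false. The same mechanism holds when the two calls come from different threads.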
    @Override
    public void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            //This thread wins the race to close the circuit - it resets the stream to start it over from 0
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            circuitOpened.set(-1L);
        }
    }
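markSuccess() closes the circuit breaker. Its main steps are:

1. Try to move the status from HALF_OPEN to CLOSED with compareAndSet. If the breaker is not half-open, nothing else happens.
2. The thread that wins the race calls metrics.resetStream() so that the metrics start over from 0.
3. Unsubscribe the previous subscription, if there is one, then subscribe to the stream again and store the new subscription in activeSubscription.
4. Set circuitOpened to -1 to mark the circuit as closed.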
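The closing transition can be sketched the same way. This is a simplified model under stated assumptions: MarkSuccessSketch is a hypothetical class, and a plain failure counter stands in for the metrics stream and its subscription, which the real method resets and re-subscribes.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of closing the circuit; not the Hystrix class.
class MarkSuccessSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status;
    private final AtomicLong circuitOpened;
    // Hypothetical stand-in for the metrics stream that the real code resets.
    private final AtomicLong failureCount;

    MarkSuccessSketch(Status initial, long openedAtMillis, long failures) {
        this.status = new AtomicReference<>(initial);
        this.circuitOpened = new AtomicLong(openedAtMillis);
        this.failureCount = new AtomicLong(failures);
    }

    void markSuccess() {
        // Only a half-open breaker can be closed by a success.
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            failureCount.set(0L);   // start the statistics over from 0
            circuitOpened.set(-1L); // -1 marks the circuit as closed
        }
    }

    Status status() { return status.get(); }
    long openedAt() { return circuitOpened.get(); }
    long failures() { return failureCount.get(); }

    public static void main(String[] args) {
        MarkSuccessSketch halfOpen = new MarkSuccessSketch(Status.HALF_OPEN, 1000L, 7L);
        halfOpen.markSuccess();
        System.out.println(halfOpen.status() + " " + halfOpen.openedAt()); // CLOSED -1

        MarkSuccessSketch open = new MarkSuccessSketch(Status.OPEN, 1000L, 7L);
        open.markSuccess();
        System.out.println(open.status() + " " + open.openedAt());         // OPEN 1000
    }
}
```

A success reported while the breaker is fully open leaves the state untouched, which is why the trial request admitted by attemptExecution() is the only one that can close the circuit.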