// 熔斷器的建立 class Factory { // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly) private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>(); public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // 先從緩存中獲取,key是commandkey,因此說熔斷器是否共用取決於commandkey是不是一個 HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name()); if (previouslyCached != null) { return previouslyCached; } //使用concurrentHashMap 保證線程安全 HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics)); if (cbForCommand == null) { return circuitBreakersByCommand.get(key.name()); } else { return cbForCommand; } } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) { return circuitBreakersByCommand.get(key.name()); } /* package */static void reset() { circuitBreakersByCommand.clear(); } } //熔斷器主要提供的功能 public interface HystrixCircuitBreaker { //判斷某個請求是否放行,而且半開的邏輯也在這裏處理(當熔斷器開的時候,放行一部分請求) boolean allowRequest(); boolean isOpen(); // 熔斷器半開狀態時,請求成功會調用該方法 void markSuccess(); // 熔斷器半開狀態時,請求失敗會調用該方法 void markNonSuccess(); // 嘗試請求的時候,會調用該方法 boolean attemptExecution(); }
熔斷器的具體實現緩存
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; // 熔斷器的三種狀態:關,開,半開 enum Status { CLOSED, OPEN, HALF_OPEN; } // 初始是關 private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED); private final AtomicLong circuitOpened = new AtomicLong(-1); //熔斷器打開時的系統時間 // Suscription 是RxJava中用來取消訂閱關係的,該接口提供兩個方法:一個是取消訂閱,一個是查看當前是否還在訂閱中 private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur Subscription s = subscribeToStream(); activeSubscription.set(s); } //訂閱請求統計的Observable // Observable.suscribe 該方法會返回一個Subscription,即傳參中的Subscriber訂閱了Observable private Subscription subscribeToStream() { return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(HealthCounts hc) { if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { //we are not past the minimum error threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { // our failure rate is too high, we need to set the state to OPEN if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); } @Override public void markSuccess() { //當前若是是半開狀態,在請求成功後改爲關閉狀態,關閉熔斷器 if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { metrics.resetStream(); Subscription previousSubscription = activeSubscription.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = subscribeToStream(); activeSubscription.set(newSubscription); circuitOpened.set(-1L); } } @Override public void markNonSuccess() { // 請求失敗,且當前狀態是半開,狀態更新爲打開 if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { //從新記錄打開的時間 circuitOpened.set(System.currentTimeMillis()); } } @Override public boolean isOpen() { //強制打開熔斷 if (properties.circuitBreakerForceOpen().get()) { return true; } //強制關閉熔斷 if (properties.circuitBreakerForceClosed().get()) { return false; } //circuitOpened大於等於0,熔斷器打開,小於0,熔斷器關閉 //初始值-1,關閉熔斷器 //打開的時機:1上面方法markNonSuccess(半開-開),2 初始化HystrixCircuirBreakerImpl中,上面的subcribeToStream() return circuitOpened.get() >= 0; } @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } //初始爲-1的時候,熔斷器關,容許請求進來 if (circuitOpened.get() == -1) { return true; } else { //半開的時候拒絕請求? if (status.get().equals(Status.HALF_OPEN)) { return false; } else { // 熔斷器當前是開,且若是當前時間已經到了能夠再次嘗試的時間,容許請求過來 return isAfterSleepWindow(); } } } private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); // 熔斷器打開後,拒絕請求多長時間後再開始嘗試執行,可配置,默認5000ms final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); //判斷當前時間是否已通過了窗口睡眠時間:是否能夠開始嘗試執行請求 return currentTime > circuitOpenTime + sleepWindowTime; } @Override public boolean attemptExecution() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; } else { if (isAfterSleepWindow()) { if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { return true; } else { return false; } } else { return false; } } } }