摘要: 原創出處 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!html
本文主要基於 Hystrix 1.5.X 版本java
本文主要分享 斷路器 HystrixCircuitBreaker。git
HystrixCircuitBreaker 有三種狀態 :github
CLOSED
:關閉OPEN
:打開HALF_OPEN
:半開其中,斷路器處於 OPEN
狀態時,鏈路處於非健康狀態,命令執行時,直接調用回退邏輯,跳過正常邏輯。架構
HystrixCircuitBreaker 狀態變遷以下圖 :併發
紅線 :初始時,斷路器處於 CLOSED
狀態,鏈路處於健康狀態。當知足以下條件,斷路器從 CLOSED
變成 OPEN
狀態:app
HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms
)內,總請求數超過必定量( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20
) 。HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%
) 。綠線 :斷路器處於 OPEN
狀態,命令執行時,若當前時間超過斷路器開啓時間必定時間( HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms
),斷路器變成 HALF_OPEN
狀態,嘗試調用正常邏輯,根據執行是否成功,打開或關閉熔斷器【藍線】。ide
推薦 Spring Cloud 書籍:微服務
com.netflix.hystrix.HystrixCircuitBreaker
,Hystrix 斷路器接口。定義接口以下代碼 :ui
public interface HystrixCircuitBreaker {
/** * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does * not modify any internal state, and takes into account the half-open logic which allows some requests through * after the circuit has been opened * * @return boolean whether a request should be permitted */
boolean allowRequest();
/** * Whether the circuit is currently open (tripped). * * @return boolean state of circuit breaker */
boolean isOpen();
/** * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. */
void markSuccess();
/** * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. */
void markNonSuccess();
/** * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal * state. */
boolean attemptExecution();
}
複製代碼
#allowRequest()
和 #attemptExecution()
方法,方法目的基本相似,差異在於當斷路器知足嘗試關閉條件時,前者不會將斷路器不會修改狀態( CLOSE => HALF-OPEN
),然後者會。HystrixCircuitBreaker 有兩個子類實現 :
在 AbstractCommand 建立時,初始化 HystrixCircuitBreaker ,代碼以下 :
/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
/** * 斷路器 */
protected final HystrixCircuitBreaker circuitBreaker;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略無關代碼
// 初始化 斷路器
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// ... 省略無關代碼
}
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// get the default implementation of HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
}
複製代碼
HystrixCommandProperties.circuitBreakerEnabled = true
時,即斷路器功能開啓,使用 Factory 得到 HystrixCircuitBreakerImpl 對象。在 「3. HystrixCircuitBreaker.Factory」 詳細解析。HystrixCommandProperties.circuitBreakerEnabled = false
時,即斷路器功能關閉,建立 NoOpCircuitBreaker 對象。另外,NoOpCircuitBreaker 代碼簡單到腦殘,點擊 連接 查看實現。com.netflix.hystrix.HystrixCircuitBreaker.Factory
,HystrixCircuitBreaker 工廠,主要用於:
建立 HystrixCircuitBreaker 對象,目前只建立 HystrixCircuitBreakerImpl 。
HystrixCircuitBreaker 容器,基於 HystrixCommandKey 維護了 HystrixCircuitBreaker 單例對象 的映射。代碼以下 :
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
複製代碼
總體代碼灰常清晰,點擊 連接 查看代碼。
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl
,完整的斷路器實現。
咱們來逐個方法看看 HystrixCircuitBreakerImpl 的具體實現。
構造方法,代碼以下 :
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
enum Status {
CLOSED, OPEN, HALF_OPEN
}
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
}
複製代碼
status
屬性,斷路器的狀態。circuitOpened
屬性,斷路器打開,即狀態變成 OPEN
的時間。activeSubscription
屬性,基於 Hystrix Metrics 對請求量統計 Observable 的訂閱,在 「4.2 #subscribeToStream()」 詳細解析。#subscribeToStream()
方法,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。代碼以下 :
private Subscription subscribeToStream() {
1: private Subscription subscribeToStream() {
2: /* 3: * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream 4: */
5: return metrics.getHealthCountsStream()
6: .observe()
7: .subscribe(new Subscriber<HealthCounts>() {
8: @Override
9: public void onCompleted() {
10:
11: }
12:
13: @Override
14: public void onError(Throwable e) {
15:
16: }
17:
18: @Override
19: public void onNext(HealthCounts hc) {
20: System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用於調試
21: // check if we are past the statisticalWindowVolumeThreshold
22: if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
23: // we are not past the minimum volume threshold for the stat window,
24: // so no change to circuit status.
25: // if it was CLOSED, it stays CLOSED
26: // if it was half-open, we need to wait for a successful command execution
27: // if it was open, we need to wait for sleep window to elapse
28: } else {
29: if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
30: //we are not past the minimum error threshold for the stat window,
31: // so no change to circuit status.
32: // if it was CLOSED, it stays CLOSED
33: // if it was half-open, we need to wait for a successful command execution
34: // if it was open, we need to wait for sleep window to elapse
35: } else {
36: // our failure rate is too high, we need to set the state to OPEN
37: if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
38: circuitOpened.set(System.currentTimeMillis());
39: }
40: }
41: }
42: }
43: });
44: }
複製代碼
第 5 至 7 行 :向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。這裏的 Observable 基於 RxJava Window 操做符。
FROM 《ReactiveX文檔中文翻譯》「Window」
按期未來自原始 Observable 的數據分解爲一個 Observable 窗口,發射這些窗口,而不是每次發射一項數據
#onNext()
方法將不斷被調用,每次計算斷路器的狀態。第 22 行 :判斷週期( 可配,HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms
)內,總請求數超過必定量( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20
) 。
00:00
內發起了 N 個請求,00:11
不計算這 N 個請求。第 29 行 :錯誤請求佔總請求數超過必定比例( 可配,HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%
) 。
第 37 至 39 行 :知足斷路器打開條件,CAS 修改狀態( CLOSED => OPEN
),並設置打開時間( circuitOpened
) 。
【補充】第 5 至 7 行 :😈 怕寫在上面,你們有壓力。Hystrix Metrics 對請求量統計 Observable 使用了兩種 RxJava Window 操做符 :
Observable#window(timespan, unit)
方法,固定週期( 可配,HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds = 500 ms
),發射 Observable 窗口。點擊 BucketedCounterStream 構造方法 查看調用處的代碼。Observable#window(count, skip)
方法,每發射一次(skip
) Observable 忽略 count
( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20
) 個數據項。爲何?答案在第 22 行的代碼,週期內達到必定請求量是斷路器打開的一個條件。點擊 BucketedRollingCounterStream 構造方法 查看調用處的代碼。目前該方法有兩處調用 :
CLOSE
)。以下是 AbstractCommand#applyHystrixSemantics(_cmd)
方法,對 HystrixCircuitBreakerImpl#attemptExecution
方法的調用的代碼 :
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// ... 省略無關代碼
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) {
// 執行【正常邏輯】
} else {
// 執行【回退邏輯】
}
}
複製代碼
HystrixCircuitBreakerImpl#attemptExecution
方法,判斷是否能夠執行正常邏輯。#attemptExecution
方法,代碼以下 :
1: @Override
2: public boolean attemptExecution() {
3: // 強制 打開
4: if (properties.circuitBreakerForceOpen().get()) {
5: return false;
6: }
7: // 強制 關閉
8: if (properties.circuitBreakerForceClosed().get()) {
9: return true;
10: }
11: // 打開時間爲空
12: if (circuitOpened.get() == -1) {
13: return true;
14: } else {
15: // 知足間隔嘗試斷路器時間
16: if (isAfterSleepWindow()) {
17: //only the first request after sleep window should execute
18: //if the executing command succeeds, the status will transition to CLOSED
19: //if the executing command fails, the status will transition to OPEN
20: //if the executing command gets unsubscribed, the status will transition to OPEN
21: if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
22: return true;
23: } else {
24: return false;
25: }
26: } else {
27: return false;
28: }
29: }
30: }
複製代碼
HystrixCommandProperties.circuitBreakerForceOpen = true
( 默認值 :false
) 時,即斷路器強制打開,返回 false
。當該配置接入配置中心後,能夠動態實現打開熔斷。爲何會有該配置?當 HystrixCircuitBreaker 建立完成後,沒法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,經過該配置以實現相似效果。HystrixCommandProperties.circuitBreakerForceClose = true
( 默認值 :false
) 時,即斷路器強制關閉,返回 true
。當該配置接入配置中心後,能夠動態實現關閉熔斷。爲何會有該配置?當 HystrixCircuitBreaker 建立完成後,沒法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,經過該配置以實現相似效果。circuitOpened
) 爲"空",返回 true
。#isAfterSleepWindow()
方法,判斷是否知足嘗試調用正常邏輯的間隔時間。當知足,使用 CAS 方式修改斷路器狀態( OPEN => HALF_OPEN
),從而保證有且僅有一個線程可以嘗試調用正常邏輯。#isAfterSleepWindow()
方法,代碼以下 :
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
複製代碼
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds
( 默認值,5000 ms
),返回 true
。當嘗試調用正常邏輯成功時,調用 #markSuccess()
方法,關閉斷路器。代碼以下 :
1: @Override
2: public void markSuccess() {
3: if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
4: // 清空 Hystrix Metrics 對請求量統計 Observable 的**統計信息**
5: //This thread wins the race to close the circuit - it resets the stream to start it over from 0
6: metrics.resetStream();
7: // 取消原有訂閱
8: Subscription previousSubscription = activeSubscription.get();
9: if (previousSubscription != null) {
10: previousSubscription.unsubscribe();
11: }
12: // 發起新的訂閱
13: Subscription newSubscription = subscribeToStream();
14: activeSubscription.set(newSubscription);
15: // 設置斷路器打開時間爲空
16: circuitOpened.set(-1L);
17: }
18: }
複製代碼
HALF_OPEN => CLOSED
)。以下兩處調用了 #markNonSuccess()
方法 :
當嘗試調用正常邏輯失敗時,調用 #markNonSuccess()
方法,從新打開斷路器。代碼以下 :
1: @Override
2: public void markNonSuccess() {
3: if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
4: //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
5: circuitOpened.set(System.currentTimeMillis());
6: }
7: }
複製代碼
HALF_OPEN => OPEN
)。#attemptExecution()
過一段時間,能夠再次嘗試執行正常邏輯。以下兩處調用了 #markNonSuccess()
方法 :
#allowRequest()
和 #attemptExecution()
方法,方法目的基本相似,差異在於當斷路器知足嘗試關閉條件時,前者不會將斷路器不會修改狀態( CLOSE => HALF-OPEN
),然後者會。點擊 連接 查看代碼實現。
#isOpen()
方法,比較簡單,點擊 連接 查看代碼實現。
呼呼,相對比較乾淨的一篇文章,知足。
胖友,分享一波朋友圈可好!