專欄系列文章:SpringCloud系列專欄java
系列文章:web
SpringCloud 源碼系列(1)— 註冊中心Eureka 之 啓動初始化編程
SpringCloud 源碼系列(2)— 註冊中心Eureka 之 服務註冊、續約緩存
SpringCloud 源碼系列(3)— 註冊中心Eureka 之 抓取註冊表markdown
SpringCloud 源碼系列(4)— 註冊中心Eureka 之 服務下線、故障、自我保護機制併發
SpringCloud 源碼系列(5)— 註冊中心Eureka 之 EurekaServer集羣app
SpringCloud 源碼系列(6)— 註冊中心Eureka 之 總結篇負載均衡
SpringCloud 源碼系列(7)— 負載均衡Ribbon 之 RestTemplate異步
SpringCloud 源碼系列(8)— 負載均衡Ribbon 之 核心原理ide
SpringCloud 源碼系列(9)— 負載均衡Ribbon 之 核心組件與配置
SpringCloud 源碼系列(10)— 負載均衡Ribbon 之 HTTP客戶端組件
SpringCloud 源碼系列(11)— 負載均衡Ribbon 之 重試與總結篇
SpringCloud 源碼系列(12)— 服務調用Feign 之 基礎使用篇
SpringCloud 源碼系列(13)— 服務調用Feign 之 掃描@FeignClient註解接口
SpringCloud 源碼系列(14)— 服務調用Feign 之 構建@FeignClient接口動態代理
SpringCloud 源碼系列(15)— 服務調用Feign 之 結合Ribbon進行負載均衡請求
SpringCloud 源碼系列(16)— 熔斷器Hystrix 之 基礎入門篇
這一章咱們從 HystrixCommand 的構造以及 execute() 執行爲入口,一步步分析下 Hystrix 如何封裝業務邏輯、線程池隔離模式、熔斷降級的核心原理。
須要注意的是,Hystrix 源碼大量使用了 rxjava 響應式編程,源碼中充斥着大量的回調,以及 Observable 層層嵌套,源碼運行流程並非線性的,所以在分析源碼的過程當中,我會只展現核心的一些源碼,便於咱們梳理出 Hystrix 的設計便可。
HystrixCommand 是一個抽象類,又繼承自抽象類 AbstractCommand,核心的邏輯都在 AbstractCommand
中,HystrixCommand 相對來講比較簡單,主要就是重載了幾個方法,因此咱們先看下 HystrixCommand 組件的結構。
看 HystrixCommand 的類結構圖,注意看左邊的標識,能夠得出以下信息:
run()
方法是一個抽象方法,須要子類實現,也就是咱們的業務邏輯都封裝到 run() 方法中execute()
、queue()
是 public 的方法,用於執行命令getExecutionObservable()
、getFallbackObservable()
是 final 修飾的,不可被重載,見名知意,getExecutionObservable() 是父類用於獲取執行命令 Observable 的,getFallbackObservable() 是父類用於獲取回調方法 Observable 的。getFallback()
、getFallbackMethodName()
都是 protected 的,能夠被子類重載。getExecutionObservable()
是實現的父類 AbstractCommand 的抽象方法,經過名稱或源碼能夠知道,這個方法就是獲取 run() 方法訂閱對象 Observable 的,調用命令時最終是必定要走到這個方法的。
@Override
final protected Observable<R> getExecutionObservable() {
// defer:對象被訂閱時才觸發 call() 方法的調用
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 訂閱此 Observable 對象將觸發 run() 方法的執行
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// 訂閱時將當前線程保存起來
executionThread.set(Thread.currentThread());
}
});
}
複製代碼
一樣的,getFallbackObservable()
則是用於獲取回調方法的訂閱對象 Observable。
@Override
final protected Observable<R> getFallbackObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 返回一個 執行回調方法 的訂閱對象
return Observable.just(getFallback());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
複製代碼
HystrixCommand 一共有四個方法能夠調用來執行命令,分別是:execute()、queue()、observe()、toObservable()
,看源碼能夠了解到,前面三個方法最終都是調用 toObservable() 方法來實現的。observe()、toObservable() 都是返回一個 Observable 對象,調用 .toBlocking()
方法就會觸發訂閱對象的執行,而 toFuture()
返回的 Future 就能夠異步執行,再調用 Future 的 get()
方法就能夠同步阻塞等待執行結果,所以最終就實現了四個方法的不一樣特性。
public R execute() {
return queue().get();
}
public Future<R> queue() {
// 全部的調用最終都到了 toObservable()
final Future<R> delegate = toObservable().toBlocking().toFuture();
// 代理,在 delegate 執行異常後作一些處理
final Future<R> f = new Future<R>() {
//...
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
//...
};
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
//....
}
}
return f;
}
複製代碼
HystrixCommand 的構造初始化最終都是在父類 AbstractCommand 中參數最齊全的這個構造方法,能夠看到這個構造方法有不少參數,但只有第一個參數 HystrixCommandGroupKey 是必輸的,即分組名稱,其他的參數爲空時將使用默認值。
HystrixCommand 的初始化過程比較簡單,主要就是初始化了 command 一些配置以及組件。
HystrixCommandProperties
這個類。hystrix-{groupKey}-{number}
。// 除了 group 參數必須外,其它都是非必須參數
protected AbstractCommand(HystrixCommandGroupKey group, // 組名 HystrixCommandKey key, // 命令名 HystrixThreadPoolKey threadPoolKey, // 線程池名 HystrixCircuitBreaker circuitBreaker, // 斷路器 HystrixThreadPool threadPool, // Hystrix 線程池 HystrixCommandProperties.Setter commandPropertiesDefaults, // 配置 HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, // 線程池配置 HystrixCommandMetrics metrics, // 度量器 TryableSemaphore fallbackSemaphore, // 信號量回調 TryableSemaphore executionSemaphore, // 信號量限流器 HystrixPropertiesStrategy propertiesStrategy, // Hystrix配置策略組件 HystrixCommandExecutionHook executionHook) { // Hook 跟蹤 Hystrix 命令執行
// 命令分組
this.commandGroup = initGroupKey(group);
// key 爲 null 時取類名:getClass().getSimpleName()
this.commandKey = initCommandKey(key, getClass());
// Command 配置,默認爲 commandPropertiesDefaults
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// 先使用配置的 poolKeyOverride,不然 threadPoolKey 爲空 則使用 groupKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
// 度量統計組件:HystrixCommandMetrics
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
// 斷路器:HystrixCircuitBreaker
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// Hystrix線程池:HystrixThreadPool
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
// 時間通知器:HystrixEventNotifier
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
// 併發策略:HystrixConcurrencyStrategy
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
// Hook:HystrixCommandExecutionHook
this.executionHook = initExecutionHook(executionHook);
// 請求緩存:HystrixRequestCache
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
// 請求日誌記錄:HystrixRequestLog,默認爲 null
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
// 信號量回調覆蓋
this.fallbackSemaphoreOverride = fallbackSemaphore;
// 信號量覆蓋
this.executionSemaphoreOverride = executionSemaphore;
}
複製代碼
以斷路器 HystrixCircuitBreaker 的初始化爲例,看看這些組件是如何初始化的。
初始化步驟基本都是相似的,若是 AbstractCommand 構造參數傳入的組件爲 null,就會初始化默認組件。每一個組件會有一個內部類 Factory
,Factory 提供了一個 getInstance
方法來獲取組件。Factory 會用一個 ConcurrentHashMap 來緩存不一樣的 command 對應的組件,避免重複建立,getInstance() 在獲取組件時,先從本地緩存中獲取,不存在則建立默認的組件,並放入本地緩存中。
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// 從 Factory 中獲取默認的 斷路器
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
// 禁用斷路器就返回什麼都不操做的實現類
return new NoOpCircuitBreaker();
}
}
複製代碼
HystrixCircuitBreaker.Factory:
public static class Factory {
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// 先從本地緩存中找是否已經建立了同名的斷路器組件
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
// 尚未建立過就建立一個默認的
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
return circuitBreakersByCommand.get(key.name());
} else {
return cbForCommand;
}
}
}
複製代碼
AbstractCommand 的 toObservable()
方法返回命令執行的最終訂閱對象,但它內部對 run()
方法的執行封裝可謂是層層嵌套,很是複雜,所以咱們抓大放小,下面先從總體上來看看 toObservable 返回訂閱對象 Observable 的流程,一直到最後找到是在哪裏執行 run()
方法的。
大體流程以下:
定義命令執行結束後的回調動做 Action0 => terminateCommandCleanup。
定義命令取消執行後的回調動做 Action0 => unsubscribeCommandCleanup。
定義應用 hystrix 的核心語意 Func0 => applyHystrixSemantics,之因此展現這個回調方法的實現,是由於這個回調會去封裝 run() 方法。
定義一個轉換Hook Func1 => wrapWithAllOnNextHooks。
定義Hook完成後的回調 => fireOnCompletedHook。
最後一步纔是在建立 Observable 訂閱對象,看下這個訂閱對象主要作什麼:
applyHystrixSemantics
返回的訂閱對象toObservable() 總結一下,其實最核心的就兩個地方:
applyHystrixSemantics
纔是封裝核心業務邏輯的地方;public Observable<R> toObservable() {
// _cmd => 當前命令對象
final AbstractCommand<R> _cmd = this;
// 命令執行結束後的一些動做
final Action0 terminateCommandCleanup = new Action0() {...};
// 命令取消執行的一些動做
final Action0 unsubscribeCommandCleanup = new Action0() {...};
// 應用 Hystrix 的核心語意部分,hystrix 執行入口
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// 執行 hystrix 請求
return applyHystrixSemantics(_cmd);
}
};
// 對原始命令作一些轉換
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {...};
// 執行成功後的動做
final Action0 fireOnCompletedHook = new Action0() {...};
// defer: 不會當即執行,調用 toBlocking() 後才執行 call() 方法
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 設置狀態爲 OBSERVABLE_CHAIN_CREATED,若是初始狀態不是 NOT_STARTED,將拋出異常
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
throw new HystrixRuntimeException(...);
}
// 設置命令開始時間(注意是在真正訂閱時纔會設置)
commandStartTimestamp = System.currentTimeMillis();
// 是否開啓請求日誌,默認爲 currentRequestLog 爲 null
if (properties.requestLogEnabled().get() && currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
// 是否開啓請求緩存,且 cacheKey 不爲空
final boolean requestCacheEnabled = isRequestCachingEnabled();
// 要開啓請求緩存須要重載 getCacheKey() 方法
final String cacheKey = getCacheKey();
// 首先從緩存中取
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
// 直接返回緩存中的數據
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 訂閱 applyHystrixSemantics 返回的訂閱對象
Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 開啓請求緩存的狀況下,再次訂閱 hystrixObservable,在執行結束後將請求結果緩存起來
if (requestCacheEnabled && cacheKey != null) {
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
// 訂閱對象放入緩存中
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
// 緩存已存在則取消訂閱,返回緩存中的內容
if (fromCache != null) {
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
afterCache = toCache.toObservable();
}
}
// 未開啓緩存就是本來的 hystrixObservable
else {
afterCache = hystrixObservable;
}
// 返回訂閱對象
return afterCache
.doOnTerminate(terminateCommandCleanup) // 命令結束後執行
.doOnUnsubscribe(unsubscribeCommandCleanup) // 命令取消後執行
.doOnCompleted(fireOnCompletedHook); // 命令完成後執行
}
});
}
複製代碼
前面看到 applyHystrixSemantics(_cmd)
是封裝 Hystrix 命令的語意,但看下來發現,其實還沒看到 run() 的封裝,下面看下它主要作了哪些事情。
首先發出一個Hook,告知命令開始執行了。
而後用斷路器判斷是否容許請求,若是斷路器拒絕了,好比斷路器狀態爲打開狀態,就直接走降級。
若是斷路器容許請求,獲取一個信號量 TryableSemaphore,若是是信號量模式返回的是 TryableSemaphoreActual;線程池模式返回的是 TryableSemaphoreNoOp,什麼都不作,直接放行。
定義了在請求結束後是否信號量許可證的動做 Action0 => singleSemaphoreRelease。
定義了拋出異常後發出通知的動做 Action1 => markExceptionThrown。
以後獲取信號量許可證,獲取失敗就會進入信號量拒絕降級
獲取到信號量許可證後,就設置開始執行的時間
最後,經過 executeCommandAndObserve(_cmd)
方法再次訂閱,並設置了錯誤回調、結束回調、取消執行的回調
總結一下,applyHystrixSemantics(_cmd)
最核心的應該就是應用斷路器
或信號量
來對請求進行限流。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 命令開始執行
executionHook.onStart(_cmd);
// 斷路器是否容許請求
if (circuitBreaker.allowRequest()) {
// 獲取 Semaphore,信號量模式返回 TryableSemaphoreActual,線程池模式返回 TryableSemaphoreNoOp
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// 請求結束後釋放 信號量許可證
final Action0 singleSemaphoreRelease = new Action0() {...};
// 拋出異常後發出通知
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {...};
// 獲取 信號量許可證,線程池模式始終返回 true
if (executionSemaphore.tryAcquire()) {
try {
// 設置執行開始時間,這應該纔是命令真正開始執行的時間
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// 執行命令並訂閱
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown) // 錯誤後執行
.doOnTerminate(singleSemaphoreRelease) // 結束後執行
.doOnUnsubscribe(singleSemaphoreRelease); // 取消執行後執行
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 信號量拒絕 => 降級
return handleSemaphoreRejectionViaFallback();
}
} else {
// 短路 => 降級
return handleShortCircuitViaFallback();
}
}
複製代碼
applyHystrixSemantics(_cmd)
中又調用了 executeCommandAndObserve(_cmd)
獲取訂閱對象,這個方法總體看下來也比較簡單,下面來看下。
首先也是建立了幾個回調對象
核心的是在最後幾步,調用 executeCommandWithSpecifiedIsolation(_cmd)
獲取一個訂閱對象,看方法名稱應該就是應用Hystrix的隔離策略。
若是啓用了執行超時,訂閱對象還會增長一個超時處理器 HystrixObservableTimeoutOperator
,進去能夠發現,這個處理器建立了一個 TimerListener
去更改 isCommandTimedOut
的狀態爲超時,這塊就和前面對應上了。超時相關的咱們後面再分析。
仍是總結下,executeCommandAndObserve(_cmd)
最核心的應該就是,若是啓用了超時,就給訂閱對象增長一個超時處理器來響應超時。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 標記命令已經執行
final Action1<R> markEmits = new Action1<R>() {...};
// 標記命令執行結束
final Action0 markOnCompleted = new Action0() {...};
// 處理回調
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {...};
// 設置當前線程
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
Observable<R> execution;
// 建立 run() 方法的 Observable,根據配置的隔離策略繼續封裝,線程池隔離模式會放到線程池中去調度,信號量模式則直接返回
if (properties.executionTimeoutEnabled().get()) {
// 超時會由 HystrixObservableTimeoutOperator 處理,拋出 HystrixTimeoutException 超時異常
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 設置訂閱回調
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
複製代碼
接着來看 executeCommandWithSpecifiedIsolation(_cmd)
方法,看方法名稱就知道是將 command 放到配置的隔離策略下去執行。能夠看到方法內首先一個 if...else...分紅線程池隔離和信號量隔離。
線程池隔離模式:
首先將命令的狀態由 OBSERVABLE_CHAIN_CREATED 更改成 USER_CODE_EXECUTED,表示用戶代碼開始執行。
經過 isCommandTimedOut
判斷命令是否超時,超時就拋出異常,還未執行就超時。這種狀況是有可能發生的,好比線程池已經滿了,command 在隊列中等待,等待的過程當中超時了。先記住 isCommandTimedOut
這個東西,它是在其它地方進行設置的。
Hystrix.startCurrentThreadExecutingCommand(getCommandKey())
看起來是開始執行命令,但它內部其實只是將 HystrixCommandKey 放到了一個棧的棧頂,返回的 endCurrentThreadExecutingCommand 則是在命令執行結束後 將 HystrixCommandKey 從棧頂彈出。目前還不清楚有什麼做用。
經過 getUserExecutionObservable(_cmd)
獲取用戶執行的訂閱對象,這個方法纔是最終封裝 run() 方法返回的訂閱對象。
線程池隔離和信號量隔離最大的區別在最後一步,線程池隔離會有個訂閱 subscribeOn(Scheduler scheduler)
,這個 scheduler 是調用 threadPool.getScheduler(Func0 func)
獲取的一個 rx.Scheduler
對象,實際類型是 HystrixContextScheduler
。能夠猜想一下就是將 call() 返回的 Observable 對象扔到 Scheduler 裏進行異步調度,因此這裏將是線程池隔離
的入口。
信號量隔離相對來講就簡單不少,最後一步一樣經過 getUserExecutionObservable(_cmd)
獲取 run() 方法的訂閱對象。
總結一下,這個方法最核心的就是返回 run()
的訂閱對象,並根據命令的隔離策略進行資源隔離。線程池隔離時會訂閱到一個 Scheculer 中進行調度執行,能夠猜測到內部最終會扔到一個線程池中去執行。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 線程池隔離
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 狀態由 OBSERVABLE_CHAIN_CREATED 更改成 USER_CODE_EXECUTED,表示 run() 方法的代碼開始執行
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException(...));
}
// 統計,命令開始執行
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// 判斷命令是否執行超時,isCommandTimedOut 在 TimerListener 中更新
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// 增長執行線程數
HystrixCounters.incrementGlobalConcurrentThreads();
// 統計線程開始執行
threadPool.markThreadExecution();
// 將命令Key push 到一個棧的棧頂
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
// 發出一些Hook通知...
// 獲取 run() 的 Observable
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnTerminate(new Action0() {
// 命令執行結束後更新線程狀態...
}).doOnUnsubscribe(new Action0() {
// 命令取消執行後更新線程狀態...
})
// 將 defer 返回的 Observable 放到一個調度器中異步執行,調度器 ==> HystrixContextScheduler
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// 判斷是否中斷線程執行,超時後中斷執行
return properties.executionIsolationThreadInterruptOnTimeout().get() &&
_cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
// 信號量隔離
else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 狀態由 OBSERVABLE_CHAIN_CREATED 更改成 USER_CODE_EXECUTED,表示 run() 方法的代碼開始執行
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException(...));
}
// 統計:信號量默認命令開始執行
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// 將命令Key push 到一個棧的棧頂
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
// 發出一些Hook通知...
// 獲取 run() 的 Observable
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
}
複製代碼
最終,終於找到封裝 run() 方法的地方了,getUserExecutionObservable(_cmd)
方法比較簡單,就是調用子類 HystrixCommand 實現的 getExecutionObservable()
來獲得執行 run() 的訂閱對象,也就是咱們自定義的業務代碼。
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
// 封裝了重載的 run() 方法
userObservable = getExecutionObservable();
} catch (Throwable ex) {
userObservable = Observable.error(ex);
}
return userObservable
// Hystrix 執行回調處理
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
複製代碼
HystrixCommand
中的 getExecutionObservable() 方法:
final protected Observable<R> getExecutionObservable() {
// defer:對象被訂閱時才觸發 call() 方法的調用
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 訂閱此 Observable 對象將觸發 run() 方法的執行
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
})
}
複製代碼
至此,獲取 run()
業務邏輯訂閱對象的整個流程基本就分析清楚了,toObservable()
是獲取訂閱對象的入口,爲了加上 hystrix 的各類特性,又嵌了多層建立了最終的 Observable 對象,基本上是每一個子方法就爲訂閱對象增長一個特性。
toObservable()
:獲取訂閱對象的入口。applyHystrixSemantics(_cmd)
:應用 Hystrix 斷路器或信號量,斷路器打開或沒法獲取信號量就直接拒絕走降級。executeCommandAndObserve(_cmd)
:若是啓用了超時,爲訂閱對象增長超時處理器、executeCommandWithSpecifiedIsolation
:根據配置的隔離策略,返回不一樣的訂閱對象;線程池隔離就會將訂閱對象扔到一個 HystrixContextScheduler
中去調度執行。getUserExecutionObservable(_cmd)
:返回真正封裝了 run() 業務邏輯的訂閱對象。