專欄系列文章:SpringCloud系列專欄java
系列文章:web
SpringCloud 源碼系列(1)— 註冊中心Eureka 之 啓動初始化安全
SpringCloud 源碼系列(2)— 註冊中心Eureka 之 服務註冊、續約markdown
SpringCloud 源碼系列(3)— 註冊中心Eureka 之 抓取註冊表併發
SpringCloud 源碼系列(4)— 註冊中心Eureka 之 服務下線、故障、自我保護機制app
SpringCloud 源碼系列(5)— 註冊中心Eureka 之 EurekaServer集羣負載均衡
SpringCloud 源碼系列(6)— 註冊中心Eureka 之 總結篇異步
SpringCloud 源碼系列(7)— 負載均衡Ribbon 之 RestTemplateide
SpringCloud 源碼系列(8)— 負載均衡Ribbon 之 核心原理函數
SpringCloud 源碼系列(9)— 負載均衡Ribbon 之 核心組件與配置
SpringCloud 源碼系列(10)— 負載均衡Ribbon 之 HTTP客戶端組件
SpringCloud 源碼系列(11)— 負載均衡Ribbon 之 重試與總結篇
SpringCloud 源碼系列(12)— 服務調用Feign 之 基礎使用篇
SpringCloud 源碼系列(13)— 服務調用Feign 之 掃描@FeignClient註解接口
SpringCloud 源碼系列(14)— 服務調用Feign 之 構建@FeignClient接口動態代理
SpringCloud 源碼系列(15)— 服務調用Feign 之 結合Ribbon進行負載均衡請求
SpringCloud 源碼系列(16)— 熔斷器Hystrix 之 基礎入門篇
SpringCloud 源碼系列(17)— 熔斷器Hystrix 之 獲取執行訂閱對象Observable
在構造 HystrixCommand 時,會去初始化 Hystrix 線程池 HystrixThreadPool
,跟進去能夠發現,初始化的邏輯就在默認實現類 HystrixThreadPoolDefault
的構造方法中,HystrixThreadPoolDefault 也是線程池調度的核心組件。
其中,ThreadPoolExecutor 線程池是由 concurrencyStrategy.getThreadPool(threadPoolKey, properties)
這段代碼建立的,queueSize
隊列大小默認是 -1,隊列 queue
是從 ThreadPoolExecutor 中獲取的,因此線程池的構造還得繼續看 concurrencyStrategy.getThreadPool。
class HystrixThreadPoolDefault implements HystrixThreadPool {
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final int queueSize;
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
// 線程池配置
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
// 併發策略
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
// 隊列大小,默認 -1
this.queueSize = properties.maxQueueSize().get();
// hystrix 線程池度量器
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 建立線程池
properties);
// 默認返回的是一個使用 SynchronousQueue 隊列的線程池,核心線程數和最大線程數都是默認值 10
this.threadPool = this.metrics.getThreadPool();
// SynchronousQueue
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
}
複製代碼
接着看 getThreadPool
方法,這個方法就是根據配置構建線程池 ThreadPoolExecutor 的,從這個方法咱們能夠獲得以下信息。
getThreadFactory
返回的 ThreadFactory 能夠看出,線程名稱的格式是:"hystrix-{threadPoolKey}-{number}"
,這跟咱們在日誌中看到的 hystrix 線程名稱是一致的。getBlockingQueue
方法獲取一個隊列,可是默認狀況下 maxQueueSize
爲 -1,那麼返回的隊列即是 SynchronousQueue
,這是一個無容量的隊列,就是說默認狀況下任務不會進入隊列,若是線程池線程滿了將直接拒絕任務。總結一下,默認狀況下,dynamicCoreSize
、dynamicMaximumSize
都是 10,maxQueueSize
等於 -1,allowMaximumSizeToDivergeFromCoreSize
默認爲 false。
那麼默認狀況下建立的 ThreadPoolExecutor 就是備以下特性:
SynchronousQueue
隊列。"hystrix-{threadPoolKey}-{number}"
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
// 名稱格式:"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
// 是否容許核心線程數擴大到最大線程數,默認false
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
// 核心線程數
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
// 線程存活時間
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
// 最大隊列數 maxQueueSize 默認爲 -1
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
// maxQueueSize <= 0 ==> SynchronousQueue
// maxQueueSize > 0 ==> LinkedBlockingQueue(maxQueueSize)
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
// 容許擴展到最大線程數
if (allowMaximumSizeToDivergeFromCoreSize) {
// 最大線程數
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
// 不容許擴展到最大線程數,核心線程數 == 最大線程數。默認會進入這裏
else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
}
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
// maxQueueSize 默認爲 -1
if (maxQueueSize <= 0) {
return new SynchronousQueue<Runnable>();
} else {
return new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
}
複製代碼
上一篇文章中咱們已經分析出 Observable 訂閱對象被調度應該是在 executeCommandWithSpecifiedIsolation(_cmd)
方法中的,入口就是 subscribeOn(Scheculer scheduler)
這個訂閱。這個 scheduler 是經過上一節中初始化的 Hystrix線程池 HystrixThreadPool
來獲取的,threadPool.getScheduler(Func0 func)
返回的是一個 HystrixContextScheduler
調度器。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 線程池隔離
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//...
// 獲取 run() 的 Observable
return getUserExecutionObservable(_cmd);
}
})
.doOnTerminate(new Action0() {...})
.doOnUnsubscribe(new Action0() {...})
// 將 defer 返回的 Observable 放到一個調度器中異步執行,調度器 ==> HystrixContextScheduler
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// 判斷是否中斷線程執行,超時後中斷執行
return properties.executionIsolationThreadInterruptOnTimeout().get() &&
_cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
// 信號量隔離
else {//...}
}
複製代碼
接着看如何建立調度器的。
經過 getScheduler
方法能夠了解到,首先會經過 touchConfig()
更新線程池 ThreadPoolExecutor
的配置,這說明咱們是能夠在運行時動態修改線程池的參數的。可是會發現,只能修改線程池的 corePoolSize、maximumPoolSize、keepAliveTime
三個參數,因此是沒法動態擴容線程池隊列的。
以後建立了 HystrixContextScheduler
,進入構造方法能夠看到,內部又建立了一個調度器 actualScheduler
,實際的類型是 ThreadPoolScheduler
,能夠猜測最終的調度任務應該是在 ThreadPoolScheduler 中的。
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
// 動態更改線程池配置
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
private void touchConfig() {
final int dynamicCoreSize = properties.coreSize().get();
final int configuredMaximumSize = properties.maximumSize().get();
int dynamicMaximumSize = properties.actualMaximumSize();
final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
boolean maxTooLow = false;
if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
dynamicMaximumSize = dynamicCoreSize;
maxTooLow = true;
}
if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
threadPool.setCorePoolSize(dynamicCoreSize);
threadPool.setMaximumPoolSize(dynamicMaximumSize);
}
threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
// actualScheduler => rx.Scheduler => ThreadPoolScheduler
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
複製代碼
下面是線程池調度的流程源碼,建立調度器和調度執行的過程看起來就像是層層嵌套代理。
HystrixContextScheduler
,但實際的調度器是 ThreadPoolScheduler
。createWorker()
建立 Hystrix上下文調度工做者 HystrixContextSchedulerWorker
,但又由 ThreadPoolScheduler 建立了一個代理工做者 ThreadPoolWorker
。schedule
調度方法,這個調度方法最後又用 ThreadPoolWorker 來執行調度。綜上來看,真正的調度邏輯,其實就是在 HystrixContextSchedulerWorker 和 ThreadPoolWorker 的調度方法 schedule
。
HystrixContextScheduler ::
public Worker createWorker() {
// actualScheduler.createWorker() ==> ThreadPoolWorker
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
ThreadPoolScheduler ::
public Worker createWorker() {
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}
HystrixContextSchedulerWorker ::
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
// 隊列滿了就直接拋出異常
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
// 調度 worker ==> ThreadPoolWorker
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
ThreadPoolWorker ::
public Subscription schedule(final Action0 action) {
// 封裝調度Action
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
// 提交任務到線程池
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
// 中斷任務的訂閱 ==> shouldInterruptThread.call() 判斷是否超時中斷任務
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
複製代碼
爲了便於理解,將源碼抽象成以下的調度流程圖:
在 HystrixContextSchedulerWorker
的調度方法中,先調用 threadPool.isQueueSpaceAvailable()
判斷線程池隊列是否還有可用空間,若是沒有就會拋出拒絕異常,若是有就會用 ThreadPoolWorker
進行調度。
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
// 隊列滿了就直接拋出異常
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
// 調度 worker ==> ThreadPoolWorker
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
複製代碼
再來看看 isQueueSpaceAvailable
的邏輯。
默認狀況下 queueSize 爲 -1,若是 queueSize <= 0,就表示有可用空間,但其實是一個 SynchronousQueue 無容量隊列。之因此返回 true,任務就會直接丟到線程池去執行,若是線程池工做線程滿了,就由線程池自己來拒絕任務,這一點從它的註釋也能夠了解到。
若是設置了隊列大小(配置 maxQueueSize),且隊列已使用的容量小於 queueSizeRejectionThreshold(默認爲5),才表示隊列有可用空間。這裏可能會感受有點疑惑,爲什麼不是判斷隊列是否還有剩餘容量?好比判斷 threadPool.getQueue().remainingCapacity() > 0
來肯定是否還有空間?
經過它的註釋能夠了解到,實際上是爲了實現動態擴容的目的,由於隊列的大小是不能動態修改的,但爲了能在運行時達到隊列動態擴容的目的,它用了另外一個配置 queueSizeRejectionThreshold
來控制進入隊列的數量。好比 maxQueueSize 配置 100,但 queueSizeRejectionThreshold 默認爲 5,因此此時隊列實際上最多隻會進入5個任務;運行時動態修改 queueSizeRejectionThreshold 爲 20,這個時候隊列最多就會進入20個任務了;所以在配置時 maxQueueSize 要大於 queueSizeRejectionThreshold
纔有意義。這種設計仍是值得借鑑的。
/** * Whether the threadpool queue has space available according to the <code>queueSizeRejectionThreshold</code> settings. * * Note that the <code>queueSize</code> is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically. * The data structure is static, so this does not make sense as a dynamic lookup. * The <code>queueSizeRejectionThreshold</code> can be dynamic (up to <code>queueSize</code>), so that should * still get checked on each invocation. * <p> * If a SynchronousQueue implementation is used (<code>maxQueueSize</code> <= 0), it always returns 0 as the size so this would always return true. */
@Override
public boolean isQueueSpaceAvailable() {
// 隊列大小默認爲 -1
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead let the thread-pool reject or not
return true;
} else {
// 隊列已經使用的容量 超過了 隊列容量拒絕閾值,queueSizeRejectionThreshold 默認爲 5
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
複製代碼
最後,再來看 ThreadPoolWorker 的調度方法,在這個方法裏面,終於看到提交到線程池的代碼了,這一步就真正實現了基於線程池的隔離了。
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
// 線程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
// 提交任務到線程池
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
// 中斷任務的訂閱 ==> shouldInterruptThread.call() 判斷是否超時中斷任務
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
複製代碼
schedule 方法最後,向提交到線程池的 ScheduledAction 添加了一個 FutureCompleterWithConfigurableInterrupt
訂閱對象,它會在取消訂閱的時候取消任務的執行,好比任務超時了,這個時候就會取消任務的執行。
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}
@Override
public void unsubscribe() {
executor.remove(f);
// 判斷是否取消任務
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
}
複製代碼
前面分析到 executeCommandAndObserve(_cmd)
這個方法有以下這段代碼,executeCommandWithSpecifiedIsolation(_cmd)
返回的 Observable 主要是採起不一樣的隔離策略,而後,若是啓用了超時(默認啓用),會增長一個 HystrixObservableTimeoutOperator
操做器,看起來就是在控制超時相關的。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
//...
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
// HystrixObservableTimeoutOperator 控制超時
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 設置訂閱回調
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
複製代碼
再繼續看 HystrixObservableTimeoutOperator
,它會返回一個 Subscriber
對原 Observable 進行一個處理。
HystrixContextRunnable
回調,就是拋出 HystrixTimeoutException 這個異常。TimerListener
時間監聽器,這個監聽器的間隔時間是 getIntervalTimeInMilliseconds()
返回的時間,就是命令執行超時時間。isCommandTimedOut
的狀態,若是超時後 isCommandTimedOut 仍是 NOT_EXECUTED
,就會更新到 TIMED_OUT
,並取消任務的執行,而後發出超時的異常。TimerListener
添加到了 HystrixTimer
中,因此時間控制的核心邏輯應該是在 HystrixTimer 中。isCommandTimedOut
對應的 TimedOutStatus 有三種狀態:NOT_EXECUTED, COMPLETED, TIMED_OUT
。
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
final AbstractCommand<R> originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
this.originalCommand = originalCommand;
}
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
child.add(s);
// 超時回調,拋出 HystrixTimeoutException 異常
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
// 超時監聽器
TimerListener listener = new TimerListener() {
// tick 在每次間隔時間會調用一次
@Override
public void tick() {
// 超時後更新狀態
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// 通知超時失敗
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// 中止本來的請求
s.unsubscribe();
// 拋出超時異常
timeoutRunnable.run();
}
}
// 返回間隔時間,默認就是命令執行超時時間
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
// 添加監聽器,開始計算超時
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
// 設置到原命令中
originalCommand.timeoutTimer.set(tl);
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
tl.clear(); // 清除 TimerListener
child.onCompleted();
}
}
//...
private boolean isNotTimedOut() {
// 任務執行完成,更新 isCommandTimedOut 狀態
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
s.add(parent);
return parent;
}
}
複製代碼
接着看 HystrixTimer 的 addTimerListener
方法,這個方法就比較好理解了,就是初始化任務調度器,而後調度 TimerListener 的執行,調度器的延遲執行時間和間隔週期都是 TimerListener 返回的命令超時時間。
總結一下 TimerListener 和 HystrixTimer 整合起來,其實就是檢測任務是否超時的,任務執行超時後就會拋出超時異常,而後取消任務的執行,後面應該就會進入降級的邏輯了。
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
// 初始化 executor 調度器:ScheduledThreadPoolExecutor
startThreadIfNeeded();
Runnable r = new Runnable() {
@Override
public void run() {
try {
// 觸發監聽器
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
// 超時時間默認1000毫秒,每隔1000毫秒調度一次
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r,
listener.getIntervalTimeInMilliseconds(), // 延遲多久執行
listener.getIntervalTimeInMilliseconds(), // 執行週期
TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}
複製代碼
在 Hystrix 基礎篇中,咱們提到了 Hystrix 執行錯誤的六種類型,以下圖所示,除了 BAD_REQUEST,都會降級進入回調方法中。
在 AbstractCommand
中,能夠看到有以下6個處理錯誤的方法,分別針對 Hystrix 的6中錯誤類型。在分析 HystrixCommand 的執行流程中,咱們知道有不少的回調方法,其中錯誤回調確定就會調用下面其中的一個方法來處理錯誤,這個就不在看了。
須要注意的是,除了 handleBadRequestByEmittingError()
這個處理 BAD_REQUEST 錯誤的方法,其他5個方法最後都會調用 getFallbackOrThrowException
方法來獲取回調方法的Observable,這樣就合上圖中對應起來了。說明錯誤回調的封裝就是 getFallbackOrThrowException
。
// 信號量拒絕回調
private Observable<R> handleSemaphoreRejectionViaFallback() {
Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
executionResult = executionResult.setExecutionException(semaphoreRejectionException);
eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
"could not acquire a semaphore for execution", semaphoreRejectionException);
}
// 短路回調
private Observable<R> handleShortCircuitViaFallback() {
eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
executionResult = executionResult.setExecutionException(shortCircuitException);
return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
"short-circuited", shortCircuitException);
}
// 線程池拒絕回調
private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
threadPool.markThreadRejection();
return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
"could not be queued for execution", underlying);
}
// 超時回調
private Observable<R> handleTimeoutViaFallback() {
return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
"timed-out", new TimeoutException());
}
// 失敗回調
private Observable<R> handleFailureViaFallback(Exception underlying) {
//...
eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
}
// BAD_REQUEST 處理
private Observable<R> handleBadRequestByEmittingError(Exception underlying) {
Exception toEmit = underlying;
//...
return Observable.error(toEmit);
}
複製代碼
接着來看回調的處理方法 getFallbackOrThrowException
,能夠獲得以下信息:
getFallbackSemaphore()
獲取的信號量,默認返回的實際類型是 TryableSemaphore
,限流數默認是 10。而後會經過這個信號量獲取一個許可證後纔去調用回調方法。getFallbackObservable()
獲取回調 Observable,若是用戶重載了 getFallback()
方法,就返回 getFallback() 的 Observable;若是沒有重載,將拋出 "No fallback available."
的異常。private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
// 啓用回調
if (properties.fallbackEnabled().get()) {
// 設置當前線程
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
// 回調通知
final Action1<R> markFallbackEmit = new Action1<R>() {...};
// 回調完成通知
final Action0 markFallbackCompleted = new Action0() {...};
// 回調錯誤後處理
final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {...};
// 回調信號量 => TryableSemaphore
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// 釋放信號量許可證
final Action0 singleSemaphoreRelease = new Action0() {...};
// 獲取回調 Observable
Observable<R> fallbackExecutionChain;
// 獲取信號量許可證
if (fallbackSemaphore.tryAcquire()) {
try {
// 用戶是否認義了回調方法,即重寫 getFallback()
if (isFallbackUserDefined()) {
executionHook.onFallbackStart(this);
fallbackExecutionChain = getFallbackObservable();
} else {
// 拋出異常:"No fallback available."
fallbackExecutionChain = getFallbackObservable();
}
} catch (Throwable ex) {
fallbackExecutionChain = Observable.error(ex);
}
// 執行回調
return fallbackExecutionChain
.doOnEach(setRequestContext)
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
.doOnNext(markFallbackEmit)
.doOnCompleted(markFallbackCompleted)
.onErrorResumeNext(handleFallbackError)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} else {
return handleFallbackRejectionByEmittingError();
}
} else {
return handleFallbackDisabledByEmittingError(originalException, failureType, message);
}
}
複製代碼
下面用一張圖總結下Hystrix任務提交到線程池執行的流程。
在 AbstractCommand 構造方法中,初始化了斷路器 HystrixCircuitBreaker
, 若是啓用了斷路器(circuitBreakerEnabled
),就從 HystrixCircuitBreaker.Factory 中獲取一個 斷路器,默認實現類是 HystrixCircuitBreakerImpl
。若是未啓用斷路器,默認實現類則是 NoOpCircuitBreaker
,就是什麼都不控制的斷路器。
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
this.commandGroup, this.commandKey, this.properties, this.metrics);
複製代碼
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// 從 Factory 中獲取默認的 斷路器
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
// 禁用斷路器就返回什麼都不操做的實現類
return new NoOpCircuitBreaker();
}
}
複製代碼
HystrixCircuitBreakerImpl
的構造方法主要須要 HystrixCommandProperties
和 HystrixCommandMetrics
,可想而知,斷路器須要統計 hystrix 請求的錯誤、超時、拒絕、成功等次數,而後決定是否打開斷路器,那麼這些統計的數據來源就是 HystrixCommandMetrics
.
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
}
複製代碼
以前分析 applyHystrixSemantics(_cmd)
時,咱們看到了斷路器的使用,applyHystrixSemantics 是應用Hystrix的核心入口,會先經過斷路器判斷是否容許放行請求,斷路器不容許則會走斷路器拒絕降級的方法。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 斷路器是否容許請求
if (circuitBreaker.allowRequest()) {
//...
} else {
// 短路 => 降級
return handleShortCircuitViaFallback();
}
}
複製代碼
接着看 allowRequest()
方法:
public boolean allowRequest() {
// 斷路器手動強制打開
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 斷路器手動強制關閉
if (properties.circuitBreakerForceClosed().get()) {
// 調度一次,進行統計,決定是否打開斷路器
isOpen();
return true;
}
// 斷路器未打開 或者 運行打開
return !isOpen() || allowSingleTest();
}
複製代碼
isOpen()
方法判斷斷路器是否打開:
circuitOpen
是否爲 true,是則表示斷路器打開了。也就是說斷路器的狀態是由 circuitOpen
這個 AtomicBoolean 來控制的。HystrixCommandMetrics
中獲取統計的計數 HealthCounts
,後面的邏輯就是根據 HealthCounts 計算是否要打開斷路器。總的請求數 < 斷路器請求閾值(默認20)
就不會打開斷路器。錯誤百分比率 < 斷路器錯誤比率閾值(默認50%)
就不會打開斷路器。Hystrix 斷路器的設計很好的利用了原子類的特性,circuitOpen 的更新採用基於 CAS 指令的 compareAndSet
,高併發狀況下也能安全更新,更新成功再設置斷路器打開時間,更新失敗則不更新。
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
public boolean isOpen() {
// circuitOpen 是否打開
if (circuitOpen.get()) {
return true;
}
// 獲取請求計數
HealthCounts health = metrics.getHealthCounts();
// 小於斷路器請求的閾值,默認爲20,至少超過20個請求後纔會開啓斷路器
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
// 錯誤百分比
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// 打開斷路器
if (circuitOpen.compareAndSet(false, true)) {
// 設置斷路器打開的時間
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
複製代碼
若是斷路器已經打開了,這時斷路器會休眠一段時間後,放一個請求去測試被調用方是否已經恢復,這就是半開狀態。
看 allowSingleTest()
這個方法,是否進入半開狀態的依據是:
OPEN -> HALF-OPEN
。public boolean allowSingleTest() {
// 斷路器打開時間
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 斷路器打開 且 當前時間 > (斷路器打開時間 + 斷路器休眠窗口時間(默認5000毫秒))
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// 斷路器打開時間設置爲當前時間
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
複製代碼
那斷路器打開了什麼時候恢復呢?跟蹤一下 circuitOpen
就知道了,HystrixCircuitBreaker
有個方法 markSuccess()
會去在斷路器打開時關閉斷路器。
public void markSuccess() {
if (circuitOpen.get()) {
// 關閉斷路器
if (circuitOpen.compareAndSet(true, false)) {
metrics.resetStream();
}
}
}
複製代碼
在 executeCommandAndObserve(_cmd)
這個方法中,能夠看到有兩個回調函數 markEmits
、markOnCompleted
,他們會在請求執行成功和完成後被回調執行,回調函數裏調用了 circuitBreaker.markSuccess();
來告訴斷路器請求成功了,而後斷路器就會進入關閉狀態,HALF -> CLOSE
。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 標記命令已經執行
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (commandIsScalar()) {
//...
// 告訴斷路器請求成功
circuitBreaker.markSuccess();
}
}
};
// 標記命令執行結束
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
//...
// 告訴斷路器請求成功
circuitBreaker.markSuccess();
}
}
};
// 處理回調
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {...};
// 設置當前線程
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
// 超時會由 HystrixObservableTimeoutOperator 處理,拋出 HystrixTimeoutException 超時異常
execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 設置訂閱回調
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
複製代碼
最後,仍是用一張圖總結下斷路器的工做機制。