SpringCloud 源碼系列(18)— 熔斷器Hystrix 之 執行核心原理

專欄系列文章:SpringCloud系列專欄java

系列文章:web

SpringCloud 源碼系列(1)— 註冊中心Eureka 之 啓動初始化安全

SpringCloud 源碼系列(2)— 註冊中心Eureka 之 服務註冊、續約markdown

SpringCloud 源碼系列(3)— 註冊中心Eureka 之 抓取註冊表併發

SpringCloud 源碼系列(4)— 註冊中心Eureka 之 服務下線、故障、自我保護機制app

SpringCloud 源碼系列(5)— 註冊中心Eureka 之 EurekaServer集羣負載均衡

SpringCloud 源碼系列(6)— 註冊中心Eureka 之 總結篇異步

SpringCloud 源碼系列(7)— 負載均衡Ribbon 之 RestTemplateide

SpringCloud 源碼系列(8)— 負載均衡Ribbon 之 核心原理函數

SpringCloud 源碼系列(9)— 負載均衡Ribbon 之 核心組件與配置

SpringCloud 源碼系列(10)— 負載均衡Ribbon 之 HTTP客戶端組件

SpringCloud 源碼系列(11)— 負載均衡Ribbon 之 重試與總結篇

SpringCloud 源碼系列(12)— 服務調用Feign 之 基礎使用篇

SpringCloud 源碼系列(13)— 服務調用Feign 之 掃描@FeignClient註解接口

SpringCloud 源碼系列(14)— 服務調用Feign 之 構建@FeignClient接口動態代理

SpringCloud 源碼系列(15)— 服務調用Feign 之 結合Ribbon進行負載均衡請求

SpringCloud 源碼系列(16)— 熔斷器Hystrix 之 基礎入門篇

SpringCloud 源碼系列(17)— 熔斷器Hystrix 之 獲取執行訂閱對象Observable

線程池隔離執行原理

Hystrix線程池初始化

在構造 HystrixCommand 時,會去初始化 Hystrix 線程池 HystrixThreadPool,跟進去能夠發現,初始化的邏輯就在默認實現類 HystrixThreadPoolDefault 的構造方法中,HystrixThreadPoolDefault 也是線程池調度的核心組件。

其中,ThreadPoolExecutor 線程池是由 concurrencyStrategy.getThreadPool(threadPoolKey, properties) 這段代碼建立的,queueSize 隊列大小默認是 -1,隊列 queue 是從 ThreadPoolExecutor 中獲取的,因此線程池的構造還得繼續看 concurrencyStrategy.getThreadPool。

class HystrixThreadPoolDefault implements HystrixThreadPool {
  private final BlockingQueue<Runnable> queue;
  private final ThreadPoolExecutor threadPool;
  private final HystrixThreadPoolMetrics metrics;
  private final int queueSize;

  public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
      // 線程池配置
      this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
      // 併發策略
      HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
      // 隊列大小,默認 -1
      this.queueSize = properties.maxQueueSize().get();

      // hystrix 線程池度量器
      this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
              concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 建立線程池
              properties);
      // 默認返回的是一個使用 SynchronousQueue 隊列的線程池,核心線程數和最大線程數都是默認值 10
      this.threadPool = this.metrics.getThreadPool();
      // SynchronousQueue
      this.queue = this.threadPool.getQueue();

      /* strategy: HystrixMetricsPublisherThreadPool */
      HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
  }
}  
複製代碼

接着看 getThreadPool 方法,這個方法就是根據配置構建線程池 ThreadPoolExecutor 的,從這個方法咱們能夠獲得以下信息。

  • 首先,從 getThreadFactory 返回的 ThreadFactory 能夠看出,線程名稱的格式是:"hystrix-{threadPoolKey}-{number}",這跟咱們在日誌中看到的 hystrix 線程名稱是一致的。
  • 調用 getBlockingQueue 方法獲取一個隊列,可是默認狀況下 maxQueueSize 爲 -1,那麼返回的隊列即是 SynchronousQueue,這是一個無容量的隊列,就是說默認狀況下任務不會進入隊列,若是線程池線程滿了將直接拒絕任務。
  • 若是配置了容許擴展到最大線程數 且 最大線程數大於核心線程數,那麼建立的線程池在覈心線程數滿了後,就會繼續建立線程直到達到最大線程數。不然,建立的線程池核心線程數和最大線程數就是同樣的。

總結一下,默認狀況下,dynamicCoreSizedynamicMaximumSize 都是 10,maxQueueSize 等於 -1,allowMaximumSizeToDivergeFromCoreSize 默認爲 false。

那麼默認狀況下建立的 ThreadPoolExecutor 就是備以下特性:

  • 核心線程數 等於 最大線程數。
  • 工做隊列是一個無容量的 SynchronousQueue 隊列。
  • 基於上面兩個特性,線程池工做的最大負載就是10個線程同時工做,超出後將直接拒絕任務。
  • 線程名稱的格式是:"hystrix-{threadPoolKey}-{number}"
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    // 名稱格式:"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    // 是否容許核心線程數擴大到最大線程數,默認false
    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    // 核心線程數
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    // 線程存活時間
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    // 最大隊列數 maxQueueSize 默認爲 -1
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    // maxQueueSize <= 0 ==> SynchronousQueue
    // maxQueueSize > 0 ==> LinkedBlockingQueue(maxQueueSize)
    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

    // 容許擴展到最大線程數
    if (allowMaximumSizeToDivergeFromCoreSize) {
        // 最大線程數
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }
    // 不容許擴展到最大線程數,核心線程數 == 最大線程數。默認會進入這裏
    else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
    return new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        }
    };
}

public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
    // maxQueueSize 默認爲 -1
    if (maxQueueSize <= 0) {
        return new SynchronousQueue<Runnable>();
    } else {
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }
}
複製代碼

線程池調度任務

建立調度器

上一篇文章中咱們已經分析出 Observable 訂閱對象被調度應該是在 executeCommandWithSpecifiedIsolation(_cmd) 方法中的,入口就是 subscribeOn(Scheculer scheduler) 這個訂閱。這個 scheduler 是經過上一節中初始化的 Hystrix線程池 HystrixThreadPool 來獲取的,threadPool.getScheduler(Func0 func) 返回的是一個 HystrixContextScheduler 調度器。

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // 線程池隔離
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                //...
                // 獲取 run() 的 Observable
                return getUserExecutionObservable(_cmd);
            }
        })
        .doOnTerminate(new Action0() {...})
        .doOnUnsubscribe(new Action0() {...})
        // 將 defer 返回的 Observable 放到一個調度器中異步執行,調度器 ==> HystrixContextScheduler
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                // 判斷是否中斷線程執行,超時後中斷執行
                return properties.executionIsolationThreadInterruptOnTimeout().get() && 
                		_cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    }
    // 信號量隔離
    else {//...}
}
複製代碼

接着看如何建立調度器的。

經過 getScheduler 方法能夠了解到,首先會經過 touchConfig() 更新線程池 ThreadPoolExecutor 的配置,這說明咱們是能夠在運行時動態修改線程池的參數的。可是會發現,只能修改線程池的 corePoolSize、maximumPoolSize、keepAliveTime 三個參數,因此是沒法動態擴容線程池隊列的。

以後建立了 HystrixContextScheduler,進入構造方法能夠看到,內部又建立了一個調度器 actualScheduler,實際的類型是 ThreadPoolScheduler,能夠猜測最終的調度任務應該是在 ThreadPoolScheduler 中的。

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // 動態更改線程池配置
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;

    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }

    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        threadPool.setCorePoolSize(dynamicCoreSize);
        threadPool.setMaximumPoolSize(dynamicMaximumSize);
    }

    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
    this.concurrencyStrategy = concurrencyStrategy;
    this.threadPool = threadPool;
    // actualScheduler => rx.Scheduler => ThreadPoolScheduler
    this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
複製代碼

任務調度執行

下面是線程池調度的流程源碼,建立調度器和調度執行的過程看起來就像是層層嵌套代理。

  • 整個流程首先建立Hystrix上下文調度器HystrixContextScheduler,但實際的調度器是 ThreadPoolScheduler
  • 調度器建立好了以後就是任務調度了,首先會建立調度工做者,先是 HystrixContextScheduler 調用 createWorker() 建立 Hystrix上下文調度工做者 HystrixContextSchedulerWorker,但又由 ThreadPoolScheduler 建立了一個代理工做者 ThreadPoolWorker
  • 最後就是調度任務了,首先執行 HystrixContextSchedulerWorker 的 schedule 調度方法,這個調度方法最後又用 ThreadPoolWorker 來執行調度。

綜上來看,真正的調度邏輯,其實就是在 HystrixContextSchedulerWorker 和 ThreadPoolWorker 的調度方法 schedule

HystrixContextScheduler ::
public Worker createWorker() {
    // actualScheduler.createWorker() ==> ThreadPoolWorker
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

ThreadPoolScheduler ::
public Worker createWorker() {
    return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}

HystrixContextSchedulerWorker :: 
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    // 隊列滿了就直接拋出異常
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    // 調度 worker ==> ThreadPoolWorker
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}

ThreadPoolWorker ::
public Subscription schedule(final Action0 action) {
    // 封裝調度Action
    ScheduledAction sa = new ScheduledAction(action);

    subscription.add(sa);
    sa.addParent(subscription);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // 提交任務到線程池
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // 中斷任務的訂閱 ==> shouldInterruptThread.call() 判斷是否超時中斷任務
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}
複製代碼

爲了便於理解,將源碼抽象成以下的調度流程圖:

線程池滿了拒絕執行

HystrixContextSchedulerWorker 的調度方法中,先調用 threadPool.isQueueSpaceAvailable() 判斷線程池隊列是否還有可用空間,若是沒有就會拋出拒絕異常,若是有就會用 ThreadPoolWorker 進行調度。

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    // 隊列滿了就直接拋出異常
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    // 調度 worker ==> ThreadPoolWorker
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
複製代碼

再來看看 isQueueSpaceAvailable 的邏輯。

默認狀況下 queueSize 爲 -1,若是 queueSize <= 0,就表示有可用空間,但其實是一個 SynchronousQueue 無容量隊列。之因此返回 true,任務就會直接丟到線程池去執行,若是線程池工做線程滿了,就由線程池自己來拒絕任務,這一點從它的註釋也能夠了解到。

若是設置了隊列大小(配置 maxQueueSize),且隊列已使用的容量小於 queueSizeRejectionThreshold(默認爲5),才表示隊列有可用空間。這裏可能會感受有點疑惑,爲什麼不是判斷隊列是否還有剩餘容量?好比判斷 threadPool.getQueue().remainingCapacity() > 0 來肯定是否還有空間?

經過它的註釋能夠了解到,實際上是爲了實現動態擴容的目的,由於隊列的大小是不能動態修改的,但爲了能在運行時達到隊列動態擴容的目的,它用了另外一個配置 queueSizeRejectionThreshold 來控制進入隊列的數量。好比 maxQueueSize 配置 100,但 queueSizeRejectionThreshold 默認爲 5,因此此時隊列實際上最多隻會進入5個任務;運行時動態修改 queueSizeRejectionThreshold 爲 20,這個時候隊列最多就會進入20個任務了;所以在配置時 maxQueueSize 要大於 queueSizeRejectionThreshold 纔有意義。這種設計仍是值得借鑑的。

/** * Whether the threadpool queue has space available according to the <code>queueSizeRejectionThreshold</code> settings. * * Note that the <code>queueSize</code> is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically. * The data structure is static, so this does not make sense as a dynamic lookup. * The <code>queueSizeRejectionThreshold</code> can be dynamic (up to <code>queueSize</code>), so that should * still get checked on each invocation. * <p> * If a SynchronousQueue implementation is used (<code>maxQueueSize</code> <= 0), it always returns 0 as the size so this would always return true. */
@Override
public boolean isQueueSpaceAvailable() {
    // 隊列大小默認爲 -1
    if (queueSize <= 0) {
        // we don't have a queue so we won't look for space but instead let the thread-pool reject or not
        return true;
    } else {
        // 隊列已經使用的容量 超過了 隊列容量拒絕閾值,queueSizeRejectionThreshold 默認爲 5
        return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
    }
}
複製代碼

命令提交到線程池執行

最後,再來看 ThreadPoolWorker 的調度方法,在這個方法裏面,終於看到提交到線程池的代碼了,這一步就真正實現了基於線程池的隔離了。

@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }

    // This is internal RxJava API but it is too useful.
    ScheduledAction sa = new ScheduledAction(action);

    subscription.add(sa);
    sa.addParent(subscription);

    // 線程池
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // 提交任務到線程池
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    
    // 中斷任務的訂閱 ==> shouldInterruptThread.call() 判斷是否超時中斷任務
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}
複製代碼

schedule 方法最後,向提交到線程池的 ScheduledAction 添加了一個 FutureCompleterWithConfigurableInterrupt 訂閱對象,它會在取消訂閱的時候取消任務的執行,好比任務超時了,這個時候就會取消任務的執行。

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }

    @Override
    public void unsubscribe() {
        executor.remove(f);
        // 判斷是否取消任務
        if (shouldInterruptThread.call()) {
            f.cancel(true);
        } else {
            f.cancel(false);
        }
    }
}
複製代碼

超時檢測中斷線程

超時處理器

前面分析到 executeCommandAndObserve(_cmd) 這個方法有以下這段代碼,executeCommandWithSpecifiedIsolation(_cmd) 返回的 Observable 主要是採起不一樣的隔離策略,而後,若是啓用了超時(默認啓用),會增長一個 HystrixObservableTimeoutOperator 操做器,看起來就是在控制超時相關的。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    //...
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        // HystrixObservableTimeoutOperator 控制超時
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    // 設置訂閱回調
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
複製代碼

再繼續看 HystrixObservableTimeoutOperator,它會返回一個 Subscriber 對原 Observable 進行一個處理。

  • 首先定義了一個 HystrixContextRunnable 回調,就是拋出 HystrixTimeoutException 這個異常。
  • 而後定義了一個 TimerListener 時間監聽器,這個監聽器的間隔時間是 getIntervalTimeInMilliseconds() 返回的時間,就是命令執行超時時間。
  • 因此,TimerListener 的主要做用就是在超時後更新命令 isCommandTimedOut 的狀態,若是超時後 isCommandTimedOut 仍是 NOT_EXECUTED,就會更新到 TIMED_OUT,並取消任務的執行,而後發出超時的異常。
  • 而後將定義的 TimerListener 添加到了 HystrixTimer 中,因此時間控制的核心邏輯應該是在 HystrixTimer 中。
  • 最後還定義了一個 Subscriber 訂閱,能夠看到它會去更新 isCommandTimedOut 的狀態,並清理 TimerListener 調度器。

isCommandTimedOut 對應的 TimedOutStatus 有三種狀態:NOT_EXECUTED, COMPLETED, TIMED_OUT

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        child.add(s);

        // 超時回調,拋出 HystrixTimeoutException 異常
        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
            @Override
            public void run() {
                child.onError(new HystrixTimeoutException());
            }
        });

	// 超時監聽器
        TimerListener listener = new TimerListener() {
            // tick 在每次間隔時間會調用一次
            @Override
            public void tick() {
                // 超時後更新狀態
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // 通知超時失敗
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // 中止本來的請求
                    s.unsubscribe();
                    // 拋出超時異常
                    timeoutRunnable.run();
                }
            }

            // 返回間隔時間,默認就是命令執行超時時間
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        // 添加監聽器,開始計算超時
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        // 設置到原命令中
        originalCommand.timeoutTimer.set(tl);

        Subscriber<R> parent = new Subscriber<R>() {
            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    tl.clear(); // 清除 TimerListener
                    child.onCompleted();
                }
            }
            //...
            private boolean isNotTimedOut() {
            	// 任務執行完成,更新 isCommandTimedOut 狀態
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                        originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };
        s.add(parent);

        return parent;
    }
}
複製代碼

超時處理器調度

接着看 HystrixTimer 的 addTimerListener 方法,這個方法就比較好理解了,就是初始化任務調度器,而後調度 TimerListener 的執行,調度器的延遲執行時間和間隔週期都是 TimerListener 返回的命令超時時間。

總結一下 TimerListener 和 HystrixTimer 整合起來,其實就是檢測任務是否超時的,任務執行超時後就會拋出超時異常,而後取消任務的執行,後面應該就會進入降級的邏輯了。

public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    // 初始化 executor 調度器:ScheduledThreadPoolExecutor
    startThreadIfNeeded();

    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                // 觸發監聽器
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };

    // 超時時間默認1000毫秒,每隔1000毫秒調度一次
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r,
            listener.getIntervalTimeInMilliseconds(), // 延遲多久執行
            listener.getIntervalTimeInMilliseconds(), // 執行週期
            TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
複製代碼

降級回調

執行錯誤類型

在 Hystrix 基礎篇中,咱們提到了 Hystrix 執行錯誤的六種類型,以下圖所示,除了 BAD_REQUEST,都會降級進入回調方法中。

AbstractCommand 中,能夠看到有以下6個處理錯誤的方法,分別針對 Hystrix 的6中錯誤類型。在分析 HystrixCommand 的執行流程中,咱們知道有不少的回調方法,其中錯誤回調確定就會調用下面其中的一個方法來處理錯誤,這個就不在看了。

須要注意的是,除了 handleBadRequestByEmittingError() 這個處理 BAD_REQUEST 錯誤的方法,其他5個方法最後都會調用 getFallbackOrThrowException 方法來獲取回調方法的Observable,這樣就合上圖中對應起來了。說明錯誤回調的封裝就是 getFallbackOrThrowException

// 信號量拒絕回調
private Observable<R> handleSemaphoreRejectionViaFallback() {
    Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
    executionResult = executionResult.setExecutionException(semaphoreRejectionException);
    eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            		"could not acquire a semaphore for execution", semaphoreRejectionException);
}

// 短路回調
private Observable<R> handleShortCircuitViaFallback() {
    eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
    Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
    executionResult = executionResult.setExecutionException(shortCircuitException);
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                	"short-circuited", shortCircuitException);
}

// 線程池拒絕回調
private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
    eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
    threadPool.markThreadRejection();
    return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, 
    			"could not be queued for execution", underlying);
}

// 超時回調
private Observable<R> handleTimeoutViaFallback() {
    return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, 
    			"timed-out", new TimeoutException());
}

// 失敗回調
private Observable<R> handleFailureViaFallback(Exception underlying) {
    //...
    eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
    return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
}

// BAD_REQUEST 處理
private Observable<R> handleBadRequestByEmittingError(Exception underlying) {
    Exception toEmit = underlying;
	//...
    return Observable.error(toEmit);
}
複製代碼

降級回調處理

接着來看回調的處理方法 getFallbackOrThrowException,能夠獲得以下信息:

  • 在執行回調的時候會經過信號量來限流,即 getFallbackSemaphore() 獲取的信號量,默認返回的實際類型是 TryableSemaphore,限流數默認是 10。而後會經過這個信號量獲取一個許可證後纔去調用回調方法。
  • 獲取到信號量許可證後,調用 getFallbackObservable() 獲取回調 Observable,若是用戶重載了 getFallback() 方法,就返回 getFallback() 的 Observable;若是沒有重載,將拋出 "No fallback available." 的異常。
  • 最後一步就是訂閱回調對象,進入降級回調的方法了。
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // 啓用回調
    if (properties.fallbackEnabled().get()) {
        // 設置當前線程
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
        // 回調通知
        final Action1<R> markFallbackEmit = new Action1<R>() {...};
        // 回調完成通知
        final Action0 markFallbackCompleted = new Action0() {...};
        // 回調錯誤後處理
        final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {...};
		
        // 回調信號量 => TryableSemaphore
        final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // 釋放信號量許可證
        final Action0 singleSemaphoreRelease = new Action0() {...};

        // 獲取回調 Observable
        Observable<R> fallbackExecutionChain;
        // 獲取信號量許可證
        if (fallbackSemaphore.tryAcquire()) {
            try {
                // 用戶是否認義了回調方法,即重寫 getFallback()
                if (isFallbackUserDefined()) {
                    executionHook.onFallbackStart(this);
                    fallbackExecutionChain = getFallbackObservable();
                } else {
                    // 拋出異常:"No fallback available."
                    fallbackExecutionChain = getFallbackObservable();
                }
            } catch (Throwable ex) {
                fallbackExecutionChain = Observable.error(ex);
            }

            // 執行回調
            return fallbackExecutionChain
                    .doOnEach(setRequestContext)
                    .lift(new FallbackHookApplication(_cmd))
                    .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                    .doOnNext(markFallbackEmit)
                    .doOnCompleted(markFallbackCompleted)
                    .onErrorResumeNext(handleFallbackError)
                    .doOnTerminate(singleSemaphoreRelease)
                    .doOnUnsubscribe(singleSemaphoreRelease);
        } else {
            return handleFallbackRejectionByEmittingError();
        }
    } else {
        return handleFallbackDisabledByEmittingError(originalException, failureType, message);
    }
}
複製代碼

一張圖總結Hystrix線程池隔離運行流程

下面用一張圖總結下Hystrix任務提交到線程池執行的流程。

斷路器工做原理

斷路器初始化

在 AbstractCommand 構造方法中,初始化了斷路器 HystrixCircuitBreaker, 若是啓用了斷路器(circuitBreakerEnabled),就從 HystrixCircuitBreaker.Factory 中獲取一個 斷路器,默認實現類是 HystrixCircuitBreakerImpl。若是未啓用斷路器,默認實現類則是 NoOpCircuitBreaker,就是什麼都不控制的斷路器。

this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, 
		this.commandGroup, this.commandKey, this.properties, this.metrics);
複製代碼
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // 從 Factory 中獲取默認的 斷路器
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        // 禁用斷路器就返回什麼都不操做的實現類
        return new NoOpCircuitBreaker();
    }
}
複製代碼

HystrixCircuitBreakerImpl 的構造方法主要須要 HystrixCommandPropertiesHystrixCommandMetrics,可想而知,斷路器須要統計 hystrix 請求的錯誤、超時、拒絕、成功等次數,而後決定是否打開斷路器,那麼這些統計的數據來源就是 HystrixCommandMetrics.

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;
}
複製代碼

斷路器工做機制

斷路器容許執行請求

以前分析 applyHystrixSemantics(_cmd) 時,咱們看到了斷路器的使用,applyHystrixSemantics 是應用Hystrix的核心入口,會先經過斷路器判斷是否容許放行請求,斷路器不容許則會走斷路器拒絕降級的方法。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // 斷路器是否容許請求
    if (circuitBreaker.allowRequest()) {
        //...
    } else {
        // 短路 => 降級
        return handleShortCircuitViaFallback();
    }
}
複製代碼

接着看 allowRequest() 方法:

  • 首先判斷是否強制打開了斷路器,若是打開了,就不容許請求。也就是咱們能夠在某些狀況下手動打開斷路器來阻止級聯調用。
  • 接着判斷是否強制關閉了斷路器,若是關閉了,就容許請求。也就是說斷路器就算打開了,咱們也能夠手動關閉。
  • 若是沒有手動打開或關閉斷路器,最後會判斷 斷路器未打開,或者斷路器打開了但容許放行一個請求去驗證被調用方已經恢復了,就容許放行請求。不然將拒絕請求執行。
public boolean allowRequest() {
    // 斷路器手動強制打開
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // 斷路器手動強制關閉
    if (properties.circuitBreakerForceClosed().get()) {
        // 調度一次,進行統計,決定是否打開斷路器
        isOpen();
        return true;
    }
    // 斷路器未打開 或者 運行打開
    return !isOpen() || allowSingleTest();
}
複製代碼

斷路器打開

isOpen() 方法判斷斷路器是否打開:

  • 首先判斷 circuitOpen 是否爲 true,是則表示斷路器打開了。也就是說斷路器的狀態是由 circuitOpen 這個 AtomicBoolean 來控制的。
  • circuitOpen 爲 false,則說明斷路器未打開,接着從統計 HystrixCommandMetrics 中獲取統計的計數 HealthCounts,後面的邏輯就是根據 HealthCounts 計算是否要打開斷路器。
  • 若是 總的請求數 < 斷路器請求閾值(默認20) 就不會打開斷路器。
  • 若是 錯誤百分比率 < 斷路器錯誤比率閾值(默認50%) 就不會打開斷路器。
  • 不然就會打開斷路器,circuitOpen 的狀態設置爲 true,同時設置了斷路器的打開時間。

Hystrix 斷路器的設計很好的利用了原子類的特性,circuitOpen 的更新採用基於 CAS 指令的 compareAndSet,高併發狀況下也能安全更新,更新成功再設置斷路器打開時間,更新失敗則不更新。

private AtomicBoolean circuitOpen = new AtomicBoolean(false);

public boolean isOpen() {
    // circuitOpen 是否打開
    if (circuitOpen.get()) {
        return true;
    }

    // 獲取請求計數
    HealthCounts health = metrics.getHealthCounts();

    // 小於斷路器請求的閾值,默認爲20,至少超過20個請求後纔會開啓斷路器
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        return false;
    }

    // 錯誤百分比
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
    	// 打開斷路器
        if (circuitOpen.compareAndSet(false, true)) {
        	// 設置斷路器打開的時間
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            return true;
        }
    }
}
複製代碼

斷路器半開

若是斷路器已經打開了,這時斷路器會休眠一段時間後,放一個請求去測試被調用方是否已經恢復,這就是半開狀態。

allowSingleTest() 這個方法,是否進入半開狀態的依據是:

  • 判斷當前時間是否大於 斷路器打開時間+斷路器休眠窗口時間,休眠時間默認是5000毫秒,若是是的會同時更新斷路器打開時間爲當前時間。。
  • 也就是說每隔5000毫秒,就放一個請求去測試,若是請求繼續失敗,斷路器仍是維持打開狀態,這就是半開狀態,OPEN -> HALF-OPEN
public boolean allowSingleTest() {
    // 斷路器打開時間
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 斷路器打開 且 當前時間 > (斷路器打開時間 + 斷路器休眠窗口時間(默認5000毫秒)) 
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // 斷路器打開時間設置爲當前時間
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            return true;
        }
    }
    return false;
複製代碼

斷路器恢復

那斷路器打開了什麼時候恢復呢?跟蹤一下 circuitOpen 就知道了,HystrixCircuitBreaker 有個方法 markSuccess() 會去在斷路器打開時關閉斷路器。

public void markSuccess() {
    if (circuitOpen.get()) {
    	// 關閉斷路器
        if (circuitOpen.compareAndSet(true, false)) {
            metrics.resetStream();
        }
    }
}
複製代碼

executeCommandAndObserve(_cmd) 這個方法中,能夠看到有兩個回調函數 markEmitsmarkOnCompleted,他們會在請求執行成功和完成後被回調執行,回調函數裏調用了 circuitBreaker.markSuccess(); 來告訴斷路器請求成功了,而後斷路器就會進入關閉狀態,HALF -> CLOSE

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    // 標記命令已經執行
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (commandIsScalar()) {
                //...
                // 告訴斷路器請求成功
                circuitBreaker.markSuccess();
            }
        }
    };

    // 標記命令執行結束
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                //...
                // 告訴斷路器請求成功
                circuitBreaker.markSuccess();
            }
        }
    };

    // 處理回調
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {...};
    // 設置當前線程
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        // 超時會由 HystrixObservableTimeoutOperator 處理,拋出 HystrixTimeoutException 超時異常
        execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    // 設置訂閱回調
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
複製代碼

一張圖總結斷路器工做機制

最後,仍是用一張圖總結下斷路器的工做機制。

相關文章
相關標籤/搜索