Hystrix是個什麼玩意兒

1、什麼是Hystrix

 

  Hystrix是Netflix的一個開源框架,地址以下:https://github.com/Netflix/Hystrixjava

  中文名爲「豪豬」,即平時很溫順,在感覺到危險的時候,用刺保護本身;在危險過去後,仍是一個溫順的肉球。git

  因此,整個框架的核心業務也就是這2點:github

  1. 什麼時候須要保護
  2. 如何保護

 2、什麼時候須要保護

  對於一個系統而言,它每每承擔着2層角色,服務提供者與服務消費者。對於服務消費者而言最大的痛苦就是如何「明哲保身」,作過網關項目的同窗確定感同身受。web

  上面是一個常見的系統依賴關係,底層的依賴每每不少,通訊協議包括socket、HTTP、Dubbo、webservice等等。當通訊層發生網絡抖動以及所依賴的系統發生業務響應異常時,咱們業務自己所提供的服務能力也直接會受到影響。編程

  這種效果傳遞下去就頗有可能形成雪崩效應,即整個業務聯調發生異常,好比業務總體超時,或者訂單數據不一致。tomcat

  那麼核心問題就來了,如何檢測業務處於異常狀態?網絡

  成功率!成功率直接反映了業務的數據流轉狀態,是最直接的業務表現。併發

  固然,也能夠根據超時時間作判斷,好比Sentinel的實現。其實這裏概念上能夠作一個轉化,用時間作超時控制,超時=失敗,這依然是一個成功率的概念。app

3、如何保護 

  如同豪豬同樣,「刺」就是他的保護工具,全部的攻擊都會被刺無情的懟回去。 框架

  在Hystrix的實現中,這就出現了「熔斷器」的概念,即當前的系統是否處於須要保護的狀態。

  當熔斷器處於開啓的狀態時,全部的請求都不會真正的走以前的業務邏輯,而是直接返回一個約定的信息,即fallBack。經過這種快速失敗原則保護咱們的系統。 

  可是,系統不該該永遠處於「有刺」的狀態,當危險事後須要恢復正常。

  因而對熔斷器的核心操做就是以下幾個功能:

  1. 若是成功率太低,就打開熔斷器,阻止正常業務
  2. 隨着時間的流動,熔斷器處於半打開狀態,嘗試性放入一筆請求

  熔斷器的核心API以下圖: 

4、限流、熔斷、隔離、降級

 這四個概念是咱們談起微服務會常常談到的概念,這裏咱們討論的是Hystrix的實現方式。

  • 限流
    • 這裏的限流與guava的RateLimiter的限流差別比較大,一個是爲了「保護自我」,一個是「保護下游」。
    • 當對服務進行限流時,超過的流量將直接fallback,即熔斷。而RateLimiter關心的實際上是「流量整形」,將不規整流量在必定速度內規整。
  • 熔斷
    • 當個人應用沒法提供服務時,我要對上游請求熔斷,避免上游把我壓垮。
    • 當個人下游依賴成功率太低時,我要對下游請求熔斷,避免下游把我拖垮。
  • 降級
    • 降級與熔斷緊密相關,熔斷後業務如何表現,約定一個快速失敗的fallback,即爲服務降級。   
  • 隔離
    • 業務之間不可互相影響,不一樣業務須要有獨立的運行空間。
    • 最完全的,能夠採用物理隔離,不一樣的機器部署。
    • 次之,採用進程隔離,一個機器多個tomcat。
    • 次之,請求隔離。
    • 因爲Hystrix框架所屬的層級爲代碼層,因此實現的是請求隔離,線程池或信號量。 

5、源碼分析

   先上一個Hystrix的業務流程圖

 

   能夠看到Hystrix的請求都要通過HystrixCommand的包裝,其核心邏輯在AbstractComman.java類中。

  下面的源碼是基於RxJava的,看以前最好先了解下RxJava的常見用法與邏輯,不然看起來會很迷惑。

  簡單的說,RxJava就是基於回調的函數式編程。

  通俗的說,就等同於策略模式的匿名內部類實現

 5.1 熔斷器

  首先看信號量是如何影響咱們的請求的:

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // 自定義擴展
        executionHook.onStart(_cmd);

        //判斷熔斷器是否容許請求過來
        if (circuitBreaker.attemptExecution()) {
       //得到分組信號量,若是沒有采用信號量分組,返回默認經過的信號量實現
final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
       //調用終止的回調函數
final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } };        //調用異常的回調函數 final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } };        //根據信號量嘗試競爭信號量 if (executionSemaphore.tryAcquire()) { try { //競爭成功,註冊執行參數 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else {
          //競爭失敗,進入fallback
return handleSemaphoreRejectionViaFallback(); } } else {
        //熔斷器已打開,進入fallback
return handleShortCircuitViaFallback(); } }

 

  何時熔斷器能夠放請求進來:

        @Override
        public boolean attemptExecution() {
       //動態屬性判斷,熔斷器是否強制開着,若是強制開着,就不容許請求
if (properties.circuitBreakerForceOpen().get()) { return false; }
       //若是強制關閉,就容許請求
if (properties.circuitBreakerForceClosed().get()) { return true; }
       //若是當前是關閉,就容許請求
if (circuitOpened.get() == -1) { return true; } else {
          //若是當前開着,就看是否已通過了"滑動窗口",過了就能夠請求,不過就不能夠
if (isAfterSleepWindow()) { //only the first request after sleep window should execute //if the executing command succeeds, the status will transition to CLOSED //if the executing command fails, the status will transition to OPEN //if the executing command gets unsubscribed, the status will transition to OPEN
            //這裏使用CAS的方式,只有一個請求能過來,即"半關閉"狀態
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { return true; } else { return false; } } else { return false; } } } }

  這裏有個重要概念就是"滑動窗口":

        private boolean isAfterSleepWindow() {
            final long circuitOpenTime = circuitOpened.get();
            final long currentTime = System.currentTimeMillis();
            final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
       //滑動窗口的判斷就是看看熔斷器打開的時間與如今相比是否超過了配置的滑動窗口
return currentTime > circuitOpenTime + sleepWindowTime; }

5.2 隔離

 若是將業務請求進行隔離?

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
     //判斷隔離策略是什麼,是線程池隔離仍是信號量隔離    
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
       //線程池隔離的運行邏輯以下 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); }             //按照配置生成監控數據 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ try {
                 //執行擴展點邏輯 executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.empty(); } }
        //註冊各類場景的回調函數 }).doOnTerminate(
new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it }
        //將邏輯放在線程池的調度器上執行,即將上述邏輯放入線程池中 }).subscribeOn(threadPool.getScheduler(
new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else {
        //走到這裏就是信號量隔離,在當前線程中執行,沒有調度器
return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } }

5.3 核心運行流程

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
     //執行發生的回調
        final Action1<R> markEmits = new Action1<R>() {
            @Override
            public void call(R r) {
                if (shouldOutputOnNextEvents()) {
                    executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                    eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
                }
                if (commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };
     //執行成功的回調,標記下狀態,熔斷器根據這個狀態維護熔斷邏輯
        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };
     //執行失敗的回調
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
          //各類回調進行各類fallback
if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); }      //註冊各類回調函數 return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }

6、小結

  • Hystrix是基於單機應用的熔斷限流框架
  • 根據熔斷器的滑動窗口判斷當前請求是否能夠執行
  • 線程競爭實現「半關閉」狀態,拿一個請求試試是否能夠關閉熔斷器
  • 線程池隔離將請求丟到線程池中運行,限流依靠線程池拒絕策略
  • 信號量隔離在當前線程中運行,限流依靠併發請求數
  • 當信號量競爭失敗/線程池隊列滿,就進入限流模式,執行fallback
  • 當熔斷器開啓,就熔斷請求,執行fallback 
  • 整個框架採用的RxJava的編程模式,回調函數滿天飛 
相關文章
相關標籤/搜索