Hystrix執行流程分析

首發於 http://otzh.ml

前言

Hystrix已經不在維護了,可是成功的開源項目老是值得學習的.剛開始看 Hystrix 源碼時,會發現一堆 Action,Function 的邏輯,這其實就是 RxJava 的特色了--響應式編程.上篇文章已經對RxJava做過入門介紹,不熟悉的同窗能夠先去看看.本文會簡單介紹 Hystrix,再根據demo結合源碼來了解Hystrix的執行流程.html

Hystrix簡單介紹

  1. 什麼是 Hystrix?

    Hystrix 是一個延遲容錯庫,旨在隔離對遠程系統、服務和第三方庫的訪問點,中止級聯故障,並在錯誤不可避免的複雜分佈式系統中可以彈性恢復。java

  2. 核心概念react

    1. Command 命令

      Command 是Hystrix的入口,對用戶來講,咱們只須要建立對應的 command,將須要保護的接口包裝起來就能夠.能夠無需關注再以後的邏輯.與 Spring 深度集成後還能夠經過註解的方式,就更加對開發友好了.git

    2. Circuit Breaker 斷路器

      斷路器,是從電氣領域引伸過來的概念,具備過載短路欠電壓保護功能,有保護線路和電源的能力.在Hystrix中即爲當請求超過必定比例響應失敗時,hystrix 會對請求進行攔截處理,保證服務的穩定性,以及防止出現服務之間級聯雪崩的可能性.github

    3. Isolation 隔離策略

      隔離策略是 Hystrix 的設計亮點所在,利用艙壁模式的思想來對訪問的資源進行隔離,每一個資源是獨立的依賴,單個資源的異常不該該影響到其餘. Hystrix 的隔離策略目前有兩種:線程池隔離,信號量隔離.編程

      isolation

  3. Hystrix的運行流程緩存

    官方的 How it Works 對流程有很詳細的介紹,圖示清晰,相信看完流程圖就能對運行流程有必定的瞭解.

    來自hystrix的github站點

一次Command執行

HystrixCommand是標準的命令模式實現,每一次請求即爲一次命令的建立執行經歷的過程.從上述Hystrix流程圖能夠看出建立流程最終會指向toObservable,在以前RxJava入門時有介紹到Observable即爲被觀察者,做用是發送數據給觀察者進行相應的,所以能夠知道這個方法應該是較爲關鍵的.app

UML

hystrixcommman-uml.png

  1. HystrixInvokable 標記這個一個可執行的接口,沒有任何抽象方法或常量
  2. HystrixExecutable 是爲HystrixCommand設計的接口,主要提供執行命令的抽象方法,例如:execute(),queue(),observe()
  3. HystrixObservable 是爲Observable設計的接口,主要提供自動訂閱(observe())和生成Observable(toObservable())的抽象方法
  4. HystrixInvokableInfo 提供大量的狀態查詢(獲取屬性配置,是否開啓斷路器等)
  5. AbstractCommand 核心邏輯的實現
  6. HystrixCommand 定製邏輯實現以及留給用戶實現的接口(好比:run())

樣例代碼

經過新建一個 command 來看 Hystrix 是如何建立並執行的.HystrixCommand 是一個抽象類,其中有一個run方法須要咱們實現本身的業務邏輯,如下是偷懶採用匿名內部類的形式呈現.構造方法的內部實現咱們就不關注了,直接看下執行的邏輯吧.less

HystrixCommand demo = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey("demo-group")) {
            @Override
            protected String run() {
                return "Hello World~";
            }
        };
demo.execute();

執行過程

流程圖

execute

這是官方給出的一次完整調用的鏈路.上述的 demo 中咱們直接調用了execute方法,因此調用的路徑爲execute() -> queue() -> toObservable() -> toBlocking() -> toFuture() -> get().核心的邏輯其實就在toObservable()中.分佈式

HystrixCommand.java

execute

execute方法爲同步調用返回結果,並對異常做處理.內部會調用queue

// 同步調用執行
public R execute() {
  try {
    // queue()返回的是Future類型的對象,因此這裏是阻塞get
    return queue().get();
  } catch (Exception e) {
    throw decomposeException(e);
  }
}
queue

queue的第一行代碼完成了核心的訂閱邏輯.

  1. toObservable() 生成了 Hystrix 的 Observable 對象
  2. Observable 轉換爲 BlockingObservable 能夠阻塞控制數據發送
  3. toFuture 實現對 BlockingObservable 的訂閱
public Future<R> queue() {
  // 着重關注的是這行代碼
  // 完成了Observable的建立及訂閱
  // toBlocking()是將Observable轉爲BlockingObservable,轉換後的Observable能夠阻塞數據的發送
  final Future<R> delegate = toObservable().toBlocking().toFuture();

  final Future<R> f = new Future<R>() {
    // 因爲toObservable().toBlocking().toFuture()返回的Future若是中斷了,
    // 不會對當前線程進行中斷,因此這裏將返回的Future進行了再次包裝,處理異常邏輯
    ...
  }

  // 判斷是否已經結束了,有異常則直接拋出
  if (f.isDone()) {
    try {
      f.get();
      return f;
    } catch (Exception e) {
            // 省略這段判斷
    }
  }

  return f;
}

BlockingObservable.java

// 被包裝的Observable
private final Observable<? extends T> o;

// toBlocking()會調用該靜態方法將 源Observable簡單包裝成BlockingObservable
public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
  return new BlockingObservable<T>(o);
}

public Future<T> toFuture() {
  return BlockingOperatorToFuture.toFuture((Observable<T>)o);
}

BlockingOperatorToFuture.java

ReactiveX 關於toFuture的解讀

The toFuture operator applies to the BlockingObservable subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

toFuture只能做用於BlockingObservable因此也纔會有上文想要轉換爲BlockingObservable的操做

// 該操做將 源Observable轉換爲返回單個數據項的Future
public static <T> Future<T> toFuture(Observable<? extends T> that) {
      // CountDownLatch 判斷是否完成
    final CountDownLatch finished = new CountDownLatch(1);
      // 存儲執行結果
    final AtomicReference<T> value = new AtomicReference<T>();
      // 存儲錯誤結果
    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

      // single()方法能夠限制Observable只發送單條數據
      // 若是有多條數據 會拋 IllegalArgumentException
      // 若是沒有數據能夠發送 會拋 NoSuchElementException
    @SuppressWarnings("unchecked")
    final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {
                // single()返回的Observable就能夠對其進行標準的處理了
        @Override
        public void onCompleted() {
            finished.countDown();
        }

        @Override
        public void onError(Throwable e) {
            error.compareAndSet(null, e);
            finished.countDown();
        }

        @Override
        public void onNext(T v) {
            // "single" guarantees there is only one "onNext"
            value.set(v);
        }
    });
        
      // 最後將Subscription返回的數據封裝成Future,實現對應的邏輯
    return new Future<T>() {
            // 能夠查看源碼
    };

}

AbstractCommand.java

AbstractCommandtoObservable實現的地方,屬於Hystrix的核心邏輯,代碼較長,能夠和方法調用的流程圖一塊兒食用.toObservable主要是完成緩存和建立Observable,requestLog的邏輯,當第一次建立Observable時,applyHystrixSemantics方法是Hystrix的語義實現,能夠跳着看.

tips: 下文中有不少 Action和 Function,他們很類似,都有call方法,可是區別在於Function有返回值,而Action沒有,方法後跟着的數字表明有幾個入參.Func0/Func3即沒有入參和有三個入參
toObservable

toObservable代碼較長且分層仍是清晰的,因此下面一塊一塊寫.其邏輯和文章開始提到的Hystrix流程圖是徹底一致的.

toObservable.png

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
      // 此處省略掉了不少個Action和Function,大部分是來作掃尾清理的函數,因此用到的時候再說
  
      // defer在上篇rxjava入門中提到過,是一種建立型的操做符,每次訂閱時會產生新的Observable,回調方法中所實現的纔是真正咱們須要的Observable
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
              
                        // 校驗命令的狀態,保證其只執行一次
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }

            commandStartTimestamp = System.currentTimeMillis();
                        // properties爲當前command的全部屬性
              // 容許記錄請求log時會保存當前執行的command
            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }
                        
              // 是否開啓了請求緩存
            final boolean requestCacheEnabled = isRequestCachingEnabled();
              // 獲取緩存key
            final String cacheKey = getCacheKey();

            // 開啓緩存後,嘗試從緩存中取
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
              // 沒有開啓請求緩存時,就執行正常的邏輯
            Observable<R> hystrixObservable =
                          // 這裏又經過defer建立了咱們須要的Observable
                    Observable.defer(applyHystrixSemantics)
                                          // 發送前會先走一遍hook,默認executionHook是空實現的,因此這裏就跳過了
                            .map(wrapWithAllOnNextHooks);
          
            // 獲得最後的封裝好的Observable後,將其放入緩存
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                          // 終止時的操做
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                          // 取消訂閱時的操做
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                          // 完成時的操做
                    .doOnCompleted(fireOnCompletedHook);
        }
    }
handleRequestCacheHitAndEmitValues

緩存擊中時的處理

private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
        try {
              // Hystrix中有大量的hook 若是有心作二次開發的,能夠利用這些hook作到很完善的監控
            executionHook.onCacheHit(this);
        } catch (Throwable hookEx) {
            logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
        }   
  // 將緩存的結果賦給當前command
    return fromCache.toObservableWithStateCopiedInto(this)
                    // doOnTerminate 或者是後面看到的doOnUnsubscribe,doOnError,都指的是在響應onTerminate/onUnsubscribe/onError後的操做,即在Observable的生命週期上註冊一個動做優雅的處理邏輯
            .doOnTerminate(new Action0() {
                @Override
                public void call() {
                      // 命令最終狀態的不一樣進行不一樣處理
                    if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                        cleanUpAfterResponseFromCache(false); //user code never ran
                    } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                        cleanUpAfterResponseFromCache(true); //user code did run
                    }
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                      // 命令最終狀態的不一樣進行不一樣處理
                    if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                        cleanUpAfterResponseFromCache(false); //user code never ran
                    } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                        cleanUpAfterResponseFromCache(true); //user code did run
                    }
                }
            });
}
applyHystrixSemantics

由於本片文章的主要目的是在講執行流程,因此失敗回退和斷路器相關的就留到之後的文章中再寫.

applyHystrixSemantics.png

final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
          // 再也不訂閱了就返回不發送數據的Observable
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
              // 不發送任何數據或通知
            return Observable.never();
        }
        return applyHystrixSemantics(_cmd);
    }
};

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // 標記開始執行的hook
  // 若是hook內拋異常了,會快速失敗且沒有fallback處理
  executionHook.onStart(_cmd);

  /* determine if we're allowed to execute */
  // 斷路器核心邏輯: 判斷是否容許執行(TODO)
  if (circuitBreaker.allowRequest()) {
    // Hystrix本身造的信號量輪子,之因此不用juc下,官方解釋爲juc的Semphore實現太複雜,並且沒有動態調節的信號量大小的能力,簡而言之,不知足需求!
    // 根據不一樣隔離策略(線程池隔離/信號量隔離)獲取不一樣的TryableSemphore
    final TryableSemaphore executionSemaphore = getExecutionSemaphore();
    // Semaphore釋放標誌
    final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
    
    // 釋放信號量的Action
    final Action0 singleSemaphoreRelease = new Action0() {
      @Override
      public void call() {
        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
          executionSemaphore.release();
        }
      }
    };

    // 異常處理
    final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
      @Override
      public void call(Throwable t) {
        // HystrixEventNotifier是hystrix的插件,不一樣的事件發送不一樣的通知,默認是空實現.
        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
      }
    };
        
    // 線程池隔離的TryableSemphore始終爲true
    if (executionSemaphore.tryAcquire()) {
      try {
        /* used to track userThreadExecutionTime */
        // executionResult是一次命令執行的結果信息封裝
        // 這裏設置起始時間是爲了記錄命令的生命週期,執行過程當中會set其餘屬性進去
        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
        return executeCommandAndObserve(_cmd)
          // 報錯時的處理
          .doOnError(markExceptionThrown)
          // 終止時釋放
          .doOnTerminate(singleSemaphoreRelease)
          // 取消訂閱時釋放
          .doOnUnsubscribe(singleSemaphoreRelease);
      } catch (RuntimeException e) {
        return Observable.error(e);
      }
    } else {
      // tryAcquire失敗後會作fallback處理,TODO
      return handleSemaphoreRejectionViaFallback();
    }
  } else {
    // 斷路器短路(拒絕請求)fallback處理 TODO
    return handleShortCircuitViaFallback();
  }
}
executeCommandAndObserve

executeCommandAndObserve.png

/**
 * 執行run方法的地方
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
      // 獲取當前上下文
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

      // 發送數據時的Action響應
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
              // 若是onNext時須要上報時,作如下處理
            if (shouldOutputOnNextEvents()) {
                  // result標記
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                  // 通知
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
              // commandIsScalar是一個我不解的地方,在網上也沒有查到好的解釋
              // 該方法爲抽象方法,有HystrixCommand實現返回true.HystrixObservableCommand返回false
            if (commandIsScalar()) {
                  // 耗時
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                  // 通知
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                  // 斷路器標記成功(斷路器半開時的反饋,決定是否關閉斷路器)
                circuitBreaker.markSuccess();
            }
        }
    };

    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                            // 同markEmits 相似處理
            }
        }
    };

      // 失敗回退的邏輯
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
          // 不是重點略過了
        }
    };

      // 請求上下文的處理
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };

    Observable<R> execution;
      // 若是有執行超時限制,會將包裝後的Observable再轉變爲支持TimeOut的
    if (properties.executionTimeoutEnabled().get()) {
          // 根據不一樣的隔離策略包裝爲不一樣的Observable
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                      // lift 是rxjava中一種基本操做符 能夠將Observable轉換成另外一種Observable
                      // 包裝爲帶有超時限制的Observable
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
executeCommandWithSpecifiedIsolation

根據不一樣的隔離策略建立不一樣的執行Observable

executeCommandSpecfi.png

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                  // 因爲源碼太長,這裏只關注正常的流程,須要詳細瞭解能夠去看看源碼
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    try {
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }})
        .doOnTerminate(new Action0() {})
        .doOnUnsubscribe(new Action0() {})
        // 指定在某一個線程上執行,是rxjava中很重要的線程調度的概念
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
        }));
    } else { // 信號量隔離策略
        return Observable.defer(new Func0<Observable<R>>() {
                        // 邏輯與線程池大體相同
        });
    }
}
getUserExecutionObservable

獲取用戶執行的邏輯

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
          // getExecutionObservable是抽象方法,有HystrixCommand自行實現
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }
        // 將Observable做其餘中轉
    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

lift操做符

lift能夠轉換成一個新的Observable,它很像一個代理,將原來的Observable代理到本身這裏,訂閱時通知原來的Observable發送數據,經本身這裏流轉加工處理再返回給訂閱者.Map/FlatMap操做符底層其實就是用的lift進行實現的.

getExecutionObservable
@Override
final protected Observable<R> getExecutionObservable() {
  return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
      try {
        // just操做符就是直接執行的Observable
        // run方法就是咱們實現的業務邏輯: Hello World~
        return Observable.just(run());
      } catch (Throwable ex) {
        return Observable.error(ex);
      }
    }
  }).doOnSubscribe(new Action0() {
    @Override
    public void call() {
         // 執行訂閱時將執行線程記爲當前線程,必要時咱們能夠interrupt
      executionThread.set(Thread.currentThread());
    }
  });
}

總結

但願本身能把埋下的坑一一填完: 容錯機制,metrics,斷路器等等...

參考

  1. Hystrix How it Works
  2. ReactiveX官網
  3. 阮一峯: 中文技術文檔寫做規範
  4. RxJava lift 原理解析
相關文章
相關標籤/搜索