首發於 http://otzh.ml
Hystrix已經不在維護了,可是成功的開源項目老是值得學習的.剛開始看 Hystrix 源碼時,會發現一堆 Action,Function 的邏輯,這其實就是 RxJava 的特色了--響應式編程.上篇文章已經對RxJava做過入門介紹,不熟悉的同窗能夠先去看看.本文會簡單介紹 Hystrix,再根據demo結合源碼來了解Hystrix的執行流程.html
Hystrix 是一個延遲和容錯庫,旨在隔離對遠程系統、服務和第三方庫的訪問點,中止級聯故障,並在錯誤不可避免的複雜分佈式系統中可以彈性恢復。java
核心概念react
Command 是Hystrix的入口,對用戶來講,咱們只須要建立對應的 command,將須要保護的接口包裝起來就能夠.能夠無需關注再以後的邏輯.與 Spring 深度集成後還能夠經過註解的方式,就更加對開發友好了.git
斷路器,是從電氣領域引伸過來的概念,具備過載、短路和欠電壓保護功能,有保護線路和電源的能力.在Hystrix中即爲當請求超過必定比例響應失敗時,hystrix 會對請求進行攔截處理,保證服務的穩定性,以及防止出現服務之間級聯雪崩的可能性.github
隔離策略是 Hystrix 的設計亮點所在,利用艙壁模式的思想來對訪問的資源進行隔離,每一個資源是獨立的依賴,單個資源的異常不該該影響到其餘. Hystrix 的隔離策略目前有兩種:線程池隔離,信號量隔離.編程
Hystrix的運行流程緩存
官方的 How it Works 對流程有很詳細的介紹,圖示清晰,相信看完流程圖就能對運行流程有必定的瞭解.
HystrixCommand
是標準的命令模式實現,每一次請求即爲一次命令的建立執行經歷的過程.從上述Hystrix流程圖能夠看出建立流程最終會指向toObservable
,在以前RxJava入門時有介紹到Observable
即爲被觀察者,做用是發送數據給觀察者進行相應的,所以能夠知道這個方法應該是較爲關鍵的.app
HystrixCommand
設計的接口,主要提供執行命令的抽象方法,例如:execute()
,queue()
,observe()
Observable
設計的接口,主要提供自動訂閱(observe()
)和生成Observable(toObservable()
)的抽象方法run()
)經過新建一個 command 來看 Hystrix 是如何建立並執行的.HystrixCommand 是一個抽象類,其中有一個run
方法須要咱們實現本身的業務邏輯,如下是偷懶採用匿名內部類的形式呈現.構造方法的內部實現咱們就不關注了,直接看下執行的邏輯吧.less
HystrixCommand demo = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey("demo-group")) { @Override protected String run() { return "Hello World~"; } }; demo.execute();
這是官方給出的一次完整調用的鏈路.上述的 demo 中咱們直接調用了execute
方法,因此調用的路徑爲execute() -> queue() -> toObservable() -> toBlocking() -> toFuture() -> get()
.核心的邏輯其實就在toObservable()
中.分佈式
execute
方法爲同步調用返回結果,並對異常做處理.內部會調用queue
// 同步調用執行 public R execute() { try { // queue()返回的是Future類型的對象,因此這裏是阻塞get return queue().get(); } catch (Exception e) { throw decomposeException(e); } }
queue
的第一行代碼完成了核心的訂閱邏輯.
toObservable()
生成了 Hystrix 的 Observable 對象Observable
轉換爲 BlockingObservable
能夠阻塞控制數據發送toFuture
實現對 BlockingObservable
的訂閱public Future<R> queue() { // 着重關注的是這行代碼 // 完成了Observable的建立及訂閱 // toBlocking()是將Observable轉爲BlockingObservable,轉換後的Observable能夠阻塞數據的發送 final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { // 因爲toObservable().toBlocking().toFuture()返回的Future若是中斷了, // 不會對當前線程進行中斷,因此這裏將返回的Future進行了再次包裝,處理異常邏輯 ... } // 判斷是否已經結束了,有異常則直接拋出 if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { // 省略這段判斷 } } return f; }
// 被包裝的Observable private final Observable<? extends T> o; // toBlocking()會調用該靜態方法將 源Observable簡單包裝成BlockingObservable public static <T> BlockingObservable<T> from(final Observable<? extends T> o) { return new BlockingObservable<T>(o); } public Future<T> toFuture() { return BlockingOperatorToFuture.toFuture((Observable<T>)o); }
ReactiveX 關於toFuture的解讀The
toFuture
operator applies to theBlockingObservable
subclass, so in order to use it, you must first convert your source Observable into aBlockingObservable
by means of either theBlockingObservable.from
method or theObservable.toBlocking
operator.
toFuture
只能做用於BlockingObservable
因此也纔會有上文想要轉換爲BlockingObservable的操做
// 該操做將 源Observable轉換爲返回單個數據項的Future public static <T> Future<T> toFuture(Observable<? extends T> that) { // CountDownLatch 判斷是否完成 final CountDownLatch finished = new CountDownLatch(1); // 存儲執行結果 final AtomicReference<T> value = new AtomicReference<T>(); // 存儲錯誤結果 final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); // single()方法能夠限制Observable只發送單條數據 // 若是有多條數據 會拋 IllegalArgumentException // 若是沒有數據能夠發送 會拋 NoSuchElementException @SuppressWarnings("unchecked") final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() { // single()返回的Observable就能夠對其進行標準的處理了 @Override public void onCompleted() { finished.countDown(); } @Override public void onError(Throwable e) { error.compareAndSet(null, e); finished.countDown(); } @Override public void onNext(T v) { // "single" guarantees there is only one "onNext" value.set(v); } }); // 最後將Subscription返回的數據封裝成Future,實現對應的邏輯 return new Future<T>() { // 能夠查看源碼 }; }
AbstractCommand
是toObservable
實現的地方,屬於Hystrix的核心邏輯,代碼較長,能夠和方法調用的流程圖一塊兒食用.toObservable
主要是完成緩存和建立Observable,requestLog的邏輯,當第一次建立Observable時,applyHystrixSemantics
方法是Hystrix的語義實現,能夠跳着看.
tips: 下文中有不少 Action和 Function,他們很類似,都有call方法,可是區別在於Function有返回值,而Action沒有,方法後跟着的數字表明有幾個入參.Func0/Func3即沒有入參和有三個入參
toObservable
代碼較長且分層仍是清晰的,因此下面一塊一塊寫.其邏輯和文章開始提到的Hystrix流程圖是徹底一致的.
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; // 此處省略掉了不少個Action和Function,大部分是來作掃尾清理的函數,因此用到的時候再說 // defer在上篇rxjava入門中提到過,是一種建立型的操做符,每次訂閱時會產生新的Observable,回調方法中所實現的纔是真正咱們須要的Observable return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 校驗命令的狀態,保證其只執行一次 if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); // properties爲當前command的全部屬性 // 容許記錄請求log時會保存當前執行的command if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } // 是否開啓了請求緩存 final boolean requestCacheEnabled = isRequestCachingEnabled(); // 獲取緩存key final String cacheKey = getCacheKey(); // 開啓緩存後,嘗試從緩存中取 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 沒有開啓請求緩存時,就執行正常的邏輯 Observable<R> hystrixObservable = // 這裏又經過defer建立了咱們須要的Observable Observable.defer(applyHystrixSemantics) // 發送前會先走一遍hook,默認executionHook是空實現的,因此這裏就跳過了 .map(wrapWithAllOnNextHooks); // 獲得最後的封裝好的Observable後,將其放入緩存 if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache // 終止時的操做 .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) // 取消訂閱時的操做 .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once // 完成時的操做 .doOnCompleted(fireOnCompletedHook); } }
緩存擊中時的處理
private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) { try { // Hystrix中有大量的hook 若是有心作二次開發的,能夠利用這些hook作到很完善的監控 executionHook.onCacheHit(this); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx); } // 將緩存的結果賦給當前command return fromCache.toObservableWithStateCopiedInto(this) // doOnTerminate 或者是後面看到的doOnUnsubscribe,doOnError,都指的是在響應onTerminate/onUnsubscribe/onError後的操做,即在Observable的生命週期上註冊一個動做優雅的處理邏輯 .doOnTerminate(new Action0() { @Override public void call() { // 命令最終狀態的不一樣進行不一樣處理 if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { cleanUpAfterResponseFromCache(false); //user code never ran } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { cleanUpAfterResponseFromCache(true); //user code did run } } }) .doOnUnsubscribe(new Action0() { @Override public void call() { // 命令最終狀態的不一樣進行不一樣處理 if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { cleanUpAfterResponseFromCache(false); //user code never ran } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { cleanUpAfterResponseFromCache(true); //user code did run } } }); }
由於本片文章的主要目的是在講執行流程,因此失敗回退和斷路器相關的就留到之後的文章中再寫.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { // 再也不訂閱了就返回不發送數據的Observable if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { // 不發送任何數據或通知 return Observable.never(); } return applyHystrixSemantics(_cmd); } }; private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 標記開始執行的hook // 若是hook內拋異常了,會快速失敗且沒有fallback處理 executionHook.onStart(_cmd); /* determine if we're allowed to execute */ // 斷路器核心邏輯: 判斷是否容許執行(TODO) if (circuitBreaker.allowRequest()) { // Hystrix本身造的信號量輪子,之因此不用juc下,官方解釋爲juc的Semphore實現太複雜,並且沒有動態調節的信號量大小的能力,簡而言之,不知足需求! // 根據不一樣隔離策略(線程池隔離/信號量隔離)獲取不一樣的TryableSemphore final TryableSemaphore executionSemaphore = getExecutionSemaphore(); // Semaphore釋放標誌 final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); // 釋放信號量的Action final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; // 異常處理 final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { // HystrixEventNotifier是hystrix的插件,不一樣的事件發送不一樣的通知,默認是空實現. eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 線程池隔離的TryableSemphore始終爲true if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ // executionResult是一次命令執行的結果信息封裝 // 這裏設置起始時間是爲了記錄命令的生命週期,執行過程當中會set其餘屬性進去 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) // 報錯時的處理 .doOnError(markExceptionThrown) // 終止時釋放 .doOnTerminate(singleSemaphoreRelease) // 取消訂閱時釋放 .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // tryAcquire失敗後會作fallback處理,TODO return handleSemaphoreRejectionViaFallback(); } } else { // 斷路器短路(拒絕請求)fallback處理 TODO return handleShortCircuitViaFallback(); } }
/** * 執行run方法的地方 */ private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { // 獲取當前上下文 final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // 發送數據時的Action響應 final Action1<R> markEmits = new Action1<R>() { @Override public void call(R r) { // 若是onNext時須要上報時,作如下處理 if (shouldOutputOnNextEvents()) { // result標記 executionResult = executionResult.addEvent(HystrixEventType.EMIT); // 通知 eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } // commandIsScalar是一個我不解的地方,在網上也沒有查到好的解釋 // 該方法爲抽象方法,有HystrixCommand實現返回true.HystrixObservableCommand返回false if (commandIsScalar()) { // 耗時 long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); // 通知 eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); // 斷路器標記成功(斷路器半開時的反饋,決定是否關閉斷路器) circuitBreaker.markSuccess(); } } }; final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { // 同markEmits 相似處理 } } }; // 失敗回退的邏輯 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { // 不是重點略過了 } }; // 請求上下文的處理 final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; Observable<R> execution; // 若是有執行超時限制,會將包裝後的Observable再轉變爲支持TimeOut的 if (properties.executionTimeoutEnabled().get()) { // 根據不一樣的隔離策略包裝爲不一樣的Observable execution = executeCommandWithSpecifiedIsolation(_cmd) // lift 是rxjava中一種基本操做符 能夠將Observable轉換成另外一種Observable // 包裝爲帶有超時限制的Observable .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
根據不一樣的隔離策略建立不一樣的執行Observable
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 因爲源碼太長,這裏只關注正常的流程,須要詳細瞭解能夠去看看源碼 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { try { return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } }}) .doOnTerminate(new Action0() {}) .doOnUnsubscribe(new Action0() {}) // 指定在某一個線程上執行,是rxjava中很重要的線程調度的概念 .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { })); } else { // 信號量隔離策略 return Observable.defer(new Func0<Observable<R>>() { // 邏輯與線程池大體相同 }); } }
獲取用戶執行的邏輯
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { Observable<R> userObservable; try { // getExecutionObservable是抽象方法,有HystrixCommand自行實現 userObservable = getExecutionObservable(); } catch (Throwable ex) { // the run() method is a user provided implementation so can throw instead of using Observable.onError // so we catch it here and turn it into Observable.error userObservable = Observable.error(ex); } // 將Observable做其餘中轉 return userObservable .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd)); }
lift操做符
lift能夠轉換成一個新的Observable,它很像一個代理,將原來的Observable代理到本身這裏,訂閱時通知原來的Observable發送數據,經本身這裏流轉加工處理再返回給訂閱者.Map/FlatMap
操做符底層其實就是用的lift
進行實現的.
@Override final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { // just操做符就是直接執行的Observable // run方法就是咱們實現的業務邏輯: Hello World~ return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override public void call() { // 執行訂閱時將執行線程記爲當前線程,必要時咱們能夠interrupt executionThread.set(Thread.currentThread()); } }); }
但願本身能把埋下的坑一一填完: 容錯機制,metrics,斷路器等等...