Typical Hystrix usage

    @Test
    public void test_run() {
        String s = new CommandHelloWorld("Bob").execute();
        System.out.println(s);
    }

What happens when our command is constructed with new? How does execute() run? And how does the fallback kick in when execute() fails or times out?
When we call new XXCommand(), most of the work is done in the AbstractCommand constructor:
    protected AbstractCommand(HystrixCommandGroupKey group,
                              HystrixCommandKey key,
                              HystrixThreadPoolKey threadPoolKey,
                              HystrixCircuitBreaker circuitBreaker,
                              HystrixThreadPool threadPool,
                              HystrixCommandProperties.Setter commandPropertiesDefaults,
                              HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
                              HystrixCommandMetrics metrics,
                              TryableSemaphore fallbackSemaphore,
                              TryableSemaphore executionSemaphore,
                              HystrixPropertiesStrategy propertiesStrategy,
                              HystrixCommandExecutionHook executionHook) {

        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }
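It is easy to see what the constructor does: it loads the command configuration, loads the thread-pool configuration and creates the thread pool, and initializes the metrics collector, the circuit breaker, and so on.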
    //HystrixCommandMetrics
    ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();

    //HystrixThreadPoolDefault
    final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

    //com.netflix.hystrix.HystrixCircuitBreaker.Factory
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
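Apart from the HystrixCommand itself, which has to be created anew for every call, almost everything else is a singleton: the configuration, circuit breaker, and metrics are kept per commandKey, while thread pools are stored per threadPoolKey.
Let's look at how Hystrix manages its thread pools.
A thread pool is obtained by calling HystrixThreadPool.Factory.getInstance: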
    static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
        String key = threadPoolKey.name();

        // this should find it for all but the first time
        HystrixThreadPool previouslyCached = threadPools.get(key);
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // if we get here this is the first time so we need to initialize
        synchronized (HystrixThreadPool.class) {
            if (!threadPools.containsKey(key)) {
                threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
            }
        }
        return threadPools.get(key);
    }
The thread pool is looked up in the cache by threadPoolKey; if it is not there, a new one is created with new HystrixThreadPoolDefault:
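The get-or-create lookup can be sketched with the JDK alone. This is a minimal sketch, not Hystrix code: PoolRegistry is a hypothetical stand-in for HystrixThreadPool.Factory, a plain ExecutorService stands in for HystrixThreadPool, and computeIfAbsent replaces the explicit synchronized block shown in the excerpt.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for HystrixThreadPool.Factory (assumption, not Hystrix code).
class PoolRegistry {
    // One pool per key, shared by every caller that uses the same key.
    private static final ConcurrentHashMap<String, ExecutorService> POOLS = new ConcurrentHashMap<>();

    // Returns the pool cached under key; the pool is created on first use only.
    static ExecutorService getInstance(String key, int size) {
        return POOLS.computeIfAbsent(key, k -> Executors.newFixedThreadPool(size));
    }

    public static void main(String[] args) {
        ExecutorService a = getInstance("order-service", 2);
        ExecutorService b = getInstance("order-service", 8); // size is ignored: the pool is already cached
        ExecutorService c = getInstance("user-service", 2);
        System.out.println("same key, same pool: " + (a == b));
        System.out.println("different key, same pool: " + (a == c));
        a.shutdown();
        c.shutdown();
    }
}
```

One consequence worth noting: because the pool is cached by key, the sizing arguments passed by later callers have no effect on an existing pool.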
    public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
        this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
        HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        this.queueSize = properties.maxQueueSize().get();

        this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                properties);
        this.threadPool = this.metrics.getThreadPool();
        this.queue = this.threadPool.getQueue();

        /* strategy: HystrixMetricsPublisherThreadPool */
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
    }
Note this line:

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties);

Here concurrencyStrategy.getThreadPool is called on HystrixConcurrencyStrategy, which is Hystrix's strategy for creating thread pools.
The pool itself is created in HystrixConcurrencyStrategy#getThreadPool:
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        // ... configuration lookups omitted ...

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }
This uses the JDK's own java.util.concurrent (JUC) ThreadPoolExecutor to create the thread pool.
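The sizing rule in getThreadPool boils down to one decision: the configured maximum is honored only when divergence is allowed and the maximum is at least the core size. The sketch below shows that rule in isolation. PoolSizing, its parameter names, and the bounded LinkedBlockingQueue are assumptions made for illustration; they are not the values Hystrix uses.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the branch structure of the excerpt (not Hystrix code).
class PoolSizing {
    static ThreadPoolExecutor create(int core, int max, boolean allowDiverge, long keepAliveMinutes) {
        // Fall back to the core size when divergence is disabled or the maximum is too low.
        int effectiveMax = (allowDiverge && max >= core) ? max : core;
        // The queue type and capacity are arbitrary choices for this sketch.
        return new ThreadPoolExecutor(core, effectiveMax, keepAliveMinutes, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor tooLow = create(10, 5, true, 1);   // maximum below core
        ThreadPoolExecutor diverged = create(5, 10, true, 1); // maximum honored
        ThreadPoolExecutor fixed = create(5, 10, false, 1);   // divergence disabled
        System.out.println(tooLow.getMaximumPoolSize());
        System.out.println(diverged.getMaximumPoolSize());
        System.out.println(fixed.getMaximumPoolSize());
        tooLow.shutdown();
        diverged.shutdown();
        fixed.shutdown();
    }
}
```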
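Hystrix executes commands with RxJava: it composes many Observables into a single Observable, which is more concise than a traditional call chain.
toObservable
The first Observable. Before the next link in the chain runs, it sets the HystrixCommand state to OBSERVABLE_CHAIN_CREATED
toObservable
doOnTerminate: when termination is detected, it sets the HystrixCommand state to TERMINAL
executeCommandWithSpecifiedIsolation
When execution starts, it sets the HystrixCommand state to USER_CODE_EXECUTED
toObservable
doOnUnsubscribe: when unsubscription is detected, it sets the HystrixCommand state to UNSUBSCRIBED
Allocating the execution thread and maintaining the thread state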
    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // ... omitted ...
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }

                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        try {
                            // ... omitted ...
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                    }
                    //if it was unsubscribed, then other cleanup handled it
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        //if it was never started and was cancelled, then no need to clean up
                    }
                    //if it was terminal, then other cleanup handled it
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            // ... omitted ...
        }
    }
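The logic in detail
1. Check the isolation strategy: with SEMAPHORE the command runs on the current thread; otherwise enter the thread-allocation logic
2. Change the HystrixCommand state to USER_CODE_EXECUTED
3. Check the HystrixCommand timeout status; if it has already timed out, return an error Observable
4. Change the thread state of the current command to STARTED
5. Call getUserExecutionObservable to run the actual logic
6. doOnTerminate
When the Observable finishes (the HystrixCommand may have failed or succeeded), the thread state is either STARTED or NOT_USING_THREAD, and it is then changed to TERMINAL
7. doOnUnsubscribe
When the Observable is unsubscribed, the thread state is changed to UNSUBSCRIBED
8. subscribeOn
Specifies the scheduler. Hystrix implements its own scheduler, whose worker targets the thread pool, and it reloads the thread-pool configuration before handing out the scheduler. (This is RxJava machinery; for now you can roughly think of it as choosing a thread pool and then throwing the task into that pool.)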
    @Override
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        touchConfig();
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }

    // allow us to change things via fast-properties by setting it each time
    private void touchConfig() {
        final int dynamicCoreSize = properties.coreSize().get();
        final int configuredMaximumSize = properties.maximumSize().get();
        int dynamicMaximumSize = properties.actualMaximumSize();
        final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        boolean maxTooLow = false;

        if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
            //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
            dynamicMaximumSize = dynamicCoreSize;
            maxTooLow = true;
        }

        // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
        if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
            if (maxTooLow) {
                logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ". Maximum size will be set to " +
                        dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            }
            threadPool.setCorePoolSize(dynamicCoreSize);
            threadPool.setMaximumPoolSize(dynamicMaximumSize);
        }

        threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
    }
touchConfig
performs the actual adjustment of the thread-pool parameters.
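Resizing a live ThreadPoolExecutor, as touchConfig does, can be tried out with the JDK alone. The sketch below is an illustration under assumptions, not the Hystrix implementation: PoolResizer is a hypothetical name, and the order of the two setter calls is an addition of this sketch, since newer JDKs reject a core size above the current maximum (and a maximum below the current core) with an IllegalArgumentException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (assumption): applies new sizes while keeping core <= maximum at every step.
class PoolResizer {
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        int effectiveMax = Math.max(core, max); // same invariant as the excerpt: core <= maximum
        if (effectiveMax >= pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first so the new core size fits under it.
            pool.setMaximumPoolSize(effectiveMax);
            pool.setCorePoolSize(core);
        } else {
            // Shrinking: lower the core size first so the new ceiling stays above it.
            pool.setCorePoolSize(core);
            pool.setMaximumPoolSize(effectiveMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
        resize(pool, 5, 3); // maximum too low, so it is raised to the core size
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        resize(pool, 1, 4); // shrink both values
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```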
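As the process above shows, this Observable is also where the thread state is maintained. The thread-state transitions, shown in the figure below, run from NOT_USING_THREAD to STARTED and then to either TERMINAL or UNSUBSCRIBED.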
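The compareAndSet bookkeeping can be reproduced without RxJava. The following is a minimal sketch: ThreadStateDemo and its cleanups counter are hypothetical, with the counter standing in for handleThreadEnd. It shows why the cleanup runs at most once even when both the terminate and the unsubscribe callbacks fire.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the thread-state transitions (not Hystrix code).
class ThreadStateDemo {
    enum ThreadState { NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL }

    private final AtomicReference<ThreadState> state =
            new AtomicReference<>(ThreadState.NOT_USING_THREAD);
    int cleanups = 0; // stands in for handleThreadEnd(_cmd)

    // Succeeds only if nothing terminated or unsubscribed the command first.
    boolean start() {
        return state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED);
    }

    void onTerminate() {
        if (state.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
            cleanups++; // the thread was in use, so clean up
        }
        state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL); // never started: nothing to clean
    }

    void onUnsubscribe() {
        if (state.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
            cleanups++;
        }
        state.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED);
    }

    ThreadState current() {
        return state.get();
    }

    public static void main(String[] args) {
        ThreadStateDemo demo = new ThreadStateDemo();
        demo.start();
        demo.onTerminate();
        demo.onUnsubscribe(); // loses the race: the state is already TERMINAL
        System.out.println(demo.current() + ", cleanups=" + demo.cleanups);
    }
}
```

Because every transition is a compare-and-set from one specific state, whichever callback arrives first wins and the later one becomes a no-op.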
Executing the actual business logic
    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            // the run() method is a user provided implementation so can throw instead of using Observable.onError
            // so we catch it here and turn it into Observable.error
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }
The call

    userObservable = getExecutionObservable();

is implemented by HystrixCommand itself:
    //HystrixCommand
    final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }
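Once you see run() here, it should be clear: this is the business code that our own CommandHelloWorld implements.
The fallback Observable is triggered when executeCommandWithSpecifiedIsolation detects an error. The fallback itself runs inside getFallbackOrThrowException; to see how it is wired in, look at executeCommandAndObserve.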
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // ... omitted ...

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            // ... omitted ...
        };

        // ... omitted ...

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
On an error, onErrorResumeNext hands control to handleFallback, which in turn leads to getFallbackOrThrowException:
    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        // ... omitted ...

        if (isUnrecoverable(originalException)) {
            // ... omitted ...
        } else {
            // ... omitted ...

            if (properties.fallbackEnabled().get()) {
                // ... omitted ...

                Observable<R> fallbackExecutionChain;

                // acquire a permit
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }

                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                    return handleFallbackRejectionByEmittingError();
                }
            } else {
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }
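There are a few steps here
1. Check whether the exception can be handled by a fallback; if not, a HystrixRuntimeException is thrown
2. Check whether fallback is enabled in the configuration; if it is, go into getFallbackObservable(). That method is implemented by HystrixCommand and calls the getFallback method of the user's command. If the caller has not overridden it, HystrixCommand's own getFallback runs and throws an exception saying that no fallback is defined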
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    @Override
    final protected Observable<R> getFallbackObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    // the caller's fallback logic
                    return Observable.just(getFallback());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        });
    }
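Stripped of RxJava, the run-then-fallback contract looks roughly like the sketch below. SimpleCommand and FallbackDemo are hypothetical classes written for this illustration, and a plain RuntimeException stands in for HystrixRuntimeException. Timeouts, semaphores, and hooks are deliberately left out.

```java
// Hypothetical, simplified model of the run()/getFallback() contract (not Hystrix code).
abstract class SimpleCommand<R> {
    protected abstract R run() throws Exception;

    // Default mirrors the excerpt: no fallback unless the subclass overrides this method.
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    public R execute() {
        try {
            return run();
        } catch (Exception primary) {
            try {
                return getFallback();
            } catch (UnsupportedOperationException noFallback) {
                // Stand-in for HystrixRuntimeException: surface the original failure as the cause.
                throw new RuntimeException("run() failed and no fallback is available", primary);
            }
        }
    }
}

class FallbackDemo {
    public static void main(String[] args) {
        SimpleCommand<String> hello = new SimpleCommand<String>() {
            @Override
            protected String run() {
                throw new IllegalStateException("backend down");
            }

            @Override
            protected String getFallback() {
                return "Hello from fallback";
            }
        };
        System.out.println(hello.execute());
    }
}
```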
Later articles in this series:
How the Hystrix circuit breaker works
How Hystrix implements timeouts