Hystrix流程圖以下:spring
Hystrix整個工做流以下:編程
從流程圖上可知道,第5步線程池/隊列/信號量已滿時,還會執行第7步邏輯,更新熔斷器統計信息,而第6步不管成功與否,都會更新熔斷器統計信息。api
hystrix在服務降級熔斷的過程當中有幾個步驟他是必需要去完成的瀏覽器
上一篇幅我是經過註解的方式來進行服務熔錯的,此次不經過註解換一種方式,首先在spring-cloud-user服務中寫如下內容緩存
而後啓動服務訪問瀏覽器,結果若是我想的同樣併發
下面演示個帶超時降級的Hystrix註解app
而後用AOP寫本身的攔截規則框架
/** *這裏面用到的是AOP的知識點,若是不瞭解能夠先自行補下,後面我有空把Spring的AOP原理也寫下,這樣回頭看這個就沒這麼難了 */ @Component @Aspect //切入 public class GhyHystrixAspect { //經過線程池去請求 ExecutorService executorService= Executors.newFixedThreadPool(10); //定義切點針對GhyHystrix進行切入 @Pointcut(value = "@annotation(GhyHystrix)") public void pointCut(){} //切入後執行的方法 @Around(value = "pointCut()&&@annotation(hystrixCommand)") public Object doPointCut(ProceedingJoinPoint joinPoint, GhyHystrix hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { //定義超時降級 int timeout=hystrixCommand.timeout(); //前置的判斷邏輯 Future future=executorService.submit(()->{ try { return joinPoint.proceed(); //執行目標方法 } catch (Throwable throwable) { throwable.printStackTrace(); } return null; }); Object result; try { //獲得開始和結束時間判斷是否超時,若是超時就降級 result=future.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); //超時了就取消請求 future.cancel(true); // 先判斷是否爲空若是空就把異常拋出去 if(StringUtils.isBlank(hystrixCommand.fallback())){ throw e; } //調用fallback result=invokeFallback(joinPoint,hystrixCommand.fallback()); } return result; } //反射調用 private Object invokeFallback(ProceedingJoinPoint joinPoint,String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { MethodSignature signature=(MethodSignature)joinPoint.getSignature(); //拿到方法的信息 Method method=signature.getMethod(); //獲得參數類型 Class<?>[] parameterTypes=method.getParameterTypes(); //以上是獲取被代理的方法的參數和Method //獲得fallback方法 try { Method fallbackMethod=joinPoint.getTarget().getClass().getMethod(fallback,parameterTypes); fallbackMethod.setAccessible(true); //完成反射調用 return fallbackMethod.invoke(joinPoint.getTarget(),joinPoint.getArgs()); } catch (Exception e) { e.printStackTrace(); throw e; } } }
而後再寫個調用邏輯,用本身定義的註解less
瀏覽器訪問,返回的不是咱們剛剛定義的降級內容,其實這也挺好想的,我用的是以前的項目,以前在spring-cloud-api工程中定義了熔斷規則,改一下就好異步
將這此內容改下就好,還有配置文件隱藏下,這裏就不搞了
當請求過來時,若是請求失敗,先判斷請求次數是否達到了最小請求次數,再判斷錯誤率是否達到了閾值,若是沒達到就繼續請求,這個錯誤率的統計時間默認是10S;若是達到了閾值就要打開斷路器,打開斷 路器後有5秒的時間是熔斷狀態,5秒後,若是有請求過來,就會試着把請求發送到遠程服務,若是成功,斷路器就關閉;若是失敗斷路器繼續開啓;這個流程就引出第一個概念,那就是滑動窗口
在 hystrix 裏,大量使用了 RxJava 這個響應式函數編程框架,滑動窗口的實現也是使用了 RxJava 框架。它其實就是一個 流量控制技術;居然提到了滑動窗口,那就必需要提兩上東西,一個是計數器,另外一個就是滑動窗口;爲了更通俗的理解計數器和滑動窗口關係,就以一個例子說明;假若有一個場景:要作一個請求限制,限制要求一分鐘內最多隻能有60個請求經過,這時最通用的作方就是用個計數器,計數一分鐘內請求的次數,在這一分鐘內每來一個請求計數器就加1;一分鐘事後進入下一個一分鐘時計數器就把計數歸零從新計數;因此說若是要限流判斷就只用判斷這一分鐘內的計數量就能夠了,但這種作法在每一個1分鐘的臨界值時是有問題的,問題是啥呢,假如說在0到58S時都沒有請求,可是忽然在第59S時一會兒來了60個請求,在60S時再來60個請求,這個時候發生的狀況是在相鄰兩秒內一會兒來了120個請求,此時由於59S在第一個時間段;60S在第二個時間段,因此沒有知足觸發熔斷條件,這就導至了相鄰兩秒間的請求量過了閾值,系統極可能炸了,爲此引出了另外一個玩意,那就是滑動窗口;滑動窗口把一分鐘分紅6個窗口,每一個窗口是10S,紅色框表明能夠滑動的滑動窗口,黑色的窗口表明10S的統計數值,第一個10S統計 完成後紅色滑動窗口會向前滑動一格,改爲滑動窗口後他統計的就是紅色滑動窗口內的訪問量總和了
hystrix是經過滑動窗口統計的,他一共有10個窗口,每一個窗口表明1S,因此他統計的是他10S內的數據
上圖的每一個小矩形表明一個桶,能夠看到,每一個桶都記錄着1秒內的四個指標數據:成功量、失敗量、超時量和拒絕量,這裏的拒絕量指的就是上面流程圖中【信號量/線程池資源檢查】中被拒絕的流量。10個桶合起來是一個完整的滑動窗口,因此計算一個滑動窗口的總數據須要將10個桶的數據加起來。
Hystrix熔斷的@HystrixCommand註解,是經過HystrixCommandAspect這個切面來處理的。其中關注@Around註解聲明的方法,它針對於請求合併,以及降級的註解進行代理。這裏重點針對HystrixCommand這個註解進行詳細分析。
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { Method method = getMethodFromTarget(joinPoint); //省略代碼... MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); MetaHolder metaHolder = metaHolderFactory.create(joinPoint); //若是是異步,則建立GenericObservableCommand, 不然,則建立GenericCommand HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { //是不是響應式的(因爲咱們這些都是同步的會走 這個邏輯)
//默認是走這裏面,用命令執行器去執行 result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; }
點擊進入 CommandExecutor類的execute方法,這個方法主要用來執行命令,從代碼中能夠看出這裏有三個執行類型,分別是同步、異步、以及響應式。其中,響應式又分爲Cold Observable(observable.toObservable()) 和 HotObservable(observable.observe())
默認的executionType=SYNCHRONOUS ,同步請求。
須要注意的是,Hystrix用到了RxJava這個框架,它是一個響應式編程框架,在Android裏面用得比較多
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) {
case SYNCHRONOUS: { return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } }
由於是走默認的,因此進入HystrixCommand類的execute()方法;這個方法中,首先調用queue(),這個方法會返回一個future對象。
public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } }
queue這個方法中,返回了一個Future對象,這個future對象的實現是f,f是以匿名內部類,它是Java.util.concurrent中定一個的一個異步帶返回值對象。當調用queue().get()方法時,最終是委派給了delegate.get 方法。
public Future<R> queue() { /* * The Future returned by Observable.toBlocking().toFuture() does not implement the * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true; * thus, to comply with the contract of Future, we must wrap around it. */ final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (delegate.isCancelled()) { return false; } if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCa ncel().get()) { /* * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked. * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption, * than that interruption request cannot be taken back. */ interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } final boolean res = delegate.cancel(interruptOnFutureCancel.get()); if (!isExecutionComplete() && interruptOnFutureCancel.get()) { final Thread t = executionThread.get(); if (t != null && !t.equals(Thread.currentThread())) { t.interrupt(); } } return res; } @Override public boolean isCancelled() { return delegate.isCancelled(); } @Override public boolean isDone() { return delegate.isDone(); }
//最終會調用此方法 @Override public R get() throws InterruptedException, ExecutionException { return delegate.get(); } @Override public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; /* special handling of error states that throw immediately */ if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { Throwable t = decomposeException(e); if (t instanceof HystrixBadRequestException) { return f; } else if (t instanceof HystrixRuntimeException) { HystrixRuntimeException hre = (HystrixRuntimeException) t; switch (hre.getFailureType()) { case COMMAND_EXCEPTION: case TIMEOUT: // we don't throw these types from queue() only from queue().get() as they are execution errors return f; default: // these are errors we throw from queue() as they as rejection type errors throw hre; } } else { throw Exceptions.sneakyThrow(t); } } } return f; }
由於最終是委派給了delegate.get 方法執行,而delegate在開頭final Future<R> delegate = toObservable().toBlocking().toFuture();中,因此進入toObservable()方法中,在RxJava中,分爲幾種角色
在queue中,調用toObservable()方法建立一個被觀察者。經過Observable定義一個被觀察者,這個被觀察者會被toObservable().toBlocking().toFuture() ,實際上就是返回可得到 run() 抽象方法執行結果的Future 。 run() 方法由子類實現,執行正常的業務邏輯。在下面這段代碼中,當存在subscriber時,便會調用Func0#call() 方法,而這個subscriber是在 toBlocking() 中被訂閱的。
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; //doOnCompleted handler already did all of the SUCCESS work //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(true); //user code did run } } }; //mark the command as CANCELLED and store the latency (in addition to standard cleanup) final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx); } _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx); } _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(true); //user code did run } } }; final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { @Override public R call(R r) { R afterFirstApplication = r; try { afterFirstApplication = executionHook.onComplete(_cmd, r); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx); } try { return executionHook.onEmit(_cmd, afterFirstApplication); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx); return afterFirstApplication; } } }; final Action0 fireOnCompletedHook = new Action0() { @Override public void call() { try { executionHook.onSuccess(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx); } } }; return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { /* this is a stateful object so can only be used once */ /* CAS保證命令只執行一次 */ if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } // 命令開始時間戳 commandStartTimestamp = System.currentTimeMillis(); // 打印日誌 if (properties.requestLogEnabled().get()) { // log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } // 緩存開關,緩存KEY(這個是Hystrix中請求緩存功能,hystrix支持將一個請求結果緩存起 來,下一個具備相同key的請求將直接從緩存中取出結果,減小請求開銷) final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); /* try from cache first */ if (requestCacheEnabled) {//若是開啓了緩存機制,則從緩存中獲取結果 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 聲明執行命令的Observable Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; //保存請求結果到緩存中 if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } });
執行命令的Observable的定義以下,經過defer定義了一個 applyHystrixSemantics 的事件。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { // 當commandState處於UNSUBSCRIBED時,不執行命令 if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } //返回執行命令的Observable return applyHystrixSemantics(_cmd); } }; Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks);
applyHystrixSemantics方法;假設緩存特性未開啓或者未命中緩存,那麼代碼將執行 applyHystrixSemantics 。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//跟進 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
executeCommandAndObserve