Spring Cloud Hystrix原理篇(十一)

1、Hystrix處理流程

Hystrix流程圖以下:spring

 

 

 

Hystrix整個工做流以下:編程

  1. 構造一個 HystrixCommand或HystrixObservableCommand對象,用於封裝請求,並在構造方法配置請求被執行須要的參數;
  2. 執行命令,Hystrix提供了4種執行命令的方法,後面詳述;
  3. 判斷是否使用緩存響應請求,若啓用了緩存,且緩存可用,直接使用緩存響應請求。Hystrix支持請求緩存,但須要用戶自定義啓動;
  4. 判斷熔斷器是否打開,若是打開,跳到第8步;
  5. 判斷線程池/隊列/信號量是否已滿,已滿則跳到第8步;
  6. 執行HystrixObservableCommand.construct()或HystrixCommand.run(),若是執行失敗或者超時,跳到第8步;不然,跳到第9步;
  7. 統計熔斷器監控指標;
  8. 走Fallback備用邏輯
  9. 返回請求響應

從流程圖上可知道,第5步線程池/隊列/信號量已滿時,還會執行第7步邏輯,更新熔斷器統計信息,而第6步不管成功與否,都會更新熔斷器統計信息。api

2、Hystrix的核心原理

hystrix在服務降級熔斷的過程當中有幾個步驟他是必需要去完成的瀏覽器

  •  可配置化的降級策略(根據不一樣的服務降級類型配置不一樣的降級策略方案):
    • 三種方式:信號量/線程 、超時(默認1s)、熔斷(錯誤率)
    • 在HystrixCommandProperty類中經過相關屬性去配置改變他的默認策略(上篇中有說明過)
  • 能夠識別的降級邊界:
    • @HystrixCommand(Spring AOP經過註解標註一個接口的資源,去表示說明這個接口須要經過Hystrix來接管這個請求,若是達到註解內的配置要求就熔斷)
    • 本身去繼承HystrixCommand 抽象類,等下演示下,這玩意還挺好玩的
  • 數據採集:
    • 如何觸發熔斷(上篇幅也說過10s 內20個請求 ,錯誤率達到50),這裏引出的問題是如何採集數據,如何統計數據.
    • SEMAPHORE,最大併發數量 (它底層其實就是個AQS 統計次數tryAcquire(), acquire())
  • 行爲干預: 觸發降級/熔斷以後,對正常業務產生影響
  • 結果乾預: 經過fallback()返回數據
  • 自動恢復(處於熔斷狀態下,會每隔5s嘗試去恢復)

 2.一、經過HystrixCommand 接管咱們定義的請求

上一篇幅我是經過註解的方式來進行服務熔錯的,此次不經過註解換一種方式,首先在spring-cloud-user服務中寫如下內容緩存

 

 

 

 

 

 而後啓動服務訪問瀏覽器,結果若是我想的同樣併發

 

 

 2.二、Hystrix是如何工做的

下面演示個帶超時降級的Hystrix註解app

 

 而後用AOP寫本身的攔截規則框架

 

 

/**
 *這裏面用到的是AOP的知識點,若是不瞭解能夠先自行補下,後面我有空把Spring的AOP原理也寫下,這樣回頭看這個就沒這麼難了
 */
@Component
@Aspect  //切入
public class GhyHystrixAspect {
    //經過線程池去請求
    ExecutorService executorService= Executors.newFixedThreadPool(10);
    //定義切點針對GhyHystrix進行切入
    @Pointcut(value = "@annotation(GhyHystrix)")
    public void pointCut(){}
    //切入後執行的方法
    @Around(value = "pointCut()&&@annotation(hystrixCommand)")
    public Object doPointCut(ProceedingJoinPoint joinPoint, GhyHystrix hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        //定義超時降級
        int timeout=hystrixCommand.timeout();
        //前置的判斷邏輯
        Future future=executorService.submit(()->{
            try {
                return joinPoint.proceed(); //執行目標方法
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return null;
        });
        Object result;
        try {
            //獲得開始和結束時間判斷是否超時,若是超時就降級
            result=future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            //超時了就取消請求
            future.cancel(true);
            // 先判斷是否爲空若是空就把異常拋出去
            if(StringUtils.isBlank(hystrixCommand.fallback())){
                throw e;
            }
            //調用fallback
            result=invokeFallback(joinPoint,hystrixCommand.fallback());
        }
        return result;
    }
   //反射調用
    private Object invokeFallback(ProceedingJoinPoint joinPoint,String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        MethodSignature signature=(MethodSignature)joinPoint.getSignature();
        //拿到方法的信息
        Method method=signature.getMethod();
        //獲得參數類型
        Class<?>[] parameterTypes=method.getParameterTypes();
        //以上是獲取被代理的方法的參數和Method
        //獲得fallback方法
        try {
            Method fallbackMethod=joinPoint.getTarget().getClass().getMethod(fallback,parameterTypes);
            fallbackMethod.setAccessible(true);
            //完成反射調用
            return fallbackMethod.invoke(joinPoint.getTarget(),joinPoint.getArgs());
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }
}

而後再寫個調用邏輯,用本身定義的註解less

 

 瀏覽器訪問,返回的不是咱們剛剛定義的降級內容,其實這也挺好想的,我用的是以前的項目,以前在spring-cloud-api工程中定義了熔斷規則,改一下就好異步

 

 

 將這此內容改下就好,還有配置文件隱藏下,這裏就不搞了

 3、Hystrix的熔斷的原理以及請求代理的原理

當請求過來時,若是請求失敗,先判斷請求次數是否達到了最小請求次數,再判斷錯誤率是否達到了閾值,若是沒達到就繼續請求,這個錯誤率的統計時間默認是10S;若是達到了閾值就要打開斷路器,打開斷 路器後有5秒的時間是熔斷狀態,5秒後,若是有請求過來,就會試着把請求發送到遠程服務,若是成功,斷路器就關閉;若是失敗斷路器繼續開啓;這個流程就引出第一個概念,那就是滑動窗口

3.一、滑動窗口

在 hystrix 裏,大量使用了 RxJava 這個響應式函數編程框架,滑動窗口的實現也是使用了 RxJava 框架。它其實就是一個 流量控制技術;居然提到了滑動窗口,那就必需要提兩上東西,一個是計數器,另外一個就是滑動窗口;爲了更通俗的理解計數器和滑動窗口關係,就以一個例子說明;假若有一個場景:要作一個請求限制,限制要求一分鐘內最多隻能有60個請求經過,這時最通用的作方就是用個計數器,計數一分鐘內請求的次數,在這一分鐘內每來一個請求計數器就加1;一分鐘事後進入下一個一分鐘時計數器就把計數歸零從新計數;因此說若是要限流判斷就只用判斷這一分鐘內的計數量就能夠了,但這種作法在每一個1分鐘的臨界值時是有問題的,問題是啥呢,假如說在0到58S時都沒有請求,可是忽然在第59S時一會兒來了60個請求,在60S時再來60個請求,這個時候發生的狀況是在相鄰兩秒內一會兒來了120個請求,此時由於59S在第一個時間段;60S在第二個時間段,因此沒有知足觸發熔斷條件,這就導至了相鄰兩秒間的請求量過了閾值,系統極可能炸了,爲此引出了另外一個玩意,那就是滑動窗口;滑動窗口把一分鐘分紅6個窗口,每一個窗口是10S,紅色框表明能夠滑動的滑動窗口,黑色的窗口表明10S的統計數值,第一個10S統計 完成後紅色滑動窗口會向前滑動一格,改爲滑動窗口後他統計的就是紅色滑動窗口內的訪問量總和了

 

hystrix是經過滑動窗口統計的,他一共有10個窗口,每一個窗口表明1S,因此他統計的是他10S內的數據

 

 

 

上圖的每一個小矩形表明一個桶,能夠看到,每一個桶都記錄着1秒內的四個指標數據:成功量、失敗量、超時量和拒絕量,這裏的拒絕量指的就是上面流程圖中【信號量/線程池資源檢查】中被拒絕的流量。10個桶合起來是一個完整的滑動窗口,因此計算一個滑動窗口的總數據須要將10個桶的數據加起來

 

 4、Hystrix熔斷的源碼分析

Hystrix熔斷的@HystrixCommand註解,是經過HystrixCommandAspect這個切面來處理的。其中關注@Around註解聲明的方法,它針對於請求合併,以及降級的註解進行代理。這裏重點針對HystrixCommand這個註解進行詳細分析。

  • getMethodFromTarget 獲取目標方法信息
  • MetaHolder metaHolder = metaHolderFactory.create(joinPoint); 獲取元數據,好比調用方法,HystrixProperty註解數據、方法參數等
  • HystrixCommandFactory.getInstance().create 獲取調用者,它持有一個命令對象,而且能夠在合適的時候經過這個命令對象完成具體的業務邏輯
  • execute,執行命令
@Around("hystrixCommandAnnotationPointcut() ||
hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint
joinPoint) throws Throwable {
  Method method = getMethodFromTarget(joinPoint);
  //省略代碼...
  MetaHolderFactory metaHolderFactory =
META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
  MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
  //若是是異步,則建立GenericObservableCommand, 不然,則建立GenericCommand
  HystrixInvokable invokable =
HystrixCommandFactory.getInstance().create(metaHolder);
 
  ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
    metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
  Object result;
  try {
    if (!metaHolder.isObservable()) { //是不是響應式的(因爲咱們這些都是同步的會走
這個邏輯)
//默認是走這裏面,用命令執行器去執行       result
= CommandExecutor.execute(invokable, executionType, metaHolder);    } else {       result = executeObservable(invokable, executionType, metaHolder);    }  } catch (HystrixBadRequestException e) {     throw e.getCause();  } catch (HystrixRuntimeException e) {     throw hystrixRuntimeExceptionToThrowable(metaHolder, e); }   return result; }

點擊進入 CommandExecutor類的execute方法,這個方法主要用來執行命令,從代碼中能夠看出這裏有三個執行類型,分別是同步、異步、以及響應式。其中,響應式又分爲Cold Observable(observable.toObservable()) 和 HotObservable(observable.observe())

默認的executionType=SYNCHRONOUS ,同步請求。

  • execute():同步執行,返回一個單一的對象結果,發生錯誤時拋出異常。
  • queue():異步執行,返回一個 Future 對象,包含着執行結束後返回的單一結果。
  • observe():這個方法返回一個 Observable 對象,它表明操做的多個結果,可是已經被訂閱者消費掉了。
  • toObservable():這個方法返回一個 Observable 對象,它表明操做的多個結果,須要我們本身手動訂閱並消費掉。

須要注意的是,Hystrix用到了RxJava這個框架,它是一個響應式編程框架,在Android裏面用得比較多 

    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
         
        switch (executionType) {
case SYNCHRONOUS: { return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } }

由於是走默認的,因此進入HystrixCommand類的execute()方法;這個方法中,首先調用queue(),這個方法會返回一個future對象。

  public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

queue這個方法中,返回了一個Future對象,這個future對象的實現是f,f是以匿名內部類,它是Java.util.concurrent中定一個的一個異步帶返回值對象。當調用queue().get()方法時,最終是委派給了delegate.get 方法。

public Future<R> queue() {
  /*
    * The Future returned by Observable.toBlocking().toFuture() does not
implement the
    * interruption of the execution thread when the "mayInterrupt" flag of
Future.cancel(boolean) is set to true;
    * thus, to comply with the contract of Future, we must wrap around it.
    */
  final Future<R> delegate = toObservable().toBlocking().toFuture();
  final Future<R> f = new Future<R>() {
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      if (delegate.isCancelled()) {
        return false;
     }
      if
(HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCa
ncel().get()) {
        /*
          * The only valid transition here is false -> true. If there
are two futures, say f1 and f2, created by this command
          * (which is super-weird, but has never been prohibited),
and calls to f1.cancel(true) and to f2.cancel(false) are
          * issued by different threads, it's unclear about what
value would be used by the time mayInterruptOnCancel is checked.
          * The most consistent way to deal with this scenario is to
say that if *any* cancellation is invoked with interruption,
          * than that interruption request cannot be taken back.
          */
        interruptOnFutureCancel.compareAndSet(false,
mayInterruptIfRunning);
     }
      final boolean res = delegate.cancel(interruptOnFutureCancel.get());
      if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
        final Thread t = executionThread.get();
        if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
       }
     }
      return res;
   }
    @Override
    public boolean isCancelled() {
      return delegate.isCancelled();
   }
    @Override
    public boolean isDone() {
      return delegate.isDone();
   }
//最終會調用此方法     @Override     
public R get() throws InterruptedException, ExecutionException {       return delegate.get();    }     @Override     public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {       return delegate.get(timeout, unit);    }  };   /* special handling of error states that throw immediately */   if (f.isDone()) {     try {       f.get();       return f;    } catch (Exception e) {       Throwable t = decomposeException(e);       if (t instanceof HystrixBadRequestException) {         return f;      } else if (t instanceof HystrixRuntimeException) {         HystrixRuntimeException hre = (HystrixRuntimeException) t;         switch (hre.getFailureType()) {           case COMMAND_EXCEPTION:           case TIMEOUT:             // we don't throw these types from queue() only from queue().get() as they are execution errors             return f;           default:             // these are errors we throw from queue() as they as rejection type errors             throw hre;        }      } else {         throw Exceptions.sneakyThrow(t);      }    }  } return f; }

由於最終是委派給了delegate.get 方法執行,而delegate在開頭final Future<R> delegate = toObservable().toBlocking().toFuture();中,因此進入toObservable()方法中,在RxJava中,分爲幾種角色

  • Observable(被觀察者),它的主要做用是產生事件
  • Observer(觀察者),它的做用是接收事件並做出相應
  • Subscribe(訂閱),它用來鏈接被觀察者和觀察者
  • Event(事件),被觀察者、觀察者、溝通的載體

在queue中,調用toObservable()方法建立一個被觀察者。經過Observable定義一個被觀察者,這個被觀察者會被toObservable().toBlocking().toFuture() ,實際上就是返回可得到 run() 抽象方法執行結果的Future 。 run() 方法由子類實現,執行正常的業務邏輯。在下面這段代碼中,當存在subscriber時,便會調用Func0#call() 方法,而這個subscriber是在 toBlocking() 中被訂閱的。

  • 調用 isRequestCachingEnabled(); 判斷請求結果緩存功能是否開啓,若是開啓而且命中了緩存,則會以Observable形式返回一個緩存結果
  • 建立執行命令的Observable: hystrixObservable
  • 當緩存處於開啓狀態而且沒有命中緩存時,則建立一個「訂閱了執行命令的Observable」:HystrixCommandResponseFromCache
    • 建立存儲到緩存的Observable: HystrixCachedObservable
    • 將toCache添加到緩存中,返回獲取緩存的Observable:fromCache
    • 若是添加失敗: fromCache!=null, 則調用 toCache.unsubscribe() 方法,取消HystrixCachedObservable 的訂閱
    • 若是添加成功,則調用 toCache.toObservable(); 得到緩存Observable
  • 當緩存特性沒有開啓時,則返回執行命令的Observable。
   public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;

        //doOnCompleted handler already did all of the SUCCESS work
        //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
        final Action0 terminateCommandCleanup = new Action0() {

            @Override
            public void call() {
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                    handleCommandEnd(false); //user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                    handleCommandEnd(true); //user code did run
                }
            }
        };

        //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
        final Action0 unsubscribeCommandCleanup = new Action0() {
            @Override
            public void call() {
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                    if (!_cmd.executionResult.containsTerminalEvent()) {
                        _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                        try {
                            executionHook.onUnsubscribe(_cmd);
                        } catch (Throwable hookEx) {
                            logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                        }
                        _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                    }
                    handleCommandEnd(false); //user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                    if (!_cmd.executionResult.containsTerminalEvent()) {
                        _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                        try {
                            executionHook.onUnsubscribe(_cmd);
                        } catch (Throwable hookEx) {
                            logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                        }
                        _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                    }
                    handleCommandEnd(true); //user code did run
                }
            }
        };

        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };

        final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
            @Override
            public R call(R r) {
                R afterFirstApplication = r;

                try {
                    afterFirstApplication = executionHook.onComplete(_cmd, r);
                } catch (Throwable hookEx) {
                    logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
                }

                try {
                    return executionHook.onEmit(_cmd, afterFirstApplication);
                } catch (Throwable hookEx) {
                    logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                    return afterFirstApplication;
                }
            }
        };

        final Action0 fireOnCompletedHook = new Action0() {
            @Override
            public void call() {
                try {
                    executionHook.onSuccess(_cmd);
                } catch (Throwable hookEx) {
                    logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
                }
            }
        };
return Observable.defer(new Func0<Observable<R>>() {
  @Override
  public Observable<R> call() {
    /* this is a stateful object so can only be used once */
    /* CAS保證命令只執行一次 */
    if (!commandState.compareAndSet(CommandState.NOT_STARTED,
CommandState.OBSERVABLE_CHAIN_CREATED)) {
      IllegalStateException ex = new IllegalStateException("This instance
can only be executed once. Please instantiate a new instance.");
      //TODO make a new error type for this
      throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION,
_cmd.getClass(), getLogMessagePrefix() + " command executed multiple times -
this is not permitted.", ex, null);
   }
   // 命令開始時間戳
    commandStartTimestamp = System.currentTimeMillis();
    // 打印日誌
    if (properties.requestLogEnabled().get()) {
      // log this command execution regardless of what happened
      if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
     }
   }
   // 緩存開關,緩存KEY(這個是Hystrix中請求緩存功能,hystrix支持將一個請求結果緩存起
來,下一個具備相同key的請求將直接從緩存中取出結果,減小請求開銷)
    final boolean requestCacheEnabled = isRequestCachingEnabled();
    final String cacheKey = getCacheKey();
    /* try from cache first */
    if (requestCacheEnabled) {//若是開啓了緩存機制,則從緩存中獲取結果
      HystrixCommandResponseFromCache<R> fromCache =
(HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
      if (fromCache != null) {
        isResponseFromCache = true;
        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
     }
   }
    // 聲明執行命令的Observable
    Observable<R> hystrixObservable =
      Observable.defer(applyHystrixSemantics)
     .map(wrapWithAllOnNextHooks);
    Observable<R> afterCache;
    //保存請求結果到緩存中
    if (requestCacheEnabled && cacheKey != null) {
      // wrap it for caching
      HystrixCachedObservable<R> toCache =
HystrixCachedObservable.from(hystrixObservable, _cmd);
      HystrixCommandResponseFromCache<R> fromCache =
(HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey,
toCache);
      if (fromCache != null) {
        // another thread beat us so we'll use the cached value instead
        toCache.unsubscribe();
        isResponseFromCache = true;
        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
     } else {
        // we just created an ObservableCommand so we cast and return it
        afterCache = toCache.toObservable();
     }
   } else {
      afterCache = hystrixObservable;
   }
    return afterCache
     .doOnTerminate(terminateCommandCleanup)   // perform cleanup once
(either on normal terminal state (this line), or unsubscribe (next line))
     .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
     .doOnCompleted(fireOnCompletedHook);
 }
});

執行命令的Observable的定義以下,經過defer定義了一個 applyHystrixSemantics 的事件。

final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
  @Override
 public Observable<R> call() {
    // 當commandState處於UNSUBSCRIBED時,不執行命令
    if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
      return Observable.never();
   }
    //返回執行命令的Observable
    return applyHystrixSemantics(_cmd);
 }
};
Observable<R> hystrixObservable =
  Observable.defer(applyHystrixSemantics)
 .map(wrapWithAllOnNextHooks);

applyHystrixSemantics方法;假設緩存特性未開啓或者未命中緩存,那麼代碼將執行 applyHystrixSemantics 。

  • 傳入的_cmd是一個GenericCommand,最終執行這個command中的run方法,本質就是完成對queryOrder方法的代理
  • circuitBreaker.allowRequest() 若是爲true,表示當前不處於熔斷狀態,正常執行,不然,調用 handleShortCircuitViaFallback 實現服務降級,若是咱們配置了fallback方法,則會得到咱們配置的fallback執行
  • 若是當前hystrix處於未熔斷狀態,則
    • getExecutionSemaphore 判斷當前策略是否爲信號量(TryableSemaphoreNoOp/TryableSemaphoreActual),若是是,則調用 tryAcquire 來獲取信號量。若是當前信號量滿了,則調用 handleSemaphoreRejectionViaFallback 方法。
    • 調用 executeCommandAndObserve 獲取命令執行Observable。

 

   private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//跟進
return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }

executeCommandAndObserve

相關文章
相關標籤/搜索