Hystrix執行原理

前奏

Hystrix的常規使用姿式java

@Test
    public void test_run(){
        String s = new CommandHelloWorld("Bob").execute();
        System.out.println(s);
    }

咱們的command在new的時候發生了什麼?execute()是如何執行的?execute執行失敗或者超時如何fallback?segmentfault

1、PREPARE 初始化

當咱們new XXCommand()的時候,大部分的工做都是在 AbstractCommand完成緩存

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    //Strategies from plugins
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);

    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;

    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

能夠很清晰的看到,這裏面在進行command配置裝載、線程池配置裝載及線程池的建立、指標蒐集器、熔斷器的初始化等等。app

//HystrixCommandMetrics
ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();

//HystrixThreadPoolDefault
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

//com.netflix.hystrix.HystrixCircuitBreaker.Factory
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

除HystrixCommand每次都須要從新創建,其它基本都以commandKey維護着配置,熔斷器,指標的單例而線程池則以threadkey進場存儲。ide

咱們能夠了瞭解下Hystrix的線程池如何管理
建立線程調用 HystrixThreadPool.Factory.getInstanceui

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

從緩存中以threadPoolKey獲取線程池,獲取不到則 調用new HystrixThreadPoolDefault新建this

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}

注意spa

this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,concurrencyStrategy.getThreadPool(threadPoolKey, properties),properties);

其中 concurrencyStrategy.getThreadPool,HystrixConcurrencyStrategy就是hystrix的線程建立策略者.net

真正的建立線程執行
HystrixConcurrencyStrategy#getThreadPool線程

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    .....各類配置,此處代碼省略......

    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

這裏調用java JUC原生的 ThreadPoolExecutor建立線程

2、Observable 大串燒

Hystrix的執行利用RxJava,組合了不少的Observable,造成一個Observable,和傳統的調用鏈相比更加簡潔。

3、各色Observable顯神通

3.1.command 狀態位

  1. toObservable 第一個observable,在下一個chain以前,會更改HystrixCommand狀態位 OBSERVABLE_CHAIN_CREATED
  2. toObservable doOnTerminate,探測到terminate時,會將HystrixCommand更改成 TERMINAL
  3. executeCommandWithSpecifiedIsolation在開始執行的時候會更改HystrixCommand更改成 USER_CODE_EXECUTED
  4. toObservable doOnUnsubscribe,探測到terminate時,會將HystrixCommand更改成 UNSUBSCRIBED
3.2.executeCommandWithSpecifiedIsolation

分配執行線程,維護線程狀態

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                .....省略干擾代碼.....
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {

                    try {
                       .....省略干擾代碼.....

                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        .....省略干擾代碼.....
    }
}

具體邏輯
1.判斷隔離策略,若是是Semaphore 信號量則在當前線程上執行,不然進入線程分配邏輯
2.更改HystrixCommand的狀態 USER_CODE_EXECUTED
3.判斷HystrixCommand超時狀態,若是已經超時則拋出異常
4.更改當前command的線程執行狀態爲 STARTED
5.調用 getUserExecutionObservable 執行具體邏輯
6.doOnTerminate 當Observale執行完畢後(HystrixCommand可能失敗也可能執行成功),此時的線程狀態可能有兩種分別是 STARTEDNOT_USING_THREAD , 而後更改線程狀態爲 TERMINAL
7.doOnUnsubscribe 當Observable被取消訂閱,更改線程狀態爲 TERMINAL
8.subscribeOn 指定scheduler,這裏Hystrix實現了本身的scheduler,在scheduler的worker指定線程池,在配置線程以前會從新加載線程池配置(這裏是Rxjava的東西,暫時你們能夠粗略的認爲這裏就是指定線程池,而後把要執行的任務扔到這個線程池裏)

@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

// allow us to change things via fast-properties by setting it each time
private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;

    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }

    // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        if (maxTooLow) {
            logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                    dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        }
        threadPool.setCorePoolSize(dynamicCoreSize);
        threadPool.setMaximumPoolSize(dynamicMaximumSize);
    }

    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}

touchConfig 執行具體的線程池參數調整。

從上面的過程也能發現,該observable也是維護線程狀態的地方,線程的狀態變動見下圖

3.3.getUserExecutionObservable

執行具體業務邏輯

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

userObservable = getExecutionObservable(); 由HystrixCommand本身實現

//HystrixCommand
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}

這裏看到 run()應該就明白了,就是咱們本身的業務代碼 CommandHelloWorld去實現的。

3.4.getFallbackOrThrowException

當executeCommandWithSpecifiedIsolation探測到異常時觸發該Observable。getFallbackOrThrowException裏具體fallback執行看
executeCommandAndObserve。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    .....省略干擾代碼.....
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        .....省略干擾代碼.....
    };

    .....省略干擾代碼.....

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

doErrorResumeNext 會觸發下一個 handleFallback。

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    ....省略干擾代碼....

    if (isUnrecoverable(originalException)) {
        ....省略干擾代碼....
    } else {
        ....省略干擾代碼....

        if (properties.fallbackEnabled().get()) {
        
            ....省略干擾代碼....

            Observable<R> fallbackExecutionChain;

            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        //same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    //If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
               return handleFallbackRejectionByEmittingError();
            }
        } else {
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

這裏優先幾個步驟
1.判斷異常是不是能走fallback處理,不能則拋出HystrixRuntimeException
2.判斷配置是否開啓容許fallback,開啓,則進入 getFallbackObservable(),而該方法具體有HystrixCommand實現,調用的則是用戶的Command的fallback方法,若是調用方沒有覆蓋該方法,則會執行HystrixCommand的fallback方法,拋出未定義fallback方法的異常

protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
 }

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
               //調用方 fallback邏輯
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

後續系列文章,歡迎參閱
Hystrix熔斷器執行機制
Hystrix超時實現機制

相關文章
相關標籤/搜索