How Hystrix Implements Its Metrics Windows

1. Introduction

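Hystrix is a circuit-breaker library that lets calls fail fast and fall back to an alternative path. It decides whether to trip the circuit based on the proportion of failed calls within a sliding window. There are many ways to implement a sliding window and many ways to count metrics; a common one is to keep counters in AtomicInteger and update them with atomic increments and decrements. Those alternatives are not discussed here.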

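Hystrix is built on RxJava, so how does it use RxJava to aggregate metrics and implement the sliding window? This article is not an RxJava tutorial; it explains the approach Hystrix takes to build this feature.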

2. Reporting metric data

Start from the main entry point of HystrixCommand execution, toObservable() in AbstractCommand:

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    final Action0 terminateCommandCleanup = new Action0() {

        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
    final Action0 unsubscribeCommandCleanup = new Action0() {
        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                // ... irrelevant code omitted ...
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                // ... irrelevant code omitted ...
                handleCommandEnd(true); //user code did run
            }
        }
    };

    // ... irrelevant code omitted ...

    return Observable.defer(new Func0<Observable<R>>() {

        // ... irrelevant code omitted ...

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)
                    .doOnUnsubscribe(unsubscribeCommandCleanup)
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

The main Observable calls handleCommandEnd in both doOnTerminate and doOnUnsubscribe. As the name suggests, it does the bookkeeping once the command has finished executing.
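Both callbacks can fire for the same command, so it is worth seeing why handleCommandEnd still runs at most once. Below is a minimal RxJava-free sketch of the commandState compare-and-set guard from the code above. The class CommandEndGuard, its markUserCodeExecuted method and the endCalls/userCodeRan fields are hypothetical names for illustration. Only the state names and the CAS pattern come from the excerpt.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the commandState CAS in toObservable(): whichever callback
// wins the compareAndSet runs handleCommandEnd; the other one becomes a no-op.
class CommandEndGuard {
    enum State { OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, TERMINAL, UNSUBSCRIBED }

    final AtomicReference<State> state = new AtomicReference<>(State.OBSERVABLE_CHAIN_CREATED);
    final AtomicInteger endCalls = new AtomicInteger();
    volatile Boolean userCodeRan; // argument passed to the single handleCommandEnd call

    // Hypothetical hook: in Hystrix this transition happens when user code starts running.
    void markUserCodeExecuted() {
        state.compareAndSet(State.OBSERVABLE_CHAIN_CREATED, State.USER_CODE_EXECUTED);
    }

    void onTerminate() { finish(State.TERMINAL); }       // like terminateCommandCleanup
    void onUnsubscribe() { finish(State.UNSUBSCRIBED); } // like unsubscribeCommandCleanup

    private void finish(State target) {
        if (state.compareAndSet(State.OBSERVABLE_CHAIN_CREATED, target)) {
            handleCommandEnd(false); // user code never ran
        } else if (state.compareAndSet(State.USER_CODE_EXECUTED, target)) {
            handleCommandEnd(true);  // user code did run
        }
        // any other state: the command was already finished, do nothing
    }

    private void handleCommandEnd(boolean commandExecutionStarted) {
        endCalls.incrementAndGet();
        userCodeRan = commandExecutionStarted;
    }

    public static void main(String[] args) {
        CommandEndGuard g = new CommandEndGuard();
        g.markUserCodeExecuted();
        g.onTerminate();
        g.onUnsubscribe(); // state is already TERMINAL, so this does nothing
        System.out.println(g.endCalls.get() + " " + g.userCodeRan); // 1 true
    }
}
```

Because each transition out of OBSERVABLE_CHAIN_CREATED or USER_CODE_EXECUTED is a single atomic step, the second callback always finds a terminal state and skips the cleanup.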

private void handleCommandEnd(boolean commandExecutionStarted) {
    // ... irrelevant code omitted ...
    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
    if (executionResultAtTimeOfCancellation == null) {
        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {
        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }
    // ... irrelevant code omitted ...
}

Note metrics.markCommandDone: it calls HystrixCommandMetrics.markCommandDone and passes in an executionResult. What exactly is ExecutionResult? Here is part of its code:

public class ExecutionResult {
    private final EventCounts eventCounts;
    private final Exception failedExecutionException;
    private final Exception executionException;
    private final long startTimestamp;
    private final int executionLatency; //time spent in run() method
    private final int userThreadLatency; //time elapsed between caller thread submitting request and response being visible to it
    private final boolean executionOccurred;
    private final boolean isExecutedInThread;
    private final HystrixCollapserKey collapserKey;

    private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
    private static final int NUM_EVENT_TYPES = ALL_EVENT_TYPES.length;
    private static final BitSet EXCEPTION_PRODUCING_EVENTS = new BitSet(NUM_EVENT_TYPES);
    private static final BitSet TERMINAL_EVENTS = new BitSet(NUM_EVENT_TYPES);

    // ... remaining code omitted ...
}

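As you can probably guess, this class records the execution result of the current HystrixCommand. It holds more than the result itself: it also records the various states and any exceptions that occurred. It appears in each of the Observables discussed in the article on Hystrix's execution principles and accompanies the HystrixCommand through its entire lifecycle.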

Back to the point above: once the command has finished, it calls HystrixCommandMetrics.markCommandDone:

void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
    HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
    if (executionStarted) {
        concurrentExecutionCount.decrementAndGet();
    }
}

This ends up calling HystrixThreadEventStream.executionDone. HystrixThreadEventStream is held in a ThreadLocal, so each thread is bound to its own instance:

//HystrixThreadEventStream.threadLocalStreams
private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
    @Override
    protected HystrixThreadEventStream initialValue() {
        return new HystrixThreadEventStream(Thread.currentThread());
    }
};
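A minimal sketch of the same ThreadLocal pattern, using a hypothetical PerThreadEventStream class in place of HystrixThreadEventStream. Each thread lazily gets its own instance, and repeated calls on the same thread return that same instance.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for HystrixThreadEventStream: one instance per thread.
class PerThreadEventStream {
    private final Thread owner;

    PerThreadEventStream(Thread owner) {
        this.owner = owner;
    }

    Thread owner() {
        return owner;
    }

    // Same idea as threadLocalStreams: initialValue() runs once per thread, on first get().
    private static final ThreadLocal<PerThreadEventStream> STREAMS =
            ThreadLocal.withInitial(() -> new PerThreadEventStream(Thread.currentThread()));

    static PerThreadEventStream getInstance() {
        return STREAMS.get();
    }

    // Fetches the instance from a fresh thread named "worker" and hands it back.
    static PerThreadEventStream instanceFromNewThread() {
        AtomicReference<PerThreadEventStream> ref = new AtomicReference<>();
        Thread t = new Thread(() -> ref.set(getInstance()), "worker");
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ref.get();
    }

    public static void main(String[] args) {
        PerThreadEventStream a = getInstance();
        PerThreadEventStream b = getInstance();
        PerThreadEventStream c = instanceFromNewThread();
        System.out.println(a == b);              // true: same thread, same instance
        System.out.println(a == c);              // false: another thread has its own
        System.out.println(c.owner().getName()); // worker
    }
}
```

One consequence of this design is that writers on different threads never contend on a shared per-thread object; contention is pushed down to the per-key streams they write into.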

executionDone looks like this:

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
    HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
    writeOnlyCommandCompletionSubject.onNext(event);
}

Here a HystrixCommandCompletion is built from executionResult, threadPoolKey and commandKey, and then written through writeOnlyCommandCompletionSubject (we will come back to that subject shortly). So what is a HystrixCommandCompletion? It contains the ExecutionResult and the HystrixRequestContext. It is a kind of HystrixEvent that marks the completion of one command execution, and it carries the HystrixCommand's request information, execution result, state and other data at that moment.

As the class diagram above shows, HystrixCommandCompletion is not the only kind of event; there are others, which we won't go through one by one.

When onNext is called on writeOnlyCommandCompletionSubject, it triggers the call() method of writeCommandCompletionsToShardedStreams:

private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
    @Override
    public void call(HystrixCommandCompletion commandCompletion) {
        HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
        commandStream.write(commandCompletion);

        if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
            HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
            threadPoolStream.write(commandCompletion);
        }
    }
};

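This method writes the HystrixCommandCompletion into a HystrixCommandCompletionStream. If the command executed in a thread (thread-pool isolation) or was rejected by the thread pool, it is written a second time into a HystrixThreadPoolCompletionStream. The two streams are similar in concept, so let's use the former to explain what they are.
HystrixCommandCompletionStream instances are kept in memory, one per commandKey. Calling write on one actually calls onNext on its internal writeOnlySubject field. writeOnlySubject is an RxJava Subject wrapped in a SerializedSubject, which serializes calls so that concurrent writers cannot interleave their emissions. Calling share() on it yields an Observable, readOnlyStream, through which other components can read the Subject's data. In short, a Subject acts as both an Observer (the write side) and an Observable (the read side), bridging producers and consumers; its two type parameters give the input and output element types, both HystrixCommandCompletion here.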

HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;

        this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
        this.readOnlyStream = writeOnlySubject.share();
    }
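To make the Subject idea concrete without pulling in RxJava, here is a rough stand-in: one stream per key, a synchronized write side that pushes each event to all subscribers (approximating what SerializedSubject guarantees), and a subscribe-only read side playing the role of readOnlyStream. CompletionStream, observe and the String keys are hypothetical names; the real class uses PublishSubject and share().

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, RxJava-free stand-in for HystrixCommandCompletionStream.
class CompletionStream<E> {
    // One stream per key, like HystrixCommandCompletionStream.getInstance(commandKey).
    private static final ConcurrentMap<String, CompletionStream<?>> STREAMS = new ConcurrentHashMap<>();

    private final String key;
    private final List<Consumer<? super E>> subscribers = new CopyOnWriteArrayList<>();

    private CompletionStream(String key) {
        this.key = key;
    }

    @SuppressWarnings("unchecked")
    static <E> CompletionStream<E> getInstance(String key) {
        return (CompletionStream<E>) STREAMS.computeIfAbsent(key, k -> new CompletionStream<E>(k));
    }

    String key() {
        return key;
    }

    // Write side: synchronized so events from concurrent writers are delivered one at a time.
    synchronized void write(E event) {
        for (Consumer<? super E> s : subscribers) {
            s.accept(event);
        }
    }

    // Read side: callers can subscribe but cannot write.
    void observe(Consumer<? super E> subscriber) {
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        CompletionStream<String> stream = getInstance("GetUserCommand");
        List<String> seen = new CopyOnWriteArrayList<>();
        stream.observe(seen::add);
        getInstance("GetUserCommand").write("SUCCESS"); // same key, same stream
        getInstance("OtherCommand").write("FAILURE");   // different key, not seen
        System.out.println(seen); // [SUCCESS]
    }
}
```

Unlike a PublishSubject, this sketch delivers events synchronously on the writer's thread and has no unsubscribe; it only illustrates the keyed write/read split.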

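Tracing from the source, we now understand how the HystrixCommandCompletion data stream is written (other event types follow the same idea, so we won't repeat them). But how is it collected?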

3. Collecting metric data

Trace back to the AbstractCommand constructor:

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    // ... code omitted ...
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    // ... code omitted ...
}

This initializes the command metrics. Here is the HystrixCommandMetrics constructor:

HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
    super(null);
    this.key = key;
    this.group = commandGroup;
    this.threadPoolKey = threadPoolKey;
    this.properties = properties;

    healthCountsStream = HealthCountsStream.getInstance(key, properties);
    rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
    cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);

    rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
    rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
    rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}

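There are many XXXStream.getInstance() calls. These streams are the concrete implementations that collect and aggregate metrics for different purposes. Here is their UML class diagram: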

Figure: class diagram of several Hystrix Stream classes (not all subclasses shown)

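BucketedCounterStream implements the basic bucketed counter. BucketedCumulativeCounterStream builds on it to provide cumulative counting, and BucketedRollingCounterStream builds on it to provide rolling (sliding-window) counting. Their subclasses implement specific metrics.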

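The rest of this section covers the two kinds, cumulative counting and sliding-window counting, using CumulativeCommandEventCounterStream and HealthCountsStream respectively as detailed examples.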

3.1 BucketedCounterStream: the basic bucket implementation

Figure: data collection diagram

protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
    this.numBuckets = numBuckets;
    this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
        @Override
        public Observable<Bucket> call(Observable<Event> eventBucket) {
            return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
        }
    };

    final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
    for (int i = 0; i < numBuckets; i++) {
        emptyEventCountsToStart.add(getEmptyBucketSummary());
    }

    this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
        @Override
        public Observable<Bucket> call() {
            return inputEventStream
                    .observe()
                    .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                    .flatMap(reduceBucketToSummary)                
                    .startWith(emptyEventCountsToStart);   
        }
    });
}

The parent-class constructor has three main parts:
I. reduceBucketToSummary: how the data in each bucket is aggregated

The implementation of appendRawEventToBucket is supplied by the subclass, though they are all much alike. Looking at HealthCountsStream, we can see it uses HystrixCommandMetrics.appendEventToBucket:

public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {
    @Override
    public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
        ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
                default:
                    initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
                    break;
            }
        }
        return initialCountArray;
    }
};

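This function accumulates the counts of everything that happens within one bucket's duration. initialCountArray holds the totals accumulated from the events already seen in the current bucket, and its indices are the ordinal values of the HystrixEventType enum.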
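A cut-down sketch of the same fold, assuming a hypothetical four-value EventType enum and representing a completion's EventCounts as a Map. Each completion's per-type counts are added into a long[] indexed by ordinal, and EXCEPTION_THROWN is skipped just as in the switch above.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, cut-down enum; the real HystrixEventType has many more values.
enum EventType { SUCCESS, FAILURE, TIMEOUT, EXCEPTION_THROWN }

class AppendEventToBucket {
    // Same shape as appendEventToBucket: add one completion's counts into the bucket.
    static long[] append(long[] bucket, Map<EventType, Integer> completionCounts) {
        for (EventType type : EventType.values()) {
            if (type == EventType.EXCEPTION_THROWN) {
                continue; // derived from other types later, so not counted here
            }
            bucket[type.ordinal()] += completionCounts.getOrDefault(type, 0);
        }
        return bucket;
    }

    // Equivalent of reduce(emptyBucket, appendEventToBucket) over one bucket's events.
    static long[] reduceBucket(List<Map<EventType, Integer>> completions) {
        long[] bucket = new long[EventType.values().length];
        for (Map<EventType, Integer> c : completions) {
            bucket = append(bucket, c);
        }
        return bucket;
    }

    // Helper: a completion that recorded exactly one event of the given type.
    static Map<EventType, Integer> completion(EventType type) {
        Map<EventType, Integer> m = new EnumMap<>(EventType.class);
        m.put(type, 1);
        return m;
    }

    public static void main(String[] args) {
        long[] bucket = reduceBucket(Arrays.asList(
                completion(EventType.SUCCESS),
                completion(EventType.SUCCESS),
                completion(EventType.TIMEOUT)));
        System.out.println(Arrays.toString(bucket)); // [2, 0, 1, 0]
    }
}
```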

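II. emptyEventCountsToStart: numBuckets empty buckets that startWith emits before any real bucket. If you want to sound grand, call them the "genesis buckets".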

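III. The window definition: the first argument is the duration of each bucket and the second is its time unit. RxJava's window operator slices the event stream into consecutive sub-streams of that duration, and flatMap(reduceBucketToSummary) then reduces each sub-stream into one bucket.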

.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
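The window + flatMap(reduce) + startWith pipeline can be modeled without RxJava as below. This is a sketch under simplifying assumptions: events are plain timestamps, each bucket is just an event count, and the time range is passed in explicitly. TimeBuckets and bucketize are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of BucketedCounterStream's pipeline:
// window(bucketSizeInMs) -> reduce each window to one bucket -> startWith(numBuckets empty buckets).
class TimeBuckets {
    static List<Long> bucketize(long[] eventTimestampsMs, long startMs, long endMs,
                                int bucketSizeInMs, int numBuckets) {
        List<Long> out = new ArrayList<>();
        // startWith(emptyEventCountsToStart): numBuckets empty buckets come first
        for (int i = 0; i < numBuckets; i++) {
            out.add(0L);
        }
        // window(...): consecutive, non-overlapping slices of bucketSizeInMs each
        int slices = (int) ((endMs - startMs) / bucketSizeInMs);
        long[] counts = new long[slices];
        for (long ts : eventTimestampsMs) {
            if (ts >= startMs && ts < startMs + (long) slices * bucketSizeInMs) {
                counts[(int) ((ts - startMs) / bucketSizeInMs)]++; // reduce: count events in the slice
            }
        }
        for (long c : counts) {
            out.add(c); // every slice yields a bucket, even an empty one
        }
        return out;
    }

    public static void main(String[] args) {
        long[] events = {0, 100, 499, 500, 1700};
        System.out.println(bucketize(events, 0, 2000, 500, 3)); // [0, 0, 0, 3, 1, 0, 1]
    }
}
```

Emitting a bucket for every slice, including empty ones, matters: it keeps the bucket stream ticking at a fixed rate, which the rolling window downstream relies on.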

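How the bucket duration is computed
How is each bucket's duration derived? It also comes from configuration. Take HealthCountsStream as an example:
metrics.rollingStats.timeInMilliseconds: duration of the sliding window, default 10000 ms
metrics.healthSnapshot.intervalInMilliseconds: interval between health snapshots, default 500 ms; here it is the duration of one bucket

Number of buckets in the sliding window = sliding-window duration / bucket duration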

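For CumulativeCommandEventCounterStream, on the other hand:
metrics.rollingStats.timeInMilliseconds: duration of the sliding window, default 10000 ms
metrics.rollingStats.numBuckets: number of buckets the sliding window is split into, default 10

Bucket duration = sliding-window duration / number of buckets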
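Worked arithmetic for the two formulas, using the defaults listed above. BucketMath is a hypothetical helper, and the sketch uses integer division, so it assumes the window duration divides evenly.

```java
// Hypothetical helper applying the two formulas above with the default settings.
class BucketMath {
    // HealthCountsStream: the bucket size is configured, the bucket count is derived.
    static int healthNumBuckets(int windowMs, int healthSnapshotIntervalMs) {
        return windowMs / healthSnapshotIntervalMs;
    }

    // CumulativeCommandEventCounterStream: the bucket count is configured, the bucket size is derived.
    static int counterBucketSizeMs(int windowMs, int numBuckets) {
        return windowMs / numBuckets;
    }

    public static void main(String[] args) {
        System.out.println(healthNumBuckets(10000, 500));   // 20 buckets of 500 ms
        System.out.println(counterBucketSizeMs(10000, 10)); // buckets of 1000 ms
    }
}
```

So with the defaults, the health window holds 20 buckets of 500 ms, while the event counters use 10 buckets of 1000 ms over the same 10-second window.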

Streams with different roles use different formulas and settings, but they all follow the same pattern, so we won't show each one.

inputEventStream
inputEventStream can be thought of as the data stream that the window collects from. It is passed in by the subclass; a quick look:

//HealthCountsStream
private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
                               Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
    super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
}

//RollingThreadPoolEventCounterStream
private RollingThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKey, int numCounterBuckets, int counterBucketSizeInMs,
                                                Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
                                                Func2<long[], long[], long[]> reduceBucket) {
    super(HystrixThreadPoolCompletionStream.getInstance(threadPoolKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
}

So inputEventStream is in fact a HystrixCommandCompletionStream, a HystrixThreadPoolCompletionStream, or some other stream. Take HystrixCommandCompletionStream: it is exactly the stream written to in Section 2 (reporting metric data), and inputEventStream.observe() returns its readOnlyStream, the read-only Observable over its Subject. (If this is unclear, revisit the end of Section 2.)

3.2 Cumulative counter: CumulativeCommandEventCounterStream

First, the cumulative counter's parent class, BucketedCumulativeCounterStream:

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
    super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

    this.sourceStream = bucketedStream
            .scan(getEmptyOutputValue(), reduceBucket)
            .skip(numBuckets)
            // ... code omitted ...
}

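bucketedStream is the stream of aggregated buckets from 3.1. scan walks the emitted buckets in order and applies a function to each one. It takes two arguments: the first is the initial (seed) value, and the second is the accumulator function, which combines the previous result with the current bucket. scan emits every intermediate result, starting with the seed. skip(numBuckets) then drops the first numBuckets of those results, which are warm-up values produced by the seed and the empty starting buckets, so subscribers only see running totals.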
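A minimal model of scan(seed, reduceBucket).skip(numBuckets), assuming each bucket is a single long count and that scan emits the seed first (RxJava 1.x behavior when a seed is given). CumulativeScan is a hypothetical name. The example prepends two empty starting buckets, so skip(2) drops the seed and the first warm-up total.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of BucketedCumulativeCounterStream: bucketedStream.scan(seed, reduceBucket).skip(numBuckets)
class CumulativeScan {
    // scan with a seed: emits the seed, then one running total per bucket.
    static List<Long> scan(long seed, List<Long> buckets) {
        List<Long> out = new ArrayList<>();
        long acc = seed;
        out.add(acc);
        for (long b : buckets) {
            acc += b; // reduceBucket: previous total + current bucket
            out.add(acc);
        }
        return out;
    }

    static List<Long> cumulative(List<Long> realBuckets, int numBuckets) {
        List<Long> buckets = new ArrayList<>();
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(0L); // startWith: the empty starting buckets
        }
        buckets.addAll(realBuckets);
        List<Long> totals = scan(0L, buckets);
        return new ArrayList<>(totals.subList(numBuckets, totals.size())); // skip(numBuckets)
    }

    public static void main(String[] args) {
        System.out.println(cumulative(Arrays.asList(3L, 1L, 4L), 2)); // [0, 3, 4, 8]
    }
}
```

The running total never resets, which is exactly what distinguishes the cumulative counter from the rolling one in 3.3.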

How is the function passed to scan implemented? It is the key to cumulative counting and is supplied by the subclass, in this subsection CumulativeCommandEventCounterStream:

CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);

It uses HystrixCommandMetrics.bucketAggregator. Here is its body:

public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() {
    @Override
    public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN:
                    for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
                        cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];
                    }
                    break;
                default:
                    cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];
                    break;
            }
        }
        return cumulativeEvents;
    }
};

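call() takes two arguments: the first is the result accumulated so far and the second is the counts in the current bucket. The body is easy to follow: it adds up the count of each event type. EXCEPTION_THROWN, which appendEventToBucket skipped, is filled in here as the sum of all exception-producing event types.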
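A sketch of the same aggregation over a cut-down, hypothetical AggEvent enum. Which event types count as exception-producing is an assumption here (BAD_REQUEST and FALLBACK_FAILURE); Hystrix defines its own EXCEPTION_PRODUCING_EVENT_TYPES.

```java
import java.util.Arrays;
import java.util.EnumSet;

// Hypothetical, cut-down enum for this sketch.
enum AggEvent { SUCCESS, BAD_REQUEST, FALLBACK_FAILURE, EXCEPTION_THROWN }

class BucketAggregator {
    // Assumed set of exception-producing types for the sketch.
    static final EnumSet<AggEvent> EXCEPTION_PRODUCING = EnumSet.of(AggEvent.BAD_REQUEST, AggEvent.FALLBACK_FAILURE);

    // Same shape as bucketAggregator: add one bucket into the running totals.
    static long[] aggregate(long[] cumulative, long[] bucket) {
        for (AggEvent type : AggEvent.values()) {
            if (type == AggEvent.EXCEPTION_THROWN) {
                // Never counted per event; derived from the exception-producing types.
                for (AggEvent ex : EXCEPTION_PRODUCING) {
                    cumulative[type.ordinal()] += bucket[ex.ordinal()];
                }
            } else {
                cumulative[type.ordinal()] += bucket[type.ordinal()];
            }
        }
        return cumulative;
    }

    public static void main(String[] args) {
        long[] totals = new long[AggEvent.values().length];
        totals = aggregate(totals, new long[]{5, 1, 0, 0});
        totals = aggregate(totals, new long[]{2, 0, 3, 0});
        System.out.println(Arrays.toString(totals)); // [7, 1, 3, 4]
    }
}
```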

That is how a command's cumulative count is implemented; the other cumulative counters work the same way.

3.3 Sliding window: HealthCountsStream

The direct parent class:

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
    super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
    Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
        @Override
        public Observable<Output> call(Observable<Bucket> window) {
            return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        }
    };
    this.sourceStream = bucketedStream      
            .window(numBuckets, 1)          
            .flatMap(reduceWindowToSummary) 
            // ... code omitted ...
}

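As with the cumulative counter, this operates on the parent's bucket stream, but here it uses window(numBuckets, 1): the first argument is how many buckets each window holds, and the second is how many buckets to advance before the next window opens, so the window slides forward one bucket at a time. numBuckets is the number of buckets in our sliding window. Inside reduceWindowToSummary, scan over a window of numBuckets buckets emits numBuckets + 1 values (the seed plus one running total per bucket), and skip(numBuckets) keeps only the last one, the total for the whole window.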
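A plain-Java model of window(numBuckets, 1) followed by scan(...).skip(numBuckets) inside each window, assuming one long count per bucket. RollingWindow is a hypothetical name. The input starts with three empty buckets, mirroring startWith.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of BucketedRollingCounterStream:
//   bucketedStream.window(numBuckets, 1).flatMap(w -> w.scan(seed, reduceBucket).skip(numBuckets))
class RollingWindow {
    static List<Long> rollingTotals(List<Long> buckets, int numBuckets) {
        List<Long> out = new ArrayList<>();
        // window(numBuckets, 1): a new window opens at every bucket and spans numBuckets buckets
        for (int start = 0; start + numBuckets <= buckets.size(); start++) {
            List<Long> window = buckets.subList(start, start + numBuckets);
            // scan(seed, reduceBucket): seed + one running total per bucket
            List<Long> scanned = new ArrayList<>();
            long acc = 0;
            scanned.add(acc);
            for (long b : window) {
                acc += b;
                scanned.add(acc);
            }
            // skip(numBuckets): only the last value, the whole-window total, survives
            out.addAll(scanned.subList(numBuckets, scanned.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> buckets = Arrays.asList(0L, 0L, 0L, 3L, 1L, 0L, 1L);
        System.out.println(rollingTotals(buckets, 3)); // [0, 3, 4, 4, 2]
    }
}
```

Each output value covers exactly the latest numBuckets buckets, so old buckets fall out of the total as new ones arrive; no explicit subtraction is needed.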

Figure: sliding window

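The first row can be seen as the window's data before it moves. When the next bucket arrives, the window advances by one bucket position: the bucket 23 5 2 0 is dropped and the newest bucket 45 6 2 0 is added.
So how is the data inside each window processed? By the function in flatMap: reduceWindowToSummary applies the reduceBucket function supplied by the concrete subclass stream. Let's look at HealthCountsStream's: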

private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
    @Override
    public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
        return healthCounts.plus(bucketEventCounts);
    }
};

//HystrixCommandMetrics.HealthCounts#plus
public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;

    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}

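The implementation is straightforward: over the current sliding window it accumulates a total count (successes, failures, timeouts, thread-pool rejections and semaphore rejections) and an error count (all of those except successes).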
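A small sketch of this accumulation, using a hypothetical HcEvent enum that keeps the five types plus() reads and one it ignores. errorPercentage() is an assumed helper added for illustration, showing how an error rate can be derived from the two counters; it is not taken from the excerpt above.

```java
// Hypothetical, cut-down enum: the five types read by plus(), plus one it ignores.
enum HcEvent { SUCCESS, FAILURE, TIMEOUT, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED, FALLBACK_SUCCESS }

// Immutable accumulator mirroring HealthCounts#plus as quoted above.
final class HealthCountsSketch {
    final long totalCount;
    final long errorCount;

    HealthCountsSketch(long totalCount, long errorCount) {
        this.totalCount = totalCount;
        this.errorCount = errorCount;
    }

    static HealthCountsSketch empty() {
        return new HealthCountsSketch(0, 0);
    }

    HealthCountsSketch plus(long[] c) {
        long success = c[HcEvent.SUCCESS.ordinal()];
        long errors = c[HcEvent.FAILURE.ordinal()] + c[HcEvent.TIMEOUT.ordinal()]
                + c[HcEvent.THREAD_POOL_REJECTED.ordinal()] + c[HcEvent.SEMAPHORE_REJECTED.ordinal()];
        return new HealthCountsSketch(totalCount + success + errors, errorCount + errors);
    }

    // Assumed helper: integer error percentage, 0 when there is no traffic.
    int errorPercentage() {
        return totalCount == 0 ? 0 : (int) (errorCount * 100 / totalCount);
    }

    public static void main(String[] args) {
        long[][] window = { {8, 1, 1, 0, 0, 1}, {5, 0, 2, 1, 0, 3} };
        HealthCountsSketch hc = empty();
        for (long[] bucket : window) {
            hc = hc.plus(bucket); // healthCheckAccumulator applied bucket by bucket
        }
        System.out.println(hc.totalCount + " " + hc.errorCount + " " + hc.errorPercentage()); // 18 5 27
    }
}
```

Returning a new object from plus() keeps each emitted snapshot immutable, so a reader holding an older HealthCounts never sees it change underneath it.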

This stream's job is to probe the service's health, and it is the data source the Hystrix circuit breaker relies on to decide whether to trip.

4. Recap

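Hystrix's sliding-window design is somewhat harder to grasp than other implementations, mainly because we are not familiar enough with RxJava. That is not a big deal, though: read it a few more times and it will click.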

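This article worked from metric reporting through to metric collection, step by step, to demystify how Hystrix gathers its metrics. To finish, here is a diagram borrowed from an expert that summarizes the whole article.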

