hystrix經過rxjava消息模式來獲取和監聽命令的metrics信息。java
metrics主體結構包括一下部分:windows
hystrix metrics主要分爲三個部分,命令執行metrics,線程池metrics,合併命令metrics。分別來統計命令執行過程當中產生的metrics,線程池執行過程當中產生的metrics,合併命令執行過程彙總產生的metrics。各個metrics內部有兩類消息流組件,當各個行爲發生時,首先向消息接收流組件發生消息,各種消息分析流組件監聽消息接收流組件,對接收到的數據進行統計輸出。各個metrics組件再經過監聽消息分析流組件來獲取統計後的消息。併發
單例模式app
三個metrics都使用了單利模式,以HystrixCommandMetrics爲例,key爲commandkey。ide
// String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly) private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>(); public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) { // attempt to retrieve from cache first HystrixCommandMetrics commandMetrics = metrics.get(key.name()); if (commandMetrics != null) { return commandMetrics; } else { synchronized (HystrixCommandMetrics.class) { HystrixCommandMetrics existingMetrics = metrics.get(key.name()); if (existingMetrics != null) { return existingMetrics; } else { HystrixThreadPoolKey nonNullThreadPoolKey; if (threadPoolKey == null) { nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name()); } else { nonNullThreadPoolKey = threadPoolKey; } HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier()); metrics.putIfAbsent(key.name(), newCommandMetrics); return newCommandMetrics; } } } }
HystrixCommandMetricsthis
markCommandStart在命令執行前調用,最終會向開始消息流(HystrixCommandStartStream)發送開始消息(HystrixCommandExecutionStarted)。.net
markCommandDone在命令執行後調用,最終會向完成消息流(HystrixCommandCompletionStream)發送完成消息(HystrixCommandCompletion)。線程
/* package-private */ void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) { int currentCount = concurrentExecutionCount.incrementAndGet(); HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount); } /* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) { HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey); if (executionStarted) { concurrentExecutionCount.decrementAndGet(); } }
還有一些統計流,監聽了 HystrixCommandStartStream和HystrixCommandCompletionStream進行統計,HystrixCommandMetrics經過這些統計流得到相應的統計數據。code
private HealthCountsStream healthCountsStream; private final RollingCommandEventCounterStream rollingCommandEventCounterStream; private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream; private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream; private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream; private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream; /* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) { super(null); healthCountsStream = HealthCountsStream.getInstance(key, properties); rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties); cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties); rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties); rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties); rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties); } //獲取指定事件窗口期內數據指標 public long getRollingCount(HystrixEventType eventType) { return rollingCommandEventCounterStream.getLatest(eventType); } //獲取指定事件持續的數據指標 public long getCumulativeCount(HystrixEventType eventType) { return cumulativeCommandEventCounterStream.getLatest(eventType); }//獲取某一百分比的執行時間public int getExecutionTimePercentile(double percentile) { return rollingCommandLatencyDistributionStream.getLatestPercentile(percentile); }//獲取平均的執行時間 public int getExecutionTimeMean() { return rollingCommandLatencyDistributionStream.getLatestMean(); } //獲取某一百分比的總時間 public int getTotalTimePercentile(double percentile) { return rollingCommandUserLatencyDistributionStream.getLatestPercentile(percentile); }//獲取平均的總時間 public int getTotalTimeMean() { return rollingCommandUserLatencyDistributionStream.getLatestMean(); } //獲取窗口期內最大併發量 public long getRollingMaxConcurrentExecutions() { return rollingCommandMaxConcurrencyStream.getLatestRollingMax(); }//獲取當前併發量 public int getCurrentConcurrentExecutionCount() { return concurrentExecutionCount.get(); } //獲取命令執行健康狀況 public HealthCounts getHealthCounts() { return healthCountsStream.getLatest(); }
HystrixCollapserMetrics對象
markResponseFromCache、markBatch,最終會向Collapser消息流(HystrixCollapserEventStream)發送Collapser消息(HystrixCollapserEvent)。
public void markResponseFromCache() { HystrixThreadEventStream.getInstance().collapserResponseFromCache(collapserKey); } public void markBatch(int batchSize) { HystrixThreadEventStream.getInstance().collapserBatchExecuted(collapserKey, batchSize); }
還有一些統計流,監聽了 HystrixCollapserEventStream進行統計,HystrixCollapserMetrics經過這些統計流得到相應的統計數據。
private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream; private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream; private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream; /* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) { super(null); rollingCollapserEventCounterStream = RollingCollapserEventCounterStream.getInstance(key, properties); cumulativeCollapserEventCounterStream = CumulativeCollapserEventCounterStream.getInstance(key, properties); rollingCollapserBatchSizeDistributionStream = RollingCollapserBatchSizeDistributionStream.getInstance(key, properties); } //獲取指定事件窗口期內數據指標 public long getRollingCount(HystrixEventType.Collapser collapserEventType) { return rollingCollapserEventCounterStream.getLatest(collapserEventType); } //獲取指定事件持續的數據指標 public long getCumulativeCount(HystrixEventType.Collapser collapserEventType) { return cumulativeCollapserEventCounterStream.getLatest(collapserEventType); } //獲取指定百分比的batchsize public int getBatchSizePercentile(double percentile) { return rollingCollapserBatchSizeDistributionStream.getLatestPercentile(percentile); } //獲取平均的batchsize public int getBatchSizeMean() { return rollingCollapserBatchSizeDistributionStream.getLatestMean(); }
HystrixThreadPoolMetrics
有一些統計流,監聽了監聽了 HystrixThreadPoolStartStream和HystrixThreadPoolCompletionStream進行統計,HystrixThreadPoolMetrics經過這些統計流得到相應的統計數據。
private final RollingThreadPoolEventCounterStream rollingCounterStream; private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream; private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream; private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) { super(null); this.threadPoolKey = threadPoolKey; this.threadPool = threadPool; this.properties = properties; rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties); cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties); rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties); } /** 獲取窗口期內線程池執行的個數*/ public long getRollingCountThreadsExecuted() { return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED); } /** 獲取持續的線程池執行個數*/ public long getCumulativeCountThreadsExecuted() { return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED); } /** 獲取窗口期內線程池拒絕的個數*/ public long getRollingCountThreadsRejected() { return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED); } /** 獲取持續內線程池拒絕的個數*/ public long getCumulativeCountThreadsRejected() { return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED); } //獲取指定事件窗口期內數據指標 public long getRollingCount(HystrixEventType.ThreadPool event) { return rollingCounterStream.getLatestCount(event); } //獲取指定事件持續的數據指標 public long getCumulativeCount(HystrixEventType.ThreadPool event) { return cumulativeCounterStream.getLatestCount(event); }/** 獲取窗口期內最大併發量*/ public long getRollingMaxActiveThreads() { return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax(); }
還有一些根據線程池獲取線程池當前指標
public Number getCurrentActiveCount() { return threadPool.getActiveCount(); } public Number getCurrentCompletedTaskCount() { return threadPool.getCompletedTaskCount(); } public Number getCurrentCorePoolSize() { return threadPool.getCorePoolSize(); } public Number getCurrentLargestPoolSize() { return threadPool.getLargestPoolSize(); } public Number getCurrentMaximumPoolSize() { return threadPool.getMaximumPoolSize(); } public Number getCurrentPoolSize() { return threadPool.getPoolSize(); } public Number getCurrentTaskCount() { return threadPool.getTaskCount(); } public Number getCurrentQueueSize() { return threadPool.getQueue().size(); }
消息接收流
HystrixCommandStartStream、HystrixCommandCompletionStream、HystrixCollapserEventStream、HystrixThreadPoolCompletionStream、HystrixThreadPoolStartStream是hystrix的消息監聽流,分別接收命令、合併命令、線程池發出的消息。
消息接收流使用了單例模式,每一個key對應一個消息流。
public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) { HystrixCommandStartStream initialStream = streams.get(commandKey.name()); if (initialStream != null) { return initialStream; } else { synchronized (HystrixCommandStartStream.class) { HystrixCommandStartStream existingStream = streams.get(commandKey.name()); if (existingStream == null) { HystrixCommandStartStream newStream = new HystrixCommandStartStream(commandKey); streams.putIfAbsent(commandKey.name(), newStream); return newStream; } else { return existingStream; } } } }
內部使用rxjava來實現消息機制
HystrixCommandStartStream(final HystrixCommandKey commandKey) { this.commandKey = commandKey; this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create()); this.readOnlyStream = writeOnlySubject.share(); }
HystrixThreadEventStream
hystrix事件接收流的統一處理類。
commandExecutionStarted方法向HystrixCommandStartStream發送HystrixCommandExecutionStarted消息,若是是線程執行的話向HystrixThreadPoolStartStream發送HystrixCommandExecutionStarted消息。executionDone方法向HystrixCommandCompletionStream發送HystrixCommandCompletion消息,若是線程執行的話向HystrixThreadPoolCompletionStream發送HystrixCommandCompletion消息;collapserResponseFromCache和collapserBatchExecuted方法分別向HystrixCollapserEventStream發送類型爲RESPONSE_FROM_CACHE和BATCH_EXECUTED ADDED_TO_BATCH的HystrixCollapserEvent。
private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject; private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject; private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject; private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() { @Override protected HystrixThreadEventStream initialValue() { return new HystrixThreadEventStream(Thread.currentThread()); } }; private static final Action1<HystrixCommandExecutionStarted> writeCommandStartsToShardedStreams = new Action1<HystrixCommandExecutionStarted>() { @Override public void call(HystrixCommandExecutionStarted event) { HystrixCommandStartStream commandStartStream = HystrixCommandStartStream.getInstance(event.getCommandKey()); commandStartStream.write(event); if (event.isExecutedInThread()) { HystrixThreadPoolStartStream threadPoolStartStream = HystrixThreadPoolStartStream.getInstance(event.getThreadPoolKey()); threadPoolStartStream.write(event); } } }; private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() { @Override public void call(HystrixCommandCompletion commandCompletion) { HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey()); commandStream.write(commandCompletion); if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) { HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey()); threadPoolStream.write(commandCompletion); } } }; private static final Action1<HystrixCollapserEvent> writeCollapserExecutionsToShardedStreams = new Action1<HystrixCollapserEvent>() { @Override public void call(HystrixCollapserEvent collapserEvent) { HystrixCollapserEventStream collapserStream = HystrixCollapserEventStream.getInstance(collapserEvent.getCollapserKey()); collapserStream.write(collapserEvent); } }; /* package */ HystrixThreadEventStream(Thread thread) { this.threadId = thread.getId(); this.threadName = thread.getName(); writeOnlyCommandStartSubject = PublishSubject.create(); writeOnlyCommandCompletionSubject = PublishSubject.create(); writeOnlyCollapserSubject = PublishSubject.create(); writeOnlyCommandStartSubject .onBackpressureBuffer() .doOnNext(writeCommandStartsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); writeOnlyCommandCompletionSubject .onBackpressureBuffer() .doOnNext(writeCommandCompletionsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); writeOnlyCollapserSubject .onBackpressureBuffer() .doOnNext(writeCollapserExecutionsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); } public static HystrixThreadEventStream getInstance() { return threadLocalStreams.get(); } public void shutdown() { writeOnlyCommandStartSubject.onCompleted(); writeOnlyCommandCompletionSubject.onCompleted(); writeOnlyCollapserSubject.onCompleted(); } public void commandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy, int currentConcurrency) { HystrixCommandExecutionStarted event = new HystrixCommandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentConcurrency); writeOnlyCommandStartSubject.onNext(event); } public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) { HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey); writeOnlyCommandCompletionSubject.onNext(event); } public void collapserResponseFromCache(HystrixCollapserKey collapserKey) { HystrixCollapserEvent collapserEvent = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.RESPONSE_FROM_CACHE, 1); writeOnlyCollapserSubject.onNext(collapserEvent); } public void collapserBatchExecuted(HystrixCollapserKey collapserKey, int batchSize) { HystrixCollapserEvent batchExecution = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.BATCH_EXECUTED, 1); HystrixCollapserEvent batchAdditions = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.ADDED_TO_BATCH, batchSize); writeOnlyCollapserSubject.onNext(batchExecution); writeOnlyCollapserSubject.onNext(batchAdditions); }
消息體
HystrixCommandExecutionStarted
命令開始執行消息,內部包括了該命令的執行策略和併發數。
private final HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy; private final int currentConcurrency; public HystrixCommandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy, int currentConcurrency) { super(commandKey, threadPoolKey); this.isolationStrategy = isolationStrategy; this.currentConcurrency = currentConcurrency; }
HystrixCommandCompletion
命令執行完成消息。內部包含執行結果對象ExecutionResult和請求上下文對象HystrixRequestContext。
protected final ExecutionResult executionResult; protected final HystrixRequestContext requestContext;
ExecutionResult
執行結果數據。
private final EventCounts eventCounts;//事件數量 private final Exception failedExecutionException;//失敗異常 private final Exception executionException; //執行異常 private final long startTimestamp;//命令開始執行時間 private final int executionLatency; //執行run的時間 private final int userThreadLatency; //請求提交到執行結束的時間 private final boolean executionOccurred;//ture 執行過命令 false 未執行過命令 private final boolean isExecutedInThread;//ture 使用線程池執行 false 不是使用線程池執行 private final HystrixCollapserKey collapserKey;
EventCounts
記錄各個事件的次數。
private final BitSet events; private final int numEmissions//emission次數 private final int numFallbackEmissions;//fallback次數 private final int numCollapsed;//合併格式
HystrixCollapserEvent
合併命令消息,內部包含合併命令key、事件類型、次數。
private final HystrixCollapserKey collapserKey; private final HystrixEventType.Collapser eventType; private final int count;
消息監聽流
BucketedCounterStream
監聽HystrixEvent消息,並對各個事件類型聚合一個時間段內的數據。
protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket) { this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } }); }
BucketedCumulativeCounterStream
BucketedCounterStream子類,對聚合的數據進行持續統計。
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs, Func2<Bucket, Event, Bucket> reduceCommandCompletion, Func2<Output, Bucket, Output> reduceBucket) { super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion); this.sourceStream = bucketedStream .scan(getEmptyOutputValue(), reduceBucket) .skip(numBuckets) .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer }
BucketedRollingCounterStream
BucketedCounterStream子類,對聚合的數據在窗口期內統計。
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() { @Override public Observable<Output> call(Observable<Bucket> window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer }
CumulativeCollapserEventCounterStream、CumulativeCommandEventCounterStream、CumulativeThreadPoolEventCounterStream繼承BucketedCumulativeCounterStream,分別監控合併命令,命令,線程池的各個事件,並持續合併各個事件的數據。
RollingCollapserEventCounterStream、RollingCommandEventCounterStream、RollingThreadPoolEventCounterStream繼承BucketedRollingCounterStream分別監控合併命令,命令,線程池的各個事件,併合並窗口期內各個事件的數據。HealthCountsStream繼承BucketedRollingCounterStream監控和計算調用的失敗率。
RollingConcurrencyStream
監聽HystrixCommandExecutionStarted消息,得到當前併發量,並統計窗口期內的最大併發量。
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) { final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>(); for (int i = 0; i < numBuckets; i++) { emptyRollingMaxBuckets.add(0); } rollingMaxStream = inputEventStream .observe() .map(getConcurrencyCountFromEvent) .window(bucketSizeInMs, TimeUnit.MILLISECONDS) .flatMap(reduceStreamToMax) .startWith(emptyRollingMaxBuckets) .window(numBuckets, 1) .flatMap(reduceStreamToMax) .share() .onBackpressureDrop(); } private static final Func1<Observable<Integer>, Observable<Integer>> reduceStreamToMax = new Func1<Observable<Integer>, Observable<Integer>>() { @Override public Observable<Integer> call(Observable<Integer> observedConcurrency) { return observedConcurrency.reduce(0, reduceToMax); } };
RollingCommandMaxConcurrencyStream、RollingThreadPoolMaxConcurrencyStream分別監聽HystrixCommandStartStream、HystrixThreadPoolStartStream消息流,統計窗口期內最大併發量。
RollingDistributionStream
統計某一數據的窗口期內分佈狀況。
protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs, final Func2<Histogram, Event, Histogram> addValuesToBucket) { final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>(); for (int i = 0; i < numBuckets; i++) { emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram()); } final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() { @Override public Observable<Histogram> call(Observable<Event> bucket) { return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket); } }; rollingDistributionStream = stream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets .flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms .startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty .window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms .flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram .map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached) .share() .onBackpressureDrop(); }
RollingCollapserBatchSizeDistributionStream、RollingCommandLatencyDistributionStream、RollingCommandUserLatencyDistributionStream分別監聽HystrixCollapserEventStream、HystrixCommandCompletionStream、HystrixCommandCompletionStream流來統計命令執行時間、執行總時間、ADDED_TO_BATCH的個數的百分比分佈狀況。
其餘流
HystrixConfigurationStream
經過該數據流能夠定時獲取hystrix最新的properties配置信息,com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet就是用該流來獲取配置信息。
public HystrixConfigurationStream(final int intervalInMilliseconds) { this.intervalInMilliseconds = intervalInMilliseconds; this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS) .map(getAllConfig) .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() .onBackpressureDrop(); } private static final Func1<Long, HystrixConfiguration> getAllConfig = new Func1<Long, HystrixConfiguration>() { @Override public HystrixConfiguration call(Long timestamp) { return HystrixConfiguration.from( getAllCommandConfig.call(timestamp), getAllThreadPoolConfig.call(timestamp), getAllCollapserConfig.call(timestamp) ); } };
private static final Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>> getAllCommandConfig = new Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>>() { @Override public Map<HystrixCommandKey, HystrixCommandConfiguration> call(Long timestamp) { Map<HystrixCommandKey, HystrixCommandConfiguration> commandConfigPerKey = new HashMap<HystrixCommandKey, HystrixCommandConfiguration>(); for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) { HystrixCommandKey commandKey = commandMetrics.getCommandKey(); HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey(); HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup(); commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties())); } return commandConfigPerKey; } }; private static final Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getAllThreadPoolConfig = new Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() { @Override public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(Long timestamp) { Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> threadPoolConfigPerKey = new HashMap<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>(); for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) { HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey(); threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties())); } return threadPoolConfigPerKey; } }; private static final Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getAllCollapserConfig = new Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() { @Override public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(Long timestamp) { Map<HystrixCollapserKey, HystrixCollapserConfiguration> collapserConfigPerKey = new HashMap<HystrixCollapserKey, HystrixCollapserConfiguration>(); for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) { HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey(); collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties())); } return collapserConfigPerKey; } };