hystrix源碼之metrics

hystrix經過rxjava消息模式來獲取和監聽命令的metrics信息。java

  metrics主體結構包括一下部分:windows

  hystrix metrics主要分爲三個部分,命令執行metrics,線程池metrics,合併命令metrics。分別來統計命令執行過程當中產生的metrics,線程池執行過程當中產生的metrics,合併命令執行過程彙總產生的metrics。各個metrics內部有兩類消息流組件,當各個行爲發生時,首先向消息接收流組件發生消息,各種消息分析流組件監聽消息接收流組件,對接收到的數據進行統計輸出。各個metrics組件再經過監聽消息分析流組件來獲取統計後的消息。併發

單例模式app

  三個metrics都使用了單利模式,以HystrixCommandMetrics爲例,key爲commandkey。ide

複製代碼

// String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
    private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
        // attempt to retrieve from cache first
        HystrixCommandMetrics commandMetrics = metrics.get(key.name());
        if (commandMetrics != null) {
            return commandMetrics;
        } else {
            synchronized (HystrixCommandMetrics.class) {
                HystrixCommandMetrics existingMetrics = metrics.get(key.name());
                if (existingMetrics != null) {
                    return existingMetrics;
                } else {
                    HystrixThreadPoolKey nonNullThreadPoolKey;
                    if (threadPoolKey == null) {
                        nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
                    } else {
                        nonNullThreadPoolKey = threadPoolKey;
                    }
                    HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
                    metrics.putIfAbsent(key.name(), newCommandMetrics);
                    return newCommandMetrics;
                }
            }
        }
    }

複製代碼

HystrixCommandMetricsthis

  markCommandStart在命令執行前調用,最終會向開始消息流(HystrixCommandStartStream)發送開始消息(HystrixCommandExecutionStarted)。.net

  markCommandDone在命令執行後調用,最終會向完成消息流(HystrixCommandCompletionStream)發送完成消息(HystrixCommandCompletion)。線程

複製代碼

/* package-private */ void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
        int currentCount = concurrentExecutionCount.incrementAndGet();
        HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
    }
/* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
        HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
        if (executionStarted) {
            concurrentExecutionCount.decrementAndGet();
        }
    }

複製代碼

  還有一些統計流,監聽了 HystrixCommandStartStream和HystrixCommandCompletionStream進行統計,HystrixCommandMetrics經過這些統計流得到相應的統計數據。code

複製代碼

private HealthCountsStream healthCountsStream;
    private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
    private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
    private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
    private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
    private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;
    /* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
        super(null);
        healthCountsStream = HealthCountsStream.getInstance(key, properties);
        rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
        cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
        rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
        rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
        rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
    }
  //獲取指定事件窗口期內數據指標
    public long getRollingCount(HystrixEventType eventType) {
        return rollingCommandEventCounterStream.getLatest(eventType);
    }
  //獲取指定事件持續的數據指標
    public long getCumulativeCount(HystrixEventType eventType) {
        return cumulativeCommandEventCounterStream.getLatest(eventType);
    }//獲取某一百分比的執行時間public int getExecutionTimePercentile(double percentile) {
        return rollingCommandLatencyDistributionStream.getLatestPercentile(percentile);
    }//獲取平均的執行時間
    public int getExecutionTimeMean() {
        return rollingCommandLatencyDistributionStream.getLatestMean();
    }
    //獲取某一百分比的總時間
    public int getTotalTimePercentile(double percentile) {
        return rollingCommandUserLatencyDistributionStream.getLatestPercentile(percentile);
    }//獲取平均的總時間
    public int getTotalTimeMean() {
        return rollingCommandUserLatencyDistributionStream.getLatestMean();
    }
    //獲取窗口期內最大併發量
    public long getRollingMaxConcurrentExecutions() {
        return rollingCommandMaxConcurrencyStream.getLatestRollingMax();
    }//獲取當前併發量
    public int getCurrentConcurrentExecutionCount() {
        return concurrentExecutionCount.get();
    }  //獲取命令執行健康狀況
    public HealthCounts getHealthCounts() {
        return healthCountsStream.getLatest();
    }

複製代碼

HystrixCollapserMetrics對象

  markResponseFromCache、markBatch,最終會向Collapser消息流(HystrixCollapserEventStream)發送Collapser消息(HystrixCollapserEvent)。

複製代碼

public void markResponseFromCache() {
        HystrixThreadEventStream.getInstance().collapserResponseFromCache(collapserKey);
    }

    public void markBatch(int batchSize) {
        HystrixThreadEventStream.getInstance().collapserBatchExecuted(collapserKey, batchSize);
    }

複製代碼

  還有一些統計流,監聽了 HystrixCollapserEventStream進行統計,HystrixCollapserMetrics經過這些統計流得到相應的統計數據。

複製代碼

private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
    private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
    private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;

    /* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
        super(null);
        rollingCollapserEventCounterStream = RollingCollapserEventCounterStream.getInstance(key, properties);
        cumulativeCollapserEventCounterStream = CumulativeCollapserEventCounterStream.getInstance(key, properties);
        rollingCollapserBatchSizeDistributionStream = RollingCollapserBatchSizeDistributionStream.getInstance(key, properties);
    }
    //獲取指定事件窗口期內數據指標
    public long getRollingCount(HystrixEventType.Collapser collapserEventType) {
        return rollingCollapserEventCounterStream.getLatest(collapserEventType);
    }
    //獲取指定事件持續的數據指標
    public long getCumulativeCount(HystrixEventType.Collapser collapserEventType) {
        return cumulativeCollapserEventCounterStream.getLatest(collapserEventType);
    }
    //獲取指定百分比的batchsize
    public int getBatchSizePercentile(double percentile) {
        return rollingCollapserBatchSizeDistributionStream.getLatestPercentile(percentile);
    }
    //獲取平均的batchsize
    public int getBatchSizeMean() {
        return rollingCollapserBatchSizeDistributionStream.getLatestMean();
    }

複製代碼

 HystrixThreadPoolMetrics

  有一些統計流,監聽了監聽了 HystrixThreadPoolStartStream和HystrixThreadPoolCompletionStream進行統計,HystrixThreadPoolMetrics經過這些統計流得到相應的統計數據。

複製代碼

private final RollingThreadPoolEventCounterStream rollingCounterStream;
    private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
    private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;
    private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
        super(null);
        this.threadPoolKey = threadPoolKey;
        this.threadPool = threadPool;
        this.properties = properties;
        rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
    }
/**
     獲取窗口期內線程池執行的個數*/
    public long getRollingCountThreadsExecuted() {
        return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
    }

    /**
     獲取持續的線程池執行個數*/
    public long getCumulativeCountThreadsExecuted() {
        return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
    }

    /**
    獲取窗口期內線程池拒絕的個數*/
    public long getRollingCountThreadsRejected() {
        return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
    }

    /**
    獲取持續內線程池拒絕的個數*/
    public long getCumulativeCountThreadsRejected() {
        return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
    }
    //獲取指定事件窗口期內數據指標
    public long getRollingCount(HystrixEventType.ThreadPool event) {
        return rollingCounterStream.getLatestCount(event);
    }
   //獲取指定事件持續的數據指標
    public long getCumulativeCount(HystrixEventType.ThreadPool event) {
        return cumulativeCounterStream.getLatestCount(event);
    }/**
    獲取窗口期內最大併發量*/
    public long getRollingMaxActiveThreads() {
        return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax();
    }

複製代碼

 還有一些根據線程池獲取線程池當前指標

複製代碼

public Number getCurrentActiveCount() {
        return threadPool.getActiveCount();
    }
    public Number getCurrentCompletedTaskCount() {
        return threadPool.getCompletedTaskCount();
    }
    public Number getCurrentCorePoolSize() {
        return threadPool.getCorePoolSize();
    }
    public Number getCurrentLargestPoolSize() {
        return threadPool.getLargestPoolSize();
    }
    public Number getCurrentMaximumPoolSize() {
        return threadPool.getMaximumPoolSize();
    }
    public Number getCurrentPoolSize() {
        return threadPool.getPoolSize();
    }
    public Number getCurrentTaskCount() {
        return threadPool.getTaskCount();
    }
    public Number getCurrentQueueSize() {
        return threadPool.getQueue().size();
    }

複製代碼

消息接收流

  HystrixCommandStartStream、HystrixCommandCompletionStream、HystrixCollapserEventStream、HystrixThreadPoolCompletionStream、HystrixThreadPoolStartStream是hystrix的消息監聽流,分別接收命令、合併命令、線程池發出的消息。

  消息接收流使用了單例模式,每一個key對應一個消息流。

複製代碼

public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) {
        HystrixCommandStartStream initialStream = streams.get(commandKey.name());
        if (initialStream != null) {
            return initialStream;
        } else {
            synchronized (HystrixCommandStartStream.class) {
                HystrixCommandStartStream existingStream = streams.get(commandKey.name());
                if (existingStream == null) {
                    HystrixCommandStartStream newStream = new HystrixCommandStartStream(commandKey);
                    streams.putIfAbsent(commandKey.name(), newStream);
                    return newStream;
                } else {
                    return existingStream;
                }
            }
        }
    }

複製代碼

  內部使用rxjava來實現消息機制

HystrixCommandStartStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;

        this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
        this.readOnlyStream = writeOnlySubject.share();
    }

HystrixThreadEventStream

  hystrix事件接收流的統一處理類。

  commandExecutionStarted方法向HystrixCommandStartStream發送HystrixCommandExecutionStarted消息,若是是線程執行的話向HystrixThreadPoolStartStream發送HystrixCommandExecutionStarted消息。executionDone方法向HystrixCommandCompletionStream發送HystrixCommandCompletion消息,若是線程執行的話向HystrixThreadPoolCompletionStream發送HystrixCommandCompletion消息;collapserResponseFromCache和collapserBatchExecuted方法分別向HystrixCollapserEventStream發送類型爲RESPONSE_FROM_CACHE和BATCH_EXECUTED ADDED_TO_BATCH的HystrixCollapserEvent。

複製代碼

private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject;
    private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
    private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject;
    private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
        @Override
        protected HystrixThreadEventStream initialValue() {
            return new HystrixThreadEventStream(Thread.currentThread());
        }
    };
    private static final Action1<HystrixCommandExecutionStarted> writeCommandStartsToShardedStreams = new Action1<HystrixCommandExecutionStarted>() {
        @Override
        public void call(HystrixCommandExecutionStarted event) {
            HystrixCommandStartStream commandStartStream = HystrixCommandStartStream.getInstance(event.getCommandKey());
            commandStartStream.write(event);

            if (event.isExecutedInThread()) {
                HystrixThreadPoolStartStream threadPoolStartStream = HystrixThreadPoolStartStream.getInstance(event.getThreadPoolKey());
                threadPoolStartStream.write(event);
            }
        }
    };
    private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            commandStream.write(commandCompletion);

            if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
                HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
                threadPoolStream.write(commandCompletion);
            }
        }
    };
    private static final Action1<HystrixCollapserEvent> writeCollapserExecutionsToShardedStreams = new Action1<HystrixCollapserEvent>() {
        @Override
        public void call(HystrixCollapserEvent collapserEvent) {
            HystrixCollapserEventStream collapserStream = HystrixCollapserEventStream.getInstance(collapserEvent.getCollapserKey());
            collapserStream.write(collapserEvent);
        }
    };
    /* package */ HystrixThreadEventStream(Thread thread) {
        this.threadId = thread.getId();
        this.threadName = thread.getName();
        writeOnlyCommandStartSubject = PublishSubject.create();
        writeOnlyCommandCompletionSubject = PublishSubject.create();
        writeOnlyCollapserSubject = PublishSubject.create();
        writeOnlyCommandStartSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandStartsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
        writeOnlyCommandCompletionSubject
                .onBackpressureBuffer()
            .doOnNext(writeCommandCompletionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
        writeOnlyCollapserSubject
                .onBackpressureBuffer()
               .doOnNext(writeCollapserExecutionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
    }
    public static HystrixThreadEventStream getInstance() {
        return threadLocalStreams.get();
    }
    public void shutdown() {
        writeOnlyCommandStartSubject.onCompleted();
        writeOnlyCommandCompletionSubject.onCompleted();
        writeOnlyCollapserSubject.onCompleted();
    }
    public void commandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy, int currentConcurrency) {
        HystrixCommandExecutionStarted event = new HystrixCommandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentConcurrency);
        writeOnlyCommandStartSubject.onNext(event);
    }
    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        writeOnlyCommandCompletionSubject.onNext(event);
    }
    public void collapserResponseFromCache(HystrixCollapserKey collapserKey) {
        HystrixCollapserEvent collapserEvent = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.RESPONSE_FROM_CACHE, 1);
        writeOnlyCollapserSubject.onNext(collapserEvent);
    }
    public void collapserBatchExecuted(HystrixCollapserKey collapserKey, int batchSize) {
        HystrixCollapserEvent batchExecution = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.BATCH_EXECUTED, 1);
        HystrixCollapserEvent batchAdditions = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.ADDED_TO_BATCH, batchSize);
        writeOnlyCollapserSubject.onNext(batchExecution);
        writeOnlyCollapserSubject.onNext(batchAdditions);
    }

複製代碼

 

  

消息體

HystrixCommandExecutionStarted

  命令開始執行消息,內部包括了該命令的執行策略和併發數。

複製代碼

private final HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy;
    private final int currentConcurrency;

    public HystrixCommandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,
                                          HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy,
                                          int currentConcurrency) {
        super(commandKey, threadPoolKey);
        this.isolationStrategy = isolationStrategy;
        this.currentConcurrency = currentConcurrency;
    }

複製代碼

HystrixCommandCompletion

  命令執行完成消息。內部包含執行結果對象ExecutionResult和請求上下文對象HystrixRequestContext。

protected final ExecutionResult executionResult;
protected final HystrixRequestContext requestContext;

ExecutionResult

  執行結果數據。

複製代碼

  private final EventCounts eventCounts;//事件數量
    private final Exception failedExecutionException;//失敗異常
    private final Exception executionException; //執行異常
    private final long startTimestamp;//命令開始執行時間
    private final int executionLatency; //執行run的時間
    private final int userThreadLatency; //請求提交到執行結束的時間
    private final boolean executionOccurred;//ture 執行過命令 false 未執行過命令
    private final boolean isExecutedInThread;//ture 使用線程池執行 false 不是使用線程池執行
    private final HystrixCollapserKey collapserKey;

複製代碼

 EventCounts

  記錄各個事件的次數。

  private final BitSet events;
   private final int numEmissions//emission次數
   private final int numFallbackEmissions;//fallback次數
   private final int numCollapsed;//合併格式

 HystrixCollapserEvent

  合併命令消息,內部包含合併命令key、事件類型、次數。

private final HystrixCollapserKey collapserKey;
    private final HystrixEventType.Collapser eventType;
    private final int count;

消息監聽流

BucketedCounterStream

   監聽HystrixEvent消息,並對各個事件類型聚合一個時間段內的數據。

複製代碼

protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
            }
        });
    }

複製代碼

BucketedCumulativeCounterStream

  BucketedCounterStream子類,對聚合的數據進行持續統計。

複製代碼

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

複製代碼

BucketedRollingCounterStream

  BucketedCounterStream子類,對聚合的數據在窗口期內統計。

複製代碼

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

複製代碼

  CumulativeCollapserEventCounterStream、CumulativeCommandEventCounterStream、CumulativeThreadPoolEventCounterStream繼承BucketedCumulativeCounterStream,分別監控合併命令,命令,線程池的各個事件,並持續合併各個事件的數據。

  RollingCollapserEventCounterStream、RollingCommandEventCounterStream、RollingThreadPoolEventCounterStream繼承BucketedRollingCounterStream分別監控合併命令,命令,線程池的各個事件,併合並窗口期內各個事件的數據。HealthCountsStream繼承BucketedRollingCounterStream監控和計算調用的失敗率。

RollingConcurrencyStream

  監聽HystrixCommandExecutionStarted消息,得到當前併發量,並統計窗口期內的最大併發量。

複製代碼

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < numBuckets; i++) {
            emptyRollingMaxBuckets.add(0);
        }
        rollingMaxStream = inputEventStream
                .observe()
                .map(getConcurrencyCountFromEvent)
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                .flatMap(reduceStreamToMax)
                .startWith(emptyRollingMaxBuckets)
                .window(numBuckets, 1)
                .flatMap(reduceStreamToMax)
                .share()
                .onBackpressureDrop();
    }
private static final Func1<Observable<Integer>, Observable<Integer>> reduceStreamToMax = new Func1<Observable<Integer>, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Observable<Integer> observedConcurrency) {
            return observedConcurrency.reduce(0, reduceToMax);
        }
    };

複製代碼

RollingCommandMaxConcurrencyStream、RollingThreadPoolMaxConcurrencyStream分別監聽HystrixCommandStartStream、HystrixThreadPoolStartStream消息流,統計窗口期內最大併發量。

RollingDistributionStream

  統計某一數據的窗口期內分佈狀況。

複製代碼

protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                                        final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; i++) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }

        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };

        rollingDistributionStream = stream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
                .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
                .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
                .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
                .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
                .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
                .share()
                .onBackpressureDrop();
    }

複製代碼

RollingCollapserBatchSizeDistributionStream、RollingCommandLatencyDistributionStream、RollingCommandUserLatencyDistributionStream分別監聽HystrixCollapserEventStream、HystrixCommandCompletionStream、HystrixCommandCompletionStream流來統計命令執行時間、執行總時間、ADDED_TO_BATCH的個數的百分比分佈狀況。

 

其餘流

HystrixConfigurationStream

  經過該數據流能夠定時獲取hystrix最新的properties配置信息,com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet就是用該流來獲取配置信息。

複製代碼

public HystrixConfigurationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllConfig)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }
private static final Func1<Long, HystrixConfiguration> getAllConfig =
            new Func1<Long, HystrixConfiguration>() {
                @Override
                public HystrixConfiguration call(Long timestamp) {
                    return HystrixConfiguration.from(
                            getAllCommandConfig.call(timestamp),
                            getAllThreadPoolConfig.call(timestamp),
                            getAllCollapserConfig.call(timestamp)
                    );
                }
            };

複製代碼

複製代碼

private static final Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>> getAllCommandConfig =
            new Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
                @Override
                public Map<HystrixCommandKey, HystrixCommandConfiguration> call(Long timestamp) {
                    Map<HystrixCommandKey, HystrixCommandConfiguration> commandConfigPerKey = new HashMap<HystrixCommandKey, HystrixCommandConfiguration>();
                    for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) {
                        HystrixCommandKey commandKey = commandMetrics.getCommandKey();
                        HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey();
                        HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup();
                        commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties()));
                    }
                    return commandConfigPerKey;
                }
            };

    private static final Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getAllThreadPoolConfig =
            new Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() {
                @Override
                public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(Long timestamp) {
                    Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> threadPoolConfigPerKey = new HashMap<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>();
                    for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) {
                        HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey();
                        threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties()));
                    }
                    return threadPoolConfigPerKey;
                }
            };

    private static final Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getAllCollapserConfig =
            new Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() {
                @Override
                public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(Long timestamp) {
                    Map<HystrixCollapserKey, HystrixCollapserConfiguration> collapserConfigPerKey = new HashMap<HystrixCollapserKey, HystrixCollapserConfiguration>();
                    for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) {
                        HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey();
                        collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties()));
                    }
                    return collapserConfigPerKey;
                }
            };

複製代碼

相關文章
相關標籤/搜索