本文主要研究一下hystrix的BucketedCounterStream
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCounterStream.java
/** * Abstract class that imposes a bucketing structure and provides streams of buckets * * @param <Event> type of raw data that needs to get summarized into a bucket * @param <Bucket> type of data contained in each bucket * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be) */ public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> { protected final int numBuckets; protected final Observable<Bucket> bucketedStream; protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null); private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary; private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue()); protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket) { this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } }); } abstract Bucket 
getEmptyBucketSummary(); abstract Output getEmptyOutputValue(); /** * Return the stream of buckets * @return stream of buckets */ public abstract Observable<Output> observe(); public void startCachingStreamValuesIfUnstarted() { if (subscription.get() == null) { //the stream is not yet started Subscription candidateSubscription = observe().subscribe(counterSubject); if (subscription.compareAndSet(null, candidateSubscription)) { //won the race to set the subscription } else { //lost the race to set the subscription, so we need to cancel this one candidateSubscription.unsubscribe(); } } } /** * Synchronous call to retrieve the last calculated bucket without waiting for any emissions * @return last calculated bucket */ public Output getLatest() { startCachingStreamValuesIfUnstarted(); if (counterSubject.hasValue()) { return counterSubject.getValue(); } else { return getEmptyOutputValue(); } } public void unsubscribe() { Subscription s = subscription.get(); if (s != null) { s.unsubscribe(); subscription.compareAndSet(s, null); } } }
// Derive the width of one counter bucket in milliseconds: the rolling statistical window length divided by the configured bucket count (integer division, so any remainder is discarded).
final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get(); final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get(); final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedRollingCounterStream.java
/** * Refinement of {@link BucketedCounterStream} which reduces numBuckets at a time. * * @param <Event> type of raw data that needs to get summarized into a bucket * @param <Bucket> type of data contained in each bucket * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be) */ public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> { private Observable<Output> sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() { @Override public Observable<Output> call(Observable<Bucket> window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer } @Override public Observable<Output> observe() { return sourceStream; } /* package-private */ boolean isSourceCurrentlySubscribed() { return isSourceCurrentlySubscribed.get(); } }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCumulativeCounterStream.java
/** * Refinement of {@link BucketedCounterStream} which accumulates counters infinitely in the bucket-reduction step * * @param <Event> type of raw data that needs to get summarized into a bucket * @param <Bucket> type of data contained in each bucket * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be) */ public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> { private Observable<Output> sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs, Func2<Bucket, Event, Bucket> reduceCommandCompletion, Func2<Output, Bucket, Output> reduceBucket) { super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion); this.sourceStream = bucketedStream .scan(getEmptyOutputValue(), reduceBucket) .skip(numBuckets) .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer } @Override public Observable<Output> observe() { return sourceStream; } }
rolling及cumulative使用的是rxjava的window及scan操作来实现，看起来比较简洁。