本文主要研究一下HystrixEventStreamjava
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/HystrixEventStream.javareact
/** * Base interface for a stream of {@link com.netflix.hystrix.HystrixEventType}s. Allows consumption by individual * {@link com.netflix.hystrix.HystrixEventType} or by time-based bucketing of events */ public interface HystrixEventStream<E extends HystrixEvent> { Observable<E> observe(); }
這個接口定義了一個observe方法,返回的是rxjava的Observable,它有以下幾個實現類
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/HystrixCommandStartStream.javagit
/** * Per-Command stream of {@link HystrixCommandExecutionStarted}s. This gets written to by {@link HystrixThreadEventStream}s. * Events are emitted synchronously in the same thread that performs the command execution. */ public class HystrixCommandStartStream implements HystrixEventStream<HystrixCommandExecutionStarted> { private final HystrixCommandKey commandKey; private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlySubject; private final Observable<HystrixCommandExecutionStarted> readOnlyStream; private static final ConcurrentMap<String, HystrixCommandStartStream> streams = new ConcurrentHashMap<String, HystrixCommandStartStream>(); public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) { HystrixCommandStartStream initialStream = streams.get(commandKey.name()); if (initialStream != null) { return initialStream; } else { synchronized (HystrixCommandStartStream.class) { HystrixCommandStartStream existingStream = streams.get(commandKey.name()); if (existingStream == null) { HystrixCommandStartStream newStream = new HystrixCommandStartStream(commandKey); streams.putIfAbsent(commandKey.name(), newStream); return newStream; } else { return existingStream; } } } } HystrixCommandStartStream(final HystrixCommandKey commandKey) { this.commandKey = commandKey; this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create()); this.readOnlyStream = writeOnlySubject.share(); } public static void reset() { streams.clear(); } public void write(HystrixCommandExecutionStarted event) { writeOnlySubject.onNext(event); } @Override public Observable<HystrixCommandExecutionStarted> observe() { return readOnlyStream; } @Override public String toString() { return "HystrixCommandStartStream(" + commandKey.name() + ")"; } }
HystrixThreadEventStream會同步寫入HystrixCommandExecutionStarted,這裏的write方法調用了writeOnlySubject的onNext方法
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/HystrixThreadEventStream.javagithub
/** * Per-thread event stream. No synchronization required when writing to it since it's single-threaded. * * Some threads will be dedicated to a single HystrixCommandKey (a member of a thread-isolated {@link HystrixThreadPool}. * However, many situations arise where a single thread may serve many different commands. Examples include: * * Application caller threads (semaphore-isolated commands, or thread-pool-rejections) * * Timer threads (timeouts or collapsers) * <p> * I don't think that a thread-level view is an interesting one to consume (I could be wrong), so at the moment there * is no public way to consume it. I can always add it later, if desired. * <p> * Instead, this stream writes to the following streams, which have more meaning to metrics consumers: * <ul> * <li>{@link HystrixCommandCompletionStream}</li> * <li>{@link HystrixCommandStartStream}</li> * <li>{@link HystrixThreadPoolCompletionStream}</li> * <li>{@link HystrixThreadPoolStartStream}</li> * <li>{@link HystrixCollapserEventStream}</li> * </ul> * * Also note that any observers of this stream do so on the thread that writes the metric. This is the command caller * thread in the SEMAPHORE-isolated case, and the Hystrix thread in the THREAD-isolated case. I determined this to * be more efficient CPU-wise than immediately hopping off-thread and doing all the metric calculations in the * RxComputationThreadPool. */ public class HystrixThreadEventStream { private final long threadId; private final String threadName; private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject; private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject; private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject; private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() { @Override protected HystrixThreadEventStream initialValue() { return new HystrixThreadEventStream(Thread.currentThread()); } }; private static final Action1<HystrixCommandExecutionStarted> writeCommandStartsToShardedStreams = new Action1<HystrixCommandExecutionStarted>() { @Override public void call(HystrixCommandExecutionStarted event) { HystrixCommandStartStream commandStartStream = HystrixCommandStartStream.getInstance(event.getCommandKey()); commandStartStream.write(event); if (event.isExecutedInThread()) { HystrixThreadPoolStartStream threadPoolStartStream = HystrixThreadPoolStartStream.getInstance(event.getThreadPoolKey()); threadPoolStartStream.write(event); } } }; private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() { @Override public void call(HystrixCommandCompletion commandCompletion) { HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey()); commandStream.write(commandCompletion); if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) { HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey()); threadPoolStream.write(commandCompletion); } } }; private static final Action1<HystrixCollapserEvent> writeCollapserExecutionsToShardedStreams = new Action1<HystrixCollapserEvent>() { @Override public void call(HystrixCollapserEvent collapserEvent) { HystrixCollapserEventStream collapserStream = HystrixCollapserEventStream.getInstance(collapserEvent.getCollapserKey()); collapserStream.write(collapserEvent); } }; /* package */ HystrixThreadEventStream(Thread thread) { this.threadId = thread.getId(); this.threadName = thread.getName(); writeOnlyCommandStartSubject = PublishSubject.create(); writeOnlyCommandCompletionSubject = PublishSubject.create(); writeOnlyCollapserSubject = PublishSubject.create(); writeOnlyCommandStartSubject .onBackpressureBuffer() .doOnNext(writeCommandStartsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); writeOnlyCommandCompletionSubject .onBackpressureBuffer() .doOnNext(writeCommandCompletionsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); writeOnlyCollapserSubject .onBackpressureBuffer() .doOnNext(writeCollapserExecutionsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); } public static HystrixThreadEventStream getInstance() { return threadLocalStreams.get(); } public void shutdown() { writeOnlyCommandStartSubject.onCompleted(); writeOnlyCommandCompletionSubject.onCompleted(); writeOnlyCollapserSubject.onCompleted(); } public void commandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy, int currentConcurrency) { HystrixCommandExecutionStarted event = new HystrixCommandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentConcurrency); writeOnlyCommandStartSubject.onNext(event); } public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) { HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey); writeOnlyCommandCompletionSubject.onNext(event); } public void collapserResponseFromCache(HystrixCollapserKey collapserKey) { HystrixCollapserEvent collapserEvent = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.RESPONSE_FROM_CACHE, 1); writeOnlyCollapserSubject.onNext(collapserEvent); } public void collapserBatchExecuted(HystrixCollapserKey collapserKey, int batchSize) { HystrixCollapserEvent batchExecution = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.BATCH_EXECUTED, 1); HystrixCollapserEvent batchAdditions = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.ADDED_TO_BATCH, batchSize); writeOnlyCollapserSubject.onNext(batchExecution); writeOnlyCollapserSubject.onNext(batchAdditions); } @Override public String toString() { return "HystrixThreadEventStream (" + threadId + " - " + threadName + ")"; } }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixCommandMetrics.javawindows
/** * Used by {@link HystrixCommand} to record metrics. */ public class HystrixCommandMetrics extends HystrixMetrics { //...... /* package-private */ void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) { int currentCount = concurrentExecutionCount.incrementAndGet(); HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount); } /* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) { HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey); if (executionStarted) { concurrentExecutionCount.decrementAndGet(); } } //...... }
HystrixCommandMetrics裏頭的markCommandStart以及markCommandDone方法會調用HystrixThreadEventStream獲取實例,而後寫入指標
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixCollapserMetrics.javaapp
/** * Used by {@link HystrixCollapser} to record metrics. * {@link HystrixEventNotifier} not hooked up yet. It may be in the future. */ public class HystrixCollapserMetrics extends HystrixMetrics { //...... public void markRequestBatched() { } public void markResponseFromCache() { HystrixThreadEventStream.getInstance().collapserResponseFromCache(collapserKey); } public void markBatch(int batchSize) { HystrixThreadEventStream.getInstance().collapserBatchExecuted(collapserKey, batchSize); } public void markShards(int numShards) { } //...... }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javaide
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } }
executeCommandWithSpecifiedIsolation方法調用了markCommandStart方法
private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) { Reference<TimerListener> tl = timeoutTimer.get(); if (tl != null) { tl.clear(); } final long latency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult .addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE) .markUserThreadCompletion(latency) .setNotExecutedInThread(); ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE) .markUserThreadCompletion(latency); metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted); eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey); } private void handleCommandEnd(boolean commandExecutionStarted) { Reference<TimerListener> tl = timeoutTimer.get(); if (tl != null) { tl.clear(); } long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); if (executionResultAtTimeOfCancellation == null) { metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted); } else { metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted); } if (endCurrentThreadExecutingCommand != null) { endCurrentThreadExecutingCommand.call(); } }
cleanUpAfterResponseFromCache以及handleCommandEnd方法調用了markCommandDone方法
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCounterStream.javaui
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> { //...... protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket) { this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } }); } //...... }
這裏調用了HystrixEventStream<Event>的observe方法來消費event stream,這裏的Event是泛型,即HystrixEvent的子類
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/RollingConcurrencyStream.javathis
public abstract class RollingConcurrencyStream { //...... protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) { final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>(); for (int i = 0; i < numBuckets; i++) { emptyRollingMaxBuckets.add(0); } rollingMaxStream = inputEventStream .observe() .map(getConcurrencyCountFromEvent) .window(bucketSizeInMs, TimeUnit.MILLISECONDS) .flatMap(reduceStreamToMax) .startWith(emptyRollingMaxBuckets) .window(numBuckets, 1) .flatMap(reduceStreamToMax) .share() .onBackpressureDrop(); } //...... }
這裏調用了HystrixEventStream<HystrixCommandExecutionStarted>的observe方法來消費event stream
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/RollingDistributionStream.java.net
public class RollingDistributionStream<Event extends HystrixEvent> { //...... protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs, final Func2<Histogram, Event, Histogram> addValuesToBucket) { final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>(); for (int i = 0; i < numBuckets; i++) { emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram()); } final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() { @Override public Observable<Histogram> call(Observable<Event> bucket) { return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket); } }; rollingDistributionStream = stream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets .flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms .startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty .window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms .flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram .map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached) .share() .onBackpressureDrop(); } //...... }
這裏調用了HystrixEventStream<Event>的observe方法來消費event stream,這裏的Event是泛型,即HystrixEvent的子類
HystrixEventStream是hystrix基於rxjava設計的一個reactive stream,hystrix command在相應的生命週期裏頭會調用HystrixThreadEventStream獲取實例,往指定HystrixCommandKey的相關stream發佈對應的事件,造成event stream,而後會有其餘stream去消費event stream而後造成對應的metrics。