本文主要研究一下HystrixEventNotifier
/** * Abstract EventNotifier that allows receiving notifications for different events with default implementations. * <p> * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a * href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>. * <p> * <b>Note on thread-safety and performance</b> * <p> * A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe. * <p> * Methods are also invoked synchronously and will add to execution time of the commands so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously * onto separate worker threads. */ public abstract class HystrixEventNotifier { /** * Called for every event fired. * <p> * <b>Default Implementation: </b> Does nothing * * @param eventType event type * @param key event key */ public void markEvent(HystrixEventType eventType, HystrixCommandKey key) { // do nothing } /** * Called after a command is executed using thread isolation. * <p> * Will not get called if a command is rejected, short-circuited etc. * <p> * <b>Default Implementation: </b> Does nothing * * @param key * {@link HystrixCommandKey} of command instance. * @param isolationStrategy * {@link ExecutionIsolationStrategy} the isolation strategy used by the command when executed * @param duration * time in milliseconds of executing <code>run()</code> method * @param eventsDuringExecution * {@code List<HystrixEventType>} of events occurred during execution. */ public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) { // do nothing } }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/eventnotifier/HystrixEventNotifierDefault.java
/**
 * Default implementation of {@link HystrixEventNotifier} that does nothing.
 * <p>
 * It overrides none of the hooks, so every notification falls through to the inherited
 * implementations. The class is a singleton: the constructor is private and the only
 * instance is obtained through {@link #getInstance()}.
 *
 * @ExcludeFromJavadoc
 */
public class HystrixEventNotifierDefault extends HystrixEventNotifier {

    // final: the shared singleton reference must never be reassigned after class initialization.
    private static final HystrixEventNotifierDefault INSTANCE = new HystrixEventNotifierDefault();

    /** Private so that the instance held in {@code INSTANCE} is the only one created. */
    private HystrixEventNotifierDefault() {
    }

    /**
     * Returns the shared notifier instance.
     *
     * @return the single {@code HystrixEventNotifierDefault}, typed as {@link HystrixEventNotifier}
     */
    public static HystrixEventNotifier getInstance() {
        return INSTANCE;
    }
}
默認實現不做任何操作
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixEventType.java
/**
 * Various states/events that execution can result in or have tracked.
 * <p>
 * These are most often accessed via {@link HystrixRequestLog} or {@link HystrixCommand#getExecutionEvents()}.
 * <p>
 * Each constant carries a flag saying whether it is a terminal event; see {@link #isTerminal()}.
 */
public enum HystrixEventType {
    EMIT(false),
    SUCCESS(true),
    FAILURE(false),
    TIMEOUT(false),
    BAD_REQUEST(true),
    SHORT_CIRCUITED(false),
    THREAD_POOL_REJECTED(false),
    SEMAPHORE_REJECTED(false),
    FALLBACK_EMIT(false),
    FALLBACK_SUCCESS(true),
    FALLBACK_FAILURE(true),
    FALLBACK_REJECTION(true),
    FALLBACK_MISSING(true),
    EXCEPTION_THROWN(false),
    RESPONSE_FROM_CACHE(true),
    CANCELLED(true),
    COLLAPSED(false),
    COMMAND_MAX_ACTIVE(false);

    private final boolean isTerminal;

    HystrixEventType(boolean isTerminal) {
        this.isTerminal = isTerminal;
    }

    /**
     * @return the terminal flag this constant was declared with
     */
    public boolean isTerminal() {
        return isTerminal;
    }

    //......

    /**
     * List of events that throw an Exception to the caller.
     * <p>
     * Read-only: attempts to modify it throw {@link UnsupportedOperationException}.
     */
    public static final List<HystrixEventType> EXCEPTION_PRODUCING_EVENT_TYPES;

    /**
     * List of events that are terminal, in declaration order.
     * <p>
     * Read-only: attempts to modify it throw {@link UnsupportedOperationException}.
     */
    public static final List<HystrixEventType> TERMINAL_EVENT_TYPES;

    static {
        List<HystrixEventType> exceptionProducing = new ArrayList<HystrixEventType>();
        exceptionProducing.add(BAD_REQUEST);
        exceptionProducing.add(FALLBACK_FAILURE);
        exceptionProducing.add(FALLBACK_MISSING);
        exceptionProducing.add(FALLBACK_REJECTION);

        List<HystrixEventType> terminal = new ArrayList<HystrixEventType>();
        for (HystrixEventType eventType : HystrixEventType.values()) {
            if (eventType.isTerminal()) {
                terminal.add(eventType);
            }
        }

        // These lists are public, globally shared constants; publish them as unmodifiable views
        // so no caller can add or remove entries. Fully qualified to avoid needing a new import.
        EXCEPTION_PRODUCING_EVENT_TYPES = java.util.Collections.unmodifiableList(exceptionProducing);
        TERMINAL_EVENT_TYPES = java.util.Collections.unmodifiableList(terminal);
    }

    //......
}
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java
/**
 * This decorates "Hystrix" functionality around the run() Observable.
 * <p>
 * The Observable obtained from {@code executeCommandWithSpecifiedIsolation(_cmd)} is lifted through a
 * {@code HystrixObservableTimeoutOperator} when the execution-timeout property is enabled, and is then
 * wired with four callbacks: one per emitted value, one on completion, one that resumes from errors,
 * and one that runs for every notification to set the request context captured when this method was called.
 *
 * @param _cmd
 *            the command handed to {@code executeCommandWithSpecifiedIsolation} and to the timeout operator
 * @return the decorated Observable
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // Captured on the calling thread; the per-notification callback below passes it to setRequestContextIfNeeded.
    final HystrixRequestContext callerContext = HystrixRequestContext.getContextForCurrentThread();

    // onNext: optionally record EMIT; for scalar commands also record SUCCESS on the emitted value.
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R value) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (!commandIsScalar()) {
                return;
            }
            final int latencyMillis = (int) (System.currentTimeMillis() - executionResult.getStartTimestamp());
            eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
            executionResult = executionResult.addEvent(latencyMillis, HystrixEventType.SUCCESS);
            eventNotifier.markCommandExecution(
                    getCommandKey(),
                    properties.executionIsolationStrategy().get(),
                    latencyMillis,
                    executionResult.getOrderedList());
            circuitBreaker.markSuccess();
        }
    };

    // onCompleted: non-scalar commands record SUCCESS here instead of in onNext.
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (commandIsScalar()) {
                return;
            }
            final int latencyMillis = (int) (System.currentTimeMillis() - executionResult.getStartTimestamp());
            eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
            executionResult = executionResult.addEvent(latencyMillis, HystrixEventType.SUCCESS);
            eventNotifier.markCommandExecution(
                    getCommandKey(),
                    properties.executionIsolationStrategy().get(),
                    latencyMillis,
                    executionResult.getOrderedList());
            circuitBreaker.markSuccess();
        }
    };

    // onError: mark the circuit breaker, store the exception, then choose the matching handler.
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable error) {
            circuitBreaker.markNonSuccess();
            final Exception cause = getExceptionFromThrowable(error);
            executionResult = executionResult.setExecutionException(cause);

            if (cause instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(cause);
            }
            if (error instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            }
            if (error instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(cause);
            }
            /*
             * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
             */
            if (cause instanceof HystrixBadRequestException) {
                eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                return Observable.error(cause);
            }
            return handleFailureViaFallback(cause);
        }
    };

    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> notification) {
            setRequestContextIfNeeded(callerContext);
        }
    };

    // Read the property first, then build the execution, matching the original evaluation order.
    final boolean timeoutEnabled = properties.executionTimeoutEnabled().get();
    Observable<R> execution = executeCommandWithSpecifiedIsolation(_cmd);
    if (timeoutEnabled) {
        execution = execution.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    }

    return execution
            .doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
在HystrixCommand以及HystrixObservableCommand調用的時候，都會調用HystrixEventNotifier來發布事件，提供給開發者自定義實現，來做指標收集及監控報警。