A Look at HystrixEventNotifier

This article takes a close look at HystrixEventNotifier.

HystrixEventNotifier

/**
 * Abstract EventNotifier that allows receiving notifications for different events with default implementations.
 * <p>
 * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
 * href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
 * <p>
 * <b>Note on thread-safety and performance</b>
 * <p>
 * A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe.
 * <p>
 * Methods are also invoked synchronously and will add to execution time of the commands so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously
 * onto separate worker threads.
 */
public abstract class HystrixEventNotifier {

    /**
     * Called for every event fired.
     * <p>
     * <b>Default Implementation: </b> Does nothing
     * 
     * @param eventType event type
     * @param key event key
     */
    public void markEvent(HystrixEventType eventType, HystrixCommandKey key) {
        // do nothing
    }

    /**
     * Called after a command is executed using thread isolation.
     * <p>
     * Will not get called if a command is rejected, short-circuited etc.
     * <p>
     * <b>Default Implementation: </b> Does nothing
     * 
     * @param key
     *            {@link HystrixCommandKey} of command instance.
     * @param isolationStrategy
     *            {@link ExecutionIsolationStrategy} the isolation strategy used by the command when executed
     * @param duration
     *            time in milliseconds of executing <code>run()</code> method
     * @param eventsDuringExecution
     *            {@code List<HystrixEventType>} of events occurred during execution.
     */
    public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) {
        // do nothing
    }

}
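  • The notifier is invoked synchronously on the command's execution path, so its methods must be fast; a slow implementation blocks the command. Any time-consuming work should be handed off asynchronously to other threads. A single instance is used globally, so the implementation must also be thread-safe.
  • markEvent is called every time an event fires. markCommandExecution is, per its Javadoc, called after a command has executed using thread isolation, and is not called when the command is rejected or short-circuited.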
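
To make the "fast and thread-safe" requirement concrete, here is a minimal sketch of a counting notifier. It is not Hystrix code: SketchEventType, SketchEventNotifier and CountingEventNotifier are hypothetical stand-ins so the example compiles without Hystrix on the classpath, and the command key is simplified to a String. In a real project the class would extend HystrixEventNotifier and use HystrixEventType and HystrixCommandKey.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-ins for HystrixEventType and HystrixEventNotifier.
enum SketchEventType { SUCCESS, FAILURE, TIMEOUT }

abstract class SketchEventNotifier {
    public void markEvent(SketchEventType eventType, String commandKey) { /* do nothing */ }
    public void markCommandExecution(String commandKey, int duration, List<SketchEventType> events) { /* do nothing */ }
}

class CountingEventNotifier extends SketchEventNotifier {
    // Thread-safe counters: one notifier instance is shared by all command threads.
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Potentially slow work is pushed to a daemon worker thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "notifier-worker");
        t.setDaemon(true);
        return t;
    });

    @Override
    public void markEvent(SketchEventType eventType, String commandKey) {
        // Cheap work stays on the caller's thread.
        counts.computeIfAbsent(commandKey + ":" + eventType, k -> new LongAdder()).increment();
        if (eventType != SketchEventType.SUCCESS) {
            // Anything that might block is handed off instead of delaying the command.
            worker.submit(() -> report(commandKey, eventType));
        }
    }

    public long count(String commandKey, SketchEventType eventType) {
        LongAdder adder = counts.get(commandKey + ":" + eventType);
        return adder == null ? 0 : adder.sum();
    }

    // Placeholder for a slow call such as pushing to a monitoring backend (hypothetical).
    private void report(String commandKey, SketchEventType eventType) {
        System.out.println("ALERT " + commandKey + " " + eventType);
    }
}
```

With the real library, a custom notifier is typically registered once at startup through HystrixPlugins.getInstance().registerEventNotifier(...), before any command has executed.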

HystrixEventNotifierDefault

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/eventnotifier/HystrixEventNotifierDefault.java

/**
 * Default implementations of {@link HystrixEventNotifier} that does nothing.
 * 
 * @ExcludeFromJavadoc
 */
public class HystrixEventNotifierDefault extends HystrixEventNotifier {

    private static HystrixEventNotifierDefault INSTANCE = new HystrixEventNotifierDefault();

    private HystrixEventNotifierDefault() {

    }

    public static HystrixEventNotifier getInstance() {
        return INSTANCE;
    }
}
The default implementation does nothing.

HystrixEventType

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixEventType.java

/**
 * Various states/events that execution can result in or have tracked.
 * <p>
 * These are most often accessed via {@link HystrixRequestLog} or {@link HystrixCommand#getExecutionEvents()}.
 */
public enum HystrixEventType {
    EMIT(false),
    SUCCESS(true),
    FAILURE(false),
    TIMEOUT(false),
    BAD_REQUEST(true),
    SHORT_CIRCUITED(false),
    THREAD_POOL_REJECTED(false),
    SEMAPHORE_REJECTED(false),
    FALLBACK_EMIT(false),
    FALLBACK_SUCCESS(true),
    FALLBACK_FAILURE(true),
    FALLBACK_REJECTION(true),
    FALLBACK_MISSING(true),
    EXCEPTION_THROWN(false),
    RESPONSE_FROM_CACHE(true),
    CANCELLED(true),
    COLLAPSED(false),
    COMMAND_MAX_ACTIVE(false);

    private final boolean isTerminal;

    HystrixEventType(boolean isTerminal) {
        this.isTerminal = isTerminal;
    }

    public boolean isTerminal() {
        return isTerminal;
    }
    //......

    /**
     * List of events that throw an Exception to the caller
     */
    public final static List<HystrixEventType> EXCEPTION_PRODUCING_EVENT_TYPES = new ArrayList<HystrixEventType>();

    /**
     * List of events that are terminal
     */
    public final static List<HystrixEventType> TERMINAL_EVENT_TYPES = new ArrayList<HystrixEventType>();

    static {
        EXCEPTION_PRODUCING_EVENT_TYPES.add(BAD_REQUEST);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_FAILURE);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_MISSING);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_REJECTION);

        for (HystrixEventType eventType: HystrixEventType.values()) {
            if (eventType.isTerminal()) {
                TERMINAL_EVENT_TYPES.add(eventType);
            }
        }
    }
    //......
}
  • This defines the HystrixEventType enum and then groups the events into two lists: EXCEPTION_PRODUCING_EVENT_TYPES and TERMINAL_EVENT_TYPES.
  • EXCEPTION_PRODUCING_EVENT_TYPES contains BAD_REQUEST, FALLBACK_FAILURE, FALLBACK_MISSING and FALLBACK_REJECTION.
  • TERMINAL_EVENT_TYPES is derived from each constant's isTerminal flag and contains SUCCESS, BAD_REQUEST, FALLBACK_SUCCESS, FALLBACK_FAILURE, FALLBACK_REJECTION, FALLBACK_MISSING, RESPONSE_FROM_CACHE and CANCELLED.
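
The two lists are useful when interpreting the event list that a notifier receives. The sketch below shows one possible way to use them, under stated assumptions: MiniEventType is a hypothetical, trimmed copy of the enum (only a few constants, same isTerminal flags as the listing above), and EventClassifier with its two helper methods is invented for illustration and is not part of Hystrix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed stand-in for HystrixEventType.
enum MiniEventType {
    EMIT(false), SUCCESS(true), FAILURE(false), TIMEOUT(false),
    BAD_REQUEST(true), FALLBACK_SUCCESS(true), FALLBACK_MISSING(true);

    private final boolean isTerminal;

    MiniEventType(boolean isTerminal) { this.isTerminal = isTerminal; }

    public boolean isTerminal() { return isTerminal; }

    static final List<MiniEventType> EXCEPTION_PRODUCING = new ArrayList<>();
    static final List<MiniEventType> TERMINAL = new ArrayList<>();

    static {
        // Same construction as the original: one list is hand-picked,
        // the other is derived from the isTerminal flag.
        EXCEPTION_PRODUCING.add(BAD_REQUEST);
        EXCEPTION_PRODUCING.add(FALLBACK_MISSING);
        for (MiniEventType t : values()) {
            if (t.isTerminal()) {
                TERMINAL.add(t);
            }
        }
    }
}

// Hypothetical helpers for reading an ordered event list.
class EventClassifier {
    // True if any recorded event results in an exception reaching the caller.
    static boolean threwToCaller(List<MiniEventType> events) {
        for (MiniEventType e : events) {
            if (MiniEventType.EXCEPTION_PRODUCING.contains(e)) {
                return true;
            }
        }
        return false;
    }

    // Returns the last terminal event in the list, or null if there is none.
    static MiniEventType terminalEvent(List<MiniEventType> events) {
        MiniEventType last = null;
        for (MiniEventType e : events) {
            if (e.isTerminal()) {
                last = e;
            }
        }
        return last;
    }
}
```

For example, the list [FAILURE, FALLBACK_SUCCESS] does not throw to the caller and ends in FALLBACK_SUCCESS, while [FAILURE, FALLBACK_MISSING] does throw.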

markCommandExecution

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

/**
     * This decorates "Hystrix" functionality around the run() Observable.
     *
     * @return R
     */
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        final Action1<R> markEmits = new Action1<R>() {
            @Override
            public void call(R r) {
                if (shouldOutputOnNextEvents()) {
                    executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                    eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
                }
                if (commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
  • In the markEmits action (attached via doOnNext), markCommandExecution is called if the command is scalar.
  • In the markOnCompleted action (attached via doOnCompleted), markCommandExecution is called if the command is not scalar.
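
The effect of the two branches is that markCommandExecution fires once per successful execution, whichever kind of command is running. The following simplified model (not Hystrix code and not RxJava) replays that logic; ExecutionMarkerSketch and its fields are hypothetical names, with the two booleans standing in for commandIsScalar() and shouldOutputOnNextEvents().

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the markEmits / markOnCompleted callbacks.
class ExecutionMarkerSketch {
    private final boolean scalar;        // stands in for commandIsScalar()
    private final boolean outputOnNext;  // stands in for shouldOutputOnNextEvents()
    final List<String> calls = new ArrayList<>();

    ExecutionMarkerSketch(boolean scalar, boolean outputOnNext) {
        this.scalar = scalar;
        this.outputOnNext = outputOnNext;
    }

    // Mirrors markEmits: runs for every emitted value.
    void onNext(Object value) {
        if (outputOnNext) {
            calls.add("markEvent(EMIT)");
        }
        if (scalar) {
            markSuccess();
        }
    }

    // Mirrors markOnCompleted: runs once when the stream completes.
    void onCompleted() {
        if (!scalar) {
            markSuccess();
        }
    }

    private void markSuccess() {
        calls.add("markEvent(SUCCESS)");
        calls.add("markCommandExecution");
    }

    // Replays a successful execution with the given number of emitted values.
    static List<String> run(boolean scalar, boolean outputOnNext, int emissions) {
        ExecutionMarkerSketch m = new ExecutionMarkerSketch(scalar, outputOnNext);
        for (int i = 0; i < emissions; i++) {
            m.onNext(i);
        }
        m.onCompleted();
        return m.calls;
    }
}
```

A scalar run with one emission records SUCCESS and markCommandExecution as soon as the single value arrives. A non-scalar run with three emissions records three EMIT events and only then, at completion, SUCCESS and markCommandExecution. A plausible reading of this design is that a multi-valued stream cannot be called successful until it has completed, whereas a single-valued command is done once its value is emitted.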

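Summary

Whenever a HystrixCommand or HystrixObservableCommand executes, it calls HystrixEventNotifier to publish events. Developers can plug in a custom implementation to collect metrics and drive monitoring and alerting.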
