本文主要研究下hystrix的fallbackjava
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javagit
/** * This decorates "Hystrix" functionality around the run() Observable. * * @return R */ private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { //...... final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; //...... Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javagithub
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.attemptExecution()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javaapp
private Observable<R> handleSemaphoreRejectionViaFallback() { Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution"); executionResult = executionResult.setExecutionException(semaphoreRejectionException); eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey); logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it // retrieve a fallback or throw an exception if no fallback available return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, "could not acquire a semaphore for execution", semaphoreRejectionException); } private Observable<R> handleShortCircuitViaFallback() { // record that we are returning a short-circuited fallback eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey); // short-circuit and go directly to fallback (or throw an exception if no fallback implemented) Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN"); executionResult = executionResult.setExecutionException(shortCircuitException); try { return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited", shortCircuitException); } catch (Exception e) { return Observable.error(e); } } private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) { eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey); threadPool.markThreadRejection(); // use a fallback instead (or throw exception if not implemented) return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying); } private Observable<R> handleTimeoutViaFallback() { return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException()); } private Observable<R> handleFailureViaFallback(Exception underlying) { /** * All other error handling */ logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying); // report failure eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey); // record the exception executionResult = executionResult.setException(underlying); return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying); }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javaide
/** * Execute <code>getFallback()</code> within protection of a semaphore that limits number of concurrent executions. * <p> * Fallback implementations shouldn't perform anything that can be blocking, but we protect against it anyways in case someone doesn't abide by the contract. * <p> * If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially * all threads to pile up and block. * * @return K * @throws UnsupportedOperationException * if getFallback() not implemented * @throws HystrixRuntimeException * if getFallback() fails (throws an Exception) or is rejected by the semaphore */ private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) { final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread(); long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); // record the executionResult // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144) executionResult = executionResult.addEvent((int) latency, eventType); if (isUnrecoverable(originalException)) { logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException); /* executionHook for all errors */ Exception e = wrapWithOnErrorHook(failureType, originalException); return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null)); } else { if (isRecoverableError(originalException)) { logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException); } if (properties.fallbackEnabled().get()) { /* fallback behavior is permitted so attempt */ final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(requestContext); } }; final Action1<R> markFallbackEmit = new Action1<R>() { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT); eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey); } } }; final Action0 markFallbackCompleted = new Action0() { @Override public void call() { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS); } }; final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { /* executionHook for all errors */ Exception e = wrapWithOnErrorHook(failureType, originalException); Exception fe = getExceptionFromThrowable(t); long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); Exception toEmit; if (fe instanceof UnsupportedOperationException) { logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING); toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe); } else { logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe); eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE); toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe); } // NOTE: we're suppressing fallback exception here if (shouldNotBeWrapped(originalException)) { return Observable.error(e); } return Observable.error(toEmit); } }; final TryableSemaphore fallbackSemaphore = getFallbackSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { fallbackSemaphore.release(); } } }; Observable<R> fallbackExecutionChain; // acquire a permit if (fallbackSemaphore.tryAcquire()) { try { if (isFallbackUserDefined()) { executionHook.onFallbackStart(this); fallbackExecutionChain = getFallbackObservable(); } else { //same logic as above without the hook invocation fallbackExecutionChain = getFallbackObservable(); } } catch (Throwable ex) { //If hook or user-fallback throws, then use that as the result of the fallback lookup fallbackExecutionChain = Observable.error(ex); } return fallbackExecutionChain .doOnEach(setRequestContext) .lift(new FallbackHookApplication(_cmd)) .lift(new DeprecatedOnFallbackHookApplication(_cmd)) .doOnNext(markFallbackEmit) .doOnCompleted(markFallbackCompleted) .onErrorResumeNext(handleFallbackError) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } else { return handleFallbackRejectionByEmittingError(); } } else { return handleFallbackDisabledByEmittingError(originalException, failureType, message); } } }
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javaui
/** * Get the TryableSemaphore this HystrixCommand should use if a fallback occurs. * * @return TryableSemaphore */ protected TryableSemaphore getFallbackSemaphore() { if (fallbackSemaphoreOverride == null) { TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return fallbackSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return fallbackSemaphoreOverride; } }
針對每一個commandKey獲取或建立TryableSemaphoreActual
hystrix的fallback主要分爲5種類型:this
這幾個方法最後都調用了getFallbackOrThrowException方法。