final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
return handleFailureViaFallback(e);
private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
// use a fallback instead (or throw exception if not implemented)
return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);
// NOTE(review): this span is the body of a fallback-or-error routine whose method header is not
// visible here. It reads _cmd, eventType, failureType, message and originalException, which are
// presumably its parameters (compare the argument list used in handleThreadPoolRejectionViaFallback) — confirm.
// NOTE(review): closing braces and some statements appear to be missing from this excerpt; the
// comments below describe only what the visible lines do.
final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
// milliseconds elapsed since the start timestamp stored in executionResult
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// record the executionResult
// do this before executing fallback so it can be queried from within getFallback (see https://github.com/Netflix/Hystrix/pull/144)
executionResult = executionResult.addEvent((int) latency, eventType);
// Unrecoverable errors skip the fallback: the hook-processed exception is wrapped in a
// HystrixRuntimeException and emitted as an error.
if (isUnrecoverable(originalException)) {
logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
/* executionHook for all errors */
Exception e = wrapWithOnErrorHook(failureType, originalException);
return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
} else {
// Recoverable errors are only logged here; processing continues with the fallback attempt.
if (isRecoverableError(originalException)) {
logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
// The fallback is attempted only when the fallbackEnabled property is true; the final else
// of this span emits the fallback-disabled error instead.
if (properties.fallbackEnabled().get()) {
/* fallback behavior is permitted so attempt */
// Per-notification callback. Its body is not visible in this excerpt — presumably it
// re-applies requestContext on the executing thread (TODO confirm).
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
public void call(Notification<? super R> rNotification) {
// Per-item callback: when shouldOutputOnNextEvents() is true, records FALLBACK_EMIT both in
// executionResult and on the event notifier.
final Action1<R> markFallbackEmit = new Action1<R>() {
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
// Completion callback: records FALLBACK_SUCCESS together with the latency measured at that moment.
final Action0 markFallbackCompleted = new Action0() {
public void call() {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
// Error callback for a failing fallback: chooses the exception that is finally emitted.
final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
public Observable<R> call(Throwable t) {
/* executionHook for all errors */
Exception e = wrapWithOnErrorHook(failureType, originalException);
// fe is the fallback's own failure; e (above) is the hook-processed original failure
Exception fe = getExceptionFromThrowable(t);
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
Exception toEmit;
// An UnsupportedOperationException from the fallback is recorded as FALLBACK_MISSING;
// any other fallback exception is recorded as FALLBACK_FAILURE.
if (fe instanceof UnsupportedOperationException) {
logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
} else {
logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
// NOTE: we're suppressing fallback exception here
// When shouldNotBeWrapped(originalException) is true, e is emitted directly instead of the
// HystrixRuntimeException built above.
if (shouldNotBeWrapped(originalException)) {
return Observable.error(e);
return Observable.error(toEmit);
final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// compareAndSet(false, true) lets only the first invocation pass the guard. The guarded
// statement is not visible in this excerpt — presumably it releases fallbackSemaphore (TODO confirm).
final Action0 singleSemaphoreRelease = new Action0() {
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
Observable<R> fallbackExecutionChain;
// acquire a permit
if (fallbackSemaphore.tryAcquire()) {
try {
// NOTE(review): both branches assign the same expression as shown; the existing comment
// below suggests the first branch originally also invoked a hook — confirm against upstream.
if (isFallbackUserDefined()) {
fallbackExecutionChain = getFallbackObservable();
} else {
//same logic as above without the hook invocation
fallbackExecutionChain = getFallbackObservable();
} catch (Throwable ex) {
//If hook or user-fallback throws, then use that as the result of the fallback lookup
fallbackExecutionChain = Observable.error(ex);
// NOTE(review): the returned chain has no terminating ';' here, and none of the callbacks
// declared above are attached in the visible lines — further operators appear to be missing.
return fallbackExecutionChain
.lift(new FallbackHookApplication(_cmd))
.lift(new DeprecatedOnFallbackHookApplication(_cmd))
} else {
// no fallback permit could be acquired
return handleFallbackRejectionByEmittingError();
} else {
// fallbackEnabled property is false
return handleFallbackDisabledByEmittingError(originalException, failureType, message);
isUnrecoverable 方法:判斷異常是否爲不可恢復異常,若不可恢復則直接返回失敗,不再執行回退邏輯。markFallbackEmit:當 shouldOutputOnNextEvents() 爲真時,爲回退發射的每個元素記錄 FALLBACK_EMIT 事件。
抽象方法 singleSemaphoreRelease
方法 final protected Observable<R> getFallbackObservable() {
return Observable.defer(new Func0<Observable<R>>() {
public Observable<R> call() {
try {
return Observable.just(getFallback());
} catch (Throwable ex) {
return Observable.error(ex);
對於其他異常類型的處理,感興趣的同學可以繼續基於 Func1 handleFallback