本文主要研究一下hystrix的timeout處理java
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javagit
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> { final AbstractCommand<R> originalCommand; public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) { this.originalCommand = originalCommand; } @Override public Subscriber<? super R> call(final Subscriber<? super R> child) { final CompositeSubscription s = new CompositeSubscription(); // if the child unsubscribes we unsubscribe our parent as well child.add(s); //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread(); TimerListener listener = new TimerListener() { @Override public void tick() { // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath // otherwise it means we lost a race and the run() execution completed or did not start if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { // report timeout failure originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); // shut down the original request s.unsubscribe(); final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); timeoutRunnable.run(); //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout } } @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } }; final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); // set externally so execute/queue can see this originalCommand.timeoutTimer.set(tl); /** * If this subscriber receives values it means the parent succeeded/completed */ Subscriber<R> parent = new Subscriber<R>() { @Override public void onCompleted() { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onCompleted(); } } @Override public void onError(Throwable e) { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onError(e); } } @Override public void onNext(R v) { if (isNotTimedOut()) { child.onNext(v); } } private boolean isNotTimedOut() { // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED); } }; // if s is unsubscribed we want to unsubscribe the parent s.add(parent); return parent; } }
這裏有個timerListener去將isCommandTimedOut屬性從TimedOutStatus.NOT_EXECUTED改成TimedOutStatus.TIMED_OUT
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); timeoutRunnable.run();
這裏若是設置超時狀態成功的話,則onError拋出HystrixTimeoutException異常。
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.javagithub
/** * Add a {@link TimerListener} that will be executed until it is garbage collected or removed by clearing the returned {@link Reference}. * <p> * NOTE: It is the responsibility of code that adds a listener via this method to clear this listener when completed. * <p> * <blockquote> * * <pre> {@code * // add a TimerListener * Reference<TimerListener> listener = HystrixTimer.getInstance().addTimerListener(listenerImpl); * * // sometime later, often in a thread shutdown, request cleanup, servlet filter or something similar the listener must be shutdown via the clear() method * listener.clear(); * }</pre> * </blockquote> * * * @param listener * TimerListener implementation that will be triggered according to its <code>getIntervalTimeInMilliseconds()</code> method implementation. * @return reference to the TimerListener that allows cleanup via the <code>clear()</code> method */ public Reference<TimerListener> addTimerListener(final TimerListener listener) { startThreadIfNeeded(); // add the listener Runnable r = new Runnable() { @Override public void run() { try { listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } } }; ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); return new TimerReference(listener, f); }
這個TimerListener是經過ScheduledThreadPoolExecutor的scheduleAtFixedRate來調度的
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.javaide
private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) { Reference<TimerListener> tl = timeoutTimer.get(); if (tl != null) { tl.clear(); } final long latency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult .addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE) .markUserThreadCompletion(latency) .setNotExecutedInThread(); ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE) .markUserThreadCompletion(latency); metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted); eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey); } private void handleCommandEnd(boolean commandExecutionStarted) { Reference<TimerListener> tl = timeoutTimer.get(); if (tl != null) { tl.clear(); } long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); if (executionResultAtTimeOfCancellation == null) { metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted); } else { metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted); } if (endCurrentThreadExecutingCommand != null) { endCurrentThreadExecutingCommand.call(); } }
cleanUpAfterResponseFromCache以及handleCommandEnd會清理掉這個timeoutTimer
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.javathis
private static class TimerReference extends SoftReference<TimerListener> { private final ScheduledFuture<?> f; TimerReference(TimerListener referent, ScheduledFuture<?> f) { super(referent); this.f = f; } @Override public void clear() { super.clear(); // stop this ScheduledFuture from any further executions f.cancel(false); } }
TimerReference的clear方法裏頭,除了調用父類的clear方法外,還調用了ScheduledFuture的cancel(false)方法,這樣子來取消掉線程的調度
hystrix的timeout處理是經過添加一個TimeoutListener來進行調度處理的,調度是採用線程池的scheduleAtFixedRate方式調度的(executionTimeoutInMilliseconds以後執行
),調度執行的是listener的tick方法。該方法會去設置isCommandTimedOut,從TimedOutStatus.NOT_EXECUTED改成TimedOutStatus.TIMED_OUT,若是成功則觸發timeoutRunnable方法,拋出HystrixTimeoutException異常。線程