Hystrix是Netflix的一個開源框架,地址以下:https://github.com/Netflix/Hystrixjava
中文名爲「豪豬」,即平時很溫順,在感覺到危險的時候,用刺保護本身;在危險過去後,仍是一個溫順的肉球。git
因此,整個框架的核心業務也就是這2點:github
對於一個系統而言,它每每承擔着2層角色,服務提供者與服務消費者。對於服務消費者而言最大的痛苦就是如何「明哲保身」,作過網關項目的同窗確定感同身受。web
上面是一個常見的系統依賴關係,底層的依賴每每不少,通訊協議包括socket、HTTP、Dubbo、webservice等等。當通訊層發生網絡抖動以及所依賴的系統發生業務響應異常時,咱們業務自己所提供的服務能力也直接會受到影響。編程
這種效果傳遞下去就頗有可能形成雪崩效應,即整個業務聯調發生異常,好比業務總體超時,或者訂單數據不一致。tomcat
那麼核心問題就來了,如何檢測業務處於異常狀態?網絡
成功率!成功率直接反映了業務的數據流轉狀態,是最直接的業務表現。併發
固然,也能夠根據超時時間作判斷,好比Sentinel的實現。其實這裏概念上能夠作一個轉化,用時間作超時控制,超時=失敗,這依然是一個成功率的概念。app
如同豪豬同樣,「刺」就是他的保護工具,全部的攻擊都會被刺無情的懟回去。 框架
在Hystrix的實現中,這就出現了「熔斷器」的概念,即當前的系統是否處於須要保護的狀態。
當熔斷器處於開啓的狀態時,全部的請求都不會真正的走以前的業務邏輯,而是直接返回一個約定的信息,即fallBack。經過這種快速失敗原則保護咱們的系統。
可是,系統不該該永遠處於「有刺」的狀態,當危險事後須要恢復正常。
因而對熔斷器的核心操做就是以下幾個功能:
熔斷器的核心API以下圖:
這四個概念是咱們談起微服務會常常談到的概念,這裏咱們討論的是Hystrix的實現方式。
先上一個Hystrix的業務流程圖
能夠看到Hystrix的請求都要通過HystrixCommand的包裝,其核心邏輯在AbstractComman.java類中。
下面的源碼是基於RxJava的,看以前最好先了解下RxJava的常見用法與邏輯,不然看起來會很迷惑。
簡單的說,RxJava就是基於回調的函數式編程。
通俗的說,就等同於策略模式的匿名內部類實現。
首先看信號量是如何影響咱們的請求的:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 自定義擴展 executionHook.onStart(_cmd); //判斷熔斷器是否容許請求過來 if (circuitBreaker.attemptExecution()) {
//得到分組信號量,若是沒有采用信號量分組,返回默認經過的信號量實現 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
//調用終止的回調函數 final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; //調用異常的回調函數 final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; //根據信號量嘗試競爭信號量 if (executionSemaphore.tryAcquire()) { try { //競爭成功,註冊執行參數 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else {
//競爭失敗,進入fallback return handleSemaphoreRejectionViaFallback(); } } else {
//熔斷器已打開,進入fallback return handleShortCircuitViaFallback(); } }
何時熔斷器能夠放請求進來:
@Override public boolean attemptExecution() {
//動態屬性判斷,熔斷器是否強制開着,若是強制開着,就不容許請求 if (properties.circuitBreakerForceOpen().get()) { return false; }
//若是強制關閉,就容許請求 if (properties.circuitBreakerForceClosed().get()) { return true; }
//若是當前是關閉,就容許請求 if (circuitOpened.get() == -1) { return true; } else {
//若是當前開着,就看是否已通過了"滑動窗口",過了就能夠請求,不過就不能夠 if (isAfterSleepWindow()) { //only the first request after sleep window should execute //if the executing command succeeds, the status will transition to CLOSED //if the executing command fails, the status will transition to OPEN //if the executing command gets unsubscribed, the status will transition to OPEN
//這裏使用CAS的方式,只有一個請求能過來,即"半關閉"狀態 if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { return true; } else { return false; } } else { return false; } } } }
這裏有個重要概念就是"滑動窗口":
private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
//滑動窗口的判斷就是看看熔斷器打開的時間與如今相比是否超過了配置的滑動窗口 return currentTime > circuitOpenTime + sleepWindowTime; }
若是將業務請求進行隔離?
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
//判斷隔離策略是什麼,是線程池隔離仍是信號量隔離 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
//線程池隔離的運行邏輯以下 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } //按照配置生成監控數據 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ try {
//執行擴展點邏輯 executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.empty(); } }
//註冊各類場景的回調函數 }).doOnTerminate(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it }
//將邏輯放在線程池的調度器上執行,即將上述邏輯放入線程池中 }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else {
//走到這裏就是信號量隔離,在當前線程中執行,沒有調度器 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } }
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); //執行發生的回調 final Action1<R> markEmits = new Action1<R>() { @Override public void call(R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.EMIT); eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } if (commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); circuitBreaker.markSuccess(); } } }; //執行成功的回調,標記下狀態,熔斷器根據這個狀態維護熔斷邏輯 final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); circuitBreaker.markSuccess(); } } }; //執行失敗的回調 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e);
//各類回調進行各類fallback if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } //註冊各類回調函數 return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }