隨着業務的愈來愈複雜,保證程序的健壯性對程序猿來講也變得更加的重要,畢竟不寫Bug的程序猿不是一個好的程序猿。但怎樣儘量的保證我們的程序可以穩定的運行,以及出錯後可以進行相應的補償,這裏就須要我們使用熔斷機制了。java
PS:在進入正文以前,不妨思考一下兩個問題:
①熔斷機制究竟爲咱們解決了什麼問題?
②咱們怎樣去本身實現一個簡單的熔斷?git
這裏我們簡單的實現了一個超時後進行熔斷的例子,這裏有用到AspectJ的相關知識,對於熟悉Spring AOP知識的同窗應該沒什麼問題。github
主要分爲兩步:數據庫
Future
控制是否超時,超時後將任務cancel
掉。fallback
方法進行處理。在這裏須要注意的是,fallback
方法參數應該要與原方法相同,這樣我們才能進行補償措施。例如:我們能夠在fallback
方法藉助消息中間件將這些參數進行存儲,而後在適當的時候從消息中間件中讀取出來進行補償消費處理。 1@RestController
2public class HelloController {
3 private Random random = new Random();
4
5 @MyHystrixCommand(fallback="errorMethod")
6 @RequestMapping("/hello")
7 public String hello(@RequestParam("name") String message) throws InterruptedException {
8 int time = random.nextInt(200);
9 System.out.println("spend time : " + time + "ms");
10 Thread.sleep(time);
11 System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh");
12 return "hello world:" + message;
13 }
14
15 public String errorMethod(String message) {
16 return "error message";
17 }
18}
複製代碼
1@Target(ElementType.METHOD)
2@Retention(RetentionPolicy.RUNTIME)
3@Documented
4public @interface MyHystrixCommand {
5 int value() default 100;
6 String fallback() default "";
7}
複製代碼
1@Aspect
2@Component
3public class MyHystrixCommandAspect {
4
5 ExecutorService executor = Executors.newFixedThreadPool(10);
6
7 @Pointcut(value = "@annotation(MyHystrixCommand)")
8 public void pointCut() {
9
10 }
11
12 @Around(value = "pointCut()&&@annotation(hystrixCommand)")
13 public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable {
14 int timeout = hystrixCommand.value();
15 Future future = executor.submit(() -> {
16 try {
17 return joinPoint.proceed();
18 } catch (Throwable throwable) {
19 }
20 return null;
21 });
22 Object returnValue = null;
23 try {
24 returnValue = future.get(timeout, TimeUnit.MILLISECONDS);
25 } catch (InterruptedException | ExecutionException | TimeoutException e) {
26 future.cancel(true);
27 if (StringUtils.isBlank(hystrixCommand.fallback())){
28 throw new Exception("fallback is null");
29 }
30 returnValue = invokeFallbackMethod(joinPoint, hystrixCommand.fallback());
31 }
32 return returnValue;
33 }
34
35 private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception {
36 Method method = findFallbackMethod(joinPoint, fallback);
37 if (method == null) {
38 throw new Exception("can not find fallback :" + fallback + " method");
39 } else {
40 method.setAccessible(true);
41 try {
42 Object invoke = method.invoke(joinPoint.getTarget(), joinPoint.getArgs());
43 return invoke;
44 } catch (IllegalAccessException | InvocationTargetException e) {
45 throw e;
46 }
47 }
48 }
49
50
51 private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) {
52 Signature signature = joinPoint.getSignature();
53 MethodSignature methodSignature = (MethodSignature) signature;
54 Method method = methodSignature.getMethod();
55 Class<?>[] parameterTypes = method.getParameterTypes();
56 Method fallbackMethod = null;
57 try {
58 //這裏經過判斷必須取和原方法同樣參數的fallback方法
59 fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes);
60 } catch (NoSuchMethodException e) {
61 }
62 return fallbackMethod;
63 }
64
65}
複製代碼
固然,上述例子只是一個簡單的超時後熔斷處理的實現方式。我們在實際應用中,還有可能併發超過指定閾值後我們也須要進行降級處理,一個最普通的場景:秒殺案例。這些東西在Hystrix
中都有相應的處理,它提供了線程池和信號量這兩種方式去解決併發的問題。緩存
我們看一下官方介紹併發
In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, and providing fallback options, all of which improve your system’s overall resiliency.app
在分佈式環境中,調用一些服務不可避免的會出現失敗,Hystrix
幫助我們添加了一些容忍策略,而且將服務進行隔離處理,防止一個服務的失敗影響到了另外一個服務的調用,這些都提升了我們系統的彈性。dom
這裏我們結合一下Spring Cloud Hystrix進行說明,從HystrixCommandAspect
開始分析:異步
1@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
2 public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
3 Method method = getMethodFromTarget(joinPoint);
4 ...
5 MetaHolder metaHolder = metaHolderFactory.create(joinPoint);//第一步
6 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);//第二步
7 ...
8 Object result;
9 try {
10 //第三步
11 if (!metaHolder.isObservable()) {
12 result = CommandExecutor.execute(invokable, executionType, metaHolder);
13 } else {
14 result = executeObservable(invokable, executionType, metaHolder);
15 }
16 }
17 ....
18 return result;
19 }
複製代碼
這個切面主要針對HystrixCommand
和HystrixCollapser
這兩個註解,前者用於進行熔斷降級處理,後者用來根據配置進行合併請求(類比數據庫操做,將多個insert語句合併成一個insert batch語句)。我們側重進行HystrixCommand
這一塊的分析。分佈式
這段代碼對應上面的MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
,裏面封裝了好比調用方法method
,參數args
,方法所屬對象target
,動態代理對象proxy
,回調方法fallbackMethod
等等一些元數據的封裝。這些數據在建立命令對象時會被使用。
它持有一個命令對象,而且能夠在合適的時候經過這個命令對象完成具體的業務邏輯,針對HystrixCommand
上述的命令對象就是GenericObservableCommand
和GenericCommand
的一種,這裏命令對象的選擇和方法的返回值有關,若是返回值爲Observable
類型,則建立GenericObservableCommand
命令,不然建立GenericCommand
命令。
1 public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
2 ...
3 switch (executionType) {
4 case SYNCHRONOUS: {
5 return castToExecutable(invokable, executionType).execute();
6 }
7 case ASYNCHRONOUS: {
8 HystrixExecutable executable = castToExecutable(invokable, executionType);
9 if (metaHolder.hasFallbackMethodCommand()
10 && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
11 return new FutureDecorator(executable.queue());
12 }
13 return executable.queue();
14 }
15 case OBSERVABLE: {
16 HystrixObservable observable = castToObservable(invokable);
17 return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
18 }
19 ...
20 }
21 }
複製代碼
從上面的代碼段中,能夠很容易的看出共有三種策略,同步、異步、OBSERVABLE,而Observable
又分爲Cold Observable(observable.toObservable())
和Hot Observable(observable.observe())
。因此說總共有四種執行方式。可是底層都會調用到AbstractCommand.toObservable()
方法。
Future
對象,包含着執行結束後返回的單一結果。Observable
對象,它表明操做的多個結果,可是已經被訂閱者消費掉了。Observable
對象,它表明操做的多個結果,須要我們本身手動訂閱並消費掉。在執行邏輯中,大量用到了RxJava
,各類回調處理,看的着實頭暈,感興趣的同窗能夠自行閱讀源碼,我這裏只是介紹一些關鍵的流程點。
①首先會檢查是否命中緩存(toObservable
方法中),命中緩存則直接返回:
1/* try from cache first */
2 if (requestCacheEnabled) {
3 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
4 if (fromCache != null) {
5 isResponseFromCache = true;
6 return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
7 }
8}
複製代碼
②檢查斷路器是否打開,若是斷路器打開,則經過handleShortCircuitViaFallback
直接進行fallback處理:
1private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
2 executionHook.onStart(_cmd);
3
4 /* determine if we're allowed to execute */
5 if (circuitBreaker.allowRequest()) {
6 }else {
7 return handleShortCircuitViaFallback();
8 }
9 ...
10}
複製代碼
③檢查是否用了信號量,若是用了,則判斷是否被佔滿,佔滿後則拋出異常,經過handleSemaphoreRejectionViaFallback
直接轉到fallback中進行執行,不執行後面的邏輯。若是沒用,則會返回一個默認的TryableSemaphoreNoOp.DEFAULT
,在進行executionSemaphore.tryAcquire()
時始終返回true。
1if (executionSemaphore.tryAcquire()) {
2 try {
3 /* used to track userThreadExecutionTime */
4 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
5 return executeCommandAndObserve(_cmd)
6 .doOnError(markExceptionThrown)
7 .doOnTerminate(singleSemaphoreRelease)
8 .doOnUnsubscribe(singleSemaphoreRelease);
9 } catch (RuntimeException e) {
10 return Observable.error(e);
11 }
12} else {
13 return handleSemaphoreRejectionViaFallback();
14}
複製代碼
④執行命令中的邏輯
經過重寫AbstractCommand
中的getExecutionObservable()
方法使得下面兩個命令類中的相應邏輯被調用。
若是run
或者construct
中設置了超時時間,若是執行時間超過了閾值,則會拋出TimeoutException
,或者在執行過程當中拋出其餘異常,都會進入fallback中進行處理邏輯。
⑤發生異常後執行fallback
1 private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd,
2 final HystrixEventType eventType,
3 final FailureType failureType,
4 final String message,
5 final Exception originalException) {
6}
複製代碼
最終都會調用到這個方法,我們看看FailureType
具體有哪幾種類型。
run
方法或者construct
方法拋出異常時。handleShortCircuitViaFallback
方法。Hystrix
中大量用了RxJava
,閱讀源碼看起來難免會以爲頭暈,能夠考慮在關鍵點打幾個斷點看看,否則各類回調會讓你繞圈圈。不過我的以爲RxJava
代碼看起來仍是蠻優美的,只不過有些許不適應而已,後面有時間會研究一下RxJava
。