第五章 服務容錯保護:Spring Cloud Hystrix

  在微服務架構中,咱們將系統拆分爲不少個服務,各個服務之間經過註冊與訂閱的方式相互依賴,因爲各個服務都是在各自的進程中運行,就有可能因爲網絡緣由或者服務自身的問題致使調用故障或延遲,隨着服務的積壓,可能會致使服務崩潰。爲了解決這一系列的問題,斷路器等一系列服務保護機制出現了。java

  斷路器自己是一種開關保護機制,用於在電路上保護線路過載,當線路中有電器發生短路時,斷路器可以及時切斷故障電路,防止發生過載、發熱甚至起火等嚴重後果。web

  在分佈式架構中,斷路器模式的做用也是相似的。spring

  針對上述問題,Spring Cloud Hystrix 實現了斷路器、線路隔離等一系列服務保護功能。它也是基於 Netflix 的開源框架 Hystrix 實現的,該框架的目標在於經過控制那些訪問遠程系統、服務和第三方庫的節點,從而對延遲和故障提供更強大的容錯能力。Hystrix 具有服務降級、服務熔斷、線程和信號隔離、請求緩存、請求合併以及服務監控等強大功能。apache

快速入門

  在開始實現斷路器以前,先用以前實現的一些內容做爲基礎,構建一個以下圖所示的服務調用關係。編程

  須要啓動的工程有以下一些:緩存

  • eureka-server 工程:服務註冊中心,端口爲8082。
  • hello-service 工程:HELLO-SERVICE 的服務單元,兩個實例啓動端口分別爲 2221 和 2222.
  • ribbon-consumer 工程:使用 Ribbon 實現的服務消費者,端口爲 3333

   在未加入斷路器以前,關閉8081的實例,發送 GET 請求到 http://localhost:3333/ribbon-consumer ,能夠獲取下面的輸入。網絡

  下面引入 Spring Cloud Hystrix。架構

  • 在 ribbon-consumer 工程的 pom.xml 的 dependency 節點中引入 spring-cloud-starter-hystrix 依賴:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
  • 在 ribbon-consumer 工程的主類上使用 @EnableCircuitBreaker 註解開啓斷路器功能:
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class DemoApplication {

    @Bean
    @LoadBalanced
    RestTemplate restTemplate(){
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

  注:此處還可使用 Spring Cloud 應用中的 @SpringCloudApplication 註解來修飾主類,該註解的具體定義以下。能夠看到,該註解中包含了上述所引用的三個註解,這意味着一個 Spring Cloud 標準應用應包含服務發現以及斷路器。app

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.cloud.client;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootApplication
@EnableDiscoveryClient
@EnableCircuitBreaker
public @interface SpringCloudApplication {
}
  • 改造服務消費方式,新增 HelloService 類,注入 RestTemplate 實例。而後,將在 ConsumerController 中對 RestTemplate 的使用遷移到 helloService 函數中,最後,在 helloService 函數上增長 @HystrixCommand 註解來指定回調方法。
package com.example.demo.web;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.client.RestTemplate;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String helloService(){
        return restTemplate.getForEntity("http://hello-service/index",
                String.class).getBody();
    }

    public String helloFallback(){
        return "error";
    }
}
  • 修改 ConsumerController 類, 注入上面實現的 HelloService 實例,並在 helloConsumer 中進行調用:
package com.example.demo.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-9
 */

@RestController
public class ConsumerController {

    @Autowired
    HelloService helloService;

    @RequestMapping(value = "ribbon-consumer", method = RequestMethod.GET)
    public String helloConsumer(){
        return helloService.helloService();
    }
}

  下面,對斷路器實現的服務回調邏輯進行驗證,從新啓動以前關閉的 2221 端口的 hello-service,確保此時服務註冊中心、兩個 hello-service 和 ribbon-consumer 均已啓動,再次訪問 http://localhost:3333/ribbon-consumer 能夠輪詢兩個 hello-serive 並返回一些文字信息。此時斷開其中任意一個端口的 hello-service,再次訪問,當輪詢到關閉的端口服務時,輸出內容爲 error ,再也不是以前的提示信息。框架

  除了經過斷開具體的服務實例來模擬某個節點沒法訪問的狀況以外,還能夠模擬一下服務阻塞(長時間未響應)的狀況。下面對hello-serive 的 /index 接口作一些修改,具體以下:

 

package com.example.demo.web;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-9
 */

@RestController
public class HelloController {

    private final Logger logger = Logger.getLogger(getClass());

    @Autowired
    private DiscoveryClient client;

    @RequestMapping(value = "/index")
    public String index(){
        ServiceInstance instance = client.getLocalServiceInstance();
        // 讓處理線程等待幾秒鐘
        int sleepTime = new Random().nextInt(3000);
        logger.info("sleepTime:"+sleepTime);

        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("/hello:host:"+instance.getHost()+" port:"+instance.getPort()
                +" service_id:"+instance.getServiceId());
        return "hello world!";
    }
}

  經過Thread.sleep 函數可以讓 /index 接口的處理線程不是立刻返回內容,而是在阻塞幾秒後才返回內容。因爲 Hystrix 默認超時時間爲 2000 毫秒,因此這裏採用了 0 至 3000 的隨機數以讓處理過程有必定機率發生超時來觸發斷路器。爲了更精確的觀察斷路器的觸發,在消費者調用函數中作一些時間記錄,具體以下:

package com.example.demo.web;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.client.RestTemplate;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String helloService(){
        long beginTime = System.currentTimeMillis();
        String body = restTemplate.getForEntity("http://hello-service/index",
                String.class).getBody();
        long endTime = System.currentTimeMillis();
        System.out.println("Spend Time : "+ (endTime - beginTime));
        return body;
    }

    public String helloFallback(){
        return "error";
    }
}

 

原理分析

工做流程

  1. 建立 HystrixCommand 或 HystrixObservableCommand 對象

  首先,建立一個 HystrixCommand 或 HystrixObservableCommand 對象,用來表示對依賴服務的操做請求,同時傳遞全部須要的參數。從其命名中咱們就能知道它採用了「命令模式」 來實現服務調用操做的封裝。而這兩個 Command 對象分別針對不一樣的應用場景。

  • HystrixCommand :用在依賴的服務返回單個操做結果的時候。
  • HystrixObservableCommand :用在依賴的服務返回多個操做結果的時候。

  命令模式,未來自客戶端的請求封裝成一個對象,從而讓你可使用不一樣的請求對客戶端進行參數化。它能夠被用於實現「行爲請求者」 與 「行爲實現者」 的解耦,以便使二者能夠適應變化。下面的示例是對命令模式的簡單實現:

package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

// 接收者
public class Receiver {
    public void active(){
        //真正的業務邏輯
        System.out.println("測試命令模式");
    }
}
package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */
//抽象命令
public interface Command {
    void excute();
}
package com.example.demo.command;

import org.springframework.beans.factory.annotation.Autowired;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

//具體命令實現
public class CommandImpl implements Command {
    private Receiver receiver;

    public CommandImpl(Receiver receiver) {
        this.receiver = receiver;
    }

    @Override
    public void excute() {
        this.receiver.active();
    }
}
package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

//客戶端調用
public class Invoker {

    private Command command;

    public void setCommand(Command command) {
        this.command = command;
    }

    public void  active (){
        command.excute();
    }
}
package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

public class Client {
    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        Command command = new CommandImpl(receiver);
        Invoker invoker = new Invoker();
        invoker.setCommand(command);
        invoker.active(); //客戶端經過調用者來執行命令
    }
}

  從代碼中,能夠看到這樣幾個對象。

  • Receiver:接收者,它知道如何處理具體的業務邏輯。
  • Command:抽象命令,它定義了一個命令對象應具有的一系列命令操做,好比 execute 等。當命令操做被調用的時候就會觸發接收者去作具體命令對應的業務邏輯。
  • CommandImpl:具體的命令實現,在這裏它綁定了命令操做與接收者之間的關係,execute 命令的實現委託給了 Receiver 的 action 函數。
  • Invoker:調用者,它持有一個命令對象,而且能夠在須要的時候經過命令對象完成具體的業務邏輯。

  從上面的示例中,咱們能夠看到,調用者 Invoker 與操做者 Receiver 經過 Command 命令接口實現瞭解耦。對於調用者來講,咱們能夠爲其注入多個命令操做,調用者只需在須要的時候直接調用便可,而不須要知道這些操做命令實際是如何實現的。而在這裏所提到的 HystrixCommand 和 HystrixObservableCommand 則是在 Hystrix 中對 Command 的進一步抽象定義。

  2. 命令執行

  命令執行方式一共有4種,而 Hystrix 在執行時會根據建立的Command對象以及具體的狀況來選擇一種執行。其中 HystrixCommand 實現了下面兩個執行方式。

  • execute():同步執行,從依賴的服務返回一個單一的結果對象,或是在發生錯誤的時候拋出異常。
  • queue():異步執行,直接返回一個 Future 對象,其中包含了服務執行結束時要返回的單一結果對象。
R execute();

Future<R> queue();

  而 HystrixObservableCommand 實現了另外兩種執行方式。

  • observe():返回 Observable 對象,它表明了操做的多個結果,它是一個 HotObservable。
  • toObservable():一樣返回 Observable 對象,也表明了操做的多個結果,但它返回的是一個 Cold Observable。
Observable<R> observe();

Observable<R> toObservable();

  在 Hystrix 的底層實現中大量使用了 RxJava ,爲了更容易的理解後續內容,在這裏對 RxJava 的觀察者-訂閱者模式作一個簡單的入門介紹。

  上面提到的 Observable 對象就是 RxJava 中的核心內容之一,能夠理解爲 「事件源」 或者 「被觀察者」,與其對應的 Subscriber 對象,能夠理解爲 「訂閱者」 或者 「觀察者」。這兩個對象是 RxJava 響應式編程的重要組成部分。

  • Observable 用來向訂閱者 Subscriber 對象發佈事件,Subscriber 對象則在接收到事件後對其進行處理,而在這裏所指的事件一般就是對依賴服務的調用。
  • 一個 Observable 能夠發出多個事件,知道結束或者發生異常。
  • Observable 對象每發出一個事件,就會調用對應觀察者 Subscriber 對象的 onNext() 方法。
  • 每個 Observable 的執行,最後必定會經過調用 Subscriber.onCompleted() 或者 Subscriber.onError() 來結束該事件的操做流。

  下面經過一個簡單的例子來直觀理解一下 Observable 與 Subscribers:

package com.example.demo.Observable_Subsciber;

import rx.Observable;
import rx.Subscriber;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

public class Obs_Subs {

    public static void main(String[] args) {

        //建立事件源
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello RxJava ");
                subscriber.onNext("I'm XX");
                subscriber.onCompleted();
            }
        });

        //建立訂閱者
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String s) {

            }
        };

        observable.subscribe(subscriber);

    }
}

  在該示例中,建立了一個簡單的事件源 observable,一個對事件傳遞內容輸出的訂閱者 subscriber ,經過 observable.subscribe(subscriber) 來觸發事件的發佈。

  在這裏咱們對於事件源 observable 提到了兩個不一樣的概念:Hot Observable 和 Cold Observable ,分別對應了上面的 command.observe() 和 command.toObservable() 的返回對象。其中 HotObservable,不論 「事件源」 是否有 「訂閱者」 ,都會在建立後對事件進行發佈,因此對於 Hot Observable 的每個 「訂閱者」 都有多是從 「事件源」 的中途開始的,並可能只是看到了整個操做的局部過程。而 Cold Observable 在沒有 「訂閱者」 的時候並不會發佈事件,而是進行等待,直到有 「訂閱者」 以後才發佈事件,因此對於 Cold Observable 的訂閱者,它能夠保證從一開始看到整個操做的所有過程。

  3. 結果是否被緩存

  若當前命令的請求緩存功能是被啓用的,而且該命令緩存命中,那麼緩存的結果會當即以 Observable 對象的形式返回。

  4. 斷路器是否打開

  在命令結果沒有緩存命中的時候,Hystrix 在執行命令前須要檢查斷路器是否爲打開狀態:

  • 打開:Hystrix不執行命令,轉到 fallback 處理邏輯(對應下面第8步)。
  • 關閉:Hystrix 跳到第5步,檢查是否有可用資源來執行命令。

  5. 線程池 / 請求隊列 / 信息量是否佔滿

  若是與命令相關的線程池 / 請求隊列 / 信息量已經佔滿,那麼 Hystrix 不會執行命令,跳轉到 fallback 處理邏輯(對應下面第8步)。

  注意:此處的線程池並不是容器的線程池,而是每一個依賴服務的專有線程池。Hystrix 爲了保證不會由於某個依賴服務的問題影響到其餘依賴服務而採用了 「艙壁模式」 來隔離每一個依賴的服務。

  6. HystrixObservableCommand.construct() 或 HystrixCommand.run() 

  Hystrix 會根據編寫的方法來決定採起什麼樣的方式去請求依賴服務。

  • HystrixCommand.run() :返回一個單一的結果,或者拋出異常。
  • HystrixObservableCommand.construct():返回一個 Observable 對象來發射多個結果,或經過 onError 發送錯誤通知。

  若是 run() 或 construct() 方法的執行時間超過了命令設置的超時閾值,當前處理線程會拋出 TimeoutException。這種狀況下,也會跳轉到 fallback 處理邏輯(第8步)。

   7. 計算斷路器的健康度

  Hystrix 會將 「成功」、「失敗」、「拒絕」、「超時」 等信息報告給斷路器,而斷路器會維護一組計數器來統計這些數據。

  斷路器會使用這些統計數據來決定是否要將斷路器打開,來對某個依賴服務的請求進行 「熔斷 / 短路」,直到恢復期結束。

  8. fallback 處理

  當命令執行失敗的時候,Hystrix 會進入 fallback 嘗試回退處理,咱們一般也稱爲 「服務降級」。下面就是可以引起服務降級處理的幾種狀況:

  • 第4步,當前命令處於 「熔斷 / 短路」 狀態,斷路器是打開的時候。
  • 第5步,當前命令的線程池、請求隊列或者信號量被佔滿的時候。
  • 第6步,HystrixObservableCommand.construct() 或者 HystrixCommand.run() 拋出異常的時候。

  九、返回成功的響應

  

斷路器原理

  HystrixCircuitBreaker 的定義:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.netflix.hystrix;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscriber;
import rx.Subscription;

public interface HystrixCircuitBreaker {
    boolean allowRequest();

    boolean isOpen();

    void markSuccess();

    void markNonSuccess();

    boolean attemptExecution();

    public static class NoOpCircuitBreaker implements HystrixCircuitBreaker {
        public NoOpCircuitBreaker() {
        }

        public boolean allowRequest() {
            return true;
        }

        public boolean isOpen() {
            return false;
        }

        public void markSuccess() {
        }

        public void markNonSuccess() {
        }

        public boolean attemptExecution() {
            return true;
        }
    }

    public static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;
        private final AtomicReference<HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status> status;
        private final AtomicLong circuitOpened;
        private final AtomicReference<Subscription> activeSubscription;

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.status = new AtomicReference(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED);
            this.circuitOpened = new AtomicLong(-1L);
            this.activeSubscription = new AtomicReference((Object)null);
            this.properties = properties;
            this.metrics = metrics;
            Subscription s = this.subscribeToStream();
            this.activeSubscription.set(s);
        }

        private Subscription subscribeToStream() {
            return this.metrics.getHealthCountsStream().observe().subscribe(new Subscriber() {
                public void onCompleted() {
                }

                public void onError(Throwable e) {
                }

                public void onNext(HealthCounts hc) {
                    if(hc.getTotalRequests() >= (long)((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerRequestVolumeThreshold().get()).intValue() && hc.getErrorPercentage() >= ((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerErrorThresholdPercentage().get()).intValue() && HystrixCircuitBreakerImpl.this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
                        HystrixCircuitBreakerImpl.this.circuitOpened.set(System.currentTimeMillis());
                    }

                }
            });
        }

        public void markSuccess() {
            if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED)) {
                this.metrics.resetStream();
                Subscription previousSubscription = (Subscription)this.activeSubscription.get();
                if(previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }

                Subscription newSubscription = this.subscribeToStream();
                this.activeSubscription.set(newSubscription);
                this.circuitOpened.set(-1L);
            }

        }

        public void markNonSuccess() {
            if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
                this.circuitOpened.set(System.currentTimeMillis());
            }

        }

        public boolean isOpen() {
            return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L);
        }

        public boolean allowRequest() {
            return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow())));
        }

        private boolean isAfterSleepWindow() {
            long circuitOpenTime = this.circuitOpened.get();
            long currentTime = System.currentTimeMillis();
            long sleepWindowTime = (long)((Integer)this.properties.circuitBreakerSleepWindowInMilliseconds().get()).intValue();
            return currentTime > circuitOpenTime + sleepWindowTime;
        }

        public boolean attemptExecution() {
            return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(this.isAfterSleepWindow()?this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN):false)));
        }

        static enum Status {
            CLOSED,
            OPEN,
            HALF_OPEN;

            private Status() {
            }
        }
    }

    public static class Factory {
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap();

        public Factory() {
        }

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            HystrixCircuitBreaker previouslyCached = (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name());
            if(previouslyCached != null) {
                return previouslyCached;
            } else {
                HystrixCircuitBreaker cbForCommand = (HystrixCircuitBreaker)circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreaker.HystrixCircuitBreakerImpl(key, group, properties, metrics));
                return cbForCommand == null?(HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()):cbForCommand;
            }
        }

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name());
        }

        static void reset() {
            circuitBreakersByCommand.clear();
        }
    }
}

  主要定義了三個斷路器的抽象方法。

  • allowRequest:Hystrix 命令的請求經過它判斷是否被執行。
  • isOpen:返回當前斷路器是否打開。
  • markSuccess:用來閉合斷路器。

  另外還有三個靜態類。

  • 靜態類 Factory 中維護了一個 Hystrix 命令與 HystrixCircuitBreaker 的關係集合:ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand,其中 String 類型的key 經過 HystrixCommandKey 定義,每個 Hystrix 命令須要有一個 key 來標識,同時一個 Hystrix 命令也會在該集合中找到它對應的斷路器 HystrixCircuitBreaker 實例。
  • 靜態類 NoOpCircuitBreaker 定義了一個什麼都不作的斷路器實現,它容許全部請求,而且斷路器狀態始終閉合。
  • 靜態類 HystrixCircuitBreakerImpl 是斷路器接口 HystrixCIrcuitBreaker 的實現類,在該類中定義斷路器的 4 個核心對象。  
    • HystrixCommandProperties properties :斷路器對應 HystrixCommand 實例的屬性對象。
    • HystrixCommandMetrics metrics :用來讓 HystrixCommand 記錄各種度量指標的對象。
    • AtomicLong circuitOpened :斷路器打開或是上一次測試的事件戳。

  HystrixCircuitBreakerImpl 的各個實現方法以下:

  • isOpen:判斷斷路器的打開 / 關閉狀態。
 public boolean isOpen() {
  return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L);
 }
  • allowRequest:判斷請求是否被容許:
 public boolean allowRequest() {
  return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow())));
 }
  • markSuccess:「半開路」 狀態時使用。若Hystrix命令調用成功,經過該方法將打開的斷路器關閉,並重置度量指標對象。
 public void markNonSuccess() {
  if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
    this.circuitOpened.set(System.currentTimeMillis());
  }
}

 

依賴隔離

  Hystrix 使用 「艙壁模式」 實現線程池的隔離,它爲每個依賴服務建立一個獨立的線程池,就算某個依賴服務出現延遲太高的狀況,也不會拖慢其餘的依賴服務。

 

使用詳解

建立請求命令

  Hystrix 命令就是咱們以前所說的 HystrixCommand,它用來封裝具體的依賴服務調用邏輯。

  能夠經過繼承的方式來實現,好比:

相關文章
相關標籤/搜索