聊聊微服務熔斷降級Hystrix

  在如今的微服務使用的過程當中,常常會遇到依賴的服務不可用,那麼若是依賴的服務不可用的話,會致使把本身的服務也會拖死,那麼就產生了熔斷,熔斷顧名思義就是當服務處於不可用的時候採起半開關的狀態,達到必定數量後就熔斷器就打開。這就至關於家裏邊的保險絲,若是電壓太高的話,保險絲就會斷掉,起到保護電器的做用。java

  目前支持熔斷,降級的就是Hystrix,固然還有resilience4j還有Sentinel。今天我們以Hystrix爲主吧。其餘的你們能夠自行研究。併發

  Hystrix主要實現三個功能,接下來我們繼續展開。異步

  1. 資源隔離ide

  2. 熔斷微服務

  3. 降級ui

  資源隔離分爲兩種,一種是線程池隔離,一種是信號量semaphore隔離。線程池以請求的線程和執行的線程分爲不一樣的線程執行,信號量是請求和執行採用相同的線程。this

  固然,涉及到線程池的話,那麼就支持異步,支持異步Future的話也就支持get的時候支持超時獲取。信號量這些功能不支持,可是信號量是支持熔斷,限流。他們的區別以下:spa

  線程切換 異步 超時 熔斷 限流 資源開銷
線程池
信號量

  HystrixCommand的命令執行大體以下圖:.net

  依賴的pom以下:線程

        <!-- 依賴版本 -->
        <hystrix.version>1.3.16</hystrix.version>
        <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>

        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>${hystrix.version}</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>${hystrix-metrics-event-stream.version}</version>
        </dependency>

  支持同步,異步,觀察事件攔截,以及訂閱方式,下面我們直接看代碼實現吧。你們一看就明白了:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class HelloWorldCommand extends HystrixCommand<String> {

    private final String name;

    public HelloWorldCommand(String name) {
        //指定命令組名
        super(HystrixCommandGroupKey.Factory.asKey("myGroup"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        //邏輯封裝在run裏邊
        return "Hello:" + name + " thread:" + Thread.currentThread().getName();
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        //每一個Command只能調用一次,不能重複調用。重複調用會報異常。
        HelloWorldCommand  helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
        //execute同步調用 等同於:helloWorldCommand.queue().get();
        String result = helloWorldCommand.execute();
        System.out.println("result:" + result);

        helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
        //異步調用
        Future<String> future = helloWorldCommand.queue();
        //get能夠指定獲取的時間100毫秒,默認爲1秒
        result = future.get(100, TimeUnit.MILLISECONDS);
        System.out.println("result:" + result);
        System.out.println("main thread:" + Thread.currentThread().getName());

        testObserve();

    }

    public static void testObserve() {
        //註冊觀察者事件攔截
        Observable<String> observable = new HelloWorldCommand("observe").observe();
        //註冊回調事件
        observable.subscribe(new Action1<String>() {
            @Override
            public void call(String result) {
                //result就是調用HelloWorldCommand的結果
                System.out.println("callback:" + result);
            }
        });
        //註冊完成版的事件
        observable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted調用:onNext : onError以後調用");
            }

            @Override
            public void onError(Throwable throwable) {
                //異常產生了以後會調用
                System.out.println("onError:" + throwable.getMessage());
            }

            @Override
            public void onNext(String s) {
                //獲取結果後回調
                System.out.println("onNext:" + s);
            }
        });
    }

}
執行結果以下:

result:Hello:Synchronous-hystrix thread:hystrix-myGroup-1
result:Hello:Asynchronous-hystrix thread:hystrix-myGroup-2
main thread:main
callback:Hello:observe thread:hystrix-myGroup-3
onNext:Hello:observe thread:hystrix-myGroup-3
onCompleted調用:onNext : onError以後調用

  接下來是線程池隔離的例子:

import com.netflix.hystrix.*;

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class ThreadPoolCommand extends HystrixCommand<String> {

    private String name;

    public ThreadPoolCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
             .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
             .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withCircuitBreakerRequestVolumeThreshold(10) //至少10個請求,熔斷器才進行錯誤計算 默認值20
                .withCircuitBreakerSleepWindowInMilliseconds(5000) //熔斷終端5秒後會進入半打開狀態
                .withCircuitBreakerErrorThresholdPercentage(50)    //錯誤率達到50開啓熔斷保護
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                //10個核心線程
             ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
        );
        this.name = name;
    }


    @Override
    protected String run() throws Exception {
        return "threadPoolCommand name:" + name;
    }

    public static void main(String[] args) {
        ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
        String result = threadPoolCommand.execute();
        System.out.println("result:" + result);
    }
}

執行結果:
result:threadPoolCommand name:threadPool

  信號量隔離例子:

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class SemaphoreCommand extends HystrixCommand<String> {

    private String name;

    public SemaphoreCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("semaphoreGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("semaphoreCommand"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                 //至少10個請求,熔斷器纔會進行錯誤率的計算 默認值20
                .withCircuitBreakerRequestVolumeThreshold(10)
                 //熔斷器中斷請求5秒後會自動進入半打開狀態,放部分流量進行重試 默認值5000ms
                .withCircuitBreakerSleepWindowInMilliseconds(5000)
                //錯誤率達到50開啓熔斷保護
                .withCircuitBreakerErrorThresholdPercentage(50)
                 //設置隔離策略
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                 //最大併發量10
                .withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        return "semaphore success name:" + name;
    }

    @Override
    protected String getFallback() {
        return "semaphore fallback name:" + name;
    }

    public static void main(String[] args) {
        SemaphoreCommand semaphoreCommand = new SemaphoreCommand("semaphoreCommand");
        String result = semaphoreCommand.execute();
        System.out.println(result);
    }

}
執行結果:
semaphore success name:semaphoreCommand

  在執行的過程當中,若是出現調用服務的時候出現錯誤的時候會先進行熔斷,就是若是流量達到設置的量的時候進行統計,好比10個請求,而後若是出現錯誤率超過配置的錯誤率就會進行將熔斷進行打開,打開以後會進行調用降級方法fallback。過了一段時間後,能夠放行部分流量,若是流量正常了,則會將熔斷器開關關閉。下圖是來自官方文檔截圖,裏邊維護者一個bucket,每秒一個bucket,裏邊記錄着成功,失敗,超時,拒絕。這個週期是經過withCircuitBreakerSleepWindowInMilliseconds配置的。

  接下來我們看一降低級,也就是熔斷器打開的時候,會走fallback方法,繼續看例子。

import com.netflix.hystrix.*;

/**
 * @author huangqingshi
 * @Date 2019-03-17
 */
public class ThreadPoolCommand extends HystrixCommand<String> {

    private String name;

    public ThreadPoolCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
             .andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
             .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withCircuitBreakerRequestVolumeThreshold(10) //至少10個請求,熔斷器才進行錯誤計算 默認值20
                .withCircuitBreakerSleepWindowInMilliseconds(5000) //熔斷終端5秒後會進入半打開狀態
                .withCircuitBreakerErrorThresholdPercentage(50)    //錯誤率達到50開啓熔斷保護
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                //10個核心線程
             ).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
        );
        this.name = name;
    }


    @Override
    protected String run() throws Exception {
        return "threadPoolCommand name:" + name;
    }

    public static void main(String[] args) {
        ThreadPoolCommand threadPoolCommand = new ThreadPoolCommand("threadPool");
        String result = threadPoolCommand.execute();
        System.out.println("result:" + result);
    }
}
執行結果:
result:executed fallback
而且拋出超時異常。由於程序故意設計超時的。

  固然,Hystrixcommand還支持primary或secondary的方式,能夠先看看流程圖:

  是否執行primary是經過參數primarySecondary.userPrimary爲true時執行。false的時候執行secondary方式。

/**
 * @author huangqingshi
 * @Date 2019-03-18
 */
public class PrimarySecondaryFacade extends HystrixCommand<String> {

    private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().
            getBooleanProperty("primarySecondary.usePrimary", true);

    private int id;

    public PrimarySecondaryFacade(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("primarySecondCommand"))
             //此處採用信號量,primary、secondary採用線程池
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
                    HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
            )

        );
        this.id = id;
    }

    @Override
    protected String run() throws Exception {
        if(usePrimary.get()) {
            return new PrimaryCommand(id).execute();
        } else {
            return new SecondaryCommand(id).execute();
        }
    }

    @Override
    protected String getFallback() {
        return "static-fallback-" + id;
    }

    @Override
    protected String getCacheKey() {
        return String.valueOf(id);
    }

    private static class PrimaryCommand extends HystrixCommand<String> {

        private final int id;

        private PrimaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
                            withExecutionIsolationThreadTimeoutInMilliseconds(600)));
            this.id = id;
        }

        @Override
        protected String run() {
            return "responseFromPrimary-" + id;
        }

    }

    private static class SecondaryCommand extends HystrixCommand<String> {

        private final int id;

        private SecondaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
                            withExecutionIsolationThreadTimeoutInMilliseconds(600)));
            this.id = id;
        }

        @Override
        protected String run() {
            return "responseFromSecondary-" + id;
        }

    }

    public static class UnitTest {

        @Test
        public void testPrimary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
                assertEquals("responseFromPrimary-100", new PrimarySecondaryFacade(100).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }

        @Test
        public void testSecondary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
                assertEquals("responseFromSecondary-100", new PrimarySecondaryFacade(100).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }
    }


}

  好了,這個基本上就是Hystrix的基本功能,可是有個問題就是Hystrix已經不維護了,可是目前的穩定版你們也都在使用,因此列出來了。固然也推薦你們使用Sentinel,功能比較強大,就是自適應限流功能等,功能也很是強大,後續研究以後再出相關文章吧。這個文章就當你們的一個敲門磚吧,有問題請及時告知,謝謝。

相關文章
相關標籤/搜索