翻譯:Hystrix - How To Use

轉載請註明出處: 翻譯:Hystrix - How To Usehtml

Hello World!

下面的代碼展現了HystrixCommand版的Hello World:java

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        // a real example would do work like a network call here
        return "Hello " + name + "!";
    }
}

查看源碼react

HystrixObservableCommand的同等實現以下:git

public class CommandHelloWorld extends HystrixObservableCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        // a real example would do work like a network call here
                        observer.onNext("Hello");
                        observer.onNext(name + "!");
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
         } ).subscribeOn(Schedulers.io());
    }
}

Synchronous Execution

能夠經過調用HystrixCommand.execute()方法實現同步執行, 示例以下:github

String s = new CommandHelloWorld("World").execute();

測試以下:web

@Test
        public void testSynchronous() {
            assertEquals("Hello World!", new CommandHelloWorld("World").execute());
            assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute());
        }

HystrixObservableCommand不提供同步執行方法, 可是若是肯定其只會產生一個值, 那麼也能夠用以下方式實現:後端

  1. HystrixObservableCommand.observe().observe().toBlocking().toFuture().get()
  2. HystrixObservableCommand.toObservable().observe().toBlocking().toFuture().get()

若是實際上產生了多個值, 上述的代碼將會拋出java.lang.IllegalArgumentException: Sequence contains too many elements.緩存

Asynchronous Execution

能夠經過調用HystrixCommand.queue()方法實現異步執行, 示例以下:cookie

Future<String> fs = new CommandHelloWorld("World").queue();

此時能夠經過Future.get()方法獲取command執行結果:網絡

String s = fs.get();

測試代碼以下:

@Test
        public void testAsynchronous1() throws Exception {
            assertEquals("Hello World!", new CommandHelloWorld("World").queue().get());
            assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get());
        }

        @Test
        public void testAsynchronous2() throws Exception {

            Future<String> fWorld = new CommandHelloWorld("World").queue();
            Future<String> fBob = new CommandHelloWorld("Bob").queue();

            assertEquals("Hello World!", fWorld.get());
            assertEquals("Hello Bob!", fBob.get());
        }

下面的兩種實現是等價的:

String s1 = new CommandHelloWorld("World").execute();
String s2 = new CommandHelloWorld("World").queue().get();

HystrixObservableCommand不提供queue方法, 可是若是肯定其只會產生一個值, 那麼也能夠用以下方式實現:

  1. HystrixObservableCommand.observe().observe().toBlocking().toFuture()
  2. HystrixObservableCommand.toObservable().observe().toBlocking().toFuture()

若是實際上產生了多個值, 上述的代碼將會拋出java.lang.IllegalArgumentException: Sequence contains too many elements.

Reactive Execution

你也能夠將HystrixCommand當作一個可觀察對象(Observable)來觀察(Observe)其產生的結果, 可使用如下任意一個方法實現:

  1. observe(): 一旦調用該方法, 請求將當即開始執行, 其利用ReplaySubject特性能夠保證不會丟失任何command產生的結果, 即便結果在你訂閱以前產生的也不會丟失.
  2. toObservable(): 調用該方法後不會當即執行請求, 而是當有訂閱者訂閱時纔會執行.
Observable<String> ho = new CommandHelloWorld("World").observe();
// or Observable<String> co = new CommandHelloWorld("World").toObservable();

而後你能夠經過訂閱到這個Observable來取得command產生的結果:

ho.subscribe(new Action1<String>() {

    @Override
    public void call(String s) {
         // value emitted here
    }

});

測試以下:

@Test
public void testObservable() throws Exception {

    Observable<String> fWorld = new CommandHelloWorld("World").observe();
    Observable<String> fBob = new CommandHelloWorld("Bob").observe();

    // blocking
    assertEquals("Hello World!", fWorld.toBlockingObservable().single());
    assertEquals("Hello Bob!", fBob.toBlockingObservable().single());

    // non-blocking 
    // - this is a verbose anonymous inner-class approach and doesn't do assertions
    fWorld.subscribe(new Observer<String>() {

        @Override
        public void onCompleted() {
            // nothing needed here
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(String v) {
            System.out.println("onNext: " + v);
        }

    });

    // non-blocking
    // - also verbose anonymous inner-class
    // - ignore errors and onCompleted signal
    fBob.subscribe(new Action1<String>() {

        @Override
        public void call(String v) {
            System.out.println("onNext: " + v);
        }

    });
}

使用Java 8的Lambda表達式可使代碼更簡潔:

fWorld.subscribe((v) -> {
        System.out.println("onNext: " + v);
    })
    
    // - or while also including error handling
    
    fWorld.subscribe((v) -> {
        System.out.println("onNext: " + v);
    }, (exception) -> {
        exception.printStackTrace();
    })

關於Observable的信息能夠在這裏查閱

Reactive Commands

相比將HystrixCommand使用上述方法轉換成一個Observable, 你也能夠選擇建立一個HystrixObservableCommand對象. HystrixObservableCommand包裝的Observable容許產生多個結果(譯者注: Subscriber.onNext能夠調用屢次), 而HystrixCommand即便轉換成了Observable也只能產生一個結果.

使用HystrixObservableCommnad時, 你須要重載construct方法來實現你的業務邏輯, 而不是重載run方法, contruct方法將會返回你須要包裝的Observable.

使用下面任意一個方法能夠從HystrixObservableCommand中獲取Observable對象:

  1. observe(): 一旦調用該方法, 請求將當即開始執行, 其利用ReplaySubject特性能夠保證不會丟失任何command產生的結果, 即便結果在你訂閱以前產生的也不會丟失.
  2. toObservable(): 調用該方法後不會當即執行請求, 而是當有訂閱者訂閱時纔會執行.

Fallback

大多數狀況下, 咱們都但願command在執行失敗時可以有一個候選方法來處理, 如: 返回一個默認值或執行其餘失敗處理邏輯, 除了如下幾個狀況:

  1. 執行寫操做的command: 當command的目標是執行寫操做而不是讀操做, 那麼一般須要將寫操做失敗的錯誤交給調用者處理.
  2. 批處理系統/離線計算: 若是command的目標是作一些離線計算、生成報表、填充緩存等, 那麼一樣應該將失敗交給調用者處理.

不管command是否實現了getFallback()方法, command執行失敗時, Hystrix的狀態和斷路器(circuit-breaker)的狀態/指標都會進行更新.

HystrixCommand能夠經過實現getFallback()方法來實現降級處理, run()方法異常、執行超時、線程池或信號量已滿拒絕提供服務、斷路器短路時, 都會調用getFallback():

public class CommandHelloFailure extends HystrixCommand<String> {

    private final String name;

    public CommandHelloFailure(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        throw new RuntimeException("this command always fails");
    }

    @Override
    protected String getFallback() {
        return "Hello Failure " + name + "!";
    }
}

查看源碼

這個命令的run()方法老是會執行失敗, 可是調用者老是能收到getFallback()方法返回的值, 而不是收到一個異常:

@Test
    public void testSynchronous() {
        assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute());
        assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute());
    }

HystrixObservableCommand能夠經過重載resumeWithFallback方法實現原Observable執行失敗時返回回另外一個Observable, 須要注意的是, 原Observable有可能在發出多個結果以後纔出現錯誤, 所以在fallback實現的邏輯中不該該假設訂閱者只會收到失敗邏輯中發出的結果.

Hystrix內部使用了RxJavaonErrorResumeNext操做符來實現Observable之間的無縫轉移.

Error Propagation

HystrixBadRequestException異常外, run方法中拋出的全部異常都會被認爲是執行失敗且會觸發getFallback()方法和斷路器的邏輯.

你能夠在HystrixBadRequestException中包裝想要拋出的異常, 而後經過getCause()方法獲取. HystrixBadRequestException使用在不該該被錯誤指標(failure metrics)統計和不該該觸發getFallback()方法的場景, 例如報告參數不合法或者非系統異常等.

對於HystrixObservableCommand, 不可恢復的錯誤都會在經過onError方法通知, 並經過獲取用戶實現的resumeWithFallback()方法返回的Observable來完成回退機制.

執行異常類型

Failure Type Exception class Exception.cause
FAILURE HystrixRuntimeException underlying exception(user-controlled)
TIMEOUT HystrixRuntimeException j.u.c.TimeoutException
SHORT_CIRCUITED HystrixRuntimeException j.l.RuntimeException
THREAD_POOL_REJECTED HystrixRuntimeException j.u.c.RejectedExecutionException
SEMAPHORE_REJECTED HystrixRuntimeException j.l.RuntimeException
BAD_REQUEST HystrixBadRequestException underlying exception(user-controller)

Command Name

默認的command name是從類名中派生的:

getClass().getSimpleName()

能夠經過HystrixCommandHystrixObservableCommand的構造器來指定command name:

public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
        this.name = name;
    }

能夠經過以下方式來重用Setter:

private static final Setter cachedSetter = 
        Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"));    

    public CommandHelloWorld(String name) {
        super(cachedSetter);
        this.name = name;
    }

HystrixCommandKey是一個接口, 所以能夠將其實現爲一個枚舉或者常規的類, 可是它已經內置了一個Factory類來構建幫助構建內部實例, 使用方式以下:

HystrixCommandKey.Factory.asKey("Hello World");

Command Group

Hystrix使用command group來爲分組, 分組信息主要用於報告、警報、儀表盤上顯示, 或者是標識團隊/庫的擁有者.

默認狀況下, 除非已經用這個名字定義了一個信號量, 不然 Hystrix將使用這個名稱來定義command的線程池.

HystrixCommandGroupKey是一個接口, 所以能夠將其實現爲一個枚舉或者常規的類, 可是它已經內置了一個Factory類來構建幫助構建內部實例, 使用方式以下:

HystrixCommandGroupKey.Factory.asKey("Example Group")

Command Thread-pool

thread-pool key主要用於在監控、指標發佈、緩存等相似場景中標識一個HystrixThreadPool, 一個HystrixCommand於其構造函數中傳入的HystrixThreadPoolKey指定的HystrixThreadPool相關聯, 若是未指定的話, 則使用HystrixCommandGroupKey來獲取/建立HystrixThreadPool.

能夠經過HystrixCommandHystrixObservableCommand的構造器來指定其值:

public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
        this.name = name;
    }

HystrixCommandThreadPoolKey是一個接口, 所以能夠將其實現爲一個枚舉或者常規的類, 可是它已經內置了一個Factory類來構建幫助構建內部實例, 使用方式以下:

HystrixThreadPoolKey.Factory.asKey("Hello World Pool")

使用HystrixThreadPoolKey而不是使用不一樣的HystrixCommandGroupKey的緣由是: 可能會有多條command在邏輯功能上屬於同一個組(group), 可是其中的某些command須要和其餘command隔離開, 例如:

  1. 兩條用於訪問視頻元數據的command
  2. 兩條commandgroup name都是VideoMetadata
  3. command A與資源#1互斥
  4. command B與資源#2互斥

若是command A因爲延遲等緣由致使其所在的線程池資源耗盡, 不該該影響command B#2的執行, 由於他們訪問的是不一樣的後端資源.

所以, 從邏輯上來講, 咱們但願這兩條command應該被分到同一個分組, 可是咱們一樣系統將這兩條命令的執行隔離開來, 所以咱們使用HystrixThreadPoolKey將其分配到不一樣的線程池.

Request Cache

能夠經過實現HystrixCommandHystrixObservableCommandgetCacheKey()方法開啓用對請求的緩存功能:

public class CommandUsingRequestCache extends HystrixCommand<Boolean> {

    private final int value;

    protected CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }

    @Override
    protected Boolean run() {
        return value == 0 || value % 2 == 0;
    }

    @Override
    protected String getCacheKey() {
        return String.valueOf(value);
    }
}

因爲該功能依賴於請求的上下文信息, 所以咱們必須初始化一個HystrixRequestContext, 使用方式以下:

@Test
        public void testWithoutCacheHits() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                assertTrue(new CommandUsingRequestCache(2).execute());
                assertFalse(new CommandUsingRequestCache(1).execute());
                assertTrue(new CommandUsingRequestCache(0).execute());
                assertTrue(new CommandUsingRequestCache(58672).execute());
            } finally {
                context.shutdown();
            }
        }

一般狀況下, 上下文信息(HystrixRequestContext)應該在持有用戶請求的ServletFilter或者其餘擁有生命週期管理功能的類來初始化和關閉.

下面的例子展現了command如何從緩存中獲取數據, 以及如何查詢一個數據是不是從緩存中獲取到的:

@Test
        public void testWithCacheHits() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
                CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);

                assertTrue(command2a.execute());
                // this is the first time we've executed this command with
                // the value of "2" so it should not be from cache
                assertFalse(command2a.isResponseFromCache());

                assertTrue(command2b.execute());
                // this is the second time we've executed this command with
                // the same value so it should return from cache
                assertTrue(command2b.isResponseFromCache());
            } finally {
                context.shutdown();
            }

            // start a new request context
            context = HystrixRequestContext.initializeContext();
            try {
                CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
                assertTrue(command3b.execute());
                // this is a new request context so this 
                // should not come from cache
                assertFalse(command3b.isResponseFromCache());
            } finally {
                context.shutdown();
            }
        }

Request Collapsing

請求合併能夠用於將多條請求綁定到一塊兒, 由同一個HystrixCommand實例執行.

collapser能夠經過batch sizebatch建立以來的耗時來自動將請求合併執行.

Hystrix支持兩個請求合併方式: 請求級的合併和全局級的合併. 默認是請求範圍的合併, 能夠在構造collapser時指定值.

請求級(request-scoped)的collapser只會合併每個HystrixRequestContext中的請求, 而全局級(globally-scoped)的collapser則能夠跨HystrixRequestContext合併請求. 所以, 若是你下游的依賴者沒法再一個command中處理多個HystrixRequestContext的話, 那麼你應該使用請求級的合併.

在Netflix, 咱們只會使用請求級的合併, 由於咱們當前全部的系統都是基於一個command對應一個HystrixRequestContext的設想下構建的. 所以, 當一個command使用不一樣的參數在一個請求中併發執行時, 合併是有效的.

下面的代碼展現瞭如何實現請求級的HystrixCollapser:

public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        this.key = key;
    }

    @Override
    public Integer getRequestArgument() {
        return key;
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        int count = 0;
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
                super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            ArrayList<String> response = new ArrayList<String>();
            for (CollapsedRequest<String, Integer> request : requests) {
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}

下面的代碼展現了若是使用collapser自動合併4個CommandCollapserGetValueForKey到一個HystrixCommand中執行:

@Test
public void testCollapser() throws Exception {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
        Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
        Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
        Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();

        assertEquals("ValueForKey: 1", f1.get());
        assertEquals("ValueForKey: 2", f2.get());
        assertEquals("ValueForKey: 3", f3.get());
        assertEquals("ValueForKey: 4", f4.get());

        // assert that the batch command 'GetValueForKey' was in fact
        // executed and that it executed only once
        assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
        HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
        // assert the command is the one we're expecting
        assertEquals("GetValueForKey", command.getCommandKey().name());
        // confirm that it was a COLLAPSED command execution
        assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
        // and that it was successful
        assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
    } finally {
        context.shutdown();
    }
}

Request Context Setup

使用請求級的特性時(如: 請求緩存、請求合併、請求日誌)你必須管理HystrixRequestContext的生命週期(或者實現HystrixConcurrencyStategy).

這意味着你必須在請求以前執行以下代碼:

HystrixRequestContext context = HystrixRequestContext.initializeContext();

並在請求結束後執行以下代碼:

context.shutdown();

在標準的Java web應用中, 你可使用Setvlet Filter實現的以下的過濾器來管理:

public class HystrixRequestContextServletFilter implements Filter {

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
     throws IOException, ServletException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            chain.doFilter(request, response);
        } finally {
            context.shutdown();
        }
    }
}

能夠在web.xml中加入以下代碼實現對全部的請求都使用該過濾器:

<filter>
      <display-name>HystrixRequestContextServletFilter</display-name>
      <filter-name>HystrixRequestContextServletFilter</filter-name>
      <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
    </filter>
    <filter-mapping>
      <filter-name>HystrixRequestContextServletFilter</filter-name>
      <url-pattern>/*</url-pattern>
   </filter-mapping>

Common Patterns

如下是HystrixCommandHystrixObservableCommand的通常用法和使用模式.

Fail Fast

最基本的使用是執行一條只作一件事情且沒有實現回退方法的command, 這樣的command在發生任何錯誤時都會拋出異常:

public class CommandThatFailsFast extends HystrixCommand<String> {

    private final boolean throwException;

    public CommandThatFailsFast(boolean throwException) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.throwException = throwException;
    }

    @Override
    protected String run() {
        if (throwException) {
            throw new RuntimeException("failure from CommandThatFailsFast");
        } else {
            return "success";
        }
    }

下面的代碼演示了上述行爲:

@Test
public void testSuccess() {
    assertEquals("success", new CommandThatFailsFast(false).execute());
}

@Test
public void testFailure() {
    try {
        new CommandThatFailsFast(true).execute();
        fail("we should have thrown an exception");
    } catch (HystrixRuntimeException e) {
        assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage());
        e.printStackTrace();
    }
}

HystrixObservableCommand須要重載resumeWithFallback()方法來實現一樣的行爲:

@Override
    protected Observable<String> resumeWithFallback() {
        if (throwException) {
            return Observable.error(new Throwable("failure from CommandThatFailsFast"));
        } else {
            return Observable.just("success");
        }
    }

Fail Silent

靜默失敗等同於返回一個空的響應或者移除功能. 能夠是返回null、空Map、空List, 或者其餘相似的響應.

能夠經過實現HystrixCommand.getFallback()方法實現該功能:

public class CommandThatFailsSilently extends HystrixCommand<String> {

    private final boolean throwException;

    public CommandThatFailsSilently(boolean throwException) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.throwException = throwException;
    }

    @Override
    protected String run() {
        if (throwException) {
            throw new RuntimeException("failure from CommandThatFailsFast");
        } else {
            return "success";
        }
    }

    @Override
    protected String getFallback() {
        return null;
    }
}
@Test
public void testSuccess() {
    assertEquals("success", new CommandThatFailsSilently(false).execute());
}

@Test
public void testFailure() {
    try {
        assertEquals(null, new CommandThatFailsSilently(true).execute());
    } catch (HystrixRuntimeException e) {
        fail("we should not get an exception as we fail silently with a fallback");
    }
}

或者返回一個空List的實現以下:

@Override
    protected List<String> getFallback() {
        return Collections.emptyList();
    }

HystrixObservableCommand能夠經過重載resumeWithFallback()方法實現一樣的行爲:

@Override
    protected Observable<String> resumeWithFallback() {
        return Observable.empty();
    }

Fallback: Static

Fallback能夠返回代碼裏設定的默認值, 這種方式能夠經過默認行爲來有效避免於靜默失敗帶來影響.

例如, 若是一個應返回true/false的用戶認證的command執行失敗了, 那麼其默認行爲能夠以下:

@Override
    protected Boolean getFallback() {
        return true;
    }

對於HystrixObservableCommand能夠經過重載resumeWithFallback()方法實現一樣的行爲:

@Override
    protected Observable<Boolean> resumeWithFallback() {
        return Observable.just( true );
    }

Fallback: Stubbed

command返回的是一個包含多個字段的複合對象, 且該對象的一部分字段值能夠經過其餘請求狀態得到, 另外一部分狀態能夠經過設置默認值得到時, 你一般須要使用存根(stubbed)模式.

你可能能夠從存根值(stubbed values)中獲得適當的值的狀況以下:

  1. cookies
  2. 請求參數和請求頭
  3. 當前失敗請求的前一個服務請求的響應

fallback代碼塊內能夠靜態地獲取請求範圍內的存根(stubbed)值, 可是一般咱們更推薦在構建command實例時注入這些值, 就像下面實例的代碼中的countryCodeFromGeoLookup同樣:

public class CommandWithStubbedFallback extends HystrixCommand<UserAccount> {

    private final int customerId;
    private final String countryCodeFromGeoLookup;

    /**
     * @param customerId
     *            The customerID to retrieve UserAccount for
     * @param countryCodeFromGeoLookup
     *            The default country code from the HTTP request geo code lookup used for fallback.
     */
    protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.customerId = customerId;
        this.countryCodeFromGeoLookup = countryCodeFromGeoLookup;
    }

    @Override
    protected UserAccount run() {
        // fetch UserAccount from remote service
        //        return UserAccountClient.getAccount(customerId);
        throw new RuntimeException("forcing failure for example");
    }

    @Override
    protected UserAccount getFallback() {
        /**
         * Return stubbed fallback with some static defaults, placeholders,
         * and an injected value 'countryCodeFromGeoLookup' that we'll use
         * instead of what we would have retrieved from the remote service.
         */
        return new UserAccount(customerId, "Unknown Name",
                countryCodeFromGeoLookup, true, true, false);
    }

    public static class UserAccount {
        private final int customerId;
        private final String name;
        private final String countryCode;
        private final boolean isFeatureXPermitted;
        private final boolean isFeatureYPermitted;
        private final boolean isFeatureZPermitted;

        UserAccount(int customerId, String name, String countryCode,
                boolean isFeatureXPermitted,
                boolean isFeatureYPermitted,
                boolean isFeatureZPermitted) {
            this.customerId = customerId;
            this.name = name;
            this.countryCode = countryCode;
            this.isFeatureXPermitted = isFeatureXPermitted;
            this.isFeatureYPermitted = isFeatureYPermitted;
            this.isFeatureZPermitted = isFeatureZPermitted;
        }
    }
}

下面的代碼演示了上述行爲:

@Test
    public void test() {
        CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "ca");
        UserAccount account = command.execute();
        assertTrue(command.isFailedExecution());
        assertTrue(command.isResponseFromFallback());
        assertEquals(1234, account.customerId);
        assertEquals("ca", account.countryCode);
        assertEquals(true, account.isFeatureXPermitted);
        assertEquals(true, account.isFeatureYPermitted);
        assertEquals(false, account.isFeatureZPermitted);
    }

對於HystrixObservableCommand能夠經過重載resumeWithFallback()方法實現一樣的行爲:

@Override
protected Observable<Boolean> resumeWithFallback() {
    return Observable.just( new UserAccount(customerId, "Unknown Name",
                                            countryCodeFromGeoLookup, true, true, false) );
}

若是你想要從Observable中發出多個值, 那麼當失敗發生時, 本來的Observable可能已經發出的一部分值, 此時你或許更但願可以只從fallback邏輯中發出另外一部分未被髮出的值, 下面的例子就展現瞭如何實現這一個目的: 它經過追蹤原Observable發出的最後一個值來實現fallback邏輯中的Observable應該從什麼地方繼續發出存根值(stubbed value) :

@Override
protected Observable<Integer> construct() {
    return Observable.just(1, 2, 3)
            .concatWith(Observable.<Integer> error(new RuntimeException("forced error")))
            .doOnNext(new Action1<Integer>() {
                @Override
                public void call(Integer t1) {
                    lastSeen = t1;
                }
                
            })
            .subscribeOn(Schedulers.computation());
}

@Override
protected Observable<Integer> resumeWithFallback() {
    if (lastSeen < 4) {
        return Observable.range(lastSeen + 1, 4 - lastSeen);
    } else {
        return Observable.empty();
    }
}

Fallback: Cache via Network

有時後端的服務異常也會引發command執行失敗, 此時咱們也能夠從緩存中(如: memcached)取得相關的數據.

因爲在fallback的邏輯代碼中訪問網絡可能會再次失敗, 所以必須構建新的HystrixCommandHystrixObservableCommand來執行:

translation-Hystrix-How-To-Use-1

很重要的一點是執行fallback邏輯的command須要在一個不一樣的線程池中執行, 不然若是原command的延遲變高且其所在線程池已經滿了的話, 執行fallback邏輯的command將沒法在同一個線程池中執行.

下面的代碼展現了CommandWithFallbackViaNetwork如何在getFallback()方法中執行FallbackViaNetwork.

注意, FallbackViaNetwork一樣也具備回退機制, 這裏經過返回null來實現fail silent.

FallbackViaNetwork默認會從HystrixCommandGroupKey中繼承線程池的配置RemoteServiceX, 所以須要在其構造器中注入HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")來使其在不一樣的線程池中執行.

這樣, CommandWithFallbackViaNetwork會在名爲RemoteServiceX的線程池中執行, 而FallbackViaNetwork會在名爲RemoteServiceXFallback的線程池中執行.

public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
    private final int id;

    protected CommandWithFallbackViaNetwork(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
        this.id = id;
    }

    @Override
    protected String run() {
        //        RemoteServiceXClient.getValue(id);
        throw new RuntimeException("force failure for example");
    }

    @Override
    protected String getFallback() {
        return new FallbackViaNetwork(id).execute();
    }

    private static class FallbackViaNetwork extends HystrixCommand<String> {
        private final int id;

        public FallbackViaNetwork(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
                    // use a different threadpool for the fallback command
                    // so saturating the RemoteServiceX pool won't prevent
                    // fallbacks from executing
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
            this.id = id;
        }

        @Override
        protected String run() {
            MemCacheClient.getValue(id);
        }

        @Override
        protected String getFallback() {
            // the fallback also failed
            // so this fallback-of-a-fallback will 
            // fail silently and return null
            return null;
        }
    }
}

Primary + Secondary with Fallback

有些系統可能具備是以雙系統模式搭建的 — 主從模式或主備模式.

有時從系統或備用系統會被認爲是失敗狀態的一種, 僅在執行fallback邏輯是才使用它;這種場景和Cache via Network一節中描述的場景是同樣的.

然而, 若是切換到從系統是一個很正常時, 例如發佈新代碼時(這是有狀態的系統發佈代碼的一種方式), 此時每當切換到從系統使用時, 主系統都是處於不可用狀態,斷路器將會打開且發出警報.

這並非咱們指望發生的事, 這種狼來了式的警報可能會致使真正發生問題的時候咱們卻把它當成正常的誤報而忽略了.

所以, 咱們能夠經過在其前面放置一個門面HystrixCommand(見下文), 將主/從系統的切換視爲正常的、健康的狀態.

translation-Hystrix-How-To-Use-2

主從HystrixCommand都是須要訪問網絡且實現了特定的業務邏輯, 所以其實現上應該是線程隔離的. 它們可能具備顯著的性能差距(一般從系統是一個靜態緩存), 所以將兩個command隔離的另外一個好處是能夠針對性地調優.

你不須要將這兩個command都公開發布, 只須要將它們隱藏在另外一個由信號量隔離的HystrixCommand中(稱之爲門面HystrixCommand), 在這個command中去實現主系統仍是從系統的調用選擇. 只有當主從系統都失敗了, 纔會去執行這個門面commandfallback邏輯.

門面HystrixCommand可使用信號量隔離的, 由於其業務邏輯僅僅是調用另外兩個線程隔離的HystrixCommand, 它不涉及任何的網絡訪問、重試等容易出錯的事, 所以不必將這部分代碼放到其餘線程去執行.

public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {

    private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);

    private final int id;

    public CommandFacadeWithPrimarySecondary(int id) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
                .andCommandPropertiesDefaults(
                        // we want to default to semaphore-isolation since this wraps
                        // 2 others commands that are already thread isolated
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
        this.id = id;
    }

    @Override
    protected String run() {
        if (usePrimary.get()) {
            return new PrimaryCommand(id).execute();
        } else {
            return new SecondaryCommand(id).execute();
        }
    }

    @Override
    protected String getFallback() {
        return "static-fallback-" + id;
    }

    @Override
    protected String getCacheKey() {
        return String.valueOf(id);
    }

    private static class PrimaryCommand extends HystrixCommand<String> {

        private final int id;

        private PrimaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
                    .andCommandPropertiesDefaults(
                            // we default to a 600ms timeout for primary
                            HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
            this.id = id;
        }

        @Override
        protected String run() {
            // perform expensive 'primary' service call
            return "responseFromPrimary-" + id;
        }

    }

    private static class SecondaryCommand extends HystrixCommand<String> {

        private final int id;

        private SecondaryCommand(int id) {
            super(Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
                    .andCommandPropertiesDefaults(
                            // we default to a 100ms timeout for secondary
                            HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
            this.id = id;
        }

        @Override
        protected String run() {
            // perform fast 'secondary' service call
            return "responseFromSecondary-" + id;
        }

    }

    public static class UnitTest {

        @Test
        public void testPrimary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
                assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }

        @Test
        public void testSecondary() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
                assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
            } finally {
                context.shutdown();
                ConfigurationManager.getConfigInstance().clear();
            }
        }
    }
}

Client Doesn't Perform Network Access

當你使用HystrixCommand實現的業務邏輯不涉及到網絡訪問、對延遲敏感且沒法接受多線程帶來的開銷時, 你須要設置executionIsolationStrategy)屬性的值爲ExecutionIsolationStrategy.SEMAPHORE, 此時Hystrix會使用信號量隔離代替線程隔離.

下面的代碼展現瞭如何爲command設置該屬性(也能夠在運行時動態改變這個屬性的值):

public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

    private final int id;

    public CommandUsingSemaphoreIsolation(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
        this.id = id;
    }

    @Override
    protected String run() {
        // a real implementation would retrieve data from in memory data structure
        return "ValueFromHashMap_" + id;
    }

}

Get-Set-Get with Request Cache Invalidation

Get-Set-Get是指: Get請求的結果被緩存下來後, 另外一個command對同一個資源發出了Set請求, 此時由Get請求緩存的結果應該失效, 避免隨後的Get請求獲取到過期的緩存結果, 此時能夠經過調用HystrixRequestCache.clear())方法來使緩存失效.

public class CommandUsingRequestCacheInvalidation {

    /* represents a remote data store */
    private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_";

    public static class GetterCommand extends HystrixCommand<String> {

        private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand");
        private final int id;

        public GetterCommand(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet"))
                    .andCommandKey(GETTER_KEY));
            this.id = id;
        }

        @Override
        protected String run() {
            return prefixStoredOnRemoteDataStore + id;
        }

        @Override
        protected String getCacheKey() {
            return String.valueOf(id);
        }

        /**
         * Allow the cache to be flushed for this object.
         * 
         * @param id
         *            argument that would normally be passed to the command
         */
        public static void flushCache(int id) {
            HystrixRequestCache.getInstance(GETTER_KEY,
                    HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id));
        }

    }

    public static class SetterCommand extends HystrixCommand<Void> {

        private final int id;
        private final String prefix;

        public SetterCommand(int id, String prefix) {
            super(HystrixCommandGroupKey.Factory.asKey("GetSetGet"));
            this.id = id;
            this.prefix = prefix;
        }

        @Override
        protected Void run() {
            // persist the value against the datastore
            prefixStoredOnRemoteDataStore = prefix;
            // flush the cache
            GetterCommand.flushCache(id);
            // no return value
            return null;
        }
    }
}
@Test
        public void getGetSetGet() {
            HystrixRequestContext context = HystrixRequestContext.initializeContext();
            try {
                assertEquals("ValueBeforeSet_1", new GetterCommand(1).execute());
                GetterCommand commandAgainstCache = new GetterCommand(1);
                assertEquals("ValueBeforeSet_1", commandAgainstCache.execute());
                // confirm it executed against cache the second time
                assertTrue(commandAgainstCache.isResponseFromCache());
                // set the new value
                new SetterCommand(1, "ValueAfterSet_").execute();
                // fetch it again
                GetterCommand commandAfterSet = new GetterCommand(1);
                // the getter should return with the new prefix, not the value from cache
                assertFalse(commandAfterSet.isResponseFromCache());
                assertEquals("ValueAfterSet_1", commandAfterSet.execute());
            } finally {
                context.shutdown();
            }
        }
    }

Migrating a Library to Hystrix

若是你要遷移一個已有的客戶端庫到Hystrix, 你應該將全部的服務方法(service methods)替換成HystrixCommand.

服務方法(service methods)轉而調用HystrixCommand且不在包含任何額外的業務邏輯.

所以, 在遷移以前, 一個服務庫多是這樣的:

translation-Hystrix-How-To-Use-3

遷移完成以後, 服務庫的用戶要能直接訪問到HystrixCommand, 或者經過服務門面(service facade)的代理間接訪問到HystrixCommand.

translation-Hystrix-How-To-Use-4

相關文章
相關標籤/搜索