轉載請註明出處: 翻譯:Hystrix - How To Usehtml
下面的代碼展現了HystrixCommand
版的Hello World
:java
public class CommandHelloWorld extends HystrixCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // a real example would do work like a network call here return "Hello " + name + "!"; } }
查看源碼react
HystrixObservableCommand
的同等實現以下:git
public class CommandHelloWorld extends HystrixObservableCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable<String> construct() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
能夠經過調用HystrixCommand.execute()
方法實現同步執行, 示例以下:github
String s = new CommandHelloWorld("World").execute();
測試以下:web
@Test public void testSynchronous() { assertEquals("Hello World!", new CommandHelloWorld("World").execute()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute()); }
HystrixObservableCommand
不提供同步執行方法, 可是若是肯定其只會產生一個值, 那麼也能夠用以下方式實現:後端
若是實際上產生了多個值, 上述的代碼將會拋出java.lang.IllegalArgumentException: Sequence contains too many elements
.緩存
能夠經過調用HystrixCommand.queue()
方法實現異步執行, 示例以下:cookie
Future<String> fs = new CommandHelloWorld("World").queue();
此時能夠經過Future.get()
方法獲取command
執行結果:網絡
String s = fs.get();
測試代碼以下:
@Test public void testAsynchronous1() throws Exception { assertEquals("Hello World!", new CommandHelloWorld("World").queue().get()); assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get()); } @Test public void testAsynchronous2() throws Exception { Future<String> fWorld = new CommandHelloWorld("World").queue(); Future<String> fBob = new CommandHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }
下面的兩種實現是等價的:
String s1 = new CommandHelloWorld("World").execute(); String s2 = new CommandHelloWorld("World").queue().get();
HystrixObservableCommand
不提供queue
方法, 可是若是肯定其只會產生一個值, 那麼也能夠用以下方式實現:
若是實際上產生了多個值, 上述的代碼將會拋出java.lang.IllegalArgumentException: Sequence contains too many elements
.
你也能夠將HystrixCommand
當作一個可觀察對象(Observable
)來觀察(Observe
)其產生的結果, 可使用如下任意一個方法實現:
observe()
: 一旦調用該方法, 請求將當即開始執行, 其利用ReplaySubject
特性能夠保證不會丟失任何command
產生的結果, 即便結果在你訂閱以前產生的也不會丟失.toObservable()
: 調用該方法後不會當即執行請求, 而是當有訂閱者訂閱時纔會執行.Observable<String> ho = new CommandHelloWorld("World").observe(); // or Observable<String> co = new CommandHelloWorld("World").toObservable();
而後你能夠經過訂閱到這個Observable
來取得command
產生的結果:
ho.subscribe(new Action1<String>() { @Override public void call(String s) { // value emitted here } });
測試以下:
@Test public void testObservable() throws Exception { Observable<String> fWorld = new CommandHelloWorld("World").observe(); Observable<String> fBob = new CommandHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlockingObservable().single()); assertEquals("Hello Bob!", fBob.toBlockingObservable().single()); // non-blocking // - this is a verbose anonymous inner-class approach and doesn't do assertions fWorld.subscribe(new Observer<String>() { @Override public void onCompleted() { // nothing needed here } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); // non-blocking // - also verbose anonymous inner-class // - ignore errors and onCompleted signal fBob.subscribe(new Action1<String>() { @Override public void call(String v) { System.out.println("onNext: " + v); } }); }
使用Java 8的Lambda表達式可使代碼更簡潔:
fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }, (exception) -> { exception.printStackTrace(); })
關於Observable
的信息能夠在這裏查閱
相比將HystrixCommand
使用上述方法轉換成一個Observable
, 你也能夠選擇建立一個HystrixObservableCommand
對象. HystrixObservableCommand
包裝的Observable
容許產生多個結果(譯者注: Subscriber.onNext
能夠調用屢次), 而HystrixCommand
即便轉換成了Observable
也只能產生一個結果.
使用HystrixObservableCommnad
時, 你須要重載construct
方法來實現你的業務邏輯, 而不是重載run
方法, contruct
方法將會返回你須要包裝的Observable
.
使用下面任意一個方法能夠從HystrixObservableCommand
中獲取Observable
對象:
observe()
: 一旦調用該方法, 請求將當即開始執行, 其利用ReplaySubject
特性能夠保證不會丟失任何command
產生的結果, 即便結果在你訂閱以前產生的也不會丟失.toObservable()
: 調用該方法後不會當即執行請求, 而是當有訂閱者訂閱時纔會執行.大多數狀況下, 咱們都但願command
在執行失敗時可以有一個候選方法來處理, 如: 返回一個默認值或執行其餘失敗處理邏輯, 除了如下幾個狀況:
command
: 當command
的目標是執行寫操做而不是讀操做, 那麼一般須要將寫操做失敗的錯誤交給調用者處理.command
的目標是作一些離線計算、生成報表、填充緩存等, 那麼一樣應該將失敗交給調用者處理.不管command
是否實現了getFallback()
方法, command
執行失敗時, Hystrix
的狀態和斷路器(circuit-breaker
)的狀態/指標都會進行更新.
HystrixCommand
能夠經過實現getFallback()
方法來實現降級處理, run()
方法異常、執行超時、線程池或信號量已滿拒絕提供服務、斷路器短路時, 都會調用getFallback()
:
public class CommandHelloFailure extends HystrixCommand<String> { private final String name; public CommandHelloFailure(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { throw new RuntimeException("this command always fails"); } @Override protected String getFallback() { return "Hello Failure " + name + "!"; } }
這個命令的run()
方法老是會執行失敗, 可是調用者老是能收到getFallback()
方法返回的值, 而不是收到一個異常:
@Test public void testSynchronous() { assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute()); assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute()); }
HystrixObservableCommand
能夠經過重載resumeWithFallback
方法實現原Observable
執行失敗時返回回另外一個Observable
, 須要注意的是, 原Observable
有可能在發出多個結果以後纔出現錯誤, 所以在fallback
實現的邏輯中不該該假設訂閱者只會收到失敗邏輯中發出的結果.
Hystrix
內部使用了RxJava
的onErrorResumeNext
操做符來實現Observable
之間的無縫轉移.
除HystrixBadRequestException
異常外, run
方法中拋出的全部異常都會被認爲是執行失敗且會觸發getFallback()
方法和斷路器的邏輯.
你能夠在HystrixBadRequestException
中包裝想要拋出的異常, 而後經過getCause()
方法獲取. HystrixBadRequestException
使用在不該該被錯誤指標(failure metrics)統計和不該該觸發getFallback()
方法的場景, 例如報告參數不合法或者非系統異常等.
對於HystrixObservableCommand
, 不可恢復的錯誤都會在經過onError
方法通知, 並經過獲取用戶實現的resumeWithFallback()
方法返回的Observable
來完成回退機制.
Failure Type | Exception class | Exception.cause |
---|---|---|
FAILURE | HystrixRuntimeException | underlying exception(user-controlled) |
TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException |
SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException |
THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException |
SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException |
BAD_REQUEST | HystrixBadRequestException | underlying exception(user-controller) |
默認的command name
是從類名中派生的:
getClass().getSimpleName()
能夠經過HystrixCommand
或HystrixObservableCommand
的構造器來指定command name
:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))); this.name = name; }
能夠經過以下方式來重用Setter
:
private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")); public CommandHelloWorld(String name) { super(cachedSetter); this.name = name; }
HystrixCommandKey
是一個接口, 所以能夠將其實現爲一個枚舉或者常規的類, 可是它已經內置了一個Factory
類來構建幫助構建內部實例, 使用方式以下:
HystrixCommandKey.Factory.asKey("Hello World");
Hystrix
使用command group
來爲分組, 分組信息主要用於報告、警報、儀表盤上顯示, 或者是標識團隊/庫的擁有者.
默認狀況下, 除非已經用這個名字定義了一個信號量, 不然 Hystrix
將使用這個名稱來定義command
的線程池.
HystrixCommandGroupKey
是一個接口, 所以能夠將其實現爲一個枚舉或者常規的類, 可是它已經內置了一個Factory
類來構建幫助構建內部實例, 使用方式以下:
HystrixCommandGroupKey.Factory.asKey("Example Group")
thread-pool key
主要用於在監控、指標發佈、緩存等相似場景中標識一個HystrixThreadPool
, 一個HystrixCommand
於其構造函數中傳入的HystrixThreadPoolKey
指定的HystrixThreadPool
相關聯, 若是未指定的話, 則使用HystrixCommandGroupKey
來獲取/建立HystrixThreadPool
.
能夠經過HystrixCommand
或HystrixObservableCommand
的構造器來指定其值:
public CommandHelloWorld(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"))); this.name = name; }
HystrixCommandThreadPoolKey
是一個接口, 所以能夠將其實現爲一個枚舉或者常規的類, 可是它已經內置了一個Factory
類來構建幫助構建內部實例, 使用方式以下:
HystrixThreadPoolKey.Factory.asKey("Hello World Pool")
使用HystrixThreadPoolKey
而不是使用不一樣的HystrixCommandGroupKey
的緣由是: 可能會有多條command
在邏輯功能上屬於同一個組(group), 可是其中的某些command
須要和其餘command
隔離開, 例如:
command
command
的group name
都是VideoMetadata
command A
與資源#1
互斥command B
與資源#2
互斥若是command A
因爲延遲等緣由致使其所在的線程池資源耗盡, 不該該影響command B
對#2
的執行, 由於他們訪問的是不一樣的後端資源.
所以, 從邏輯上來講, 咱們但願這兩條command
應該被分到同一個分組, 可是咱們一樣系統將這兩條命令的執行隔離開來, 所以咱們使用HystrixThreadPoolKey
將其分配到不一樣的線程池.
能夠經過實現HystrixCommand
或HystrixObservableCommand
的getCacheKey()
方法開啓用對請求的緩存功能:
public class CommandUsingRequestCache extends HystrixCommand<Boolean> { private final int value; protected CommandUsingRequestCache(int value) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.value = value; } @Override protected Boolean run() { return value == 0 || value % 2 == 0; } @Override protected String getCacheKey() { return String.valueOf(value); } }
因爲該功能依賴於請求的上下文信息, 所以咱們必須初始化一個HystrixRequestContext
, 使用方式以下:
@Test public void testWithoutCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertTrue(new CommandUsingRequestCache(2).execute()); assertFalse(new CommandUsingRequestCache(1).execute()); assertTrue(new CommandUsingRequestCache(0).execute()); assertTrue(new CommandUsingRequestCache(58672).execute()); } finally { context.shutdown(); } }
一般狀況下, 上下文信息(HystrixRequestContext
)應該在持有用戶請求的ServletFilter
或者其餘擁有生命週期管理功能的類來初始化和關閉.
下面的例子展現了command
如何從緩存中獲取數據, 以及如何查詢一個數據是不是從緩存中獲取到的:
@Test public void testWithCacheHits() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command2a = new CommandUsingRequestCache(2); CommandUsingRequestCache command2b = new CommandUsingRequestCache(2); assertTrue(command2a.execute()); // this is the first time we've executed this command with // the value of "2" so it should not be from cache assertFalse(command2a.isResponseFromCache()); assertTrue(command2b.execute()); // this is the second time we've executed this command with // the same value so it should return from cache assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } // start a new request context context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command3b = new CommandUsingRequestCache(2); assertTrue(command3b.execute()); // this is a new request context so this // should not come from cache assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } }
請求合併能夠用於將多條請求綁定到一塊兒, 由同一個HystrixCommand
實例執行.
collapser
能夠經過batch size
和batch
建立以來的耗時來自動將請求合併執行.
Hystrix
支持兩個請求合併方式: 請求級的合併和全局級的合併. 默認是請求範圍的合併, 能夠在構造collapser
時指定值.
請求級(request-scoped
)的collapser
只會合併每個HystrixRequestContext
中的請求, 而全局級(globally-scoped
)的collapser
則能夠跨HystrixRequestContext
合併請求. 所以, 若是你下游的依賴者沒法再一個command
中處理多個HystrixRequestContext
的話, 那麼你應該使用請求級的合併.
在Netflix, 咱們只會使用請求級的合併, 由於咱們當前全部的系統都是基於一個command
對應一個HystrixRequestContext
的設想下構建的. 所以, 當一個command
使用不一樣的參數在一個請求中併發執行時, 合併是有效的.
下面的代碼展現瞭如何實現請求級的HystrixCollapser
:
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } @Override public Integer getRequestArgument() { return key; } @Override protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) { return new BatchCommand(requests); } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) { int count = 0; for (CollapsedRequest<String, Integer> request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand<List<String>> { private final Collection<CollapsedRequest<String, Integer>> requests; private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List<String> run() { ArrayList<String> response = new ArrayList<String>(); for (CollapsedRequest<String, Integer> request : requests) { // artificial response for each argument received in the batch response.add("ValueForKey: " + request.getArgument()); } return response; } } }
下面的代碼展現了若是使用collapser
自動合併4個CommandCollapserGetValueForKey
到一個HystrixCommand
中執行:
@Test public void testCollapser() throws Exception { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { Future<String> f1 = new CommandCollapserGetValueForKey(1).queue(); Future<String> f2 = new CommandCollapserGetValueForKey(2).queue(); Future<String> f3 = new CommandCollapserGetValueForKey(3).queue(); Future<String> f4 = new CommandCollapserGetValueForKey(4).queue(); assertEquals("ValueForKey: 1", f1.get()); assertEquals("ValueForKey: 2", f2.get()); assertEquals("ValueForKey: 3", f3.get()); assertEquals("ValueForKey: 4", f4.get()); // assert that the batch command 'GetValueForKey' was in fact // executed and that it executed only once assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0]; // assert the command is the one we're expecting assertEquals("GetValueForKey", command.getCommandKey().name()); // confirm that it was a COLLAPSED command execution assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); // and that it was successful assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); } }
使用請求級的特性時(如: 請求緩存、請求合併、請求日誌)你必須管理HystrixRequestContext
的生命週期(或者實現HystrixConcurrencyStategy
).
這意味着你必須在請求以前執行以下代碼:
HystrixRequestContext context = HystrixRequestContext.initializeContext();
並在請求結束後執行以下代碼:
context.shutdown();
在標準的Java web應用中, 你可使用Setvlet Filter
實現的以下的過濾器來管理:
public class HystrixRequestContextServletFilter implements Filter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } } }
能夠在web.xml
中加入以下代碼實現對全部的請求都使用該過濾器:
<filter> <display-name>HystrixRequestContextServletFilter</display-name> <filter-name>HystrixRequestContextServletFilter</filter-name> <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class> </filter> <filter-mapping> <filter-name>HystrixRequestContextServletFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>
如下是HystrixCommand
和HystrixObservableCommand
的通常用法和使用模式.
最基本的使用是執行一條只作一件事情且沒有實現回退方法的command
, 這樣的command
在發生任何錯誤時都會拋出異常:
public class CommandThatFailsFast extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } }
下面的代碼演示了上述行爲:
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsFast(false).execute()); } @Test public void testFailure() { try { new CommandThatFailsFast(true).execute(); fail("we should have thrown an exception"); } catch (HystrixRuntimeException e) { assertEquals("failure from CommandThatFailsFast", e.getCause().getMessage()); e.printStackTrace(); } }
HystrixObservableCommand
須要重載resumeWithFallback()
方法來實現一樣的行爲:
@Override protected Observable<String> resumeWithFallback() { if (throwException) { return Observable.error(new Throwable("failure from CommandThatFailsFast")); } else { return Observable.just("success"); } }
靜默失敗等同於返回一個空的響應或者移除功能. 能夠是返回null
、空Map、空List, 或者其餘相似的響應.
能夠經過實現HystrixCommand.getFallback()
方法實現該功能:
public class CommandThatFailsSilently extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } } @Override protected String getFallback() { return null; } }
@Test public void testSuccess() { assertEquals("success", new CommandThatFailsSilently(false).execute()); } @Test public void testFailure() { try { assertEquals(null, new CommandThatFailsSilently(true).execute()); } catch (HystrixRuntimeException e) { fail("we should not get an exception as we fail silently with a fallback"); } }
或者返回一個空List的實現以下:
@Override protected List<String> getFallback() { return Collections.emptyList(); }
HystrixObservableCommand
能夠經過重載resumeWithFallback()
方法實現一樣的行爲:
@Override protected Observable<String> resumeWithFallback() { return Observable.empty(); }
Fallback
能夠返回代碼裏設定的默認值, 這種方式能夠經過默認行爲來有效避免於靜默失敗帶來影響.
例如, 若是一個應返回true/false的用戶認證的command
執行失敗了, 那麼其默認行爲能夠以下:
@Override protected Boolean getFallback() { return true; }
對於HystrixObservableCommand能夠經過重載resumeWithFallback()
方法實現一樣的行爲:
@Override protected Observable<Boolean> resumeWithFallback() { return Observable.just( true ); }
當command
返回的是一個包含多個字段的複合對象, 且該對象的一部分字段值能夠經過其餘請求狀態得到, 另外一部分狀態能夠經過設置默認值得到時, 你一般須要使用存根(stubbed)模式.
你可能能夠從存根值(stubbed values)中獲得適當的值的狀況以下:
在fallback
代碼塊內能夠靜態地獲取請求範圍內的存根(stubbed)值, 可是一般咱們更推薦在構建command
實例時注入這些值, 就像下面實例的代碼中的countryCodeFromGeoLookup
同樣:
public class CommandWithStubbedFallback extends HystrixCommand<UserAccount> { private final int customerId; private final String countryCodeFromGeoLookup; /** * @param customerId * The customerID to retrieve UserAccount for * @param countryCodeFromGeoLookup * The default country code from the HTTP request geo code lookup used for fallback. */ protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.customerId = customerId; this.countryCodeFromGeoLookup = countryCodeFromGeoLookup; } @Override protected UserAccount run() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException("forcing failure for example"); } @Override protected UserAccount getFallback() { /** * Return stubbed fallback with some static defaults, placeholders, * and an injected value 'countryCodeFromGeoLookup' that we'll use * instead of what we would have retrieved from the remote service. */ return new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false); } public static class UserAccount { private final int customerId; private final String name; private final String countryCode; private final boolean isFeatureXPermitted; private final boolean isFeatureYPermitted; private final boolean isFeatureZPermitted; UserAccount(int customerId, String name, String countryCode, boolean isFeatureXPermitted, boolean isFeatureYPermitted, boolean isFeatureZPermitted) { this.customerId = customerId; this.name = name; this.countryCode = countryCode; this.isFeatureXPermitted = isFeatureXPermitted; this.isFeatureYPermitted = isFeatureYPermitted; this.isFeatureZPermitted = isFeatureZPermitted; } } }
下面的代碼演示了上述行爲:
@Test public void test() { CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, "ca"); UserAccount account = command.execute(); assertTrue(command.isFailedExecution()); assertTrue(command.isResponseFromFallback()); assertEquals(1234, account.customerId); assertEquals("ca", account.countryCode); assertEquals(true, account.isFeatureXPermitted); assertEquals(true, account.isFeatureYPermitted); assertEquals(false, account.isFeatureZPermitted); }
對於HystrixObservableCommand能夠經過重載resumeWithFallback()
方法實現一樣的行爲:
@Override protected Observable<Boolean> resumeWithFallback() { return Observable.just( new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false) ); }
若是你想要從Observable
中發出多個值, 那麼當失敗發生時, 本來的Observable
可能已經發出的一部分值, 此時你或許更但願可以只從fallback
邏輯中發出另外一部分未被髮出的值, 下面的例子就展現瞭如何實現這一個目的: 它經過追蹤原Observable
發出的最後一個值來實現fallback
邏輯中的Observable
應該從什麼地方繼續發出存根值(stubbed value) :
@Override protected Observable<Integer> construct() { return Observable.just(1, 2, 3) .concatWith(Observable.<Integer> error(new RuntimeException("forced error"))) .doOnNext(new Action1<Integer>() { @Override public void call(Integer t1) { lastSeen = t1; } }) .subscribeOn(Schedulers.computation()); } @Override protected Observable<Integer> resumeWithFallback() { if (lastSeen < 4) { return Observable.range(lastSeen + 1, 4 - lastSeen); } else { return Observable.empty(); } }
有時後端的服務異常也會引發command
執行失敗, 此時咱們也能夠從緩存中(如: memcached)取得相關的數據.
因爲在fallback
的邏輯代碼中訪問網絡可能會再次失敗, 所以必須構建新的HystrixCommand
或HystrixObservableCommand
來執行:
很重要的一點是執行fallback
邏輯的command
須要在一個不一樣的線程池中執行, 不然若是原command
的延遲變高且其所在線程池已經滿了的話, 執行fallback
邏輯的command
將沒法在同一個線程池中執行.
下面的代碼展現了CommandWithFallbackViaNetwork
如何在getFallback()
方法中執行FallbackViaNetwork
.
注意, FallbackViaNetwork
一樣也具備回退機制, 這裏經過返回null
來實現fail silent
.
FallbackViaNetwork
默認會從HystrixCommandGroupKey
中繼承線程池的配置RemoteServiceX
, 所以須要在其構造器中注入HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")
來使其在不一樣的線程池中執行.
這樣, CommandWithFallbackViaNetwork
會在名爲RemoteServiceX
的線程池中執行, 而FallbackViaNetwork
會在名爲RemoteServiceXFallback
的線程池中執行.
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> { private final int id; protected CommandWithFallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand"))); this.id = id; } @Override protected String run() { // RemoteServiceXClient.getValue(id); throw new RuntimeException("force failure for example"); } @Override protected String getFallback() { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand<String> { private final int id; public FallbackViaNetwork(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand")) // use a different threadpool for the fallback command // so saturating the RemoteServiceX pool won't prevent // fallbacks from executing .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback"))); this.id = id; } @Override protected String run() { MemCacheClient.getValue(id); } @Override protected String getFallback() { // the fallback also failed // so this fallback-of-a-fallback will // fail silently and return null return null; } } }
有些系統可能具備是以雙系統模式搭建的 — 主從模式或主備模式.
有時從系統或備用系統會被認爲是失敗狀態的一種, 僅在執行fallback
邏輯是才使用它;這種場景和Cache via Network
一節中描述的場景是同樣的.
然而, 若是切換到從系統是一個很正常時, 例如發佈新代碼時(這是有狀態的系統發佈代碼的一種方式), 此時每當切換到從系統使用時, 主系統都是處於不可用狀態,斷路器將會打開且發出警報.
這並非咱們指望發生的事, 這種狼來了
式的警報可能會致使真正發生問題的時候咱們卻把它當成正常的誤報而忽略了.
所以, 咱們能夠經過在其前面放置一個門面HystrixCommand
(見下文), 將主/從系統的切換視爲正常的、健康的狀態.
主從HystrixCommand
都是須要訪問網絡且實現了特定的業務邏輯, 所以其實現上應該是線程隔離的. 它們可能具備顯著的性能差距(一般從系統是一個靜態緩存), 所以將兩個command
隔離的另外一個好處是能夠針對性地調優.
你不須要將這兩個command
都公開發布, 只須要將它們隱藏在另外一個由信號量隔離的HystrixCommand
中(稱之爲門面HystrixCommand
), 在這個command
中去實現主系統仍是從系統的調用選擇. 只有當主從系統都失敗了, 纔會去執行這個門面command
的fallback
邏輯.
門面HystrixCommand
可使用信號量隔離的, 由於其業務邏輯僅僅是調用另外兩個線程隔離的HystrixCommand
, 它不涉及任何的網絡訪問、重試等容易出錯的事, 所以不必將這部分代碼放到其餘線程去執行.
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> { private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true); private final int id; public CommandFacadeWithPrimarySecondary(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand")) .andCommandPropertiesDefaults( // we want to default to semaphore-isolation since this wraps // 2 others commands that are already thread isolated HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { if (usePrimary.get()) { return new PrimaryCommand(id).execute(); } else { return new SecondaryCommand(id).execute(); } } @Override protected String getFallback() { return "static-fallback-" + id; } @Override protected String getCacheKey() { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand<String> { private final int id; private PrimaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand")) .andCommandPropertiesDefaults( // we default to a 600ms timeout for primary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600))); this.id = id; } @Override protected String run() { // perform expensive 'primary' service call return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand<String> { private final int id; private SecondaryCommand(int id) { super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX")) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand")) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand")) .andCommandPropertiesDefaults( // we default to a 100ms timeout for secondary HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100))); this.id = id; } @Override protected String run() { // perform fast 'secondary' service call return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true); assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false); assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
當你使用HystrixCommand
實現的業務邏輯不涉及到網絡訪問、對延遲敏感且沒法接受多線程帶來的開銷時, 你須要設置executionIsolationStrategy
)屬性的值爲ExecutionIsolationStrategy
.SEMAPHORE
, 此時Hystrix
會使用信號量隔離代替線程隔離.
下面的代碼展現瞭如何爲command
設置該屬性(也能夠在運行時動態改變這個屬性的值):
public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> { private final int id; public CommandUsingSemaphoreIsolation(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this.id = id; } @Override protected String run() { // a real implementation would retrieve data from in memory data structure return "ValueFromHashMap_" + id; } }
Get-Set-Get是指: Get請求的結果被緩存下來後, 另外一個command
對同一個資源發出了Set請求, 此時由Get請求緩存的結果應該失效, 避免隨後的Get請求獲取到過期的緩存結果, 此時能夠經過調用HystrixRequestCache.clear()
)方法來使緩存失效.
public class CommandUsingRequestCacheInvalidation { /* represents a remote data store */ private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_"; public static class GetterCommand extends HystrixCommand<String> { private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand"); private final int id; public GetterCommand(int id) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet")) .andCommandKey(GETTER_KEY)); this.id = id; } @Override protected String run() { return prefixStoredOnRemoteDataStore + id; } @Override protected String getCacheKey() { return String.valueOf(id); } /** * Allow the cache to be flushed for this object. * * @param id * argument that would normally be passed to the command */ public static void flushCache(int id) { HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id)); } } public static class SetterCommand extends HystrixCommand<Void> { private final int id; private final String prefix; public SetterCommand(int id, String prefix) { super(HystrixCommandGroupKey.Factory.asKey("GetSetGet")); this.id = id; this.prefix = prefix; } @Override protected Void run() { // persist the value against the datastore prefixStoredOnRemoteDataStore = prefix; // flush the cache GetterCommand.flushCache(id); // no return value return null; } } }
@Test public void getGetSetGet() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertEquals("ValueBeforeSet_1", new GetterCommand(1).execute()); GetterCommand commandAgainstCache = new GetterCommand(1); assertEquals("ValueBeforeSet_1", commandAgainstCache.execute()); // confirm it executed against cache the second time assertTrue(commandAgainstCache.isResponseFromCache()); // set the new value new SetterCommand(1, "ValueAfterSet_").execute(); // fetch it again GetterCommand commandAfterSet = new GetterCommand(1); // the getter should return with the new prefix, not the value from cache assertFalse(commandAfterSet.isResponseFromCache()); assertEquals("ValueAfterSet_1", commandAfterSet.execute()); } finally { context.shutdown(); } } }
若是你要遷移一個已有的客戶端庫到Hystrix
, 你應該將全部的服務方法(service methods
)替換成HystrixCommand
.
服務方法(service methods
)轉而調用HystrixCommand
且不在包含任何額外的業務邏輯.
所以, 在遷移以前, 一個服務庫多是這樣的:
遷移完成以後, 服務庫的用戶要能直接訪問到HystrixCommand
, 或者經過服務門面(service facade
)的代理間接訪問到HystrixCommand
.