下面是一個經過實現接口 HystrixCommand
的一個Hello World 示例: html
public class HystrixHelloWorld extends HystrixCommand<String> { private final String name; public HystrixHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { return "Hello " + name + "!"; } }
點擊查看詳細源碼 java
HystrixObservableCommand
等價於 HystrixCommand
react
一個等效的Hello World解決方案,使用 HystrixObservableCommand
代替 HystrixCommand
,經過覆蓋 construct
方法,以下所示: git
public class HystrixObservableHelloWorld extends HystrixObservableCommand<String> { private final String name; public HystrixObservableHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable<String> construct() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
經過 execute()
方法同步調用 HystrixCommand
的實現,示例以下:github
String s = new HystrixHelloWorld("World").execute();
單元測試以下:異步
@Test public void testSynchronous() { //測試同步 assertEquals("Hello World!", new HystrixHelloWorld("World").execute()); assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").execute()); }
關於實現了HystrixObservableCommand
的方法示例以下:ide
關於HystrixObservableCommand
的實現沒有簡單的 execute
方法調用,若是清楚經過一個命令產生的 Observable
一定僅僅產生一個單一的值,則能夠對 Observable
應用RXjava的操做 .toBlocking().toFuture().get()
模擬 execute
方法。單元測試
咱們能夠經過使用 queue()
方法異步執行 HystrixCommand
,示例以下: 測試
Future<String> fWorld = new HystrixHelloWorld("World").queue();
咱們能夠經過 Future獲取到命令的結果集this
String fw=fWorld.get();
經過單元測試模擬操做以下:
@Test public void testAsynchronous1() throws Exception { //測試異步 assertEquals("Hello World!", new HystrixHelloWorld("World").queue().get()); assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").queue().get()); } @Test public void testAsynchronous2() throws Exception { //測試異步 Future<String> fWorld = new HystrixHelloWorld("World").queue(); Future<String> fBob = new HystrixHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }
下面的操做是等價的:
String s1 = new HystrixHelloWorld("World").execute(); String s2 = new HystrixHelloWorld("World").queue().get();
關於實現了HystrixObservableCommand
的方法示例以下:
HystrixObservableCommand
沒有 queue
這種簡單實現異步的方法 ,若是清楚經過一個命令產生的 Observable
一定僅僅產生一個單一的值,則能夠對 Observable
應用RxJava操做 .toBlocking().toFuture()
模擬 queue
方法。
能夠經過一下任意方法監聽 HystrixCommand
的結果:
observe()
— 執行這個命令會返回一個熱 Observable馬上執行hystrix的命令 ,由於這個 Observable 經過 ReplaySubject
過濾,我們不會有丟失訂閱以前的任何東西的危險。toObservable()
— 執行這個命令會返回一個「冷「 Observable,直到訂閱 Observable 纔會開始執行命令和發送結果 。Observable<String> fWorld = new HystrixHelloWorld("World").observe();
執行完上面的代碼,咱們能夠經過訂閱 Observable 獲取到它的值
fWorld.subscribe(new Action1<String>() { @Override public void call(String s) { // value emitted here } });
下面的單元測試示例以下:
@Test public void testObservable() throws Exception { Observable<String> fWorld = new HystrixHelloWorld("World").observe(); Observable<String> fBob = new HystrixHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlocking().single()); assertEquals("Hello Bob!", fBob.toBlocking().single()); fWorld.subscribe(new Observer<String>() { @Override public void onCompleted() { // 這裏能夠什麼都不作 } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); fBob.subscribe(new Action1<String>() { @Override public void call(String v) { System.out.println("onNext: " + v); } }); }
使用java8的 lambda 表達式,示例以下:
fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }, (exception) -> { exception.printStackTrace(); })
點擊連接瞭解更多關於RXjava中 的 Observable