Hystrix有兩個請求命令 HystrixCommand、HystrixObservableCommand。java
HystrixCommand用在依賴服務返回單個操做結果的時候。又兩種執行方式git
-execute():同步執行。從依賴的服務返回一個單一的結果對象,或是在發生錯誤的時候拋出異常。github
-queue();異步執行。直接返回一個Future對象,其中包含了服務執行結束時要返回的單一結果對象。web
HystrixObservableCommand 用在依賴服務返回多個操做結果的時候。它也實現了兩種執行方式spring
-observe():返回Obervable對象,他表明了操做的多個結果,他是一個HotObservableapp
-toObservable():一樣返回Observable對象,也表明了操做多個結果,但它返回的是一個Cold Observable。異步
package org.hope.hystrix.example; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixRequestCache; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.client.RestTemplate; /** * Created by lisen on 2017/12/15. * HystrixCommand用在命令服務返回單個操做結果的時候 */ public class CommandHelloWorld extends HystrixCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() throws Exception { int i = 1/0; return "Hello " + name + "!"; } /** * 降級。Hystrix會在run()執行過程當中出現錯誤、超時、線程池拒絕、斷路器熔斷等狀況時, * 執行getFallBack()方法內的邏輯 */ @Override protected String getFallback() { return "faild"; } }
package org.hope.hystrix.example; import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.functions.Action1; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; /** * Created by lisen on 2017/12/15. * */ public class CommandHelloWorldTest { /** * 測試同步執行 */ @Test public void testSynchronous() { System.out.println(new CommandHelloWorld("World").execute()); } /** * 測試異步執行 */ @Test public void testAsynchronous() throws ExecutionException, InterruptedException { Future<String> fWorld = new CommandHelloWorld("World").queue(); System.out.println(fWorld.get()); //一步執行用get()來獲取結果 } /** * 雖然HystrixCommand具有了observe()和toObservable()的功能,可是它的實現有必定的侷限性, * 它返回的Observable只能發射一次數據,因此Hystrix還提供了HystrixObservableCommand, * 經過它實現的命令能夠獲取能發屢次的Observable */ @Test public void testObserve() { /** * 返回的是Hot Observable,HotObservable,不論 「事件源」 是否有「訂閱者」 * 都會在建立後對事件進行發佈。因此對於Hot Observable的每個「訂閱者」都有 * 可能從「事件源」的中途開始的,並可能只是看到了整個操做的局部過程 */ //blocking Observable<String> ho = new CommandHelloWorld("World").observe(); // System.out.println(ho.toBlocking().single()); //non-blockking //- this is a verbose anonymous inner-class approach and doesn't do assertions ho.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("==============onCompleted"); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String s) { System.out.println("=========onNext: " + s); } }); ho.subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("==================call:" + s); } }); } @Test public void testToObservable() { /** * Cold Observable在沒有 「訂閱者」 的時候並不會發佈時間, * 而是進行等待,知道有 「訂閱者」 以後才發佈事件,因此對於 * Cold Observable的訂閱者,它能夠保證從一開始看到整個操做的所有過程。 */ Observable<String> co = new CommandHelloWorld("World").toObservable(); System.out.println(co.toBlocking().single()); } }
package org.hope.hystrix.example.service; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode; import com.netflix.hystrix.contrib.javanica.command.AsyncResult; import org.springframework.stereotype.Service; import rx.Observable; import rx.Subscriber; import java.util.concurrent.Future; /** * 用@HystrixCommand的方式來實現 */ @Service public class UserService { /** * 同步的方式。 * fallbackMethod定義降級 */ @HystrixCommand(fallbackMethod = "helloFallback") public String getUserId(String name) { int i = 1/0; //此處拋異常,測試服務降級 return "你好:" + name; } public String helloFallback(String name) { return "error"; } //異步的執行 @HystrixCommand(fallbackMethod = "getUserNameError") public Future<String> getUserName(final Long id) { return new AsyncResult<String>() { @Override public String invoke() { int i = 1/0;//此處拋異常,測試服務降級 return "小明:" + id; } }; } public String getUserNameError(Long id) { return "faile"; } }
單元測試:ide
package org.hope.hystrix.example.service; import javafx.application.Application; import org.hope.hystrix.example.HystrixApplication; import org.hope.hystrix.example.model.User; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.concurrent.ExecutionException; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = HystrixApplication.class) public class UserServiceTest { @Autowired private UserService userService; /** * 測試同步 */ @Test public void testGetUserId() { System.out.println("=================" + userService.getUserId("lisi")); } /** * 測試異步 */ @Test public void testGetUserName() throws ExecutionException, InterruptedException { System.out.println("=================" + userService.getUserName(30L).get()); } }
HystrixObservable經過實現 protected Observable<String> construct() 方法來執行邏輯。經過 重寫 resumeWithFallback方法來實現服務降級微服務
package org.hope.hystrix.example; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixObservableCommand; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ObservableCommandHelloWorld extends HystrixObservableCommand<String> { private final String name; public ObservableCommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable<String> construct() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { try { if(!subscriber.isUnsubscribed()) { subscriber.onNext("Hello"); int i = 1 / 0; //模擬異常 subscriber.onNext(name + "!"); subscriber.onCompleted(); } } catch (Exception e) { subscriber.onError(e); } } }).subscribeOn(Schedulers.io()); } /** * 服務降級 */ @Override protected Observable<String> resumeWithFallback() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { try { if (!subscriber.isUnsubscribed()) { subscriber.onNext("失敗了!"); subscriber.onNext("找大神來排查一下吧!"); subscriber.onCompleted(); } } catch (Exception e) { subscriber.onError(e); } } }).subscribeOn(Schedulers.io()); } }
單元測試:單元測試
package org.hope.hystrix.example; import org.junit.Test; import rx.Observable; import java.util.Iterator; public class ObservableCommandHelloWorldTest { @Test public void testObservable() { Observable<String> observable= new ObservableCommandHelloWorld("World").observe(); Iterator<String> iterator = observable.toBlocking().getIterator(); while(iterator.hasNext()) { System.out.println(iterator.next()); } } @Test public void testToObservable() { Observable<String> observable= new ObservableCommandHelloWorld("World").observe(); Iterator<String> iterator = observable.toBlocking().getIterator(); while(iterator.hasNext()) { System.out.println(iterator.next()); } } }
package org.hope.hystrix.example.service; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode; import org.springframework.stereotype.Service; import rx.Observable; import rx.Subscriber; @Service public class ObservableUserService { /** * EAGER參數表示使用observe()方式執行 */ @HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER, fallbackMethod = "observFailed") //使用observe()執行方式 public Observable<String> getUserById(final Long id) { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { try { if(!subscriber.isUnsubscribed()) { subscriber.onNext("張三的ID:"); int i = 1 / 0; //拋異常,模擬服務降級 subscriber.onNext(String.valueOf(id)); subscriber.onCompleted(); } } catch (Exception e) { subscriber.onError(e); } } }); } private String observFailed(Long id) { return "observFailed---->" + id; } /** * LAZY參數表示使用toObservable()方式執行 */ @HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY, fallbackMethod = "toObserbableError") //表示使用toObservable()執行方式 public Observable<String> getUserByName(final String name) { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { try { if(!subscriber.isUnsubscribed()) { subscriber.onNext("找到"); subscriber.onNext(name); int i = 1/0; ////拋異常,模擬服務降級 subscriber.onNext("了"); subscriber.onCompleted(); } } catch (Exception e) { subscriber.onError(e); } } }); } private String toObserbableError(String name) { return "toObserbableError--->" + name; } }
單元測試:
package org.hope.hystrix.example.service; import org.hope.hystrix.example.HystrixApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.Iterator; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = HystrixApplication.class) public class ObservableUserServiceTest { @Autowired private ObservableUserService observableUserService; @Test public void testObserve() { Iterator<String> iterator = observableUserService.getUserById(30L).toBlocking().getIterator(); while(iterator.hasNext()) { System.out.println("===============" + iterator.next()); } } @Test public void testToObservable() { Iterator<String> iterator = observableUserService.getUserByName("王五").toBlocking().getIterator(); while(iterator.hasNext()) { System.out.println("===============" + iterator.next()); } } }
在實際使用時,咱們須要爲大多數執行過程當中可能會失敗的Hystrix命令實現服務降級邏輯,可是也有一些狀況能夠不去實現降級邏輯,好比:
執行寫操做的命令:
執行批處理或離線計算的命令:
https://gitee.com/huayicompany/Hystrix-learn/tree/master/hystrix-example
[1]Github,https://github.com/Netflix/Hystrix/wiki/How-it-Works
[2] 《SpringCloud微服務實戰》,電子工業出版社,翟永超