HystrixCommand 的命令邏輯寫在run() HystrixObservableCommand的命令邏輯寫在construct()java
HystrixCommand的run()是由新建立的線程執行(非阻塞) HystrixObservableCommand的construct()是由調用程序線程執行(阻塞)併發
HystrixCommand一個實例只能向調用程序發送單條數據 HystrixObservableCommand一個實例能夠順序發送多條數據異步
這2個方法是隻有HystrixCommand纔有的,execute是阻塞的,queue是非阻塞的。ide
這2個方法是HystrixCommand 與 HystrixObservableCommand都有的。函數
不過對於HystrixCommand 與 HystrixObservableCommand這2個方法的表現是不太同樣的。高併發
咱們知道HystrixCommand邏輯在run中,HystrixObservableCommand的邏輯在construct中,observe方法觸發run是非阻塞的方式,就是新的線程執行run,而觸發construct則是阻塞方式,就是調用線程執行construct。學習
使用toObservable和observe方法不太同樣,toObserve方法自己不會觸發run或者construct方法,而是要在subscribe的時候觸發這run或者construct方法。觸發的方式和observe同樣,對於run使用新線程非阻塞的方式,對於construct使用調用線程阻塞的方式。測試
GitHub上對於observe和toObservable區別介紹的確有點繞,其實就是,observe無論有沒有訂閱者都會執行run或者construct,toObserve只有有訂閱者的時候纔會執行run或者construct的方法。this
observe和toObserve用的很少,只有有多條結果不是一塊兒返回的時候纔會用到。.net
import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandProperties; import java.util.concurrent.TimeUnit; public class ASHystrixCommand extends HystrixCommand<String>{ public ASHystrixCommand() { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("as")). andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(6000))); } @Override protected String run() throws Exception { TimeUnit.SECONDS.sleep(5); return "done"; } @Override protected String getFallback() { return "as failure"; } }
上面我把Hystrix的執行超時時間設置爲6秒,主要是爲了在run中模擬長時間運行,異步調用給調用線程執行機會。
下面是測試代碼:
import org.junit.Test; import java.util.concurrent.Future; public class ASHystrixCommandTest { private ASHystrixCommand command = new ASHystrixCommand(); @Test public void Syn() throws Exception { System.out.println(command.execute()); System.out.println("do other thing"); } @Test public void Asyn() throws Exception { Future<String> queue = command.queue(); System.out.println("do other thing"); System.out.println(queue.get()); } }
咱們能夠看到Syn測試的輸出是:
done do other thing
說明execute是同步執行的。
而Asyn測試的輸出結果是:
do other thing done
說明Asyn是異步執行的,queue執行命令是非阻塞的,Future.get纔會阻塞
HystrixObservableCommand依賴RxJava,想要使用好的能夠先學習一下RxJava,不過不了解也是沒有關係的,這裏簡單介紹幾個名詞: Observable: 被觀察者 Observer:觀察者 Flowable :被觀察者 Subscriber :觀察者
import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandProperties; import com.netflix.hystrix.HystrixObservableCommand; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ObservableCommand extends HystrixObservableCommand<String>{ private String name; public ObservableCommand(String name) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("obs")). andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(6000))); this.name = name; } public Observable<String> resumeWithFallback(){ return null; } @Override protected Observable<String> construct() { System.out.println("construct"); return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> observer) { try { if (!observer.isUnsubscribed()) { observer.onNext("Hello"); observer.onNext(name + " !"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
HystrixObservableCommand的邏輯是在construct中,失敗函數也和HystrixCommand的getFailBack不一樣,而是使用的resumeWithFallback。
測試類:
import org.junit.Test; import rx.Observable; import rx.Observer; import rx.functions.Action1; import rx.observables.BlockingObservable; public class ObservableCommandTest { @Test public void obs(){ ObservableCommand observableCommand = new ObservableCommand("allen"); Observable<String> observe = observableCommand.observe(); // BlockingObservable<String> stringBlockingObservable = observe.toBlocking(); // Iterator<String> iterator = stringBlockingObservable.getIterator(); // while(iterator.hasNext()) { // System.out.println(iterator.next()); // } observe.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("obs completed"); } @Override public void onError(Throwable e) { System.out.println("obs error"); e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("obs onNext: " + v); } }); } @Test public void obsWithoutSub(){ ObservableCommand observableCommand = new ObservableCommand("allen"); Observable<String> observe = observableCommand.observe(); } @Test public void tobs(){ ObservableCommand observableCommand = new ObservableCommand("allen"); Observable<String> observe = observableCommand.toObservable(); // BlockingObservable<String> stringBlockingObservable = observe.toBlocking(); // Iterator<String> iterator = stringBlockingObservable.getIterator(); // while(iterator.hasNext()) { // System.out.println(iterator.next()); // } observe.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("obs completed"); } @Override public void onError(Throwable e) { System.out.println("obs error"); e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("obs onNext: " + v); } }); } @Test public void tobsWithoutSub(){ ObservableCommand observableCommand = new ObservableCommand("allen"); Observable<String> observe = observableCommand.toObservable(); } @Test public void action(){ ObservableCommand observableCommand = new ObservableCommand("allen"); Observable<String> observe = observableCommand.toObservable(); observe.subscribe(new Action1<String>() { @Override public void call(String v) { System.out.println("action1 call: " + v); } }); } @Test public void blocking() { ObservableCommand observableCommand = new ObservableCommand("allen"); Observable<String> observe = observableCommand.observe(); BlockingObservable<String> stringBlockingObservable = observe.toBlocking(); System.out.println("before"); System.out.println(stringBlockingObservable.single()); } }
上面主要測試了observe和toObserve沒有訂閱者的時候的執行狀況: 從輸出咱們能夠看到,當toObserve沒有訂閱者的時候是不會執行construct的,而observe無論有沒有訂閱者都會執行construct。
Observable的subscribe能夠接受的類型有Observer,這個監聽了出現異常和完成事件,若是不關心這2個事件,也能夠使用Action1。