Hystrix拾遺

HystrixCommand 與 HystrixObservableCommand 區別

HystrixCommand 的命令邏輯寫在run() HystrixObservableCommand的命令邏輯寫在construct()java

HystrixCommand的run()是由新建立的線程執行(非阻塞) HystrixObservableCommand的construct()是由調用程序線程執行(阻塞)併發

HystrixCommand一個實例只能向調用程序發送單條數據 HystrixObservableCommand一個實例能夠順序發送多條數據異步

重要方法

execute 與 queue

這2個方法是隻有HystrixCommand纔有的,execute是阻塞的,queue是非阻塞的。ide

observe 與 toObservable

這2個方法是HystrixCommand 與 HystrixObservableCommand都有的。函數

不過對於HystrixCommand 與 HystrixObservableCommand這2個方法的表現是不太同樣的。高併發

咱們知道HystrixCommand邏輯在run中,HystrixObservableCommand的邏輯在construct中,observe方法觸發run是非阻塞的方式,就是新的線程執行run,而觸發construct則是阻塞方式,就是調用線程執行construct。學習

使用toObservable和observe方法不太同樣,toObserve方法自己不會觸發run或者construct方法,而是要在subscribe的時候觸發這run或者construct方法。觸發的方式和observe同樣,對於run使用新線程非阻塞的方式,對於construct使用調用線程阻塞的方式。測試

GitHub上對於observe和toObservable區別介紹的確有點繞,其實就是,observe無論有沒有訂閱者都會執行run或者construct,toObserve只有有訂閱者的時候纔會執行run或者construct的方法。this

observe和toObserve用的很少,只有有多條結果不是一塊兒返回的時候纔會用到。.net

HystrixCommand同步與異步測試

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

import java.util.concurrent.TimeUnit;

public class ASHystrixCommand extends HystrixCommand<String>{

    public ASHystrixCommand() {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("as")).
                andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(6000)));
    }

    @Override
    protected String run() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return "done";
    }

    @Override
    protected String getFallback() {
        return "as failure";
    }

}

上面我把Hystrix的執行超時時間設置爲6秒,主要是爲了在run中模擬長時間運行,異步調用給調用線程執行機會。

下面是測試代碼:

import org.junit.Test;

import java.util.concurrent.Future;

public class ASHystrixCommandTest {

    private ASHystrixCommand command = new ASHystrixCommand();

    @Test
    public void Syn() throws Exception {
        System.out.println(command.execute());
        System.out.println("do other thing");
    }

    @Test
    public void Asyn() throws Exception {
        Future<String> queue = command.queue();
        System.out.println("do other thing");
        System.out.println(queue.get());
    }

}

咱們能夠看到Syn測試的輸出是:

done do other thing

說明execute是同步執行的。

而Asyn測試的輸出結果是:

do other thing done

說明Asyn是異步執行的,queue執行命令是非阻塞的,Future.get纔會阻塞

HystrixObservableCommand 測試

HystrixObservableCommand依賴RxJava,想要使用好的能夠先學習一下RxJava,不過不了解也是沒有關係的,這裏簡單介紹幾個名詞: Observable: 被觀察者 Observer:觀察者 Flowable :被觀察者 Subscriber :觀察者

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ObservableCommand extends HystrixObservableCommand<String>{

    private String name;

    public ObservableCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("obs")).
                andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(6000)));
        this.name = name;
    }

    public Observable<String> resumeWithFallback(){
        return null;
    }

    @Override
    protected Observable<String> construct() {
        System.out.println("construct");
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        observer.onNext("Hello");
                        observer.onNext(name + " !");
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        } ).subscribeOn(Schedulers.io());
    }
}

HystrixObservableCommand的邏輯是在construct中,失敗函數也和HystrixCommand的getFailBack不一樣,而是使用的resumeWithFallback。

測試類:

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.observables.BlockingObservable;

public class ObservableCommandTest {

    @Test
    public void obs(){
        ObservableCommand observableCommand = new ObservableCommand("allen");
        Observable<String> observe = observableCommand.observe();
//        BlockingObservable<String> stringBlockingObservable = observe.toBlocking();
//        Iterator<String> iterator = stringBlockingObservable.getIterator();
//        while(iterator.hasNext()) {
//            System.out.println(iterator.next());
//        }

        observe.subscribe(new Observer<String>() {
             @Override
            public void onCompleted() {
                System.out.println("obs completed");
            }

             @Override
            public void onError(Throwable e) {
                System.out.println("obs error");
                e.printStackTrace();
            }

             @Override
            public void onNext(String v) {
                System.out.println("obs onNext: " + v);
            }

        });

    }

    @Test
    public void obsWithoutSub(){
        ObservableCommand observableCommand = new ObservableCommand("allen");
        Observable<String> observe = observableCommand.observe();
    }

    @Test
    public void tobs(){
        ObservableCommand observableCommand = new ObservableCommand("allen");
        Observable<String> observe = observableCommand.toObservable();
//        BlockingObservable<String> stringBlockingObservable = observe.toBlocking();
//        Iterator<String> iterator = stringBlockingObservable.getIterator();
//        while(iterator.hasNext()) {
//            System.out.println(iterator.next());
//        }

        observe.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("obs completed");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("obs error");
                e.printStackTrace();
            }

            @Override
            public void onNext(String v) {
                System.out.println("obs onNext: " + v);
            }

        });
    }

    @Test
    public void tobsWithoutSub(){
        ObservableCommand observableCommand = new ObservableCommand("allen");
        Observable<String> observe = observableCommand.toObservable();
    }

    @Test
    public void action(){
        ObservableCommand observableCommand = new ObservableCommand("allen");
        Observable<String> observe = observableCommand.toObservable();

        observe.subscribe(new Action1<String>() {
            @Override
            public void call(String v) {
                System.out.println("action1 call: " + v);
            }

        });
    }

    @Test
    public void blocking() {
        ObservableCommand observableCommand = new ObservableCommand("allen");
        Observable<String> observe = observableCommand.observe();
        BlockingObservable<String> stringBlockingObservable = observe.toBlocking();
        System.out.println("before");
        System.out.println(stringBlockingObservable.single());
    }

}

上面主要測試了observe和toObserve沒有訂閱者的時候的執行狀況: 從輸出咱們能夠看到,當toObserve沒有訂閱者的時候是不會執行construct的,而observe無論有沒有訂閱者都會執行construct。

Observable的subscribe能夠接受的類型有Observer,這個監聽了出現異常和完成事件,若是不關心這2個事件,也能夠使用Action1。

參考

Hystrix 各個配置

Hystrix實例與高併發測試

相關文章
相關標籤/搜索