教程:一塊兒學習Hystrix--Hystrix命令執行

目錄

  • 「Hello World!」
  • 同步執行
  • 異步執行
  • 響應執行

「Hello World!」

    下面是一個經過實現接口 HystrixCommand的一個Hello World 示例:   html

public class HystrixHelloWorld extends HystrixCommand<String> {
    private final String name;

    public HystrixHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        return "Hello " + name + "!";
    }
}

   點擊查看詳細源碼   java

HystrixObservableCommand 等價於 HystrixCommandreact

    一個等效的Hello World解決方案,使用 HystrixObservableCommand 代替 HystrixCommand ,經過覆蓋 construct 方法,以下所示:   git

public class HystrixObservableHelloWorld extends HystrixObservableCommand<String> {
    private final String name;

    public HystrixObservableHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        // a real example would do work like a network call here
                        observer.onNext("Hello");
                        observer.onNext(name + "!");
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        } ).subscribeOn(Schedulers.io());
    }
}

同步執行

    經過 execute() 方法同步調用 HystrixCommand 的實現,示例以下:github

    

String s = new HystrixHelloWorld("World").execute();

單元測試以下:異步

@Test
        public void testSynchronous() { //測試同步
            assertEquals("Hello World!", new HystrixHelloWorld("World").execute());
            assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").execute());
        }

關於實現了HystrixObservableCommand 的方法示例以下:ide

    關於HystrixObservableCommand 的實現沒有簡單的 execute 方法調用,若是清楚經過一個命令產生的 Observable 一定僅僅產生一個單一的值,則能夠對 Observable 應用RXjava的操做 .toBlocking().toFuture().get() 模擬 execute 方法。單元測試

異步執行

    咱們能夠經過使用 queue() 方法異步執行 HystrixCommand ,示例以下: 測試

Future<String> fWorld = new HystrixHelloWorld("World").queue();

咱們能夠經過 Future獲取到命令的結果集this

String fw=fWorld.get();

經過單元測試模擬操做以下:

@Test
        public void testAsynchronous1() throws Exception { //測試異步
            assertEquals("Hello World!", new HystrixHelloWorld("World").queue().get());
            assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").queue().get());
        }

        @Test
        public void testAsynchronous2() throws Exception { //測試異步

            Future<String> fWorld = new HystrixHelloWorld("World").queue();
            Future<String> fBob = new HystrixHelloWorld("Bob").queue();

            assertEquals("Hello World!", fWorld.get());
            assertEquals("Hello Bob!", fBob.get());
        }

下面的操做是等價的:

String s1 = new HystrixHelloWorld("World").execute();
String s2 = new HystrixHelloWorld("World").queue().get();

    關於實現了HystrixObservableCommand 的方法示例以下:

   HystrixObservableCommand沒有 queue 這種簡單實現異步的方法 ,若是清楚經過一個命令產生的 Observable 一定僅僅產生一個單一的值,則能夠對 Observable 應用RxJava操做 .toBlocking().toFuture() 模擬 queue 方法。

響應執行

    能夠經過一下任意方法監聽 HystrixCommand 的結果:

  • observe() — 執行這個命令會返回一個熱 Observable馬上執行hystrix的命令 ,由於這個 Observable 經過 ReplaySubject 過濾,我們不會有丟失訂閱以前的任何東西的危險。
  • toObservable() — 執行這個命令會返回一個「冷「 Observable,直到訂閱 Observable 纔會開始執行命令和發送結果 。
Observable<String> fWorld = new HystrixHelloWorld("World").observe();

執行完上面的代碼,咱們能夠經過訂閱 Observable 獲取到它的值

fWorld.subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    // value emitted here
                }
            });

下面的單元測試示例以下:

@Test
        public void testObservable() throws Exception {

            Observable<String> fWorld = new HystrixHelloWorld("World").observe();
            Observable<String> fBob = new HystrixHelloWorld("Bob").observe();

            // blocking
            assertEquals("Hello World!", fWorld.toBlocking().single());
            assertEquals("Hello Bob!", fBob.toBlocking().single());

            fWorld.subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {
                    // 這裏能夠什麼都不作
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(String v) {
                    System.out.println("onNext: " + v);
                }

            });
            fBob.subscribe(new Action1<String>() {

                @Override
                public void call(String v) {
                    System.out.println("onNext: " + v);
                }

            });
        }

使用java8的 lambda 表達式,示例以下:

fWorld.subscribe((v) -> {
        System.out.println("onNext: " + v);
    })
    
    // - or while also including error handling
    
    fWorld.subscribe((v) -> {
        System.out.println("onNext: " + v);
    }, (exception) -> {
        exception.printStackTrace();
    })

點擊連接瞭解更多關於RXjava中 的 Observable

相關文章
相關標籤/搜索