Hystrix請求命令 HystrixCommand、HystrixObservableCommand

Hystrix有兩個請求命令:HystrixCommand、HystrixObservableCommand。

  HystrixCommand用在依賴服務返回單個操作結果的時候。它有兩種執行方式:

    -execute():同步執行。從依賴的服務返回一個單一的結果對象,或是在發生錯誤的時候拋出異常。

    -queue():異步執行。直接返回一個Future對象,其中包含了服務執行結束時要返回的單一結果對象。

    

  HystrixObservableCommand 用在依賴服務返回多個操作結果的時候。它也實現了兩種執行方式:

    -observe():返回Observable對象,它代表了操作的多個結果,它是一個Hot Observable

    -toObservable():同樣返回Observable對象,也代表了操作的多個結果,但它返回的是一個Cold Observable

HystrixCommand:

 使用方式一:繼承的方式

package org.hope.hystrix.example;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixRequestCache;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.client.RestTemplate;

/**
 * Minimal {@link HystrixCommand} example. A HystrixCommand is used when the
 * wrapped service call returns a single result.
 *
 * <p>Created by lisen on 2017/12/15.
 */
public class CommandHelloWorld extends HystrixCommand<String> {
    /** Name inserted into the greeting built by {@link #run()}. */
    private final String name;

    /**
     * Creates the command under the "ExampleGroup" command group key.
     *
     * @param name name to greet
     */
    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    /**
     * Primary logic of the command.
     *
     * @return the greeting; not reached as written, because the division
     *         below throws first
     * @throws Exception the {@link ArithmeticException} raised by the forced
     *         division by zero
     */
    @Override
    protected String run() throws Exception {
        // Deliberate failure: dividing by zero forces the fallback path.
        int forcedFailure = 1 / 0;
        return String.format("Hello %s!", name);
    }

    /**
     * Degradation logic. Hystrix executes this method when {@link #run()}
     * fails, times out, is rejected by the thread pool, or the circuit
     * breaker is open.
     *
     * @return the fixed fallback value
     */
    @Override
    protected String getFallback() {
        return "faild";
    }
}

單元測試:

package org.hope.hystrix.example;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action1;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;

/**
 * Runs {@link CommandHelloWorld} through each HystrixCommand execution style.
 *
 * <p>{@code CommandHelloWorld.run()} always throws, so every style is expected
 * to produce the fallback value.
 *
 * <p>Created by lisen on 2017/12/15.
 */
public class CommandHelloWorldTest {

    /** Value returned by {@code CommandHelloWorld.getFallback()}. */
    private static final String FALLBACK = "faild";

    /**
     * Synchronous execution via {@code execute()}.
     */
    @Test
    public void testSynchronous() {
        String result = new CommandHelloWorld("World").execute();
        System.out.println(result);
        assertEquals(FALLBACK, result);
    }

    /**
     * Asynchronous execution via {@code queue()}.
     */
    @Test
    public void testAsynchronous() throws ExecutionException, InterruptedException {
        Future<String> fWorld = new CommandHelloWorld("World").queue();
        // For asynchronous execution the result is fetched with get().
        String result = fWorld.get();
        System.out.println(result);
        assertEquals(FALLBACK, result);
    }

    /**
     * Although HystrixCommand also offers observe() and toObservable(), the
     * Observable it returns can emit only one item. Hystrix therefore also
     * provides HystrixObservableCommand, whose Observable can emit many items.
     */
    @Test
    public void testObserve() {
        /*
         * observe() returns a Hot Observable: it publishes events as soon as
         * it is created, whether or not the source has subscribers. A
         * subscriber may therefore join part-way through and see only part
         * of the whole operation.
         */
        Observable<String> ho = new CommandHelloWorld("World").observe();

        // non-blocking
        // - this is a verbose anonymous inner-class approach and doesn't do assertions
        ho.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("==============onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(String s) {
                System.out.println("=========onNext: " + s);
            }
        });

        ho.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("==================call:" + s);
            }
        });

        // blocking
        // The subscriptions above return immediately; block on the result here
        // so the test does not finish while the command may still be running,
        // and so the emitted value is actually verified.
        assertEquals(FALLBACK, ho.toBlocking().single());
    }

    /**
     * Execution via {@code toObservable()}.
     */
    @Test
    public void testToObservable() {
        /*
         * A Cold Observable publishes no events while it has no subscribers;
         * it waits until one subscribes. A subscriber is therefore guaranteed
         * to see the whole operation from the beginning.
         */
        Observable<String> co = new CommandHelloWorld("World").toObservable();
        String result = co.toBlocking().single();
        System.out.println(result);
        assertEquals(FALLBACK, result);
    }

}

使用方式二:註解的方式;

package org.hope.hystrix.example.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import com.netflix.hystrix.contrib.javanica.command.AsyncResult;
import org.springframework.stereotype.Service;
import rx.Observable;
import rx.Subscriber;

import java.util.concurrent.Future;

/**
 * Annotation-driven variant: commands are declared with {@code @HystrixCommand}.
 */
@Service
public class UserService {

    /**
     * Synchronous command; {@code fallbackMethod} names the degradation method.
     *
     * @param name user name
     * @return the greeting; not reached as written, because the division
     *         below throws first
     */
    @HystrixCommand(fallbackMethod = "helloFallback")
    public String getUserId(String name) {
        // Deliberate failure (division by zero) to exercise the fallback.
        int forcedFailure = 1 / 0;
        return "你好:".concat(String.valueOf(name));
    }

    /**
     * Fallback for {@link #getUserId(String)}.
     *
     * @param name user name passed to the failed command (unused)
     * @return the fixed value "error"
     */
    public String helloFallback(String name) {
        return "error";
    }

    /**
     * Asynchronous command: the work is wrapped in an {@link AsyncResult}
     * and handed back as a {@link Future}.
     *
     * @param id user id
     * @return a future for the user name
     */
    @HystrixCommand(fallbackMethod = "getUserNameError")
    public Future<String> getUserName(final Long id) {
        AsyncResult<String> pending = new AsyncResult<String>() {
            @Override
            public String invoke() {
                // Deliberate failure (division by zero) to exercise the fallback.
                int forcedFailure = 1 / 0;
                return "小明:".concat(String.valueOf(id));
            }
        };
        return pending;
    }

    /**
     * Fallback for {@link #getUserName(Long)}.
     *
     * @param id user id passed to the failed command (unused)
     * @return the fixed value "faile"
     */
    public String getUserNameError(Long id) {
        return "faile";
    }
}

 

單元測試:

package org.hope.hystrix.example.service;

import javafx.application.Application;
import org.hope.hystrix.example.HystrixApplication;
import org.hope.hystrix.example.model.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.concurrent.ExecutionException;


@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HystrixApplication.class)
public class UserServiceTest {

    @Autowired
    private UserService userService;

    /**
     * 測試同步
     */
    @Test
    public void testGetUserId() {
        System.out.println("=================" + userService.getUserId("lisi"));
    }

    /**
     * 測試異步
     */
    @Test
    public void testGetUserName() throws ExecutionException, InterruptedException {
        System.out.println("=================" + userService.getUserName(30L).get());
    }
}

 

 


 

HystrixObservableCommand:

使用方式一:繼承的方式

   HystrixObservableCommand 通過實現 protected Observable<String> construct() 方法來執行邏輯,通過重寫 resumeWithFallback() 方法來實現服務降級。

package org.hope.hystrix.example;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

/**
 * {@link HystrixObservableCommand} example: the primary logic lives in
 * {@link #construct()} and the degradation logic in
 * {@link #resumeWithFallback()}.
 */
public class ObservableCommandHelloWorld extends HystrixObservableCommand<String> {

    /** Name emitted as part of the greeting. */
    private final String name;

    /**
     * Creates the command under the "ExampleGroup" command group key.
     *
     * @param name name to greet
     */
    public ObservableCommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    /**
     * Builds the primary Observable. It emits "Hello" and then fails on
     * purpose; the failure is passed to the subscriber through
     * {@code onError}.
     *
     * @return the primary Observable, subscribed on the IO scheduler
     */
    @Override
    protected Observable<String> construct() {
        Observable.OnSubscribe<String> greeting = new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext("Hello");
                    // Simulated failure: division by zero.
                    int forcedFailure = 1 / 0;
                    subscriber.onNext(name + "!");
                    subscriber.onCompleted();
                } catch (Exception failure) {
                    subscriber.onError(failure);
                }
            }
        };
        Observable<String> source = Observable.create(greeting);
        return source.subscribeOn(Schedulers.io());
    }

    /**
     * Service degradation: emits two fixed messages and completes.
     *
     * @return the fallback Observable, subscribed on the IO scheduler
     */
    @Override
    protected Observable<String> resumeWithFallback() {
        Observable.OnSubscribe<String> degraded = new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext("失敗了!");
                    subscriber.onNext("找大神來排查一下吧!");
                    subscriber.onCompleted();
                } catch (Exception failure) {
                    subscriber.onError(failure);
                }
            }
        };
        Observable<String> source = Observable.create(degraded);
        return source.subscribeOn(Schedulers.io());
    }
}

 

 單元測試:

package org.hope.hystrix.example;

import org.junit.Test;
import rx.Observable;

import java.util.Iterator;

/**
 * Runs {@link ObservableCommandHelloWorld} through both execution styles of a
 * HystrixObservableCommand and prints every emitted item.
 */
public class ObservableCommandHelloWorldTest {

    /**
     * Execution via {@code observe()}.
     */
    @Test
    public void testObservable() {
        printAll(new ObservableCommandHelloWorld("World").observe());
    }

    /**
     * Execution via {@code toObservable()}.
     *
     * <p>This test previously called {@code observe()} as well, so
     * {@code toObservable()} was never exercised.
     */
    @Test
    public void testToObservable() {
        printAll(new ObservableCommandHelloWorld("World").toObservable());
    }

    /**
     * Blocks on the given Observable and prints each item it emits.
     *
     * @param observable source to drain
     */
    private static void printAll(Observable<String> observable) {
        Iterator<String> iterator = observable.toBlocking().getIterator();
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
    }

}

使用方式二:註解的方式;

package org.hope.hystrix.example.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import org.springframework.stereotype.Service;
import rx.Observable;
import rx.Subscriber;

/**
 * Annotation-driven observable commands. Each command method returns an
 * {@link Observable} and names a fallback method that returns a plain String.
 */
@Service
public class ObservableUserService {

    /**
     * EAGER means the command is executed in the observe() style.
     *
     * @param id user id
     * @return an Observable that emits a label and then fails on purpose
     */
    @HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER, fallbackMethod = "observFailed") // executed in the observe() style
    public Observable<String> getUserById(final Long id) {
        Observable.OnSubscribe<String> emitter = new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext("張三的ID:");
                    // Throws on purpose to simulate service degradation.
                    int forcedFailure = 1 / 0;
                    subscriber.onNext(String.valueOf(id));
                    subscriber.onCompleted();
                } catch (Exception failure) {
                    subscriber.onError(failure);
                }
            }
        };
        return Observable.create(emitter);
    }

    /**
     * Fallback for {@link #getUserById(Long)}.
     *
     * @param id user id passed to the failed command
     * @return a marker string containing the id
     */
    private String observFailed(Long id) {
        return "observFailed---->" + id;
    }

    /**
     * LAZY means the command is executed in the toObservable() style.
     *
     * @param name user name
     * @return an Observable that emits two items and then fails on purpose
     */
    @HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY, fallbackMethod = "toObserbableError") // executed in the toObservable() style
    public Observable<String> getUserByName(final String name) {
        Observable.OnSubscribe<String> emitter = new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext("找到");
                    subscriber.onNext(name);
                    // Throws on purpose to simulate service degradation.
                    int forcedFailure = 1 / 0;
                    subscriber.onNext("了");
                    subscriber.onCompleted();
                } catch (Exception failure) {
                    subscriber.onError(failure);
                }
            }
        };
        return Observable.create(emitter);
    }

    /**
     * Fallback for {@link #getUserByName(String)}.
     *
     * @param name user name passed to the failed command
     * @return a marker string containing the name
     */
    private String toObserbableError(String name) {
        return "toObserbableError--->" + name;
    }

}

 

單元測試:

package org.hope.hystrix.example.service;

import org.hope.hystrix.example.HystrixApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.Iterator;

/**
 * Spring-context tests that drain the Observables returned by
 * {@link ObservableUserService} and print each emitted item.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HystrixApplication.class)
public class ObservableUserServiceTest {

    @Autowired
    private ObservableUserService observableUserService;

    /**
     * Drains the Observable returned by {@code getUserById}.
     */
    @Test
    public void testObserve() {
        Observable<String> byId = observableUserService.getUserById(30L);
        for (Iterator<String> items = byId.toBlocking().getIterator(); items.hasNext();) {
            System.out.println("===============" + items.next());
        }
    }

    /**
     * Drains the Observable returned by {@code getUserByName}.
     */
    @Test
    public void testToObservable() {
        Observable<String> byName = observableUserService.getUserByName("王五");
        for (Iterator<String> items = byName.toBlocking().getIterator(); items.hasNext();) {
            System.out.println("===============" + items.next());
        }
    }
}

 

 

總結:

  在實際使用時,我們需要為大多數執行過程中可能會失敗的Hystrix命令實現服務降級邏輯,但是也有一些情況可以不去實現降級邏輯,比如:

  執行寫操作的命令:當命令用來執行寫操作而不是返回資訊時,返回類型通常是void或空的Observable,實現服務降級的意義不大,寫入失敗時只需要通知調用者即可。

  執行批處理或離線計算的命令:當命令用來執行批處理、生成報告或離線計算時,通常只需要將錯誤傳播給調用者,讓調用者稍後重試,而不是返回一個降級結果。

 https://gitee.com/huayicompany/Hystrix-learn/tree/master/hystrix-example

參考:

 [1]Github,https://github.com/Netflix/Hystrix/wiki/How-it-Works

 [2] 《SpringCloud微服務實戰》,電子工業出版社,翟永超

相關文章
相關標籤/搜索