Spring Cloud Zuul 在傳統路由模式下使用Hystrix

前言

最近這段時間在公司內部分享了Spring Cloud的一些功能,結合目前公司使用的框架,針對這段時間調研的SC(Spring Cloud)技術,對現有的架構中融入了一些自定義的功能.spring

目前公司使用的架構是Spring Cloud Zuul+dubbo ,使用docker容器,k8s對容器進行管理,咱們主要看Zuul,後面考慮把Hystrix整合進dubbo.由於咱們不是徹底用的sc的一套功能,只是用了sc的網關,使用網關對請求進行路由到服務,雖然sc默認包含有ribbon和hystrix功能,可是隻限於面向服務的路由,而咱們服務沒有整合到eureka中,因此咱們用的是傳統路由模式,路由轉發地址是svc的地址,至關於默認已經有了負載均衡的功能,因此我不須要使用ribbon,只須要使用hystrix.docker

分析

對zuul比較瞭解的同窗知道,zuul使用傳統路由模式和使用面向服務的路由模式分別由SimpleHostRoutingFilterRibbonRoutingFilter兩個route類型的過濾器進行路由的,因此咱們首先須要分析這二者之間的差異apache

SimpleHostRoutingFilter

此過濾器是傳統路由方式的route過濾器,其shouldFilter()方法以下:架構

@Override
public boolean shouldFilter() {
    return RequestContext.getCurrentContext().getRouteHost() != null
            && RequestContext.getCurrentContext().sendZuulResponse();
}

能夠發現,決定是否過濾的關鍵條件是上下文中是否含有傳統路由的URL.因此,此過濾器處理的是傳統路由模式的請求過濾.app

經過查看其構造器,不難發現,其初始化是在ZuulProxyAutoConfiguration 中,傳入了ZuulPropertieszuul的一些配置信息,來初始化這個filter,而後還有一堆http鏈接管理工廠以及一個ProxyRequestHelperbean來處理一些request和response的請求和響應.負載均衡

查看其核心run()方法,不難發現,其主要執行邏輯就是構建了一個httpClient,而後封裝請求信息,執行請求獲取響應而後返回,而且將請求信息設置到RequestContext中,返回請求結果到客戶端,有興趣的同窗能夠看下源碼:框架

@Override
public Object run() {
    RequestContext context = RequestContext.getCurrentContext();
    HttpServletRequest request = context.getRequest();
    MultiValueMap<String, String> headers = this.helper
            .buildZuulRequestHeaders(request);
    MultiValueMap<String, String> params = this.helper
            .buildZuulRequestQueryParams(request);
    String verb = getVerb(request);
    InputStream requestEntity = getRequestBody(request);
    if (getContentLength(request) < 0) {
        context.setChunkedRequestBody();
    }

    String uri = this.helper.buildZuulRequestURI(request);
    this.helper.addIgnoredHeaders();

    try {
        //封裝請求信息,執行請求獲取響應
        CloseableHttpResponse response = forward(this.httpClient, verb, uri, request,
                headers, params, requestEntity);
        //設置請求結果
        setResponse(response);
    }
    catch (Exception ex) {
        throw new ZuulRuntimeException(ex);
    }
    return null;
}

因此,能夠看出之因此傳統路由模式沒有hystrix和ribbon功能,是由於它直接是使用的apache的HttpClient來執行請求轉發,返回結果直接響應,沒有對請求作攔截,也沒有對請求做熔斷處理,簡單粗暴.ide

RibbonRoutingFilter

此過濾器是面向服務路由的過濾器,老規矩,shouldFilter():ui

@Override
public boolean shouldFilter() {
    RequestContext ctx = RequestContext.getCurrentContext();
    return (ctx.getRouteHost() == null && ctx.get(SERVICE_ID_KEY) != null
            && ctx.sendZuulResponse());
}

判斷此過濾器是否執行的關鍵條件是上下文中不含有傳統路由的URL,而且服務id不能爲空.this

再看看其構造器,一樣的,此構造器的初始化也是在ZuulProxyAutoConfiguration中,所不一樣的是,注入了一個RibbonCommandFactory,這個就是RibbonReoutingFilter的核心,看這個名字,用屁股都能想到確定是和ribbon還有hystrix有關.

再看看它的run()方法:

@Override
public Object run() {
    RequestContext context = RequestContext.getCurrentContext();
    this.helper.addIgnoredHeaders();
    try {
        //構建一些請求信息
        RibbonCommandContext commandContext = buildCommandContext(context);
        ClientHttpResponse response = forward(commandContext);
        setResponse(response);
        return response;
    }
    catch (ZuulException ex) {
        throw new ZuulRuntimeException(ex);
    }
    catch (Exception ex) {
        throw new ZuulRuntimeException(ex);
    }
}

這裏看到,主要就是作了一個操做,構建了一個RibbonCommandContext上下文對象,存放了一些serviceId,meothod,uri,header...的一些請求信息而後拿這些信息去執行forward(commandContext),那咱們就看看forward()去解解毒:

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(),
            context.getUri(), context.getHeaders(), context.getParams(),
            context.getRequestEntity());

    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        ClientHttpResponse response = command.execute();
        this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
        return response;
    }
    catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }

}

這個方法就是用上面的請求信息和構造器傳入的RibbonCommandFactory建立了一個command對象,而後經過此對象去執行操做而後返回response到客戶端.

看起來貌似和以前的SimpleHostRoutingFilter流程差不了太多,都是構建請求信息,而後獲取響應,而後響應到客戶端,那麼爲麼比這個RibbonRoutingFilter就是吊一些呢,又有熔斷又有負載均衡的.因此這裏就能夠破案了,他的這些個功能就是由於這個command對象,下面咱們就去看看這個command究竟是什麼.

public interface RibbonCommand extends HystrixExecutable<ClientHttpResponse> {
}

看到這裏可能有的人就瞭然了,這個HystrixExecutable是爲HystrixCommand設計的接口,主要提供執行命令的抽象方法,例如:execute(),queue(),observe(),因此他纔能有熔斷的功能,那麼負載均衡呢,我猜八成是和上面的RibbonCommandContext有關,由於這個context對象裏面放了serviceId,說明了實例的選擇都是這裏面操做的.

下面咱們去看看這段代碼ribbonCommandFactory.create(context)->org.springframework.cloud.netflix.zuul.filters.route.RestClientRibbonCommandFactory#create,

@Override
@SuppressWarnings("deprecation")
public RestClientRibbonCommand create(RibbonCommandContext context) {
    String serviceId = context.getServiceId();
    //根據serviceId 獲取服務降級的provider,供後面調用服務發生異常的時候調用
    ZuulFallbackProvider fallbackProvider = getFallbackProvider(serviceId);
    RestClient restClient = this.clientFactory.getClient(serviceId,
            RestClient.class);
    return new RestClientRibbonCommand(context.getServiceId(), restClient, context,
            this.zuulProperties, fallbackProvider, clientFactory.getClientConfig(serviceId));
}

這裏獲取了服務降級的provider,建立了發送請求的RestClient,實例化了RestClientRibbonCommand,而且將配置注入進去了這個command.

//RestClientRibbonCommand的構造器
public RestClientRibbonCommand(String commandKey, RestClient client,
                               RibbonCommandContext context, ZuulProperties zuulProperties,
                               ZuulFallbackProvider zuulFallbackProvider, IClientConfig config) {
    super(commandKey, client, context, zuulProperties, zuulFallbackProvider, config);
}
    
    
//AbstractRibbonCommand的構造器,是RestClientRibbonCommand的構造器的抽象父類
protected AbstractRibbonCommand(Setter setter, LBC client,
                             RibbonCommandContext context,
                             ZuulFallbackProvider fallbackProvider, IClientConfig config) {
    super(setter);
    this.client = client;
    this.context = context;
    this.zuulFallbackProvider = fallbackProvider;
    this.config = config;
}

//初始化HystrixCommand的配置信息
protected static Setter getSetter(final String commandKey,ZuulProperties zuulProperties, IClientConfig config) {

    // @formatter:off
    Setter commandSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RibbonCommand"))
                            .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
    final HystrixCommandProperties.Setter setter = createSetter(config, commandKey, zuulProperties);
    if (zuulProperties.getRibbonIsolationStrategy() == ExecutionIsolationStrategy.SEMAPHORE){
        final String name = ZuulConstants.ZUUL_EUREKA + commandKey + ".semaphore.maxSemaphores";
        // we want to default to semaphore-isolation since this wraps
        // 2 others commands that are already thread isolated
        final DynamicIntProperty value = DynamicPropertyFactory.getInstance()
                .getIntProperty(name, zuulProperties.getSemaphore().getMaxSemaphores());
        setter.withExecutionIsolationSemaphoreMaxConcurrentRequests(value.get());
    } else if (zuulProperties.getThreadPool().isUseSeparateThreadPools()) {
        final String threadPoolKey = zuulProperties.getThreadPool().getThreadPoolKeyPrefix() + commandKey;
        commandSetter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
    }

return commandSetter.andCommandPropertiesDefaults(setter);
// @formatter:on

}

這裏能夠看到,在初始化RestClientRibbonCommand時,調用了父類的構造器,而後將HystrixCommand初始化,將zuul的配置信息配置到HystrixCommand中,這裏看到,在Zuul中,Hystrix默認的隔離模式就是信號量隔離,並且commandKey就是serviceId.至此,Hystrix初始化完成,而後經過上文的command.execute()方法來進行調用,也就是調用run()方法,因此,這裏咱們看看:

@Override
protected ClientHttpResponse run() throws Exception {
    final RequestContext context = RequestContext.getCurrentContext();

    RQ request = createRequest();
    RS response;
    
    boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
            && ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
    
    if (retryableClient) {
        response = this.client.execute(request, config);
    } else {
        response = this.client.executeWithLoadBalancer(request, config);
    }
    context.set("ribbonResponse", response);

    // Explicitly close the HttpResponse if the Hystrix command timed out to
    // release the underlying HTTP connection held by the response.
    //
    if (this.isResponseTimedOut()) {
        if (response != null) {
            response.close();
        }
    }

    return new RibbonHttpResponse(response);
}

這裏run()方法就是核心的調用邏輯了,這裏首先會判斷client是否支持重試,若是請求能夠重試的話就執行重試邏輯,具體重試的機制能夠看這篇文章:http://blog.didispace.com/spring-cloud-zuul-retry-detail/,同時不是重試的話就會執行executeWithLoadBalancer()從而達到負載均衡的目的.

總結:

至此關於route過濾器的兩種路由模式已經梳理完畢:

  • SimpleHostRoutingFilter:本質就是經過apache的httpClient執行請求,而後返回響應信息到客戶端,簡單粗暴,因此不存在負載均衡,也不存在熔斷,降級操做;
  • RibbonRoutingFilter:和SimpleHostRoutingFilter不一樣的是,他建立了一個RibbonCommand對象,這個command本質就是HystrixCommand,在HystrixCommand的run()方法內發送請求,並在此方法內經過serviceId和client進行負載均衡調用.

細心的同窗可能會發現:RibbonRoutingFilter其實沒有對返回的response做任何處理,也就是說,就算被路由的服務端發生了錯誤,也是返回response,錯誤都在response裏面包着,正常狀況下不會拋出異常,固然也不會降級熔斷了

傳統路由模式改造

經過上面的分析,想要對傳統路由模式進行改造添加熔斷功能的話,就須要重寫SimpleHostRoutingFilter,在其做http調用的時候添加熔斷功能,而後將返回調用的結果set到RequestContext中,這樣就完成了路由模式的改造

過濾器改造:SimpleHostRoutingFilter

首先,咱們建立一個bean,繼承自SimpleHostRoutingFilter,查看SimpleHostRoutingFilter發現他的成員變量都是私有的,子類沒法共享這些變量,因此,咱們只能再定義這些變量,這些變量大多都是httpClient相關的一些配置類,固然咱們也能夠選擇不使用httpClient使用其餘的http調用Client好比RestTemplate,可是爲了作到儘可能少的改動,並且讓以前的配置改造以後也可使用,因此我選擇沒有動他的這套邏輯

那麼咱們只須要重寫一些私有的成員變量和一些私有方法就能夠了,固然run()方法也是要重寫的,我就暴力一點,直接重寫全部的方法,而後改動一些地方就行了,改動後的代碼以下:

@Component
@ConditionalOnExpression("${host.route.hystrix.enable:false}")//自定義路由模式的開關
public class SimpleHostHystrixRoutingFilter extends SimpleHostRoutingFilter {

    private static final Log log = LogFactory.getLog(SimpleHostHystrixRoutingFilter.class);

    private final Timer connectionManagerTimer = new Timer(
            "SimpleHostRoutingFilter.connectionManagerTimer", true);

    private boolean sslHostnameValidationEnabled;

    private ProxyRequestHelper helper;
    private ZuulProperties.Host hostProperties;
    private ApacheHttpClientConnectionManagerFactory connectionManagerFactory;
    private HttpClientConnectionManager connectionManager;
    private CloseableHttpClient httpClient;
    private boolean customHttpClient = false;
    private HostHystrixCommandFactory restClientHystrixCommandFactory;

    @EventListener
    public void onPropertyChange(EnvironmentChangeEvent event) {
        if (!customHttpClient) {
            boolean createNewClient = false;

            for (String key : event.getKeys()) {
                if (key.startsWith("zuul.host.")) {
                    createNewClient = true;
                    break;
                }
            }

            if (createNewClient) {
                try {
                    SimpleHostHystrixRoutingFilter.this.httpClient.close();
                } catch (IOException ex) {
                    log.error("error closing client", ex);
                }
                SimpleHostHystrixRoutingFilter.this.httpClient = newClient();
            }
        }
    }

    public SimpleHostHystrixRoutingFilter(ProxyRequestHelper helper, ZuulProperties properties,
                                          ApacheHttpClientConnectionManagerFactory connectionManagerFactory,
                                          ApacheHttpClientFactory httpClientFactory, HostHystrixCommandFactory restClientHystrixCommandFactory) {
        super(helper, properties, connectionManagerFactory, httpClientFactory);
        this.helper = helper;
        this.hostProperties = properties.getHost();
        this.sslHostnameValidationEnabled = properties.isSslHostnameValidationEnabled();
        this.connectionManagerFactory = connectionManagerFactory;
        this.restClientHystrixCommandFactory = restClientHystrixCommandFactory;
    }

    @PostConstruct
    private void initialize() {
        if (!customHttpClient) {
            this.connectionManager = connectionManagerFactory.newConnectionManager(
                    !this.sslHostnameValidationEnabled,
                    this.hostProperties.getMaxTotalConnections(),
                    this.hostProperties.getMaxPerRouteConnections(),
                    this.hostProperties.getTimeToLive(), this.hostProperties.getTimeUnit(),
                    null);
            this.httpClient = newClient();
            this.connectionManagerTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    if (SimpleHostHystrixRoutingFilter.this.connectionManager == null) {
                        return;
                    }
                    SimpleHostHystrixRoutingFilter.this.connectionManager.closeExpiredConnections();
                }
            }, 30000, 5000);
        }
    }

    @PreDestroy
    public void stop() {
        this.connectionManagerTimer.cancel();
    }


    @Override
    public Object run() {
        RequestContext context = RequestContext.getCurrentContext();

        log.info("======>custom hystrix host routing filter start ,context :{}"+ context);

        HttpServletRequest request = context.getRequest();
        MultiValueMap<String, String> headers = this.helper
                .buildZuulRequestHeaders(request);
        MultiValueMap<String, String> params = this.helper
                .buildZuulRequestQueryParams(request);
        String verb = getVerb(request);
        InputStream requestEntity = getRequestBody(request);

        URL host = RequestContext.getCurrentContext().getRouteHost();

        String uri = this.helper.buildZuulRequestURI(request);
        uri = StringUtils.cleanPath((host.getPath() + uri).replaceAll("/{2,}", "/"));
        int contentLength = request.getContentLength();

        ContentType contentType = null;
        if (request.getContentType() != null) {
            contentType = ContentType.parse(request.getContentType());
        }

        InputStreamEntity entity = new InputStreamEntity(requestEntity, contentLength,
                contentType);
        if (request.getContentLength() < 0) {
            context.setChunkedRequestBody();
        }

        this.helper.addIgnoredHeaders();

        HttpRequest httpRequest = super.buildHttpRequest(verb, uri, entity, headers, params, request);

        //routeId做爲Hystrix 的 commandKey,
        // eg. zuul.routes.hystrix.path=/hystrix-provider/**  的routeId是hystrix
        String routeId = (String) context.get("proxy");

        try {
            //SEMAPHORE 隔離模式共享的是一個線程的requestContext ,可是thread 模式使用的是另一個獨立的線程池裏面的線程,requestContext信息不共享,須要將context 經過構造器傳入
            HostHystrixCommand command = restClientHystrixCommandFactory.create(routeId, httpClient,httpRequest,getHttpHost(host));
            HttpResponse response = command.execute();

            setResponse(response);
        } catch (Exception ex) {
            if((ex instanceof HystrixRuntimeException) && (ex.getCause() instanceof CircuitBreakerResponseException)){
                HttpResponse httpResponse = ((CircuitBreakerResponseException) ex.getCause()).getHttpResponse();
                try {
                    setResponse(httpResponse);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            }
            throw new ZuulRuntimeException(ex);
        }
        return null;
    }


    private MultiValueMap<String, String> revertHeaders(Header[] headers) {
        MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
        for (Header header : headers) {
            String name = header.getName();
            if (!map.containsKey(name)) {
                map.put(name, new ArrayList<>());
            }
            map.get(name).add(header.getValue());
        }
        return map;
    }


    private InputStream getRequestBody(HttpServletRequest request) {
        InputStream requestEntity = null;
        try {
            requestEntity = request.getInputStream();
        } catch (IOException ex) {
            // no requestBody is ok.
        }
        return requestEntity;
    }

    private String getVerb(HttpServletRequest request) {
        String sMethod = request.getMethod();
        return sMethod.toUpperCase();
    }

    private void setResponse(HttpResponse response) throws IOException {
        RequestContext.getCurrentContext().set("zuulResponse", response);
        this.helper.setResponse(response.getStatusLine().getStatusCode(),
                response.getEntity() == null ? null : response.getEntity().getContent(),
                revertHeaders(response.getAllHeaders()));
    }

    private HttpHost getHttpHost(URL host) {
        HttpHost httpHost = new HttpHost(host.getHost(), host.getPort(),
                host.getProtocol());
        return httpHost;
    }

}

這裏的構造器我比父類多傳了一個RestClientHystrixCommandFactory,這個Factory就是建立HystrixCommand的工廠bean,裏面實例化了一個自定義的HystrixCommand,run()方法就是經過這個工廠bean建立了一個HystrixCommand,而後進行調用的.

斷路器建立工廠: RestClientHystrixCommandFactory

再看RestClientHystrixCommandFactory:

@Component
public class RestClientHystrixCommandFactory {

    @Autowired(required = false)
    private List<HostFallbackProvider> fallbackList = Collections.emptyList();

    //TODO:暫時只用默認的errorDecode
    @Autowired
    DefaultHttpClientErrorDecoder errorDecoder;

    private Map<String, HostFallbackProvider> fallbackProviderMap;
    
    @Autowired  
    private ZuulProperties zuulProperties;
    
    @PostConstruct
    public void init() {
        this.fallbackProviderMap = fallbackList.stream().collect(Collectors.toMap(HostFallbackProvider::getRoute, x -> x, (x, y) -> x));
    }


    public RestClientHystrixCommand create(String routeId, CloseableHttpClient httpClient, HttpRequest httpRequest, HttpHost httpHost) {
        HostFallbackProvider fallbackProvider = null;
        if (!fallbackProviderMap.isEmpty()) {
            if (Objects.isNull(fallbackProvider = fallbackProviderMap.get("*"))) {
                fallbackProvider = fallbackProviderMap.get(routeId);
            }
        }
        return new RestClientHystrixCommand(routeId, httpClient,httpRequest,httpHost,fallbackProvider,errorDecoder,zuulProperties);
    }
}

這個Factory就是作了兩個操做:

  1. 對Hystrix的fallBack進行了拓展,可讓使用者根據本身不一樣的需求定製不一樣的fallBack,在發生錯誤時直接調用fallBack;
  2. 定義了一個Response的ErrorDecoder,對請求調用返回的結果進行decode,判斷是否須要拋出異常觸發熔斷/降級,目前沒有考慮把這個decoder作成定製化,因此如今默認都是用的我定義的一套ErrorDecoder.

降級處理: HostFallbackProvider

public interface HostFallbackProvider {

    /**
     * 
     * @Description: 返回routId,代表爲哪一個rout提供回退,* 表示 爲全部route提供回退 
     * @Param:  
     * @returns:  
     * @Date: 2019/12/4 
    */
    String getRoute();

    HttpResponse fallbackResponse(Throwable cause);

    HttpResponse fallbackResponse();

}

這個HostFallbackProvider是我參考RibbonRoutingFilter提供的FallbackProviderZuulFallbackProvider實現的一個接口,只不過FallbackProvider返回的是ClientHttpResponse,而我這裏返回的是HttpResponse,功能都是一致的

響應解碼: DefaultHttpClientErrorDecoder

再看DefaultHttpClientErrorDecoder,這個是我默認實現的一套邏輯:針對4系列的異常不作處理,也不觸發熔斷和降級,5系列的異常咱們就直接觸發拋出一個自定義的異常,而且將response裝在異常裏面返回出去

/**
 * Created by liuwen on 2019/12/3
 * httpClient 響應錯誤處理接口,能夠經過實現此方法配置httpClient發生錯誤(4xx/5xx)異常的錯誤解析,
 * 從而決定是否走熔斷處理,也能夠用於異常信息的傳遞
 *
 */
public interface HttpClientErrorDecoder {

    boolean hashError(HttpResponse httpResponse);


    void handlerError(HttpResponse httpResponse) throws Exception;

}




@Component
public class DefaultHttpClientErrorDecoder implements HttpClientErrorDecoder {

    @Override
    public boolean hashError(HttpResponse httpResponse) {
        int stateSeries = getStateSeries(httpResponse);
        return stateSeries == HttpStatus.Series.SERVER_ERROR.value() || stateSeries == HttpStatus.Series.CLIENT_ERROR.value();
    }

    private int getStateSeries(HttpResponse httpResponse) {
        return httpResponse.getStatusLine().getStatusCode() / 100;
    }

    @Override
    public void handlerError(HttpResponse httpResponse) throws Exception {
        int stateSeries = getStateSeries(httpResponse);
        //4系列的異常不走熔斷
        if (stateSeries == HttpStatus.Series.SERVER_ERROR.value()) {
            throw new CircuitBreakerResponseException(httpResponse, "internal server error");
        }
    }
}

自定義的異常很簡單:

@Data
public class CircuitBreakerResponseException extends RuntimeException {

  private HttpResponse httpResponse;

    public CircuitBreakerResponseException(HttpResponse response, String message) {
        super(message);
        this.httpResponse = response;
    }
}

請求處理Command: RestClientHystrixCommand

接下來就是核心了,咱們的自定義的RestClientHystrixCommand,仔細看這個Command,在構造器裏面初始化了父類的構造參數,和RestClientRibbonCommand相似,也是定義了使用HystrixCommand時的一些默認配置,以及引用了一些zuul的配置bean去初始化HystrixCommand.

他的run()方法就是執行了http請求,而後對響應結果進行了decode,同時,因爲咱們使用的原生的HttpClient,因此,必定要關閉HttpClient,防止它佔用太多的資源,致使鏈接一直沒法釋放,從而形成後續的請求因爲沒有鏈接使用而一直阻塞,致使系統沒法使用,雖然在SimpleHostHystrixRoutingFilter裏有一個定時器去定時去關閉鏈接,可是這種沒有被使用的鏈接是沒法關閉的,因此這裏必定要在調用超時的狀況下關閉鏈接

fallBack(),這裏的fallBack()就是使用了以前在RestClientHystrixCommandFactory裏面傳入的HostFallbackProvider bean,而後將fallBack邏輯交給HostFallbackProvider提供的方法中處理,從而達到定製化的降級的目的

@CommonsLog
public class RestClientHystrixCommand extends HystrixCommand<HttpResponse> {

    private final HttpRequest httpRequest;
    private final HttpHost httpHost;
    private CloseableHttpClient httpclient;
    private HostFallbackProvider fallbackProvider;
    private HttpClientErrorDecoder errorDecoder;


    public RestClientHystrixCommand(String commandKey, CloseableHttpClient httpClient, HttpRequest httpRequest, HttpHost httpHost, HostFallbackProvider fallbackProvider, DefaultHttpClientErrorDecoder errorDecoder,ZuulProperties zuulProperties) {
        //初始化父類默認構造參數
        super(getSetter(commandKey,zuulProperties));
        this.httpclient = httpClient;
        this.httpRequest = httpRequest;
        this.httpHost = httpHost;
        this.fallbackProvider = fallbackProvider;
        this.errorDecoder = errorDecoder;
    }


    /**
     * @Description: 構造HystrixCommand配置
     * <p>
     * 注意:
     * zuul裏,從新封裝了hytrix的一些配置名稱,致使hytrix的一些原生全局配置會失效,須要經過zuulProperties從新設置的屬性以下:
     * 1.隔離級別指定:zuul.ribbonIsolationStrategy=SEMAPHORE
     * 2.信號隔離的默認隔離大小:zuul.semaphore.max-semaphores=20
     * 而原生的hytrix.command.default.execution.isolation.strategy和maxConcurrentRequests的配置將失效,會被這2個覆蓋
     *
     * @Param:
     * @returns:
     * @Date: 2019/11/20
     */
    protected static HystrixCommand.Setter getSetter(String commandKey, ZuulProperties zuulProperties) {

        Setter commandSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RibbonCommand"))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
        final HystrixCommandProperties.Setter setter = HystrixCommandProperties.Setter().withExecutionIsolationStrategy(zuulProperties.getRibbonIsolationStrategy());
        if (zuulProperties.getRibbonIsolationStrategy() == HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) {
            final String name = ZuulConstants.ZUUL_EUREKA + commandKey + ".semaphore.maxSemaphores";

            final DynamicIntProperty value = DynamicPropertyFactory.getInstance()
                    .getIntProperty(name, zuulProperties.getSemaphore().getMaxSemaphores());
            setter.withExecutionIsolationSemaphoreMaxConcurrentRequests(value.get());
        } else if (zuulProperties.getThreadPool().isUseSeparateThreadPools()) {
            final String threadPoolKey = zuulProperties.getThreadPool().getThreadPoolKeyPrefix() + commandKey;
            commandSetter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
        }

        return commandSetter.andCommandPropertiesDefaults(setter);

    }

    @Override
    protected HttpResponse run() throws Exception {
        return forward();
    }

    protected HttpResponse forward() throws Exception {
        CloseableHttpResponse response = httpclient.execute(httpHost, httpRequest);

        if(isResponseTimedOut()){
            response.close();
        }

        //處理返回響應,觸發熔斷或者傳遞異常
        if (this.errorDecoder.hashError(response)) {
            this.errorDecoder.handlerError(response);
        }

        return response;
    }

    /**
     * @Description: 重寫getFallback方法,
     * @Param:
     * @returns:
     * @Date: 2019/11/21
     */
    @Override
    protected HttpResponse getFallback() {
        log.error("====>some error was happened in this request,execute fallback method");

        Throwable throwable = super.getExecutionException();
        if (!Objects.isNull(fallbackProvider)) {
            if ((throwable == null ? super.getExecutionException() : throwable) == null) {
                return fallbackProvider.fallbackResponse();
            } else {
                return fallbackProvider.fallbackResponse(throwable);
            }
        }
        return super.getFallback();
    }
}

至此,咱們的一套定製化的傳統路由模式增長熔斷的功能已經實現完畢,除了可使用到以前的host的一套配置以外,也可使用ribbon的一些關於斷路器的配置,並且彌補了以前RibbonRoutingFilter的請求觸發降級/熔斷的缺陷,使用者能夠定製化的針對不一樣的路由id進行定製化的開發本身的模塊的解碼和降級策略.

相關文章
相關標籤/搜索