Zuul 1.x 重試策略(源碼分析)

前言

上一篇文章中闡述了zuul的基本架構組成,而且將核心關鍵類相應作了標註以及分析,可是並未詳細深刻到關鍵性的細節,本篇文章主要是是探索zuul超時重試,服務降級的機制。java

重試/服務降級機制

不少時候,當一個請求被轉發至tomcat服務器處理的過程當中,極有可能由於某種緣由(好比服務器鏈接池爆滿,好比sql查詢過久等等)被卡主,在沒有超時重試/服務降級的狀況下,此時客戶端徹底不知情,一直處於等待狀態。nginx

重試

指當服務調用方發起請求超過XXXms後,請求還未處理完,則服務調用方會拋出異常,切斷請求並進行重試。spring

好比向目標服務發起請求,不幸的是,因爲正巧存在網絡波動以致於請求超時事後依舊沒法訪問到目標服務,或者目標服務返回的結果沒法被正確的收到,可是此時目標服務並不是是不可服務的狀態,因此經過少許重試能夠減小因爲網絡波動等因素所帶來的影響。sql

服務降級

指當服務調用方發起請求超過XXXms後,依舊沒法收到正確的響應,則切斷請求,接口降級,返回可接受的數據。apache

當在屢次重試後依舊無果,客戶端判斷此時目標服務不可用(也許目標服務此時並不是不可用),可是客戶端已經提早預料到存在這樣一個問題,與調用方約定服務不可用時將降級爲另外接口,以返回特定的數據。後端

熔斷降級機制在廣大互聯網公司是很是常見的,且在SOA服務,微服務等架構盛行的今天,面對複雜的業務設計,海量的大數據,服務降級策略愈加的重要。設計模式

目前服務降級的策略也很是多,好比nginx,hystrix……api

zuul 1.x的線程模型

想要了解zuul的重試/降級等機制的前提下,有必要優先了解zuul的線程模型。tomcat

源自網絡.png

從上圖能夠很是清晰的看出zuul1.x的線程模型,即每個請求都會以阻塞方式調用處理(經由RibbonRoutingFilter處理的方式)服務器

查看HystrixCommand#queue()源碼能夠看到以下代碼的註釋

/*
 * The Future returned by Observable.toBlocking().toFuture() does not implement the
 * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
 * thus, to comply with the contract of Future, we must wrap around it.
 */
final Future<R> delegate = toObservable().toBlocking().toFuture();

RibbonRoutingFilter轉發機制詳解

RibbonRoutingFilter#forward

經過debug方式能夠看到ribbonCommandFactory實際上是HttpClientRibbonCommandFactory實例,並用以建立HttpClientRibbonCommand實例。根據前文看到的zuul的線程模型,能夠判定command.execute()的調用確定是HttpClientRibbonCommand#run()的方法

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
	Map<String, Object> info = this.helper.debug(context.getMethod(),
			context.getUri(), context.getHeaders(), context.getParams(),
			context.getRequestEntity());

	// HttpClientRibbonCommandFactory#create
	// HttpClientRibbonCommand
	RibbonCommand command = this.ribbonCommandFactory.create(context);
	try {
		// HttpClientRibbonCommand#run
		ClientHttpResponse response = command.execute();// queue().get()
		this.helper.appendDebug(info, response.getStatusCode().value(),
				response.getHeaders());
		return response;
	}
	catch (HystrixRuntimeException ex) {
		return handleException(info, ex);
	}
}

HttpClientRibbonCommandFactory#create

在建立HttpClientRibbonCommand之時,也會尋找是否存在相應的降級接口(自定義實現),若是ZuulFallbackProvider若是爲空則降級後按照調用HystrixCommand#getFallback()拋出異常UnsupportedOperationException("No fallback available.")

@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
	// ZuulFallbackProvider降級接口,每一個serviceId對應一個
	// Hystrix 熔斷時會調用該接口
	ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
	final String serviceId = context.getServiceId();
	
	// 成功開啓重試後的值爲RetryableRibbonLoadBalancingHttpClient
	// 非成功開啓重試爲RibbonLoadBalancingHttpClient
	final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
			serviceId, RibbonLoadBalancingHttpClient.class);
	client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));

	return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
			clientFactory.getClientConfig(serviceId));
}

內部如何決策開啓重試機制呢?

從建立bean的條件看,歸根結底是根據是否引入srping-retry來決定是否建立重試實例

@Configuration
@ConditionalOnClass(name = "org.apache.http.client.HttpClient")
@ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
public class HttpClientRibbonConfiguration {
	@RibbonClientName
	private String name = "client";

	// ....

	@Bean
	@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
	// 建立bean的條件是org.springframework.retry.support.RetryTemplate不存在
	@ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate")
	public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
		IClientConfig config, ServerIntrospector serverIntrospector,
		ILoadBalancer loadBalancer, RetryHandler retryHandler, CloseableHttpClient httpClient) {
		RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(httpClient, config, serverIntrospector);
		client.setLoadBalancer(loadBalancer);
		client.setRetryHandler(retryHandler);
		Monitors.registerObject("Client_" + this.name, client);
		return client;
	}

	@Bean
	@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
	// 建立bean的條件是org.springframework.retry.support.RetryTemplate存在
	@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
	public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
			IClientConfig config, ServerIntrospector serverIntrospector,
			ILoadBalancer loadBalancer, RetryHandler retryHandler,
			LoadBalancedRetryFactory loadBalancedRetryFactory, CloseableHttpClient httpClient) {
		RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
			httpClient, config, serverIntrospector, loadBalancedRetryFactory);
		client.setLoadBalancer(loadBalancer);
		client.setRetryHandler(retryHandler);
		Monitors.registerObject("Client_" + this.name, client);
		return client;
	}
}

image.png

HttpClientRibbonCommand#AbstractRibbonCommand#run

前文提到,執行command.execute的時候會執行HttpClientRibbonCommand#run,可是因爲HttpClientRibbonCommand沒有找到run方法,因此前往父類AbstractRibbonCommand尋找run方法

final RequestContext context = RequestContext.getCurrentContext();

	RQ request = createRequest();
	// RibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
	// RetryableRibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
	RS response = this.client.executeWithLoadBalancer(request, config);

	context.set("ribbonResponse", response);

	// Explicitly close the HttpResponse if the Hystrix command timed out to
	// release the underlying HTTP connection held by the response.
	//
	if (this.isResponseTimedOut()) {
		if (response != null) {
			response.close();
		}
	}

	return new RibbonHttpResponse(response);
}

AbstractLoadBalancerAwareClient#

這裏涉及到Observable至關多的API, 基於RxJava框架,相關的知識能夠前往官網或者其餘博文了解,這裏不作多餘贅述。

關鍵代碼在於AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)究竟作了什麼事?

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
	// 請求重試處理器
	RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
	LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
			.withLoadBalancerContext(this)
			.withRetryHandler(handler)
			.withLoadBalancerURI(request.getUri())
			.build();

	try {
		// 將請求執行包裝在Observable
		return command.submit(
			new ServerOperation<T>() {
				@Override
				public Observable<T> call(Server server) {
					URI finalUri = reconstructURIWithServer(server, request.getUri());
					S requestForServer = (S) request.replaceUri(finalUri);
					try {
						return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
					} 
					catch (Exception e) {
						return Observable.error(e);
					}
				}
			})
			.toBlocking()
			.single();
	} catch (Exception e) {
		Throwable t = e.getCause();
		if (t instanceof ClientException) {
			throw (ClientException) t;
		} else {
			throw new ClientException(e);
		}
	}
}

RibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler RetryableRibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler

查看以下源碼發現 okToRetryOnConnectErrors,okToRetryOnAllErrors都被初始化爲false fallback被初始化爲DefaultLoadBalancerRetryHandler

@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
	return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
}

public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
	Preconditions.checkNotNull(baseRetryHandler);
	this.okToRetryOnConnectErrors = okToRetryOnConnectErrors;
	this.okToRetryOnAllErrors = okToRetryOnAllErrors;
	this.fallback = baseRetryHandler;
	if (requestConfig != null) {
		if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
			retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries); 
		}
		if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
			retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer); 
		} 
	}
}

LoadBalancerCommand#submit

該方法代碼量較多,且多數爲Observable代碼,截取其中關鍵信息查看

// 同一個服務地址最大重試次數,且根據建立條件, 該值走到
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// 整個集羣內部同一個服務的多個實例的最大重試次數
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

// 從建立RequestSpecificRetryHandler的條件看,maxRetrysSame 與 maxRetrysNext 都是0,
// 也就說下邊的重試條件永遠不可能發生,詳細請查閱DefaultLoadBalancerRetryHandler源碼

if (maxRetrysSame > 0) 
     o = o.retry(retryPolicy(maxRetrysSame, true));
if (maxRetrysNext > 0 && server == null) 
    o = o.retry(retryPolicy(maxRetrysNext, false));

// 重試策略,也能夠稱之爲斷定是否重試
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
	return new Func2<Integer, Throwable, Boolean>() {
		@Override
		public Boolean call(Integer tryCount, Throwable e) {
			if (e instanceof AbortExecutionException) {
				return false;
			}

			if (tryCount > maxRetrys) {
				return false;
			}
			
			if (e.getCause() != null && e instanceof RuntimeException) {
				e = e.getCause();
			}
			
			return retryHandler.isRetriableException(e, same);
		}
	};
}

@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
	if (okToRetryOnAllErrors) {
		// 查看剛剛的源碼發現,不論是否重試,這裏的值都被設置爲false,因此這裏不可能返回
		return true;
	} 
	else if (e instanceof ClientException) {
		// 若是是客戶端異常信息
		ClientException ce = (ClientException) e;
		// 客戶端限流
		if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
			// sameServer是指是否都是同一個sever
			// 一旦某一個server實例跑了異常,則再也不對該服務進行重試
			// 不一樣實例地址則重試
			return !sameServer;
		} else {
			// 不然再也不重試
			return false;
		}
	} 
	else  {
		// 必false
		return okToRetryOnConnectErrors && isConnectionException(e);
	}
}

從源碼上看,咋看覺得重試的策略是主動去觸發Observable#retry重試機制進行重試,可是經過bebug的方式卻發現太天真了。由於在經過getRequestSpecificRetryHandler方法建立的RequestSpecificRetryHandler都是同樣的,內部的RetryHandler都是默認構造的DefaultLoadBalancerRetryHandler,因此retrySameServerretryNextServer都是0,也就說經過觸發Observable#retry的機制至少在這個版本是不會發生的。

那麼重試的機制明顯就交給了spring-retry來處理,那麼具體的處理方式又定義在何處呢?

RetryableRibbonLoadBalancingHttpClient#execute

@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
	//
	final RequestConfig.Builder builder = RequestConfig.custom();
	IClientConfig config = configOverride != null ? configOverride : this.config;
	builder.setConnectTimeout(config.get(
			CommonClientConfigKey.ConnectTimeout, this.connectTimeout));// 默認2s
	builder.setSocketTimeout(config.get(
			CommonClientConfigKey.ReadTimeout, this.readTimeout)); // 默認5s
	builder.setRedirectsEnabled(config.get(
			CommonClientConfigKey.FollowRedirects, this.followRedirects));

	final RequestConfig requestConfig = builder.build();
	return this.executeWithRetry(request, new RetryCallback() {
		// ....
	});
}

private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception {
	LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);//RibbonLoadBalancedRetryPolicyFactory
	RetryTemplate retryTemplate = new RetryTemplate();
	boolean retryable = request.getContext() == null ? true :
			BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true);
	retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
			: new RetryPolicy(request, retryPolicy, this, this.getClientName()));//RetryableRibbonLoadBalancingHttpClient
	return retryTemplate.execute(callback);
}

@Override
public LoadBalancedRetryPolicy create(final String serviceId, final ServiceInstanceChooser loadBalanceChooser) {
	final RibbonLoadBalancerContext lbContext = this.clientFactory
			.getLoadBalancerContext(serviceId);
	return new LoadBalancedRetryPolicy() { // 因爲這裏是匿名實例,因此可能會比較難找

		// 用以判斷是否重試相同的服務實例
		@Override
		public boolean canRetrySameServer(LoadBalancedRetryContext context) {
			return sameServerCount < lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context);
		}

		// 用以判斷是否重試集羣內下一個服務實例
		@Override
		public boolean canRetryNextServer(LoadBalancedRetryContext context) {
			//this will be called after a failure occurs and we increment the counter
			//so we check that the count is less than or equals to too make sure
			//we try the next server the right number of times
			return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
		}

		@Override
		public void close(LoadBalancedRetryContext context) {

		}

		@Override
		public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
			//Check if we need to ask the load balancer for a new server.
			//Do this before we increment the counters because the first call to this method
			//is not a retry it is just an initial failure.
			if(!canRetrySameServer(context)  && canRetryNextServer(context)) {
				context.setServiceInstance(loadBalanceChooser.choose(serviceId));
			}
			//This method is called regardless of whether we are retrying or making the first request.
			//Since we do not count the initial request in the retry count we don't reset the counter
			//until we actually equal the same server count limit.  This will allow us to make the initial
			//request plus the right number of retries.
			if(sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
				//reset same server since we are moving to a new server
				sameServerCount = 0;
				nextServerCount++;
				if(!canRetryNextServer(context)) {
					context.setExhaustedOnly();
				}
			} else {
				sameServerCount++;
			}

		}
	};
}

以上基本上把zuul的一次請求(包括開啓重試功能以及不開啓重試功能)的所有過程都瞭解了一遍,講道理應該對zuul的請求轉發有了比較深入的瞭解。

總結

請求流至RibbonRoutingFilter以後,決定是否重試的功能點在因而否引入了spring-retry包,可否找到org.springframework.retry.support.RetryTemplate這個全限定類名。若是找到則順利開啓重試機制,不然不開啓重啓機制。

除此以外,因爲RibbonCommand繼承了HystrixExecutable,理論上具有了熔斷降級策略的,測試是否具有熔斷降級策略,能夠繼承自ZuulFallbackProvider,並將實現類加入到spring容器中(@Component)。

從源碼分析的角度來看,熔斷降級策略spring-retry並無產生直接的關係,也就說當請求發起重試的時候,即使已經被降級了以後,後端卻仍是重試,而且在重試過程當中,在發生降級以後,後邊全部的重試其實都是無心義的重試,由於無論重試是否成功,最後的返回值都是降級後的接口返回的數據。

通過測試發現,熔斷降級策略默認是1s降級,而超時重試默認爲5s(請查看前文源碼註釋)。

調試實戰

  • 在zuul應用的加入spring-retry依賴
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.1.2.RELEASE</version>
</dependency>
  • zuul的配置文件加入zuul.retryable=trueribbon.MaxAutoRetries=3ribbon.MaxAutoRetriesNextServer=2
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8769
spring:
  application:
    name: service-zuul
zuul:
  routes:
    api-a:
      path: /api-a/**
      serviceId: service-ribbon
  retryable: true
  
  
ribbon:
  MaxAutoRetries: 3
  MaxAutoRetriesNextServer: 2

建立熔斷後降級接口

@Component
public class MyZuulFallbackProvider implements ZuulFallbackProvider {

	@Override
	public String getRoute() {
		return "service-ribbon";
	}

	@Override
	public ClientHttpResponse fallbackResponse() {
		return new ClientHttpResponse() {

			@Override
			public InputStream getBody() throws IOException {
				Map<String, Object> map = new HashMap<String, Object>();
				map.put("code", 1);
				map.put("text", "error");
				final byte[] reqBodyBytes = map.toString().getBytes();
				return new ServletInputStreamWrapper(reqBodyBytes);
			}

			@Override
			public HttpHeaders getHeaders() {
				return new HttpHeaders();
			}

			@Override
			public HttpStatus getStatusCode() throws IOException {
				// TODO Auto-generated method stub
				return HttpStatus.OK;
			}

			@Override
			public int getRawStatusCode() throws IOException {
				return 0;
			}

			@Override
			public String getStatusText() throws IOException {
				return "201 error";
			}

			@Override
			public void close() {

			}

		};
	}
}

在服務被調用方中加入一個count來計算重試的次數(count值只用一次,作簡單驗證足以)

@RestController
public class HelloControler {
	
	private Integer count = 4;
	
    @Autowired
    HelloService helloService;
    
    @RequestMapping(value = "/hi")
    public String hi(@RequestParam String name){
    	if( 0 == count --) {
    		// 當嘗試第4次請求時,直接返回。
    		return "hi has bean hystrix";
    	}
    	System.out.println("request is coming...");
    	   try {
    	      Thread.sleep(10000);
    	   } catch (InterruptedException e) {
    	      System.out.println("線程被打斷... " + e.getMessage());
    	   }
        return name;
    }
}

總結

閱讀源碼是一件讓人興奮愉悅,卻有苦不堪言的事,可是堅持下來就好,原本想畫一下整個調用過程的相關類圖,但是有點懶,就不畫了……

老外寫的代碼感受更難以看懂一些,不過還好,基本的設計模式沒問題,配合編輯以看起來也就不是很累了。

zuul的源碼閱讀估計就到這裏了,其餘的坑等後續碰見了再學習。不太重試與降級的問題(降級後繼續重試的問題),簡直不能忍,是否是這個問題會在zuul2.x版本中解決呢?

相關文章
相關標籤/搜索