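In the previous lecture we saw how Feign works: at application startup it uses JDK dynamic proxies to generate a proxy for every FeignClient.
The invocation handler behind each proxy is ReflectiveFeign.FeignInvocationHandler. It holds a target (which carries the serviceName and related information) and a dispatch map (the key identifies the invoked method, including its name and parameters; the value is a SynchronousMethodHandler).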
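The proxy structure recapped here can be sketched with nothing but the JDK. This is a minimal illustration, not Feign's actual code: `GreetingClient`, `SketchMethodHandler`, `SketchInvocationHandler`, and `ProxySketch` are hypothetical names, and the handler simply returns a request line as a string instead of sending anything.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical client interface, standing in for a @FeignClient interface.
interface GreetingClient {
    String sayHello(String name);
}

// Hypothetical stand-in for Feign's MethodHandler.
interface SketchMethodHandler {
    Object invoke(Object[] args) throws Throwable;
}

// Simplified analogue of FeignInvocationHandler: a target name plus a dispatch map.
final class SketchInvocationHandler implements InvocationHandler {
    private final String serviceName;
    private final Map<Method, SketchMethodHandler> dispatch;

    SketchInvocationHandler(String serviceName, Map<Method, SketchMethodHandler> dispatch) {
        this.serviceName = serviceName;
        this.dispatch = dispatch;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Look up the handler registered for the invoked method and delegate to it.
        SketchMethodHandler handler = dispatch.get(method);
        if (handler == null) {
            throw new UnsupportedOperationException(serviceName + " has no handler for " + method);
        }
        return handler.invoke(args);
    }
}

final class ProxySketch {
    static GreetingClient newClient(String serviceName) {
        Map<Method, SketchMethodHandler> dispatch = new HashMap<>();
        try {
            Method sayHello = GreetingClient.class.getMethod("sayHello", String.class);
            // Assumption: the handler only renders the request line for illustration.
            dispatch.put(sayHello, args -> "GET http://" + serviceName + "/sayHello/" + args[0]);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        return (GreetingClient) Proxy.newProxyInstance(
                GreetingClient.class.getClassLoader(),
                new Class<?>[] {GreetingClient.class},
                new SketchInvocationHandler(serviceName, dispatch));
    }

    public static void main(String[] args) {
        System.out.println(newClient("serviceA").sayHello("wangmeng"));
    }
}
```

Every call on the returned proxy goes through `invoke()`, which is the same shape as the lookup from the dispatch map to a SynchronousMethodHandler described in this recap.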
As shown in the figure below:
This lecture analyzes how Feign works together with Ribbon to implement load balancing.
Original content takes effort. If you repost this article, please credit the source!
Blog: 一枝花算不算浪漫
WeChat official account: 壹枝花算不算浪漫
Following the earlier analysis, we can go straight to the code in SynchronousMethodHandler:
final class SynchronousMethodHandler implements MethodHandler {

    @Override
    public Object invoke(Object[] argv) throws Throwable {
        // Build a request template such as: GET /sayHello/wangmeng HTTP/1.1
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
            try {
                return executeAndDecode(template);
            } catch (RetryableException e) {
                retryer.continueOrPropagate(e);
                if (logLevel != Logger.Level.NONE) {
                    logger.logRetry(metadata.configKey(), logLevel);
                }
                continue;
            }
        }
    }

    Object executeAndDecode(RequestTemplate template) throws Throwable {
        // Build the request object: GET http://serviceA/sayHello/wangmeng HTTP/1.1
        Request request = targetRequest(template);

        if (logLevel != Logger.Level.NONE) {
            logger.logRequest(metadata.configKey(), logLevel, request);
        }

        Response response;
        long start = System.nanoTime();
        try {
            // This client is the LoadBalancerFeignClient built earlier; options holds the timeouts
            response = client.execute(request, options);
            // ensure the request is set. TODO: remove in Feign 10
            response.toBuilder().request(request).build();
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
            }
            throw errorExecuting(request, e);
        }
        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // Everything below turns the response into the return value
        boolean shouldClose = true;
        try {
            if (logLevel != Logger.Level.NONE) {
                response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
                // ensure the request is set. TODO: remove in Feign 10
                response.toBuilder().request(request).build();
            }
            if (Response.class == metadata.returnType()) {
                if (response.body() == null) {
                    return response;
                }
                if (response.body().length() == null
                        || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
                    shouldClose = false;
                    return response;
                }
                // Ensure the response body is disconnected
                byte[] bodyData = Util.toByteArray(response.body().asInputStream());
                return response.toBuilder().body(bodyData).build();
            }
            if (response.status() >= 200 && response.status() < 300) {
                if (void.class == metadata.returnType()) {
                    return null;
                } else {
                    return decode(response);
                }
            } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
                return decode(response);
            } else {
                throw errorDecoder.decode(metadata.configKey(), response);
            }
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
            }
            throw errorReading(request, response, e);
        } finally {
            if (shouldClose) {
                ensureClosed(response.body());
            }
        }
    }
}
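The `while (true)` loop in `invoke()` is easier to follow in isolation. The sketch below reproduces only its control flow under simplifying assumptions: `SketchRetryableException`, `SketchRetryer`, and `RetryLoopSketch` are hypothetical names, and the retryer counts attempts without the back-off delay a real retryer would normally apply.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for feign.RetryableException.
class SketchRetryableException extends RuntimeException {
    SketchRetryableException(String message) {
        super(message);
    }
}

// Hypothetical, simplified retryer: a fixed attempt budget and no sleeping.
final class SketchRetryer {
    private final int maxAttempts;
    private int attempt = 1;

    SketchRetryer(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns normally if another attempt is allowed, otherwise rethrows the failure.
    void continueOrPropagate(SketchRetryableException e) {
        if (attempt++ >= maxAttempts) {
            throw e;
        }
    }
}

final class RetryLoopSketch {
    static <T> T invokeWithRetry(Supplier<T> call, int maxAttempts) {
        SketchRetryer retryer = new SketchRetryer(maxAttempts);
        while (true) {
            try {
                return call.get();
            } catch (SketchRetryableException e) {
                // Either falls through to the next loop iteration or propagates e.
                retryer.continueOrPropagate(e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = invokeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new SketchRetryableException("attempt " + calls[0] + " failed");
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

The point of the design is that the loop itself never decides when to stop: the retryer either returns, which lets the loop run again, or throws, which ends the call.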
This code mainly builds the request and then passes the request and options to LoadBalancerFeignClient.execute() to obtain the response. Next, let's follow the call into the client:
public class LoadBalancerFeignClient implements Client {

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            // asUri: http://serviceA/sayHello/wangmeng
            URI asUri = URI.create(request.url());

            // clientName: serviceA
            String clientName = asUri.getHost();

            // uriWithoutHost: http:///sayHello/wangmeng
            URI uriWithoutHost = cleanUrl(request.url(), clientName);

            // ribbonRequest here: GET http:///sayHello/wangmeng HTTP/1.1
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);

            // This config only carries two timeouts: connectTimeout: 5000 and readTimeout: 5000
            IClientConfig requestConfig = getClientConfig(options, clientName);

            // This is where load balancing is actually triggered
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        } catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }
}
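The two URI steps in `execute()` can be tried out with `java.net.URI` alone. This is a sketch under an assumption: `stripHost` is a hypothetical helper that removes the first occurrence of the host, which reproduces the values shown in the comments but is not claimed to be the body of `cleanUrl()`.

```java
import java.net.URI;
import java.util.regex.Pattern;

final class CleanUrlSketch {
    // Hypothetical helper: drop the first occurrence of the service name from the URL.
    static URI stripHost(String url, String host) {
        return URI.create(url.replaceFirst(Pattern.quote(host), ""));
    }

    public static void main(String[] args) {
        String url = "http://serviceA/sayHello/wangmeng";

        // The host part of the URL is the logical service name, not a real machine.
        String clientName = URI.create(url).getHost();
        URI uriWithoutHost = stripHost(url, clientName);

        System.out.println(clientName);               // the logical service name
        System.out.println(uriWithoutHost);           // same URL with an empty authority
        System.out.println(uriWithoutHost.getHost()); // no host is left in the URI
    }
}
```

Leaving the host empty matters later: a URI without a host is the signal for Ribbon to pick a server itself.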
Next, let's look at lbClient() and executeWithLoadBalancer():
public class LoadBalancerFeignClient implements Client {

    private FeignLoadBalancer lbClient(String clientName) {
        return this.lbClientFactory.create(clientName);
    }
}

public class CachingSpringLoadBalancerFactory {

    public FeignLoadBalancer create(String clientName) {
        if (this.cache.containsKey(clientName)) {
            return this.cache.get(clientName);
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        // Obtain the Ribbon ILoadBalancer for this client
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        FeignLoadBalancer client = enableRetry
                ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
                        loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory,
                        loadBalancedRetryListenerFactory)
                : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }
}
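The per-client caching done by `create()` boils down to "build once per service name, then reuse". A minimal sketch of that idea follows. `CachingFactorySketch` is a hypothetical class, and it deliberately swaps the `containsKey`/`get`/`put` sequence of the source for `ConcurrentHashMap.computeIfAbsent`, which expresses the same lookup in one call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical generic cache keyed by client name.
final class CachingFactorySketch<C> {
    private final Map<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> creator;

    CachingFactorySketch(Function<String, C> creator) {
        this.creator = creator;
    }

    // Returns the cached client for this name, creating it on first use.
    C create(String clientName) {
        return cache.computeIfAbsent(clientName, creator);
    }

    public static void main(String[] args) {
        CachingFactorySketch<StringBuilder> factory =
                new CachingFactorySketch<>(name -> new StringBuilder("client for " + name));
        StringBuilder first = factory.create("serviceA");
        StringBuilder second = factory.create("serviceA");
        System.out.println(first == second); // same instance is reused
    }
}
```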
create() obtains the ILoadBalancer, which holds the information Ribbon has fetched for all of serviceA's server nodes, including each node's host. The next step is to pick one node's host from it through load balancing.
public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse>
        extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
    }
}

public class LoadBalancerCommand<T> {

    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();

        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable<T> o =
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() { }
        // code omitted...

    // selectServer is where the load-balancing logic is actually triggered
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // loadBalancerURI is http:///sayHello/wangmeng, loadBalancerKey is null
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
}

public class LoadBalancerContext implements IClientConfigAware {

    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
            port = schemeAndPort.second();
        }

        // Get the ILoadBalancer, which holds the IRule and the information for all service nodes
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                // This is where chooseServer actually runs. The default rule is ZoneAvoidanceRule
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            }
            // code omitted
        }
    }
}
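Once a server has been chosen, `reconstructURIWithServer()` puts its address back into the host-less URI before the request is executed. The sketch below shows that step with `java.net.URI` only. `ReconstructUriSketch` and `withServer` are hypothetical names, and the host and port are made-up example values rather than anything taken from the debug session.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ReconstructUriSketch {
    // Hypothetical helper: fill the chosen server's host and port into a partial URI.
    static URI withServer(URI partial, String host, int port) {
        try {
            return new URI(partial.getScheme(), null, host, port,
                    partial.getPath(), partial.getQuery(), partial.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // The host-less URI that reaches Ribbon after the service name was stripped.
        URI partial = URI.create("http:///sayHello/wangmeng");

        // Assumption: the load balancer picked localhost:8080 for this call.
        URI finalUri = withServer(partial, "localhost", 8080);
        System.out.println(finalUri);
    }
}
```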
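As the source shows, this is where Ribbon actually performs load balancing: ILoadBalancer.chooseServer() delegates to the configured IRule to select one service node for the call. The debug output is as follows: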
This completes the analysis of Feign and Ribbon: the request URL of the selected server is returned, and the response is then obtained:
The analysis above covered how Feign integrates with Ribbon. In the end everything comes down to Ribbon's ILoadBalancer, which uses an IRule to select the server.
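To make the division of labor between the load balancer and its rule concrete, here is a small sketch. It is not Ribbon code: `SketchRule`, `RoundRobinSketchRule`, and `SketchLoadBalancer` are hypothetical types, servers are plain strings, and a simple round-robin rule is swapped in for the default ZoneAvoidanceRule to keep the example short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an IRule: picks one server from the candidates.
interface SketchRule {
    String choose(List<String> servers);
}

// Plain round robin, used here instead of ZoneAvoidanceRule for brevity.
final class RoundRobinSketchRule implements SketchRule {
    private final AtomicInteger next = new AtomicInteger();

    @Override
    public String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}

// Hypothetical stand-in for an ILoadBalancer: owns the server list, delegates the choice.
final class SketchLoadBalancer {
    private final List<String> servers;
    private final SketchRule rule;

    SketchLoadBalancer(List<String> servers, SketchRule rule) {
        this.servers = servers;
        this.rule = rule;
    }

    String chooseServer() {
        return rule.choose(servers);
    }

    public static void main(String[] args) {
        SketchLoadBalancer lb = new SketchLoadBalancer(
                Arrays.asList("host1:8080", "host2:8080"), new RoundRobinSketchRule());
        for (int i = 0; i < 3; i++) {
            System.out.println(lb.chooseServer());
        }
    }
}
```

Keeping the rule behind an interface is what lets the selection strategy change without touching the code that calls `chooseServer()`.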
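In the next lecture I will draw one large diagram covering Feign, Ribbon, and Eureka, showing each component's details and dependencies. It will also serve as a general review of everything studied so far.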
This article was first published on my blog, https://www.cnblogs.com/wang-meng, and on the WeChat official account 壹枝花算不算浪漫. If you repost it, please credit the source!