【一塊兒學源碼-微服務】Feign 源碼三:Feign結合Ribbon實現負載均衡的原理分析

【一塊兒學源碼-微服務】Feign 源碼三:Feign結合Ribbon實現負載均衡的原理分析

前言

前情回顧

上一講咱們已經知道了Feign的工做原理實際上是在項目啓動的時候,經過JDK動態代理爲每一個FeignClinent生成一個動態代理。html

動態代理的數據結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裏面是serviceName等信息)和dispatcher(map數據結構,key是請求的方法名,方法參數等,value是SynchronousMethodHandler)。服務器

以下圖所示:微信


image.png

本講目錄

這一講主要是Feign與Ribbon結合實現負載均衡的原理分析。數據結構

說明

原創不易,如若轉載 請標明來源!負載均衡

博客地址:一枝花算不算浪漫
微信公衆號:壹枝花算不算浪漫ide

源碼分析

Feign結合Ribbon實現負載均衡原理

經過前面的分析,咱們能夠直接來看下SynchronousMethodHandler中的代碼:微服務

final class SynchronousMethodHandler implements MethodHandler {    @Override
    public Object invoke(Object[] argv) throws Throwable {        // 生成請求相似於:GET /sayHello/wangmeng HTTP/1.1
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();        while (true) {            try {                return executeAndDecode(template);
            } catch (RetryableException e) {
                retryer.continueOrPropagate(e);                if (logLevel != Logger.Level.NONE) {
                    logger.logRetry(metadata.configKey(), logLevel);
                }                continue;
            }
        }
    }    Object executeAndDecode(RequestTemplate template) throws Throwable {        // 構建request對象:GET http://serviceA/sayHello/wangmeng HTTP/1.1
        Request request = targetRequest(template);        if (logLevel != Logger.Level.NONE) {
            logger.logRequest(metadata.configKey(), logLevel, request);
        }

        Response response;        long start = System.nanoTime();        try {            // 這個client就是以前構建的LoadBalancerFeignClient,options是超時時間
            response = client.execute(request, options);            // ensure the request is set. TODO: remove in Feign 10
            response.toBuilder().request(request).build();
        } catch (IOException e) {            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
            }            throw errorExecuting(request, e);
        }        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);        // 下面邏輯都是構建返回值response
        boolean shouldClose = true;        try {            if (logLevel != Logger.Level.NONE) {
                response =
                        logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);                // ensure the request is set. TODO: remove in Feign 10
                response.toBuilder().request(request).build();
            }            if (Response.class == metadata.returnType()) {                if (response.body() == null) {                    return response;
                }                if (response.body().length() == null ||
                        response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
                    shouldClose = false;                    return response;
                }                // Ensure the response body is disconnected
                byte[] bodyData = Util.toByteArray(response.body().asInputStream());                return response.toBuilder().body(bodyData).build();
            }            if (response.status() >= 200 && response.status() < 300) {                if (void.class == metadata.returnType()) {                    return null;
                } else {                    return decode(response);
                }
            } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {                return decode(response);
            } else {                throw errorDecoder.decode(metadata.configKey(), response);
            }
        } catch (IOException e) {            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
            }            throw errorReading(request, response, e);
        } finally {            if (shouldClose) {
                ensureClosed(response.body());
            }
        }
    }
}

這裏主要是構建request數據,而後經過request和options去經過LoadBalancerFeignClient.execute()方法去得到返回值。咱們能夠接着看client端的調用:源碼分析

public class LoadBalancerFeignClient implements Client {    @Override
    public Response execute(Request request, Request.Options options) throws IOException {        try {            // asUri: http://serviceA/sayHello/wangmeng
            URI asUri = URI.create(request.url());            // clientName:serviceA
            String clientName = asUri.getHost();            // uriWithoutHost: http://sayHello/wangmeng
            URI uriWithoutHost = cleanUrl(request.url(), clientName);            // 這裏ribbonRequest:GET http:///sayHello/wangmeng HTTP/1.1  
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(                    this.delegate, request, uriWithoutHost);            // 這裏面config只有兩個超時時間,一個是connectTimeout:5000,一個是readTimeout:5000
            IClientConfig requestConfig = getClientConfig(options, clientName);            // 真正執行負載均衡的地方
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        }        catch (ClientException e) {
            IOException io = findIOException(e);            if (io != null) {                throw io;
            }            throw new RuntimeException(e);
        }
    }
}

接着咱們看下lbClient()executeWithLoadBalancer()post

public class LoadBalancerFeignClient implements Client {    private FeignLoadBalancer lbClient(String clientName) {        return this.lbClientFactory.create(clientName);
    }
}public class CachingSpringLoadBalancerFactory {    public FeignLoadBalancer create(String clientName) {        if (this.cache.containsKey(clientName)) {            return this.cache.get(clientName);
        }
        IClientConfig config = this.factory.getClientConfig(clientName);        // 獲取Ribbon ILoadBalancer信息
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);        this.cache.put(clientName, client);        return client;
    }
}

這裏是獲取了ILoadBalancer數據,裏面包含了Ribbon獲取的serviceA全部服務節點信息。學習


image.png

這裏已經獲取到ILoadBalancer,裏面包含serviceA服務器全部節點請求host信息。接下來就是從中負載均衡選擇一個節點信息host出來。

public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);        try {            return command.submit(                new ServerOperation<T>() {                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);                        try {                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();            if (t instanceof ClientException) {                throw (ClientException) t;
            } else {                throw new ClientException(e);
            }
        }
        
    }
}public class LoadBalancerCommand<T> {    
    public Observable<T> submit(final ServerOperation<T> operation) {        final ExecutionInfoContext context = new ExecutionInfoContext();        
        if (listenerInvoker != null) {            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {                return Observable.error(e);
            }
        }        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();        // Use the load balancer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                }        // 省略代碼...

    // selectServer是真正執行負載均衡的邏輯
    private Observable<Server> selectServer() {        return Observable.create(new OnSubscribe<Server>() {            @Override
            public void call(Subscriber<? super Server> next) {                try {                    // loadBalancerURI是http:///sayHello/wangmeng, loadBalancerKey爲null
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
}public class LoadBalancerContext implements IClientConfigAware {    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;        int port = -1;        if (original != null) {
            host = original.getHost();
        }        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }        // 獲取到ILoadBalancer,這裏面有IRule的信息及服務節點全部信息
        ILoadBalancer lb = getLoadBalancer();        if (host == null) {            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){                // 這裏就執行真正的chooseServer的邏輯了。默認的rule爲ZoneAvoidanceZule
                Server svc = lb.chooseServer(loadBalancerKey);                if (svc == null){                    throw new ClientException(ClientException.ErrorType.GENERAL,                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();                if (host == null){                    throw new ClientException(ClientException.ErrorType.GENERAL,                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});                return svc;
            }            // 省略代碼
        }
    }
}

上面代碼已經很清晰了,這裏就是真正的經過ribbon的 rule.chooseServer()負載均衡地選擇了一個服務節點調用,debug以下:


image.png

到了這裏feign與ribbon的分析也就結束了,返回請求url信息,而後獲得response結果:


image.png

總結

上面已經分析了Feign與Ribbon的整合,最終仍是落到Ribbon中的ILoadBalancer中,使用最後使用IRule去選擇對應的server數據。

下一講 會畫一個很大的圖,包含Feign、Ribbon、Eureka關聯的圖,裏面會畫出每一個組件的細節及依賴關係。也算是學習至今的一個總複習了。

申明

本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公衆號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注我的公衆號:壹枝花算不算浪漫


22.jpg

相關文章
相關標籤/搜索