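Digging into how Spring Cloud Ribbon works

When load balancing comes up, most people think of server-side load balancing. Commonly used products include LBS hardware or cloud services, Nginx, and so on, all familiar names.

Spring Cloud, on the other hand, provides Ribbon, which gives the calling side of a service its own load-balancing ability. Combined closely with Eureka, it removes the need to set up a separate load-balancing service inside the service cluster, which greatly simplifies the architecture within the cluster.

I won't write much abstract introduction; you can find that anywhere.

Let's dig straight into the code and see how Ribbon is implemented.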

 

Configuration architecture

In detail:

1. RibbonAutoConfiguration creates the RibbonLoadBalancerClient instance.

Code location:

spring-cloud-netflix-core-1.3.5.RELEASE.jar
org.springframework.cloud.netflix.ribbon
RibbonAutoConfiguration.class

@Configuration
@ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties(RibbonEagerLoadProperties.class)
public class RibbonAutoConfiguration {

    //

    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }

        //
}

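Look at the configuration conditions first: RibbonAutoConfiguration must run before LoadBalancerAutoConfiguration, because LoadBalancerAutoConfiguration uses the RibbonLoadBalancerClient instance.

RibbonLoadBalancerClient implements the LoadBalancerClient interface. It is the load-balancing client and also the caller of the load-balancing strategy.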

 

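2. LoadBalancerInterceptorConfig creates:
1) The load-balancing interceptor, a LoadBalancerInterceptor instance
It holds:
  the RibbonLoadBalancerClient instance, which is the LoadBalancerClient implementation
  a LoadBalancerRequestFactory instance, the factory that creates load-balanced requests
2) A RestTemplateCustomizer instance that customizes RestTemplate

Code location: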

spring-cloud-commons-1.2.4.RELEASE.jar
org.springframework.cloud.client.loadbalancer
LoadBalancerAutoConfiguration.class

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    //

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
    }

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return new RestTemplateCustomizer() {
                @Override
                public void customize(RestTemplate restTemplate) {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    }

    //

}

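Look at the configuration conditions first:

The RestTemplate class must be present in the project environment.

An instance of a LoadBalancerClient implementation must exist, that is, the RibbonLoadBalancerClient created in the previous step.

 

3. The RestTemplateCustomizer created in the previous step is applied to every RestTemplate instance collected in the restTemplates field (in Spring Cloud Commons these are the ones annotated with @LoadBalanced); in other words, it sets the load-balancing interceptor on each of those RestTemplate instances.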

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    //

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
            final List<RestTemplateCustomizer> customizers) {
        return new SmartInitializingSingleton() {
            @Override
            public void afterSingletonsInstantiated() {
                for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); }
                }
            }
        };
    }

    //

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return new RestTemplateCustomizer() {
                @Override
                public void customize(RestTemplate restTemplate) {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    }

    //
}

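restTemplate.setInterceptors(list) is where the load-balancing interceptor, LoadBalancerInterceptor, gets injected.

From this you can already guess that RestTemplate uses the injected interceptor to build the corresponding request and achieve load balancing.

It also shows that you can write custom interceptors for other purposes.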
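
As an illustration of a custom interceptor used for another purpose, here is a minimal configuration sketch. The class name TracingRestTemplateConfig, the bean name, and the X-Trace-Id header are made up for this example; only ClientHttpRequestInterceptor, ClientHttpRequestExecution and RestTemplate.getInterceptors() come from spring-web.

```java
import java.util.UUID;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.web.client.RestTemplate;

@Configuration
public class TracingRestTemplateConfig {

    @Bean
    public RestTemplate tracingRestTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        // Hypothetical interceptor: tags every outgoing request with a trace ID header.
        ClientHttpRequestInterceptor traceHeader = (request, body, execution) -> {
            request.getHeaders().add("X-Trace-Id", UUID.randomUUID().toString());
            // Hand over to the next interceptor, or to the real request if none remain.
            return execution.execute(request, body);
        };
        restTemplate.getInterceptors().add(traceHeader);
        return restTemplate;
    }
}
```

Because the interceptor list is non-empty, requests from this RestTemplate go through the intercepting request path in the same way the load-balancing interceptor does.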

 

4. RibbonClientConfiguration creates the ZoneAwareLoadBalancer instance

Code location:

spring-cloud-netflix-core-1.3.5.RELEASE.jar
org.springframework.cloud.netflix.ribbon
RibbonClientConfiguration.class

@SuppressWarnings("deprecation")
@Configuration
@EnableConfigurationProperties
//Order is important here, last should be the default, first should be optional
// see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
public class RibbonClientConfiguration {

    //

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

    //
}

ZoneAwareLoadBalancer implements the ILoadBalancer interface, which has this method:

    /**
     * Choose a server from load balancer.
     * 
     * @param key An object that the load balancer may use to determine which server to return. null if 
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);

ZoneAwareLoadBalancer is a concrete load-balancer implementation and also the default one. Its implementation of chooseServer picks a service instance.
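
To make the chooseServer contract concrete, here is a small plain-Java sketch. It does not use Ribbon: SimpleLoadBalancer is a hypothetical stand-in for ILoadBalancer, servers are plain strings, and round-robin is chosen only because it is easy to follow. It is not the selection logic of ZoneAwareLoadBalancer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {

    // Hypothetical stand-in for ILoadBalancer, reduced to the one method discussed above.
    interface SimpleLoadBalancer {
        String chooseServer(Object key);
    }

    // Illustrative round-robin implementation; the key is ignored.
    static class RoundRobinLoadBalancer implements SimpleLoadBalancer {
        private final List<String> servers;
        private final AtomicInteger position = new AtomicInteger();

        RoundRobinLoadBalancer(List<String> servers) {
            this.servers = servers;
        }

        @Override
        public String chooseServer(Object key) {
            if (servers.isEmpty()) {
                return null; // the caller decides how to report "no instances available"
            }
            // floorMod keeps the index non-negative even if the counter overflows.
            int index = Math.floorMod(position.getAndIncrement(), servers.size());
            return servers.get(index);
        }
    }

    public static void main(String[] args) {
        SimpleLoadBalancer lb = new RoundRobinLoadBalancer(
                Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        for (int i = 0; i < 3; i++) {
            System.out.println(lb.chooseServer("default"));
        }
    }
}
```

Returning null for an empty list mirrors how the caller shown later in this article treats a null server as "no instances available".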

 

Interception & request

1. GET, POST and every other kind of request made with RestTemplate goes through the doExecute method

Code location:
spring-web-4.3.12.RELEASE.jar
org.springframework.web.client
RestTemplate.class

public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {

    //

    protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,
            ResponseExtractor<T> responseExtractor) throws RestClientException {

        Assert.notNull(url, "'url' must not be null");
        Assert.notNull(method, "'method' must not be null");
        ClientHttpResponse response = null;
        try {
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            response = request.execute();
            handleResponse(url, method, response);
            if (responseExtractor != null) {
                return responseExtractor.extractData(response);
            }
            else {
                return null;
            }
        }
        catch (IOException ex) {
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
            throw new ResourceAccessException("I/O error on " + method.name() +
                    " request for \"" + resource + "\": " + ex.getMessage(), ex);
        }
        finally {
            if (response != null) {
                response.close();
            }
        }
    }

    //

}

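All of the supported HTTP request methods end up calling doExecute. That method calls a creation method to build the request instance, then executes the request and obtains the response object.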

 

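2. Obtaining the request factory

In the code from the previous step, createRequest is called to create the request instance. This method is defined in a parent class.

First, the main inheritance relationships:

RestTemplate extends InterceptingHttpAccessor, which extends HttpAccessor.

createRequest is actually defined in the abstract class HttpAccessor.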

public abstract class HttpAccessor {

    private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();

    public void setRequestFactory(ClientHttpRequestFactory requestFactory) {
        Assert.notNull(requestFactory, "ClientHttpRequestFactory must not be null");
        this.requestFactory = requestFactory;
    }

    public ClientHttpRequestFactory getRequestFactory() {
        return this.requestFactory;
    }

    protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        ClientHttpRequest request = getRequestFactory().createRequest(url, method);
        if (logger.isDebugEnabled()) {
            logger.debug("Created " + method.name() + " request for \"" + url + "\"");
        }
        return request;
    }

}

createRequest calls getRequestFactory to obtain the request factory. HttpAccessor does define getRequestFactory, but the version that actually runs here is the override in the subclass InterceptingHttpAccessor.

public abstract class InterceptingHttpAccessor extends HttpAccessor {

    private List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>();

    public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    public List<ClientHttpRequestInterceptor> getInterceptors() {
        return interceptors;
    }

    @Override
    public ClientHttpRequestFactory getRequestFactory() {
        ClientHttpRequestFactory delegate = super.getRequestFactory();
        if (!CollectionUtils.isEmpty(getInterceptors())) {
            return new InterceptingClientHttpRequestFactory(delegate, getInterceptors());
        }
        else {
            return delegate;
        }
    }

}

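There is a small trick here. First, it still obtains the SimpleClientHttpRequestFactory created in HttpAccessor; this factory is the one that creates basic request instances when there are no interceptors.

Second, when interceptors have been injected, it creates an InterceptingClientHttpRequestFactory, which creates request instances that carry the interceptors. Since the load-balancing interceptor has been injected, the request is created from InterceptingClientHttpRequestFactory here.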
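
The factory selection described above can be reduced to a few lines of plain Java. This is only a sketch of the pattern: RequestFactory, Interceptor, SimpleFactory, InterceptingFactory and Accessor are simplified, hypothetical stand-ins for the Spring classes, and a "request" is just a string.

```java
import java.util.ArrayList;
import java.util.List;

class FactorySelectionSketch {

    interface RequestFactory {
        String createRequest(String url);
    }

    // Marker type; what an interceptor does is not relevant to this sketch.
    interface Interceptor {
    }

    // Plays the role of the default factory used when nothing is injected.
    static class SimpleFactory implements RequestFactory {
        @Override
        public String createRequest(String url) {
            return "plain request to " + url;
        }
    }

    // Wraps the default factory and remembers the interceptors.
    static class InterceptingFactory implements RequestFactory {
        private final RequestFactory delegate;
        private final List<Interceptor> interceptors;

        InterceptingFactory(RequestFactory delegate, List<Interceptor> interceptors) {
            this.delegate = delegate;
            this.interceptors = interceptors;
        }

        @Override
        public String createRequest(String url) {
            return "intercepted(" + interceptors.size() + ") " + delegate.createRequest(url);
        }
    }

    static class Accessor {
        private final RequestFactory base = new SimpleFactory();
        private final List<Interceptor> interceptors = new ArrayList<>();

        List<Interceptor> getInterceptors() {
            return interceptors;
        }

        // Same decision as in the overridden getRequestFactory: wrap only if needed.
        RequestFactory getRequestFactory() {
            if (interceptors.isEmpty()) {
                return base;
            }
            return new InterceptingFactory(base, interceptors);
        }
    }

    public static void main(String[] args) {
        Accessor accessor = new Accessor();
        System.out.println(accessor.getRequestFactory().createRequest("http://user-service/users"));
        accessor.getInterceptors().add(new Interceptor() { });
        System.out.println(accessor.getRequestFactory().createRequest("http://user-service/users"));
    }
}
```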

 

3. Creating the request instance through the factory

To see how the instance is created, look at the factory's createRequest method.

public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {

    private final List<ClientHttpRequestInterceptor> interceptors;

    public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
            List<ClientHttpRequestInterceptor> interceptors) {

        super(requestFactory);
        this.interceptors = (interceptors != null ? interceptors : Collections.<ClientHttpRequestInterceptor>emptyList());
    }


    @Override
    protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
        return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
    }

}

It simply news up an InterceptingClientHttpRequest instance and passes in the interceptors and the basic request factory.

 

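4. The request instance calls the intercept method of the load-balancing interceptor injected during configuration

As step 1 shows, once the request instance has been created, the request is executed by calling its execute method.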

ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
    requestCallback.doWithRequest(request);
}
response = request.execute();

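The actual request instance is an InterceptingClientHttpRequest, and execute is defined in one of its parent classes.

Class location:

spring-web-4.3.12.RELEASE.jar
org.springframework.http.client
InterceptingClientHttpRequest.class

Their inheritance relationships:

InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest, which extends AbstractClientHttpRequest, which implements ClientHttpRequest.

The execute method actually calls executeInternal, which is implemented by a subclass.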

public abstract class AbstractClientHttpRequest implements ClientHttpRequest {

    private final HttpHeaders headers = new HttpHeaders();

    private boolean executed = false;

    @Override
    public final HttpHeaders getHeaders() {
        return (this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
    }

    @Override
    public final OutputStream getBody() throws IOException {
        assertNotExecuted();
        return getBodyInternal(this.headers);
    }

    @Override
    public final ClientHttpResponse execute() throws IOException {
        assertNotExecuted();
        ClientHttpResponse result = executeInternal(this.headers);
        this.executed = true;
        return result;
    }

    protected void assertNotExecuted() {
        Assert.state(!this.executed, "ClientHttpRequest already executed");
    }

    protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException;

    protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException;

}

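That ends up in the executeInternal method of InterceptingClientHttpRequest. (The two-argument version shown below is called by the intermediate class AbstractBufferingClientHttpRequest, which implements the one-argument executeInternal and passes along the buffered request body.) It in turn calls execute on an executor, InterceptingRequestExecution, which checks whether any injected interceptors remain and, if so, calls the interceptor's intercept method.

The interceptor here is the LoadBalancerInterceptor instance that was injected into the RestTemplate instance during configuration; see step 2 of the configuration phase above.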

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {

    //

    @Override
    protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
        InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
        return requestExecution.execute(this, bufferedOutput);
    }


    private class InterceptingRequestExecution implements ClientHttpRequestExecution {

        private final Iterator<ClientHttpRequestInterceptor> iterator;

        public InterceptingRequestExecution() {
            this.iterator = interceptors.iterator();
        }

        @Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
                for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
                    List<String> values = entry.getValue();
                    for (String value : values) {
                        delegate.getHeaders().add(entry.getKey(), value);
                    }
                }
                if (body.length > 0) {
                    StreamUtils.copy(body, delegate.getBody());
                }
                return delegate.execute();
            }
        }
    }

}
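
The iterator-based chain in InterceptingRequestExecution can be hard to picture from the Spring code alone, so here is a stripped-down plain-Java sketch of the same pattern. Execution, Interceptor and ChainExecution are hypothetical names, a request is just a string, and the two interceptors are invented for the example.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class InterceptorChainSketch {

    interface Execution {
        String execute(String request);
    }

    interface Interceptor {
        String intercept(String request, Execution execution);
    }

    // Each call to execute hands the request to the next interceptor;
    // when none remain, the "real" request is sent.
    static class ChainExecution implements Execution {
        private final Iterator<Interceptor> iterator;

        ChainExecution(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                return iterator.next().intercept(request, this);
            }
            return "sent[" + request + "]";
        }
    }

    static String run(String request) {
        // First interceptor decorates the request, second swaps in a concrete address.
        Interceptor addHeader = (req, next) -> next.execute(req + " +header");
        Interceptor pickServer = (req, next) ->
                next.execute(req.replace("user-service", "10.0.0.1:8080"));
        return new ChainExecution(Arrays.asList(addHeader, pickServer)).execute(request);
    }

    public static void main(String[] args) {
        // prints: sent[GET http://10.0.0.1:8080/users +header]
        System.out.println(run("GET http://user-service/users"));
    }
}
```

An interceptor that does not call execute on the object it was handed stops the chain, which is why each interceptor receives the execution as a parameter.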

 

5. The load-balancing interceptor calls the load-balancing client

In the intercept method of LoadBalancerInterceptor, the execute method of the LoadBalancerClient implementation is called.

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}

As seen in step 1 of the configuration phase, the implementation class is RibbonLoadBalancerClient.
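
The intercept method above takes the service name from the host part of the request URI and asserts that it is not null. The plain-Java sketch below shows that extraction with java.net.URI only; the class and method names and the service names are made up. As far as I know, one common way to hit the null case is a service name containing an underscore, which java.net.URI does not accept as a host name.

```java
import java.net.URI;

class ServiceNameSketch {

    // Mirrors the check in intercept: the host part of the URI is the service name.
    static String serviceName(URI uri) {
        String host = uri.getHost();
        if (host == null) {
            throw new IllegalStateException(
                    "Request URI does not contain a valid hostname: " + uri);
        }
        return host;
    }

    public static void main(String[] args) {
        System.out.println(serviceName(URI.create("http://user-service/users/1")));
        try {
            serviceName(URI.create("http://user_service/users/1"));
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```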

 

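6. The load-balancing client uses the load-balancing strategy to pick a target service instance and send the request

In the first execute method of RibbonLoadBalancerClient and in its getServer method, you can see that a server is picked through the chooseServer method of the ILoadBalancer implementation and then handed to the request object that follows, which sends the request.

The load-balancer implementation here defaults to a ZoneAwareLoadBalancer (zone-aware load balancer) instance, which internally picks a server according to its balancing rule.

For how ZoneAwareLoadBalancer is created, see step 4 of the configuration phase.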

public class RibbonLoadBalancerClient implements LoadBalancerClient {
    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }
       
    //

    protected Server getServer(ILoadBalancer loadBalancer) {
        if (loadBalancer == null) {
            return null;
        }
        return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }

    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }

    public static class RibbonServer implements ServiceInstance {
        private final String serviceId;
        private final Server server;
        private final boolean secure;
        private Map<String, String> metadata;

        public RibbonServer(String serviceId, Server server) {
            this(serviceId, server, false, Collections.<String, String> emptyMap());
        }

        public RibbonServer(String serviceId, Server server, boolean secure,
                Map<String, String> metadata) {
            this.serviceId = serviceId;
            this.server = server;
            this.secure = secure;
            this.metadata = metadata;
        }

        //
    }

}
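
The code above does not show where the service name in the URL is replaced by the address of the chosen server. As a rough idea of what that rewrite involves, here is a plain-Java sketch using only java.net.URI; the class name, the method name withInstance, and the addresses are assumptions for illustration, not the Spring Cloud implementation.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriRewriteSketch {

    // Keeps scheme, path, query and fragment; swaps in the chosen host and port.
    static URI withInstance(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException ex) {
            throw new IllegalArgumentException("Cannot rewrite " + original, ex);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://user-service/users/1?active=true");
        // prints: http://10.0.0.5:8080/users/1?active=true
        System.out.println(withInstance(original, "10.0.0.5", 8080));
    }
}
```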

 

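That's the code walkthrough. To sum up:

When RestTemplate is used in the ordinary way to call other services, it sends the request with a regular HTTP request instance.

Once the @LoadBalanced annotation is added to the RestTemplate, the configuration injects the load-balancing interceptor into it, so the load balancer picks a suitable server according to its rule before the request is sent.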
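
For reference, declaring such a RestTemplate usually looks like the configuration sketch below. The class name RestClientConfig is made up; @LoadBalanced comes from spring-cloud-commons.

```java
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestClientConfig {

    // @LoadBalanced marks this RestTemplate so that the auto-configuration
    // walked through above adds LoadBalancerInterceptor to it.
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
```

A caller would then put the service name where the host normally goes, for example restTemplate.getForObject("http://user-service/users/{id}", String.class, 1L), where user-service is a hypothetical service ID registered in Eureka.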

 

End
