Spring Cloud Ribbon是一個基於HTTP和TCP的客戶端負載均衡工具,基於Netflix Ribbon實現。java
負載均衡是對系統的高可用、網絡壓力的緩解和處理內容擴容的重要手段之一。git
負載均衡能夠分爲客戶端負載均衡和服務端負載均衡。算法
負載均衡按設備來分爲硬件負載均衡和軟件負載均衡,都屬於服務端負載均衡。spring
硬件負載均衡主要經過在服務器節點之間安裝專門用於負載均衡的設備,例如F5等。服務器
軟件負載均衡經過在服務器上安裝一些具備負載均衡功能或模塊的軟件來完成請求的轉發工做,例如Nginx等。微信
硬件負載均衡和軟件負載均衡都會維護一個可用的服務清單,而後經過心跳檢測來剔除故障節點以保證服務清單中的節點都正常可用。當客戶端發出請求時,負載均衡器會按照某種算法(線性輪詢、按權重負載、按流量負載等)從服務清單中取出一臺服務器的地址,而後將請求轉發到該服務器上。網絡
客戶端負載均衡須要客戶端本身維護本身要訪問的服務實例清單, 這些服務清單來源於註冊中心(在使用Eureka進行服務治理時)。app
基於Spring Cloud Ribbon實現客戶端負載均衡很是簡單,主要由如下步驟:負載均衡
既然Ribbon客戶端負載均衡須要爲RestTemplate增長@LoadBalanced註解,那麼下面咱們就從這個註解開始分析。分佈式
經過該註解的頭部註釋能夠得知,該註解的做用是使用LoadBalancerClient來對RestTemplate進行配置,下面接着看LoadBalancerClient
該類是一個接口類,代碼以下:
package org.springframework.cloud.client.loadbalancer; import org.springframework.cloud.client.ServiceInstance; import java.io.IOException; import java.net.URI; public interface LoadBalancerClient extends ServiceInstanceChooser { /** * 注意該方法是從父類ServiceInstanceChooser接口中繼承過來的 */ ServiceInstance choose(String serviceId); <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; URI reconstructURI(ServiceInstance instance, URI original); }
從接口的方法和註釋能夠看出,一個客戶端負載均衡器應該具備如下幾種重要功能:
經過分析org.springframework.cloud.client.loadbalancer包中的類,能夠找出org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration是實現客戶端負載均衡的自動配置類
從LoadBalancerAutoConfiguration類的頭部註解上能夠看出,Ribbon實現客戶端負載均衡的自動配置須要知足下面兩個條件:
在該自動化配置的任務中,主要完成如下三個任務:
經過上面能夠看出,真正實現客戶端負載均衡是由於有LoadBalancerInterceptor攔截器的存在,那麼下面看一下LoadBalancerInterceptor類
package org.springframework.cloud.client.loadbalancer; import java.io.IOException; import java.net.URI; import org.springframework.http.HttpRequest; import org.springframework.http.client.ClientHttpRequestExecution; import org.springframework.http.client.ClientHttpRequestInterceptor; import org.springframework.http.client.ClientHttpResponse; import org.springframework.util.Assert; public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; private LoadBalancerRequestFactory requestFactory; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this.loadBalancer = loadBalancer; this.requestFactory = requestFactory; } public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); } }
package org.springframework.cloud.client.loadbalancer; import java.util.List; import org.springframework.cloud.client.ServiceInstance; import org.springframework.http.HttpRequest; import org.springframework.http.client.ClientHttpRequestExecution; import org.springframework.http.client.ClientHttpResponse; public class LoadBalancerRequestFactory { private LoadBalancerClient loadBalancer; private List<LoadBalancerRequestTransformer> transformers; public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers) { this.loadBalancer = loadBalancer; this.transformers = transformers; } public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) { this.loadBalancer = loadBalancer; } public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) { return instance -> { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer); if (transformers != null) { for (LoadBalancerRequestTransformer transformer : transformers) { serviceRequest = transformer.transformRequest(serviceRequest, instance); } } return execution.execute(serviceRequest, body); }; } }
package org.springframework.cloud.client.loadbalancer; import java.net.URI; import org.springframework.cloud.client.ServiceInstance; import org.springframework.http.HttpRequest; import org.springframework.http.client.support.HttpRequestWrapper; public class ServiceRequestWrapper extends HttpRequestWrapper { private final ServiceInstance instance; private final LoadBalancerClient loadBalancer; public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) { super(request); this.instance = instance; this.loadBalancer = loadBalancer; } @Override public URI getURI() { URI uri = this.loadBalancer.reconstructURI( this.instance, getRequest().getURI()); return uri; } }
攔截器的intercept方法還用到另外兩個類LoadBalancerRequestFactory和ServiceRequestWrapper。
當使用被@LoadBalanced註解的RestTemplate發送請求時,會被LoadBalancerInterceptor攔截器攔截,執行LoadBalancerInterceptor的intercept方法,最終在該方法中選擇合適的服務實例經過LoadBalancerClient的execute方法進行調用。
由於LoadBalancerClient是抽象的負載均衡器接口,下面咱們能夠經過一個具體的負載均衡器RibbonLoadBalancerClient來進行分析。
package org.springframework.cloud.netflix.ribbon; import java.io.IOException; import java.net.URI; import java.util.Collections; import java.util.Map; import com.netflix.client.config.IClientConfig; import com.netflix.loadbalancer.ILoadBalancer; import com.netflix.loadbalancer.Server; import org.springframework.cloud.client.DefaultServiceInstance; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest; import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; import static org.springframework.cloud.netflix.ribbon.RibbonUtils.updateToSecureConnectionIfNeeded; public class RibbonLoadBalancerClient implements LoadBalancerClient { private SpringClientFactory clientFactory; public RibbonLoadBalancerClient(SpringClientFactory clientFactory) { this.clientFactory = clientFactory; } @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); } @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if(serviceInstance instanceof RibbonServer) { server = ((RibbonServer)serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; } }
分析execute(String serviceId, LoadBalancerRequest<T> request)方法的執行步驟:
1.獲取負載均衡器,默認的負載均衡器是ZoneAwareLoadBalancer,這個後面再講
2.獲取具體的服務實例,這裏並無調用LoadBalancerClient的ServiceInstance choose(String serviceId)方法,而是調用的ILoadBalancer的Server chooseServer(Object key)方法來獲取具體的服務實例,想知道ILoadBalancer幹啥用,請看下面的接口介紹
3.將獲取到的具體服務實例包裝成RibbonServer對象(該對象存儲了服務實例信息、服務名serviceId、是否須要https等其餘信息),最後調用LoadBalancerRequest的apply方法,向一個實際的的具體服務實例發起請求,請看下面LoadLoadBalancerRequest的分析
package com.netflix.loadbalancer; import java.util.List; public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getReachableServers(); public List<Server> getAllServers(); }
該類也是負載均衡器的一個抽象接口,主要定義了一系列的抽象操做:
該接口中使用到的Server對象定義是一個傳統的服務端節點,在該類中存儲了服務端節點的一些元數據信息,包括host、port以及一些部署信息等。
整理該接口的實現類主要有:
默認使用的負載均衡器是ZoneAwareLoadBalancer,要想知道這個結果很簡單,查看一下RibbonClientConfiguration配置類,該類中有一個方法以下:
@Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); }
此處的代碼邏輯若是沒有配置負載均衡器,那麼默認的負載均衡器就是ZoneAwareLoadBalancer。
該類是一個抽象的接口,只有一個apply(ServiceInstance instance)方法,先看一下ServiceInstance類。
該接口暴露了服務實例的一些基本信息,如:serviceId、port、host等,源碼以下:
package org.springframework.cloud.client; import java.net.URI; import java.util.Map; public interface ServiceInstance { String getServiceId(); String getHost(); int getPort(); boolean isSecure(); URI getUri(); Map<String, String> getMetadata(); default String getScheme() { return null; } }
上面說起的RibbonServer是ServiceInstance的一個具體實現,查看源碼能夠知道RibbonServer除了包含Server對象以外,還存儲了服務名、是否使用HTTPS以及一個Map類型的元數據集合。
看完上面兩個類,咱們來看一下具體的apply方法的實現,源碼中使用Lamada表達式實現,下面我只抽出具體的功能實現的源碼以下:
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer); if (transformers != null) { for (LoadBalancerRequestTransformer transformer : transformers) { serviceRequest = transformer.transformRequest(serviceRequest, instance); } } return execution.execute(serviceRequest, body);
在apply方法中,首先將HttpRequest包裝成ServiceRequestWrapper對象,下面看一下ServiceRequestWrapper類,源碼以下:
package org.springframework.cloud.client.loadbalancer; import java.net.URI; import org.springframework.cloud.client.ServiceInstance; import org.springframework.http.HttpRequest; import org.springframework.http.client.support.HttpRequestWrapper; public class ServiceRequestWrapper extends HttpRequestWrapper { private final ServiceInstance instance; private final LoadBalancerClient loadBalancer; public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) { super(request); this.instance = instance; this.loadBalancer = loadBalancer; } @Override public URI getURI() { URI uri = this.loadBalancer.reconstructURI( this.instance, getRequest().getURI()); return uri; } }
在ServiceRequestWrapper類中重寫了getURI方法,重寫後的該方法經過調用LoadBalancerClient中的reconstructURI方法來構建一個host:port形式的URI對外發起請求。
在apply方法的最後在調用ClientHttpRequestExecution的execute方法時,實際會去執行InterceptingClientHttpRequest類下面的InterceptingRequestExecution的execute方法,該方法的具體代碼以下:
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); return nextInterceptor.intercept(request, body, this); } else { HttpMethod method = request.getMethod(); Assert.state(method != null, "No standard HTTP method"); ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value)); if (body.length > 0) { if (delegate instanceof StreamingHttpOutputMessage) { StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate; streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream)); } else { StreamUtils.copy(body, delegate.getBody()); } } return delegate.execute(); } } }
分析上面的代碼能夠看出,在調用requestFactory.createRequest(request.getURI(), method)建立請求時會調用getURI方法,此時調用的getURI方法就是ServiceRequestWrapper類中的getURI方法,進而調用LoadBalancerClient中的reconstructURI方法,下面咱們看一下RibbonLoadBalancerClient中的reconstructURI是如何實現的
public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); URI uri; Server server; if (instance instanceof RibbonServer) { RibbonServer ribbonServer = (RibbonServer) instance; server = ribbonServer.getServer(); uri = updateToSecureConnectionIfNeeded(original, ribbonServer); } else { server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); } return context.reconstructURIWithServer(server, uri); }
分析上面的代碼,首先根據serviceId從SpringClientFactory對象中獲取serviceId對應的負載均衡器的上下文RibbonLoadBalancerContext對象,而後再使用該上下文對象的reconstructURIWithServer方法和server對象來構建具體的URI。
備註:
關於LoadBalancerContext類中的reconstructURIWithServer方法是如何組裝host:port形式的邏輯很容易理解,咱們就不在這裏解釋了,有興趣的能夠本身去看一下。
使用被@LoadBalanced註解的RestTemplate發起請求時,會被LoadBalancerInterceptor攔截,而後藉助負載均衡器LoadBalancerClient將邏輯服務名轉換爲host:port的具體的服務實例地址,在使用RibbonLoadBalancerClient(Ribbon實現的負載均衡器)時實際使用的是Ribbon中定義的ILoadBalancer,默認自動化配置的負載均衡器是ZoneAwareLoadBalancer。
https://gitee.com/petterheng/spring-cloud-eureka
後面會介紹負載均衡器的源碼分析,請繼續關注!!!
前往微信公衆號閱讀文章,點擊這裏,或者直接掃碼關注公衆號