Ribbon 寓意爲帶狀。。。 Ribbon是Netflix公司開源的一個負載均衡的項目,是一個客戶端負載均衡器,運行在客戶端上,它是一個通過了雲端測試的IPC庫,能夠很好地控制HTTP和TCP客戶端的一些行爲。 Feign已經默認使用了Ribbon算法
負載均衡 容錯 多協議(HTTP,TCP,UDP)支持異步和反應模型 緩存和批處理
RestTemplate和Ribbon相結合 Ribbon在Netflix組件是很是重要的一個組件,在Zuul中使用Ribbon作負載均衡,以及Feign組件的結合等。在Spring Cloud 中,做爲開發中,作的最多的多是將RestTemplate和Ribbon相結合,你可能會這樣寫:spring
@Configuration public class RibbonConfig { @Bean @LoadBalanced RestTemplate restTemplate() { return new RestTemplate(); } }
消費另一個的服務的接口,差很少是這樣的:緩存
@Service public class RibbonService { @Autowired RestTemplate restTemplate; public String hi(String name) { return restTemplate.getForObject("http://eureka-client/hi?name="+name,String.class); } }
深刻理解Ribbon LoadBalancerClient 在Riibon中一個很是重要的組件爲LoadBalancerClient,它做爲負載均衡的一個客戶端。(Feign最終實現負載均衡,也是要調用到LoadBalancerClient)。它在spring-cloud-commons包下:負載均衡
LoadBalancerClient是一個接口,它繼承ServiceInstanceChooser,它的實現類是RibbonLoadBalancerClient,這三者之間的關係以下圖:less
***LoadBalancerClient源碼,代碼以下:dom
public interface LoadBalancerClient extends ServiceInstanceChooser { <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; URI reconstructURI(ServiceInstance instance, URI original); }
ServiceInstanceChooser接口,主要有一個方法,用來根據serviceId來獲取ServiceInstance,代碼以下:異步
public interface ServiceInstanceChooser { ServiceInstance choose(String serviceId); }
LoadBalancerClient的實現類爲RibbonLoadBalancerClient,這個類是很是重要的一個類,最終的負載均衡的請求處理,由它來執行。它的部分源碼以下:async
public class RibbonLoadBalancerClient implements LoadBalancerClient { ...//省略代碼 @Override public ServiceInstance choose(String serviceId) { Server server = getServer(serviceId); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); } protected Server getServer(String serviceId) { return getServer(getLoadBalancer(serviceId)); } protected Server getServer(ILoadBalancer loadBalancer) { if (loadBalancer == null) { return null; } return loadBalancer.chooseServer("default"); // TODO: better handling of key } protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } ...//省略代碼
在RibbonLoadBalancerClient的源碼中,其中choose()方法是選擇具體服務實例的一個方法。該方法經過getServer()方法去獲取實例,通過源碼跟蹤,最終交給了ILoadBalancer類去選擇服務實例。ide
ILoadBalancer在ribbon-loadbalancer的jar包下,它是定義了實現軟件負載均衡的一個接口,它須要一組可供選擇的服務註冊列表信息,以及根據特定方法去選擇服務,它的源碼以下 :函數
public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getReachableServers(); public List<Server> getAllServers(); }
其中, addServers(): 是添加一個Server集合; chooseServer():是根據key去獲取Server; markServerDown():用來標記某個服務下線; getReachableServers():獲取可用的Server集合; getAllServers():獲取全部的Server集合。
DynamicServerListLoadBalancer 它的繼承類爲BaseLoadBalancer,它的實現類爲DynamicServerListLoadBalancer,這三者之間的關係以下:
查看上述三個類的源碼,可用發現,配置如下信息,IClientConfig、IRule、IPing、ServerList、ServerListFilter和ILoadBalancer,查看BaseLoadBalancer類,它默認的狀況下,實現瞭如下配置:
IClientConfig ribbonClientConfig: DefaultClientConfigImpl配置 IRule ribbonRule: RoundRobinRule 路由策略 IPing ribbonPing: DummyPing ServerList ribbonServerList: ConfigurationBasedServerList ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer
IClientConfig 用於對客戶端或者負載均衡的配置,它的默認實現類爲DefaultClientConfigImpl
IClientConfig 用於對客戶端或者負載均衡的配置,它的默認實現類爲DefaultClientConfigImpl。
IRule用於複雜均衡的策略,它有三個方法,其中choose()是根據key 來獲取server,setLoadBalancer()和getLoadBalancer()是用來設置和獲取ILoadBalancer的,它的源碼以下:
public interface IRule{ public Server choose(Object key); public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); }
IRule有不少默認的實現類,這些實現類根據不一樣的算法和邏輯來處理負載均衡。Ribbon實現的IRule有一下。在大多數狀況下,這些默認的實現類是能夠知足需求的,若是有特性的需求,能夠本身實現。
BestAvailableRule 選擇最小請求數 ClientConfigEnabledRoundRobinRule 輪詢 RandomRule 隨機選擇一個server RoundRobinRule 輪詢選擇server RetryRule 根據輪詢的方式重試 WeightedResponseTimeRule 根據響應時間去分配一個weight ,weight越低,被選擇的可能性就越低 ZoneAvoidanceRule 根據server的zone區域和可用性來輪詢選擇
IPing向server發生」ping」,來判斷該server是否有響應,從而判斷該server是否可用。它有一個isAlive()方法,它的源碼以下:
public interface IPing { /** * Checks whether the given <code>Server</code> is "alive" i.e. should be * considered a candidate while loadbalancing * */ public boolean isAlive(Server server); }
IPing的實現類有PingUrl、PingConstant、NoOpPing、DummyPing和NIWSDiscoveryPing。它門之間的關係以下:
PingUrl 真實的去ping 某個url,判斷其是否alive PingConstant 固定返回某服務是否可用,默認返回true,便可用 NoOpPing 不去ping,直接返回true,便可用。 DummyPing 直接返回true,並實現了initWithNiwsConfig方法。 NIWSDiscoveryPing,根據DiscoveryEnabledServer的InstanceInfo的InstanceStatus去判斷,若是爲InstanceStatus.UP,則爲可用,不然不可用。
ServerListFilter接口,定於了可根據配置去過濾或者根據特性動態獲取符合條件的server列表的方法,代碼以下:
public interface ServerListFilter<T extends Server> { public List<T> getFilteredListOfServers(List<T> servers); }
閱讀DynamicServerListLoadBalancer的源碼,DynamicServerListLoadBalancer的構造函數中有個initWithNiwsConfig()方法。在改方法中,通過一系列的初始化配置,最終執行了restOfInit()方法。其代碼以下:
public DynamicServerListLoadBalancer(IClientConfig clientConfig) { initWithNiwsConfig(clientConfig); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { try { super.initWithNiwsConfig(clientConfig); String niwsServerListClassName = clientConfig.getPropertyAsString( CommonClientConfigKey.NIWSServerListClassName, DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS); ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig); this.serverListImpl = niwsServerListImpl; if (niwsServerListImpl instanceof AbstractServerList) { AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl) .getFilterImpl(clientConfig); niwsFilter.setLoadBalancerStats(getLoadBalancerStats()); this.filter = niwsFilter; } String serverListUpdaterClassName = clientConfig.getPropertyAsString( CommonClientConfigKey.ServerListUpdaterClassName, DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS ); this.serverListUpdater = (ServerListUpdater) ClientFactory .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig); restOfInit(clientConfig); } catch (Exception e) { throw new RuntimeException( "Exception while initializing NIWSDiscoveryLoadBalancer:" + clientConfig.getClientName() + ", niwsClientConfig:" + clientConfig, e); } }
在restOfInit()方法上,有一個 updateListOfServers()的方法,該方法是用來獲取全部的ServerList的。
void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); }
進一步跟蹤updateListOfServers()方法的源碼,最終由serverListImpl.getUpdatedListOfServers()獲取全部的服務列表的,代碼以下:
@VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); }
而serverListImpl是ServerList接口的具體實現類。跟蹤代碼,ServerList的實現類爲DiscoveryEnabledNIWSServerList,在ribbon-eureka.jar的com.netflix.niws.loadbalancer下。其中DiscoveryEnabledNIWSServerList有 getInitialListOfServers()和getUpdatedListOfServers()方法,具體代碼以下:
@Override public List<DiscoveryEnabledServer> getInitialListOfServers(){ return obtainServersViaDiscovery(); } @Override public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); }
繼續跟蹤源碼,obtainServersViaDiscovery(),是根據eurekaClientProvider.get()來回去EurekaClient,再根據EurekaClient來獲取註冊列表信息,代碼以下:
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; }
其中eurekaClientProvider的實現類是LegacyEurekaClientProvider,它是一個獲取eurekaClient類,經過靜態的方法去獲取eurekaClient,其代碼以下:
class LegacyEurekaClientProvider implements Provider<EurekaClient> { private volatile EurekaClient eurekaClient; @Override public synchronized EurekaClient get() { if (eurekaClient == null) { eurekaClient = DiscoveryManager.getInstance().getDiscoveryClient(); } return eurekaClient; } }
EurekaClient的實現類爲DiscoveryClient,在以前已經分析了它具備服務註冊、獲取服務註冊列表等的所有功能。
*************因而可知,負載均衡器是從EurekaClient獲取服務信息,並根據IRule去路由,而且根據IPing去判斷服務的可用性。 **************
負載均衡器多久一次去獲取一次從Eureka Client獲取註冊信息呢? 在BaseLoadBalancer類下,BaseLoadBalancer的構造函數,該構造函數開啓了一個PingTask任務,代碼以下:
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats, IPing ping, IPingStrategy pingStrategy) { ...//代碼省略 setupPingTask(); ...//代碼省略 }
setupPingTask()的具體代碼邏輯,它開啓了ShutdownEnabledTimer執行PingTask任務,在默認狀況下pingIntervalSeconds爲10,即每10秒鐘,向EurekaClient發送一次」ping」。
void setupPingTask() { if (canSkipPing()) { return; } if (lbTimer != null) { lbTimer.cancel(); } lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); forceQuickPing(); }
PingTask源碼,即new一個Pinger對象,並執行runPinger()方法。
class PingTask extends TimerTask { public void run() { try { new Pinger(pingStrategy).runPinger(); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error pinging", name, e); } } }
查看Pinger的runPinger()方法,最終根據 pingerStrategy.pingServers(ping, allServers)來獲取服務的可用性,若是該返回結果,如以前相同,則不去向EurekaClient獲取註冊列表,若是不一樣則通知ServerStatusChangeListener或者changeListeners發生了改變,進行更新或者從新拉取。
public void runPinger() throws Exception { if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do } // we are "in" - we get to Ping Server[] allServers = null; boolean[] results = null; Lock allLock = null; Lock upLock = null; try { /* * The readLock should be free unless an addServer operation is * going on... */ allLock = allServerLock.readLock(); allLock.lock(); allServers = allServerList.toArray(new Server[allServerList.size()]); allLock.unlock(); int numCandidates = allServers.length; results = pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); for (int i = 0; i < numCandidates; i++) { boolean isAlive = results[i]; Server svr = allServers[i]; boolean oldIsAlive = svr.isAlive(); svr.setAlive(isAlive); if (oldIsAlive != isAlive) { changedServers.add(svr); logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); } if (isAlive) { newUpList.add(svr); } } upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); notifyServerStatusChangeListener(changedServers); } finally { pingInProgress.set(false); } }
因而可知,LoadBalancerClient是在初始化的時候,會向Eureka回去服務註冊列表,而且向經過10s一次向EurekaClient發送「ping」,來判斷服務的可用性,若是服務的可用性發生了改變或者服務數量和以前的不一致,則更新或者從新拉取。LoadBalancerClient有了這些服務註冊列表,就能夠根據具體的IRule來進行負載均衡。
RestTemplate是如何和Ribbon結合的
@LoadBalanced RestTemplate restTemplate() { return new RestTemplate(); }
全局搜索ctr+shift+f @LoadBalanced有哪些類用到了LoadBalanced有哪些類用到了, 發現LoadBalancerAutoConfiguration類,即LoadBalancer自動配置類。
@Configuration @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); } @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializer( final List<RestTemplateCustomizer> customizers) { return new SmartInitializingSingleton() { @Override public void afterSingletonsInstantiated() { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } } }; } @Configuration @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { @Override public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } } }
在該類中,首先維護了一個被@LoadBalanced修飾的RestTemplate對象的List,在初始化的過程當中,經過調用customizer.customize(restTemplate)方法來給RestTemplate增長攔截器LoadBalancerInterceptor。
而LoadBalancerInterceptor,用於實時攔截,在LoadBalancerInterceptor這裏實現來負載均衡。LoadBalancerInterceptor的攔截方法以下:
@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); }
總結
綜上所述,Ribbon的負載均衡,主要經過LoadBalancerClient來實現的,而LoadBalancerClient具體交給了ILoadBalancer來處理,ILoadBalancer經過配置IRule、IPing等信息,並向EurekaClient獲取註冊列表的信息,並默認10秒一次向EurekaClient發送「ping」,進而檢查是否更新服務列表,最後,獲得註冊列表後,ILoadBalancer根據IRule的策略進行負載均衡。
而RestTemplate 被@LoadBalance註解後,能過用負載均衡,主要是維護了一個被@LoadBalance註解的RestTemplate列表,並給列表中的RestTemplate添加攔截器,進而交給負載均衡器去處理。