它實現的是客戶端的負載均衡java
問題1:它是怎麼實現的負載均衡算法?算法
問題2:它是怎麼經過實例名獲取到的ip地址?spring
咱們能夠開始嘗試跟蹤一下:緩存
咱們對RestTemplate已經比較瞭解了,它自己只提供了Http調用的功能,並不具有負載均衡的能力,那麼咱們能夠猜想可能起到做用的就是@LoadBalanced這個註解。咱們進入這個註解,會發現註解上存在一句註釋:服務器
Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
這句註釋說明了,當前這個註解是代表RestTemplate 將要使用 LoadBalancerClient ,咱們把這個bean給記下來。app
接下來咱們進入到LoadBalancerClient ` 發現它是一個接口,接口中提供了三個方法:負載均衡
//使用從LoadBalancer中選擇出來的實例執行 <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; //使用從LoadBalancer中選擇出來的實例執行,指定哪一個實例來執行 <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; //爲系統構建一個合適的host:port形式的Url URI reconstructURI(ServiceInstance instance, URI original);
追蹤它的實現類,咱們會看到RibbonLoadBalancerClient這個類,它裏面實現了這些方法。dom
咱們經過RestTemplate 的追蹤,咱們會發現RestTemplate 中調用postForObject() 方法時會觸發LoadBalancerInterceptor的方法,而後會發現它最後執行了一個this.loadBalancer.execute()方法ide
而這個execute方法就是咱們在LoadBalancerClient 接口中看到的方法!工具
咱們進入RibbonLoadBalancerClient中去查看它的實現方法--
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { //經過seviceid獲取到咱們的loadBalancer ILoadBalancer loadBalancer = getLoadBalancer(serviceId); //獲取到咱們的sever對象 Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); }
追蹤getServer()方法 發現有這一行代碼
return loadBalancer.chooseServer(hint != null ? hint : "default");
咱們能夠獲得一個結論,這行代碼的chooseServer方法實現了負載均衡的策略,同時給咱們返回了一個實例回來。
那麼它是如何找到的host:port的呢?
進入到RibbonLoadBalancerClient中找到reconstructURI()方法,能夠看到有一個RibbonLoadBalancerContext類,進入這個類,會發現它在構造器中傳入了一個ILoadBalancer
在ILoadBalancer這個接口中存在addServers(List<Server> newServers)在內部咱們能夠獲得一個結論,
咱們全部的host:port形式的東西都存放在RibbonLoadBalancerContext中,它會去經過一個實例去獲取到咱們的
host:port形式的一個uri地址。也就是一個server對象。
追蹤RibbonLoadBalancerContext咱們會發現裏面實現了不少方法
public class RibbonLoadBalancerContext extends LoadBalancerContext { public RibbonLoadBalancerContext(ILoadBalancer lb) { super(lb); } //構造器2,初始化了一個Client的配置項 public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig) { super(lb, clientConfig); } //構造器3,初始化時帶入了重試機制,進入RetryHandler, public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig,RetryHandler, handler) { super(lb, clientConfig, handler); } /**記錄活躍數,這裏注意Serverstats類,裏面定義了大量的狀態(響應時間,錯誤數量,活躍數量等), *note 表明筆記本的意思, *它實際上就是記錄一些狀態數據 noteOpenConnection 方法裏面 實際上就給咱們提供一次加一方法 */ @Override public void noteOpenConnection(ServerStats serverStats) { super.noteOpenConnection(serverStats); } @Override public Timer getExecuteTracer() { return super.getExecuteTracer(); } /*** * 好比這個,就是記錄請求響應結束,這時候的異常會被記錄。 * */ @Override public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime) { super.noteRequestCompletion(stats, response, e, responseTime); } @Override public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime, RetryHandler errorHandler) { super.noteRequestCompletion(stats, response, e, responseTime, errorHandler); }
上面代碼能夠看到一個很明顯的RetryHandler,跟蹤進去看
public interface RetryHandler { public static final RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler(); /** * Test if an exception is retriable for the load balancer * * @param e the original exception * @param sameServer if true, the method is trying to determine if retry can be * done on the same server. Otherwise, it is testing whether retry can be * done on a different server */ public boolean isRetriableException(Throwable e, boolean sameServer); /** * Test if an exception should be treated as circuit failure. For example, * a {@link ConnectException} is a circuit failure. This is used to determine * whether successive exceptions of such should trip the circuit breaker to a particular * host by the load balancer. If false but a server response is absent, * load balancer will also close the circuit upon getting such exception. */ public boolean isCircuitTrippingException(Throwable e); /** * @return Number of maximal retries to be done on one server */ //返回在同一臺服務器最大的重試次數 public int getMaxRetriesOnSameServer(); /** * @return Number of maximal different servers to retry */ //返回在下一個服務器最大重試次數 public int getMaxRetriesOnNextServer(); }
這些東西只須要了解一下
問題3:它的負載均衡器有哪些?
追蹤咱們的RibbonLoadBalancerClient中的execute中的loadBalancer.chooseServer()會發現它調用了接口ILoadBalancer中的chooseServer()方法.
追蹤ILoadBalancer咱們能夠看到它內部其實實現了這幾種方法:
//添加服務實例 public void addServers(List<Server> newServers); //選擇服務實例 public Server chooseServer(Object key); //由負載均衡器的客戶端調用,以通知服務器宕機,不然,LB會認爲它還活着,直到下一個Ping週期——有可能 public void markServerDown(Server server); @Deprecated //不推薦使用 public List<Server> getServerList(boolean availableOnly); //返回一個啓動而且正常的服務 public List<Server> getReachableServers(); //返回全部服務 public List<Server> getAllServers();
結論:ILoadBalancer接口實際上給咱們提供了三種結果
它的默認實現就是ZoneAwareLoadBalancer
咱們能夠在RibbonClientConfiguration#ribbonLoadBalancer()中看到它若是沒有設定負載均衡器就返回默認的。
重寫以後咱們能夠看到,在該實現中建立了一個ConcurrentHashMap類型的balance對象,用來存儲每一個Zone區域對應的負載均衡器,負載均衡器的建立就是經過getLoadBalancer(zone).setServersList(entry.getValue());來完成的。 這是ILoadBalancer的默認實現
查看這個方法的實現類,咱們會發現這裏有幾個類實現了它的方法:
在BaseLoadBalaner基礎上,還有兩個子類
DynamicServerListLoadBalancer 是對BaseLoadBalaner 作的一個擴展,在父類的基礎上,實現了服務實例清單在運行期的動態更新能力;同時還具有了對服務實例清單過濾的功能,也就是說,使用它的時候,能夠經過過濾器來選擇性的獲取一批服務實例的清單。
1.新增了一個ServerList<T> serverListImpl的接口,去跟蹤ServerList 會發現內部定義了兩個抽象方法
public interface ServerList<T extends Server> { //獲取初始化的服務實例清單 public List<T> getInitialListOfServers(); //獲取更新的服務實例清單 public List<T> getUpdatedListOfServers(); }
繼續追蹤,會發現它的實現有五個,咱們須要去判斷一下,在這裏它用的是什麼方式來作的實現?
咱們作一個猜想,既然在負載均衡器中須要實現服務實例的動態更新,那麼它就勢必須要有去訪問Eureka來獲取服務實例的能力。咱們能夠去查看一下EurekaRibbonClientConfiguration
@Bean @ConditionalOnMissingBean public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) { if (this.propertiesFactory.isSet(ServerList.class, this.serviceId)) { return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.serviceId); } else { /**經過DiscoveryEnabledNIWSServerList內部的obtainServersViaDiscovery() 從註冊中心經過serviceId獲取到服務實例列表,將狀態爲UP的服務放入list返回**/ DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider); //經過discoveryServerList 獲取到兩個集合--初始化的服務清單,更新的服務清單 DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname); return serverList; } }
在這裏咱們可以看到裏面是採用了DomainExtractingServerList來進行實現的。咱們開始追蹤DomainExtractingServerList會發現它在構造器內部,經過傳入的List<DiscoveryEnabledServer>
在實現這兩個方法的時候經過setZones方法得到了兩個集合。
內部的實現方法過於複雜,感興趣的同窗能夠本身追蹤。只要知道流程就OK了。
問題4:剛纔看到了IRule這個接口,也知道了這是ribbon的負載均衡策略,那麼具體它包含了哪些策略?
默認策略:ZoneAvoidanceRule ===> RibbonClientConfiguration#ribbonRule()
這個策略比較複雜,請注意:
首先類裏面定義了一個定時任務DynamicServerWeightTask,默認30秒執行一次
class DynamicServerWeightTask extends TimerTask { public void run() { ServerWeight serverWeight = new ServerWeight(); try { //每隔30秒計算一次權重 serverWeight.maintainWeights(); } catch (Exception e) { logger.error("Error running DynamicServerWeightTask for {}", name, e); } } } public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } //計算全部實例的想贏時間的總和 double totalResponseTime = 0; // find maximal 95% response time for (Server server : nlb.getAllServers()) { //若是服務不在緩存中,就自動加載。 ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } //遍歷計算每一個實例的權重,公式以下:weightSoFar+totalResponseTime - 平均響應時間 Double weightSoFar = 0.0; // create new list and hot swap the reference List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); } } }
這段代碼利用了一個公式,weightSoFar+totalResponseTime - 平均響應時間
咱們能夠舉個例子 假設有,A,B,C三個實例能夠選擇,他們的平均響應時間爲10,40,80,那麼咱們能夠獲得它的一個總響應時長爲10+40+80 = 130
那麼根據這個公式能夠獲得的權重
A:0+130 -10 =120;
B:120+(130-40) = 210;
C: 210 +(130-80) = 260;
這裏其實是一個數字軸,它會本身生成一個隨機數落在這個數字軸上,在哪一個區間就會去選擇哪臺服務。
問題5:Ribbon的ping策略
在ribbon中IPing這個對象定義了它的ping策略,咱們都知道,須要判斷服務是否存活的方式一般都是用心跳,在計算機中心跳一般都是ping這樣標識,咱們會每隔一段時間去訪問一次服務,一旦有正確返回狀態,咱們就認爲當前服務存活。
一樣的咱們能夠在RibbonClientConfiguration下去看到它的默認策略DummyPing()
問題6:服務列表ServerList<Server> 的初始化
RibbonClientConfiguration#ribbonServerList下會初始化
ServerList主要是負責:
它的默認實現比較有意思,若是Eureka關閉,它實現的就是ConfigurationBasedServerList
若是咱們整合了Eureka,會發現它默認實現的就是DiscoveryEnabledNIWSServerList -->咱們能夠經過EurekaRibbonClientConfiguration#ribbonServerList中去看到這個服務列表
問題7:Ribbon的自動裝配
LoadBalancerClient 這個咱們在以前解釋過
PropertiesFactory 經過一些配置化的方式進行組裝。
RibbonLoadBalancerContext
IRule 規則
IPING 心跳
ServerList 服務列表
ILoadBalancer
IClientConfig
問題8:重試機制
在咱們使用ribbon中,一旦某個服務實例宕機或者掉線,而咱們的eureka沒有及時清理,會發生返回錯誤的狀況,那麼針對這種狀況下,咱們有沒有什麼機制能夠去解決問題的?
咱們能夠繼續去找LoadBalancerAutoConfiguration 會看到這樣一段代碼
@Configuration @ConditionalOnClass(RetryTemplate.class) public static class RetryInterceptorAutoConfiguration { @Bean @ConditionalOnMissingBean public RetryLoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,LoadBalancerRequestFactory requestFactory,LoadBalancedRetryFactory loadBalancedRetryFactory) { return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } }
它在這裏面定義了負載均衡的重試機制,咱們能夠來看看這個怎麼用的。
導入pom.xml
<!--引入重試機制,讓重試生效--> <dependency> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </dependency>
而後,咱們再來嘗試,咱們會發現重試機制生效了。