微服務-(ribbon負載均衡)

它實現的是客戶端的負載均衡java

問題1:它是怎麼實現的負載均衡算法?算法

問題2:它是怎麼經過實例名獲取到的ip地址?spring

咱們能夠開始嘗試跟蹤一下:緩存

咱們對RestTemplate已經比較瞭解了,它自己只提供了Http調用的功能,並不具有負載均衡的能力,那麼咱們能夠猜想可能起到做用的就是@LoadBalanced這個註解。咱們進入這個註解,會發現註解上存在一句註釋:服務器

Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.

這句註釋說明了,當前這個註解是代表RestTemplate 將要使用 LoadBalancerClient ,咱們把這個bean給記下來。app

接下來咱們進入到LoadBalancerClient ` 發現它是一個接口,接口中提供了三個方法:負載均衡

//使用從LoadBalancer中選擇出來的實例執行
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
//使用從LoadBalancer中選擇出來的實例執行,指定哪一個實例來執行
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
//爲系統構建一個合適的host:port形式的Url
URI reconstructURI(ServiceInstance instance, URI original);

追蹤它的實現類,咱們會看到RibbonLoadBalancerClient這個類,它裏面實現了這些方法。dom

咱們經過RestTemplate 的追蹤,咱們會發現RestTemplate 中調用postForObject() 方法時會觸發LoadBalancerInterceptor的方法,而後會發現它最後執行了一個this.loadBalancer.execute()方法ide

而這個execute方法就是咱們在LoadBalancerClient 接口中看到的方法!工具

咱們進入RibbonLoadBalancerClient中去查看它的實現方法--

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
       //經過seviceid獲取到咱們的loadBalancer
       ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
       //獲取到咱們的sever對象
       Server server = getServer(loadBalancer, hint);
       if (server == null) {
              throw new IllegalStateException("No instances available for " + serviceId);
       }
       RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
       serviceId), serverIntrospector(serviceId).getMetadata(server));
​
       return execute(serviceId, ribbonServer, request);
}

追蹤getServer()方法 發現有這一行代碼

return loadBalancer.chooseServer(hint != null ? hint : "default");

咱們能夠獲得一個結論,這行代碼的chooseServer方法實現了負載均衡的策略,同時給咱們返回了一個實例回來。

那麼它是如何找到的host:port的呢?

進入到RibbonLoadBalancerClient中找到reconstructURI()方法,能夠看到有一個RibbonLoadBalancerContext類,進入這個類,會發現它在構造器中傳入了一個ILoadBalancer

ILoadBalancer這個接口中存在addServers(List<Server> newServers)在內部咱們能夠獲得一個結論,

咱們全部的host:port形式的東西都存放在RibbonLoadBalancerContext中,它會去經過一個實例去獲取到咱們的

host:port形式的一個uri地址。也就是一個server對象。

追蹤RibbonLoadBalancerContext咱們會發現裏面實現了不少方法

public class RibbonLoadBalancerContext extends LoadBalancerContext {
   public RibbonLoadBalancerContext(ILoadBalancer lb) {
      super(lb);
   }
    
   //構造器2,初始化了一個Client的配置項
   public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig) {
      super(lb, clientConfig);
   }
  
   //構造器3,初始化時帶入了重試機制,進入RetryHandler,
   public RibbonLoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig,RetryHandler, handler) {
      super(lb, clientConfig, handler);
   }
   
   /**記錄活躍數,這裏注意Serverstats類,裏面定義了大量的狀態(響應時間,錯誤數量,活躍數量等),       *note         表明筆記本的意思,
   *它實際上就是記錄一些狀態數據 noteOpenConnection 方法裏面 實際上就給咱們提供一次加一方法
   */
   @Override
   public void noteOpenConnection(ServerStats serverStats) {
      super.noteOpenConnection(serverStats);
   }
​
   @Override
   public Timer getExecuteTracer() {
      return super.getExecuteTracer();
   }
   
   /***
   * 好比這個,就是記錄請求響應結束,這時候的異常會被記錄。
   *
   */
   @Override
   public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime) {
      super.noteRequestCompletion(stats, response, e, responseTime);
   }
​
   @Override
   public void noteRequestCompletion(ServerStats stats, Object response, Throwable e,long responseTime, RetryHandler errorHandler) {
      super.noteRequestCompletion(stats, response, e, responseTime, errorHandler);
   }

上面代碼能夠看到一個很明顯的RetryHandler,跟蹤進去看

public interface RetryHandler {
​
   public static final RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();
   
   /**
    * Test if an exception is retriable for the load balancer
    * 
    * @param e the original exception
    * @param sameServer if true, the method is trying to determine if retry can be 
    *       done on the same server. Otherwise, it is testing whether retry can be
    *       done on a different server
    */
   public boolean isRetriableException(Throwable e, boolean sameServer);
​
   /**
    * Test if an exception should be treated as circuit failure. For example, 
    * a {@link ConnectException} is a circuit failure. This is used to determine
    * whether successive exceptions of such should trip the circuit breaker to a particular
    * host by the load balancer. If false but a server response is absent, 
    * load balancer will also close the circuit upon getting such exception.
    */
   public boolean isCircuitTrippingException(Throwable e);
       
   /**
    * @return Number of maximal retries to be done on one server
    */
   //返回在同一臺服務器最大的重試次數
   public int getMaxRetriesOnSameServer();
​
   /**
    * @return Number of maximal different servers to retry
    */
   //返回在下一個服務器最大重試次數
   public int getMaxRetriesOnNextServer();
}

這些東西只須要了解一下

問題3:它的負載均衡器有哪些?

追蹤咱們的RibbonLoadBalancerClient中的execute中的loadBalancer.chooseServer()會發現它調用了接口ILoadBalancer中的chooseServer()方法.

追蹤ILoadBalancer咱們能夠看到它內部其實實現了這幾種方法:

   //添加服務實例
public void addServers(List<Server> newServers);
//選擇服務實例
public Server chooseServer(Object key);
   //由負載均衡器的客戶端調用,以通知服務器宕機,不然,LB會認爲它還活着,直到下一個Ping週期——有可能
public void markServerDown(Server server); 

@Deprecated //不推薦使用
public List<Server> getServerList(boolean availableOnly);

   //返回一個啓動而且正常的服務
   public List<Server> getReachableServers();
   
​
   //返回全部服務
public List<Server> getAllServers();

結論:ILoadBalancer接口實際上給咱們提供了三種結果

  • 添加服務實例
  • 返回服務實例
  • 讓服務下線

它的默認實現就是ZoneAwareLoadBalancer 

咱們能夠在RibbonClientConfiguration#ribbonLoadBalancer()中看到它若是沒有設定負載均衡器就返回默認的。

  • ZoneAwareLoadBalancer 是對DynamicServerListLoadBalancer的擴展,它重寫了setServerListForZones()方法,這個方法在父類的做用是根據按區域zone的分組實例列表,再給每個Zone對應一個zoneStats來存儲一些狀態和統計信息的。

重寫以後咱們能夠看到,在該實現中建立了一個ConcurrentHashMap類型的balance對象,用來存儲每一個Zone區域對應的負載均衡器,負載均衡器的建立就是經過getLoadBalancer(zone).setServersList(entry.getValue());來完成的。 這是ILoadBalancer的默認實現

查看這個方法的實現類,咱們會發現這裏有幾個類實現了它的方法:

  • BaseLoadBalaner 是實現Ribbon負載均衡的基礎實現類,在該類中定義了不少關於負載均衡的基礎內容
  1. 它維護了兩個存儲服務實例Server對象的列表,一個存儲全部服務實例清單,一個存儲正常服務實例清單
  2. 它定義了檢查服務器裏是否正常的Iping 對象,須要在構造時注入它的實現
  3. 定義了檢查服務實例操做的執行策略對象IPingStrategy
  4. 它定義了負載均衡處理規則IRule
  5. 它定義了用來存儲負載均衡器的各個服務實例屬性和統計信息的LoadBalancerStats

BaseLoadBalaner基礎上,還有兩個子類

DynamicServerListLoadBalancer 是對BaseLoadBalaner 作的一個擴展,在父類的基礎上,實現了服務實例清單在運行期的動態更新能力;同時還具有了對服務實例清單過濾的功能,也就是說,使用它的時候,能夠經過過濾器來選擇性的獲取一批服務實例的清單。

1.新增了一個ServerList<T> serverListImpl的接口,去跟蹤ServerList 會發現內部定義了兩個抽象方法

public interface ServerList<T extends Server> {
  
   //獲取初始化的服務實例清單
   public List<T> getInitialListOfServers();
   
  //獲取更新的服務實例清單
   public List<T> getUpdatedListOfServers();   
​
}

繼續追蹤,會發現它的實現有五個,咱們須要去判斷一下,在這裏它用的是什麼方式來作的實現?

咱們作一個猜想,既然在負載均衡器中須要實現服務實例的動態更新,那麼它就勢必須要有去訪問Eureka來獲取服務實例的能力。咱們能夠去查看一下EurekaRibbonClientConfiguration

@Bean
   @ConditionalOnMissingBean
   public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
       if (this.propertiesFactory.isSet(ServerList.class, this.serviceId)) {
           return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.serviceId);
      } else {
           /**經過DiscoveryEnabledNIWSServerList內部的obtainServersViaDiscovery()
           從註冊中心經過serviceId獲取到服務實例列表,將狀態爲UP的服務放入list返回**/
           DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider);
           //經過discoveryServerList 獲取到兩個集合--初始化的服務清單,更新的服務清單
           DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname);
           return serverList;
      }
  }

在這裏咱們可以看到裏面是採用了DomainExtractingServerList來進行實現的。咱們開始追蹤DomainExtractingServerList會發現它在構造器內部,經過傳入的List<DiscoveryEnabledServer>

在實現這兩個方法的時候經過setZones方法得到了兩個集合。

內部的實現方法過於複雜,感興趣的同窗能夠本身追蹤。只要知道流程就OK了。

問題4:剛纔看到了IRule這個接口,也知道了這是ribbon的負載均衡策略,那麼具體它包含了哪些策略?

默認策略:ZoneAvoidanceRule ===> RibbonClientConfiguration#ribbonRule()

  • ZoneAvoidanceRule 規避區域策略
  • AbstractLoadBalancerRule 策略的抽象類,它在內部定義了ILoadBalancer對象,這個對象主要是用來在具體選擇哪一種策略的時候,獲取到負載均衡器中維護的信息的。
  • AvailabilityFilteringRule該策略繼承自抽象策略PredicateBasedRule因此也繼承了"先過濾清單,再輪詢選擇"的基本處理邏輯,該策略經過線性抽樣的方式直接嘗試可用且較空閒的實例來使用,優化了父類每次都要遍歷全部實例的開銷。
  • BestAvailableRule繼承自ClientConfigEnabledRoundRobinRule該策略的特性是可選出最空閒的實例
  • ClientConfigEnabledRoundRobinRule該策略較爲特殊,咱們通常不直接使用它。由於它自己並無實現什麼特殊的處理邏輯。經過繼承該策略,默認的choose就實現了線性輪詢機制,在子類中作一些高級策略時一般可能存在。一些沒法實施的狀況,就能夠用父類的實現做爲備選
  • PredicateBasedRule抽象策略,繼承自ClientConfigEnabledRoundRobinRule,基於Predicate的策略 Predicateshi Google Guava Collection工具對集合進行過濾的條件接口
  • RandomRule 隨機數策略,它就是經過一個隨機數來獲取uplist的某一個下標,再返回。
  • RetryRule 帶重試機制策略,它在內部還定義了一個IRule,默認使用了RoundRobinRule,在內部實現了反覆重試的機制,若是重試可以獲得一個服務,就返回,若是不能就會根據以前設置的時間來決定,時間一到就返回null.
  • RoundRobinRule 一個輪詢策略,經過一個count計數變量,每次循環都會累加,注意,若是一直沒有server可供選擇達到了10次,就會打印一個警告信息。
  • WeightedResponseTimeRule 這個策略是對輪詢策略的擴展,增長了根據實例的運行狀況來計算權重,並根據權重來挑選實例,用以達到更好的分配結果。

這個策略比較複雜,請注意:

首先類裏面定義了一個定時任務DynamicServerWeightTask,默認30秒執行一次

class DynamicServerWeightTask extends TimerTask {
      public void run() {
          ServerWeight serverWeight = new ServerWeight();
          try {
              //每隔30秒計算一次權重
              serverWeight.maintainWeights();
          } catch (Exception e) {
              logger.error("Error running DynamicServerWeightTask for {}", name, e);
          }
      }
  }
public void maintainWeights() {
           ILoadBalancer lb = getLoadBalancer();
           if (lb == null) {
               return;
          }
           
           if (!serverWeightAssignmentInProgress.compareAndSet(false,  true)) {
               return; 
          }
           
           try {
               logger.info("Weight adjusting job started");
               AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
               LoadBalancerStats stats = nlb.getLoadBalancerStats();
               if (stats == null) {
                   // no statistics, nothing to do
                   return;
              }
               //計算全部實例的想贏時間的總和
               double totalResponseTime = 0;
               // find maximal 95% response time
               for (Server server : nlb.getAllServers()) {
                  //若是服務不在緩存中,就自動加載。
                   ServerStats ss = stats.getSingleServerStat(server);
                   totalResponseTime += ss.getResponseTimeAvg();
              }
               //遍歷計算每一個實例的權重,公式以下:weightSoFar+totalResponseTime - 平均響應時間
               Double weightSoFar = 0.0;
               
               // create new list and hot swap the reference
               List<Double> finalWeights = new ArrayList<Double>();
               for (Server server : nlb.getAllServers()) {
                   ServerStats ss = stats.getSingleServerStat(server);
                   double weight = totalResponseTime - ss.getResponseTimeAvg();
                   weightSoFar += weight;
                   finalWeights.add(weightSoFar);   
              }
               setWeights(finalWeights);
          } catch (Exception e) {
               logger.error("Error calculating server weights", e);
          } finally {
               serverWeightAssignmentInProgress.set(false);
          }
​
      }
  }

這段代碼利用了一個公式,weightSoFar+totalResponseTime - 平均響應時間

咱們能夠舉個例子 假設有,A,B,C三個實例能夠選擇,他們的平均響應時間爲10,40,80,那麼咱們能夠獲得它的一個總響應時長爲10+40+80 = 130

那麼根據這個公式能夠獲得的權重 

A:0+130 -10 =120;

B120+(130-40) = 210;

C: 210 +130-80 = 260

這裏其實是一個數字軸,它會本身生成一個隨機數落在這個數字軸上,在哪一個區間就會去選擇哪臺服務。

問題5:Ribbonping策略

ribbonIPing這個對象定義了它的ping策略,咱們都知道,須要判斷服務是否存活的方式一般都是用心跳,在計算機中心跳一般都是ping這樣標識,咱們會每隔一段時間去訪問一次服務,一旦有正確返回狀態,咱們就認爲當前服務存活。

一樣的咱們能夠在RibbonClientConfiguration下去看到它的默認策略DummyPing()

問題6:服務列表ServerList<Server> 的初始化

RibbonClientConfiguration#ribbonServerList下會初始化

ServerList主要是負責:

  • 獲取初始化的服務列表
  • 獲取更新的服務列表

它的默認實現比較有意思,若是Eureka關閉,它實現的就是ConfigurationBasedServerList

若是咱們整合了Eureka,會發現它默認實現的就是DiscoveryEnabledNIWSServerList -->咱們能夠經過EurekaRibbonClientConfiguration#ribbonServerList中去看到這個服務列表

問題7Ribbon的自動裝配

  • RibbonAutoConfiguration 內部初始化了比較重要的兩個東西:

LoadBalancerClient 這個咱們在以前解釋過

PropertiesFactory 經過一些配置化的方式進行組裝。

  • RibbonClientConfiguration 這個裏面實現了很是多的東西

RibbonLoadBalancerContext 

IRule 規則

IPING 心跳

ServerList 服務列表

ILoadBalancer

IClientConfig

問題8:重試機制

在咱們使用ribbon中,一旦某個服務實例宕機或者掉線,而咱們的eureka沒有及時清理,會發生返回錯誤的狀況,那麼針對這種狀況下,咱們有沒有什麼機制能夠去解決問題的?

咱們能夠繼續去找LoadBalancerAutoConfiguration 會看到這樣一段代碼

@Configuration
@ConditionalOnClass(RetryTemplate.class)
public static class RetryInterceptorAutoConfiguration {

   @Bean
   @ConditionalOnMissingBean
   public RetryLoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,LoadBalancerRequestFactory requestFactory,LoadBalancedRetryFactory loadBalancedRetryFactory) {
   return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory);
   }
​
   @Bean
   @ConditionalOnMissingBean
   public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
   return restTemplate -> {
               List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                       restTemplate.getInterceptors());
               list.add(loadBalancerInterceptor);
               restTemplate.setInterceptors(list);
          };
   }
}

它在這裏面定義了負載均衡的重試機制,咱們能夠來看看這個怎麼用的。

導入pom.xml

<!--引入重試機制,讓重試生效-->
<dependency>
  <groupId>org.springframework.retry</groupId>
  <artifactId>spring-retry</artifactId>
</dependency>

而後,咱們再來嘗試,咱們會發現重試機制生效了。

相關文章
相關標籤/搜索