Ribbon Source Code Analysis



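The Ribbon open-source project in Spring Cloud provides client-side load-balancing algorithms. This article explains how it is implemented. To make it easier to follow, we walk through the flow of a client call and introduce the relevant source code along the way.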


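First, a quick review of how Ribbon is used. Two points are worth emphasizing:

  1. In the startup class Application, add the @LoadBalanced annotation to the RestTemplate bean.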

	@Bean
	@LoadBalanced
	RestTemplate restTemplate() {
		return new RestTemplate();
	}

  2. Make the call through RestTemplate, using the service name (for example, COMPUTE-SERVICE) in place of a concrete host.

        return restTemplate.getForEntity("http://COMPUTE-SERVICE/add?a=10&b=20", String.class).getBody();

 

Let's start with the interceptor LoadBalancerInterceptor.

It takes the service name from the request URI (the COMPUTE-SERVICE host mentioned above) and then calls the execute method of LoadBalancerClient.

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		this.loadBalancer = loadBalancer;
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		return this.loadBalancer.execute(serviceName,
				new LoadBalancerRequest<ClientHttpResponse>() {
					@Override
					public ClientHttpResponse apply(final ServiceInstance instance)
							throws Exception {
						HttpRequest serviceRequest = new ServiceRequestWrapper(request,
								instance, loadBalancer);
						return execution.execute(serviceRequest, body);
					}

				});
	}
}
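
The interceptor relies on the host part of the URI being the service name. As a minimal sketch of that step (the class and method names here are hypothetical and not part of Spring Cloud), the standard java.net.URI class can be used to see what getHost() returns. Note that java.net.URI does not accept an underscore in a host name, so a service name such as compute_service yields a null host.

```java
import java.net.URI;

class ServiceNameDemo {
    // Hypothetical helper: returns the host part of the URL,
    // which the interceptor treats as the service name.
    static String serviceNameOf(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        // prints COMPUTE-SERVICE
        System.out.println(serviceNameOf("http://COMPUTE-SERVICE/add?a=10&b=20"));
        // prints null, because '_' is not valid in a java.net.URI host name
        System.out.println(serviceNameOf("http://compute_service/add"));
    }
}
```

In practice this suggests sticking to letters, digits, and hyphens in service names.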


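Before going further, here is a brief introduction to the classes involved.

1) Top-level interface

A class that implements this interface uses a load balancer to choose a server to send the request to.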

public interface ServiceInstanceChooser {

    ServiceInstance choose(String serviceId);
}

2) Extending interface

LoadBalancerClient

It declares two execute() overloads that take different parameters, along with reconstructURI().

public interface LoadBalancerClient extends ServiceInstanceChooser {

	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

	URI reconstructURI(ServiceInstance instance, URI original);
}
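
To give an idea of what reconstructURI is for, here is a minimal sketch that swaps the logical service name for a concrete host and port. The class name, the rewrite helper, and the address 10.0.0.5:8080 are assumptions made for illustration. This is not the Ribbon implementation, which also has to deal with details such as secure schemes.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriRewriteDemo {
    // Hypothetical helper: keeps scheme, path, query and fragment,
    // and replaces only the host and port.
    static URI rewrite(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://COMPUTE-SERVICE/add?a=10&b=20");
        // prints http://10.0.0.5:8080/add?a=10&b=20
        System.out.println(rewrite(original, "10.0.0.5", 8080));
    }
}
```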
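
3) Implementation class

The concrete execute methods can be seen in the RibbonLoadBalancerClient class.

They mainly do the following:

   get the load balancer for the given serviceId;

   use the load balancer to obtain a server;

   forward the request to that specific service instance.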

@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		Server server = getServer(loadBalancer);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));

		return execute(serviceId, ribbonServer, request);
	}

	@Override
	public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
		Server server = null;
		if(serviceInstance instanceof RibbonServer) {
			server = ((RibbonServer)serviceInstance).getServer();
		}
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}

		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId);
		RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

		try {
			T returnVal = request.apply(serviceInstance);
			statsRecorder.recordStats(returnVal);
			return returnVal;
		}
		// catch IOException and rethrow so RestTemplate behaves correctly
		catch (IOException ex) {
			statsRecorder.recordStats(ex);
			throw ex;
		}
		catch (Exception ex) {
			statsRecorder.recordStats(ex);
			ReflectionUtils.rethrowRuntimeException(ex);
		}
		return null;
	}
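
The same three steps, plus the success and failure bookkeeping done by the stats recorder, can be sketched without any Spring or Ribbon dependency. Everything in this block is an assumption made for illustration: the MiniLoadBalancerClient class, its in-memory registry, the plain counters, and the round-robin choice (Ribbon delegates the choice to an ILoadBalancer instead).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class MiniLoadBalancerClient {
    // Hypothetical registry: service name -> known instances ("host:port").
    private final Map<String, List<String>> registry = new HashMap<>();
    private final AtomicInteger counter = new AtomicInteger();
    int successes = 0;
    int failures = 0;

    void register(String serviceId, String... instances) {
        registry.put(serviceId, Arrays.asList(instances));
    }

    <T> T execute(String serviceId, Function<String, T> request) {
        // Step 1: look up the candidates for this service.
        List<String> servers = registry.get(serviceId);
        if (servers == null || servers.isEmpty()) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        // Step 2: pick one instance (simple round-robin in this sketch).
        String server = servers.get(Math.floorMod(counter.getAndIncrement(), servers.size()));
        // Step 3: run the request against the chosen instance and record the outcome.
        try {
            T result = request.apply(server);
            successes++;
            return result;
        } catch (RuntimeException ex) {
            failures++;
            throw ex;
        }
    }

    public static void main(String[] args) {
        MiniLoadBalancerClient client = new MiniLoadBalancerClient();
        client.register("COMPUTE-SERVICE", "10.0.0.5:8080", "10.0.0.6:8080");
        // Two calls alternate between the two registered instances.
        System.out.println(client.execute("COMPUTE-SERVICE", s -> "http://" + s + "/add"));
        System.out.println(client.execute("COMPUTE-SERVICE", s -> "http://" + s + "/add"));
    }
}
```

As in the real client, a service with no known instances fails fast with an IllegalStateException rather than attempting the call.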

Here, ServiceInstance is the service instance interface:

	public interface ServiceInstance {

		String getServiceId();

		String getHost();

		int getPort();

		boolean isSecure();

		URI getUri();

		Map<String, String> getMetadata();
	}


RibbonServer is the class that implements the service instance interface:

	protected static class RibbonServer implements ServiceInstance {
			private final String serviceId;
			private final Server server;
			private final boolean secure;
			private Map<String, String> metadata;
	
			protected RibbonServer(String serviceId, Server server) {
				this(serviceId, server, false, Collections.<String, String> emptyMap());
			}
	
			protected RibbonServer(String serviceId, Server server, boolean secure,
					Map<String, String> metadata) {
				this.serviceId = serviceId;
				this.server = server;
				this.secure = secure;
				this.metadata = metadata;
			}
	
			// getters and setters omitted
			@Override
			public String toString() {
				final StringBuffer sb = new StringBuffer("RibbonServer{");
				sb.append("serviceId='").append(serviceId).append('\'');
				sb.append(", server=").append(server);
				sb.append(", secure=").append(secure);
				sb.append(", metadata=").append(metadata);
				sb.append('}');
				return sb.toString();
			}
		}

In getServer(), the ZoneAwareLoadBalancer load balancer is used by default.

Partial code:

public Server chooseServer(Object key) {
    Server server = null;
    try {
        // Determine which zones are currently available
        LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        // Choose a zone only when at least one zone has been excluded
        if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Throwable e) {
        logger.error("Unexpected exception when choosing server using zone aware logic", e);
    }
    if (server != null) {
        return server;
    }
    // The path taken when no zone-specific server was chosen is not shown in this excerpt
}

The zone selection algorithm itself:

static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
            Set<String> chooseFrom) {
        if (chooseFrom == null || chooseFrom.size() == 0) {
            return null;
        }
        String selectedZone = chooseFrom.iterator().next();
        if (chooseFrom.size() == 1) {
            return selectedZone;
        }
        int totalServerCount = 0;
        for (String zone : chooseFrom) {
            totalServerCount += snapshot.get(zone).getInstanceCount();
        }
        int index = random.nextInt(totalServerCount) + 1;
        int sum = 0;
        for (String zone : chooseFrom) {
            sum += snapshot.get(zone).getInstanceCount();
            if (index <= sum) {
                selectedZone = zone;
                break;
            }
        }
        return selectedZone;
    }
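
The effect of randomChooseZone is that each zone is picked with a probability proportional to its instance count. The sketch below isolates that cumulative-sum step and takes the random index as a parameter so the result is deterministic. The WeightedZoneDemo class, the zone names, and the counts are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class WeightedZoneDemo {
    // index is expected to lie in 1..total instance count,
    // as produced by random.nextInt(total) + 1 in the code above.
    static String pick(Map<String, Integer> instanceCounts, int index) {
        int sum = 0;
        for (Map.Entry<String, Integer> entry : instanceCounts.entrySet()) {
            sum += entry.getValue();
            if (index <= sum) {
                return entry.getKey();
            }
        }
        return null; // index was larger than the total instance count
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the walk is predictable.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("zone-a", 1);
        counts.put("zone-b", 3);
        // Index 1 maps to zone-a; indexes 2, 3 and 4 map to zone-b.
        for (int index = 1; index <= 4; index++) {
            System.out.println(index + " -> " + pick(counts, index));
        }
    }
}
```

With one instance in zone-a and three in zone-b, one of the four possible index values selects zone-a and three select zone-b, so zone-b receives about three quarters of the picks.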