SpringCloud中的Ribbon開源項目,提供了客戶端的負載均衡算法。這篇文章,咱們來介紹下他是如何實現的。爲了方便理解,咱們以客戶端調用的流程來介紹,其中會穿插介紹相關源代碼。java
簡單回顧下Ribbon的使用,這裏強調兩點:算法
一、在啓動類Application中,添加@LoadBalanced註解。app
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
return restTemplate.getForEntity("http://COMPUTE-SERVICE/add?a=10&b=20", String.class).getBody();
先從攔截器LoadBalancerInterceptor開始介紹:負載均衡
從請求中獲取服務名稱,即上文說到的COMPUTE-SERVICE,而後執行LoadBalancerClient的execute方法。dom
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return this.loadBalancer.execute(serviceName,
new LoadBalancerRequest<ClientHttpResponse>() {
@Override
public ClientHttpResponse apply(final ServiceInstance instance)
throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request,
instance, loadBalancer);
return execution.execute(serviceRequest, body);
}
});
}
}
1)頂層接口ide
實現該接口的類,會使用一個負載均衡器來選擇一個server來轉發請求。this
public interface ServiceInstanceChooser {
ServiceInstance choose(String serviceId);
}
2)繼承接口
LoadBalancerClientspa
提供了兩種不一樣的參數的execute()執行方法。
debug
public interface LoadBalancerClient extends ServiceInstanceChooser {
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
3)實現類主要作了一下幾件事:rest
根據serviceId獲取負載均衡器;
根據負載均衡器獲取server;
將請求轉到具體的服務實例。
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
public interface ServiceInstance {
String getServiceId();
String getHost();
int getPort();
boolean isSecure();
URI getUri();
Map<String, String> getMetadata();
}
protected static class RibbonServer implements ServiceInstance { private final String serviceId; private final Server server; private final boolean secure; private Map<String, String> metadata; protected RibbonServer(String serviceId, Server server) { this(serviceId, server, false, Collections.<String, String> emptyMap()); } protected RibbonServer(String serviceId, Server server, boolean secure, Map<String, String> metadata) { this.serviceId = serviceId; this.server = server; this.secure = secure; this.metadata = metadata; } //省去getter和setter @Override public String toString() { final StringBuffer sb = new StringBuffer("RibbonServer{"); sb.append("serviceId='").append(serviceId).append('\''); sb.append(", server=").append(server); sb.append(", secure=").append(secure); sb.append(", metadata=").append(metadata); sb.append('}'); return sb.toString(); } }
部分代碼r
public Server chooseServer(Object key) { Server server = null;
try {
//獲取可用區域Zone
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Throwable e) {
logger.error("Unexpected exception when choosing server using zone aware logic", e);
}
if (server != null) {
return server;
}
}
具體的分配算法:
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}