電子商務平臺源碼請加企鵝求求:三五三六二四七二五九。Spring Cloud集成模式下的Ribbon有如下幾個特徵:
1、Ribbon 服務配置方式
每個服務配置都有一個Spring ApplicationContext上下文,用於加載各自服務的實例。
在使用Feign作爲客戶端時,最終請求會轉發成 http://<服務名稱>/ 的格式,通過LoadBalancerFeignClient提取出服務標識<服務名稱>,然後根據服務名稱在上下文中查找對應服務的負載均衡器FeignLoadBalancer,負載均衡器負責根據既有的服務實例的統計信息,挑選出最合適的服務實例。
2、Spring Cloud模式下和Feign的集成實現方式
和Feign結合的場景下,Feign的調用會被包裝成調用請求LoadBalancerCommand,然後底層通過RxJava基於事件的編碼風格發送請求;Spring Cloud框架通過 Feign 請求的URL提取出服務名稱,然後在上下文中找到對應服務的負載均衡器實現FeignLoadBalancer,再通過負載均衡器挑選一個合適的Server實例,然後將調用請求轉發到該Server實例上,完成調用。在此過程中,記錄對應Server實例的調用統計信息。
/**
 * Builds an {@link Observable} that, once subscribed, runs the given operation asynchronously against a
 * server: the fixed {@code server} of this command when one is set, otherwise one obtained from
 * {@code selectServer()}.
 * <p>
 * Errors that the retry policy treats as retriable are consumed internally and are not seen by the
 * {@link Observer} subscribed to the returned {@link Observable}. If the execution ultimately fails, a
 * final error is emitted; when a retry limit was reached that error is wrapped in a
 * {@link ClientException}. Otherwise the first successful result is emitted.
 *
 * @param operation the call to execute against the selected server
 * @return an {@link Observable} emitting the result of the operation or the final error
 */
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    // Listeners may veto the whole execution before any server is contacted.
    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException aborted) {
            return Observable.error(aborted);
        }
    }

    // Maximum number of retries on the same server.
    final int sameServerRetryLimit = retryHandler.getMaxRetriesOnSameServer();
    // Maximum number of retries on a next server.
    final int nextServerRetryLimit = retryHandler.getMaxRetriesOnNextServer();

    // Pick a server (unless one is fixed), run the request on it and feed the outcome
    // of every attempt into that server's ServerStats.
    Observable<T> pipeline =
            (server == null ? selectServer() : Observable.just(server))
                    .concatMap(new Func1<Server, Observable<T>>() {
                        // Invoked once for every server that gets selected.
                        @Override
                        public Observable<T> call(Server chosen) {
                            context.setServer(chosen);
                            // Statistics holder of the selected server.
                            final ServerStats chosenStats = loadBalancerContext.getServerStats(chosen);

                            // Invoked for the first attempt and for every retry on this server.
                            Observable<T> attempts = Observable
                                    .just(chosen)
                                    .concatMap(new Func1<Server, Observable<T>>() {
                                        @Override
                                        public Observable<T> call(final Server target) {
                                            // Count the attempt and note the opened connection.
                                            context.incAttemptCount();
                                            loadBalancerContext.noteOpenConnection(chosenStats);

                                            if (listenerInvoker != null) {
                                                try {
                                                    listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                                } catch (AbortExecutionException aborted) {
                                                    return Observable.error(aborted);
                                                }
                                            }

                                            // Times the execution of this attempt.
                                            final Stopwatch timer = loadBalancerContext.getExecuteTracer().start();

                                            // Run the request; once the underlying call produces an event,
                                            // record statistics and notify the listeners.
                                            return operation.call(target).doOnEach(new Observer<T>() {
                                                private T lastValue;

                                                @Override
                                                public void onCompleted() {
                                                    // Record statistics for the finished attempt.
                                                    recordStats(timer, chosenStats, lastValue, null);
                                                }

                                                @Override
                                                public void onError(Throwable failure) {
                                                    // Record the failure in the statistics.
                                                    recordStats(timer, chosenStats, null, failure);
                                                    logger.debug("Got error {} when executed on server {}", failure, target);
                                                    if (listenerInvoker != null) {
                                                        listenerInvoker.onExceptionWithServer(failure, context.toExecutionInfo());
                                                    }
                                                }

                                                @Override
                                                public void onNext(T value) {
                                                    // Keep the result so onCompleted can pass it on to the statistics.
                                                    this.lastValue = value;
                                                    if (listenerInvoker != null) {
                                                        listenerInvoker.onExecutionSuccess(value, context.toExecutionInfo());
                                                    }
                                                }

                                                // Stops the timer and marks the request as completed in the statistics.
                                                private void recordStats(Stopwatch stopwatch, ServerStats serverStats,
                                                                         Object response, Throwable failure) {
                                                    stopwatch.stop();
                                                    loadBalancerContext.noteRequestCompletion(
                                                            serverStats,
                                                            response,
                                                            failure,
                                                            stopwatch.getDuration(TimeUnit.MILLISECONDS),
                                                            retryHandler);
                                                }
                                            });
                                        }
                                    });

                            // On failure, re-subscribe according to the same-server retry policy.
                            if (sameServerRetryLimit > 0) {
                                attempts = attempts.retry(retryPolicy(sameServerRetryLimit, true));
                            }
                            return attempts;
                        }
                    });

    // Retrying on a next server only applies when no fixed server was given.
    if (nextServerRetryLimit > 0 && server == null) {
        pipeline = pipeline.retry(retryPolicy(nextServerRetryLimit, false));
    }

    return pipeline.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable cause) {
            Throwable finalError = cause;
            if (context.getAttemptCount() > 0) {
                if (nextServerRetryLimit > 0 && context.getServerAttemptCount() == (nextServerRetryLimit + 1)) {
                    finalError = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + nextServerRetryLimit
                                    + " retries, while making a call for: " + context.getServer(), cause);
                } else if (sameServerRetryLimit > 0 && context.getAttemptCount() == (sameServerRetryLimit + 1)) {
                    finalError = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + sameServerRetryLimit
                                    + " retries, while making a call for: " + context.getServer(), cause);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(finalError, context.toFinalExecutionInfo());
            }
            return Observable.error(finalError);
        }
    });
}
複製代碼
從一組ServerList 列表中挑選合適的Server
/**
 * Resolves the {@link Server} a request should be sent to, starting from a possibly partial URI.
 * The following cases are handled:
 * <ul>
 * <li> if host is missing and there is a load balancer, the server chosen by the load balancer is returned
 * <li> if host is missing and there is no load balancer, host/port are derived from the (single) configured
 *      vip address
 * <li> if host is present, a load balancer exists and the authority part of the URI is recognized as a
 *      vip address, the server chosen by the load balancer is returned (the host in the URI is ignored);
 *      if the load balancer returns no server, the host in the URI is used as is
 * <li> if host is present but none of the above applies, the host in the URI is used as the actual address
 * <li> if host is missing but none of the above applies, throws ClientException
 * </ul>
 *
 * @param original Original URI passed from caller; may be {@code null}
 * @param loadBalancerKey key handed to {@code ILoadBalancer.chooseServer}; may be {@code null}
 * @return the server chosen by the load balancer, or a new {@link Server} built from the resolved host and port
 * @throws ClientException if no server or host can be determined, or the configured vip address cannot be parsed
 */
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
    String host = null;
    int port = -1;
    if (original != null) {
        host = original.getHost();
        Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
        port = schemeAndPort.second();
    }

    // Various Supported Cases
    // The loadbalancer to use and the instances it has is based on how it was registered
    // In each of these cases, the client might come in using Full Url or Partial URL
    ILoadBalancer lb = getLoadBalancer();
    if (host == null) {
        // Partial URI provided, host is missing:
        // we have to get the right instance from the lb - or fall back to the vip address
        if (lb != null) {
            // Let the load balancer choose the server
            Server svc = lb.chooseServer(loadBalancerKey);
            if (svc == null) {
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Load balancer does not have available server for client: "
                                + clientName);
            }
            // Validate the host of the server returned by the load balancer
            host = svc.getHost();
            if (host == null) {
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Invalid Server for :" + svc);
            }
            logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
            return svc;
        } else {
            // No Full URL - and we dont have a LoadBalancer registered to obtain a server.
            // If we have a vipAddress that came with the registration, we can use that,
            // else we bail out.
            if (vipAddresses != null && vipAddresses.contains(",")) {
                throw new ClientException(
                        ClientException.ErrorType.GENERAL,
                        "Method is invoked for client " + clientName + " with partial URI of ("
                                + original
                                + ") with no load balancer configured."
                                + " Also, there are multiple vipAddresses and hence no vip address can be chosen"
                                + " to complete this partial uri");
            } else if (vipAddresses != null) {
                try {
                    // Derive host and port from the single configured vip address
                    Pair<String, Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
                    host = hostAndPort.first();
                    port = hostAndPort.second();
                } catch (URISyntaxException e) {
                    // Keep the parse failure as the cause so the root problem stays visible to the caller
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            "Method is invoked for client " + clientName + " with partial URI of ("
                                    + original
                                    + ") with no load balancer configured. "
                                    + " Also, the configured/registered vipAddress is unparseable (to determine host and port)",
                            e);
                }
            } else {
                throw new ClientException(
                        ClientException.ErrorType.GENERAL,
                        this.clientName
                                + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
                                + " Also has no vipAddress registered");
            }
        }
    } else {
        // Full URL Case: the URL carries a full address.
        // This could either be a vipAddress or a hostAndPort or a real DNS
        // if vipAddress or hostAndPort, we just have to consult the loadbalancer
        // but if it does not return a server, we should just proceed anyways
        // and assume its a DNS
        // For restClients registered using a vipAddress AND executing a request
        // by passing in the full URL (including host and port), we should only
        // consult lb IFF the URL passed is registered as vipAddress in Discovery
        boolean shouldInterpretAsVip = false;

        if (lb != null) {
            shouldInterpretAsVip = isVipRecognized(original.getAuthority());
        }
        if (shouldInterpretAsVip) {
            Server svc = lb.chooseServer(loadBalancerKey);
            if (svc != null) {
                host = svc.getHost();
                if (host == null) {
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("using LB returned Server: {} for request: {}", svc, original);
                return svc;
            } else {
                // just fall back as real DNS
                logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
            }
        } else {
            // consult LB to obtain vipAddress backed instance given full URL
            // Full URL execute request - where url!=vipAddress
            logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
        }
    }
    // end of creating final URL
    if (host == null) {
        throw new ClientException(ClientException.ErrorType.GENERAL, "Request contains no HOST to talk to");
    }
    // just verify that at this point we have a full URL

    return new Server(host, port);
}
複製代碼