private static final Logger logger = LoggerFactory
        .getLogger(AbstractClusterInvoker.class);

/**
 * Directory that contains the candidate invokers.
 */
protected final Directory<T> directory;

/**
 * Whether invoker availability needs to be checked.
 */
protected final boolean availablecheck;

/**
 * Whether this cluster invoker has been destroyed.
 */
private AtomicBoolean destroyed = new AtomicBoolean(false);

/**
 * Invoker remembered for sticky connections; {@code null} when none is remembered.
 */
private volatile Invoker<T> stickyInvoker = null;
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { // 若是invokers爲空,則返回null if (invokers == null || invokers.isEmpty()) return null; // 得到方法名 String methodName = invocation == null ? "" : invocation.getMethodName(); // 是否啓動了粘滯鏈接 boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { //ignore overloaded method // 若是上一次粘滯鏈接的調用不在可選的提供者列合內,則直接設置爲空 if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore concurrency problem // stickyInvoker不爲null,而且沒在已選列表中,返回上次的服務提供者stickyInvoker,但以前強制校驗可達性。 // 因爲stickyInvoker不能包含在selected列表中,經過代碼看,能夠得知forking和failover集羣策略,用不了sticky屬性 if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } // 利用負載均衡選一個提供者 Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected); // 若是啓動粘滯鏈接,則記錄這一次的調用 if (sticky) { stickyInvoker = invoker; } return invoker; }
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 若是隻有一個 ,就直接返回這個 if (invokers.size() == 1) return invokers.get(0); // 若是沒有指定用哪一個負載均衡策略,則默認用隨機負載均衡策略 if (loadbalance == null) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } // 調用負載均衡選擇 Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect. // 若是選擇的提供者,已在selected中或者不可用則從新選擇 if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { // 從新選擇 Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { //Check the index of current selected invoker, if it's not the last one, choose the one at index+1. // 若是從新選擇失敗,看下第一次選的位置,若是不是最後,選+1位置. int index = invokers.indexOf(invoker); try { //Avoid collision // 最後再避免選擇到同一個invoker invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; }
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException { //Allocating one in advance, this list is certain to be used. //預先分配一個重選列表,這個列表是必定會用到的. List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size()); //First, try picking a invoker not in `selected`. //先從非select中選 //把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負載均衡器選擇 if (availablecheck) { // invoker.isAvailable() should be checked for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } } // 在重選列表中用負載均衡器選擇 if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } else { // do not check invoker.isAvailable() // 不覈對服務是否能夠,把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負載均衡器選擇 for (Invoker<T> invoker : invokers) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } // Just pick an available invoker using loadbalance policy { // 若是非selected的列表中沒有選擇到,則從selected中選擇 if (selected != null) { for (Invoker<T> invoker : selected) { if ((invoker.isAvailable()) // available first && !reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } return null; }
@Override public Result invoke(final Invocation invocation) throws RpcException { // 覈對是否已經銷燬 checkWhetherDestroyed(); LoadBalance loadbalance = null; // binding attachments into invocation. // 得到上下文的附加值 Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); // 把附加值放入到會話域中 if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 生成服務提供者集合 List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { // 得到負載均衡器 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { // 把會話域中的invoker加入集合 List<Invoker<T>> invokers = directory.list(invocation); return invokers; }
public class AvailableCluster implements Cluster { public static final String NAME = "available"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立一個AbstractClusterInvoker return new AbstractClusterInvoker<T>(directory) { @Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 遍歷全部的involer,只要有一個可用就直接調用。 for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); } }; } }
Available Cluster我在上面已經講過了,只要找到一個可用的,則直接調用。
public class BroadcastCluster implements Cluster { @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立一個BroadcastClusterInvoker return new BroadcastClusterInvoker<T>(directory); } }
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class); public BroadcastClusterInvoker(Directory<T> directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 把invokers放到上下文 RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; // 遍歷invokers,逐個調用,在循環調用結束後,只要任意一臺報錯就報錯 for (Invoker<T> invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { throw exception; } return result; } }
public class ForkingCluster implements Cluster { public final static String NAME = "forking"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立ForkingClusterInvoker return new ForkingClusterInvoker<T>(directory); } }
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> { /** * 線程池 * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread} * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}. */ private final ExecutorService executor = Executors.newCachedThreadPool( new NamedInternalThreadFactory("forking-cluster-timer", true)); public ForkingClusterInvoker(Directory<T> directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); final List<Invoker<T>> selected; // 獲取 forks 配置 final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); // 獲取超時配置 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 若是 forks 配置不合理,則直接將 invokers 賦值給 selected if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<Invoker<T>>(); // 循環選出 forks 個 Invoker,並添加到 selected 中 for (int i = 0; i < forks; i++) { // TODO. Add some comment here, refer chinese version for more details. // 選擇 Invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. 
// 加入到selected集合 selected.add(invoker); } } } // 放入上下文 RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); // 遍歷 selected 列表 for (final Invoker<T> invoker : selected) { // 爲每一個 Invoker 建立一個執行線程 executor.execute(new Runnable() { @Override public void run() { try { // 進行遠程調用 Result result = invoker.invoke(invocation); // 將結果存到阻塞隊列中 ref.offer(result); } catch (Throwable e) { // 僅在 value 大於等於 selected.size() 時,纔將異常對象 // 爲了防止異常現象覆蓋正常的結果 int value = count.incrementAndGet(); if (value >= selected.size()) { // 將異常對象存入到阻塞隊列中 ref.offer(e); } } } }); } try { // 從阻塞隊列中取出遠程調用結果 Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); // 若是是異常,則拋出 if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } } finally { // clear attachments which is binding to current thread. RpcContext.getContext().clearAttachments(); } } }
public class FailbackCluster implements Cluster { public final static String NAME = "failback"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立一個FailbackClusterInvoker return new FailbackClusterInvoker<T>(directory); } }
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

// Retry interval (5 * 1000).
// NOTE(review): presumably milliseconds — confirm against the scheduling call that uses it.
private static final long RETRY_FAILED_PERIOD = 5 * 1000;

/**
 * Scheduler (timer) with a core pool size of 2.
 * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
 * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
 */
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
        new NamedInternalThreadFactory("failback-cluster-timer", true));

/**
 * Failed invocations, each mapped to a cluster invoker.
 */
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();

/**
 * Scheduled future (original comment: "future").
 * NOTE(review): presumably the handle of the retry task — confirm where it is assigned.
 */
private volatile ScheduledFuture<?> retryFuture;
@Override protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 選擇出invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, null); // 調用 return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); // 若是失敗,則加入到失敗隊列,等待重試 addFailed(invocation, this); return new RpcResult(); // ignore } }
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (retryFuture == null) { // 鎖住 synchronized (this) { if (retryFuture == null) { // 建立定時任務,每隔5秒執行一次 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // collect retry statistics try { // 對失敗的調用進行重試 retryFailed(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } // 添加 invocation 和 invoker 到 failed 中 failed.put(invocation, router); }
void retryFailed() { // 若是失敗隊列爲0,返回 if (failed.size() == 0) { return; } // 遍歷失敗隊列 for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>( failed).entrySet()) { // 得到會話域 Invocation invocation = entry.getKey(); // 得到invoker Invoker<?> invoker = entry.getValue(); try { // 從新調用 invoker.invoke(invocation); // 從失敗隊列中移除 failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
public class FailfastCluster implements Cluster { public final static String NAME = "failfast"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立FailfastClusterInvoker return new FailfastClusterInvoker<T>(directory); } }
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> { public FailfastClusterInvoker(Directory<T> directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 選擇一個invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { // 調用 return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. // 拋出異常 throw (RpcException) e; } // 拋出異常 throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } } }
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立FailoverClusterInvoker return new FailoverClusterInvoker<T>(directory); } }
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 複製一個invoker集合 List<Invoker<T>> copyinvokers = invokers; // 檢測是否爲空 checkInvokers(copyinvokers, invocation); // 獲取重試次數 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. // 記錄最後一個異常 RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); // 循環調用,失敗重試 for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. 
// 在進行重試前從新列舉 Invoker,這樣作的好處是,若是某個服務掛了, // 經過調用 list 可獲得最新可用的 Invoker 列表 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again // 檢測copyinvokers 是否爲空 checkInvokers(copyinvokers, invocation); } // 經過負載均衡選擇invoker Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); // 添加到 invoker 到 invoked 列表中 invoked.add(invoker); // 設置 invoked 到 RPC 上下文中 RpcContext.getContext().setInvokers((List) invoked); try { // 調用目標 Invoker 的 invoke 方法 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 若重試失敗,則拋出異常 throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
public class FailsafeCluster implements Cluster { public final static String NAME = "failsafe"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立FailsafeClusterInvoker return new FailsafeClusterInvoker<T>(directory); } }
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class); public FailsafeClusterInvoker(Directory<T> directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 選擇一個invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, null); // 調用 return invoker.invoke(invocation); } catch (Throwable e) { // 若是失敗打印異常,返回一個空結果 logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); // ignore } } }
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...