目標:介紹dubbo中集羣容錯的幾種模式,介紹dubbo-cluster下support包的源碼。
集羣容錯仍是很好理解的,就是當你調用失敗的時候所做出的措施。先來看看有哪些模式:java
圖有點小,見諒,不過能夠眯着眼睛看稍微能看出來一點,每個Cluster實現類都對應着一個invoker,由於這個模式啓用的時間點就是在調用的時候,而我在以前的文章裏面講過,invoker貫穿來整個服務的調用。不過這裏除了調用失敗的一些模式外,還有幾個特別的模式,他們應該說成是失敗的措施,而已調用的方式。git
該類實現了Invoker接口,是集羣Invoker的抽象類。github
private static final Logger logger = LoggerFactory .getLogger(AbstractClusterInvoker.class); /** * 目錄,包含多個invoker */ protected final Directory<T> directory; /** * 是否須要覈對可用 */ protected final boolean availablecheck; /** * 是否銷燬 */ private AtomicBoolean destroyed = new AtomicBoolean(false); /** * 粘滯鏈接的Invoker */ private volatile Invoker<T> stickyInvoker = null;
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { // 若是invokers爲空,則返回null if (invokers == null || invokers.isEmpty()) return null; // 得到方法名 String methodName = invocation == null ? "" : invocation.getMethodName(); // 是否啓動了粘滯鏈接 boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { //ignore overloaded method // 若是上一次粘滯鏈接的調用不在可選的提供者列合內,則直接設置爲空 if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore concurrency problem // stickyInvoker不爲null,而且沒在已選列表中,返回上次的服務提供者stickyInvoker,但以前強制校驗可達性。 // 因爲stickyInvoker不能包含在selected列表中,經過代碼看,能夠得知forking和failover集羣策略,用不了sticky屬性 if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } // 利用負載均衡選一個提供者 Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected); // 若是啓動粘滯鏈接,則記錄這一次的調用 if (sticky) { stickyInvoker = invoker; } return invoker; }
該方法實現了使用負載均衡策略選擇一個調用者。首先,使用loadbalance選擇一個調用者。若是此調用者位於先前選擇的列表中,或者若是此調用者不可用,則從新選擇,不然返回第一個選定的調用者。從新選擇,重選的驗證規則:選擇>可用。這條規則能夠保證所選的調用者最少有機會成爲以前選擇的列表中的一個,也是保證這個調用程序可用。緩存
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 若是隻有一個 ,就直接返回這個 if (invokers.size() == 1) return invokers.get(0); // 若是沒有指定用哪一個負載均衡策略,則默認用隨機負載均衡策略 if (loadbalance == null) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } // 調用負載均衡選擇 Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect. // 若是選擇的提供者,已在selected中或者不可用則從新選擇 if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { // 從新選擇 Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { //Check the index of current selected invoker, if it's not the last one, choose the one at index+1. // 若是從新選擇失敗,看下第一次選的位置,若是不是最後,選+1位置. int index = invokers.indexOf(invoker); try { //Avoid collision // 最後再避免選擇到同一個invoker invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; }
該方法是用負載均衡選擇一個invoker的主要邏輯。安全
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException { //Allocating one in advance, this list is certain to be used. //預先分配一個重選列表,這個列表是必定會用到的. List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size()); //First, try picking a invoker not in `selected`. //先從非select中選 //把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負載均衡器選擇 if (availablecheck) { // invoker.isAvailable() should be checked for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } } // 在重選列表中用負載均衡器選擇 if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } else { // do not check invoker.isAvailable() // 不覈對服務是否能夠,把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負載均衡器選擇 for (Invoker<T> invoker : invokers) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } // Just pick an available invoker using loadbalance policy { // 若是非selected的列表中沒有選擇到,則從selected中選擇 if (selected != null) { for (Invoker<T> invoker : selected) { if ((invoker.isAvailable()) // available first && !reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } return null; }
該方法是是從新選擇的邏輯實現。服務器
@Override public Result invoke(final Invocation invocation) throws RpcException { // 覈對是否已經銷燬 checkWhetherDestroyed(); LoadBalance loadbalance = null; // binding attachments into invocation. // 得到上下文的附加值 Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); // 把附加值放入到會話域中 if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 生成服務提供者集合 List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { // 得到負載均衡器 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
該方法是invoker接口必備的方法,調用鏈的邏輯,不過主要的邏輯在doInvoke方法中,該方法是該類的抽象方法,讓子類只關注doInvoke方法。app
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { // 把會話域中的invoker加入集合 List<Invoker<T>> invokers = directory.list(invocation); return invokers; }
該方法是調用了directory的list方法,從會話域中得到全部的Invoker集合。關於directory我會在後續文章講解。負載均衡
public class AvailableCluster implements Cluster { public static final String NAME = "available"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立一個AbstractClusterInvoker return new AbstractClusterInvoker<T>(directory) { @Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 遍歷全部的involer,只要有一個可用就直接調用。 for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); } }; } }
Available Cluster我在上面已經講過了,只要找到一個可用的,則直接調用。ide
public class BroadcastCluster implements Cluster { @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立一個BroadcastClusterInvoker return new BroadcastClusterInvoker<T>(directory); } }
關鍵實如今於BroadcastClusterInvoker。oop
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class); public BroadcastClusterInvoker(Directory<T> directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 把invokers放到上下文 RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; // 遍歷invokers,逐個調用,在循環調用結束後,只要任意一臺報錯就報錯 for (Invoker<T> invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { throw exception; } return result; } }
public class ForkingCluster implements Cluster { public final static String NAME = "forking"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立ForkingClusterInvoker return new ForkingClusterInvoker<T>(directory); } }
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> { /** * 線程池 * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread} * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}. */ private final ExecutorService executor = Executors.newCachedThreadPool( new NamedInternalThreadFactory("forking-cluster-timer", true)); public ForkingClusterInvoker(Directory<T> directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); final List<Invoker<T>> selected; // 獲取 forks 配置 final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); // 獲取超時配置 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 若是 forks 配置不合理,則直接將 invokers 賦值給 selected if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<Invoker<T>>(); // 循環選出 forks 個 Invoker,並添加到 selected 中 for (int i = 0; i < forks; i++) { // TODO. Add some comment here, refer chinese version for more details. // 選擇 Invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. // 加入到selected集合 selected.add(invoker); } } } // 放入上下文 RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); // 遍歷 selected 列表 for (final Invoker<T> invoker : selected) { // 爲每一個 Invoker 建立一個執行線程 executor.execute(new Runnable() { @Override public void run() { try { // 進行遠程調用 Result result = invoker.invoke(invocation); // 將結果存到阻塞隊列中 ref.offer(result); } catch (Throwable e) { // 僅在 value 大於等於 selected.size() 時,纔將異常對象 // 爲了防止異常現象覆蓋正常的結果 int value = count.incrementAndGet(); if (value >= selected.size()) { // 將異常對象存入到阻塞隊列中 ref.offer(e); } } } }); } try { // 從阻塞隊列中取出遠程調用結果 Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); // 若是是異常,則拋出 if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } } finally { // clear attachments which is binding to current thread. RpcContext.getContext().clearAttachments(); } } }
public class FailbackCluster implements Cluster { public final static String NAME = "failback"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立一個FailbackClusterInvoker return new FailbackClusterInvoker<T>(directory); } }
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); // 重試間隔 private static final long RETRY_FAILED_PERIOD = 5 * 1000; /** * 定時器 * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread} * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}. */ private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedInternalThreadFactory("failback-cluster-timer", true)); /** * 失敗集合 */ private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>(); /** * future */ private volatile ScheduledFuture<?> retryFuture;
@Override protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 選擇出invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, null); // 調用 return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); // 若是失敗,則加入到失敗隊列,等待重試 addFailed(invocation, this); return new RpcResult(); // ignore } }
該方法是選擇invoker調用的邏輯,在拋出異常的時候,作了失敗重試的機制,主要實如今addFailed。
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (retryFuture == null) { // 鎖住 synchronized (this) { if (retryFuture == null) { // 建立定時任務,每隔5秒執行一次 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // collect retry statistics try { // 對失敗的調用進行重試 retryFailed(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } // 添加 invocation 和 invoker 到 failed 中 failed.put(invocation, router); }
該方法作的事建立了定時器,而後把失敗的調用放入到集合中。
void retryFailed() { // 若是失敗隊列爲0,返回 if (failed.size() == 0) { return; } // 遍歷失敗隊列 for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>( failed).entrySet()) { // 得到會話域 Invocation invocation = entry.getKey(); // 得到invoker Invoker<?> invoker = entry.getValue(); try { // 從新調用 invoker.invoke(invocation); // 從失敗隊列中移除 failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
這個方法是調用失敗的invoker從新調用的機制。
public class FailfastCluster implements Cluster { public final static String NAME = "failfast"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立FailfastClusterInvoker return new FailfastClusterInvoker<T>(directory); } }
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> { public FailfastClusterInvoker(Directory<T> directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 選擇一個invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { // 調用 return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. // 拋出異常 throw (RpcException) e; } // 拋出異常 throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } } }
邏輯比較簡單,調用拋出異常就直接拋出。
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立FailoverClusterInvoker return new FailoverClusterInvoker<T>(directory); } }
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 複製一個invoker集合 List<Invoker<T>> copyinvokers = invokers; // 檢測是否爲空 checkInvokers(copyinvokers, invocation); // 獲取重試次數 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. // 記錄最後一個異常 RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); // 循環調用,失敗重試 for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. // 在進行重試前從新列舉 Invoker,這樣作的好處是,若是某個服務掛了, // 經過調用 list 可獲得最新可用的 Invoker 列表 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again // 檢測copyinvokers 是否爲空 checkInvokers(copyinvokers, invocation); } // 經過負載均衡選擇invoker Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); // 添加到 invoker 到 invoked 列表中 invoked.add(invoker); // 設置 invoked 到 RPC 上下文中 RpcContext.getContext().setInvokers((List) invoked); try { // 調用目標 Invoker 的 invoke 方法 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 若重試失敗,則拋出異常 throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
該類實現了失敗重試的容錯策略,當調用失敗的時候,記錄下異常,而後循環調用下一個選擇出來的invoker,直到重試次數用完,拋出最後一次的異常。
public class FailsafeCluster implements Cluster { public final static String NAME = "failsafe"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立FailsafeClusterInvoker return new FailsafeClusterInvoker<T>(directory); } }
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class); public FailsafeClusterInvoker(Directory<T> directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測invokers是否爲空 checkInvokers(invokers, invocation); // 選擇一個invoker Invoker<T> invoker = select(loadbalance, invocation, invokers, null); // 調用 return invoker.invoke(invocation); } catch (Throwable e) { // 若是失敗打印異常,返回一個空結果 logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); // ignore } } }
邏輯比較簡單,就是不拋出異常,只是打印異常。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了集羣中關於cluster實現的部分,講了幾種調用方式和容錯策略。接下來我將開始對集羣模塊關於配置規則部分進行講解。