Dubbo Source Code Study Series, Part 4 -- Cluster Fault-Tolerance Mechanisms

 

        Which cluster fault-tolerance mechanisms does Dubbo provide, and how are they implemented?

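        Dubbo provides six cluster fault-tolerance mechanisms:
        - Failover: on failure, switch automatically and try other servers.
        - Failfast: throw an exception immediately on failure.
        - Failsafe: ignore the exception on failure.
        - Failback: recover automatically on failure by recording the failed request and retrying it periodically.
        - Forking: call several providers in parallel and return as soon as one succeeds.
        - Broadcast: call every provider; if any one of them reports an error, the whole call fails.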

        Let's start with the implementation of the base class.

        1. AbstractClusterInvoker, the abstract base implementation of ClusterInvoker, provides the implementation framework for all ClusterInvoker subclasses.

        

        Its main methods are the following.

        invoke() prepares the input parameters for the subclass's doInvoke() method: the invoker list and a LoadBalance instance. Its logic is:

        (1) Check whether the current invoker has been destroyed. If it has, throw an exception immediately.

        (2) Get the invoker list from the Directory (usually the provider list returned by the registry) by calling directory.list().

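        (3) Read the load-balance setting from the URL of the first invoker in the list. Use the ExtensionLoader to dynamically load and instantiate the matching LoadBalance implementation. If nothing is configured, or the list is empty, the default LoadBalance (random) is used.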

        (4) If the call is asynchronous, attach an invocation ID to it with RpcUtils.attachInvocationIdIfAsync().

        (5) Call the subclass's doInvoke() method to carry out the actual invocation.

    public Result invoke(final Invocation invocation) throws RpcException {

        // (1) fail fast if this cluster invoker has been destroyed
        checkWhetherDestroyed();

        LoadBalance loadbalance;

        // (2) fetch the candidate invokers from the Directory
        List<Invoker<T>> invokers = list(invocation);
        // (3) load the LoadBalance extension named in the first invoker's URL, or the default one
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        // (4) attach an invocation ID for asynchronous calls
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // (5) let the concrete cluster strategy do the actual work
        return doInvoke(invocation, invokers, loadbalance);
    }
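The sketch below shows this template-method structure on its own. It is minimal and self-contained. SimpleInvoker, SimpleLoadBalance, TemplateClusterInvoker and TemplateDemo are hypothetical stand-ins made up for illustration; they are not Dubbo's API. A plain map plays the role of ExtensionLoader, and the fallback name "random" is assumed to mirror Constants.DEFAULT_LOADBALANCE. invoke() fixes the skeleton (destroyed check, load-balance lookup, delegation), and subclasses only fill in doInvoke().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a provider invoker; not Dubbo's Invoker.
class SimpleInvoker {
    final String address;
    final Map<String, String> params; // plays the role of the provider URL's parameters

    SimpleInvoker(String address, Map<String, String> params) {
        this.address = address;
        this.params = params;
    }

    // Mimics URL.getMethodParameter: "method.key" first, then "key", then the default.
    String methodParam(String method, String key, String def) {
        String v = params.get(method + "." + key);
        if (v == null) v = params.get(key);
        return v != null ? v : def;
    }
}

interface SimpleLoadBalance {
    SimpleInvoker select(List<SimpleInvoker> invokers);
}

abstract class TemplateClusterInvoker {
    private final Map<String, SimpleLoadBalance> extensions; // stands in for ExtensionLoader
    private volatile boolean destroyed;

    TemplateClusterInvoker(Map<String, SimpleLoadBalance> extensions) {
        this.extensions = extensions;
    }

    void destroy() { destroyed = true; }

    // The fixed skeleton; subclasses only supply doInvoke().
    final String invoke(String method, List<SimpleInvoker> invokers) {
        if (destroyed) throw new IllegalStateException("cluster invoker already destroyed");
        String lbName = invokers.isEmpty()
                ? "random" // assumed default, mirroring DEFAULT_LOADBALANCE
                : invokers.get(0).methodParam(method, "loadbalance", "random");
        SimpleLoadBalance lb = extensions.get(lbName);
        if (lb == null) throw new IllegalStateException("No such load balance: " + lbName);
        return doInvoke(method, invokers, lb);
    }

    protected abstract String doInvoke(String method, List<SimpleInvoker> invokers, SimpleLoadBalance lb);
}

class TemplateDemo {
    static String run() {
        Map<String, SimpleLoadBalance> ext = new HashMap<>();
        ext.put("random", list -> list.get(0));                   // deterministic stand-in
        ext.put("roundrobin", list -> list.get(list.size() - 1)); // deterministic stand-in
        TemplateClusterInvoker cluster = new TemplateClusterInvoker(ext) {
            @Override
            protected String doInvoke(String method, List<SimpleInvoker> invokers, SimpleLoadBalance lb) {
                return method + "@" + lb.select(invokers).address;
            }
        };
        List<SimpleInvoker> invokers = Arrays.asList(
                new SimpleInvoker("A", Collections.singletonMap("findFoo.loadbalance", "roundrobin")),
                new SimpleInvoker("B", Collections.<String, String>emptyMap()));
        return cluster.invoke("findFoo", invokers) + "," + cluster.invoke("other", invokers);
    }
}
```

The first provider carries the method-level key findFoo.loadbalance=roundrobin. As a result, findFoo is routed by the round-robin stand-in, while other methods fall back to the default.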

 

        select() is called from the subclasses' doInvoke() methods and picks one invoker using the load-balance policy. Its logic is:

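        a) Select an invoker with the load-balance policy. Invokers that have already been used are recorded in the local selected cache. If the chosen invoker is already in that selected list, or it is unavailable (when availablecheck is enabled), go to step b and select again. Otherwise, return it directly.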

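        b) reselect(): from the given invoker list, collect the invokers that are not in selected (not yet used) into reselectInvokers, then pick one of them with the load-balance policy. If every invoker in the given list is already in selected, collect all the available invokers in selected instead and pick one of those with the load-balance policy. This keeps the probability of choosing the same invoker again as low as possible.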

    /**
     * Select an invoker using loadbalance policy.</br>
     * a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or, if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
     * b)Reselection, the validation rule for reselection: selected > available. This rule guarantees that the selected invoker has the minimum chance to be one in the previously selected list, and also guarantees this invoker is available.
     *
     * @param loadbalance load balance policy
     * @param invocation
     * @param invokers invoker candidates
     * @param selected  exclude selected invokers or not
     * @return
     * @throws RpcException
     */
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        String methodName = invocation == null ? "" : invocation.getMethodName();

        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
        {
            //ignore overloaded method
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }
            //ignore concurrent problem
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                if (availablecheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
        }
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);

        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }

         

    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        // If we only have two invokers, use round-robin instead.
        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }

        

    /**
     * Reselect, use invokers not in `selected` first, if all invokers are in `selected`, just pick an available one using loadbalance policy.
     *
     * @param loadbalance
     * @param invocation
     * @param invokers
     * @param selected
     * @return
     * @throws RpcException
     */
    private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
            throws RpcException {

        //Allocating one in advance, this list is certain to be used.
        List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

        //First, try picking a invoker not in `selected`.
        if (availablecheck) { // invoker.isAvailable() should be checked
            for (Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    if (selected == null || !selected.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if (reselectInvokers.size() > 0) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        } else { // do not check invoker.isAvailable()
            for (Invoker<T> invoker : invokers) {
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
            if (reselectInvokers.size() > 0) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        // Just pick an available invoker using loadbalance policy
        {
            if (selected != null) {
                for (Invoker<T> invoker : selected) {
                    if ((invoker.isAvailable()) // available first
                            && !reselectInvokers.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if (reselectInvokers.size() > 0) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        return null;
    }
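The a)/b) strategy condenses into the small sketch below. Node and SelectSketch are hypothetical, and the load-balance policy is modeled as a plain Function. The sketch assumes availablecheck is enabled. It deliberately leaves out sticky invokers, the two-invoker shortcut, and the index+1 fallback that doselect() applies when reselect() returns null; here the original pick is kept instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical provider stand-in; not Dubbo's Invoker.
final class Node {
    final String name;
    final boolean available;

    Node(String name, boolean available) {
        this.name = name;
        this.available = available;
    }
}

// Hypothetical sketch of select()/reselect(), assuming availablecheck = true.
final class SelectSketch {
    static Node select(Function<List<Node>, Node> lb, List<Node> invokers, List<Node> selected) {
        if (invokers.isEmpty()) return null;
        if (invokers.size() == 1) return invokers.get(0);
        Node picked = lb.apply(invokers);
        // Step a: accept the pick unless it was already used or is unavailable.
        if (!selected.contains(picked) && picked.available) return picked;
        // Step b: reselect; if that finds nothing, keep the original pick.
        Node again = reselect(lb, invokers, selected);
        return again != null ? again : picked;
    }

    static Node reselect(Function<List<Node>, Node> lb, List<Node> invokers, List<Node> selected) {
        List<Node> candidates = new ArrayList<>();
        // Prefer available invokers that have not been used yet.
        for (Node n : invokers) {
            if (n.available && !selected.contains(n)) candidates.add(n);
        }
        if (!candidates.isEmpty()) return lb.apply(candidates);
        // Everything was used: fall back to the available ones among those already selected.
        for (Node n : selected) {
            if (n.available) candidates.add(n);
        }
        return candidates.isEmpty() ? null : lb.apply(candidates);
    }

    static String demo() {
        Node a = new Node("A", true);
        Node b = new Node("B", false);
        Node c = new Node("C", true);
        List<Node> all = Arrays.asList(a, b, c);
        Function<List<Node>, Node> first = list -> list.get(0); // deterministic stand-in policy
        return select(first, all, Collections.<Node>emptyList()).name + ","
                + select(first, all, Arrays.asList(a)).name + ","
                + select(first, all, Arrays.asList(a, c)).name;
    }
}
```

demo() uses a "pick the first candidate" policy, so the results are deterministic:
- With nothing selected, A is chosen.
- Once A is in selected, the unavailable B is skipped and C is chosen.
- Once both A and C are in selected, the fallback picks among the available selected invokers and returns A.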

 

        2. Failover Cluster: automatic failover. When a call fails, it is retried on other servers.

        It is typically used for read operations, but retries add latency. Set the number of retries (not counting the first call) with retries="2".

        Cluster configuration (this is the default):

        <dubbo:service cluster="failover" /> or <dubbo:reference cluster="failover" />

        The retry count can be configured in three ways: on the provider, on the consumer, or per method.

        <dubbo:service retries="2" />

        <dubbo:reference retries="2" />   

        <dubbo:reference>

                <dubbo:method name="findFoo" retries="2" />

        </dubbo:reference>

        The source code in detail:

        The FailoverCluster class simply delegates to FailoverClusterInvoker:

/**
 * {@link FailoverClusterInvoker}
 *
 */
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

 

        FailoverClusterInvoker extends AbstractClusterInvoker and implements the Failover mechanism in doInvoke(): when a call fails, it switches automatically and retries on other servers.

        doInvoke() is called by the invoke() method of the parent class AbstractClusterInvoker. Its logic is:

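        (1) Read the retry count from the retries parameter (default 2). Set the maximum number of loop iterations to len = retries + 1, and never fewer than 1.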

        (2) Loop at most len times.

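        (3) In the first iteration, call the parent class's select() method to pick an invoker via load balancing. select() is passed the list of invokers already tried, so it avoids them. Then execute invoker.invoke(invocation):
            - If the call succeeds, return the result.
            - If it throws a business exception, rethrow it immediately without retrying.
            - For any other exception, record it and continue the loop.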

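        (4) On each later iteration, first check that the cluster invoker has not been destroyed. Fetch the invoker list from the Directory again and check that it is not empty, then repeat step (3). Stop when a call succeeds or the loop reaches len iterations. If every attempt fails, an RpcException carrying the last error is thrown.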

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        // Read the retry count from the retries parameter (default 2); loop at most len = retries + 1 times
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            // On a retry: make sure we are not destroyed, then re-list and re-check the invokers
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            // Pick an invoker with the parent's select() and invoke it; return on success,
            // otherwise record the exception and keep looping
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }
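The core of this retry loop can be reproduced without Dubbo, as in the sketch below. Provider, BizException and FailoverSketch are hypothetical. A RuntimeException stands in for a non-business RpcException, and BizException stands in for one whose isBiz() is true. Selection is simplified to "first provider not yet invoked". Unlike FailoverClusterInvoker, the sketch does not re-list providers from a Directory before each retry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for an RpcException whose isBiz() is true.
class BizException extends RuntimeException {
    BizException(String msg) { super(msg); }
}

// Hypothetical provider: an address plus a behavior that may throw to simulate a failure.
final class Provider {
    final String address;
    final Function<String, String> behavior;

    Provider(String address, Function<String, String> behavior) {
        this.address = address;
        this.behavior = behavior;
    }
}

final class FailoverSketch {
    static String invoke(List<Provider> providers, String arg, int retries) {
        int len = Math.max(retries + 1, 1); // the first call plus `retries` retries
        List<Provider> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < len; i++) {
            Provider p = pick(providers, invoked);
            invoked.add(p);
            try {
                return p.behavior.apply(arg);
            } catch (BizException e) {
                throw e; // business errors are not retried
            } catch (RuntimeException e) {
                last = e; // remember the failure and try another provider
            }
        }
        throw new IllegalStateException("Tried " + len + " times, last error: "
                + (last != null ? last.getMessage() : ""), last);
    }

    // Simplified selection: the first provider not yet invoked, else cycle back to the first.
    private static Provider pick(List<Provider> providers, List<Provider> invoked) {
        for (Provider p : providers) {
            if (!invoked.contains(p)) return p;
        }
        return providers.get(0);
    }

    static String demo() {
        Function<String, String> down = s -> { throw new RuntimeException("connection refused"); };
        List<Provider> providers = Arrays.asList(
                new Provider("10.0.0.1", down),
                new Provider("10.0.0.2", s -> "hello " + s));
        String ok = invoke(providers, "dubbo", 2);
        String failed;
        try {
            invoke(providers, "dubbo", 0);
            failed = "unexpected success";
        } catch (IllegalStateException e) {
            failed = e.getMessage();
        }
        return ok + " | " + failed;
    }
}
```

With retries = 2 (Dubbo's default), the call survives the first provider being down. With retries = 0 there is only one attempt, so the same failure surfaces immediately.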

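        3. Failfast Cluster: fail fast. Only one call is made, and an error is reported immediately if it fails. It is typically used for non-idempotent write operations, such as inserting a record.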

        FailfastClusterInvoker extends AbstractClusterInvoker and implements the Failfast mechanism in doInvoke(): on failure, it throws an exception immediately.

        doInvoke() is called by the invoke() method of the parent class AbstractClusterInvoker:

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                    "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                            + " select from all providers " + invokers + " for service " + getInterface().getName()
                            + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion()
                            + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                    e.getCause() != null ? e.getCause() : e);
        }
    }
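Compared with Failover, the difference is that there is exactly one attempt. The minimal sketch below assumes a Supplier as the remote call and a hypothetical BusinessException in place of a business RpcException. FailfastSketch is also invented for illustration. The sketch catches RuntimeException rather than every Throwable.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for an RpcException whose isBiz() is true.
class BusinessException extends RuntimeException {
    BusinessException(String msg) { super(msg); }
}

// Hypothetical sketch of the Failfast pattern; not Dubbo's FailfastClusterInvoker.
final class FailfastSketch {
    // Exactly one attempt: no retry, no fallback.
    static String invoke(String provider, Supplier<String> call) {
        try {
            return call.get();
        } catch (BusinessException e) {
            throw e; // business exceptions pass through unchanged
        } catch (RuntimeException e) {
            // Anything else is wrapped with context, much like the RpcException above.
            throw new IllegalStateException("Failfast invoke provider " + provider
                    + " failed. Last error is: " + e.getMessage(), e);
        }
    }

    static String demo() {
        StringBuilder out = new StringBuilder(invoke("10.0.0.1", () -> "created"));
        try {
            invoke("10.0.0.2", () -> { throw new RuntimeException("timeout"); });
        } catch (IllegalStateException e) {
            out.append(" | ").append(e.getMessage());
        }
        try {
            invoke("10.0.0.3", () -> { throw new BusinessException("duplicate key"); });
        } catch (BusinessException e) {
            out.append(" | biz: ").append(e.getMessage());
        }
        return out.toString();
    }
}
```

A non-idempotent insert is never repeated. The caller sees either the result, a wrapped infrastructure error, or the business error itself.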

 

 

        4. Failsafe Cluster: fail safe. When an exception occurs, it is simply ignored. It is typically used for operations such as writing audit logs.

        FailsafeClusterInvoker extends AbstractClusterInvoker and implements the Failsafe mechanism in doInvoke(): any exception is logged and ignored, and an empty result is returned.

        doInvoke() is called by the invoke() method of the parent class AbstractClusterInvoker:

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }
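Below is a minimal sketch of the same idea. It assumes a Supplier as the remote call and uses an empty Optional in place of the empty RpcResult. FailsafeSketch is hypothetical and logs through java.util.logging rather than Dubbo's logger.

```java
import java.util.Optional;
import java.util.function.Supplier;
import java.util.logging.Logger;

// Hypothetical sketch of the Failsafe pattern; not Dubbo's FailsafeClusterInvoker.
final class FailsafeSketch {
    private static final Logger LOG = Logger.getLogger(FailsafeSketch.class.getName());

    // Any failure is logged and swallowed; the caller gets an empty result instead of an exception.
    static Optional<String> invoke(Supplier<String> call) {
        try {
            return Optional.ofNullable(call.get());
        } catch (RuntimeException e) {
            LOG.severe("Failsafe ignore exception: " + e.getMessage());
            return Optional.empty(); // plays the role of the empty `new RpcResult()`
        }
    }

    static String demo() {
        Optional<String> ok = invoke(() -> "audit-log written");
        Optional<String> failed = invoke(() -> { throw new RuntimeException("disk full"); });
        return ok.orElse("<empty>") + " | " + failed.orElse("<empty>");
    }
}
```

Because the failure is swallowed, the caller must be able to proceed without a result. That is why the pattern fits audit logging rather than queries whose answer is needed.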

 

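        5. Failback Cluster: automatic recovery. Failed requests are recorded in the background and re-sent periodically. It is typically used for message-notification operations.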

        FailbackClusterInvoker extends AbstractClusterInvoker and implements the Failback mechanism in doInvoke().

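        doInvoke() is called by the invoke() method of the parent class AbstractClusterInvoker. Each failed invocation is recorded in the failed map, and an empty RpcResult is returned to the caller. A scheduled executor, started on the first failure, periodically retries the recorded invocations and removes each one from the map once it succeeds: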

    private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
        if (retryFuture == null) {
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                        public void run() {
                            // collect retry statistics
                            try {
                                retryFailed();
                            } catch (Throwable t) { // Defensive fault tolerance
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);
    }

    void retryFailed() {
        if (failed.size() == 0) {
            return;
        }
        for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker<?> invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            addFailed(invocation, this);
            return new RpcResult(); // ignore
        }
    }
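The sketch below reproduces, with java.util.concurrent alone, two parts of the code above: the lazy, double-checked start of the retry timer and the "remove on success" retry loop. FailbackSketch is hypothetical:
- Failed calls are keyed by a caller-supplied id instead of an Invocation.
- The retry period is a constructor argument standing in for RETRY_FAILED_PERIOD.
- The timer thread is a daemon so it does not keep the JVM alive.

retryFailed() catches exceptions itself because ScheduledExecutorService suppresses all later runs of a task once one run throws. The quoted Dubbo code wraps retryFailed() in a try/catch for the same reason.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the Failback pattern; not Dubbo's FailbackClusterInvoker.
final class FailbackSketch {
    // Daemon thread, so pending retries never keep the JVM alive.
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "failback-retry");
        t.setDaemon(true);
        return t;
    });
    private final Map<String, Supplier<String>> failed = new ConcurrentHashMap<>();
    private final long periodMillis; // stands in for RETRY_FAILED_PERIOD
    private volatile ScheduledFuture<?> retryFuture;

    FailbackSketch(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    // On failure: remember the call and return an empty (null) result instead of throwing.
    String invoke(String id, Supplier<String> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            addFailed(id, call);
            return null;
        }
    }

    private void addFailed(String id, Supplier<String> call) {
        if (retryFuture == null) {
            synchronized (this) {
                if (retryFuture == null) { // double-checked: start the timer once, lazily
                    retryFuture = timer.scheduleWithFixedDelay(this::retryFailed,
                            periodMillis, periodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(id, call);
    }

    // Must not throw: a throwing task is never rescheduled.
    private void retryFailed() {
        for (Map.Entry<String, Supplier<String>> entry : failed.entrySet()) {
            try {
                entry.getValue().get();
                failed.remove(entry.getKey()); // success: stop retrying this call
            } catch (RuntimeException e) {
                // still failing: keep it for the next round
            }
        }
    }

    static String demo() {
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch delivered = new CountDownLatch(1);
        Supplier<String> sendNotice = () -> {
            // Fails on the first two attempts, succeeds on the third.
            if (attempts.incrementAndGet() < 3) throw new RuntimeException("MQ unavailable");
            delivered.countDown();
            return "sent";
        };
        FailbackSketch cluster = new FailbackSketch(20);
        String first = cluster.invoke("notice-1", sendNotice);
        try {
            boolean ok = delivered.await(2, TimeUnit.SECONDS);
            return (first == null ? "empty" : first) + " | delivered=" + ok
                    + " after " + attempts.get() + " attempts";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

In demo(), the notification fails on the original call and on the first background retry. It succeeds on the second retry, after which it is removed from the failed map and no longer retried.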

 

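        6. Forking Cluster: call several servers in parallel and return as soon as one succeeds. It is typically used for read operations with strict real-time requirements, at the cost of more server resources. Set the maximum degree of parallelism with forks="2".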

        ForkingClusterInvoker extends AbstractClusterInvoker and implements the Forking mechanism in doInvoke().

        doInvoke() is called by the invoke() method of the parent class AbstractClusterInvoker. Its logic is:

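        (1) If forks <= 0 or forks >= the number of invokers, use all invokers. Otherwise, call select() forks times and collect up to forks distinct invokers into selected.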

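        (2) Loop over selected and hand each invoker's call to a thread from the thread pool. Every successful result is put into a blocking queue. Exceptions are only counted. The exception that makes every call a failure (the failure count reaches the size of selected) is the only one put into the queue, so an early error cannot hide a later success.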

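        (3) Wait up to the timeout for the first element in the blocking queue. If it is a result, return it directly; if it is an exception, throw an RpcException. If nothing arrives within the timeout, poll() returns null and the method returns null.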

    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<Invoker<T>>();
            for (int i = 0; i < forks; i++) {
                // TODO. Add some comment here, refer chinese version for more details.
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        final AtomicInteger count = new AtomicInteger();
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        for (final Invoker<T> invoker : selected) {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                        e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    }
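The heart of Forking is the queue trick: the first success wins, and an error is reported only when every call failed. The sketch below reproduces it with a hypothetical ForkingSketch class, using Supplier calls in place of invokers. One deliberate difference: it throws on timeout, whereas the quoted Dubbo code would return null when poll() times out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the Forking pattern; not Dubbo's ForkingClusterInvoker.
final class ForkingSketch {
    static String invoke(ExecutorService executor, List<Supplier<String>> selected, long timeoutMillis)
            throws InterruptedException {
        AtomicInteger failures = new AtomicInteger();
        BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        for (Supplier<String> call : selected) {
            executor.execute(() -> {
                try {
                    ref.offer(call.get()); // every success is queued
                } catch (RuntimeException e) {
                    // Only the failure that makes *all* calls failed is queued.
                    if (failures.incrementAndGet() >= selected.size()) ref.offer(e);
                }
            });
        }
        Object ret = ref.poll(timeoutMillis, TimeUnit.MILLISECONDS); // null on timeout
        if (ret instanceof Throwable) {
            throw new IllegalStateException("all forks failed, last error: " + ((Throwable) ret).getMessage());
        }
        if (ret == null) throw new IllegalStateException("timed out");
        return (String) ret;
    }

    // A call that succeeds after a delay.
    private static Supplier<String> slow(String value, long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    static String demo() {
        ExecutorService executor = Executors.newCachedThreadPool();
        Supplier<String> down = () -> { throw new RuntimeException("down"); };
        try {
            String first = invoke(executor, Arrays.asList(down, slow("B", 50)), 1000);
            String second;
            try {
                invoke(executor, Arrays.asList(down, down), 1000);
                second = "unexpected success";
            } catch (IllegalStateException e) {
                second = e.getMessage();
            }
            return first + " | " + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the first call, only one of the two forks has failed, so the failing fork never reaches the queue and the slower success is returned. In the second call, both forks fail and the error is reported.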