Dubbo服務治理

準確來講這裏所說的集羣主要是指Dubbo對於集羣內容的支持,主要涉及:集羣容錯,負載均衡,路由邏輯三塊。下面對於這三塊的內容分別進行介紹。

集羣容錯

容錯主要是指服務出現了非業務異常以後採起的一些彌補措施,注意我這裏講的是非業務異常,由於業務異常出現的絕大多數狀況都是代碼異常,因此及時採起了重試等邏輯仍是會出現同樣的業務異常(代碼出問題了,鍋固然要本身背嘍)。
Dubbo中對於容錯的處理主要集中在Cluster中,Cluster包裝了底層調用的Invoker而且在Cluster本身本層作了一些出現異常以後的處理。
對於Dubbo的容錯主要是有兩層。第一層是mock,第二層是用戶配置的容錯策略。對於集羣容錯的包裝邏輯入口就在於RegistryProtocol的doRefer()方法最後的cluster.join(directory),該方法返回了集羣包裝事後的invoker,這裏的cluser其實就是MockClusterWrapper(至於爲何能肯定是MockClusterInvoker,就須要你們去理解一下Dubbo的SPI機制了),下面一塊兒來看一下MockClusterInvoke的具體內容:java

//真正起做用的是MockClusterInvoker
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        //由於cluster下面以後一層包裝,因此這裏的this.cluser就是默認的FialoverCluster
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
    //MockClusterInvoker.invoke()
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //獲取URL中配置的mock參數
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        if (value.length() == 0 || value.equalsIgnoreCase("false")){
            //若是沒有配置mock的話就直接進行調用
            result = this.invoker.invoke(invocation);
            //若是配置了強制的mock就直接調用mock,不走正常調用邏輯
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());
            }
            result = doMockInvoke(invocation, null);
        } else {
            //調用失敗以後再進行mock操做
            try {
                result = this.invoker.invoke(invocation);
            }catch (RpcException e) {
                //mock並不會處理義務異常
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    //主要是挑選出可用的MockInvoker類而後調用其invoke方法返回結果
    private Result doMockInvoke(Invocation invocation,RpcException e){
        Result result = null;
        Invoker<T> minvoker ;
        //選取可用的MockInvoker
        List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
        if (mockInvokers == null || mockInvokers.size() == 0){
            minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
        } else {
            minvoker = mockInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            //若是是業務異常就封裝結果(注意這裏和上面的區別),由於biz異常是用戶本身本身在mock信息中配置的異常,不是預想以外的異常
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
        }
        return result;
    }
    
    //核心最後仍是調用了MockInvoker的invoker方法
    public Result invoke(Invocation invocation) throws RpcException {
        String mock = getUrl().getParameter(invocation.getMethodName()+"."+Constants.MOCK_KEY);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(this);
        }
        if (StringUtils.isBlank(mock)){
            mock = getUrl().getParameter(Constants.MOCK_KEY);
        }
        
        if (StringUtils.isBlank(mock)){
            //這個錯誤比較常見,緣由就在於客戶端調用的時候返回的異常信息是非業務異常,可是客戶端又沒有配置mock信息,所以就會拋出這個異常
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
        }
        //解析出mock的類型,若是是mock=fail:AA,就返回AA,若是是mock=xx.Service就返回xx.Service,若是是mock=force:XX,就返回XX
        mock = normallizeMock(URL.decode(mock));
        //若是配置的是:mock=fail:return,就直接返回空結果
        if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){
            RpcResult result = new RpcResult();
            result.setValue(null);
            return result;
            //若是配置的是mock=fail:return **,就解析**爲對應的可返回內容而後返回
        } else if (mock.startsWith(Constants.RETURN_PREFIX)) {
            mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            try {
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
            } catch (Exception ew) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew);
            }
            //若是配置的是mock=fail:throw **(用戶自定義的異常信息),就解析**爲對應的可返回內容而後返回
        } else if (mock.startsWith(Constants.THROW_PREFIX)) {
            mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            if (StringUtils.isBlank(mock)){
                throw new RpcException(" mocked exception for Service degradation. ");
            } else { //用戶自定義類
                Throwable t = getThrowable(mock);
                throw new RpcException(RpcException.BIZ_EXCEPTION, t);
            }
        } else { //若是mock信息爲ServiceMock的話就直接找到對應的Mock類進行mock調用,而後返回結果
             try {
                 Invoker<T> invoker = getInvoker(mock);
                 return invoker.invoke(invocation);
             } catch (Throwable t) {
                 throw new RpcException("Failed to create mock implemention class " + mock , t);
             }
        }
    }

從上面的邏輯上來看mock主要是根據用戶的一些配置,作一些很是具體的容錯邏輯,精確到方法界別的,因此算是容錯的最小粒度了。咱們常常在使用中對一些不可靠服務進行mock處理,防止在其出現異常時候影響咱們的核心調用流程。
方法級別的容錯整體來講是針對業務異常的一種容錯,而針對非業務異常的容錯邏輯就是另一個概念了,好比說因爲網絡抖動致使某次調用沒有成功,針對相似的異常Dubbo也有本身的容錯措施,具體以下面幾種:web

  • Failover Cluster 失敗自動切換,當出現失敗,重試其它服務器。(缺省) 一般用於讀操做,但重試會帶來更長延遲。
  • Failfast Cluster 快速失敗,只發起一次調用,失敗當即報錯。一般用於非冪等性的寫操做,好比新增記錄。
  • Failsafe Cluster 失敗安全,出現異常時,直接忽略。一般用於寫入審計日誌等操做。
  • Failback Cluster 失敗自動恢復,後臺記錄失敗請求,定時重發。 一般用於消息通知操做。
  • Forking Cluster 並行調用多個服務器,只要一個成功即返回。一般用於實時性要求較高的讀操做,但須要浪費更多服務資源。可經過forks="2"來設置最大並行數。
  • Broadcast Cluster 廣播調用全部提供者,逐個調用,任意一臺報錯則報錯。(2.1.0開始支持) 一般用於通知全部提供者更新緩存或日誌等本地資源信息。
    由於種類比較多,所有講一遍太費時間了,因此選擇比較經常使用的幾個進行介紹。

Failover

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        //檢查copyinvokers是否爲null
        checkInvokers(copyinvokers, invocation);
        //重試次數,默認爲3次,不包含第一次調用
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // last exception.
        RpcException le = null;
        //已經調用的Invoker列表
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //重試時,進行從新選擇,避免重試時invoker列表已發生變化.
            //注意:若是列表發生了變化,那麼invoked判斷會失效,由於invoker示例已經改變
            if (i > 0) {
                checkWheatherDestoried();
                copyinvokers = list(invocation);
                //從新檢查一下有沒有對應的提供者
                checkInvokers(copyinvokers, invocation);
            }
            //經過loadbalance去選出目標Invoker
            //這裏默認的LoadBalance是RandomLoadBalance,選擇時候是根據權重來選擇目標的Invoker,固然也能夠配置其餘的LoadBalance
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            //添加到上下文環境中去,可是這裏爲何會把失敗的invoker也加進來,感受失敗的Invoker信息並無什麼意義
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                //這裏纔是最後的調用,使用通過loadbalance選出的invoker去調用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                //業務異常不會重試,直接拋出
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

Failfast

//快速失敗的邏輯最簡單了,就是什麼都不作,有調用異常的話就直接往上拋出
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
            //簡單區分異常類型
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }

failover與failfast的代碼基本同樣,主要區別就是出現異常以後直接忽略,而後返回空的RpcResult。其他的幾種集羣策略在平時用的比較少,我就很少介紹了,其實現也都比較簡單。
Dubbo的容錯不只體如今provider的cluster,對於註冊中心也有提供cluster內容(AvailableCluster),只不過該內容比較簡單,只是隨機選取了一個可用的註冊中心。算法

負載均衡

負載均衡的概念:從多個目標服務器中選擇出其中一個服務器供客戶端調用,這個選擇的具體過程就叫作負載均衡(純粹是本身給小白用戶的解釋)。
通常的服務性框架都會有負載均衡的內容,Dubbo也是基於本身的URL機制作了一層負載均衡,咱們看到上面的集羣內容時候就看到集羣內部其實就依賴了負載均衡策略來從多個Invoker中選取其中的一個,只不過一次負載所選擇到的Invoker並不必定能知足條件,好比在Failover策略下,失敗以後Loadbalance從新選擇的Invoker仍是失敗過的那個,那就要從新計算了。
Dubbo的負載均衡策略主要有如下幾種:緩存

  • Random LoadBalance 隨機選取服務提供者 最簡單的無狀態的負載均衡算法
  • RoundRobin LoadBalance 以輪訓的方式調用服務提供者 缺點是有狀態,必須在併發之下記住上一次到誰了
  • LeastActive LoadBalance 調用最少活躍調調用的服務提供者,這裏有一點須要注意,這裏的服務調用統計維度是在方法級別的,也就是說是方法級別的LoadBalance。
  • ConsistentHash LoadBalance 一致性Hash(用的很少,很少講解)

首先看下LoadBalance的接口就直到它是作什麼的:安全

//從invokers列表中根據url和invocation信息選出一個合適的Invoker供consumer端調用
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

在全部的LoadBalance中都提到了一個概念:weight。正常狀況下咱們不管配置在provider仍是service中,對應的全部服務端的providerUrl的weight都是同樣的,這種狀況其實weight配置不配置意義不大。可是一旦動態針對某個服務調整過weight值,這個影響就出來了。例如:override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&weight=200 配置以後,就將10.20.153.10服務器上的com.foo.BarServic的權值改成了200,那麼它在以後的LoadBalance中被選取的可能性就更大了(默認的權值爲100)。服務器

Random LoadBalance

private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 總個數
        int totalWeight = 0; // 總權重
        boolean sameWeight = true; // 權重是否都同樣
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累計總權重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 計算全部權重是否同樣
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 若是權重不相同且權重大於0則按總權重數隨機
            int offset = random.nextInt(totalWeight);
            // 隨機數落到哪一個片斷上,就取哪一個片斷對應的Invoker對象(擊鼓傳花式日後切割)
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 若是權重相同或權重爲0則均等隨機
        return invokers.get(random.nextInt(length));
    }

RoundRobin LoadBalance

//輪訓是針對方法級別的,並非全部服務調用
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // 總個數
        int maxWeight = 0; // 最大權重
        int minWeight = Integer.MAX_VALUE; // 最小權重
        // invoker->weight,IntegerWrapper就是一個簡單的Integer包裝類
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        int weightSum = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // 累計最大權重
            minWeight = Math.min(minWeight, weight); // 累計最小權重
            if (weight > 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        //currentSequence表明某個方法是第多少次被調用的,例如第1W次
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) { // 權重不同
            // 能夠把weightSum理解成一個權重範圍內的總集,mod就帶表在這個總集中具體執行到的位置
            int mod = currentSequence % weightSum;
            //weightSum < maxWeight*length
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Invoker<T> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    //這裏的邏輯比較抽象,本質上就是誰的權重越大,輪詢到誰的次數就越多
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // 取模輪循
        return invokers.get(currentSequence % length);
    }

簡單題下RoundRobin的弊端:若是某個服務有3臺服務器,權重依次是10,1000,100,可能配置的人的本意是在輪訓的時候走到三臺機器的比例是:110,可是實際上確實是1000個請求壓倒了第二臺機器上。。。網絡

LeastActive LoadBalance

private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 總個數
        int leastActive = -1; // 最小的活躍數
        int leastCount = 0; // 相同最小活躍數的個數
        int[] leastIndexs = new int[length]; // 相同最小活躍數的下標
        int totalWeight = 0; // 總權重
        int firstWeight = 0; // 第一個權重,用於於計算是否相同
        boolean sameWeight = true; // 是否全部權重相同
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活躍數
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重
            if (leastActive == -1 || active < leastActive) { // 若是是初始狀況下或者某臺機器的active數量小於如今保存的leastActive數量,就會從新開始
                leastActive = active; // 記錄最小活躍數
                leastCount = 1; // 從新統計相同最小活躍數的個數
                leastIndexs[0] = i; // 從新記錄最小活躍數下標
                totalWeight = weight; // 從新累計總權重
                firstWeight = weight; // 記錄第一個權重
                sameWeight = true; // 還原權重相同標識
            } else if (active == leastActive) { // 累計相同最小的活躍數
                leastIndexs[leastCount ++] = i; // 累計相同最小活躍數下標
                totalWeight += weight; // 累計總權重
                // 判斷全部權重是否同樣
                if (sameWeight && i > 0 
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // 若是隻有一個最小值的話就直接調用
        if (leastCount == 1) {
            // 若是隻有一個最小則直接返回
            return invokers.get(leastIndexs[0]);
        }
        if (! sameWeight && totalWeight > 0) {
            // 若是權重不相同且權重大於0則按總權重數隨機
            int offsetWeight = random.nextInt(totalWeight);
            // 並肯定隨機值落在哪一個片段上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 若是權重相同或權重爲0則均等隨機
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

一致性哈希對應的LoadBalance本次不講解。
我我的對於Dubbo 提供的LoadBalance的見解是:基本上知足平常使用,可是應該更加豐富。由於咱們平常使用的是leastactive,可是由於該負載均衡策略是基於方法級別的,因此沒法控制其餘的方法對於應用的影響,這裏若是將統計的維度上升到機器緯度,其實能夠作到相似於整個集羣的leastactive,這樣的話就不容易出現部分幾臺機器負載特別高,而其他的大部分機器都有不少資源結餘。固然也能夠提供接口緯度的負載均衡,這個徹底能夠根據具體的業務實現定值,由於SPI機制的緣故,自定義負載均衡策略實現起來仍是比較方便的。併發

路由

說到路由仍是貼一張官方的圖會比較好理解一些 app

服務治理細節負載均衡

從上圖中能看出來router的做用實際上是在LB以前的,也就是說LB的入參其實就是router的結果。
由於router可能理解起來並不直觀,所以仍是大體介紹一下router的含義。
在平常的服務這裏過程當中,好比我想給某個接口開個特權,專門設置一些提供者只供其調用;讀寫分離,讀操做配置專門的機器,寫操做配置專門的機器;某個消費者有問題,想及時下掉等等。都是能夠經過router來實現的,明白了router的具體含義以後咱們來一塊兒看一下router的實現:
首先爲了明確路由的基本規則,把官方的案例拿過來一塊兒看一下:

服務調用信息,如:method, argument 等 (暫不支持參數路由) URL自己的字段,如:protocol, host, port 等 以及URL上的全部參數,如:application, organization 等
條件支持:
等號"="表示"匹配",如:host = 10.20.153.10 不等號"!="表示"不匹配",如:host != 10.20.153.10
值支持:
以逗號","分隔多個值,如:host != 10.20.153.10,10.20.153.11 以星號"_"結尾,表示通配,如:host != 10.20._ 以美圓符"$"開頭,表示引用消費者參數,如:host = $host
示例:

  1. 排除預發佈機:

RegistryFactoryregistryFactory=ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registryregistry=registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181")); registry.register(URL.valueOf("condition://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule="+ URL.encode("host=10.20.153.10=>host=10.20.153.11")+"));

host=10.20.153.10=>host=10.20.153.11
"=>"以前的爲消費者匹配條件,全部參數和消費者的URL進行對比,當消費者知足匹配條件時,對該消費者執行後面的過濾規則。 "=>"以後爲提供者地址列表的過濾條件,全部參數和提供者的URL進行對比,消費者最終只拿到過濾後的地址列表。 若是匹配條件爲空,表示對全部消費方應用,如:=> host != 10.20.153.11
若是過濾條件爲空,表示禁止訪問,如:host = 10.20.153.10 =>host!=172.22.3.91

  1. 白名單:(注意:一個服務只能有一條白名單規則,不然兩條規則交叉,就都被篩選掉了)
    host!=10.20.153.10,10.20.153.11=>
  2. 黑名單:
    host=10.20.153.10,10.20.153.11=>
  3. 服務寄宿在應用上,只暴露一部分的機器,防止整個集羣掛掉:
    =>host=172.22.3.1_,172.22.3.2_
  4. 爲重要應用提供額外的機器:
    application!=kylin=>host!=172.22.3.95,172.22.3.96
  5. 讀寫分離:
    method=find_,list_,get_,is_=>host=172.22.3.94,172.22.3.95,172.22.3.96
    method!=find_,list_,get_,is_=>host=172.22.3.97,172.22.3.98
  6. 先後臺分離:
    application=bops=>host=172.22.3.91,172.22.3.92,172.22.3.93
    application!=bops=>host=172.22.3.94,172.22.3.95,172.22.3.96
  7. 隔離不一樣機房網段:
    host!=172.22.3._=>host!=172.22.3._
  8. 提供者與消費者部署在同集羣內,本機只訪問本機的服務:
    =>host=$host

下面以ConditionRouter爲例來看一下路由規則的具體實現

//構造函數初始化的時候難點在於when和then的初始化:
    public ConditionRouter(URL url) {
        this.url = url;
        //路由規則的優先級,用於排序,優先級越大越靠前執行,可不填,缺省爲0。
        this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
        //當路由結果爲空時,是否強制執行,若是不強制執行,路由結果爲空的路由規則將自動失效,可不填,缺省爲flase
        this.force = url.getParameter(Constants.FORCE_KEY, false);
        try {
            String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            rule = rule.replace("consumer.", "").replace("provider.", "");
            int i = rule.indexOf("=>");
            // =>前的部分
            String whenRule = i < 0 ? null : rule.substring(0, i).trim();
            // =>後的部分
            String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
            Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
            Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
            // NOTE: When條件是容許爲空的,外部業務來保證相似的約束條件
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
    
   
    /**
     * 將對應的rule信息解析爲對應的MatchPair
     * host=10.20.153.10解析出來就是一個host:MatchPair,Matcher的matches內容爲10.20.153.10
     * host!=10.20.153.10解析出來就是一個host:MatchPair,Matcher的mismatches內容爲10.20.153.10
     * 能夠理解爲MatcherPair就是區分matches和mismatches的具體聚合類,拿到這個Matcher就拿到表達式初步解析後的數據
     * @param rule
     * @return
     * @throws ParseException
     */
    private static Map<String, MatchPair> parseRule(String rule)
            throws ParseException {
        Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
        if(StringUtils.isBlank(rule)) {
            return condition;
        }        
        // 匹配或不匹配Key-Value對
        MatchPair pair = null;
        // 多個Value值
        Set<String> values = null;
        final Matcher matcher = ROUTE_PATTERN.matcher(rule);
        //例如:host=10.20.153.10 第一次匹配的group1='',group2='host',第二次匹配的group1='=',group2='10.20.153.10'
        while (matcher.find()) { // 逐個匹配
            String separator = matcher.group(1);
            String content = matcher.group(2);
            // 表達式開始
            if (separator == null || separator.length() == 0) {
                pair = new MatchPair();
                //'host':new MatchPair()
                condition.put(content, pair);
            }
            // &符號尚未遇到過
            else if ("&".equals(separator)) {
                if (condition.get(content) == null) {
                    pair = new MatchPair();
                    condition.put(content, pair);
                } else {
                    condition.put(content, pair);
                }
            }
            // 匹配=號部分
            else if ("=".equals(separator)) {
                if (pair == null)
                    throw new RuntimeException();
                values = pair.matches;
                values.add(content);
            }
            // 匹配!=號部分
            else if ("!=".equals(separator)) {
                if (pair == null)
                    throw new RuntimeException();
                values = pair.mismatches;
                values.add(content);
            }
            // ,號直接跟着前面的=或者!=走
            else if (",".equals(separator)) {
                if (values == null || values.size() == 0)
                    throw new RuntimeException();
                values.add(content);
            } else {
                throw new RuntimeException();
            }
        }
        return condition;
    }
    
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
            throws RpcException {
        if (invokers == null || invokers.size() == 0) {
            return invokers;
        }
        try {
            //若是沒有匹配到的前置條件後直接返回,意思就是當前做用的consumer信息不須要通過路由操做
            // 若是路由的配置值有$開頭的話就將其替換爲URL中對應的key的value
            if (! matchWhen(url)) {
                return invokers;
            }
            List<Invoker<T>> result = new ArrayList<Invoker<T>>();
            //黑名單
            if (thenCondition == null) {
                logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
                return result;
            }
            for (Invoker<T> invoker : invokers) {
                //逐個匹配後置條件
                if (matchThen(invoker.getUrl(), url)) {
                    result.add(invoker);
                }
            }
            if (result.size() > 0) {
                return result;
                //感受強制執行的話返回一個空的List並無卵用呀
            } else if (force) {
                logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
                return result;
            }
        } catch (Throwable t) {
            logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
        }
        return invokers;
    }

經過對於上面三方面的介紹咱們已經基本瞭解Dubbo對於服務治理的支持點,這三個內容是做爲一個完善的RPC框架的強心劑。讓咱們乾了這碗雞湯~~~

相關文章
相關標籤/搜索