Dubbo 源碼分析 - 集羣容錯之 LoadBalance

1.簡介

LoadBalance 中文意思爲負載均衡,它的職責是將網絡請求,或者其餘形式的負載「均攤」到不一樣的機器上。避免集羣中部分服務器壓力過大,而另外一些服務器比較空閒的狀況。經過負載均衡,可讓每臺服務器獲取到適合本身處理能力的負載。在爲高負載的服務器分流的同時,還能夠避免資源浪費,一箭雙鵰。負載均衡可分爲軟件負載均衡和硬件負載均衡。在咱們平常開發中,通常很難接觸到硬件負載均衡。但軟件負載均衡仍是可以接觸到一些的,好比 Nginx。在 Dubbo 中,也有負載均衡的概念和相應的實現。Dubbo 須要對服務消費者的調用請求進行分配,避免少數服務提供者負載過大。服務提供者負載過大,會致使部分服務調用超時。所以將負載均衡到每一個服務提供者上,是很是必要的。Dubbo 提供了4種負載均衡實現,分別是基於權重隨機算法的 RandomLoadBalance、基於最少活躍調用數算法的 LeastActiveLoadBalance、基於 hash 一致性的 ConsistentHashLoadBalance,以及基於加權輪詢算法的 RoundRobinLoadBalance。這幾個負載均衡算法代碼不是很長,可是想看懂也不是很容易,須要你們對這幾個算法的原理有必定了解才行。若是不是很瞭解,也沒不用太擔憂。我會在分析每一個算法的源碼以前,對算法原理進行簡單的講解,幫助你們創建初步的印象。html

我在寫 Dubbo 源碼分析系列文章之初,當時 Dubbo 最新的版本爲 2.6.4。近期,Dubbo 2.6.5 發佈了,其中就有對負載均衡部分代碼修改。所以我在分析完 2.6.4 版本後的源碼後,會另外分析 2.6.5 更新的部分。本篇文章內容很是之豐富,須要你們耐心閱讀。好了,其餘的就很少說了,進入正題吧。java

2.源碼分析

在 Dubbo 中,全部負載均衡實現類均繼承自 AbstractLoadBalance,該類實現了 LoadBalance 接口方法,並封裝了一些公共的邏輯。因此在分析負載均衡實現以前,先來看一下 AbstractLoadBalance 的邏輯。首先來看一下負載均衡的入口方法 select,以下:node

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (invokers == null || invokers.isEmpty())
        return null;
    // 若是 invokers 列表中僅有一個 Invoker,直接返回便可,無需進行負載均衡
    if (invokers.size() == 1)
        return invokers.get(0);
    
    // 調用 doSelect 方法進行負載均衡,該方法爲抽象方法,由子類實現
    return doSelect(invokers, url, invocation);
}

protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

select 方法的邏輯比較簡單,首先會檢測 invokers 集合的合法性,而後再檢測 invokers 集合元素數量。若是隻包含一個 Invoker,直接返回該 Inovker 便可。若是包含多個 Invoker,此時須要經過負載均衡算法選擇一個 Invoker。具體的負載均衡算法由子類實現,接下來章節會對這些子類進行詳細分析。git

AbstractLoadBalance 除了實現了 LoadBalance 接口方法,還封裝了一些公共邏輯 —— 服務提供者權重計算邏輯。具體實現以下:github

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    // 從 url 中獲取 weight 配置值
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
    if (weight > 0) {
        // 獲取服務提供者啓動時間戳
        long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
        if (timestamp > 0L) {
            // 計算服務提供者運行時長
            int uptime = (int) (System.currentTimeMillis() - timestamp);
            // 獲取服務預熱時間,默認爲10分鐘
            int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
            // 若是服務運行時間小於預熱時間,則從新計算服務權重,即降權
            if (uptime > 0 && uptime < warmup) {
                // 從新計算服務權重
                weight = calculateWarmupWeight(uptime, warmup, weight);
            }
        }
    }
    return weight;
}

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    // 計算權重,下面代碼邏輯上形似於 (uptime / warmup) * weight。
    // 隨着服務運行時間 uptime 增大,權重計算值 ww 會慢慢接近配置值 weight
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

上面是權重的計算過程,該過程主要用於保證當服務運行時長小於服務預熱時間時,對服務進行降權,避免讓服務在啓動之初就處於高負載狀態。服務預熱是一個優化手段,與此相似的還有 JVM 預熱。主要目的是讓服務啓動後「低功率」運行一段時間,使其效率慢慢提高至最佳狀態。關於預熱方面的更多知識,你們感興趣能夠本身搜索一下。算法

關於 AbstractLoadBalance 就先分析到這,接下來分析各個實現類的代碼。首先,咱們從 Dubbo 缺省的實現類 RandomLoadBalance 看起。apache

2.1 RandomLoadBalance

RandomLoadBalance 是加權隨機算法的具體實現,它的算法思想很簡單。假設咱們有一組服務器 servers = [A, B, C],他們對應的權重爲 weights = [5, 3, 2],權重總和爲10。如今把這些權重值平鋪在一維座標值上,[0, 5) 區間屬於服務器 A,[5, 8) 區間屬於服務器 B,[8, 10) 區間屬於服務器 C。接下來經過隨機數生成器生成一個範圍在 [0, 10) 之間的隨機數,而後計算這個隨機數會落到哪一個區間上。好比數字3會落到服務器 A 對應的區間上,此時返回服務器 A 便可。權重越大的機器,在座標軸上對應的區間範圍就越大,所以隨機數生成器生成的數字就會有更大的機率落到此區間內。只要隨機數生成器產生的隨機數分佈性很好,在通過屢次選擇後,每一個服務器被選中的次數比例接近其權重比例。好比,通過一萬次選擇後,服務器 A 被選中的次數大約爲5000次,服務器 B 被選中的次數約爲3000次,服務器 C 被選中的次數約爲2000次。數組

以上就是 RandomLoadBalance 背後的算法思想,比較簡單,很少說了,下面開始分析源碼。緩存

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        int totalWeight = 0;
        boolean sameWeight = true;
        // 下面這個循環有兩個做用,第一是計算總權重 totalWeight,
        // 第二是檢測每一個服務提供者的權重是否相同,若不相同,則將 sameWeight 置爲 false
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // 累加權重
            totalWeight += weight;
            // 檢測當前服務提供者的權重與上一個服務提供者的權重是否相同,
            // 不相同的話,則將 sameWeight 置爲 false。
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
        
        // 下面的 if 分支主要用於獲取隨機數,並計算隨機數落在哪一個區間上
        if (totalWeight > 0 && !sameWeight) {
            // 隨機獲取一個 [0, totalWeight) 之間的數字
            int offset = random.nextInt(totalWeight);
            // 循環讓 offset 數減去服務提供者權重值,當 offset 小於0時,返回相應的 Invoker。
            // 仍是用上面的例子進行說明,servers = [A, B, C],weights = [5, 3, 2],offset = 7。
            // 第一次循環,offset - 5 = 2 > 0,說明 offset 確定不會落在服務器 A 對應的區間上。
            // 第二次循環,offset - 3 = -1 < 0,代表 offset 落在服務器 B 對應的區間上
            for (int i = 0; i < length; i++) {
                // 讓隨機值 offset 減去權重值
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    // 返回相應的 Invoker
                    return invokers.get(i);
                }
            }
        }
        
        // 若是全部服務提供者權重值相同,此時直接隨機返回一個便可
        return invokers.get(random.nextInt(length));
    }
}

RandomLoadBalance 的算法思想比較簡單,在通過屢次請求後,可以將調用請求按照權重值進行「均勻」分配。固然 RandomLoadBalance 也存在必定的缺點,當調用次數比較少時,Random 產生的隨機數可能會比較集中,此時多數請求會落到同一臺服務器上。這個缺點並非很嚴重,多數狀況下能夠忽略。RandomLoadBalance 是一個簡單,高效的負載均衡實現,所以 Dubbo 選擇它做爲缺省實現。服務器

關於 RandomLoadBalance 就先到這了,接下來分析 LeastActiveLoadBalance。

2.2 LeastActiveLoadBalance

LeastActiveLoadBalance 翻譯過來是最小活躍數負載均衡,所謂的最小活躍數可理解爲最少鏈接數。即服務提供者目前正在處理的請求數(一個請求對應一條鏈接)最少,代表該服務提供者效率高,單位時間內可處理更多的請求。此時應優先將請求分配給該服務提供者。在具體實現中,每一個服務提供者對應一個活躍數 active。初始狀況下,全部服務提供者活躍數均爲0。每收到一個請求,活躍數加1,完成請求後則將活躍數減1。在服務運行一段時間後,性能好的服務提供者處理請求的速度更快,所以活躍數降低的也越快。此時這樣的服務提供者可以優先獲取到新的服務請求,這就是最小活躍數負載均衡算法的基本思想。除了最小活躍數,LeastActiveLoadBalance 在實現上還引入了權重值。因此準確的來講,LeastActiveLoadBalance 是基於加權最小活躍數算法實現的。舉個例子說明一下,在一個服務提供者集羣中,有兩個性能優異的服務提供者。某一時刻它們的活躍數相同,此時 Dubbo 會根據它們的權重去分配請求,權重越大,獲取到新請求的可能性就越大。若是兩個服務提供者權重相同,此時隨機選擇一個便可。關於 LeastActiveLoadBalance 的背景知識就先介紹到這裏,下面開始分析源碼。

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        // 最小的活躍數
        int leastActive = -1;
        // 具備相同「最小活躍數」的服務者提供者(如下用 Invoker 代稱)數量
        int leastCount = 0; 
        // leastIndexs 用於記錄具備相同「最小活躍數」的 Invoker 在 invokers 列表中的下標信息
        int[] leastIndexs = new int[length];
        int totalWeight = 0;
        // 第一個最小活躍數的 Invoker 權重值,用於與其餘具備相同最小活躍數的 Invoker 的權重進行對比,
        // 以檢測是否全部具備相同最小活躍數的 Invoker 的權重均相等
        int firstWeight = 0;
        boolean sameWeight = true;

        // 遍歷 invokers 列表
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 獲取 Invoker 對應的活躍數
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // 獲取權重 - ⭐️
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
            // 發現更小的活躍數,從新開始
            if (leastActive == -1 || active < leastActive) {
                // 使用當前活躍數 active 更新最小活躍數 leastActive
                leastActive = active;
                // 更新 leastCount 爲 1
                leastCount = 1;
                // 記錄當前下標值到 leastIndexs 中
                leastIndexs[0] = i;
                totalWeight = weight;
                firstWeight = weight;
                sameWeight = true;

            // 當前 Invoker 的活躍數 active 與最小活躍數 leastActive 相同 
            } else if (active == leastActive) {
                // 在 leastIndexs 中記錄下當前 Invoker 在 invokers 集合中的下標
                leastIndexs[leastCount++] = i;
                // 累加權重
                totalWeight += weight;
                // 檢測當前 Invoker 的權重與 firstWeight 是否相等,
                // 不相等則將 sameWeight 置爲 false
                if (sameWeight && i > 0
                    && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        
        // 當只有一個 Invoker 具備最小活躍數,此時直接返回該 Invoker 便可
        if (leastCount == 1) {
            return invokers.get(leastIndexs[0]);
        }

        // 有多個 Invoker 具備相同的最小活躍數,但他們的權重不一樣
        if (!sameWeight && totalWeight > 0) {
            // 隨機獲取一個 [0, totalWeight) 之間的數字
            int offsetWeight = random.nextInt(totalWeight);
            // 循環讓隨機數減去具備最小活躍數的 Invoker 的權重值,
            // 當 offset 小於等於0時,返回相應的 Invoker
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                // 獲取權重值,並讓隨機數減去權重值 - ⭐️
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 若是權重相同或權重爲0時,隨機返回一個 Invoker
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }
}

如上,爲了幫助你們理解代碼,我在上面的代碼中寫了大量的註釋。下面簡單總結一下以上代碼所作的事情,以下:

  1. 遍歷 invokers 列表,尋找活躍數最小的 Invoker
  2. 若是有多個 Invoker 具備相同的最小活躍數,此時記錄下這些 Invoker 在 invokers 集合中的下標,以及累加它們的權重,比較它們之間的權重值是否相等
  3. 若是隻有一個 Invoker 具備最小的活躍數,此時直接返回該 Invoker 便可
  4. 若是有多個 Invoker 具備最小活躍數,且它們的權重不相等,此時處理方式和 RandomLoadBalance 一致
  5. 若是有多個 Invoker 具備最小活躍數,但它們的權重相等,此時隨機返回一個便可

以上就是 LeastActiveLoadBalance 大體的實現邏輯,你們在閱讀的源碼的過程當中要注意區分活躍數與權重這兩個概念,不要混爲一談。

以上分析是基於 Dubbo 2.6.4 版本進行了,因爲近期 Dubbo 2.6.5 發佈了,對負載均衡部分的代碼進行了一些更新。這其中就包含了本節分析的 LeastActiveLoadBalance,因此下面簡單說明一下 Dubbo 2.6.5 對 LeastActiveLoadBalance 進行了怎樣的修改。回到上面的源碼中,我在上面的代碼中標註了兩個黃色的五角星⭐️。兩處標記對應的代碼分別以下:

int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);

問題出在服務預熱階段,第一行代碼直接從 url 中去權重值,未被降權過。第二行代碼獲取到的是通過降權後的權重。第一行代碼獲取到的權重值最終會被累加到權重總和 totalWeight 中,這個時候會致使一個問題。offsetWeight 是一個在 0, totalWeight) 範圍內的隨機數,而它所減去的是通過降權的權重。頗有可能在通過 leastCount 次運算後,offsetWeight 仍然是大於0的,致使沒法選中 Invoker。這個問題對應的 issue 爲 [#904,在 pull request #2172 中被修復。具體的修復邏輯是將標註一處的代碼修改成:

// afterWarmup 等價於上面的 weight 變量,這樣命名是爲了強調該變量通過 warmup 降權處理了
int afterWarmup = getWeight(invoker, invocation);

另外,2.6.4 版本中的 LeastActiveLoadBalance 還要一個缺陷,即當一組 Invoker 具備相同的最小活躍數,且其中一個 Invoker 的權重值爲1,此時這個 Invoker 沒法被選中。缺陷代碼以下:

int offsetWeight = random.nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
    int leastIndex = leastIndexs[i];
    offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
    if (offsetWeight <= 0)    // ❌
        return invokers.get(leastIndex);
}

問題就出在了offsetWeight <= 0上,舉例說明,假設有一組 Invoker 的權重爲 五、二、1,offsetWeight 最大值爲 7。假設 offsetWeight = 7,你會發現,當 for 循環進行第二次遍歷後 offsetWeight = 7 - 5 - 2 = 0,提早返回了。此時,權重爲1的 Invoker 就沒有機會被選中。這個修改起來也不難,能夠將 offsetWeight < 0,不過 Dubbo 的是將offsetWeight + 1,也就是:

int offsetWeight = random.nextInt(totalWeight) + 1;

兩種改動都行,不過我認爲以爲第一種方式更好一點,可與 RandomLoadBalance 邏輯保持一致。這裏+1有點突兀,你們讀到這裏要特意思考一下爲何要+1。

以上就是 Dubob 2.6.5 對 LeastActiveLoadBalance 的更新,不是很難理解,就很少說了。接下來分析基於一致性 hash 思想的 ConsistentHashLoadBalance。

2.3 ConsistentHashLoadBalance

一致性 hash 算法由麻省理工學院的 Karger 及其合做者於1997年提供出的,算法提出之初是用於大規模緩存系統的負載均衡。它的工做過程是這樣的,首先根據 ip 獲取其餘的信息爲緩存節點生成一個 hash,並將這個 hash 投射到 [0, 232 - 1] 的圓環上。當有查詢或寫入請求時,則爲緩存項的 key 生成一個 hash 值。而後查找第一個大於或等於該 hash 值的緩存節點,併到這個節點中查詢或寫入緩存項。若是當前節點掛了,則在下一次查詢或寫入緩存時,爲緩存項查找另外一個大於其 hash 值的緩存節點便可。大體效果以下,每一個緩存節點在圓環上佔據一個位置。若是緩存項的 key 的 hash 值小於緩存節點 hash 值,則到該緩存節點中存儲或讀取緩存項。好比下面綠色點對應的緩存項存儲到 cache-2 節點中。因爲 cache-3 掛了,本來應該存到該節點中的緩存想最終會存儲到 cache-4 節點中。

關於一致性 hash 算法,我這裏只作掃盲。具體的細節不討論,你們請自行補充相關的背景知識。下面來看看一致性 hash 在 Dubbo 中的應用。咱們把上圖的緩存節點替換成 Dubbo 的服務提供者,因而獲得了下圖:

這裏相同顏色的節點均屬於同一個服務提供者,好比 Invoker1-1,Invoker1-2,……, Invoker1-160。這樣作的目的是經過引入虛擬節點,讓 Invoker 在圓環上分散開來,避免數據傾斜問題。所謂數據傾斜是指,因爲節點不夠分散,致使大量請求落到了同一個節點上,而其餘節點只會接收到了少許的請求。好比:

如上,因爲 Invoker-1 和 Invoker-2 在圓環上分佈不均,致使系統中75%的請求都會落到 Invoker-1 上,只有 25% 的請求會落到 Invoker-2 上。解決這個問題辦法是引入虛擬節點,經過虛擬節點均衡各個節點的請求量。

到這裏背景知識就普及完了,接下來開始分析源碼。咱們先從 ConsistentHashLoadBalance 的 doSelect 方法開始看起,以下:

public class ConsistentHashLoadBalance extends AbstractLoadBalance {

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = 
        new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

        // 獲取 invokers 原始的 hashcode
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 若是 invokers 是一個新的 List 對象,意味着服務提供者數量發生了變化,可能新增也可能減小了。
        // 此時 selector.identityHashCode != identityHashCode 條件成立
        if (selector == null || selector.identityHashCode != identityHashCode) {
            // 建立新的 ConsistentHashSelector
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }

        // 調用 ConsistentHashSelector 的 select 方法選擇 Invoker
        return selector.select(invocation);
    }
    
    private static final class ConsistentHashSelector<T> {...}
}

如上,doSelect 方法主要作了一些前置工做,好比檢測 invokers 列表是否是變更過,以及建立 ConsistentHashSelector。這些工做作完後,接下來開始調用 select 方法執行負載均衡邏輯。在分析 select 方法以前,咱們先來看一下一致性 hash 選擇器 ConsistentHashSelector 的初始化過程,以下:

private static final class ConsistentHashSelector<T> {

    // 使用 TreeMap 存儲 Invoker 虛擬節點
    private final TreeMap<Long, Invoker<T>> virtualInvokers;

    private final int replicaNumber;

    private final int identityHashCode;

    private final int[] argumentIndex;

    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
        this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        // 獲取虛擬節點數,默認爲160
        this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
        // 獲取參與 hash 計算的參數下標值,默認對第一個參數進行 hash 運算
        String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
        argumentIndex = new int[index.length];
        for (int i = 0; i < index.length; i++) {
            argumentIndex[i] = Integer.parseInt(index[i]);
        }
        for (Invoker<T> invoker : invokers) {
            String address = invoker.getUrl().getAddress();
            for (int i = 0; i < replicaNumber / 4; i++) {
                // 對 address + i 進行 md5 運算,獲得一個長度爲16的字節數組
                byte[] digest = md5(address + i);
                // 對 digest 部分字節進行4次 hash 運算,獲得四個不一樣的 long 型正整數
                for (int h = 0; h < 4; h++) {
                    // h = 0 時,取 digest 中下標爲 0 ~ 3 的4個字節進行位運算
                    // h = 1 時,取 digest 中下標爲 4 ~ 7 的4個字節進行位運算
                    // h = 2, h = 3 時過程同上
                    long m = hash(digest, h);
                    // 將 hash 到 invoker 的映射關係存儲到 virtualInvokers 中,
                    // virtualInvokers 中的元素要有序,所以選用 TreeMap 做爲存儲結構
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }
}

ConsistentHashSelector 進行了一些列的初始化方法,好比從配置中獲取虛擬節點數以及參與 hash 計算的參數下標,默認狀況下只使用第一個參數進行 hash。須要特別說明的是,ConsistentHashLoadBalance 的負載均衡邏輯只受參數值影響,具備相同參數值的請求將會被分配給同一個服務提供者。ConsistentHashLoadBalance 不 care 權重,所以使用時須要注意一下。

在獲取虛擬節點數和參數下標配置後,接下來要作到事情是計算虛擬節點 hash 值,並將虛擬節點存儲到 TreeMap 中。到此,ConsistentHashSelector 初始化工做就完成了。接下來,咱們再來看看 select 方法的邏輯。

public Invoker<T> select(Invocation invocation) {
    // 將參數轉爲 key
    String key = toKey(invocation.getArguments());
    // 對參數 key 進行 md5 運算
    byte[] digest = md5(key);
    // 取 digest 數組的前四個字節進行 hash 運算,再將 hash 值傳給 selectForKey 方法,
    // 尋找合適的 Invoker
    return selectForKey(hash(digest, 0));
}

private Invoker<T> selectForKey(long hash) {
    // 到 TreeMap 中查找第一個節點值大於或等於當前 hash 的 Invoker
    Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
    // 若是 hash 大於 Invoker 在圓環上最大的位置,此時 entry = null,
    // 須要將 TreeMap 的頭結點賦值給 entry
    if (entry == null) {
        entry = virtualInvokers.firstEntry();
    }

    // 返回 Invoker
    return entry.getValue();
}

如上,選擇的過程比較簡單了。首先是對參數進行 md5 以及 hash 運算,獲得一個 hash 值。而後再拿這個值到 TreeMap 中查找目標 Invoker 便可。

到此關於 ConsistentHashLoadBalance 就分析完了。在閱讀 ConsistentHashLoadBalance 以前,你們必定要先補充背景知識。否者即便這裏只有一百多行代碼,也很難看懂。好了,本節先分析到這。

2.4 RoundRobinLoadBalance

本節,咱們來看一下 Dubbo 中的加權輪詢負載均衡的實現 RoundRobinLoadBalance。在詳細分析源碼前,咱們仍是先來了解一下什麼是加權輪詢。這裏從最簡單的輪詢開始講起,所謂輪詢就是將請求輪流分配給一組服務器。舉個例子,咱們有三臺服務器 A、B、C。咱們將第一個請求分配給服務器 A,第二個請求分配給服務器 B,第三個請求分配給服務器 C,第四個請求再次分配給服務器 A。這個過程就叫作輪詢。輪詢是一種無狀態負載均衡算法,實現簡單,適用於每臺服務器性能相近的場景下。顯然,現實狀況下,咱們並不能保證每臺服務器性能均相近。若是咱們將等量的請求分配給性能較差的服務器,這顯然是不合理的。所以,這個時候咱們須要加權輪詢算法,對輪詢過程進行干預,使得性能好的服務器能夠獲得更多的請求,性能差的獲得的少一些。每臺服務器可以獲得的請求數比例,接近或等於他們的權重比。好比服務器 A、B、C 權重比爲 5:2:1。那麼在8次請求中,服務器 A 將獲取到其中的5次請求,服務器 B 獲取到其中的2次請求,服務器 C 則獲取到其中的1次請求。

以上就是加權輪詢的算法思想,搞懂了這個思想,接下來咱們就能夠分析源碼了。咱們先來看一下 2.6.4 版本的 RoundRobinLoadBalance。

public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = 
        new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // key = 全限定類名 + "." + 方法名,好比 com.xxx.DemoService.sayHello
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size();
        // 最大權重
        int maxWeight = 0;
        // 最小權重
        int minWeight = Integer.MAX_VALUE;
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        // 權重總和
        int weightSum = 0;

        // 下面這個循環主要用於查找最大和最小權重,計算權重總和等
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // 獲取最大和最小權重
            maxWeight = Math.max(maxWeight, weight);
            minWeight = Math.min(minWeight, weight);
            if (weight > 0) {
                // 將 weight 封裝到 IntegerWrapper 中
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                // 累加權重
                weightSum += weight;
            }
        }

        // 查找 key 對應的對應 AtomicPositiveInteger 實例,爲空則建立。
        // 這裏能夠把 AtomicPositiveInteger 當作一個黑盒,你們只要知道
        // AtomicPositiveInteger 用於記錄服務的調用編號便可。至於細節,
        // 你們若是感興趣,能夠自行分析
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }

        // 獲取當前的調用編號
        int currentSequence = sequence.getAndIncrement();
        // 若是 最小權重 < 最大權重,代表服務提供者之間的權重是不相等的
        if (maxWeight > 0 && minWeight < maxWeight) {
            // 使用調用編號對權重總和進行取餘操做
            int mod = currentSequence % weightSum;
            // 進行 maxWeight 次遍歷
            for (int i = 0; i < maxWeight; i++) {
                // 遍歷 invokerToWeightMap
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    // 獲取 Invoker
                    final Invoker<T> k = each.getKey();
                    // 獲取權重包裝類 IntegerWrapper
                    final IntegerWrapper v = each.getValue();
                    
                    // 若是 mod = 0,且權重大於0,此時返回相應的 Invoker
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    
                    // mod != 0,且權重大於0,此時對權重和 mod 分別進行自減操做
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        
        // 服務提供者之間的權重相等,此時經過輪詢選擇 Invoker
        return invokers.get(currentSequence % length);
    }

    // IntegerWrapper 是一個 int 包裝類,主要包含了一個自減方法。
    // 與 Integer 不一樣,Integer 是不可變類,而 IntegerWrapper 是可變類
    private static final class IntegerWrapper {
        private int value;

        public void decrement() {
            this.value--;
        }
        
        // 省略部分代碼
    }
}

如上,RoundRobinLoadBalance 的每行代碼都不是很難理解,可是將它們組合到一塊兒以後,好像就看不懂了。這裏對上面代碼的主要邏輯進行總結,以下:

  1. 找到最大權重值,並計算出權重和
  2. 使用調用編號對權重總和進行取餘操做,獲得 mod
  3. 檢測 mod 的值是否等於0,且 Invoker 權重是否大於0,若是兩個條件均知足,則返回該 Invoker
  4. 若是上面條件不知足,且 Invoker 權重大於0,此時對 mod 和權重進行遞減
  5. 再次循環,重複步驟三、4

以上過程對應的原理不太好解釋,因此下面直接舉例說明把。假設咱們有三臺服務器 servers = [A, B, C],對應的權重爲 weights = [2, 5, 1]。接下來對上面的邏輯進行簡單的模擬。

mod = 0:知足條件,此時直接返回服務器 A

mod = 1:須要進行一次遞減操做才能知足條件,此時返回服務器 B

mod = 2:須要進行兩次遞減操做才能知足條件,此時返回服務器 C

mod = 3:須要進行三次遞減操做才能知足條件,通過遞減後,服務器權重爲 [1, 4, 0],此時返回服務器 A

mod = 4:須要進行四次遞減操做才能知足條件,通過遞減後,服務器權重爲 [0, 4, 0],此時返回服務器 B

mod = 5:須要進行五次遞減操做才能知足條件,通過遞減後,服務器權重爲 [0, 3, 0],此時返回服務器 B

mod = 6:須要進行六次遞減操做才能知足條件,通過遞減後,服務器權重爲 [0, 2, 0],此時返回服務器 B

mod = 7:須要進行七次遞減操做才能知足條件,通過遞減後,服務器權重爲 [0, 1, 0],此時返回服務器 B

通過8次調用後,咱們獲得的負載均衡結果爲 [A, B, C, A, B, B, B, B],次數比 A:B:C = 2:5:1,等於權重比。當 sequence = 8 時,mod = 0,此時重頭再來。從上面的模擬過程能夠看出,當 mod >= 3 後,服務器 C 就不會被選中了,由於它的權重被減爲0了。當 mod >= 4 後,服務器 A 的權重被減爲0,此後 A 就不會再被選中。

以上是 2.6.4 版本的 RoundRobinLoadBalance 分析過程,你們若是看不懂,本身能夠定義一些權重組合進行模擬。也能夠寫點測試用例,進行調試分析,總之不要死看。

2.6.4 版本的 RoundRobinLoadBalance 存在着比較嚴重的性能問題,該問題最初是在 issue #2578 中被反饋出來。問題出在了 Invoker 的返回時機上,RoundRobinLoadBalance 須要在mod == 0 && v.getValue() > 0 條件成立的狀況下才會被返回相應的 Invoker。假如 mod 很大,好比 10000,50000,甚至更大時,doSelect 方法須要進行不少次計算才能將 mod 減爲0。由此可知,doSelect 的效率與 mod 有關,時間複雜度爲 O(mod)。mod 又受最大權重 maxWeight 的影響,所以當某個服務提供者配置了很是大的權重,此時 RoundRobinLoadBalance 會產生比較嚴重的性能問題。這個問題被反饋後,社區很快作了迴應。並對 RoundRobinLoadBalance 的代碼進行了重構,將時間複雜度優化至了常量級別。這個優化能夠說很好了,下面咱們來學習一下優化後的代碼。

public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private final ConcurrentMap<String, AtomicPositiveInteger> indexSeqs = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size();
        int maxWeight = 0;
        int minWeight = Integer.MAX_VALUE;
        final List<Invoker<T>> invokerToWeightList = new ArrayList<>();
        
        // 查找最大和最小權重
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight);
            minWeight = Math.min(minWeight, weight);
            if (weight > 0) {
                invokerToWeightList.add(invokers.get(i));
            }
        }
        
        // 獲取當前服務對應的調用序列對象 AtomicPositiveInteger
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            // 建立 AtomicPositiveInteger,默認值爲0
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        
        // 獲取下標序列對象 AtomicPositiveInteger
        AtomicPositiveInteger indexSeq = indexSeqs.get(key);
        if (indexSeq == null) {
            // 建立 AtomicPositiveInteger,默認值爲 -1
            indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
            indexSeq = indexSeqs.get(key);
        }

        if (maxWeight > 0 && minWeight < maxWeight) {
            length = invokerToWeightList.size();
            while (true) {
                int index = indexSeq.incrementAndGet() % length;
                int currentWeight = sequence.get() % maxWeight;

                // 每循環一輪(index = 0),從新計算 currentWeight
                if (index == 0) {
                    currentWeight = sequence.incrementAndGet() % maxWeight;
                }
                
                // 檢測 Invoker 的權重是否大於 currentWeight,大於則返回
                if (getWeight(invokerToWeightList.get(index), invocation) > currentWeight) {
                    return invokerToWeightList.get(index);
                }
            }
        }
        
        // 全部 Invoker 權重相等,此時進行普通的輪詢便可
        return invokers.get(sequence.incrementAndGet() % length);
    }
}

上面代碼的邏輯是這樣的,每進行一輪循環,從新計算 currentWeight。若是當前 Invoker 權重大於 currentWeight,則返回該 Invoker。仍是舉例說明吧,假設服務器 [A, B, C] 對應權重 [5, 2, 1]。

第一輪循環,currentWeight = 1,可返回 A 和 B

第二輪循環,currentWeight = 2,返回 A

第三輪循環,currentWeight = 3,返回 A

第四輪循環,currentWeight = 4,返回 A

第五輪循環,currentWeight = 0,返回 A, B, C

如上,這裏的一輪循環是指 index 再次變爲0所經歷過的循環,這裏能夠把 index = 0 看作是一輪循環的開始。每一輪循環的次數與 Invoker 的數量有關,Invoker 數量一般不會太多,因此咱們能夠認爲上面代碼的時間複雜度爲常數級。

重構後的 RoundRobinLoadBalance 看起來已經很不錯了,可是在代碼更新不久後,頗有又被重構了。此次重構緣由是新的 RoundRobinLoadBalance 在某些狀況下選出的服務器序列不夠均勻。好比,服務器 [A, B, C] 對應權重 [5, 1, 1]。如今進行7次負載均衡,選擇出來的序列爲 [A, A, A, A, A, B, C]。前5個請求所有都落在了服務器 A上,分佈不夠均勻。這將會使服務器 A 短期內接收大量的請求,壓力陡增。而 B 和 C 無請求,處於空閒狀態。咱們指望的結果是這樣的 [A, A, B, A, C, A, A],不一樣服務器能夠穿插獲取請求。爲了增長負載均衡結果的平滑性,社區再次對 RoundRobinLoadBalance 的實現進行了重構。此次重構參考自 Nginx 的平滑加權輪詢負載均衡,實現原理是這樣的。每一個服務器對應兩個權重,分別爲 weight 和 currentWeight。其中 weight 是固定的,currentWeight 是會動態調整,初始值爲0。當有新的請求進來時,遍歷服務器列表,讓它的 currentWeight 加上自身權重。遍歷完成後,找到最大的 currentWeight,並將其減去權重總和,而後返回相應的服務器便可。

上面描述不是很好理解,下面仍是舉例說明吧。仍然使用服務器 [A, B, C] 對應權重 [5, 1, 1] 的例子進行說明,如今有7個請求依次進入負載均衡邏輯,選擇過程以下:

請求編號 currentWeight 數組 選擇結果 減去權重總和後的 currentWeight 數組
1 [5, 1, 1] A [-2, 1, 1]
2 [3, 2, 2] A [-4, 2, 2]
3 [1, 3, 3] B [1, -4, 3]
4 [6, -3, 4] A [-1, -3, 4]
5 [4, -2, 5] C [4, -2, -2]
6 [9, -1, -1] A [2, -1, -1]
7 [7, 0, 0] A [0, 0, 0]

如上,通過平滑性處理後,獲得的服務器序列爲 [A, A, B, A, C, A, A],相比以前的序列 [A, A, A, A, A, B, C],分佈性要好一些。初始狀況下 currentWeight = [0, 0, 0],第7個請求處理完後,currentWeight 再次變爲 [0, 0, 0],是否是很神奇。這個結果也不難理解,在7次計算過程當中,每一個服務器的 currentWeight 都增長了自身權重 weight 7,獲得 currentWeight = [35, 7, 7],A 被選中5次,要被減去 5 7。B 和 C 被選中1次,要被減去 1 * 7。因而 currentWeight = [35, 7, 7] - [35, 7, 7] = [0, 0, 0]。

以上就是平滑加權輪詢的計算過程,如今你們應該對平滑加權輪詢算法了有了一些瞭解。接下來,咱們來看看 Dubbo-2.6.5 是如何實現上面的計算過程的。

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";
    
    private static int RECYCLE_PERIOD = 60000;
    
    protected static class WeightedRoundRobin {
        // 服務提供者權重
        private int weight;
        // 當前權重
        private AtomicLong current = new AtomicLong(0);
        // 最後一次更新時間
        private long lastUpdate;
        
        public void setWeight(int weight) {
            this.weight = weight;
            // 初始狀況下,current = 0
            current.set(0);
        }
        public long increaseCurrent() {
            // current = current + weight;
            return current.addAndGet(weight);
        }
        public void sel(int total) {
            // current = current - total;
            current.addAndGet(-1 * total);
        }
    }

    // 嵌套 Map 結構,存儲的數據結構示例以下:
    // {
    //     "UserService.query": {
    //         "url1": WeightedRoundRobin@123, 
    //         "url2": WeightedRoundRobin@456, 
    //     },
    //     "UserService.update": {
    //         "url1": WeightedRoundRobin@123, 
    //         "url2": WeightedRoundRobin@456,
    //     }
    // }
    // 最外層爲服務類名 + 方法名,第二層爲 url 到 WeightedRoundRobin 的映射關係。
    // 這裏咱們能夠將 url 當作是服務提供者的 id
    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    
    // 原子更新鎖
    private AtomicBoolean updateLock = new AtomicBoolean();
    
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // 獲取 url 到 WeightedRoundRobin 映射表,若是爲空,則建立一個新的
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        
        // 獲取當前時間
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;

        // 下面這個循環主要作了這樣幾件事情:
        //   1. 遍歷 Invoker 列表,檢測當前 Invoker 是否有
        //      對應的 WeightedRoundRobin,沒有則建立
        //   2. 檢測 Invoker 權重是否發生了變化,若變化了,
        //      則更新 WeightedRoundRobin 的 weight 字段
        //   3. 讓 current 字段加上自身權重,等價於 current += weight
        //   4. 設置 lastUpdate 字段,即 lastUpdate = now
        //   5. 尋找具備最大 current 的 Invoker 以及 WeightedRoundRobin,
        //      暫存起來,留做後用
        //   6. 計算權重總和
        for (Invoker<T> invoker : invokers) {
            String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            int weight = getWeight(invoker, invocation);
            if (weight < 0) {
                weight = 0;
            }
            
            // 檢測當前 Invoker 是否有對應的 WeightedRoundRobin,沒有則建立
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                // 設置 Invoker 權重
                weightedRoundRobin.setWeight(weight);
                // 存儲 url 惟一標識 identifyString 到 weightedRoundRobin 的映射關係
                map.putIfAbsent(identifyString, weightedRoundRobin);
                weightedRoundRobin = map.get(identifyString);
            }
            // Invoker 權重不等於 WeightedRoundRobin 中保存的權重,說明權重變化了,此時進行更新
            if (weight != weightedRoundRobin.getWeight()) {
                weightedRoundRobin.setWeight(weight);
            }
            
            // 讓 current 加上自身權重,等價於 current += weight
            long cur = weightedRoundRobin.increaseCurrent();
            // 設置 lastUpdate,表示近期更新過
            weightedRoundRobin.setLastUpdate(now);
            // 找出最大的 current 
            if (cur > maxCurrent) {
                maxCurrent = cur;
                // 將具備最大 current 權重的 Invoker 賦值給 selectedInvoker
                selectedInvoker = invoker;
                // 將 Invoker 對應的 weightedRoundRobin 賦值給 selectedWRR,留做後用
                selectedWRR = weightedRoundRobin;
            }
            
            // 計算權重總和
            totalWeight += weight;
        }

        // 對 <identifyString, WeightedRoundRobin> 進行檢查,過濾掉長時間未被更新的節點。
        // 該節點可能掛了,invokers 中不包含該節點,因此該節點的 lastUpdate 長時間沒法被更新。
        // 若未更新時長超過閾值後,就會被移除掉,默認閾值爲60秒。
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    // 拷貝
                    newMap.putAll(map);
                    
                    // 遍歷修改,也就是移除過時記錄
                    Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                            it.remove();
                        }
                    }
                    
                    // 更新引用
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }

        if (selectedInvoker != null) {
            // 讓 current 減去權重總和,等價於 current -= totalWeight
            selectedWRR.sel(totalWeight);
            // 返回具備最大 current 的 Invoker
            return selectedInvoker;
        }
        
        // should not happen here
        return invokers.get(0);
    }
}

以上就是 Dubbo-2.6.5 版本的 RoundRobinLoadBalance,你們若是可以理解平滑加權輪詢算法的計算過程,再配合我寫的註釋,理解上面的代碼應該不難。

以上就是關於 RoundRobinLoadBalance 所有的分析,內容有點多,你們慢慢消化吧。好了,本節先到這。

3.總結

本篇文章對 Dubbo 中的幾種負載均衡實現進行了詳細的分析,總的來講,這篇文章寫的仍是有點累的。主要是每介紹一種負載均衡實現,就要介紹一下相關背景。另外一方面,這裏不少東西對於我來講,也徹底是新的。在此以前,我對負載均衡算法並沒太多瞭解。這篇文章基本上是邊學邊寫的,總共耗時5天。原本想簡單寫寫算了,但最後仍是決定寫詳細點。好在,如今寫完了,我也能夠放鬆一下了。

本篇文章是個人 Dubbo 源碼分析系列文章關於集羣容錯部分的最後一篇文章,寫完感受學到了不少東西。經過堅持不懈的閱讀代碼,寫技術文章,使得我對 Dubbo 有了更深刻的瞭解。固然,這還遠遠不夠。後續還有不少東西要了解,好比 Nacos、Sentinel 等。長路漫漫,步履不停。

好了,本篇文章到這裏就結束了。感謝你們的閱讀。

參考

附錄:Dubbo 源碼分析系列文章

時間 文章
2018-10-01 Dubbo 源碼分析 - SPI 機制
2018-10-13 Dubbo 源碼分析 - 自適應拓展原理
2018-10-31 Dubbo 源碼分析 - 服務導出
2018-11-12 Dubbo 源碼分析 - 服務引用
2018-11-17 Dubbo 源碼分析 - 集羣容錯之 Directory
2018-11-20 Dubbo 源碼分析 - 集羣容錯之 Router
2018-11-24 Dubbo 源碼分析 - 集羣容錯之 Cluster
2018-11-29 Dubbo 源碼分析 - 集羣容錯之 LoadBalance
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:田小波
本文同步發佈在個人我的博客: http://www.tianxiaobo.com

cc
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。

相關文章
相關標籤/搜索