Dubbo 負載均衡

前言

你們好,今天給你們分享 Dubbo 中的負載均衡。在前一個章節中咱們介紹 Dubbo延遲服務暴露,咱們例舉了常見的使用場景而且進行了源碼解析來分析其實現原理,同時咱們也知道 Dubbo 延遲服務暴露其核心就是經過一個 延遲的調度器指定延遲時間後開始服務的暴露。不少小夥伴可能會好奇:咱們的服務部署基本都是集羣形式,那服務消費端究竟是調用哪個服務提供者呢?都有哪些常見的服務選擇算法?爲了揭開這些困惑,下面就讓咱們快速開始吧!java

1. 負載均衡簡介

那麼什麼是負載均衡呢?舉個簡單例子:在火車站購買火車票場景,咱們知道節假日確定有很是多的人購買火車票,那麼一個售票窗口賣票的話確定會排很長的隊對用戶來講體驗很是差。那咱們能不能多增長几個售票窗口呢?是否是並行處理的能力立刻獲得提升了呢?沒錯!咱們在軟件工程裏面也是如此。好比 Nginx 經常用做軟件負載均衡、F5用做硬件的負載均衡器等等。負載均衡算法能夠有效提升系統的併發處理能力、容錯能力、一致性訪問等。以下圖所示:node

負載均衡

  1. 併發處理能力:經過橫向拓展部署後端服務器數量作到服務擴容。
  2. 容錯能力:經過 Nginx 代理請求後端服務失敗時轉移調用其餘存活服務。
  3. 一致性訪問:在某些場景中指定路由到固定服務器,好比:特定服務器對登陸用戶 Session 進行緩存。

2. Dubbo 支持負載均衡算法

2.1 加權隨機算法

隨機算法比較簡單就和數學中的隨機值選取同樣,好比:1-100中隨機選擇一個數字。在 Dubbo 中對隨機算法增長了一個權重概念。舉個例子:git

服務列表 權重
Service01 10
Service02 20
Service03 20
Service04 50

從上表中咱們知道4個服務權重分別是十、20、20、50,參考下圖描述了隨機選擇步驟:算法

隨機算法

  1. 首先咱們根據權重總和利用隨機值獲取初始 offset,假設 offset 值爲 60。
  2. 第一步循環服務列表第一個服務獲取權重值 10 並使用 offset - weight 結果值爲 50 > 0 繼續執行從新賦值offset = 50。
  3. 第二步循環服務列表第二個服務獲取權重值 20 並使用 offset - weight 結果值爲 30 > 0 繼續執行。
  4. 第三步循環服務列表第三個服務獲取權重值 20 並使用 offset - weight 結果值爲 10 > 0 繼續執行。
  5. 第四步循環服務列表第四個服務獲取權重值 50 並使用 offset - weight 結果值爲 -40 > 0 找到服務終止。
Tips:此算法數據量越大數據分配越均勻。

2.1.1 算法實現:

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // 服務總數
        int length = invokers.size();
        // 標識這些服務是否具備相同的權重
        boolean sameWeight = true;
        // 全部服務權重集合
        int[] weights = new int[length];
        // 第一個服務權重
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // 權重總和
        int totalWeight = firstWeight;
        for (int i = 1; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // 記錄權重
            weights[i] = weight;
            // 合計權重值
            totalWeight += weight;
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // 從(0,totalWeight]範圍選擇權重值
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // 基於隨機值返回服務
            for (int i = 0; i < length; i++) {
              // offset = offset - weights[i]
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 若是權重值相同或者totalWeight=0返回服務列表中隨機服務
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
}

2.1.2 算法過程分析:

  1. 循環 invokers 服務列表計算出權重總和並標記出是否存在權重所有相同狀況。
  2. 根據上面步驟的標記判斷 invokers 服務列表全部服務權重是否相同,相同則隨機返回服務列表中的一個服務。
  3. invokers 服務列表存在服務權重不相同時,產生一個權重總和範圍內的一個大於0的隨着整數賦值給 offset,而後循環訪問服務列表中的元素而且使用 offset 減去當前循環元素的權重值,若是差值大於0則賦值給當前的 offset 繼續執行元素循環,當 offset 減去當前元素的權重大於零時中止循環,則當前查找的元素爲選擇的服務。

2.2 加權輪詢算法

輪詢負載均衡算法顧名思義就是輪流調用服務。舉個例子:假設4個請求經過 Nginx 代理轉發到後端服務:編程

輪詢算法

同時 Dubbo 中的輪詢算法也是須要權重支持。輪詢負載均衡算法可讓 RPC 調用嚴格按照咱們設置的比例來分配。不論是少許的調用仍是大量的調用。可是輪詢負載均衡算法也有不足的地方,存在慢的服務累積請求的問題,好比:第二臺 Service2 很慢,但沒掛,當請求調到第二臺時就卡在那,長此以往,全部請求都卡在調到第二臺上,致使整個系統變慢。如下是加權輪詢算法圖:後端

加權輪詢算法

2.2.1 算法實現

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
         //服務惟一標識,獲取這一組服務第一個服務:由於這是一組服務全部key是相同的
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();//DemoService.method1
        //經過服務惟一標識對這一組服務進行緩存 key -> Map
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        for (Invoker<T> invoker : invokers) {
            String identifyString = invoker.getUrl().toIdentityString();//服務實例惟一表示:[test://127.0.0.1:1/DemoService,test://127.0.0.1:2/DemoService,test://127.0.0.1:3/DemoService]
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            //獲取服務權重
            int weight = getWeight(invoker, invocation);
            //WeightedRoundRobin封裝實體
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(identifyString, weightedRoundRobin);
            }
            //權重發生變動
            if (weight != weightedRoundRobin.getWeight()) {
                //從新賦值
                weightedRoundRobin.setWeight(weight);
            }
            long cur = weightedRoundRobin.increaseCurrent();
            //記錄更新時間
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {//大於當前最大權重則被選擇
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            //計算全部權重值
            totalWeight += weight;
        }
        //獲取鎖而且存在服務下線狀況【invokers.size() != map.size()】
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                    //回收週期
                    newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }
        if (selectedInvoker != null) {
            //將其選擇服務權重值減去總權重值
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }
}

2.2.2 算法過程分析

  1. 首先初始化 maxCurrent 爲最小值 Long.MIN_VALUEtotalWeight 爲0。
  2. 循環 invokers 服務列表,每一個元素中的 current (初始化爲0)加上元素權重值大於 maxCurrent
    則賦值給 maxCurrent 而且賦值選中服務( selectedInvoker )爲當前元素,而且累計當前元素權重值賦值給 totalWeight
  3. 設置當前選中服務權重值爲選中元素權重減去當前權重總和。
Tips:上述算法存在必定的難度,稍做了解便可。

2.3 最少活躍調用數算法

咱們在實際服務調度場景中通常存在多服務實例部署狀況,當相同的應用部署到不一樣機器極可能存在不一樣服務處理的能力不一樣有快有慢,那麼咱們理想化的認爲在服務調用過程當中處理能力慢的服務器接收較少的服務請求更爲符合咱們的要求。以下示例圖:數組

最少活躍計數算法

從圖中能夠看出假設咱們這裏有5個請求經過 Ngingx 代理轉發到後端服務,這裏 Service02 只接收到一個請求(處理能力較弱)。緩存

2.3.1 算法實現

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        //初始化最小活躍值,最小活躍數,最小活躍的invoker索引(index),權重數組
        int leastActive = -1;
        int leastCount = 0;
        int[] leastIndexes = new int[length];
        int[] weights = new int[length];
        int totalWeight = 0;
        int firstWeight = 0;
        boolean sameWeight = true;

        // 過濾出全部最少活躍服務
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 獲取這個invoker活躍數
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // 獲取invoker的權重 默認100.
            int afterWarmup = getWeight(invoker, invocation);
            // 權重保存到集合中
            weights[i] = afterWarmup;
            //找到最小活躍數invoker
            if (leastActive == -1 || active < leastActive) {
                //重置最小活躍數值
                leastActive = active;
                //重置最小活躍計數
                leastCount = 1;
                // 記錄最小活躍數對應索引
                leastIndexes[0] = i;
                // 重置權重合計
                totalWeight = afterWarmup;
                // 記錄第一個最小活躍數對應權重值
                firstWeight = afterWarmup;
                // 重置是否具備相同權重標誌
                sameWeight = true;
                //當前invoker活躍數等於最小活躍數
            } else if (active == leastActive) {
                // 最小活躍數leastCount增長而且記錄當前invoker對應索引
                leastIndexes[leastCount++] = i;
                // 累計invoker權重值
                totalWeight += afterWarmup;
                // 判斷是否權重值相同
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        //最小活躍數invoker只有一個
        if (leastCount == 1) {
            // 直接返回
            return invokers.get(leastIndexes[0]);
        }
        //具備多個相同活躍數、具備不一樣權重且權重和大於0
        if (!sameWeight && totalWeight > 0) {
            // 根據權重值進行一個(0,totalWeight]整數隨機值
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // 基於隨機值選擇所在區間和隨機負載均衡算法一致
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // 若是全部權重值相同或者totalWeight=0返回服務列表中隨機一個服務
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }
}

2.3.2 算法過程分析

  1. 初始化最小活躍值,最小活躍數,最小活躍的 invoker 索引(index),權重數組。
  2. 循環 invokers 服務列表獲取 invoker 活躍數、權重且過濾出全部最少活躍服務並記錄最小活躍數對應索引,同時標記服務列表是否存在不一樣權重值。
  3. 若是最小活躍數爲一直接反正這個服務。
  4. 若是存在多個相同活躍數、具備不一樣權重且權重和大於0,則根據權重值進行一個(0, totalWeight ]整數隨機值
    而後基於隨機值選擇所在區間和上面講的隨機負載均衡算法一致。
  5. 若是全部權重值相同或者 totalWeight=0 返回服務列表中隨機一個服務。
Tips:上述算法存在必定的難度,稍做了解便可。

2.4 一致性 Hash 算法

要明白什麼是一致性 Hash 首先得知道什麼是 Hash 算法。咱們在學習 HashMap 的 JDK 源碼時能夠了解到 HashMap 的數據結構由一個數組+鏈表構成以下所示:服務器

hash算法

上圖簡單畫出了 HashMap 底層數據存儲結構,從中能夠看出當咱們調用 HashMap.put(K key, V value) 方法時經過hash(key)計算出一個 Hash 值與數組長度求模獲得一個索引即爲數組下標對應位置,當這個位置存在元素時即發生 Hash 碰撞就會產生一個鏈。微信

一個常見的場景,好比咱們在請求某個服務時須要固定訪問某一臺服務器以下所示:

hash算法1

從圖中能夠看出 Client01 經過 Nginx 固定訪問 Service01Client02 經過 Nginx 固定訪問 Service02Client03 經過 Nginx 固定訪問 Service03。要知足這種場景就須要使用 Hash 算法。可是普通 Hash 算法存在一個問題是 Hash 出來的是實際後端服務節點,當其中某個服務宕機時 Hash 計算因子發生改變(求模的分子)若是計算 Hash 因子不調整則可能會路由到宕機那臺服務就會存在問題。那一致性 Hash 就是解決這樣的問題。

什麼是一致性 Hash 呢?一致性哈希算法是採用的環形哈希空間的方式,它首先根據 Ip 或者其餘信息爲機器節點生成一個 Hash 值,它投射到一個[0,2^32-1]的圓環中。以後咱們能夠認爲它是一個錨點。當請求進來後,它攜帶的數據,都統一輩子成一個 Hash 值,這時候比對以前的機器節點信息產生的 Hash 值,當遇到第一個大於或者等於該 Hash 值的緩存節點,咱們將這個數據便歸屬於這個機器節點。

假設虛擬節點12個、真實服務3個分別是:Invoker1Invoker2Invoker3。下圖展現了服務節點怎樣進行虛擬節點映射的過程:

  1. Invoker1 通過 Hash 計算後映射到環上對應位置

Invoker1虛擬節點映射

  1. Invoker2 通過 Hash 計算後映射到環上對應位置

Invoker2虛擬節點映射

  1. Invoker3 通過 Hash 計算後映射到環上對應位置

Invoker3虛擬節點映射

上圖中長方形色塊表明真實的服務,圓形表明虛擬節點。全部同一種顏色的圓形虛擬節點歸屬於同一色塊的長方形色塊所表明的真實服務。

2.4.1 算法實現

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "consistenthash";

    /**
     * Hash nodes name
     */
    public static final String HASH_NODES = "hash.nodes";

    /**
     * Hash arguments name
     */
    public static final String HASH_ARGUMENTS = "hash.arguments";

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //獲取調用方法名稱
        String methodName = RpcUtils.getMethodName(invocation);
        //生成惟一標識
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        int identityHashCode = System.identityHashCode(invokers);
        //對ConsistentHashSelector進行緩存
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        //緩存中selector == null 或 當invokers中服務發生變化 服務下限/服務上線 invokers hash值發生變動須要從新構建ConsistentHashSelector
        if (selector == null || selector.identityHashCode != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        //執行服務選擇
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {

        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            //虛擬Invoker(虛擬節點)
            this.virtualInvokers = new TreeMap<>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            //虛擬節點數
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));//參與hash的參數索引
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();//127.0.0.1:1
                // 將虛擬節點細化爲 (replicaNumber / 4) 份
                for (int i = 0; i < replicaNumber / 4; i++) {
                    //地址+序號進行摘要
                    byte[] digest = md5(address + i);
                    // 每份4個點散列到treeMap中
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(Invocation invocation) {
            String key = toKey(invocation.getArguments());
            byte[] digest = md5(key);
            return selectForKey(hash(digest, 0));
        }
        private Invoker<T> selectForKey(long hash) {
            //獲取至少大於或等於給定hash值
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                //獲取第一個Entry
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

    }

}

2.4.2 算法過程分析

  1. 虛擬節點構建:
  • 首先初始化虛擬節點數,默認160能夠經過外部參數指定。
  • 獲取參與 Hash 計算的參數值,hash.arguments 屬性值,缺省是0參數位置。
  • 循環 invokers 服務列表根據 ip+port+序號生成 replicaNumber 個的節點(生成虛擬節點),其中 key 就是算出來的 Hash 值,value 就是 invokerkey 的值從小到大排序的。
  1. 節點查找
  • 根據 hash.arguments參數取出對應位的參數,拼接成 key
  • 使用 md5key 計算,使用 Hash 算法算出 key 對應的 Hash 值。
  • 根據這個 key 的 Hash 值找出對應的 invoker ,這裏返回鍵值大於或等於 key 的那部分 ,而後再取第一個。
Tips:上述算法存在必定的難度,稍做了解便可。

3. 小結

在本小節中咱們主要學習了 Dubbo 中常見的負載均衡算法以及使用場景。

本節課程的重點以下:

  1. 理解 什麼是負載均衡算法
  2. 知道負載均衡算法使用場景
  3. 瞭解 Dubbo 中有哪些負載均衡算法
  4. 瞭解 Dubbo 負載均衡算法實現邏輯

做者

我的從事金融行業,就任過易極付、思建科技、某網約車平臺等重慶一流技術團隊,目前就任於某銀行負責統一支付系統建設。自身對金融行業有強烈的愛好。同時也實踐大數據、數據存儲、自動化集成和部署、分佈式微服務、響應式編程、人工智能等領域。同時也熱衷於技術分享創立公衆號和博客站點對知識體系進行分享。關注公衆號: 青年IT男 獲取最新技術文章推送!

博客地址: http://youngitman.tech

微信公衆號:

相關文章
相關標籤/搜索