解析 Dubbo 的 LoadBalance 策略源碼

零 前期準備

0 FBI WARNING

文章異常囉嗦且繞彎。java

1 版本

Dubbo 版本 : dubbo 3.0apache

2 LoadBalance 簡介

Dubbo LoadBalance 是 Dubbo Consumer 中用於負載均衡的組件,位於 Cluster 層中。數組

一 Interface

LoadBalance 的組件遵循 Dubbo 的通常設計規律,接口在 dubbo-cluster 模塊中:緩存

package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.Adaptive;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance;
import java.util.List;

@SPI(RandomLoadBalance.NAME)  // RandomLoadBalance.NAME = random
public interface LoadBalance {

    /**
     * 能夠在 url 裏傳入 loadbalance 參數來切換負載均衡策略,默認根據 spi 機制,會使用 random
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

二 模版類

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import java.util.List;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_SERVICE_REFERENCE_PATH;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;

/**
 * 負載均衡組件模板
 */
public abstract class AbstractLoadBalance implements LoadBalance {
    
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {

        // ww = (當前時間 - 啓動時間) / 預熱時間 * 權重
        // 取 ww 和 權重 中的最小值
        // 若是 當前時間 還在 預熱時間 內,那麼此處 ww 必然小於 權重
        // 若是 當前時間 和 啓動時間 相差很是近,或者 預熱時間 很長,那麼此處 ww 有可能會小於 1,此處會返回 1
        // 若是 當前時間 小於 啓動時間,那麼是服務的時間問題,ww 就會小於 0,此處會返回 1

        // 從 getWeight(...) 方法可知,此處 ww 必然小於 weight
        int ww = (int) ( uptime / ((float) warmup / weight));
        return ww < 1 ? 1 : (Math.min(ww, weight));
    }

    /**
     * 接口抽象方法 select 的實現,也是模版的核心方法
     * 
     * @param invokers   全部的服務提供者信息的封裝
     * @param url        當前調用者的 url
     * @param invocation  要發送的信息
     */
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // 若是沒有服務提供者,此處返回 null
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }

        // 服務的提供者只有一個,直接返回就能夠了,沒有負載均衡的必要
        if (invokers.size() == 1) {
            return invokers.get(0);
        }

        // 有多個,那麼此處須要不一樣的策略自行完成具體邏輯
        return doSelect(invokers, url, invocation);
    }

    /**
     * 模板方法的具體實現,從列表中選擇一個 invoker
     *
     * @param invokers 全部的服務提供者信息的封裝
     * @param url     當前調用者的 url
     * @param invocation 要發送的信息
     */
    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);


    /**
     * 獲取 invoker 權重的方法,在 random 和 robin      * 中很重要的方法
     */
    int getWeight(Invoker<?> invoker, Invocation invocation) {
        int weight;

        // 獲取 url
        URL url = invoker.getUrl();

        // REGISTRY_SERVICE_REFERENCE_PATH = org.apache.dubbo.registry.RegistryService
        // REGISTRY_KEY = registry
        // WEIGHT_KEY = weight
        // DEFAULT_WEIGHT = 100
        // TIMESTAMP_KEY = timestamp
        // WARMUP_KEY = warmup
        // DEFAULT_WARMUP = 600000

        if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
            // 入參 registry.weight 和 100
            weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
        } else {

            // provider 的權重
            weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
            if (weight > 0) { // 權重大於 0
                // provider 的啓動的時間戳
                long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
                if (timestamp > 0L) {
                    long uptime = System.currentTimeMillis() - timestamp;
                    if (uptime < 0) {
                        // 啓動的時間戳小於當前時間戳,這種狀況多是存在服務器時間問題
                        // 此處爲什麼返回 1 ?
                        return 1;
                    }

                    // warmup 是預熱時間,若是當前時間內,這個 provider 還處於預熱當中
                    // 那麼就會調用到 calculateWarmupWeight(...) 方法
                    int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                    if (uptime > 0 && uptime < warmup) {
                        weight = calculateWarmupWeight((int)uptime, warmup, weight);
                    }
                }
            }
        }

        // 權重不能低於 0
        return Math.max(weight, 0);
    }
}

三 負載均衡策略實現

在 Dubbo 3.0 中,負載均衡策略存在如下幾種:服務器

  • RandomLoadBalance (隨機)
  • RoundRobinLoadBalance (輪詢)
  • ShortestResponseLoadBalance (最短反饋)
  • LeastActiveLoadBalance (最少活躍)
  • ConsistentHashLoadBalance (一致性 hash)

筆者這裏暫時只列舉前三種,後面兩種有緣補充 (其實是由於還沒看完)。負載均衡

1 RandomLoadBalance

默認策略,其實是考慮了權重以後的隨機選擇,若是每一個服務提供者的權重都一致,那麼就使用 java 的隨機函數去選擇一個。dom

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 考慮權重值以後的隨機負載均衡
 */
public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // 獲取服務提供者的數量
        int length = invokers.size();

        // 默認全部的服務提供者是相同的權重
        boolean sameWeight = true;

        // 權重數組
        int[] weights = new int[length];

        // 獲取第一個服務提供者的權重
        int firstWeight = getWeight(invokers.get(0), invocation);
        // 存入數組
        weights[0] = firstWeight;

        // 權重的和
        int totalWeight = firstWeight;

        // 輪詢全部的提供者的權重並記錄下來
        for (int i = 1; i < length; i++) {
            // 此處和上方代碼相似
            int weight = getWeight(invokers.get(i), invocation);
            weights[i] = weight;
            totalWeight += weight;

            // 若是遇到不同的就把標識改爲 false
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }

        // 不一樣權重模式下的隨機計算
        // 大概思路是 row 一個隨機值,並按照順序進行相減,觀察落在哪一個區間內
        if (totalWeight > 0 && !sameWeight) {
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }

        // 相同權重下的隨機計算
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }

}

2 RoundRobinLoadBalance

輪詢負載均衡策略,本質上也是考慮了權重以後的輪循。若是 A 服務提供者的權重是 B 服務提供者的兩倍,那麼理論上 A 被輪循到的次數就會是 B 的兩倍。ide

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Round robin load balance.
 *
 * 輪詢負載均衡策略
 */
public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";

    private static final int RECYCLE_PERIOD = 60000;

    /**
     * 權重的封裝
     */
    protected static class WeightedRoundRobin {
        private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;

        public int getWeight() {
            return weight;
        }

        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }

        public long increaseCurrent() {
            return current.addAndGet(weight);
        }

        public void sel(int total) {
            current.addAndGet(-1 * total);
        }

        public long getLastUpdate() {
            return lastUpdate;
        }

        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }

    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();

    /**
     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     *
     * @param invokers
     * @param invocation
     * @return
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        }
        return null;
    }

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // key = serviceKey + methodName
        // 這個 key 表明一個 provider 接口
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

        // 獲取權重記錄,若是沒有的話會建立一個空 map
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();


        Invoker<T> selectedInvoker = null; // 被選中的 provider
        WeightedRoundRobin selectedWRR = null; // 被選中的 provider 的權重 entity
        for (Invoker<T> invoker : invokers) {

            // 此處若是存在權重記錄就直接返回,不存在就初始化一個
            // identifyString 是緩存的 key
            String identifyString = invoker.getUrl().toIdentityString();

            /*
                 獲取權重的封裝對象,若是沒有的話會建立一個
                 WeightedRoundRobin 維護兩個重要的參數,
                 一個數 current,表明該 provider 當前的調用權重
                 一個是 weight,表明該 provider 恆定的配置權重
             */
            int weight = getWeight(invoker, invocation);
            WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
                WeightedRoundRobin wrr = new WeightedRoundRobin();
                wrr.setWeight(weight);
                return wrr;
            });

            // 改權重數據
            if (weight != weightedRoundRobin.getWeight()) {
                weightedRoundRobin.setWeight(weight);
            }

            // cur = weightedRoundRobin.current + weightedRoundRobin.weight
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);

            // 此處的 cur > maxCurrent,本質上選出了全部 provider 中 current 最大的一個
            // 此處結合上述邏輯,至關於給每一個 provider 的 current 增長了一次 weight
            // 並選出了 current 最大的那一個,做爲調用方
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }


        // 對 map 進行自檢
        // 若是超過 60 秒都沒有被調用,此處即認爲服務已經異常,就會移除
        if (invokers.size() != map.size()) {
            map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
        }
        if (selectedInvoker != null) {
            // weightedRoundRobin.current = weightedRoundRobin.current - totalWeight
            // 至關於
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }


        /**
         * 上述邏輯簡圖
         * 假設三個服務 s1,s2,s3 權重均爲 10
         *
         * 第一輪疊加權重後的 current:
         * 10 10 10
         * 第一輪選擇推送 s1,推送完成後的 current:
         * -20 10 10
         *
         * 第二輪疊加權重後的 current:
         * -10 20 20
         * 第二輪選擇推送 s2,推送完成後的 current:
         * -10 -10 20
         *
         * 第三輪疊加權重後的 current:
         * 0 0 30
         * 第三輪選擇推送 s3,推送完成後的 current:
         * 0 0 0
         *
         * 第四輪疊加權重後的 current:
         * 10 10 10
         * 第四輪選擇推送 s1,推送完成後的 current:
         * -20 10 10
         *
         *
         * 以此類推。
         */

        // 上述代碼出問題的狀況下默認選第一個
        return invokers.get(0);
    }
}

3 ShortestResponseLoadBalance

根據響應時間和當前服務的請求量去得到一個最優解。若是存在多個最優解,則考慮權重,若是僅有一個則權重無效。函數

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcStatus;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 根據最優解選擇服務提供者
 */
public class ShortestResponseLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "shortestresponse";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        // 可調用的服務提供者的數量
        int length = invokers.size();
        // 初始化一個最短 response 時間
        long shortestResponse = Long.MAX_VALUE;
        // 初始化一個最短 response 總數
        int shortestCount = 0;
        // The index of invokers having the same estimated shortest response time
        int[] shortestIndexes = new int[length];
        // 每一個服務提供者的權重
        int[] weights = new int[length];
        // 權重和
        int totalWeight = 0;
        // 調用平均返回時間最短的服務提供者的權重
        int firstWeight = 0;
        // 權重是否相同
        boolean sameWeight = true;

        // 輪詢全部的服務提供者
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 獲取服務提供者的狀態
            RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            // 平均服務調用成功返回時間
            long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
            // 正在活躍的請求數
            int active = rpcStatus.getActive();
            // 此處用平均時間乘以活躍數,得到打分
            // 若是服務提供方很健壯,平均時間很短,可是請求分配的不少,這裏分數也會比較高
            // 分數越低,優先級越高
            long estimateResponse = succeededAverageElapsed * active;

            // 獲取權重
            int afterWarmup = getWeight(invoker, invocation);
            weights[i] = afterWarmup;

            /**
             * 計算最短數組,shortestResponse 記錄當前最短的
             */
            if (estimateResponse < shortestResponse) {
                // 若是當前服務提供者的得分低於最低的得分,則更新最低得分,
                // 並將最優提供者數組的首位置爲當前的提供者
                shortestResponse = estimateResponse;
                shortestCount = 1;
                shortestIndexes[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;
            } else if (estimateResponse == shortestResponse) {
                // 若是相等,則可能存在多個最優解
                shortestIndexes[shortestCount++] = i;
                totalWeight += afterWarmup;
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }

        // 最優解只有一個的狀況,直接選最優解進行調用
        if (shortestCount == 1) {
            return invokers.get(shortestIndexes[0]);
        }
        // 最優解不止一個,且最優解之間的權重不一樣,那麼此處根據權重去隨機選擇一個
        if (!sameWeight && totalWeight > 0) {
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < shortestCount; i++) {
                int shortestIndex = shortestIndexes[i];
                offsetWeight -= weights[shortestIndex];
                if (offsetWeight < 0) {
                    return invokers.get(shortestIndex);
                }
            }
        }

        // 最優解不止一個,且權重相同,則隨機選擇
        return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
    }
}

本文僅爲我的的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充學習

相關文章
相關標籤/搜索