5.Sentinel源碼分析—Sentinel如何實現自適應限流?

Sentinel源碼解析系列: 1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼?html

2. Sentinel源碼分析—Sentinel是如何進行流量統計的?java

3. Sentinel源碼分析— QPS流量控制是如何實現的?node

4.Sentinel源碼分析— Sentinel是如何作到降級的?git


這篇文章主要學習一下Sentinel如何實現自適應限流的。 爲何要作自適應限流,官方給了兩個理由:github

  1. 保證系統不被拖垮
  2. 在系統穩定的前提下,保持系統的吞吐量

我再貼一下官方的原理: c#

  1. 可以保證水管裏的水量,可以讓水順暢的流動,則不會增長排隊的請求;也就是說,這個時候的系統負載不會進一步惡化。
  2. 當保持入口的流量是水管出來的流量的最大的值的時候,能夠最大利用水管的處理能力。 更加具體的原理解釋能夠看官方:系統自適應限流

因此看起來好像很厲害的樣子,因此咱們來看看具體實現吧。併發

例子:app

  1. 設置系統自適應規則
List<SystemRule> rules = new ArrayList<SystemRule>();
SystemRule rule = new SystemRule();
//限制最大負載
rule.setHighestSystemLoad(3.0);
// cpu負載60%
rule.setHighestCpuUsage(0.6);
// 設置平均響應時間 10 ms
rule.setAvgRt(10);
// 設置qps is 20
rule.setQps(20);
// 設置最大線程數 10
rule.setMaxThread(10);

rules.add(rule);
SystemRuleManager.loadRules(Collections.singletonList(rule));
複製代碼
  1. 設置限流
Entry entry = null;
try {
    entry = SphU.entry("methodA", EntryType.IN); 
    //dosomething
} catch (BlockException e1) {
    block.incrementAndGet();
    //dosomething
} catch (Exception e2) {
    // biz exception
} finally { 
    if (entry != null) {
        entry.exit();
    }
}
複製代碼

注意:系統保護規則是應用總體維度的,而不是資源維度的,而且僅對入口流量生效。入口流量指的是進入應用的流量(EntryType.IN),好比 Web 服務或 Dubbo 服務端接收的請求,都屬於入口流量。源碼分析

咱們先講一下SystemRuleManager這個類在初始化的時候作了什麼吧。學習

SystemRuleManager

private static SystemStatusListener statusListener = null;
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
    new NamedThreadFactory("sentinel-system-status-record-task", true));

static {
    checkSystemStatus.set(false);
    statusListener = new SystemStatusListener();
    scheduler.scheduleAtFixedRate(statusListener, 5, 1, TimeUnit.SECONDS);
    currentProperty.addListener(listener);
}
複製代碼

SystemRuleManager初始化的時候會調用靜態代碼塊,而後用scheduler線程池定時調用SystemStatusListener類的run方法。咱們進入到SystemStatusListener類裏看一下:

SystemStatusListener#run

public void run() {
    try {
        OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        currentLoad = osBean.getSystemLoadAverage();
       
        currentCpuUsage = osBean.getSystemCpuLoad();

        StringBuilder sb = new StringBuilder();
        if (currentLoad > SystemRuleManager.getHighestSystemLoad()) {
            sb.append("load:").append(currentLoad).append(";");
            sb.append("cpu:").append(currentCpuUsage).append(";");
            sb.append("qps:").append(Constants.ENTRY_NODE.passQps()).append(";");
            sb.append("rt:").append(Constants.ENTRY_NODE.avgRt()).append(";");
            sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append(";");
            sb.append("success:").append(Constants.ENTRY_NODE.successQps()).append(";");
            sb.append("minRt:").append(Constants.ENTRY_NODE.minRt()).append(";");
            sb.append("maxSuccess:").append(Constants.ENTRY_NODE.maxSuccessQps()).append(";");
            RecordLog.info(sb.toString());
        }

    } catch (Throwable e) {
        RecordLog.info("could not get system error ", e);
    }
}
複製代碼

這個方法用來作兩件事:

  1. 定時收集全局資源狀況,並打印日誌
  2. 給全局變量currentLoad和currentCpuUsage賦值,用來作限流使用。

而後看一下SystemRuleManager.loadRules方法。SystemRuleManager和其餘的規則管理是同樣的,當調用loadRules方法的時候會調用內部的listener並觸發它的configUpdate方法。 在SystemRuleManager中實現類了一個SystemPropertyListener,最終SystemRuleManager.loadRules方法會調用到SystemPropertyListener的configUpdate中。

SystemPropertyListener#configUpdate

public void configUpdate(List<SystemRule> rules) {
    restoreSetting();
    // systemRules = rules;
    if (rules != null && rules.size() >= 1) {
        for (SystemRule rule : rules) {
            loadSystemConf(rule);
        }
    } else {
        checkSystemStatus.set(false);
    }

    RecordLog.info(String.format("[SystemRuleManager] Current system check status: %s, "
            + "highestSystemLoad: %e, "
            + "highestCpuUsage: %e, "
            + "maxRt: %d, "
            + "maxThread: %d, "
            + "maxQps: %e",
        checkSystemStatus.get(),
        highestSystemLoad,
        highestCpuUsage,
        maxRt,
        maxThread,
        qps));
}
複製代碼

這個方法很簡單,首先是調用restoreSetting,用來重置rule的屬性,而後遍歷rule調用loadSystemConf對規則進行設置:

SystemRuleManager#loadSystemConf

public static void loadSystemConf(SystemRule rule) {
    boolean checkStatus = false;
    // Check if it's valid.

    if (rule.getHighestSystemLoad() >= 0) {
        highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
        highestSystemLoadIsSet = true;
        checkStatus = true;
    }

    if (rule.getHighestCpuUsage() >= 0) {
        highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
        highestCpuUsageIsSet = true;
        checkStatus = true;
    }

    if (rule.getAvgRt() >= 0) {
        maxRt = Math.min(maxRt, rule.getAvgRt());
        maxRtIsSet = true;
        checkStatus = true;
    }
    if (rule.getMaxThread() >= 0) {
        maxThread = Math.min(maxThread, rule.getMaxThread());
        maxThreadIsSet = true;
        checkStatus = true;
    }

    if (rule.getQps() >= 0) {
        qps = Math.min(qps, rule.getQps());
        qpsIsSet = true;
        checkStatus = true;
    }

    checkSystemStatus.set(checkStatus);

}
複製代碼

這些屬性都是在限流控制中會用到的屬性,不管設置哪一個屬性都會設置checkStatus=true表示開啓系統自適應限流。

在設置好限流規則後會進入到SphU.entry方法中,經過建立slot鏈調用到SystemSlot,這裏是系統自適應限流的地方。

SystemSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
	  //檢查一下是否符合限流條件,符合則進行限流
    SystemRuleManager.checkSystem(resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
複製代碼

SystemRuleManager#checkSystem

public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
    // Ensure the checking switch is on.
    if (!checkSystemStatus.get()) {
        return;
    }
    //若是不是入口流量,那麼直接返回
    // for inbound traffic only
    if (resourceWrapper.getType() != EntryType.IN) {
        return;
    }

    // total qps
    double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
    if (currentQps > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }

    // total thread
    int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }

    double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }

    // load. BBR algorithm.
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }

    // cpu usage
    if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
}
複製代碼

這個方法首先會校驗一下checkSystemStatus狀態和EntryType是否是IN,若是不是則直接返回。 而後對Constants.ENTRY_NODE進行操做。這個對象是一個final static 修飾的變量,表明是全局對象。

public final static ClusterNode ENTRY_NODE = new ClusterNode();
複製代碼

因此這裏的限流操做都是對全局其做用的,而不是對資源起做用。ClusterNode仍是繼承自StatisticNode,因此最後都是調用StatisticNode的successQps、curThreadNum、avgRt,這幾個方法個人前幾篇文章都已經講過了,感興趣的能夠本身去翻一下,這裏就不過多涉及了。

在下面調用getCurrentSystemAvgLoad方法和getCurrentCpuUsage方法調用到SystemStatusListener設置的全局變量currentLoad和currentCpuUsage。這兩個參數是SystemRuleManager的定時任務定時收集的,忘了的同窗回到上面講解SystemRuleManager的地方看一下。

在作load判斷和cpu usage判斷的時候會還會調用checkBbr方法來判斷:

private static boolean checkBbr(int currentThread) {
    if (currentThread > 1 &&
        currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
        return false;
    }
    return true;
}
複製代碼

也就是說:當系統 load1 超過閾值,且系統當前的併發線程數超過系統容量時纔會觸發系統保護。系統容量由系統的 maxQps * minRt 計算得出。

StatisticNode#maxSuccessQps

public double maxSuccessQps() {
    return rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount();
}
複製代碼

maxSuccessQps方法是用窗口內的最大成功調用數和窗口數量相乘rollingCounterInSecond的窗口1秒的窗口數量是2,最大成功調用數以下得出: ArrayMetric#maxSuccess

public long maxSuccess() {
    data.currentWindow();
    long success = 0;

    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        if (window.success() > success) {
            success = window.success();
        }
    }
    return Math.max(success, 1);
}
複製代碼

最大成功調用數是經過整個遍歷整個窗口,獲取全部窗口裏面最大的調用數。因此這樣的最大的併發量是一個預估值,不是真實值。

看到這裏咱們再來看一下Constants.ENTRY_NODE的信息是怎麼被收集的。 我在分析StatisticSlot這個類的時候有一段代碼我當時也沒看懂有什麼用,如今就迎刃而解了: StatisticSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try { 
			....
        if (resourceWrapper.getType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
			....
    } catch (PriorityWaitException ex) {
			....
        if (resourceWrapper.getType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }
         ....
    } catch (BlockException e) {
			....
        if (resourceWrapper.getType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }
			.... 
        throw e;
    } catch (Throwable e) {
         ....
        if (resourceWrapper.getType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseExceptionQps(count);
        }
        throw e;
    }
}
複製代碼

在StatisticSlot的entry方法裏有不少對於type的判斷,若是是EntryType.IN,那麼就調用Constants.ENTRY_NODE的靜態方法進行數據的收集。

因此看到這裏咱們能夠知道,在前面有不少看不懂的代碼其實只要慢慢琢磨,打個標記,那麼在後面的解析的過程當中仍是可以慢慢看懂的。

共勉~~

相關文章
相關標籤/搜索