各位中秋節快樂啊,我以爲在這個月圓之夜有必要寫一篇源碼解析,以表示我心裏的高興~html
Sentinel源碼解析系列:java
1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼?node
2. Sentinel源碼分析—Sentinel是如何進行流量統計的?c#
3. Sentinel源碼分析— QPS流量控制是如何實現的?數組
在個人第二篇文章裏面2. Sentinel源碼分析—Sentinel是如何進行流量統計的?裏面介紹了整個Sentinel的主流程是怎樣的。因此降級的大體流程能夠概述爲: 1. 設置降級策略,是根據平均響應時間仍是異常比例來進行降級的 2. 根據資源建立一系列的插槽 3. 依次調用插槽,根據設定的插槽類型來進行降級bash
咱們先來看個例子,方便你們本身斷點跟蹤:app
private static final String KEY = "abc";
private static final int threadCount = 100;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
List<DegradeRule> rules = new ArrayList<DegradeRule>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
// set threshold rt, 10 ms
rule.setCount(10);
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
rule.setTimeWindow(10);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry entry = null;
try {
TimeUnit.MILLISECONDS.sleep(5);
entry = SphU.entry(KEY);
// token acquired
pass.incrementAndGet();
// sleep 600 ms, as rt
TimeUnit.MILLISECONDS.sleep(600);
} catch (Exception e) {
block.incrementAndGet();
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
}
});
entryThread.setName("working-thread");
entryThread.start();
}
}
複製代碼
其餘的流程基本上和第二篇文章裏介紹的差很少,這篇文章來介紹Sentinel的主流程,Sentinel的降級策略所有都是在DegradeSlot中進行操做的。less
DegradeSlotide
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
複製代碼
DegradeSlot會直接調用DegradeRuleManager進行降級的操做,咱們直接進入到DegradeRuleManager.checkDegrade方法中。源碼分析
DegradeRuleManager#checkDegrade
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
//根據resource來獲取降級策略
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
複製代碼
這個方法邏輯也是很是的清晰,首先是根據資源名獲取到註冊過的降級規則,而後遍歷規則集合調用規則的passCheck,若是返回false那麼就拋出異常進行降級。
DegradeRule#passCheck
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
//返回false直接進行降級
if (cut.get()) {
return false;
}
//降級是根據資源的全局節點來進行判斷降級策略的
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//根據響應時間降級策略
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//獲取節點的平均響應時間
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
//rtSlowRequestAmount默認是5
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
// 根據異常比例降級
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
// 根據異常數降級
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
//根據設置的時間窗口進行重置
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
複製代碼
這個方法首先會去獲取cut的值,若是是true那麼就直接進行限流操做。而後會根據resource獲取ClusterNode全局節點。往下分別根據三種不一樣的策略來進行降級。
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//獲取節點的平均響應時間
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
//rtSlowRequestAmount默認是5
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
}
複製代碼
若是是根據響應時間進行降級,那麼會獲取clusterNode的平均響應時間,若是平均響應時間大於所設定的count(默認是毫秒),那麼就調用passCount加1,若是passCount大於5,那麼直接降級。
因此看到這裏咱們應該知道根據平均響應時間降級前幾個請求即便響應過長也不會立馬降級,而是要等到第六個請求到來纔會進行降級。
咱們進入到clusterNode的avgRt方法中看一下是如何獲取到clusterNode的平均響應時間的。
clusterNode是StatisticNode的實例 StatisticNode#avgRt
public double avgRt() {
//獲取當前時間窗口內調用成功的次數
long successCount = rollingCounterInSecond.success();
if (successCount == 0) {
return 0;
}
//獲取窗口內的響應時間
return rollingCounterInSecond.rt() * 1.0 / successCount;
}
```e
這個方法主要是調用rollingCounterInSecond獲取成功次數,而後再獲取窗口內的響應時間,用總響應時間除以次數獲得平均每次成功調用的響應時間。
在[1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼?](https://www.cnblogs.com/luozhiyun/p/11439993.html)中,我已經具體講述了StatisticNode裏面的rollingCounterInMinute實現原理,rollingCounterInMinute是按分鐘進行統計的時間窗口。如今咱們來說一下rollingCounterInSecond按秒來進行統計的時間窗口。
在StatisticNode裏面初始化rollingCounterInSecond:
```java
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
複製代碼
在這個初始化的方法裏,會傳入兩個參數,SampleCountProperty.SAMPLE_COUNT的值是2, IntervalProperty.INTERVAL的值是1000。
咱們進入到ArrayMetric的構造方法中:
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
複製代碼
在建立ArrayMetric實例的時候會給data建立一個OccupiableBucketLeapArray實例。
OccupiableBucketLeapArray
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
複製代碼
OccupiableBucketLeapArray繼承LeapArray這個抽象類,初始化的時候會調用父類的構造器: LeapArray
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
//intervalInMs是sampleCount的整數
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
//每一個小窗口的時間跨度
this.windowLengthInMs = intervalInMs / sampleCount;
//窗口的長度
this.intervalInMs = intervalInMs;
//窗口個數
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
複製代碼
OccupiableBucketLeapArray在初始化的時候也會建立一個FutureBucketLeapArray實例賦值給borrowArray。
FutureBucketLeapArray也是繼承LeapArray:
public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "BorrowBucketArray".
super(sampleCount, intervalInMs);
}
複製代碼
直接經過調用父類LeapArray的構造方法進行初始化。
到這裏rollingCounterInSecond的建立過程講完了。
下面咱們再回到StatisticNode中,在調用StatisticNode的avgRt方法的時候會調用rollingCounterInSecond.success()方法獲取當前時間窗口的調用成功次數:
ArrayMetric#success
public long success() {
//設置或更新當前的時間窗口
data.currentWindow();
long success = 0;
//獲取窗口裏有效的Bucket
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
success += window.success();
}
return success;
}
複製代碼
這裏的data是的父類是LeapArray,LeapArray裏面有一個array數組,用來記錄時間窗口,在咱們這裏是基於秒鐘的時間窗口,因此array的大小爲2。data的結構圖我直接從1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼?中拿過來:
只不過這裏的WindowWrap數組元素只有兩個,每個WindowWrap元素由MetricBucket對象構成,用來統計數據,如:經過次數、阻塞次數、異常次數等~
調用data的currentWindow方法會調用到LeapArray的currentWindow方法中去: LeapArray#currentWindow
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//經過當前時間判斷屬於哪一個窗口
int idx = calculateTimeIdx(timeMillis);
//計算出窗口開始時間
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//獲取數組裏的老數據
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 若是對應時間窗口的開始時間與計算獲得的開始時間同樣
// 那麼表明當前便是咱們要找的窗口對象,直接返回
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
//若是當前的開始時間小於原開始時間,那麼就更新到新的開始時間
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
//通常來講不會走到這裏
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
複製代碼
這裏我簡單介紹一下這個方法,這個方法的詳細講解已經在第一篇源碼分析裏作了。
這個方法裏面會根據當前的時間戳來計算出array數組裏面的index,而後去array數組中找相應的數據,若是節點已經存在,那麼用CAS更新一個新的節點;若是節點是新的,那麼直接返回;若是節點失效了,設置當前節點,清除全部失效節點。
這裏我直接引用1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼?中的例子:
1. 若是array數據裏面的bucket數據以下所示:
NULL B4
|_______|_______|
800 1000 1200
^
time=888
正好當前時間所對應的槽位裏面的數據是空的,那麼就用CAS更新
2. 若是array裏面已經有數據了,而且槽位裏面的窗口開始時間和當前的開始時間相等,那麼直接返回
B3 B4
||_______|_______||___
800 1000 1200 timestamp
^
time=888
3. 例如當前時間是1676,所對應窗口裏面的數據的窗口開始時間小於當前的窗口開始時間,那麼加上鎖,而後設置槽位的窗口開始時間爲當前窗口開始時間,並把槽位裏面的數據重置
(old)
B0
|_______||_______|
... 1200 1400
^
time=1676
複製代碼
再回到ArrayMetric的success方法中,往下走調用data.values()方法: LeapArray#success
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
複製代碼
這個方法就是用來獲取全部有效的MetricBucket,並返回。 而後經過調用MetricBucket的success方法獲取被成功調用的次數。
咱們接着來看ArrayMetric的rt方法:
public long rt() {
data.currentWindow();
long rt = 0;
//獲取當前時間窗口的統計數據
List<MetricBucket> list = data.values();
//統計當前時間窗口的平均相應時間之和
for (MetricBucket window : list) {
rt += window.rt();
}
return rt;
}
複製代碼
這個方法和上面的success方法差很少,獲取全部的MetricBucket的rt數據求和返回。 而後就能夠經過rt方法返回的時間總和除以成功調用的總和求得平均數。
咱們再回到DegradeRule的passCheck方法中的響應時間降級策略中:
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//獲取節點的平均響應時間
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
//rtSlowRequestAmount默認是5
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
// 根據異常比例降級
}
//省略
return false;
複製代碼
若是求得的平均響應時間小於設置的count時間,那麼就重置passCount並返回true,表示不拋出異常;若是有連續5次的響應時間都超過了count,那麼就返回false拋出異常進行降級。
if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
//獲取每秒異常的次數
double exception = clusterNode.exceptionQps();
//獲取每秒成功的次數
double success = clusterNode.successQps();
//獲取每秒總調用次數
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
// 若是總調用次數少於5,那麼不進行降級
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
}
。。。
return false;
複製代碼
這個方法中獲取成功調用的Qps和異常調用的Qps,驗證後,而後求一下比率,若是沒有大於count,那麼就返回true,不然返回false拋出異常。
咱們再進入到exceptionQps方法中看一下: StatisticNode#exceptionQps
public double exceptionQps() {
return rollingCounterInSecond.exception() / rollingCounterInSecond.getWindowIntervalInSec();
}
複製代碼
rollingCounterInSecond.getWindowIntervalInSec方法是表示窗口的時間長度,用秒來表示。這裏返回的是1。 ArrayMetric#exception
public long exception() {
data.currentWindow();
long exception = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
exception += window.exception();
}
return exception;
}
複製代碼
這個方法和我上面分析的差很少,你們看看就行了。
if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
複製代碼
根據異常數降級是很是的直接的,直接根據統計的異常總次數判斷是否超過count。
到這裏就講完了降級的實現咯~~