終於在這周內寫了一篇源碼解析,每週一篇即便再忙也不能打破html
Sentinel源碼解析系列: 1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼? 2. Sentinel源碼分析—Sentinel是如何進行流量統計的?java
上回咱們用基於併發數來說了一下Sentinel的整個流程,這篇文章咱們來說一下Sentinel的QPS流量控制是如何實現的。node
先上一個極簡的demo,咱們的代碼就從這個demo入手:api
public static void main(String[] args) { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); rule1.setResource("abc"); rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); rule1.setLimitApp("default"); rules.add(rule1); FlowRuleManager.loadRules(rules); Entry entry = null; try { entry = SphU.entry("abc"); //dosomething } catch (BlockException e1) { } catch (Exception e2) { // biz exception } finally { if (entry != null) { entry.exit(); } } }
在這個例子中咱們首先新建了一個FlowRule實例,而後調用了loadRules方法加載規則,這部分的代碼都和基於併發數的流量控制的代碼是同樣的,想要了解的朋友能夠去看看個人這一篇文章1.Sentinel源碼分析—FlowRuleManager加載規則作了什麼?,下面咱們說說不同的地方。併發
在調用FlowRuleManager的loadRules方法的時候會建立一個rater實例:app
FlowRuleUtil#buildFlowRuleMapless
//設置拒絕策略:直接拒絕、Warm Up、勻速排隊,默認是DefaultController TrafficShapingController rater = generateRater(rule); rule.setRater(rater);
咱們進入到generateRater看一下是怎麼建立實例的:ide
FlowRuleUtil#generateRater源碼分析
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) { if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { switch (rule.getControlBehavior()) { case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: //warmUpPeriodSec默認是10 return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: //rule.getMaxQueueingTimeMs()默認是500 return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: default: // Default mode or unknown mode: default traffic shaping controller (fast-reject). } } return new DefaultController(rule.getCount(), rule.getGrade()); }
這個方法裏面若是設置的是按QPS的方式來限流的話,能夠設置一個ControlBehavior屬性,用來作流量控制分別是:直接拒絕、Warm Up、勻速排隊。ui
接下來的全部的限流操做所有在FlowSlot中進行,不熟悉Sentinel流程的朋友能夠去看看個人這一篇文章:2. Sentinel源碼分析—Sentinel是如何進行流量統計的?,這篇文章介紹了Sentinel的全流程分析,本文的其餘流程基本都在這篇文章上講了,只有FlowSlot部分代碼不一樣。
接下來咱們來說一下FlowSlot裏面是怎麼實現QPS限流的
FlowSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); }
FlowSlot在實例化的時候會實例化一個FlowRuleChecker實例做爲checker。在checkFlow方法裏面會繼續調用FlowRuleChecker的checkFlow方法,其中ruleProvider實例是用來根據根據resource來從flowRules中獲取相應的FlowRule。
咱們進入到FlowRuleChecker的checkFlow方法中
FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } //返回FlowRuleManager裏面註冊的全部規則 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //若是當前的請求不能經過,那麼就拋出FlowException異常 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }
這裏是調用ruleProvider來獲取全部FlowRule,而後遍歷rule集合經過canPassCheck方法來進行過濾,若是不符合條件則會拋出FlowException異常。
咱們跟進去直接來到passLocalCheck方法:
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { //節點選擇 Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } //根據設置的規則來攔截 return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
這個方法裏面會選擇好相應的節點後調用rater的canPass方法來判斷是否須要阻塞。
Rater有四個,分別是:DefaultController、RateLimiterController、WarmUpController、WarmUpRateLimiterController,咱們挨個分析一下。
其中DefaultController是直接拒絕策略,咱們在上一篇文章中已經分析過了,此次咱們來看看其餘三個。
它的中心思想是,以固定的間隔時間讓請求經過。當請求到來的時候,若是當前請求距離上個經過的請求經過的時間間隔不小於預設值,則讓當前請求經過;不然,計算當前請求的預期經過時間,若是該請求的預期經過時間小於規則預設的 timeout 時間,則該請求會等待直到預設時間到來經過(排隊等待處理);若預期的經過時間超出最大排隊時長,則直接拒接這個請求。
這種方式適合用於請求以突刺狀來到,這個時候咱們不但願一會兒把全部的請求都經過,這樣可能會把系統壓垮;同時咱們也期待系統以穩定的速度,逐步處理這些請求,以起到「削峯填谷」的效果,而不是拒絕全部請求。
要想使用這個策略須要在實例化FlowRule的時候設置rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)
這樣的一句代碼。
在實例化Rater的時候會調用FlowRuleUtil#generateRateri建立一個實例:
new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
MaxQueueingTimeMs默認是500 ,Count在咱們這個例子中傳入的是20。
咱們看一下具體的canPass方法是怎麼實現限流的:
public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Pass when acquire count is less or equal than 0. if (acquireCount <= 0) { return true; } // Reject when count is less or equal than 0. // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. if (count <= 0) { return false; } long currentTime = TimeUtil.currentTimeMillis(); //兩個請求預期經過的時間,也就是說把請求平均分配到1秒上 // Calculate the interval between every two requests. long costTime = Math.round(1.0 * (acquireCount) / count * 1000); //latestPassedTime表明的是上一次調用請求的時間 // Expected pass time of this request. long expectedTime = costTime + latestPassedTime.get(); //若是預期經過的時間加上上次的請求時間小於當前時間,則經過 if (expectedTime <= currentTime) { // Contention may exist here, but it's okay. latestPassedTime.set(currentTime); return true; } else { //默認是maxQueueingTimeMs // Calculate the time to wait. long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); //若是預提時間比當前時間大maxQueueingTimeMs那麼多,那麼就阻塞 if (waitTime > maxQueueingTimeMs) { return false; } else { //將上次時間加上此次請求要耗費的時間 long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); //再次判斷一下是否超過maxQueueingTimeMs設置的時間 if (waitTime > maxQueueingTimeMs) { //若是是的話就阻塞,並重置上次經過時間 latestPassedTime.addAndGet(-costTime); return false; } //若是須要等待的時間大於零,那麼就sleep // in race condition waitTime may <= 0 if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; }
這個方法一開始會計算一下costTime這個值,將請求平均分配到一秒中。例如:當 count 設爲 10 的時候,則表明一秒勻速的經過 10 個請求,也就是每一個請求平均間隔恆定爲 1000 / 10 = 100 ms。
可是這裏有個小bug,若是count設置的比較大,好比設置成10000,那麼costTime永遠都會等於0,整個QPS限流 將會失效。
而後會將costTime和上次的請求時間相加,若是大於當前時間就代表請求的太頻繁了,會將latestPassedTime這個屬性加上此次請求的costTime,並調用sleep方法讓這個線程先睡眠一會再請求。
這裏有個細節,若是多個請求同時一塊兒過來,那麼每一個請求在設置oldTime的時候都會經過addAndGet這個原子性的方法將latestPassedTime依次相加,並賦值給oldTime,這樣每一個線程的sleep的時間都不會相同,線程也不會同時醒來。
當系統長期處於低水位的狀況下,當流量忽然增長時,直接把系統拉昇到高水位可能瞬間把系統壓垮。經過"冷啓動",讓經過的流量緩慢增長,在必定時間內逐漸增長到閾值上限,給冷系統一個預熱的時間,避免冷系統被壓垮。
//默認爲3 private int coldFactor; //轉折點的令牌數 protected int warningToken = 0; //最大的令牌數 private int maxToken; //斜線斜率 protected double slope; //累積的令牌數 protected AtomicLong storedTokens = new AtomicLong(0); //最後更新令牌的時間 protected AtomicLong lastFilledTime = new AtomicLong(0); public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { construct(count, warmUpPeriodInSec, coldFactor); } private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; //默認是3 this.coldFactor = coldFactor; // thresholdPermits = 0.5 * warmupPeriod / stableInterval. // 10*20/2 = 100 // warningToken = 100; warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1); // / maxPermits = thresholdPermits + 2 * warmupPeriod / // (stableInterval + coldInterval) // maxToken = 200 maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); // slope // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits // - thresholdPermits); slope = (coldFactor - 1.0) / count / (maxToken - warningToken); }
這裏我拿一張圖來講明一下:
X 軸表明 storedPermits 的數量,Y 軸表明獲取一個 permits 須要的時間。
假設指定 permitsPerSecond 爲 10,那麼 stableInterval 爲 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍 )。也就是說,當達到 maxPermits 時,此時處於系統最冷的時候,獲取一個 permit 須要 300ms,而若是 storedPermits 小於 thresholdPermits 的時候,只須要 100ms。
利用 「獲取冷的 permits 」 須要等待更多時間,來限制突發請求經過,達到系統預熱的目的。
因此在咱們的代碼中,maxToken表明的就是圖中的maxPermits,warningToken表明的就是thresholdPermits,slope就是表明每次獲取permit減小的程度。
咱們接下來看看WarmUpController的canpass方法:
WarmUpController#canpass
public boolean canPass(Node node, int acquireCount, boolean prioritized) { //獲取當前時間窗口的流量大小 long passQps = (long) node.passQps(); //獲取上一個窗口的流量大小 long previousQps = (long) node.previousPassQps(); //設置 storedTokens 和 lastFilledTime 到正確的值 syncToken(previousQps); // 開始計算它的斜率 // 若是進入了警惕線,開始調整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { //經過計算當前的restToken和警惕線的距離來計算當前的QPS //離警惕線越接近,表明這個程序越「熱」,從而逐步釋放QPS long aboveToken = restToken - warningToken; //當前狀態下能達到的最高 QPS // current interval = restToken*slope+1/count double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); // 若是不會超過,那麼經過,不然不經過 if (passQps + acquireCount <= warningQps) { return true; } } else { // count 是最高能達到的 QPS if (passQps + acquireCount <= count) { return true; } } return false; }
這個方法裏經過syncToken(previousQps)設置storedTokens的值後,與警惕值作判斷,若是沒有達到警惕值,那麼經過計算和警惕值的距離再加上slope計算出一個當前的QPS值,storedTokens越大當前的QPS越小。
若是當前的storedTokens已經小於警惕值了,說明已經預熱完畢了,直接用count判斷就行了。
WarmUpController#syncToken
protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); //去掉毫秒的時間 currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return; } // 令牌數量的舊值 long oldValue = storedTokens.get(); // 計算新的令牌數量,往下看 long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { // 令牌數量上,減去上一分鐘的 QPS,而後設置新值 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } }
這個方法經過coolDownTokens方法來獲取一個新的value,而後經過CAS設置到storedTokens中,而後將storedTokens減去上一個窗口的QPS值,併爲lastFilledTime設置一個新的值。
其實我這裏有個疑惑,在用storedTokens減去上一個窗口的QPS的時候並無作控制,假如處理的速度很是的快,在一個窗口內就減了不少次,直接把當前的storedTokens減到了小於warningToken,那麼是否是就沒有在必定的時間範圍內啓動冷啓動的效果?
private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 添加令牌的判斷前提條件: // 當令牌的消耗程度遠遠低於警惕線的時候 if (oldValue < warningToken) { // 根據count數每秒加上令牌 newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { //若是還在冷啓動階段 // 若是當前經過的 QPS 大於 count/coldFactor,說明系統消耗令牌的速度,大於冷卻速度 // 那麼不須要添加令牌,不然須要添加令牌 if (passQps < (int) count / coldFactor) { newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); }
這個方法主要是用來作添加令牌的操做,若是是流量比較小或者是已經預熱完畢了,那麼就須要根據count數每秒加上令牌,若是是在預熱階段那麼就不進行令牌添加。
WarmUpRateLimiterController就是結合了冷啓動和勻速排隊,代碼很是的簡單,有了上面的分析,相信你們也能看得懂,因此也就不講解了。
原文出處:https://www.cnblogs.com/luozhiyun/p/11489128.html