在分析Sentinel的上一篇文章中,咱們知道了它是基於滑動窗口作的流量統計,那麼在當咱們可以根據流量統計算法拿到流量的實時數據後,下一步要作的事情天然就是基於這些數據作流控。在介紹Sentinel
的流控模型以前,咱們先來簡單看下 Sentinel 後臺是如何去定義一個流控規則的
html
對於上圖的配置Sentinel
把它抽象成一個FlowRule
類,與其屬性一一對應java
下面咱們來看下選擇流控策略和流控效果的核心代碼node
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) { // 根據流控策略選擇須要流控的Node維度節點 Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } // 獲取配置的流控效果 控制器 (1. 直接拒絕 2. 預熱啓動 3. 排隊 4. 預熱啓動排隊等待) return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
上面的代碼比較簡單流程也很清晰,首先根據咱們配置的流控策略獲取到合適維度的 Node 節點(Node節點是Sentinel作流量統計的基本單位),而後再獲取到規則中配置的流控效果控制器(1. 直接拒絕 2. 預熱啓動 3. 排隊等待 4.預熱啓動排隊等待)。算法
下面咱們來看下選擇流控策略的源碼分析api
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) { // 獲取限流來源 limitApp String limitApp = rule.getLimitApp(); // 獲取限流策略 int strategy = rule.getStrategy(); // 獲取當前 上下文的 來源 String origin = context.getOrigin(); // 若是規則配置的限流來源 limitApp 等於 當前上下文來源 if (limitApp.equals(origin) && filterOrigin(origin)) { // 且配置的流控策略是 直接關聯策略 if (strategy == RuleConstant.STRATEGY_DIRECT) { // 直接返回當前來源 origin 節點 return context.getOriginNode(); } // 配置的策略爲關聯或則鏈路 return selectReferenceNode(rule, context, node); // 若是規則配置的限流來源 limitApp 等於 default } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // 且配置的流控策略是 直接關聯策略 if (strategy == RuleConstant.STRATEGY_DIRECT) { // 直接返回當前資源的 clusterNode return node.getClusterNode(); } // 配置的策略爲關聯或則鏈路 return selectReferenceNode(rule, context, node); // 若是規則配置的限流來源 limitApp 等於 other,且當前上下文origin不在流控規則策略中 } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // 且配置的流控策略是 直接關聯策略 if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } // 配置的策略爲關聯或則鏈路 return selectReferenceNode(rule, context, node); } return null; } static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { // 關聯資源名稱 (若是策略是關聯 則是關聯的資源名稱,若是策略是鏈路 則是上下文名稱) String refResource = rule.getRefResource(); int strategy = rule.getStrategy(); if (StringUtil.isEmpty(refResource)) { return null; } // 策略是關聯 if (strategy == RuleConstant.STRATEGY_RELATE) { // 返回關聯的資源ClusterNode return ClusterBuilderSlot.getClusterNode(refResource); } // 策略是鏈路 if (strategy == RuleConstant.STRATEGY_CHAIN) { // 當前上下文名稱不是規則配置的name 直接返回null if (!refResource.equals(context.getName())) { return null; } return node; } // No node. return null; }
這段代碼的邏輯判斷比較多,咱們稍微理一下整個過程併發
LimitApp
的做用域只在配置的流控策略爲RuleConstant.STRATEGY_DIRECT
(直接關聯)時起做用。其有三種配置,分別爲default
,origin_name
,other
關於流控效果的配置有四種,咱們來看下它們的初始化代碼ide
/** * class com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil */ private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) { // 只有Grade爲統計 QPS時 才能夠選擇除默認流控效果外的 其餘流控效果控制器 if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { switch (rule.getControlBehavior()) { // 預熱啓動 case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); // 超過 閾值 排隊等待 控制器 case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: // 上面兩個的結合體 return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: default: // Default mode or unknown mode: default traffic shaping controller (fast-reject). } } // 默認控制器 超過 閾值 直接拒絕 return new DefaultController(rule.getCount(), rule.getGrade()); }
能夠比較清晰的看到總共對應有四種流控器的初始化源碼分析
@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // 獲取當前qps int curCount = avgUsedTokens(node); // 判斷是否已經大於閾值 if (curCount + acquireCount > count) { // 若是當前流量具備優先級,則會提早去獲取將來的經過資格 if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }
此種策略比較簡單粗暴,超過流量閾值的會直接拒絕。不過這裏有一個小細節,若是入口流量prioritized爲true,也就是優先級比較高,則會經過佔用將來時間窗口的名額來實現。這個在上一篇文章有介紹到 ui
WarmUpController
主要是用來防止流量的忽然上升,使系統本在穩定狀態下能處理的,可是因爲許多資源沒有預熱,致使處理不了了。注意這裏的預熱並非指系統啓動以後的一次性預熱,而是指系統在運行的任什麼時候候流量從低峯到突增的預熱階段。this
下面咱們來看下WarmUpController
的具體實現類
/** * WarmUpController 構造方法 * @param count 當前qps閾值 * @param warmUpPeriodInSec 預熱時長 秒 * @param coldFactor 冷啓動係數 默認爲3 */ private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; this.coldFactor = coldFactor; // 剩餘Token的警惕值,小於警惕值系統就進入正常運行期 warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1); // 系統最冷時候的剩餘Token數 maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); // 系統預熱的速率(斜率) slope = (coldFactor - 1.0) / count / (maxToken - warningToken); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { long passQps = (long) node.passQps(); long previousQps = (long) node.previousPassQps(); // 計算當前的 剩餘 token 數 syncToken(previousQps); // 若是進入了警惕線,開始調整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { // 計算剩餘token超出警惕值的值 long aboveToken = restToken - warningToken; // 計算當前容許經過的最大 qps double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { return true; } } else { // 不在預熱階段,則直接判斷當前qps是否大於閾值 if (passQps + acquireCount <= count) { return true; } } return false; }
首先是構造方法,主要關注2個重要參數
要理解這兩個參數的含義,能夠參考令牌桶算法,每經過一個請求,就會從令牌桶中取走一個令牌。那麼試想一下,當令牌桶中的令牌達到最大值是,是否是意味着系統目前處於最冷階段,由於桶裏的令牌始終處於一個很是飽和的狀態。這裏的令牌最大值對應的就是maxToken
,而warningToken
,則是對應了一個警惕值,當桶中的令牌數減小到一個指定的值時,說明系統已經度過了預熱階段
當一個請求進來時,首先須要計算當前桶中剩餘的token數,具體邏輯在syncToken
方法中
當系統剩餘Token大於warningToken時,說明系統仍處於預熱階段,故須要調整當前所能經過的最大qps閾值
protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); // 獲取秒級別時間(去除毫秒) currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return; } long oldValue = storedTokens.get(); // 判斷是否須要往桶中添加令牌 long newValue = coolDownTokens(currentTime, passQps); // 設置新的token數 if (storedTokens.compareAndSet(oldValue, newValue)) { // 若是設置成功的話則減去上次經過的qps數量,就獲得當前的實際token數 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } }
private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 添加令牌的幾種狀況 // 1. 系統初始啓動階段,oldvalue = 0,lastFilledTime也等於0,此時獲得一個很是大的newValue,會取maxToken爲當前token數量值 // 2. 系統處於預熱階段 且 當前qps小於 count / coldFactor // 3. 系統處於完成預熱階段 if (oldValue < warningToken) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { if (passQps < (int)count / coldFactor) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); }
這裏看一下會添加令牌的幾種狀況
前2種狀況比較好理解,這裏主要解釋一下第三種狀況,爲什麼 當前qps
小於count / coldFactor
時,須要往桶中添加Token?試想一下若是沒有這一步會怎麼樣,若是沒有這一步在比較低的qps狀況下補充Token,系統最終也會慢慢度過預熱階段,但實際上這麼低的qps(小於 count / coldFactor時
)不該該完成預熱。因此這裏纔會在 qps低於count / coldFactor
時補充剩餘token數,來讓系統在低qps狀況下始終處於預熱狀態下
排隊等待的實現相對預熱啓動實現比較簡單
首先會經過咱們的配置,計算出相鄰兩個請求容許經過的最小時間,而後會記錄最近一個經過的時間。二者相加便是下一次請求容許經過的最小時間。
public boolean canPass(Node node, int acquireCount, boolean prioritized) { if (acquireCount <= 0) { return true; } if (count <= 0) { return false; } long currentTime = TimeUtil.currentTimeMillis(); // 計算相隔兩個請求 須要相隔多長時間 long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 本次指望經過的最小時間 long expectedTime = costTime + latestPassedTime.get(); // 若是當前時間大於指望時間,說明qps還未超過閾值,直接經過 if (expectedTime <= currentTime) { latestPassedTime.set(currentTime); return true; } else { // 當前時間小於於指望時間,請求過快了,須要排隊等待指定時間 // 計算等待時間 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); // 等待時長大於咱們設置的最大時長,則不經過 if (waitTime > maxQueueingTimeMs) { return false; } else { // 不然則排隊等待,佔用下經過時間 long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); // 判斷等待時間是否已經大於最大值 if (waitTime > maxQueueingTimeMs) { // 大於則將上一步加的值從新減去 latestPassedTime.addAndGet(-costTime); return false; } // in race condition waitTime may <= 0 // 佔用等待時間成功,直接sleep costTime if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; }
排隊等待控制器的核心策略其實就是圍繞了latestPassedTime
進行的,latestPassedTime
指的是上一次請求經過的時間,經過latestPassedTime
+ costTime
來與當前時間作比較,來判斷當前請求是否能夠經過,沒法經過的請求則會優先佔用latestPassedTime
時間,直到sleep到能夠經過的時間。固然咱們也能夠配置排隊等待的最大時間,來限制目前排隊等待經過的請求數量。
預熱排隊等待,WarmUpRateLimiterController
實現類咱們發現其繼承了WarmUpController
,這是Sentinel在1.4版本後新加的一種控制器,其實就是預熱啓動和排隊等待的結合體,具體源碼咱們就不作分析。
Sentinel
的流控策略和流控效果的相結合使用仍是很是巧妙的,當中的一些設計思想仍是很是有借鑑意義的