做者javadoop,資深Java工程師。本文已獲做者受權發佈。
原文連接 https://www.javadoop.com/post...
本文主要介紹關於流控的兩部份內容。java
第一部分介紹 Guava 中 RateLimiter 的源碼,包括它的兩種模式,目前網上大部分文章只分析簡單的 SmoothBursty 模式,而沒有分析帶有預熱的 SmoothWarmingUp。node
第二部分介紹 Sentinel 中流控的實現,本文不要求讀者瞭解 Sentinel,這部份內容和 Sentinel 耦合很低,因此讀者不須要有閱讀壓力。算法
Sentinel 中流控設計是參考 Guava RateLimiter 的,因此閱讀第二部份內容,須要有第一部份內容的背景。數據庫
RateLimiter 基於漏桶算法,但它參考了令牌桶算法,這裏不討論流控算法,請自行查找資料。api
RateLimiter 的接口很是簡單,它有兩個靜態方法用來實例化,實例化之後,咱們只須要關心 acquire 就好了,甚至都沒有 release 操做。緩存
// RateLimiter 接口列表:多線程
// 實例化的兩種方式: public static RateLimiter create(double permitsPerSecond){} public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) {} public double acquire() {} public double acquire(int permits) {} public boolean tryAcquire() {} public boolean tryAcquire(int permits) {} public boolean tryAcquire(long timeout, TimeUnit unit) {} public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {} public final double getRate() {} public final void setRate(double permitsPerSecond) {}
RateLimiter 的做用是用來限流的,咱們知道 java 併發包中提供了 Semaphore,它也可以提供對資源使用進行控制,咱們看一下下面的代碼:併發
// Semaphore Semaphore semaphore = new Semaphore(10); for (int i = 0; i < 100; i++) { executor.submit(new Runnable() { @Override public void run() { semaphore.acquireUninterruptibly(1); try { doSomething(); } finally { semaphore.release(); } } }); }
Semaphore 用來控制同時訪問某個資源的併發數量,如上面的代碼,咱們設置 100 個線程工做,可是咱們能作到最多隻有 10 個線程能同時到 doSomething() 方法中。它控制的是併發數量。less
而 RateLimiter 是用來控制訪問資源的速率(rate)的,它強調的是控制速率。好比控制每秒只能有 100 個請求經過,好比容許每秒發送 1MB 的數據。ide
它的構造方法指定一個 permitsPerSecond 參數,表明每秒鐘產生多少個 permits,這就是咱們的速率。
RateLimiter 容許預佔將來的令牌,好比,每秒產生 5 個 permits,咱們能夠單次請求 100 個,這樣,緊接着的下一個請求須要等待大概 20 秒才能獲取到 permits。
RateLimiter 目前只有一個子類,那就是抽象類 SmoothRateLimiter,SmoothRateLimiter 有兩個實現類,也就是咱們這邊要介紹的兩種模式,咱們先簡單介紹下 SmoothRateLimiter,而後後面分兩個小節分別介紹它的兩個實現類。
RateLimiter 做爲抽象類,只有兩個屬性:
private final SleepingStopwatch stopwatch; private volatile Object mutexDoNotUseDirectly;
stopwatch 很是重要,它用來「計時」,RateLimiter 把實例化的時間設置爲 0 值,後續都是取相對時間,用微秒錶示。
mutexDoNotUseDirectly 用來作鎖,RateLimiter 依賴於 synchronized 來控制併發,因此咱們以後能夠看到,各個屬性甚至都沒有用 volatile 修飾。
而後咱們來看 SmoothRateLimiter 的屬性,分別表明什麼意思。
// 當前還有多少 permits 沒有被使用,被存下來的 permits 數量 double storedPermits; // 最大容許緩存的 permits 數量,也就是 storedPermits 能達到的最大值 double maxPermits; // 每隔多少時間產生一個 permit, // 好比咱們構造方法中設置每秒 5 個,也就是每隔 200ms 一個,這裏單位是微秒,也就是 200,000 double stableIntervalMicros; // 下一次能夠獲取 permits 的時間,這個時間是相對 RateLimiter 的構造時間的,是一個相對時間,理解爲時間戳吧 private long nextFreeTicketMicros = 0L;
其實,看到這幾個屬性,咱們就能夠大體猜一下它的內部實現了:
nextFreeTicketMicros 是一個很關鍵的屬性。咱們每次獲取 permits 的時候,先拿 storedPermits 的值,若是夠,storedPermits 減去相應的值就能夠了,若是不夠,那麼還須要將 nextFreeTicketMicros 往前推,表示我預佔了接下來多少時間的量了。那麼下一個請求來的時候,若是還沒到 nextFreeTicketMicros 這個時間點,須要 sleep 到這個點再返回,固然也要將這個值再往前推。
你們在這裏可能會有疑惑,由於時間是一直往前走的,因此 storedPermits 的信息多是不許確的,不過,只須要在關鍵的操做中同步一下,從新計算就行了。
咱們先從比較簡單的 SmoothBursty 出發,來分析 RateLimiter 的源碼,以後咱們再分析 SmoothWarmingUp。
Bursty 是突發的意思,它說的不是下面這個意思:咱們設置了 1k 每秒,而咱們能夠一次性獲取 5k 的 permits,這個場景表達的不是突發,而是在說預先佔有了接下來幾秒產生的 permits。突發說的是,RateLimiter 會緩存必定數量的 permits 在池中,這樣對於突發請求,能及時獲得知足。想象一下咱們的某個接口,好久沒有請求過來,忽然同時來了好幾個請求,若是咱們沒有緩存一些 permits 的話,不少線程就須要等待了。
SmoothBursty 默認緩存最多 1 秒鐘的 permits,不能夠修改。
RateLimiter 的靜態構造方法:
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); }
構造參數 permitsPerSecond 指定每秒鐘能夠產生多少個 permits。
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
咱們看到,這裏實例化的是 SmoothBursty 的實例,它的構造方法很簡單,並且它只有一個屬性 maxBurstSeconds,這裏就不貼代碼了。
構造函數指定了 maxBurstSeconds 爲 1.0,也就是說,最多會緩存 1 秒鐘,也就是 (1.0 * permitsPerSecond) 這麼多個 permits 到池中。
這個 1.0 秒,關係到 storedPermits 和 maxPermits:0 <= storedPermits <= maxPermits = permitsPerSecond
咱們繼續日後看 setRate 方法:
public final void setRate(double permitsPerSecond) { checkArgument( permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } }
setRate 這個方法是一個 public 方法,它能夠用來調整速率。咱們這邊繼續跟的是初始化過程,可是你們提早知道這個方法是用來調整速率用的,對理解源碼有很大的幫助。注意看,這裏用了 synchronized 控制併發。
@Override final void doSetRate(double permitsPerSecond, long nowMicros) { // 同步 resync(nowMicros); // 計算屬性 stableIntervalMicros double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); }
resync 方法很簡單,它用來調整 storedPermits 和 nextFreeTicketMicros。這就是咱們說的,在關鍵的節點,須要先更新一下 storedPermits 到正確的值。
void resync(long nowMicros) { // 若是 nextFreeTicket 已通過掉了,想象一下很長時間都沒有再次調用 limiter.acquire() 的場景 // 須要將 nextFreeTicket 設置爲當前時間,從新計算 storedPermits if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
coolDownIntervalMicros() 這個方法你們先不用關注,能夠看到,在 SmoothBursty 類中的實現是直接返回了 stableIntervalMicros 的值,也就是咱們說的,每產生一個 permit 的時間長度。固然了,細心的讀者,可能會發現,此時的 stableIntervalMicros 其實沒有設置,也就是說,上面發生了一次除以 0 值的操做,獲得的 newPermits 實際上是一個無窮大。而 maxPermits 此時仍是 0 值,不過這裏其實沒有關係。
咱們回到前面一個方法,resync 同步之後,會設置 stableIntervalMicros 爲一個正確的值,而後進入下面的方法:
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; // 這裏計算了,maxPermits 爲 1 秒產生的 permits maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { // 由於 storedPermits 的值域變化了,須要等比例縮放 storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } }
上面這個方法,咱們要這麼看,原來的 RateLimiter 是用某個 permitsPerSecond 值初始化的,如今咱們要調整這個頻率。對於 maxPermits 來講,是從新計算,而對於 storedPermits 來講,是作等比例的縮放。
到此,構造方法就完成了,咱們獲得了一個 RateLimiter 的實現類 SmoothBursty 的實例,可能上面的源碼你仍是會有一些疑惑,不過也不要緊,繼續往下看,可能你的不少疑惑就解開了。
接下來,咱們來分析 acquire 方法:
@CanIgnoreReturnValue public double acquire() { return acquire(1); } @CanIgnoreReturnValue public double acquire(int permits) { // 預定,若是當前不能直接獲取到 permits,須要等待 // 返回值表明須要 sleep 多久 long microsToWait = reserve(permits); // sleep stopwatch.sleepMicrosUninterruptibly(microsToWait); // 返回 sleep 的時長 return 1.0 * microsToWait / SECONDS.toMicros(1L); }
咱們來看 reserve 方法:
final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength(int permits, long nowMicros) { // 返回 nextFreeTicketMicros long momentAvailable = reserveEarliestAvailable(permits, nowMicros); // 計算時長 return max(momentAvailable - nowMicros, 0); }
繼續往裏看:
@Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { // 這裏作一次同步,更新 storedPermits 和 nextFreeTicketMicros (若是須要) resync(nowMicros); // 返回值就是 nextFreeTicketMicros,注意剛剛已經作了 resync 了,此時它是最新的正確的值 long returnValue = nextFreeTicketMicros; // storedPermits 中可使用多少個 permits double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // storedPermits 中不夠的部分 double freshPermits = requiredPermits - storedPermitsToSpend; // 爲了這個不夠的部分,須要等待多久時間 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 這部分固定返回 0 + (long) (freshPermits * stableIntervalMicros); // 將 nextFreeTicketMicros 往前推 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // storedPermits 減去被拿走的部分 this.storedPermits -= storedPermitsToSpend; return returnValue; }
咱們能夠看到,獲取 permits 的時候,實際上是獲取了兩部分,一部分來自於存量 storedPermits,存量不夠的話,另外一部分來自於預佔將來的 freshPermits。
這裏提一個關鍵點吧,咱們看到,返回值是 nextFreeTicketMicros 的舊值,由於只要到這個時間點,就說明當次 acquire 能夠成功返回了,而無論 storedPermits 夠不夠。若是 storedPermits 不夠,會將 nextFreeTicketMicros 往前推必定的時間,預佔了必定的量。
到這裏,acquire 方法就分析完了,你們看到這裏,逆着往前看就是了。應該說,SmoothBursty 的源碼仍是很是簡單的。
分析完了 SmoothBursty,咱們再來分析 SmoothWarmingUp 會簡單一些。咱們說過,SmoothBursty 能夠處理突發請求,由於它會緩存最多 1 秒的 permits,而待會咱們會看到 SmoothWarmingUp 徹底不一樣的設計。
SmoothWarmingUp 適用於資源須要預熱的場景,好比咱們的某個接口業務,須要使用到數據庫鏈接,因爲鏈接須要預熱才能進入到最佳狀態,若是咱們的系統長時間處於低負載或零負載狀態(固然,應用剛啓動也是同樣的),鏈接池中的鏈接慢慢釋放掉了,此時咱們認爲鏈接池是冷的。
假設咱們的業務在穩定狀態下,正常能夠提供最大 1000 QPS 的訪問,可是若是鏈接池是冷的,咱們就不能讓 1000 個請求同時進來,由於這會拖垮咱們的系統,咱們應該有個預熱升溫的過程。
對應到 SmoothWarmingUp 中,若是系統處於低負載狀態,storedPermits 會一直增長,當請求來的時候,咱們要從 storedPermits 中取 permits,最關鍵的點在於,從 storedPermits 中取 permits 的操做是比較耗時的,由於沒有預熱。
回顧一下前面介紹的 SmoothBursty,它從 storedPermits 中獲取 permits 是不須要等待時間的,而這邊洽洽相反,從 storedPermits 獲取須要更多的時間,這是最大的不一樣,先理解這一點,能幫助你更好地理解源碼。
你們先有一些粗的概念,而後咱們來看下面這個圖:
這個圖不容易看懂,X 軸表明 storedPermits 的數量,Y 軸表明獲取一個 permits 須要的時間。
假設指定 permitsPerSecond 爲 10,那麼 stableInterval 爲 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是寫死的,用戶不能修改)。也就是說,當達到 maxPermits 時,此時處於系統最冷的時候,獲取一個 permit 須要 300ms,而若是 storedPermits 小於 thresholdPermits 的時候,只須要 100ms。
想象有一條垂直線 x=k,它與 X 軸的交點 k 表明當前 storedPermits 的數量:
當 storedPermits 處於 maxPermits 狀態時,咱們認爲 limiter 中的 permits 是冷的,此時獲取一個 permit 須要較多的時間,由於須要預熱,有一個關鍵的分界點是 thresholdPermits。
預熱時間是咱們在構造的時候指定的,圖中梯形的面積就是預熱時間,由於預熱完成後,咱們能進入到一個穩定的速率中(stableInterval),下面咱們來計算出 thresholdPermits 和 maxPermits 的值。
有一個關鍵點,從 thresholdPermits 到 0 的時間,是從 maxPermits 到 thresholdPermits 時間的一半,也就是梯形的面積是長方形面積的 2 倍,梯形的面積是 warmupPeriod。
之因此長方形的面積是 warmupPeriod/2,是由於 coldFactor 是硬編碼的 3。
梯形面積爲 warmupPeriod,即:
warmupPeriod = 2 * stableInterval * thresholdPermits
由此,咱們得出 thresholdPermits 的值:
thresholdPermits = 0.5 * warmupPeriod / stableInterval
而後咱們根據梯形面積的計算公式:
warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)
得出 maxPermits 爲:
maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)
這樣,咱們就獲得了 thresholdPermits 和 maxPermits 的值。
接下來,咱們來看一下冷卻時間間隔,它指的是 storedPermits 中每一個 permit 的增加速度,也就是咱們前面說的 x=k 這條垂直線往右的移動速度,爲了達到從 0 到 maxPermits 花費 warmupPeriodMicros 的時間,咱們將其定義爲:
@Override double coolDownIntervalMicros() { return warmupPeriodMicros / maxPermits; } 貼一下代碼,你們就知道了,在 resync 中用到的這個: void resync(long nowMicros) { if (nowMicros > nextFreeTicketMicros) { // coolDownIntervalMicros 在這裏使用 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
基於上面的分析,咱們來看 SmoothWarmingUp 的其餘源碼。
首先,咱們來看它的 doSetRate 方法,有了前面的介紹,這個方法的源碼很是簡單:
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; // coldFactor 是固定的 3 double coldIntervalMicros = stableIntervalMicros * coldFactor; // 這個公式咱們上面已經說了 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; // 這個公式咱們上面也已經說了 maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); // 計算那條斜線的斜率。數學知識,對邊 / 臨邊 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = 0.0; } else { storedPermits = (oldMaxPermits == 0.0) ? maxPermits // initial state is cold : storedPermits * maxPermits / oldMaxPermits; } }
setRate 方法很是簡單,接下來,咱們要分析的是 storedPermitsToWaitTime 方法,咱們回顧一下下面的代碼:
這段代碼是 acquire 方法的核心,waitMicros 由兩部分組成,一部分是從 storedPermits 中獲取花費的時間,一部分是等待 freshPermits 產生花費的時間。在 SmoothBursty 的實現中,從 storedPermits 中獲取 permits 直接返回 0,不須要等待。
而在 SmoothWarmingUp 的實現中,因爲須要預熱,因此從 storedPermits 中取 permits 須要花費必定的時間,其實就是要計算下圖中,陰影部分的面積。
@Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // 若是右邊梯形部分有 permits,那麼先從右邊部分獲取permits,計算梯形部分的陰影部分的面積 if (availablePermitsAboveThreshold > 0.0) { // 從右邊部分獲取的 permits 數量 double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); // 梯形面積公式:(上底+下底)*高/2 double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } // 加上 長方形部分的陰影面積 micros += (long) (stableIntervalMicros * permitsToTake); return micros; } // 對於給定的 x 值,計算 y 值 private double permitsToTime(double permits) { return stableIntervalMicros + permits * slope; }
到這裏,SmoothWarmingUp 也已經說完了。
若是你們對於 Guava RateLimiter 還有什麼疑惑,歡迎在留言區留言,對於 Sentinel 中的流控不感興趣的讀者,看到這裏就能夠結束了。
Sentinel 是阿里開源的流控、熔斷工具,這裏不作過多的介紹,感興趣的讀者請自行了解。
在 Sentinel 的流控中,咱們能夠配置流控規則,主要是控制 QPS 和線程數,這裏咱們不討論控制線程數,控制線程數的代碼不在咱們這裏的討論範圍內,下面的介紹都是指控制 QPS。
RateLimiterController 很是簡單,它經過使用 latestPassedTime 屬性來記錄最後一次經過的時間,而後根據規則中 QPS 的限制,計算當前請求是否能夠經過。
舉個很是簡單的例子:設置 QPS 爲 10,那麼每 100 毫秒容許經過一個,經過計算當前時間是否已通過了上一個請求的經過時間 latestPassedTime 以後的 100 毫秒,來判斷是否能夠經過。假設才過了 50ms,那麼須要當前線程再 sleep 50ms,而後才能夠經過。若是同時有另外一個請求呢?那須要 sleep 150ms 才行。
public class RateLimiterController implements TrafficShapingController { // 排隊最大時長,默認 500ms private final int maxQueueingTimeMs; // QPS 設置的值 private final double count; // 上一次請求經過的時間 private final AtomicLong latestPassedTime = new AtomicLong(-1); public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } // 一般 acquireCount 爲 1,這裏不用關心參數 prioritized @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Pass when acquire count is less or equal than 0. if (acquireCount <= 0) { return true; } // if (count <= 0) { return false; } long currentTime = TimeUtil.currentTimeMillis(); // 計算每 2 個請求之間的間隔,好比 QPS 限制爲 10,那麼間隔就是 100ms long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // Expected pass time of this request. long expectedTime = costTime + latestPassedTime.get(); // 能夠經過,設置 latestPassedTime 而後就返回 true 了 if (expectedTime <= currentTime) { // Contention may exist here, but it's okay. latestPassedTime.set(currentTime); return true; } else { // 不能夠經過,須要等待 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); // 等待時長大於最大值,返回 false if (waitTime > maxQueueingTimeMs) { return false; } else { // 將 latestPassedTime 往前推 long oldTime = latestPassedTime.addAndGet(costTime); try { // 須要 sleep 的時間 waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false; } // in race condition waitTime may <= 0 if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; } }
這個策略仍是很是好理解的,簡單粗暴,快速失敗。
WarmUpController 用來防止突發流量迅速上升,致使系統負載嚴重太高,原本系統在穩定狀態下能處理的,可是因爲許多資源沒有預熱,致使這個時候處理不了了。好比,數據庫須要創建鏈接、須要鏈接到遠程服務等,這就是爲何咱們須要預熱。
囉嗦一句,這裏不只僅指系統剛剛啓動須要預熱,對於長時間處於低負載的系統,突發流量也須要從新預熱。
Guava 的 SmoothWarmingUp 是用來控制獲取令牌的速率的,和這裏的控制 QPS 仍是有一點區別,可是中心思想是同樣的。咱們在看完源碼之後再討論它們的區別。
爲了幫助你們理解源碼,咱們這邊先設定一個場景:QPS 設置爲 100,預熱時間設置爲 10 秒。代碼中使用 「【】」 表明根據這個場景計算出來的值。
public class WarmUpController implements TrafficShapingController { // 閾值 protected double count; // 3 private int coldFactor; // 轉折點的令牌數,和 Guava 的 thresholdPermits 一個意思 // [500] protected int warningToken = 0; // 最大的令牌數,和 Guava 的 maxPermits 一個意思 // [1000] private int maxToken; // 斜線斜率 // [1/25000] protected double slope; // 累積的令牌數,和 Guava 的 storedPermits 一個意思 protected AtomicLong storedTokens = new AtomicLong(0); // 最後更新令牌的時間 protected AtomicLong lastFilledTime = new AtomicLong(0); public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { construct(count, warmUpPeriodInSec, coldFactor); } public WarmUpController(double count, int warmUpPeriodInSec) { construct(count, warmUpPeriodInSec, 3); } // 下面的構造方法,和 Guava 中是差很少的,只不過 thresholdPermits 和 maxPermits 都換了個名字 private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; this.coldFactor = coldFactor; // warningToken 和 thresholdPermits 是同樣的意思,計算結果實際上是同樣的 // thresholdPermits = 0.5 * warmupPeriod / stableInterval. // 【warningToken = (10*100)/(3-1) = 500】 warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1); // maxToken 和 maxPermits 是同樣的意思,計算結果實際上是同樣的 // maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval) // 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】 maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); // 斜率計算 // slope // slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits); // 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】 slope = (coldFactor - 1.0) / count / (maxToken - warningToken); } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Sentinel 的 QPS 統計使用的是滑動窗口 // 當前時間窗口的 QPS long passQps = (long) node.passQps(); // 這裏是上一個時間窗口的 QPS,這裏的一個窗口跨度是1分鐘 long previousQps = (long) node.previousPassQps(); // 同步。設置 storedTokens 和 lastFilledTime 到正確的值 syncToken(previousQps); long restToken = storedTokens.get(); // 令牌數超過 warningToken,進入梯形區域 if (restToken >= warningToken) { // 這裏簡單說一句,由於當前的令牌數超過了 warningToken 這個閾值,系統處於須要預熱的階段 // 經過計算當前獲取一個令牌所需時間,計算其倒數便是當前系統的最大 QPS 容量 long aboveToken = restToken - warningToken; // 這裏計算警惕 QPS 值,就是當前狀態下能達到的最高 QPS。 // (aboveToken * slope + 1.0 / count) 其實就是在當前狀態下獲取一個令牌所須要的時間 double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); // 若是不會超過,那麼經過,不然不經過 if (passQps + acquireCount <= warningQps) { return true; } } else { // count 是最高能達到的 QPS if (passQps + acquireCount <= count) { return true; } } return false; } protected void syncToken(long passQps) { // 下面幾行代碼,說明在第一次進入新的 1 秒鐘的時候,作同步 // 題外話:Sentinel 默認地,1 秒鐘分爲 2 個時間窗口,分別 500ms long currentTime = TimeUtil.currentTimeMillis(); currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return; } // 令牌數量的舊值 long oldValue = storedTokens.get(); // 計算新的令牌數量,往下看 long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { // 令牌數量上,減去上一分鐘的 QPS,而後設置新值 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } } // 更新令牌數 private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 當前令牌數小於 warningToken,添加令牌 if (oldValue < warningToken) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { // 當前令牌數量處於梯形階段, // 若是當前經過的 QPS 大於 count/coldFactor,說明系統消耗令牌的速度,大於冷卻速度 // 那麼不須要添加令牌,不然須要添加令牌 if (passQps < (int)count / coldFactor) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); } }
coolDownTokens 這個方法用來計算新的 token 數量,其實我也沒有徹底理解做者的設計:
最後,咱們再簡單說說 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的區別。
Guava 在於控制獲取令牌的速率,它關心的是,獲取 permits 須要多少時間,包括從 storedPermits 中獲取,以及獲取 freshPermits,以此推動 nextFreeTicketMicros 到將來的某個時間點。
而 Sentinel 在於控制 QPS,它用令牌數來標識當前系統處於什麼狀態,根據時間推動一直增長令牌,根據經過的 QPS 一直減小令牌。若是 QPS 持續降低,根據推演,能夠發現 storedTokens 愈來愈多,而後越過 warningTokens 這個閾值,以後只有當 QPS 降低到 count/3 之後,令牌纔會繼續往上增加,一直到 maxTokens。
storedTokens 是以 「count 每秒」的增加率增加的,減小是以 前一分鐘的 QPS 來減小的。其實這裏我也有個疑問,爲何增長令牌的時候考慮了時間,而減小的時候卻不考慮時間因素,提了 issue,彷佛沒人搭理。
注意,這個類繼承自剛剛介紹的 WarmUpController,它的流控效果定義爲排隊等待。它的代碼其實就是前面介紹的 RateLimiterController 加上 WarmUpController。
public class WarmUpRateLimiterController extends WarmUpController { private final int timeoutInMs; private final AtomicLong latestPassedTime = new AtomicLong(-1); public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) { super(count, warmUpPeriodSec, coldFactor); this.timeoutInMs = timeOutMs; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { long previousQps = (long) node.previousPassQps(); syncToken(previousQps); long currentTime = TimeUtil.currentTimeMillis(); long restToken = storedTokens.get(); long costTime = 0; long expectedTime = 0; // 和 RateLimiterController 比較,區別主要就是這塊代碼 if (restToken >= warningToken) { long aboveToken = restToken - warningToken; // current interval = restToken*slope+1/count double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000); } else { costTime = Math.round(1.0 * (acquireCount) / count * 1000); } expectedTime = costTime + latestPassedTime.get(); if (expectedTime <= currentTime) { latestPassedTime.set(currentTime); return true; } else { long waitTime = costTime + latestPassedTime.get() - currentTime; if (waitTime > timeoutInMs) { return false; } else { long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > timeoutInMs) { latestPassedTime.addAndGet(-costTime); return false; } if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; } }
這個代碼很簡單,就是 RateLimiter 中的代碼,而後加入了預熱的內容。
在 RateLimiter 中,單個請求的 costTime 是固定的,就是 1/count,好比設置 100 qps,那麼 costTime 就是 10ms。
可是這邊,加入了 WarmUp 的內容,就是說,經過令牌數量,來判斷當前系統的 QPS 應該是多少,若是當前令牌數超過 warningTokens,那麼系統的 QPS 容量已經低於咱們預設的 QPS,相應的,costTime 就會延長。
有段時間沒寫文章了,寫得很差之處,歡迎指正。
關注做者公衆號: