做者javadoop,資深Java工程師。本文已獲做者受權發佈。java
原文連接https://www.javadoop.com/post/rate-limiternode
本文主要介紹關於流控的兩部份內容。算法
第一部分介紹 Guava 中 RateLimiter 的源碼,包括它的兩種模式,目前網上大部分文章只分析簡單的 SmoothBursty 模式,而沒有分析帶有預熱的 SmoothWarmingUp。數據庫
第二部分介紹 Sentinel 中流控的實現,本文不要求讀者瞭解 Sentinel,這部份內容和 Sentinel 耦合很低,因此讀者不須要有閱讀壓力。api
Sentinel 中流控設計是參考 Guava RateLimiter 的,因此閱讀第二部份內容,須要有第一部份內容的背景。緩存
RateLimiter 基於漏桶算法,但它參考了令牌桶算法,這裏不討論流控算法,請自行查找資料。markdown
RateLimiter 的接口很是簡單,它有兩個靜態方法用來實例化,實例化之後,咱們只須要關心 acquire 就好了,甚至都沒有 release 操做。多線程
// RateLimiter 接口列表:併發
// 實例化的兩種方式:
public static RateLimiter create(double permitsPerSecond){}
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) {}
public double acquire() {}
public double acquire(int permits) {}
public boolean tryAcquire() {}
public boolean tryAcquire(int permits) {}
public boolean tryAcquire(long timeout, TimeUnit unit) {}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}
public final double getRate() {}
public final void setRate(double permitsPerSecond) {}複製代碼
RateLimiter 的做用是用來限流的,咱們知道 java 併發包中提供了 Semaphore,它也可以提供對資源使用進行控制,咱們看一下下面的代碼:less
// Semaphore
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 100; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
semaphore.acquireUninterruptibly(1);
try {
doSomething();
} finally {
semaphore.release();
}
}
});
}
複製代碼
Semaphore 用來控制同時訪問某個資源的併發數量,如上面的代碼,咱們設置 100 個線程工做,可是咱們能作到最多隻有 10 個線程能同時到 doSomething() 方法中。它控制的是併發數量。
而 RateLimiter 是用來控制訪問資源的速率(rate)的,它強調的是控制速率。好比控制每秒只能有 100 個請求經過,好比容許每秒發送 1MB 的數據。
它的構造方法指定一個 permitsPerSecond 參數,表明每秒鐘產生多少個 permits,這就是咱們的速率。
RateLimiter 容許預佔將來的令牌,好比,每秒產生 5 個 permits,咱們能夠單次請求 100 個,這樣,緊接着的下一個請求須要等待大概 20 秒才能獲取到 permits。
RateLimiter 目前只有一個子類,那就是抽象類 SmoothRateLimiter,SmoothRateLimiter 有兩個實現類,也就是咱們這邊要介紹的兩種模式,咱們先簡單介紹下 SmoothRateLimiter,而後後面分兩個小節分別介紹它的兩個實現類。
RateLimiter 做爲抽象類,只有兩個屬性:
private final SleepingStopwatch stopwatch;
private volatile Object mutexDoNotUseDirectly;複製代碼
stopwatch 很是重要,它用來「計時」,RateLimiter 把實例化的時間設置爲 0 值,後續都是取相對時間,用微秒錶示。
mutexDoNotUseDirectly 用來作鎖,RateLimiter 依賴於 synchronized 來控制併發,因此咱們以後能夠看到,各個屬性甚至都沒有用 volatile 修飾。
而後咱們來看 SmoothRateLimiter 的屬性,分別表明什麼意思。
// 當前還有多少 permits 沒有被使用,被存下來的 permits 數量
double storedPermits;
// 最大容許緩存的 permits 數量,也就是 storedPermits 能達到的最大值
double maxPermits;
// 每隔多少時間產生一個 permit,
// 好比咱們構造方法中設置每秒 5 個,也就是每隔 200ms 一個,這裏單位是微秒,也就是 200,000
double stableIntervalMicros;
// 下一次能夠獲取 permits 的時間,這個時間是相對 RateLimiter 的構造時間的,是一個相對時間,理解爲時間戳吧
private long nextFreeTicketMicros = 0L; 複製代碼
其實,看到這幾個屬性,咱們就能夠大體猜一下它的內部實現了:
nextFreeTicketMicros 是一個很關鍵的屬性。咱們每次獲取 permits 的時候,先拿 storedPermits 的值,若是夠,storedPermits 減去相應的值就能夠了,若是不夠,那麼還須要將 nextFreeTicketMicros 往前推,表示我預佔了接下來多少時間的量了。那麼下一個請求來的時候,若是還沒到 nextFreeTicketMicros 這個時間點,須要 sleep 到這個點再返回,固然也要將這個值再往前推。
你們在這裏可能會有疑惑,由於時間是一直往前走的,因此 storedPermits 的信息多是不許確的,不過,只須要在關鍵的操做中同步一下,從新計算就行了。
咱們先從比較簡單的 SmoothBursty 出發,來分析 RateLimiter 的源碼,以後咱們再分析 SmoothWarmingUp。
Bursty 是突發的意思,它說的不是下面這個意思:咱們設置了 1k 每秒,而咱們能夠一次性獲取 5k 的 permits,這個場景表達的不是突發,而是在說預先佔有了接下來幾秒產生的 permits。
突發說的是,RateLimiter 會緩存必定數量的 permits 在池中,這樣對於突發請求,能及時獲得知足。想象一下咱們的某個接口,好久沒有請求過來,忽然同時來了好幾個請求,若是咱們沒有緩存一些 permits 的話,不少線程就須要等待了。
SmoothBursty 默認緩存最多 1 秒鐘的 permits,不能夠修改。
RateLimiter 的靜態構造方法:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}複製代碼
構造參數 permitsPerSecond 指定每秒鐘能夠產生多少個 permits。
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}複製代碼
咱們看到,這裏實例化的是 SmoothBursty 的實例,它的構造方法很簡單,並且它只有一個屬性 maxBurstSeconds,這裏就不貼代碼了。
構造函數指定了 maxBurstSeconds 爲 1.0,也就是說,最多會緩存 1 秒鐘,也就是 (1.0 * permitsPerSecond) 這麼多個 permits 到池中。
這個 1.0 秒,關係到 storedPermits 和 maxPermits:
0 <= storedPermits <= maxPermits = permitsPerSecond
咱們繼續日後看 setRate 方法:
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}複製代碼
setRate 這個方法是一個 public 方法,它能夠用來調整速率。咱們這邊繼續跟的是初始化過程,可是你們提早知道這個方法是用來調整速率用的,對理解源碼有很大的幫助。注意看,這裏用了 synchronized 控制併發。
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 同步
resync(nowMicros);
// 計算屬性 stableIntervalMicros
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}複製代碼
resync 方法很簡單,它用來調整 storedPermits 和 nextFreeTicketMicros。這就是咱們說的,在關鍵的節點,須要先更新一下 storedPermits 到正確的值。
void resync(long nowMicros) {
// 若是 nextFreeTicket 已通過掉了,想象一下很長時間都沒有再次調用 limiter.acquire() 的場景
// 須要將 nextFreeTicket 設置爲當前時間,從新計算 storedPermits
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}複製代碼
coolDownIntervalMicros() 這個方法你們先不用關注,能夠看到,在 SmoothBursty 類中的實現是直接返回了 stableIntervalMicros 的值,也就是咱們說的,每產生一個 permit 的時間長度。
固然了,細心的讀者,可能會發現,此時的 stableIntervalMicros 其實沒有設置,也就是說,上面發生了一次除以 0 值的操做,獲得的 newPermits 實際上是一個無窮大。而 maxPermits 此時仍是 0 值,不過這裏其實沒有關係。
咱們回到前面一個方法,resync 同步之後,會設置 stableIntervalMicros 爲一個正確的值,而後進入下面的方法:
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
// 這裏計算了,maxPermits 爲 1 秒產生的 permits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 由於 storedPermits 的值域變化了,須要等比例縮放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}複製代碼
上面這個方法,咱們要這麼看,原來的 RateLimiter 是用某個 permitsPerSecond 值初始化的,如今咱們要調整這個頻率。對於 maxPermits 來講,是從新計算,而對於 storedPermits 來講,是作等比例的縮放。
到此,構造方法就完成了,咱們獲得了一個 RateLimiter 的實現類 SmoothBursty 的實例,可能上面的源碼你仍是會有一些疑惑,不過也不要緊,繼續往下看,可能你的不少疑惑就解開了。
接下來,咱們來分析 acquire 方法:
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}
@CanIgnoreReturnValue
public double acquire(int permits) {
// 預定,若是當前不能直接獲取到 permits,須要等待
// 返回值表明須要 sleep 多久
long microsToWait = reserve(permits);
// sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的時長
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}複製代碼
咱們來看 reserve 方法:
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回 nextFreeTicketMicros
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 計算時長
return max(momentAvailable - nowMicros, 0);
}複製代碼
繼續往裏看:
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 這裏作一次同步,更新 storedPermits 和 nextFreeTicketMicros (若是須要)
resync(nowMicros);
// 返回值就是 nextFreeTicketMicros,注意剛剛已經作了 resync 了,此時它是最新的正確的值
long returnValue = nextFreeTicketMicros;
// storedPermits 中可使用多少個 permits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storedPermits 中不夠的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 爲了這個不夠的部分,須要等待多久時間
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 這部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
// 將 nextFreeTicketMicros 往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// storedPermits 減去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
複製代碼
咱們能夠看到,獲取 permits 的時候,實際上是獲取了兩部分,一部分來自於存量 storedPermits,存量不夠的話,另外一部分來自於預佔將來的 freshPermits。
這裏提一個關鍵點吧,咱們看到,返回值是 nextFreeTicketMicros 的舊值,由於只要到這個時間點,就說明當次 acquire 能夠成功返回了,而無論 storedPermits 夠不夠。若是 storedPermits 不夠,會將 nextFreeTicketMicros 往前推必定的時間,預佔了必定的量。
到這裏,acquire 方法就分析完了,你們看到這裏,逆着往前看就是了。應該說,SmoothBursty 的源碼仍是很是簡單的。
分析完了 SmoothBursty,咱們再來分析 SmoothWarmingUp 會簡單一些。咱們說過,SmoothBursty 能夠處理突發請求,由於它會緩存最多 1 秒的 permits,而待會咱們會看到 SmoothWarmingUp 徹底不一樣的設計。
SmoothWarmingUp 適用於資源須要預熱的場景,好比咱們的某個接口業務,須要使用到數據庫鏈接,因爲鏈接須要預熱才能進入到最佳狀態,若是咱們的系統長時間處於低負載或零負載狀態(固然,應用剛啓動也是同樣的),鏈接池中的鏈接慢慢釋放掉了,此時咱們認爲鏈接池是冷的。
假設咱們的業務在穩定狀態下,正常能夠提供最大 1000 QPS 的訪問,可是若是鏈接池是冷的,咱們就不能讓 1000 個請求同時進來,由於這會拖垮咱們的系統,咱們應該有個預熱升溫的過程。
對應到 SmoothWarmingUp 中,若是系統處於低負載狀態,storedPermits 會一直增長,當請求來的時候,咱們要從 storedPermits 中取 permits,最關鍵的點在於,從 storedPermits 中取 permits 的操做是比較耗時的,由於沒有預熱。
回顧一下前面介紹的 SmoothBursty,它從 storedPermits 中獲取 permits 是不須要等待時間的,而這邊洽洽相反,從 storedPermits 獲取須要更多的時間,這是最大的不一樣,先理解這一點,能幫助你更好地理解源碼。
你們先有一些粗的概念,而後咱們來看下面這個圖:
這個圖不容易看懂,X 軸表明 storedPermits 的數量,Y 軸表明獲取一個 permits 須要的時間。
假設指定 permitsPerSecond 爲 10,那麼 stableInterval 爲 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是寫死的,用戶不能修改)。也就是說,當達到 maxPermits 時,此時處於系統最冷的時候,獲取一個 permit 須要 300ms,而若是 storedPermits 小於 thresholdPermits 的時候,只須要 100ms。
想象有一條垂直線 x=k,它與 X 軸的交點 k 表明當前 storedPermits 的數量:
預熱時間是咱們在構造的時候指定的,圖中梯形的面積就是預熱時間,由於預熱完成後,咱們能進入到一個穩定的速率中(stableInterval),下面咱們來計算出 thresholdPermits 和 maxPermits 的值。
有一個關鍵點,從 thresholdPermits 到 0 的時間,是從 maxPermits 到 thresholdPermits 時間的一半,也就是梯形的面積是長方形面積的 2 倍,梯形的面積是 warmupPeriod。
之因此長方形的面積是 warmupPeriod/2,是由於 coldFactor 是硬編碼的 3。
梯形面積爲 warmupPeriod,即:
warmupPeriod = 2 * stableInterval * thresholdPermits複製代碼
由此,咱們得出 thresholdPermits 的值:
thresholdPermits = 0.5 * warmupPeriod / stableInterval複製代碼
而後咱們根據梯形面積的計算公式:
warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)複製代碼
得出 maxPermits 爲:
maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)複製代碼
這樣,咱們就獲得了 thresholdPermits 和 maxPermits 的值。
接下來,咱們來看一下冷卻時間間隔,它指的是 storedPermits 中每一個 permit 的增加速度,也就是咱們前面說的 x=k 這條垂直線往右的移動速度,爲了達到從 0 到 maxPermits 花費 warmupPeriodMicros 的時間,咱們將其定義爲:
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
貼一下代碼,你們就知道了,在 resync 中用到的這個:
void resync(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros 在這裏使用
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
複製代碼
基於上面的分析,咱們來看 SmoothWarmingUp 的其餘源碼。
首先,咱們來看它的 doSetRate 方法,有了前面的介紹,這個方法的源碼很是簡單:
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor 是固定的 3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// 這個公式咱們上面已經說了
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
// 這個公式咱們上面也已經說了
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 計算那條斜線的斜率。數學知識,對邊 / 臨邊
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}複製代碼
setRate 方法很是簡單,接下來,咱們要分析的是 storedPermitsToWaitTime 方法,咱們回顧一下下面的代碼:
這段代碼是 acquire 方法的核心,waitMicros 由兩部分組成,一部分是從 storedPermits 中獲取花費的時間,一部分是等待 freshPermits 產生花費的時間。在 SmoothBursty 的實現中,從 storedPermits 中獲取 permits 直接返回 0,不須要等待。
而在 SmoothWarmingUp 的實現中,因爲須要預熱,因此從 storedPermits 中取 permits 須要花費必定的時間,其實就是要計算下圖中,陰影部分的面積。
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 若是右邊梯形部分有 permits,那麼先從右邊部分獲取permits,計算梯形部分的陰影部分的面積
if (availablePermitsAboveThreshold > 0.0) {
// 從右邊部分獲取的 permits 數量
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// 梯形面積公式:(上底+下底)*高/2
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上 長方形部分的陰影面積
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
// 對於給定的 x 值,計算 y 值
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
複製代碼
到這裏,SmoothWarmingUp 也已經說完了。
若是你們對於 Guava RateLimiter 還有什麼疑惑,歡迎在留言區留言,對於 Sentinel 中的流控不感興趣的讀者,看到這裏就能夠結束了。
Sentinel 是阿里開源的流控、熔斷工具,這裏不作過多的介紹,感興趣的讀者請自行了解。
在 Sentinel 的流控中,咱們能夠配置流控規則,主要是控制 QPS 和線程數,這裏咱們不討論控制線程數,控制線程數的代碼不在咱們這裏的討論範圍內,下面的介紹都是指控制 QPS。
RateLimiterController 很是簡單,它經過使用 latestPassedTime 屬性來記錄最後一次經過的時間,而後根據規則中 QPS 的限制,計算當前請求是否能夠經過。
舉個很是簡單的例子:設置 QPS 爲 10,那麼每 100 毫秒容許經過一個,經過計算當前時間是否已通過了上一個請求的經過時間 latestPassedTime 以後的 100 毫秒,來判斷是否能夠經過。假設才過了 50ms,那麼須要當前線程再 sleep 50ms,而後才能夠經過。若是同時有另外一個請求呢?那須要 sleep 150ms 才行。
public class RateLimiterController implements TrafficShapingController {
// 排隊最大時長,默認 500ms
private final int maxQueueingTimeMs;
// QPS 設置的值
private final double count;
// 上一次請求經過的時間
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
// 一般 acquireCount 爲 1,這裏不用關心參數 prioritized
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
//
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 計算每 2 個請求之間的間隔,好比 QPS 限制爲 10,那麼間隔就是 100ms
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
// 能夠經過,設置 latestPassedTime 而後就返回 true 了
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// 不能夠經過,須要等待
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 等待時長大於最大值,返回 false
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 將 latestPassedTime 往前推
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 須要 sleep 的時間
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
複製代碼
這個策略仍是很是好理解的,簡單粗暴,快速失敗。
WarmUpController 用來防止突發流量迅速上升,致使系統負載嚴重太高,原本系統在穩定狀態下能處理的,可是因爲許多資源沒有預熱,致使這個時候處理不了了。好比,數據庫須要創建鏈接、須要鏈接到遠程服務等,這就是爲何咱們須要預熱。
囉嗦一句,這裏不只僅指系統剛剛啓動須要預熱,對於長時間處於低負載的系統,突發流量也須要從新預熱。
Guava 的 SmoothWarmingUp 是用來控制獲取令牌的速率的,和這裏的控制 QPS 仍是有一點區別,可是中心思想是同樣的。咱們在看完源碼之後再討論它們的區別。
爲了幫助你們理解源碼,咱們這邊先設定一個場景:QPS 設置爲 100,預熱時間設置爲 10 秒。代碼中使用 「【】」 表明根據這個場景計算出來的值。
public class WarmUpController implements TrafficShapingController {
// 閾值
protected double count;
// 3
private int coldFactor;
// 轉折點的令牌數,和 Guava 的 thresholdPermits 一個意思
// [500]
protected int warningToken = 0;
// 最大的令牌數,和 Guava 的 maxPermits 一個意思
// [1000]
private int maxToken;
// 斜線斜率
// [1/25000]
protected double slope;
// 累積的令牌數,和 Guava 的 storedPermits 一個意思
protected AtomicLong storedTokens = new AtomicLong(0);
// 最後更新令牌的時間
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
// 下面的構造方法,和 Guava 中是差很少的,只不過 thresholdPermits 和 maxPermits 都換了個名字
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// warningToken 和 thresholdPermits 是同樣的意思,計算結果實際上是同樣的
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// 【warningToken = (10*100)/(3-1) = 500】
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// maxToken 和 maxPermits 是同樣的意思,計算結果實際上是同樣的
// maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
// 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 斜率計算
// slope
// slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits);
// 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Sentinel 的 QPS 統計使用的是滑動窗口
// 當前時間窗口的 QPS
long passQps = (long) node.passQps();
// 這裏是上一個時間窗口的 QPS,這裏的一個窗口跨度是1分鐘
long previousQps = (long) node.previousPassQps();
// 同步。設置 storedTokens 和 lastFilledTime 到正確的值
syncToken(previousQps);
long restToken = storedTokens.get();
// 令牌數超過 warningToken,進入梯形區域
if (restToken >= warningToken) {
// 這裏簡單說一句,由於當前的令牌數超過了 warningToken 這個閾值,系統處於須要預熱的階段
// 經過計算當前獲取一個令牌所需時間,計算其倒數便是當前系統的最大 QPS 容量
long aboveToken = restToken - warningToken;
// 這裏計算警惕 QPS 值,就是當前狀態下能達到的最高 QPS。
// (aboveToken * slope + 1.0 / count) 其實就是在當前狀態下獲取一個令牌所須要的時間
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
// 若是不會超過,那麼經過,不然不經過
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// count 是最高能達到的 QPS
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
// 下面幾行代碼,說明在第一次進入新的 1 秒鐘的時候,作同步
// 題外話:Sentinel 默認地,1 秒鐘分爲 2 個時間窗口,分別 500ms
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
// 令牌數量的舊值
long oldValue = storedTokens.get();
// 計算新的令牌數量,往下看
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 令牌數量上,減去上一分鐘的 QPS,而後設置新值
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
// 更新令牌數
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 當前令牌數小於 warningToken,添加令牌
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
// 當前令牌數量處於梯形階段,
// 若是當前經過的 QPS 大於 count/coldFactor,說明系統消耗令牌的速度,大於冷卻速度
// 那麼不須要添加令牌,不然須要添加令牌
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
}
複製代碼
coolDownTokens 這個方法用來計算新的 token 數量,其實我也沒有徹底理解做者的設計:
最後,咱們再簡單說說 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的區別。
Guava 在於控制獲取令牌的速率,它關心的是,獲取 permits 須要多少時間,包括從 storedPermits 中獲取,以及獲取 freshPermits,以此推動 nextFreeTicketMicros 到將來的某個時間點。
而 Sentinel 在於控制 QPS,它用令牌數來標識當前系統處於什麼狀態,根據時間推動一直增長令牌,根據經過的 QPS 一直減小令牌。若是 QPS 持續降低,根據推演,能夠發現 storedTokens 愈來愈多,而後越過 warningTokens 這個閾值,以後只有當 QPS 降低到 count/3 之後,令牌纔會繼續往上增加,一直到 maxTokens。
storedTokens 是以 「count 每秒」的增加率增加的,減小是以 前一分鐘的 QPS 來減小的。其實這裏我也有個疑問,爲何增長令牌的時候考慮了時間,而減小的時候卻不考慮時間因素,提了 issue,彷佛沒人搭理。
注意,這個類繼承自剛剛介紹的 WarmUpController,它的流控效果定義爲排隊等待。它的代碼其實就是前面介紹的 RateLimiterController 加上 WarmUpController。
public class WarmUpRateLimiterController extends WarmUpController {
private final int timeoutInMs;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
super(count, warmUpPeriodSec, coldFactor);
this.timeoutInMs = timeOutMs;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
long currentTime = TimeUtil.currentTimeMillis();
long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
// 和 RateLimiterController 比較,區別主要就是這塊代碼
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}複製代碼
這個代碼很簡單,就是 RateLimiter 中的代碼,而後加入了預熱的內容。
在 RateLimiter 中,單個請求的 costTime 是固定的,就是 1/count,好比設置 100 qps,那麼 costTime 就是 10ms。
可是這邊,加入了 WarmUp 的內容,就是說,經過令牌數量,來判斷當前系統的 QPS 應該是多少,若是當前令牌數超過 warningTokens,那麼系統的 QPS 容量已經低於咱們預設的 QPS,相應的,costTime 就會延長。
有段時間沒寫文章了,寫得很差之處,歡迎指正。
關注做者公衆號: