在分佈式系統中,應對高併發訪問時,緩存、限流、降級是保護系統正常運行的經常使用方法。當請求量突發暴漲時,若是不加以限制訪問,則可能致使整個系統崩潰,服務不可用。同時有一些業務場景,好比短信驗證碼,或者其它第三方API調用,也須要提供必要的訪問限制支持。還有一些資源消耗過大的請求,好比數據導出等(參考 記一次線上Java服務CPU 100%處理過程 ),也有限制訪問頻率的需求。html
常見的限流算法有令牌桶算法,漏桶算法,與計數器算法。本文主要對三個算法的基本原理及Google Guava包中令牌桶算法的實現RateLimiter進行介紹,下一篇文章介紹最近寫的一個以RateLimiter爲參考的分佈式限流實現及計數器限流實現。java
令牌桶算法的原理就是以一個恆定的速度往桶裏放入令牌,每個請求的處理都須要從桶裏先獲取一個令牌,當桶裏沒有令牌時,則請求不會被處理,要麼排隊等待,要麼降級處理,要麼直接拒絕服務。當桶裏令牌滿時,新添加的令牌會被丟棄或拒絕。算法
令牌桶算法的處理示意圖以下(圖片來自網絡)shell
令牌桶算法主要是能夠控制請求的平均處理速率,它容許預消費,便可以提早消費令牌,以應對突發請求,可是後面的請求須要爲預消費買單(等待更長的時間),以知足請求處理的平均速率是必定的。緩存
漏桶算法的原理是水(請求)先進入漏桶中,漏桶以必定的速度出水(處理請求),當水流入速度大於流出速度致使水在桶內逐漸堆積直到桶滿時,水會溢出(請求被拒絕)。網絡
漏桶算法的處理示意圖以下(圖片來自網絡)併發
漏桶算法主要是控制請求的處理速率,平滑網絡上的突發流量,請求能夠以任意速度進入漏桶中,但請求的處理則以恆定的速度進行。分佈式
計數器算法是限流算法中最簡單的一種算法,限制在一個時間窗口內,至多處理多少個請求。好比每分鐘最多處理10個請求,則從第一個請求進來的時間爲起點,60s的時間窗口內只容許最多處理10個請求。下一個時間窗口又之前一時間窗口事後第一個請求進來的時間爲起點。常見的好比一分鐘內只能獲取一次短信驗證碼的功能能夠經過計數器算法來實現。ide
Guava是Google開源的一個工具包,其中的RateLimiter是實現了令牌桶算法的一個限流工具類。在pom.xml中添加guava依賴,便可使用RateLimiter函數
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>複製代碼
以下測試代碼示例了RateLimiter的用法,
public static void main(String[] args) {
RateLimiter rateLimiter = RateLimiter.create(1); //建立一個每秒產生一個令牌的令牌桶
for(int i=1;i<=5;i++) {
double waitTime = rateLimiter.acquire(i); //一次獲取i個令牌
System.out.println("acquire:" + i + " waitTime:" + waitTime);
}
}複製代碼
運行後,輸出以下,
acquire:1 waitTime:0.0
acquire:2 waitTime:0.997729
acquire:3 waitTime:1.998076
acquire:4 waitTime:3.000303
acquire:5 waitTime:4.000223複製代碼
第一次獲取一個令牌時,等待0s當即可獲取到(這裏之因此不須要等待是由於令牌桶的預消費特性),第二次獲取兩個令牌,等待時間1s,這個1s就是前面獲取一個令牌時由於預消費沒有等待延到此次來等待的時間,此次獲取兩個又是預消費,因此下一次獲取(取3個時)就要等待此次預消費須要的2s了,依此類推。可見預消費不須要等待的時間都由下一次來買單,以保障必定的平均處理速率(上例爲1s一次)。
RateLimiter有兩種實現:
RateLimiter.create(double permitsPerSecond)
建立的是 SmoothBursty 實例。RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
時建立就是 SmoothWarmingUp 實例,其中 warmupPeriod 就是熱身達到穩定速度的時間。類結構以下
關鍵屬性及方法解析(以 SmoothBursty 爲例)
1.關鍵屬性
/** 桶中當前擁有的令牌數. */
double storedPermits;
/** 桶中最多能夠保存多少秒存入的令牌數 */
double maxBurstSeconds;
/** 桶中能存儲的最大令牌數,等於storedPermits*maxBurstSeconds. */
double maxPermits;
/** 放入令牌的時間間隔*/
double stableIntervalMicros;
/** 下次可獲取令牌的時間點,能夠是過去也能夠是未來的時間點*/
private long nextFreeTicketMicros = 0L;複製代碼
2.關鍵方法
調用 RateLimiter.create(double permitsPerSecond)
方法時,建立的是 SmoothBursty 實例,默認設置 maxBurstSeconds 爲1s。SleepingStopwatch 是guava中的一個時鐘類實現。
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}複製代碼
並經過調用 SmoothBursty.doSetRate(double, long)
方法進行初始化,該方法中:
resync(nowMicros)
對 storedPermits 與 nextFreeTicketMicros 進行了調整——若是當前時間晚於 nextFreeTicketMicros,則計算這段時間內產生的令牌數,累加到 storedPermits 上,並更新下次可獲取令牌時間 nextFreeTicketMicros 爲當前時間。doSetRate(double, double)
方法計算 maxPermits 值(maxBurstSeconds*permitsPerSecond),並根據舊的 maxPermits 值對 storedPermits 進行調整。源碼以下所示
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}複製代碼
調用 acquire(int)
方法獲取指定數量的令牌時,
reserve(int)
方法,該方法最終調用 reserveEarliestAvailable(int, long)
來更新下次可取令牌時間點與當前存儲的令牌數,並返回本次可取令牌的時間點,根據該時間點計算須要等待的時間源碼以下所示
/** 獲取指定數量(permits)的令牌,阻塞直到獲取到令牌,返回等待的時間*/
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
/** 返回須要等待的時間*/
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
/** 針對這次須要獲取的令牌數更新下次可取令牌時間點與存儲的令牌數,返回本次可取令牌的時間點*/
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); // 更新當前數據
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可消費的令牌數
double freshPermits = requiredPermits - storedPermitsToSpend; // 須要新增的令牌數
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); // 須要等待的時間
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 更新下次可取令牌的時間點
this.storedPermits -= storedPermitsToSpend; // 更新當前存儲的令牌數
return returnValue;
}複製代碼
acquire(int)
方法是獲取不到令牌時一直阻塞,直到獲取到令牌,tryAcquire(int,long,TimeUnit)
方法則是在指定超時時間內嘗試獲取令牌,若是獲取到或超時時間到則返回是否獲取成功
nextFreeTicketMicros <= timeoutMicros + nowMicros
是否爲true來判斷,便可取令牌時間早於當前時間加超時時間則可取(預消費的特性),不然不可獲取。reserveAndGetWaitLength(permits, nowMicros)
來更新下次可取令牌時間點與當前存儲的令牌數,返回等待時間(邏輯與前面相同),並阻塞等待相應的時間,返回true。源碼以下所示
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) { //判斷是否能在超時時間內獲取指定數量的令牌
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; //只要可取時間小於當前時間+超時時間,則可獲取(可預消費的特性!)
}
@Override
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}複製代碼
以上就是 SmoothBursty 實現的基本處理流程。注意兩點:
resync
函數),在每次獲取令牌以前計算該段時間內能夠產生多少令牌,將產生的令牌加入令牌桶中並更新數據來實現,比起一個線程來不斷往桶裏放令牌高效得多。(想一想若是須要針對每一個用戶限制某個接口的訪問,則針對每一個用戶都得建立一個RateLimiter,並起一個線程來控制令牌存放的話,若是在線用戶數有幾十上百萬,起線程來控制是一件多麼恐怖的事情)本文介紹了限流的三種基本算法,其中令牌桶算法與漏桶算法主要用來限制請求處理的速度,可將其歸爲限速,計數器算法則是用來限制一個時間窗口內請求處理的數量,可將其歸爲限量(對速度不限制)。Guava 的 RateLimiter 是令牌桶算法的一種實現,但 RateLimiter 只適用於單機應用,在分佈式環境下就不適用了。雖然已有一些開源項目可用於分佈式環境下的限流管理,如阿里的Sentinel,但對於小型項目來講,引入Sentinel可能顯得有點太重,但限流的需求在小型項目中也是存在的,下一篇文章就介紹下基於 RateLimiter 的分佈式下的限流實現。
[轉載請註明出處] 做者:雨歌 歡迎關注做者公衆號:半路雨歌,查看更多技術乾貨文章