緩存,降級和限流是大型分佈式系統中的三把利劍。目前限流主要有漏桶和令牌桶兩種算法。java
漏桶算法的示意圖以下:git
漏桶算法能夠將系統處理請求限定到恆定的速率,當請求過載時,漏桶將直接溢出。漏桶算法假定了系統處理請求的速率是恆定的,可是在現實環境中,每每咱們的系統處理請求的速率不是恆定的。漏桶算法沒法解決系統突發流量的狀況。github
令牌桶算法相對漏桶算法的優點在於能夠處理系統的突發流量,其算法示意圖以下所示:算法
令牌桶有必定的容量(capacity),後臺服務向令牌桶中以恆定的速率放入令牌(token),當令牌桶中的令牌數量超過capacity以後,多餘的令牌直接丟棄。當一個請求進來時,須要從桶中拿到N個令牌,若是可以拿到則繼續後面的處理流程,若是拿不到,則當前線程能夠選擇阻塞等待桶中的令牌數量夠本次請求的數量或者不等待直接返回失敗。segmentfault
Guava RateLimiter是一個谷歌提供的限流工具,RateLimiter基於令牌桶算法,能夠有效限定單個JVM實例上某個接口的流量。緩存
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RateLimiterExample {
public static void main(String[] args) throws InterruptedException {
// qps設置爲5,表明一秒鐘只容許處理五個併發請求
RateLimiter rateLimiter = RateLimiter.create(5);
ExecutorService executorService = Executors.newFixedThreadPool(5);
int nTasks = 10;
CountDownLatch countDownLatch = new CountDownLatch(nTasks);
long start = System.currentTimeMillis();
for (int i = 0; i < nTasks; i++) {
final int j = i;
executorService.submit(() -> {
rateLimiter.acquire(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " gets job " + j + " done");
countDownLatch.countDown();
});
}
executorService.shutdown();
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("10 jobs gets done by 5 threads concurrently in " + (end - start) + " milliseconds");
}
}
複製代碼
輸出結果:bash
pool-1-thread-1 gets job 0 done
pool-1-thread-2 gets job 1 done
pool-1-thread-3 gets job 2 done
pool-1-thread-4 gets job 3 done
pool-1-thread-5 gets job 4 done
pool-1-thread-6 gets job 5 done
pool-1-thread-7 gets job 6 done
pool-1-thread-8 gets job 7 done
pool-1-thread-9 gets job 8 done
pool-1-thread-10 gets job 9 done
10 jobs gets done by 5 threads concurrently in 2805 milliseconds
複製代碼
上面例子中咱們提交10個工做任務,每一個任務大概耗時1000微秒,開啓10個線程,而且使用RateLimiter設置了qps爲5,一秒內只容許五個併發請求被處理,雖然有10個線程,可是咱們設置了qps爲5,一秒以內只能有五個併發請求。咱們預期的總耗時大概是2000微秒左右,結果爲2805和預期的差很少。併發
RateLimiter基於令牌桶算法,它的核心思想主要有:框架
acquire(20)
,則不須要等待20秒鐘,由於令牌桶中已經有10個空閒的令牌。SmoothRateLimiter 類中的 storedPermits 就是用來表示當前令牌桶中的空閒令牌數。RateLimiter主要的類的類圖以下所示:分佈式
RateLimiter 是一個抽象類,SmoothRateLimiter 繼承自 RateLimiter,不過 SmoothRateLimiter 仍然是一個抽象類,SmoothBursty 和 SmoothWarmingUp 纔是具體的實現類。
SmoothRateLimiter 是抽象類,其定義了一些關鍵的參數,咱們先來看一下這些參數:
/**
* The currently stored permits.
*/
double storedPermits;
/**
* The maximum number of stored permits.
*/
double maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
double stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future
複製代碼
storedPermits 代表當前令牌桶中有多少令牌。maxPermits 表示令牌桶最大令牌數目,storedPermits 的取值範圍爲:[0, maxPermits]。stableIntervalMicros 等於 1/qps
,它表明系統在穩按期間,兩次請求之間間隔的微秒數。例如:若是咱們設置的 qps 爲5,則 stableIntervalMicros 爲200ms。nextFreeTicketMicros 表示系統處理完當前請求後,下一次請求被許可的最短微秒數,若是在這以前有請求進來,則必須等待。
當咱們設置了 qps 以後,須要計算某一段時間系統可以生成的令牌數目,那麼怎麼計算呢?一種方式是開啓一個後臺任務去作,可是這樣代價未免有點大。RateLimiter 中採起的是惰性計算方式:在每次請求進來的時候先去計算上次請求和本次請求之間應該生成多少個令牌。
RateLimiter 中提供了建立 SmoothBursty 的方法:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); // maxBurstSeconds 用於計算 maxPermits
rateLimiter.setRate(permitsPerSecond); // 設置生成令牌的速率
return rateLimiter;
}
複製代碼
SmoothBursty 的 maxBurstSeconds 構造函數參數主要用於計算 maxPermits :maxPermits = maxBurstSeconds * permitsPerSecond;
。
咱們再看一下 setRate 的方法,RateLimiter 中 setRate 方法最終後調用 doSetRate 方法,doSetRate 是一個抽象方法,SmoothRateLimiter 抽象類中覆寫了 RateLimiter 的 doSetRate 方法:
// SmoothRateLimiter類中的doSetRate方法,覆寫了 RateLimiter 類中的 doSetRate 方法,此方法再委託下面的 doSetRate 方法作處理。
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
// SmoothBursty 和 SmoothWarmingUp 類中覆寫此方法
abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
// SmoothBursty 中對 doSetRate的實現
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } } 複製代碼
SmoothRateLimiter 類的 doSetRate方法中咱們着重看一下 resync 這個方法:
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
複製代碼
resync 方法就是 RateLimiter 中惰性計算 的實現。每一次請求來的時候,都會調用到這個方法。這個方法的過程大體以下:
1 / QPS
。coolDownIntervalMicros 方法在 SmoothWarmingUp 中的計算方式爲warmupPeriodMicros / maxPermits
,warmupPeriodMicros 是 SmoothWarmingUp 的「預熱」時間。tryAcquire 方法用於嘗試獲取若干個 permit,此方法不會等待,若是獲取失敗則直接返回失敗。canAcquire 方法用於判斷當前的請求可否經過:
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) { // 首先判斷當前超時時間以內請求可否被知足,不能知足的話直接返回失敗
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros); // 計算本次請求須要等待的時間,核心方法
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}
複製代碼
canAcquire 方法邏輯比較簡單,就是看 nextFreeTicketMicros 減去 timeoutMicros 是否小於等於 nowMicros。若是當前需求能被知足,則繼續往下走。
接着會調用 SmoothRateLimiter 類的 reserveEarliestAvailable 方法,該方法返回當前請求須要等待的時間。改方法在 acquire 方法中也會用到,咱們來着重分析這個方法。
// 計算本次請求須要等待的時間
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); // 本次請求和上次請求之間間隔的時間是否應該有新的令牌生成,若是有則更新 storedPermits
long returnValue = nextFreeTicketMicros;
// 本次請求的令牌數 requiredPermits 由兩個部分組成:storedPermits 和 freshPermits,storedPermits 是令牌桶中已有的令牌
// freshPermits 是須要新生成的令牌數
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
// 分別計算從兩個部分拿走的令牌各自須要等待的時間,而後總和做爲本次請求須要等待的時間,SmoothBursty 中從 storedPermits 拿走的部分不須要等待時間
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 更新 nextFreeTicketMicros,這裏更新的實際上是下一次請求的時間,是一種「預消費」
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 更新 storedPermits
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
/**
* Translates a specified portion of our currently stored permits which we want to spend/acquire,
* into a throttling time. Conceptually, this evaluates the integral of the underlying function we
* use, for the range of [(storedPermits - permitsToTake), storedPermits].
*
* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
*/
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
複製代碼
上面的代碼是 SmoothRateLimiter 中的具體實現。其主要有如下步驟:
acquire 方法沒有等待超時的概念,會一直阻塞直到知足本次請求。
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
abstract long reserveEarliestAvailable(int permits, long nowMicros);
複製代碼
acquire 方法最終仍是經過 reserveEarliestAvailable 方法來計算本次請求須要等待的時間。這個方法上面已經分析過了,這裏就再也不過多闡述。
SmoothWarmingUp 相對 SmoothBursty 來講主要區別在於 storedPermitsToWaitTime 方法。其餘部分原理和 SmoothBursty 相似。
SmoothWarmingUp 是 SmoothRateLimiter 的子類,它相對於 SmoothRateLimiter 多了幾個屬性:
static final class SmoothWarmingUp extends SmoothRateLimiter {
private final long warmupPeriodMicros;
/**
* The slope of the line from the stable interval (when permits == 0), to the cold interval
* (when permits == maxPermits)
*/
private double slope;
private double thresholdPermits;
private double coldFactor;
...
}
複製代碼
這四個參數都是和 SmoothWarmingUp 的「熱身」(warmup)機制相關。warmup 能夠用以下的圖來表示:
* ^ throttling
* |
* cold + /
* interval | /.
* | / .
* | / . ← "warmup period" is the area of the trapezoid between
* | / . thresholdPermits and maxPermits
* | / .
* | / .
* | / .
* stable +----------/ WARM .
* interval | . UP .
* | . PERIOD.
* | . .
* 0 +----------+-------+--------------→ storedPermits
* 0 thresholdPermits maxPermits
複製代碼
上圖中橫座標是當前令牌桶中的令牌 storedPermits,前面說過 SmoothWarmingUp 將 storedPermits 分爲兩個區間:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。縱座標是請求的間隔時間,stableInterval 就是 1 / QPS
,例如設置的 QPS 爲5,則 stableInterval 就是200ms,coldInterval = stableInterval * coldFactor
,這裏的 coldFactor "hard-code"寫死的是3。
當系統進入 cold 階段時,圖像會向右移,直到 storedPermits 等於 maxPermits;當系統請求增多,圖像會像左移動,直到 storedPermits 爲0。
上面"矩形+梯形"圖像的面積就是 waitMicros 也便是本次請求須要等待的時間。計算過程在 SmoothWarmingUp 類的 storedPermitsToWaitTime 方法中覆寫:
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) { // 若是當前 storedPermits 超過 availablePermitsAboveThreshold 則計算從 超過部分拿令牌所須要的時間(圖中的 WARM UP PERIOD)
// WARM UP PERIOD 部分計算的方法,這部分是一個梯形,梯形的面積計算公式是 「(上底 + 下底) * 高 / 2」
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
double length = permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0); // 計算出從 WARM UP PERIOD 拿走令牌的時間
permitsToTake -= permitsAboveThresholdToTake; // 剩餘的令牌從 stable 部分拿
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (stableIntervalMicros * permitsToTake); // stable 部分令牌獲取花費的時間
return micros;
}
// WARM UP PERIOD 部分 獲取相應令牌所對應的的時間
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
複製代碼
SmoothWarmingUp 類中 storedPermitsToWaitTime 方法將 permitsToTake 分爲兩部分,一部分從 WARM UP PERIOD 部分拿,這部分是一個梯形,面積計算就是(上底 + 下底)* 高 / 2。另外一部分從 stable 部分拿,它是一個長方形,面積就是 長 * 寬。最後返回兩個部分的時間總和。