RateLimiter 源碼分析(Guava 和 Sentinel 實現)

做者javadoop,資深Java工程師。本文已獲做者受權發佈。java

原文連接https://www.javadoop.com/post/rate-limiternode

本文主要介紹關於流控的兩部份內容。算法

第一部分介紹 Guava 中 RateLimiter 的源碼,包括它的兩種模式,目前網上大部分文章只分析簡單的 SmoothBursty 模式,而沒有分析帶有預熱的 SmoothWarmingUp。數據庫

第二部分介紹 Sentinel 中流控的實現,本文不要求讀者瞭解 Sentinel,這部份內容和 Sentinel 耦合很低,因此讀者不須要有閱讀壓力。api

Sentinel 中流控設計是參考 Guava RateLimiter 的,因此閱讀第二部份內容,須要有第一部份內容的背景。緩存

Guava RateLimiter

RateLimiter 基於漏桶算法,但它參考了令牌桶算法,這裏不討論流控算法,請自行查找資料。markdown

RateLimiter 使用介紹

RateLimiter 的接口很是簡單,它有兩個靜態方法用來實例化,實例化之後,咱們只須要關心 acquire 就好了,甚至都沒有 release 操做。多線程

// RateLimiter 接口列表:併發

// 實例化的兩種方式:
public static RateLimiter create(double permitsPerSecond){}
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) {}

public double acquire() {}
public double acquire(int permits) {}

public boolean tryAcquire() {}
public boolean tryAcquire(int permits) {}
public boolean tryAcquire(long timeout, TimeUnit unit) {}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}

public final double getRate() {}
public final void setRate(double permitsPerSecond) {}複製代碼

RateLimiter 的做用是用來限流的,咱們知道 java 併發包中提供了 Semaphore,它也可以提供對資源使用進行控制,咱們看一下下面的代碼:less

// Semaphore
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 100; i++) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            semaphore.acquireUninterruptibly(1);
            try {
                doSomething();
            } finally {
                semaphore.release();
            }
        }
    });
}
複製代碼

Semaphore 用來控制同時訪問某個資源的併發數量,如上面的代碼,咱們設置 100 個線程工做,可是咱們能作到最多隻有 10 個線程能同時到 doSomething() 方法中。它控制的是併發數量。

而 RateLimiter 是用來控制訪問資源的速率(rate)的,它強調的是控制速率。好比控制每秒只能有 100 個請求經過,好比容許每秒發送 1MB 的數據。

它的構造方法指定一個 permitsPerSecond 參數,表明每秒鐘產生多少個 permits,這就是咱們的速率。

RateLimiter 容許預佔將來的令牌,好比,每秒產生 5 個 permits,咱們能夠單次請求 100 個,這樣,緊接着的下一個請求須要等待大概 20 秒才能獲取到 permits。

SmoothRateLimiter 介紹

RateLimiter 目前只有一個子類,那就是抽象類 SmoothRateLimiter,SmoothRateLimiter 有兩個實現類,也就是咱們這邊要介紹的兩種模式,咱們先簡單介紹下 SmoothRateLimiter,而後後面分兩個小節分別介紹它的兩個實現類。

RateLimiter 做爲抽象類,只有兩個屬性:

private final SleepingStopwatch stopwatch;

private volatile Object mutexDoNotUseDirectly;複製代碼

stopwatch 很是重要,它用來「計時」,RateLimiter 把實例化的時間設置爲 0 值,後續都是取相對時間,用微秒錶示。

mutexDoNotUseDirectly 用來作鎖,RateLimiter 依賴於 synchronized 來控制併發,因此咱們以後能夠看到,各個屬性甚至都沒有用 volatile 修飾。

而後咱們來看 SmoothRateLimiter 的屬性,分別表明什麼意思。

// 當前還有多少 permits 沒有被使用,被存下來的 permits 數量
double storedPermits;

// 最大容許緩存的 permits 數量,也就是 storedPermits 能達到的最大值
double maxPermits;

// 每隔多少時間產生一個 permit,
// 好比咱們構造方法中設置每秒 5 個,也就是每隔 200ms 一個,這裏單位是微秒,也就是 200,000
double stableIntervalMicros;

// 下一次能夠獲取 permits 的時間,這個時間是相對 RateLimiter 的構造時間的,是一個相對時間,理解爲時間戳吧
private long nextFreeTicketMicros = 0L; 複製代碼

其實,看到這幾個屬性,咱們就能夠大體猜一下它的內部實現了:

nextFreeTicketMicros 是一個很關鍵的屬性。咱們每次獲取 permits 的時候,先拿 storedPermits 的值,若是夠,storedPermits 減去相應的值就能夠了,若是不夠,那麼還須要將 nextFreeTicketMicros 往前推,表示我預佔了接下來多少時間的量了。那麼下一個請求來的時候,若是還沒到 nextFreeTicketMicros 這個時間點,須要 sleep 到這個點再返回,固然也要將這個值再往前推。

你們在這裏可能會有疑惑,由於時間是一直往前走的,因此 storedPermits 的信息多是不許確的,不過,只須要在關鍵的操做中同步一下,從新計算就行了。

SmoothBursty 分析

咱們先從比較簡單的 SmoothBursty 出發,來分析 RateLimiter 的源碼,以後咱們再分析 SmoothWarmingUp。

Bursty 是突發的意思,它說的不是下面這個意思:咱們設置了 1k 每秒,而咱們能夠一次性獲取 5k 的 permits,這個場景表達的不是突發,而是在說預先佔有了接下來幾秒產生的 permits。

突發說的是,RateLimiter 會緩存必定數量的 permits 在池中,這樣對於突發請求,能及時獲得知足。想象一下咱們的某個接口,好久沒有請求過來,忽然同時來了好幾個請求,若是咱們沒有緩存一些 permits 的話,不少線程就須要等待了。

SmoothBursty 默認緩存最多 1 秒鐘的 permits,不能夠修改。

RateLimiter 的靜態構造方法:

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}複製代碼

構造參數 permitsPerSecond 指定每秒鐘能夠產生多少個 permits。

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}複製代碼

咱們看到,這裏實例化的是 SmoothBursty 的實例,它的構造方法很簡單,並且它只有一個屬性 maxBurstSeconds,這裏就不貼代碼了。

構造函數指定了 maxBurstSeconds 爲 1.0,也就是說,最多會緩存 1 秒鐘,也就是 (1.0 * permitsPerSecond) 這麼多個 permits 到池中。

這個 1.0 秒,關係到 storedPermits 和 maxPermits:

0 <= storedPermits <= maxPermits = permitsPerSecond

咱們繼續日後看 setRate 方法:

public final void setRate(double permitsPerSecond) {
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex()) {
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}複製代碼

setRate 這個方法是一個 public 方法,它能夠用來調整速率。咱們這邊繼續跟的是初始化過程,可是你們提早知道這個方法是用來調整速率用的,對理解源碼有很大的幫助。注意看,這裏用了 synchronized 控制併發。

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 同步
    resync(nowMicros);
    // 計算屬性 stableIntervalMicros
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}複製代碼

resync 方法很簡單,它用來調整 storedPermits 和 nextFreeTicketMicros。這就是咱們說的,在關鍵的節點,須要先更新一下 storedPermits 到正確的值。

void resync(long nowMicros) {
  // 若是 nextFreeTicket 已通過掉了,想象一下很長時間都沒有再次調用 limiter.acquire() 的場景
  // 須要將 nextFreeTicket 設置爲當前時間,從新計算 storedPermits
  if (nowMicros > nextFreeTicketMicros) {
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}複製代碼

coolDownIntervalMicros() 這個方法你們先不用關注,能夠看到,在 SmoothBursty 類中的實現是直接返回了 stableIntervalMicros 的值,也就是咱們說的,每產生一個 permit 的時間長度。

固然了,細心的讀者,可能會發現,此時的 stableIntervalMicros 其實沒有設置,也就是說,上面發生了一次除以 0 值的操做,獲得的 newPermits 實際上是一個無窮大。而 maxPermits 此時仍是 0 值,不過這裏其實沒有關係。

咱們回到前面一個方法,resync 同步之後,會設置 stableIntervalMicros 爲一個正確的值,而後進入下面的方法:

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  // 這裏計算了,maxPermits 爲 1 秒產生的 permits
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    // 由於 storedPermits 的值域變化了,須要等比例縮放
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}複製代碼

上面這個方法,咱們要這麼看,原來的 RateLimiter 是用某個 permitsPerSecond 值初始化的,如今咱們要調整這個頻率。對於 maxPermits 來講,是從新計算,而對於 storedPermits 來講,是作等比例的縮放。

到此,構造方法就完成了,咱們獲得了一個 RateLimiter 的實現類 SmoothBursty 的實例,可能上面的源碼你仍是會有一些疑惑,不過也不要緊,繼續往下看,可能你的不少疑惑就解開了。

接下來,咱們來分析 acquire 方法:

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
  // 預定,若是當前不能直接獲取到 permits,須要等待
  // 返回值表明須要 sleep 多久
  long microsToWait = reserve(permits);
  // sleep
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  // 返回 sleep 的時長
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}複製代碼

咱們來看 reserve 方法:

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
  // 返回 nextFreeTicketMicros
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  // 計算時長
  return max(momentAvailable - nowMicros, 0);
}複製代碼

繼續往裏看:

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  // 這裏作一次同步,更新 storedPermits 和 nextFreeTicketMicros (若是須要)
  resync(nowMicros);
  // 返回值就是 nextFreeTicketMicros,注意剛剛已經作了 resync 了,此時它是最新的正確的值
  long returnValue = nextFreeTicketMicros;
  // storedPermits 中可使用多少個 permits
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  // storedPermits 中不夠的部分
  double freshPermits = requiredPermits - storedPermitsToSpend;
  // 爲了這個不夠的部分,須要等待多久時間
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 這部分固定返回 0
          + (long) (freshPermits * stableIntervalMicros);
  // 將 nextFreeTicketMicros 往前推
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  // storedPermits 減去被拿走的部分
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
複製代碼

咱們能夠看到,獲取 permits 的時候,實際上是獲取了兩部分,一部分來自於存量 storedPermits,存量不夠的話,另外一部分來自於預佔將來的 freshPermits。

這裏提一個關鍵點吧,咱們看到,返回值是 nextFreeTicketMicros 的舊值,由於只要到這個時間點,就說明當次 acquire 能夠成功返回了,而無論 storedPermits 夠不夠。若是 storedPermits 不夠,會將 nextFreeTicketMicros 往前推必定的時間,預佔了必定的量。

到這裏,acquire 方法就分析完了,你們看到這裏,逆着往前看就是了。應該說,SmoothBursty 的源碼仍是很是簡單的。

SmoothWarmingUp 分析

分析完了 SmoothBursty,咱們再來分析 SmoothWarmingUp 會簡單一些。咱們說過,SmoothBursty 能夠處理突發請求,由於它會緩存最多 1 秒的 permits,而待會咱們會看到 SmoothWarmingUp 徹底不一樣的設計。

SmoothWarmingUp 適用於資源須要預熱的場景,好比咱們的某個接口業務,須要使用到數據庫鏈接,因爲鏈接須要預熱才能進入到最佳狀態,若是咱們的系統長時間處於低負載或零負載狀態(固然,應用剛啓動也是同樣的),鏈接池中的鏈接慢慢釋放掉了,此時咱們認爲鏈接池是冷的。

假設咱們的業務在穩定狀態下,正常能夠提供最大 1000 QPS 的訪問,可是若是鏈接池是冷的,咱們就不能讓 1000 個請求同時進來,由於這會拖垮咱們的系統,咱們應該有個預熱升溫的過程。

對應到 SmoothWarmingUp 中,若是系統處於低負載狀態,storedPermits 會一直增長,當請求來的時候,咱們要從 storedPermits 中取 permits,最關鍵的點在於,從 storedPermits 中取 permits 的操做是比較耗時的,由於沒有預熱。

回顧一下前面介紹的 SmoothBursty,它從 storedPermits 中獲取 permits 是不須要等待時間的,而這邊洽洽相反,從 storedPermits 獲取須要更多的時間,這是最大的不一樣,先理解這一點,能幫助你更好地理解源碼。

你們先有一些粗的概念,而後咱們來看下面這個圖:

這個圖不容易看懂,X 軸表明 storedPermits 的數量,Y 軸表明獲取一個 permits 須要的時間。

假設指定 permitsPerSecond 爲 10,那麼 stableInterval 爲 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是寫死的,用戶不能修改)。也就是說,當達到 maxPermits 時,此時處於系統最冷的時候,獲取一個 permit 須要 300ms,而若是 storedPermits 小於 thresholdPermits 的時候,只須要 100ms。

想象有一條垂直線 x=k,它與 X 軸的交點 k 表明當前 storedPermits 的數量:

  • 當系統在很是繁忙的時候,這條線停留在 x=0 處,此時 storedPermits 爲 0
  • 當 limiter 沒有被使用的時候,這條線慢慢往右移動,直到 x=maxPermits 處;
  • 若是 limiter 被從新使用,那麼這條線又慢慢往左移動,直到 x=0 處;
    當 storedPermits 處於 maxPermits 狀態時,咱們認爲 limiter 中的 permits 是冷的,此時獲取一個 permit 須要較多的時間,由於須要預熱,有一個關鍵的分界點是 thresholdPermits。

預熱時間是咱們在構造的時候指定的,圖中梯形的面積就是預熱時間,由於預熱完成後,咱們能進入到一個穩定的速率中(stableInterval),下面咱們來計算出 thresholdPermits 和 maxPermits 的值。

有一個關鍵點,從 thresholdPermits 到 0 的時間,是從 maxPermits 到 thresholdPermits 時間的一半,也就是梯形的面積是長方形面積的 2 倍,梯形的面積是 warmupPeriod。

之因此長方形的面積是 warmupPeriod/2,是由於 coldFactor 是硬編碼的 3。

梯形面積爲 warmupPeriod,即:

warmupPeriod = 2 * stableInterval * thresholdPermits複製代碼

由此,咱們得出 thresholdPermits 的值:

thresholdPermits = 0.5 * warmupPeriod / stableInterval複製代碼

而後咱們根據梯形面積的計算公式:

warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)複製代碼

得出 maxPermits 爲:

maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)複製代碼

這樣,咱們就獲得了 thresholdPermits 和 maxPermits 的值。

接下來,咱們來看一下冷卻時間間隔,它指的是 storedPermits 中每一個 permit 的增加速度,也就是咱們前面說的 x=k 這條垂直線往右的移動速度,爲了達到從 0 到 maxPermits 花費 warmupPeriodMicros 的時間,咱們將其定義爲:

@Override
double coolDownIntervalMicros() {
    return warmupPeriodMicros / maxPermits;
}
貼一下代碼,你們就知道了,在 resync 中用到的這個:

void resync(long nowMicros) {
  if (nowMicros > nextFreeTicketMicros) {
    // coolDownIntervalMicros 在這裏使用
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}
複製代碼

基於上面的分析,咱們來看 SmoothWarmingUp 的其餘源碼。

首先,咱們來看它的 doSetRate 方法,有了前面的介紹,這個方法的源碼很是簡單:

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = maxPermits;
    // coldFactor 是固定的 3
    double coldIntervalMicros = stableIntervalMicros * coldFactor;
    // 這個公式咱們上面已經說了
    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
    // 這個公式咱們上面也已經說了
    maxPermits =
        thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
    // 計算那條斜線的斜率。數學知識,對邊 / 臨邊
    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
    } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
    }
}複製代碼

setRate 方法很是簡單,接下來,咱們要分析的是 storedPermitsToWaitTime 方法,咱們回顧一下下面的代碼:

這段代碼是 acquire 方法的核心,waitMicros 由兩部分組成,一部分是從 storedPermits 中獲取花費的時間,一部分是等待 freshPermits 產生花費的時間。在 SmoothBursty 的實現中,從 storedPermits 中獲取 permits 直接返回 0,不須要等待。

而在 SmoothWarmingUp 的實現中,因爲須要預熱,因此從 storedPermits 中取 permits 須要花費必定的時間,其實就是要計算下圖中,陰影部分的面積。

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // 若是右邊梯形部分有 permits,那麼先從右邊部分獲取permits,計算梯形部分的陰影部分的面積
  if (availablePermitsAboveThreshold > 0.0) {
    // 從右邊部分獲取的 permits 數量
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // 梯形面積公式:(上底+下底)*高/2
    double length =
        permitsToTime(availablePermitsAboveThreshold)
            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0);
    permitsToTake -= permitsAboveThresholdToTake;
  }
  // 加上 長方形部分的陰影面積
  micros += (long) (stableIntervalMicros * permitsToTake);
  return micros;
}

// 對於給定的 x 值,計算 y 值
private double permitsToTime(double permits) {
  return stableIntervalMicros + permits * slope;
}
複製代碼

到這裏,SmoothWarmingUp 也已經說完了。

若是你們對於 Guava RateLimiter 還有什麼疑惑,歡迎在留言區留言,對於 Sentinel 中的流控不感興趣的讀者,看到這裏就能夠結束了。

Sentinel 中的流控

Sentinel 是阿里開源的流控、熔斷工具,這裏不作過多的介紹,感興趣的讀者請自行了解。

在 Sentinel 的流控中,咱們能夠配置流控規則,主要是控制 QPS 和線程數,這裏咱們不討論控制線程數,控制線程數的代碼不在咱們這裏的討論範圍內,下面的介紹都是指控制 QPS。

RateLimiterController

RateLimiterController 很是簡單,它經過使用 latestPassedTime 屬性來記錄最後一次經過的時間,而後根據規則中 QPS 的限制,計算當前請求是否能夠經過。

舉個很是簡單的例子:設置 QPS 爲 10,那麼每 100 毫秒容許經過一個,經過計算當前時間是否已通過了上一個請求的經過時間 latestPassedTime 以後的 100 毫秒,來判斷是否能夠經過。假設才過了 50ms,那麼須要當前線程再 sleep 50ms,而後才能夠經過。若是同時有另外一個請求呢?那須要 sleep 150ms 才行。

public class RateLimiterController implements TrafficShapingController {

    // 排隊最大時長,默認 500ms
    private final int maxQueueingTimeMs;
    // QPS 設置的值
    private final double count;
        // 上一次請求經過的時間
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    // 一般 acquireCount 爲 1,這裏不用關心參數 prioritized
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // 
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // 計算每 2 個請求之間的間隔,好比 QPS 限制爲 10,那麼間隔就是 100ms
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();

        // 能夠經過,設置 latestPassedTime 而後就返回 true 了
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 不能夠經過,須要等待
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // 等待時長大於最大值,返回 false
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                // 將 latestPassedTime 往前推
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    // 須要 sleep 的時間
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

}
複製代碼

這個策略仍是很是好理解的,簡單粗暴,快速失敗。

WarmUpController

WarmUpController 用來防止突發流量迅速上升,致使系統負載嚴重太高,原本系統在穩定狀態下能處理的,可是因爲許多資源沒有預熱,致使這個時候處理不了了。好比,數據庫須要創建鏈接、須要鏈接到遠程服務等,這就是爲何咱們須要預熱。

囉嗦一句,這裏不只僅指系統剛剛啓動須要預熱,對於長時間處於低負載的系統,突發流量也須要從新預熱。

Guava 的 SmoothWarmingUp 是用來控制獲取令牌的速率的,和這裏的控制 QPS 仍是有一點區別,可是中心思想是同樣的。咱們在看完源碼之後再討論它們的區別。

爲了幫助你們理解源碼,咱們這邊先設定一個場景:QPS 設置爲 100,預熱時間設置爲 10 秒。代碼中使用 「【】」 表明根據這個場景計算出來的值。

public class WarmUpController implements TrafficShapingController {

    // 閾值
    protected double count;
    // 3
    private int coldFactor;
    // 轉折點的令牌數,和 Guava 的 thresholdPermits 一個意思
    // [500]
    protected int warningToken = 0;
    // 最大的令牌數,和 Guava 的 maxPermits 一個意思
    // [1000]
    private int maxToken;
    // 斜線斜率
    // [1/25000]
    protected double slope;

    // 累積的令牌數,和 Guava 的 storedPermits 一個意思
    protected AtomicLong storedTokens = new AtomicLong(0);
    // 最後更新令牌的時間
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    // 下面的構造方法,和 Guava 中是差很少的,只不過 thresholdPermits 和 maxPermits 都換了個名字
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        this.coldFactor = coldFactor;

        // warningToken 和 thresholdPermits 是同樣的意思,計算結果實際上是同樣的
        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // 【warningToken = (10*100)/(3-1) = 500】
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);

        // maxToken 和 maxPermits 是同樣的意思,計算結果實際上是同樣的
        // maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
        // 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // 斜率計算
        // slope
        // slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits);
        // 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {

        // Sentinel 的 QPS 統計使用的是滑動窗口

        // 當前時間窗口的 QPS 
        long passQps = (long) node.passQps();

        // 這裏是上一個時間窗口的 QPS,這裏的一個窗口跨度是1分鐘
        long previousQps = (long) node.previousPassQps();

        // 同步。設置 storedTokens 和 lastFilledTime 到正確的值
        syncToken(previousQps);

        long restToken = storedTokens.get();
        // 令牌數超過 warningToken,進入梯形區域
        if (restToken >= warningToken) {

            // 這裏簡單說一句,由於當前的令牌數超過了 warningToken 這個閾值,系統處於須要預熱的階段
            // 經過計算當前獲取一個令牌所需時間,計算其倒數便是當前系統的最大 QPS 容量

            long aboveToken = restToken - warningToken;

            // 這裏計算警惕 QPS 值,就是當前狀態下能達到的最高 QPS。
            // (aboveToken * slope + 1.0 / count) 其實就是在當前狀態下獲取一個令牌所須要的時間
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            // 若是不會超過,那麼經過,不然不經過
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            // count 是最高能達到的 QPS
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

    protected void syncToken(long passQps) {
        // 下面幾行代碼,說明在第一次進入新的 1 秒鐘的時候,作同步
        // 題外話:Sentinel 默認地,1 秒鐘分爲 2 個時間窗口,分別 500ms
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        // 令牌數量的舊值
        long oldValue = storedTokens.get();
        // 計算新的令牌數量,往下看
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // 令牌數量上,減去上一分鐘的 QPS,而後設置新值
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

    // 更新令牌數
    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 當前令牌數小於 warningToken,添加令牌
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            // 當前令牌數量處於梯形階段,
            // 若是當前經過的 QPS 大於 count/coldFactor,說明系統消耗令牌的速度,大於冷卻速度
            //    那麼不須要添加令牌,不然須要添加令牌
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

}
複製代碼

coolDownTokens 這個方法用來計算新的 token 數量,其實我也沒有徹底理解做者的設計:

  • 第1、對於令牌的增長,在 Guava 中,使用 warmupPeriodMicros / maxPermits 做爲增加率,由於它實現的是 storedPermits 從 0 到 maxPermits 花費的時間爲 warmupPeriod。而這裏是以每秒 count 個做爲增加率,爲何?
  • 第2、else if 分支中的決定我沒有理解,爲何用 passQps 和 count / coldFactor 進行對比來決定是否繼續添加令牌?
  • 我本身的理解是,count/coldFactor 就是指冷卻速度,那麼就是說得通的。歡迎你們一塊兒探討。

最後,咱們再簡單說說 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的區別。

Guava 在於控制獲取令牌的速率,它關心的是,獲取 permits 須要多少時間,包括從 storedPermits 中獲取,以及獲取 freshPermits,以此推動 nextFreeTicketMicros 到將來的某個時間點。

而 Sentinel 在於控制 QPS,它用令牌數來標識當前系統處於什麼狀態,根據時間推動一直增長令牌,根據經過的 QPS 一直減小令牌。若是 QPS 持續降低,根據推演,能夠發現 storedTokens 愈來愈多,而後越過 warningTokens 這個閾值,以後只有當 QPS 降低到 count/3 之後,令牌纔會繼續往上增加,一直到 maxTokens。

storedTokens 是以 「count 每秒」的增加率增加的,減小是以 前一分鐘的 QPS 來減小的。其實這裏我也有個疑問,爲何增長令牌的時候考慮了時間,而減小的時候卻不考慮時間因素,提了 issue,彷佛沒人搭理。

WarmUpRateLimiterController

注意,這個類繼承自剛剛介紹的 WarmUpController,它的流控效果定義爲排隊等待。它的代碼其實就是前面介紹的 RateLimiterController 加上 WarmUpController。

public class WarmUpRateLimiterController extends WarmUpController {

    private final int timeoutInMs;
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
        super(count, warmUpPeriodSec, coldFactor);
        this.timeoutInMs = timeOutMs;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        long currentTime = TimeUtil.currentTimeMillis();

        long restToken = storedTokens.get();
        long costTime = 0;
        long expectedTime = 0;

        // 和 RateLimiterController 比較,區別主要就是這塊代碼

        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;

            // current interval = restToken*slope+1/count
            double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
        } else {
            costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        }
        expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            latestPassedTime.set(currentTime);
            return true;
        } else {
            long waitTime = costTime + latestPassedTime.get() - currentTime;
            if (waitTime > timeoutInMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > timeoutInMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
}複製代碼

這個代碼很簡單,就是 RateLimiter 中的代碼,而後加入了預熱的內容。

在 RateLimiter 中,單個請求的 costTime 是固定的,就是 1/count,好比設置 100 qps,那麼 costTime 就是 10ms。

可是這邊,加入了 WarmUp 的內容,就是說,經過令牌數量,來判斷當前系統的 QPS 應該是多少,若是當前令牌數超過 warningTokens,那麼系統的 QPS 容量已經低於咱們預設的 QPS,相應的,costTime 就會延長。

小結

有段時間沒寫文章了,寫得很差之處,歡迎指正。

關注做者公衆號:

相關文章
相關標籤/搜索