Guava RateLimiter源碼解析

前言

在開發高併發系統時有三把利器用來保護系統:緩存、降級和限流java

  • 緩存 緩存的目的是提高系統訪問速度和增大系統處理容量
  • 降級 降級是當服務出現問題或者影響到核心流程時,須要暫時屏蔽掉,待高峯或者問題解決後再打開
  • 限流 限流的目的是經過對併發訪問/請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統,一旦達到限制速率則能夠拒絕服務、排隊或等待、降級等處理

經常使用限流算法

經常使用的限流算法有兩種:漏桶算法和令牌桶算法算法

漏桶算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水,當水流入速度過大會直接溢出,能夠看出漏桶算法能強行限制數據的傳輸速率。

對於不少應用場景來講,除了要求可以限制數據的平均傳輸速率外,還要求容許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更爲適合。緩存

令牌桶算法的原理是系統會以一個恆定的速度往桶裏放入令牌,而若是請求須要被處理,則須要先從桶裏獲取一個令牌,當桶裏沒有令牌可取時,則拒絕服務。

更多關於漏桶算法和令牌桶算法的介紹能夠參考 http://blog.csdn.net/charlesl...併發


信號量

操做系統的信號量是個很重要的概念,Java 併發庫 的Semaphore 能夠很輕鬆完成信號量控制,Semaphore能夠控制某個資源可被同時訪問的個數,經過 acquire() 獲取一個許可,若是沒有就等待,而 release() 釋放一個許可。ide

信號量的本質是控制某個資源可被同時訪問的個數,在必定程度上能夠控制某資源的訪問頻率,但不能精確控制。函數

@Test
fun semaphoreTest() {
    val semaphore = Semaphore(2)

    (1..10).map {
        thread(true) {
            semaphore.acquire()

            println("$it\t${Date()}")
            Thread.sleep(1000)

            semaphore.release()
        }
    }.forEach { it.join() }
}

以上示例,建立信號量,指定併發數爲2,其輸出以下高併發

1   Wed Jan 17 10:31:49 CST 2018
2   Wed Jan 17 10:31:49 CST 2018
3   Wed Jan 17 10:31:50 CST 2018
4   Wed Jan 17 10:31:50 CST 2018
5   Wed Jan 17 10:31:51 CST 2018
6   Wed Jan 17 10:31:51 CST 2018
7   Wed Jan 17 10:31:52 CST 2018
8   Wed Jan 17 10:31:52 CST 2018
9   Wed Jan 17 10:31:53 CST 2018
10  Wed Jan 17 10:31:53 CST 2018

能夠很清楚的看到,同一時刻最多隻能有2個線程進行輸出。
雖然信號量能夠在必定程度上控制資源的訪問頻率,卻不能精確控制。工具


RateLimiter

Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶算法實現流量限制,使用十分方便。ui

@Test
fun rateLimiterTest() {
    val rateLimiter = RateLimiter.create(0.5)

    arrayOf(1,6,2).forEach {
        println("${System.currentTimeMillis()} acq $it:\twait ${rateLimiter.acquire(it)}s")
    }
}

以上示例,建立一個RateLimiter,指定每秒放0.5個令牌(2秒放1個令牌),其輸出見下this

1516166482561 acq 1: wait 0.0s
1516166482563 acq 6: wait 1.997664s
1516166484569 acq 2: wait 11.991958s

從輸出結果能夠看出,RateLimiter具備預消費的能力:
acq 1時並無任何等待直接預消費了1個令牌
acq 6時,因爲以前預消費了1個令牌,故而等待了2秒,以後又預消費了6個令牌
acq 2時同理,因爲以前預消費了6個令牌,故而等待了12秒

從另外一方面講,RateLimiter經過限制後面請求的等待時間,來支持必定程度的突發請求(預消費)。
可是某些狀況下並不須要這種突發請求處理能力,如某IM廠商提供消息推送接口,但推送接口有嚴格的頻率限制(600次/30秒),在調用該IM廠商推送接口時便不能預消費,不然,則可能出現推送頻率超出限制而失敗。該狀況的處理會在其餘博文中介紹。

源碼解讀

Guava有兩種限流模式,一種爲穩定模式(SmoothBursty:令牌生成速度恆定),一種爲漸進模式(SmoothWarmingUp:令牌生成速度緩慢提高直到維持在一個穩定值)
兩種模式實現思路相似,主要區別在等待時間的計算上,本篇重點介紹SmoothBursty

在調用create接口時,實際實例化的爲SmoothBursty

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

在解析SmoothBursty原理前,重點解釋下SmoothBursty中幾個屬性的含義

/**
 * The currently stored permits.
 * 當前存儲令牌數
 */
double storedPermits;

/**
 * The maximum number of stored permits.
 * 最大存儲令牌數
 */
double maxPermits;

/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌時間間隔
 */
double stableIntervalMicros;

/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次請求能夠獲取令牌的起始時間
 * 因爲RateLimiter容許預消費,上次請求預消費令牌後
 * 下次請求須要等待相應的時間到nextFreeTicketMicros時刻才能夠獲取令牌
 */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

接下來介紹幾個關鍵函數

/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}

根據令牌桶算法,桶中的令牌是持續生成存放的,有請求時須要先從桶中拿到令牌才能開始執行,誰來持續生成令牌存放呢?

一種解法是,開啓一個定時任務,由定時任務持續生成令牌。這樣的問題在於會極大的消耗系統資源,如,某接口須要分別對每一個用戶作訪問頻率限制,假設系統中存在6W用戶,則至多須要開啓6W個定時任務來維持每一個桶中的令牌數,這樣的開銷是巨大的。

另外一種解法則是延遲計算,如上resync函數。該函數會在每次獲取令牌以前調用,其實現思路爲,若當前時間晚於nextFreeTicketMicros,則計算該段時間內能夠生成多少令牌,將生成的令牌加入令牌桶中並更新數據。這樣一來,只須要在獲取令牌時計算一次便可。


final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros; // 返回的是上次計算的nextFreeTicketMicros
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 能夠消費的令牌數
  double freshPermits = requiredPermits - storedPermitsToSpend; // 還須要的令牌數
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros); // 根據freshPermits計算須要等待的時間

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次計算的nextFreeTicketMicros不返回
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}

該函數用於獲取requiredPermits個令牌,並返回須要等待到的時間點
其中,storedPermitsToSpend爲桶中能夠消費的令牌數,freshPermits爲還須要的(須要補充的)令牌數,根據該值計算須要等待的時間,追加並更新到nextFreeTicketMicros

須要注意的是,該函數的返回是更新前的(上次請求計算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗來說,本次請求須要爲上次請求的預消費行爲埋單,這也是RateLimiter能夠預消費(處理突發)的原理所在。若須要禁止預消費,則修改此處返回更新後的nextFreeTicketMicros值。


回頭來看SmoothBursty的構造函數

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
  super(stopwatch);
  this.maxBurstSeconds = maxBurstSeconds; // 最大存儲maxBurstSeconds秒生成的令牌
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond; // 計算最大存儲令牌數
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

桶中可存放的最大令牌數由maxBurstSeconds計算而來,其含義爲最大存儲maxBurstSeconds秒生成的令牌。
該參數的做用在於,能夠更爲靈活地控制流量。如,某些接口限制爲300次/20秒,某些接口限制爲50次/45秒等。


在瞭解以上概念後,就很是容易理解RateLimiter暴露出來的接口

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

acquire函數主要用於獲取permits個令牌,並計算須要等待多長時間,進而掛起等待,並將該值返回


public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}

tryAcquire函數能夠嘗試在timeout時間內獲取令牌,若是能夠則掛起等待相應時間並返回true,不然當即返回false
canAcquire用於判斷timeout時間內是否能夠獲取令牌

至此,Guava RateLimiter的原理及用法介紹完畢,對SmoothWarmingUp感興趣的童鞋能夠自行查閱文檔或源碼。


訂閱號

相關文章
相關標籤/搜索