Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶算法實現流量限制,使用十分方便,並且十分高效。算法
public void testAcquire() { RateLimiter limiter = RateLimiter.create(1); for(int i = 1; i < 10; i = i + 2 ) { double waitTime = limiter.acquire(i); System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime); } }
結果:緩存
cutTime=1535439657427 acq:1 waitTime:0.0 cutTime=1535439658431 acq:3 waitTime:0.997045 cutTime=1535439661429 acq:5 waitTime:2.993028 cutTime=1535439666426 acq:7 waitTime:4.995625 cutTime=1535439673426 acq:9 waitTime:6.999223
首先經過RateLimiter.create(1);
建立一個限流器,參數表明每秒生成的令牌數,經過limiter.acquire(i);
來以阻塞的方式獲取令牌,固然也能夠經過tryAcquire(int permits, long timeout, TimeUnit unit)
來設置等待超時時間的方式獲取令牌,若是超timeout爲0,則表明非阻塞,獲取不到當即返回。服務器
從輸出來看,RateLimiter支持預消費,好比在acquire(5)時,等待時間是3秒,是上一個獲取令牌時預消費了3個兩排,固須要等待3*1秒,而後又預消費了5個令牌,以此類推網絡
RateLimiter經過限制後面請求的等待時間,來支持必定程度的突發請求(預消費)less
源碼註釋中的一個例子,好比咱們有不少任務須要執行,可是咱們不但願每秒超過兩個任務執行,那麼咱們就可使用RateLimiter:ide
final RateLimiter rateLimiter = RateLimiter.create(2.0); void submitTasks(List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } }
另一個例子,假如咱們會產生一個數據流,而後咱們想以每秒5kb的速度發送出去.咱們能夠每獲取一個令牌(permit)就發送一個byte的數據,這樣咱們就能夠經過一個每秒5000個令牌的RateLimiter來實現:函數
final RateLimiter rateLimiter = RateLimiter.create(5000.0); void submitPacket(byte[] packet) { rateLimiter.acquire(packet.length); networkService.send(packet); }
另外,咱們也可使用非阻塞的形式達到降級運行的目的,即便用非阻塞的tryAcquire()方法:工具
if(limiter.tryAcquire()) { //未請求到limiter則當即返回false doSomething(); }else{ doSomethingElse(); }
設計思路:ui
考慮一下RateLimiter是如何設計的,而且爲何要這樣設計.this
RateLimiter的主要功能就是提供一個穩定的速率,實現方式就是經過限制請求流入的速度,好比計算請求等待合適的時間閾值.
實現QPS速率的最簡單的方式就是記住上一次請求的最後受權時間,而後保證1/QPS秒內不容許請求進入.好比QPS=5,若是咱們保證最後一個被受權請求以後的200ms的時間內沒有請求被受權,那麼咱們就達到了預期的速率.若是一個請求如今過來可是最後一個被受權請求是在100ms以前,那麼咱們就要求當前這個請求等待100ms.按照這個思路,請求15個新令牌(許可證)就須要3秒.
有一點很重要:上面這個設計思路的RateLimiter記憶很是的淺,它的腦容量很是的小,只記得上一次被受權的請求的時間.若是RateLimiter的一個被受權請求q以前很長一段時間沒有被使用會怎麼樣?這個RateLimiter會立馬忘記過去這一段時間的利用不足,而只記得剛剛的請求q.
過去一段時間的利用不足意味着有過剩的資源是能夠利用的.這種狀況下,RateLimiter應該加把勁(speed up for a while)將這些過剩的資源利用起來.好比在向網絡中發生數據的場景(限流),過去一段時間的利用不足可能意味着網卡緩衝區是空的,這種場景下,咱們是能夠加速發送來將這些過程的資源利用起來.
另外一方面,過去一段時間的利用不足可能意味着處理請求的服務器對即將到來的請求是準備不足的(less ready for future requests),好比由於很長一段時間沒有請求當前服務器的cache是陳舊的,進而致使即將到來的請求會觸發一個昂貴的操做(好比從新刷新全量的緩存).
爲了處理這種狀況,RateLimiter中增長了一個維度的信息,就是過去一段時間的利用不足(past underutilization),代碼中使用storedPermits變量表示.當沒有利用不足這個變量爲0,最大能達到maxStoredPermits(maxStoredPermits表示徹底沒有利用).所以,請求的令牌可能從兩個地方來:
1.過去剩餘的令牌(stored permits, 可能沒有) 2.現有的令牌(fresh permits,當前這段時間還沒用完的令牌)
咱們將經過一個例子來解釋它是如何工做的:
對一個每秒產生一個令牌的RateLimiter,每有一個沒有使用令牌的一秒,咱們就將storedPermits加1,若是RateLimiter在10秒都沒有使用,則storedPermits變成10.0.這個時候,一個請求到來並請求三個令牌(acquire(3)),咱們將從storedPermits中的令牌爲其服務,storedPermits變爲7.0.這個請求以後立馬又有一個請求到來並請求10個令牌,咱們將從storedPermits剩餘的7個令牌給這個請求,剩下還須要三個令牌,咱們將從RateLimiter新產生的令牌中獲取.咱們已經知道,RateLimiter每秒新產生1個令牌,就是說上面這個請求還須要的3個請求就要求其等待3秒.
想象一個RateLimiter每秒產生一個令牌,如今徹底沒有使用(處於初始狀態),限制一個昂貴的請求acquire(100)過來.若是咱們選擇讓這個請求等待100秒再容許其執行,這顯然很荒謬.咱們爲何什麼也不作而只是傻傻的等待100秒,一個更好的作法是容許這個請求當即執行(和acquire(1)沒有區別),而後將隨後到來的請求推遲到正確的時間點.這種策略,咱們容許這個昂貴的任務當即執行,並將隨後到來的請求推遲100秒.這種策略就是讓任務的執行和等待同時進行.
一個重要的結論:RateLimiter不會記最後一個請求,而是即下一個請求容許執行的時間.這也能夠很直白的告訴咱們到達下一個調度時間點的時間間隔.而後定一個一段時間未使用的Ratelimiter也很簡單:下一個調度時間點已通過去,這個時間點和如今時間的差就是Ratelimiter多久沒有被使用,咱們會將這一段時間翻譯成storedPermits.全部,若是每秒鐘產生一個令牌(rate==1),而且正好每秒來一個請求,那麼storedPermits就不會增加.
原理:
Guava有兩種限流模式,一種爲穩定模式(SmoothBursty:令牌生成速度恆定),一種爲漸進模式(SmoothWarmingUp:令牌生成速度緩慢提高直到維持在一個穩定值) 兩種模式實現思路相似,主要區別在等待時間的計算上,本篇重點介紹SmoothBursty
經過調用RateLimiter的create
接口來建立實例,實際是調用的SmoothBuisty
穩定模式建立的實例。
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
SmoothBursty
中的兩個構造參數含義:
在解析SmoothBursty原理前,重點解釋下SmoothBursty中幾個屬性的含義
/** * The work (permits) of how many seconds can be saved up if this RateLimiter is unused? * 在RateLimiter未使用時,最多存儲幾秒的令牌 * */ final double maxBurstSeconds; /** * The currently stored permits. * 當前存儲令牌數 */ double storedPermits; /** * The maximum number of stored permits. * 最大存儲令牌數 = maxBurstSeconds * stableIntervalMicros(見下文) */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. * 添加令牌時間間隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數) */ double stableIntervalMicros; /** * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. * 下一次請求能夠獲取令牌的起始時間 * 因爲RateLimiter容許預消費,上次請求預消費令牌後 * 下次請求須要等待相應的時間到nextFreeTicketMicros時刻才能夠獲取令牌 */ private long nextFreeTicketMicros = 0L; // could be either in the past or future
關鍵函數
public final void setRate(double permitsPerSecond) { checkArgument( permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } }
經過這個接口設置令牌通每秒生成令牌的數量,內部時間經過調用SmoothRateLimiter
的doSetRate
來實現
final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); }
這裏先經過調用resync
生成令牌以及更新下一期令牌生成時間,而後更新stableIntervalMicros,最後又調用了SmoothBursty
的doSetRate
/** * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. * 基於當前時間,更新下一次請求令牌的時間,以及當前存儲的令牌(能夠理解爲生成令牌) */ void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
根據令牌桶算法,桶中的令牌是持續生成存放的,有請求時須要先從桶中拿到令牌才能開始執行,誰來持續生成令牌存放呢?
一種解法是,開啓一個定時任務,由定時任務持續生成令牌。這樣的問題在於會極大的消耗系統資源,如,某接口須要分別對每一個用戶作訪問頻率限制,假設系統中存在6W用戶,則至多須要開啓6W個定時任務來維持每一個桶中的令牌數,這樣的開銷是巨大的。
另外一種解法則是延遲計算,如上resync
函數。該函數會在每次獲取令牌以前調用,其實現思路爲,若當前時間晚於nextFreeTicketMicros,則計算該段時間內能夠生成多少令牌,將生成的令牌加入令牌桶中並更新數據。這樣一來,只須要在獲取令牌時計算一次便可。
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below // Double.POSITIVE_INFINITY 表明無窮啊 storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } }
桶中可存放的最大令牌數由maxBurstSeconds計算而來,其含義爲最大存儲maxBurstSeconds秒生成的令牌。
該參數的做用在於,能夠更爲靈活地控制流量。如,某些接口限制爲300次/20秒,某些接口限制爲50次/45秒等。也就是流量不侷限於qps
在瞭解以上概念後,就很是容易理解RateLimiter暴露出來的接口
@CanIgnoreReturnValue public double acquire() { return acquire(1); } /** * 獲取令牌,返回阻塞的時間 **/ @CanIgnoreReturnValue public double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } }
acquire
函數主要用於獲取permits個令牌,並計算須要等待多長時間,進而掛起等待,並將該值返回,主要經過reserve
返回須要等待的時間,reserve
中經過調用reserveAndGetWaitLength
獲取等待時間
/** * Reserves next ticket and returns the wait time that the caller must wait for. * * @return the required wait time, never negative */ final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); }
最後調用了reserveEarliestAvailable
@Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this.storedPermits -= storedPermitsToSpend; return returnValue; }
首先經過resync生成令牌以及同步nextFreeTicketMicros時間戳,freshPermits從令牌桶中獲取令牌後還須要的令牌數量,經過storedPermitsToWaitTime計算出獲取freshPermits還須要等待的時間,在穩定模式中,這裏就是(long) (freshPermits * stableIntervalMicros) ,而後更新nextFreeTicketMicros以及storedPermits,此次獲取令牌須要的等待到的時間點, reserveAndGetWaitLength返回須要等待的時間間隔。 從`reserveEarliestAvailable`能夠看出RateLimiter的預消費原理,以及獲取令牌的等待時間時間原理(能夠解釋示例結果),再獲取令牌不足時,並無等待到令牌所有生成,而是更新了下次獲取令牌時的nextFreeTicketMicros,從而影響的是下次獲取令牌的等待時間。 `reserve`這裏返回等待時間後,`acquire`經過調用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`進行sleep操做,這裏不一樣於Thread.sleep(), 這個函數的sleep是uninterruptibly的,內部實現:
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) { //sleep 阻塞線程 內部經過Thread.sleep() boolean interrupted = false; try { long remainingNanos = unit.toNanos(sleepFor); long end = System.nanoTime() + remainingNanos; while (true) { try { // TimeUnit.sleep() treats negative timeouts just like zero. NANOSECONDS.sleep(remainingNanos); return; } catch (InterruptedException e) { interrupted = true; remainingNanos = end - System.nanoTime(); //若是被interrupt能夠繼續,更新sleep時間,循環繼續sleep } } } finally { if (interrupted) { Thread.currentThread().interrupt(); //若是被打斷過,sleep事後再真正中斷線程 } } }
sleep以後,`acquire`返回sleep的時間,阻塞結束,獲取到令牌。
public boolean tryAcquire(int permits) { return tryAcquire(permits, 0, MICROSECONDS); } public boolean tryAcquire() { return tryAcquire(1, 0, MICROSECONDS); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; } @Override final long queryEarliestAvailable(long nowMicros) { return nextFreeTicketMicros; }
tryAcquire
函數能夠嘗試在timeout時間內獲取令牌,若是能夠則掛起等待相應時間並返回true,不然當即返回falsecanAcquire
用於判斷timeout時間內是否能夠獲取令牌,經過判斷當前時間+超時時間是否大於nextFreeTicketMicros 來決定是否可以拿到足夠的令牌數,若是能夠獲取到,則過程同acquire,線程sleep等待,若是經過canAcquire
在此超時時間內不能回去到令牌,則能夠快速返回,不須要等待timeout後才知道可否獲取到令牌。