在開發高併發系統時有三把利器用來保護系統:緩存、降級和限流java
緩存
緩存的目的是提高系統訪問速度和增大系統處理容量降級
降級是當服務出現問題或者影響到核心流程時,須要暫時屏蔽掉,待高峯或者問題解決後再打開限流
限流的目的是經過對併發訪問/請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統,一旦達到限制速率則能夠拒絕服務、排隊或等待、降級等處理漏桶算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水,當水流入速度過大會直接溢出,能夠看出漏桶算法能強行限制數據的傳輸速率。
imgweb
對於不少應用場景來講,除了要求可以限制數據的平均傳輸速率外,還要求容許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更爲適合。如圖所示,令牌桶算法的原理是系統會以一個恆定的速度往桶裏放入令牌,而若是請求須要被處理,則須要先從桶裏獲取一個令牌,當桶裏沒有令牌可取時,則拒絕服務。
img算法
Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶算法實現流量限制,使用十分方便,並且十分高效。
首先簡單介紹下RateLimiter的使用,shell
public void testAcquire() { RateLimiter limiter = RateLimiter.create(1); for(int i = 1; i < 10; i = i + 2 ) { double waitTime = limiter.acquire(i); System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime); } }
輸出結果:緩存
cutTime=1535439657427 acq:1 waitTime:0.0 cutTime=1535439658431 acq:3 waitTime:0.997045 cutTime=1535439661429 acq:5 waitTime:2.993028 cutTime=1535439666426 acq:7 waitTime:4.995625 cutTime=1535439673426 acq:9 waitTime:6.999223
首先經過RateLimiter.create(1);
建立一個限流器,參數表明每秒生成的令牌數,經過limiter.acquire(i);
來以阻塞的方式獲取令牌,固然也能夠經過tryAcquire(int permits, long timeout, TimeUnit unit)
來設置等待超時時間的方式獲取令牌,若是超timeout爲0,則表明非阻塞,獲取不到當即返回。併發
從輸出來看,RateLimiter支持預消費,好比在acquire(5)時,等待時間是3秒,是上一個獲取令牌時預消費了3個兩排,固須要等待3*1秒,而後又預消費了5個令牌,以此類推ide
RateLimiter經過限制後面請求的等待時間,來支持必定程度的突發請求(預消費),在使用過程當中須要注意這一點,具體實現原理後面再分析。函數
Guava有兩種限流模式,一種爲穩定模式(SmoothBursty:令牌生成速度恆定),一種爲漸進模式(SmoothWarmingUp:令牌生成速度緩慢提高直到維持在一個穩定值) 兩種模式實現思路相似,主要區別在等待時間的計算上,本篇重點介紹SmoothBursty
經過調用RateLimiter的create
接口來建立實例,實際是調用的SmoothBuisty
穩定模式建立的實例。高併發
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
SmoothBursty
中的兩個構造參數含義:工具
在解析SmoothBursty原理前,重點解釋下SmoothBursty中幾個屬性的含義
/** * The work (permits) of how many seconds can be saved up if this RateLimiter is unused? * 在RateLimiter未使用時,最多存儲幾秒的令牌 * */ final double maxBurstSeconds; /** * The currently stored permits. * 當前存儲令牌數 */ double storedPermits; /** * The maximum number of stored permits. * 最大存儲令牌數 = maxBurstSeconds * stableIntervalMicros(見下文) */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. * 添加令牌時間間隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數) */ double stableIntervalMicros; /** * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. * 下一次請求能夠獲取令牌的起始時間 * 因爲RateLimiter容許預消費,上次請求預消費令牌後 * 下次請求須要等待相應的時間到nextFreeTicketMicros時刻才能夠獲取令牌 */ private long nextFreeTicketMicros = 0L; // could be either in the past or future
接下來介紹幾個關鍵函數
public final void setRate(double permitsPerSecond) { checkArgument( permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } }
經過這個接口設置令牌通每秒生成令牌的數量,內部時間經過調用SmoothRateLimiter
的doSetRate
來實現
@Override final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); }
這裏先經過調用resync
生成令牌以及更新下一期令牌生成時間,而後更新stableIntervalMicros,最後又調用了SmoothBursty
的doSetRate
/** * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. * 基於當前時間,更新下一次請求令牌的時間,以及當前存儲的令牌(能夠理解爲生成令牌) */ void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
根據令牌桶算法,桶中的令牌是持續生成存放的,有請求時須要先從桶中拿到令牌才能開始執行,誰來持續生成令牌存放呢?
一種解法是,開啓一個定時任務,由定時任務持續生成令牌。這樣的問題在於會極大的消耗系統資源,如,某接口須要分別對每一個用戶作訪問頻率限制,假設系統中存在6W用戶,則至多須要開啓6W個定時任務來維持每一個桶中的令牌數,這樣的開銷是巨大的。
另外一種解法則是延遲計算,如上resync
函數。該函數會在每次獲取令牌以前調用,其實現思路爲,若當前時間晚於nextFreeTicketMicros,則計算該段時間內能夠生成多少令牌,將生成的令牌加入令牌桶中並更新數據。這樣一來,只須要在獲取令牌時計算一次便可。
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below // Double.POSITIVE_INFINITY 表明無窮啊 storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } }
桶中可存放的最大令牌數由maxBurstSeconds計算而來,其含義爲最大存儲maxBurstSeconds秒生成的令牌。
該參數的做用在於,能夠更爲靈活地控制流量。如,某些接口限制爲300次/20秒,某些接口限制爲50次/45秒等。也就是流量不侷限於qps
在瞭解以上概念後,就很是容易理解RateLimiter暴露出來的接口
@CanIgnoreReturnValue public double acquire() { return acquire(1); } /** * 獲取令牌,返回阻塞的時間 **/ @CanIgnoreReturnValue public double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } }
acquire
函數主要用於獲取permits個令牌,並計算須要等待多長時間,進而掛起等待,並將該值返回,主要經過reserve
返回須要等待的時間,reserve
中經過調用reserveAndGetWaitLength
獲取等待時間
/** * Reserves next ticket and returns the wait time that the caller must wait for. * * @return the required wait time, never negative */ final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); }
最後調用了reserveEarliestAvailable
@Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this.storedPermits -= storedPermitsToSpend; return returnValue; }
首先經過resync生成令牌以及同步nextFreeTicketMicros時間戳,freshPermits從令牌桶中獲取令牌後還須要的令牌數量,經過storedPermitsToWaitTime計算出獲取freshPermits還須要等待的時間,在穩定模式中,這裏就是(long) (freshPermits * stableIntervalMicros) ,而後更新nextFreeTicketMicros以及storedPermits,此次獲取令牌須要的等待到的時間點, reserveAndGetWaitLength返回須要等待的時間間隔。 從`reserveEarliestAvailable`能夠看出RateLimiter的預消費原理,以及獲取令牌的等待時間時間原理(能夠解釋示例結果),再獲取令牌不足時,並無等待到令牌所有生成,而是更新了下次獲取令牌時的nextFreeTicketMicros,從而影響的是下次獲取令牌的等待時間。 `reserve`這裏返回等待時間後,`acquire`經過調用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`進行sleep操做,這裏不一樣於Thread.sleep(), 這個函數的sleep是uninterruptibly的,內部實現:
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) { //sleep 阻塞線程 內部經過Thread.sleep() boolean interrupted = false; try { long remainingNanos = unit.toNanos(sleepFor); long end = System.nanoTime() + remainingNanos; while (true) { try { // TimeUnit.sleep() treats negative timeouts just like zero. NANOSECONDS.sleep(remainingNanos); return; } catch (InterruptedException e) { interrupted = true; remainingNanos = end - System.nanoTime(); //若是被interrupt能夠繼續,更新sleep時間,循環繼續sleep } } } finally { if (interrupted) { Thread.currentThread().interrupt(); //若是被打斷過,sleep事後再真正中斷線程 } } }
sleep以後,`acquire`返回sleep的時間,阻塞結束,獲取到令牌。
public boolean tryAcquire(int permits) { return tryAcquire(permits, 0, MICROSECONDS); } public boolean tryAcquire() { return tryAcquire(1, 0, MICROSECONDS); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; } @Override final long queryEarliestAvailable(long nowMicros) { return nextFreeTicketMicros; }
tryAcquire
函數能夠嘗試在timeout時間內獲取令牌,若是能夠則掛起等待相應時間並返回true,不然當即返回false canAcquire
用於判斷timeout時間內是否能夠獲取令牌,經過判斷當前時間+超時時間是否大於nextFreeTicketMicros 來決定是否可以拿到足夠的令牌數,若是能夠獲取到,則過程同acquire,線程sleep等待,若是經過canAcquire
在此超時時間內不能回去到令牌,則能夠快速返回,不須要等待timeout後才知道可否獲取到令牌。
到此,Guava RateLimiter穩定模式的實現原理基本已經清楚,如發現文中錯誤的地方,勞煩指正!
做者:人在碼途
連接:https://www.jianshu.com/p/5d4... 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。