RateLimiter有兩種新建的方式算法
建立Bursty方式數據庫
建立WarmingUp方式緩存
如下源碼來自 guava-17.0安全
//初始化
RateLimiter r = RateLimiter.create(1);
//不阻塞
r.tryAcquire();
//阻塞
r.acquire()
複製代碼
RateLimiter.create作了兩件事情建立Bursty對象和設置了速率,至次初始化過程結束bash
RateLimiter rateLimiter = new Bursty(ticker, 1.0 /* maxBurstSeconds */); //ticker默認使用本身定義的
rateLimiter.setRate(permitsPerSecond);
複製代碼
synchronized (mutex) {
//1:查看當前的時間是否比預計下次可發放令牌的時間要大,若是大,更新下次可發放令牌的時間爲當前時間
resync(readSafeMicros());
//2:計算兩次發放令牌之間的時間間隔,好比1s中須要發放5個,那它就是 200000.0微秒
double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
//3:設置maxPermits和storedPermits
doSetRate(permitsPerSecond, stableIntervalMicros);
}
複製代碼
resync源碼網絡
private void resync(long nowMicros) { // 查看當前的時間是否比預計下次可發放令牌的時間要大,若是大,更新下次可發放令牌的時間爲當前時間 if (nowMicros > nextFreeTicketMicros) { storedPermits = Math.min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros); nextFreeTicketMicros = nowMicros; } } 複製代碼
doSetRate源碼多線程
@Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; storedPermits = (oldMaxPermits == 0.0) ? 0.0 // 初始條件存儲的是沒有 storedPermits * maxPermits / oldMaxPermits; } 複製代碼
在整個的初始化過程當中,關鍵信息是:併發
nextFreeTicketMicros 預計下次發放令牌的時間, stableIntervalMicros 兩次發放令牌之間的時間間隔ide
maxPermits 最大能存儲的令牌的數量 storedPermits 已經存儲的令牌數函數
最簡單的維持QPS速率的方式就是記住最後一次請求的時間,而後確保再次有請求過來的時候,已經通過了 1/QPS 秒。好比QPS是5 次/秒,只須要確保兩次請求時間通過了200ms便可,若是恰好在100ms到達,就會再等待100ms,也就是說,若是一次性須要15個令牌,須要的時間爲爲3s。可是對於一個長時間沒有請求的系統,這樣的的設計方式有必定的不合理之處。考慮一個場景:若是一個RateLimiter,每秒產生1個令牌,它一直沒有使用過,忽然來了一個須要100個令牌的請求,選擇等待100s再執行這個請求,顯得不太明智,更好的處理方式爲當即執行它,而後把接下來的請求推遲100s。
於是RateLimiter自己並不記下最後一次請求的時間,而是記下下一次指望運行的時間(nextFreeTicketMicros)。
這種方式帶來的一個好處是,能夠去判斷等待的超時時間是否大於下次運行的時間,以使得可以執行,若是等待的超時時間過短,就能當即返回。
一樣的考慮長時間沒有使用的場景。若是長時間沒有請求,忽然間來了,這個時候是否應該立馬放行這些請求?長時間沒有使用可能意味着兩件事:
RateLimiter就使用storedPermits來給過去請求的不充分程度建模。它的存儲規則以下: 假設RateLimiter每秒產生一個令牌,每過去一秒若是沒有請求,RateLimter也就沒有消費,就使storedPermits增加1。假設10s以內都沒有請求過來,storedPermits就變成了10(假設maxPermits>10),此時若是要獲取3個令牌,會使用storedPermits來中的令牌來處理,而後它的值變爲了7,片刻以後,若是調用了acquire(10),部分的會從storedPermits拿到7個權限,剩餘的3個則須要從新產生。
總的來講RateLimiter提供了一個storedPermits變量,當資源利用充分的時候,它就是0,最大能夠增加到 maxStoredPermits。請求所需的令牌來自於兩個地方:stored permits(空閒時存儲的令牌)和fresh permits(現有的令牌)
一樣假設每秒RateLimiter只生產一個令牌,正常狀況下,若是一次來了3個請求,整個過程會持續3秒鐘。考慮到長時間沒有請求的場景:
分析可知,針對不一樣的場景,須要對獲取storedPermits作不一樣的處理,Ratelimiter的實現方式就是 storedPermitsToWaitTime 函數,它創建了從storedPermits中獲取令牌和時間花銷的模型函數,而衡量時間的花銷就是經過對模型函數進行積分計算,好比原來存儲了10個令牌,如今須要拿3個令牌,還剩餘7個,那麼所須要的時間花銷就是該函數從7-10區間中的積分。
這種方式保證了任何獲取令牌方式所須要的時間都是同樣的,比如 每次拿一個和先拿兩個再拿一個,從時間上來說並無分別。
storedPermits自己是用來衡量沒有使用的時間的。在沒有使用令牌的時候存儲,存儲的速率(單位時間內存儲的令牌的個數)是 每沒用1次就存儲1次: rate=permites/time 。也就是說 1 / rate = time / permits,那麼可獲得 (1/rate)*permits 就能夠來衡量時間花銷。
選取(1/rate)做爲基準線
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
複製代碼
它直接返回了0,也就是在基準線之下,獲取storedPermits的速率比新產生要快,當即可以拿到存儲的量
//初始化
RateLimiter r =RateLimiter.create(1,1,TimeUnit.SECONDS);
//不阻塞
r.tryAcquire();
//阻塞
r.acquire()
複製代碼
create方法建立了WarmingUp對象,並這隻了對應的速率
RateLimiter rateLimiter = new WarmingUp(ticker, warmupPeriod, unit);
rateLimiter.setRate(permitsPerSecond);
複製代碼
相比Bursty,它多了個參數warmupPeroid,它會以提供的unit爲時間單位,轉換成微秒存儲。setRate相似於Bursty,只是在doSetRate提供不一樣的實現
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
//1:最大的存儲個數爲須要預熱的時間除以兩個請求的時間間隔,好比設定預熱時間爲1s,每秒有5個請求,那麼最大的存儲個數爲1000ms/200ms=5個
maxPermits = warmupPeriodMicros / stableIntervalMicros;
//2:計算最大存儲permits的一半
halfPermits = maxPermits / 2.0;
//3:初始化穩定時間間隔的3倍做爲冷卻時間間隔
double coldIntervalMicros = stableIntervalMicros * 3.0;
//4:設置基準線的斜率
slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
storedPermits = 0.0;
} else {
storedPermits = (oldMaxPermits == 0.0)
? maxPermits // 初始條件下,認爲就是存儲滿的,以達到緩慢消費的效果
: storedPermits * maxPermits / oldMaxPermits;
}
}
複製代碼
在這個過程當中能夠看到Warmup方式新增了一個halfPermits的設計,以及經過公式 slope=(coldIntervalMicros-stableIntervalMicros)/halfPermits
,他們在函數 storedPermitsToWaitTime中獲得了運用
@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
//1:計算儲存的令牌中超過了最大令牌一半的數量
double availablePermitsAboveHalf = storedPermits - halfPermits;
long micros = 0;
// 計算超過一半的部分所須要的時間花銷(對於函數來講,就是積分計算)
if (availablePermitsAboveHalf > 0.0) {
double permitsAboveHalfToTake = Math.min(availablePermitsAboveHalf, permitsToTake);
micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
+ permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
permitsToTake -= permitsAboveHalfToTake;
}
// 計算函數的還沒有超過一半的部分所須要的時間花銷
micros += (stableIntervalMicros * permitsToTake);
return micros;
}
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
複製代碼
WarmingUp對時間花銷衡量方式爲下圖
* ^ throttling
* |
* 3*stable + /
* interval | /.
* (cold) | / .
* | / . <-- "warmup period" is the area of the trapezoid between
* 2*stable + / . halfPermits and maxPermits
* interval | / .
* | / .
* | / .
* stable +----------/ WARM . }
* interval | . UP . } <-- this rectangle (from 0 to maxPermits, and
* | . PERIOD. } height == stableInterval) defines the cooldown period,
* | . . } and we want cooldownPeriod == warmupPeriod
* |---------------------------------> storedPermits
* (halfPermits) (maxPermits)
複製代碼
橫軸表示存儲的令牌個數,縱軸表示時間,這樣函數的積分就能夠表示所要消耗的時間。
在程序剛開始運行的時候,warmingup方式會存滿全部的令牌,而根據從存儲令牌中的獲取方式,能夠實現從存儲最大令牌中到降到一半令牌所須要的時間爲存儲同量令牌時間的2倍,從而使得剛開始的時候發放令牌的速度比較慢,等消耗一半以後,獲取的速率和生產的速率一致,從而也就實現了一個‘熱身’的概念
從storedPermits中獲取令牌所須要的時間,它分爲兩部分,以maxPetmits的一半爲分割點
storedPermits <= halfPermits 的時候,存儲和消費storedPermits的速率與產生的速率如出一轍
storedPermits>halfPermits, 存儲storePermites所須要的時間和產生的速率保持一致,可是消費storePermites從maxPermits到halfPermits所須要的時間爲從halfPermits增加到maxPermits所須要時間的2被,也就是比新令牌產生要慢 爲何在分隔點計算還有斜率方面選了3倍和一半的位置 對函數作積分計算(圖形面積),恰好能夠保證,超過一半的部分,若是要拿掉一半的存儲令牌所須要的時間剛好是存儲一樣量(或者說是新令牌產生)時間花銷的兩倍,對應場景若是過了很長一段時間沒有使用(存儲的令牌會達到maxPermits),剛開始能接收請求的速率相對比較慢,而後再增加到穩定的消費速率
關鍵在於存儲的速率是和新令牌產生的速率同樣,可是消費的速率,當存儲的超過一半時,會慢於新令牌產生的速率,小於一半則速率是同樣的
它會嘗試去獲取令牌,若是沒法獲取就當即返回,不然再超時時間以內返回給定的令牌。 源碼以下
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
//1:使用微秒來轉換超時時間
long timeoutMicros = unit.toMicros(timeout);
checkPermits(permits);
long microsToWait;
synchronized (mutex) {
long nowMicros = readSafeMicros();
//2.1:若是下次可以獲取令牌的時間超過超時時間範圍,立馬返回;
if (nextFreeTicketMicros > nowMicros + timeoutMicros) {
return false;
} else {
//2.2:獲取須要等待的時間,本次獲取的時間確定不會超時
microsToWait = reserveNextTicket(permits, nowMicros);
}
}
//3:實行等待
ticker.sleepMicrosUninterruptibly(microsToWait);
return true;
}
複製代碼
第一次運行的時候,nextFreeTicketMicros是建立時候的時間,一定小於當前時間,因此第一次確定會放過,容許執行,只是須要計算要等待的時間。
private long reserveNextTicket(double requiredPermits, long nowMicros) {
//1:若是下次能夠獲取令牌的時間在過去,更新
resync(nowMicros);
//2:計算距離下次獲取令牌須要的時間,若是nextFreeTikcetMicros>nowMicros,這個時間段一定在超時時間以內,假如入超時時間是0,那麼一定是microsToNextFreeTicket趨近於0,也就是立馬可以放行;
long microsToNextFreeTicket = Math.max(0, nextFreeTicketMicros - nowMicros);
//3:計算須要消耗的存儲的令牌
double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits);
//4:計算須要新產生的令牌
double freshPermits = requiredPermits - storedPermitsToSpend;
//5:計算消耗存儲令牌所須要的時間和新產生令牌所須要的時間。對於Bursty來說,消耗存儲的令牌所須要時間爲0,WarmingUp方式則是須要根據不一樣的場景有不一樣的結果
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
//6:下次可以獲取令牌的時間,須要延遲當前已經等待的時間,也就是說,若是立馬有請求過來會放行,可是這個等待時間將會影響後續的請求訪問,也就是說,此次的請求若是當前的特別的多,下一次可以請求的可以容許的時間一定會有很長的延遲
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
//7:扣除消耗的存儲令牌
this.storedPermits -= storedPermitsToSpend;
//8:返回本次要獲取令牌所須要的時間,它確定不會超過超時時間
return microsToNextFreeTicket;
}
複製代碼
它會阻塞知道容許放行,返回值爲阻塞的時長 源碼以下
public double acquire(int permits) {
long microsToWait = reserve(permits); //也就是調用reserveNextTicket
ticker.sleepMicrosUninterruptibly(microsToWait); //阻塞住須要等待的時長
return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
}
複製代碼
程序設置10個線程,使得併發數爲10,模擬線上的場景,任務內容以下
class MyTask implements Runnable{
private CountDownLatch latch;
private RateLimiter limiter;
public MyTask(CountDownLatch latch, RateLimiter limiter) {
this.latch = latch;
this.limiter = limiter;
}
@Override public void run() {
try {
//使得線程同時觸發
latch.await();
System.out.println("time "+System.currentTimeMillis()+"ms :"+limiter.tryAcquire());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
這裏設置限制每秒的流量爲5,也就是說第一次請求事後,下次請求須要等200ms
RateLimiter r =RateLimiter.create(5);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
for (int i=0;i<10;i++)
{
service.submit(new MyTask(latch, r));
latch.countDown();
System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
複製代碼
結果以下
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
time 1538487195698ms :true
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195698ms :false
time 1538487195698ms :false
time 1538487195699ms :false
複製代碼
若是使得線程等待401ms,那麼程序會存儲的令牌爲2個
注意剛開始存儲的時候,不是慢的,這裏的存儲量是慢慢增加,而且可以立馬拿到
RateLimiter r =RateLimiter.create(5);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
for (int i=0;i<10;i++)
{
service.submit(new MyTask(latch, r));
if (i==9){
TimeUnit.MILLISECONDS.sleep(401);
System.out.println("sleep 10 seconds over");
}
latch.countDown();
System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
複製代碼
運行結果恰好容許3個運行
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
sleep 10 seconds over
countdown:0
countdown over
time 1538487297981ms :true
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :true
time 1538487297981ms :true
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :false
複製代碼
若是等待時間超過1秒,容許放行的流量也不會超過6個,存儲的令牌+第一個令牌
RateLimiter r =RateLimiter.create(5);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
for (int i=0;i<10;i++)
{
service.submit(new MyTask(latch, r));
if (i==9){
TimeUnit.MILLISECONDS.sleep(1001);
System.out.println("sleep 10 seconds over");
}
latch.countDown();
System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
複製代碼
結果爲
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
sleep 10 seconds over
countdown:0
countdown over
time 1538487514780ms :true
time 1538487514780ms :true
time 1538487514780ms :true
time 1538487514780ms :false
time 1538487514780ms :true
time 1538487514780ms :false
time 1538487514780ms :false
time 1538487514780ms :false
time 1538487514780ms :true
time 1538487514780ms :true
複製代碼
使用warmingUp的方式因爲默認已經存儲滿了令牌,那麼,它在第一次請求執行完以後,必須等待必定的時間纔會讓下一次請求開始,而這個請求放行的時間則是會超過存儲所須要的時間
注意這裏的不一樣,默認是存儲滿的,也就是剛開始的消費要慢不少
RateLimiter r =RateLimiter.create(5,1,TimeUnit.SECONDS);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
for (int i=0;i<10;i++)
{
service.submit(new MyTask(latch, r));
latch.countDown();
System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
複製代碼
運行結果以下
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
time 1538487677462ms :true
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
複製代碼
所須要的task源碼以下
class MyTask implements Runnable{
private CountDownLatch latch;
private RateLimiter limiter;
private long start;
public MyTask(CountDownLatch latch, RateLimiter limiter,long start) {
this.latch = latch;
this.limiter = limiter;
this.start=start;
}
@Override public void run() {
try {
//使得線程同時觸發
latch.await();
System.out.printf("result:"+limiter.acquire(2));
System.out.println(" time "+(System.currentTimeMillis()-start)+"ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
Acquire會阻塞運行的結果,並且會提早消費
RateLimiter r =RateLimiter.create(1);
r.acquire();
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
r.acquire();
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
r.acquire(3);
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
r.acquire();
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
複製代碼
第一次會立馬運行,而後由於請求了一次,下次發放令牌的時間日後遷移,獲取的令牌越多,下次可以運行須要等待的時間越長
運行結果爲
time cost:0ms
time cost:1005ms
time cost:2004ms
time cost:5001ms
複製代碼
在多線程背景運行以下
RateLimiter r =RateLimiter.create(1);
long start=System.currentTimeMillis();
r.acquire(3);
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
for (int i=0;i<10;i++)
{
service.submit(new MyTask(latch, r,start));
latch.countDown();
System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
複製代碼
結果以下
time cost:1ms
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
result:2.995732 time 3024ms
result:4.995725 time 5006ms
result:6.995719 time 7007ms
result:8.995716 time 9006ms
result:10.995698 time 11004ms
result:12.995572 time 13006ms
result:14.995555 time 15007ms
result:16.995543 time 17005ms
result:18.995516 time 19005ms
result:20.995463 time 21005ms
複製代碼
warmingUp經過acquire的方式獲取的令牌,一樣會被按照同步的方式獲取
RateLimiter r =RateLimiter.create(1,1,TimeUnit.SECONDS);
long start=System.currentTimeMillis();
r.acquire(3);
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms」); ExecutorService service = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(10); for (int i=0;i<10;i++) { service.submit(new MyTask(latch, r,start)); latch.countDown(); System.out.println("countdown:" + latch.getCount()); } System.out.println("countdown over"); service.shutdown(); 複製代碼
結果以下
time cost:0ms
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
result:3.496859 time 3521ms
result:5.496854 time 5506ms
result:7.49685 time 7505ms
result:9.496835 time 9504ms
result:11.496821 time 11505ms
result:13.496807 time 13502ms
result:15.496793 time 15504ms
result:17.496778 time 17506ms
result:19.496707 time 19506ms
result:21.496699 time 21506ms
複製代碼
RateLimiter自己實現的就是一個令牌桶算法