計數器限流能夠分爲:java
固定窗口計數器限流簡單明瞭,就是限制單位之間內的請求數,好比設置QPS爲10,那麼從一開始的請求進入就計數,每次計數前判斷是否到10,到達就拒絕請求,並保證這個計數週期是1秒,1秒後計數器清零。
如下是利用redis實現計數器分佈式限流的實現,曾經在線上實踐過的lua腳本:git
local key = KEYS[1] local limit = tonumber(ARGV[1]) local refreshInterval = tonumber(ARGV[2]) local currentLimit = tonumber(redis.call('get', key) or '0') if currentLimit + 1 > limit then return -1; else redis.call('INCRBY', key, 1) redis.call('EXPIRE', key, refreshInterval) return limit - currentLimit - 1 end
一個明顯的弊端就是固定窗口計數器算法沒法處理突刺流量,好比10QPS,1ms中來了10個請求,後續的999ms的全部請求都會被拒絕。github
爲了解決固定窗口的問題,滑動窗口將窗口細化,用更小的窗口來限制流量。好比 1 分鐘的固定窗口切分爲 60 個 1 秒的滑動窗口。而後統計的時間範圍隨着時間的推移同步後移。
即使滑動時間窗口限流算法能夠保證任意時間窗口內接口請求次數都不會超過最大限流值,可是仍然不能防止在細時間粒度上面訪問過於集中的問題。
爲了應對上面的問題,對於時間窗口限流算法,還有不少改進版本,好比:
多層次限流,咱們能夠對同一個接口設置多條限流規則,除了 1 秒不超過 100 次以外,咱們還能夠設置 100ms 不超過 20 次 (代碼中實現寫了平均的兩倍),兩條規則同時限制,流量會更加平滑。
簡單實現的代碼以下:redis
public class SlidingWindowRateLimiter { // 小窗口鏈表 LinkedList<Room> linkedList = null; long stepInterval = 0; long subWindowCount = 10; long stepLimitCount = 0; int countLimit = 0; int count = 0; public SlidingWindowRateLimiter(int countLimit, int interval){ // 每一個小窗口的時間間距 this.stepInterval = interval * 1000/ subWindowCount; // 請求總數限制 this.countLimit = countLimit; // 每一個小窗口的請求量限制數 設置爲平均的2倍 this.stepLimitCount = countLimit / subWindowCount * 2; // 時間窗口開始時間 long start = System.currentTimeMillis(); // 初始化連續的小窗口鏈表 initWindow(start); } Room getAndRefreshWindows(long requestTime){ Room firstRoom = linkedList.getFirst(); Room lastRoom = linkedList.getLast(); // 發起請求時間在主窗口內 if(firstRoom.getStartTime() < requestTime && requestTime < lastRoom.getEndTime()){ long distanceFromFirst = requestTime - firstRoom.getStartTime(); int num = (int) (distanceFromFirst/stepInterval); return linkedList.get(num); }else{ long distanceFromLast = requestTime - lastRoom.getEndTime(); int num = (int)(distanceFromLast/stepInterval); // 請求時間超出主窗口一個窗口以上的身位 if(num >= subWindowCount){ initWindow(requestTime); return linkedList.getFirst(); }else{ moveWindow(num+1); return linkedList.getLast(); } } } public boolean acquire(){ synchronized (mutex()) { Room room = getAndRefreshWindows(System.currentTimeMillis()); int subCount = room.getCount(); if(subCount + 1 <= stepLimitCount && count + 1 <= countLimit){ room.increase(); count ++; return true; } return false; } } /** * 初始化窗口 * @param start */ private void initWindow(long start){ linkedList = new LinkedList<Room>(); for (int i = 0; i < subWindowCount; i++) { linkedList.add(new Room(start, start += stepInterval)); } // 總記數清零 count = 0; } /** * 移動窗口 * @param stepNum */ private void moveWindow(int stepNum){ for (int i = 0; i < stepNum; i++) { Room removeRoom = linkedList.removeFirst(); count = count - removeRoom.count; } Room lastRoom = linkedList.getLast(); long start = lastRoom.endTime; for (int i = 0; i < stepNum; i++) { linkedList.add(new Room(start, start += stepInterval)); } } public static void main(String[] args) throws InterruptedException { SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(20, 5); for (int i = 0; i < 26; i++) { System.out.println(slidingWindowRateLimiter.acquire()); Thread.sleep(300); } } class Room{ Room(long startTime, long endTime) { this.startTime = startTime; this.endTime = endTime; this.count = 0; } private long startTime; private long endTime; private int count; long getStartTime() { return startTime; } long getEndTime() { return endTime; } int getCount() { return count; } int increase(){ this.count++; return this.count; } } private volatile Object mutexDoNotUseDirectly; private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex; } }
以上實現使用了鏈表的特性,在必定窗口時是將前段刪除後段加上的方式移動的。移動的操做是請求進入時操做的,沒有使用單獨線程移動窗口。而且按照前面講的,兩個維度控制流量一個時窗口的總請求數,一個是每一個單獨窗口的請求數。算法
原理如圖:
spring
目前使用令牌桶實現的限流有如下幾個:緩存
zuul2跳票後Spring Cloud 本身開發的網關,內部也實現了限流器安全
由於是微服務架構,多服務部署是必然場景,因此默認提供了redis爲存儲載體的實現,因此直接看rua腳本是怎麼樣的就能夠知道它的算法是怎麼實現的了:服務器
local tokens_key = KEYS[1] local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) // 速率 local rate = tonumber(ARGV[1]) // 桶容量 local capacity = tonumber(ARGV[2]) // 發起請求時間 local now = tonumber(ARGV[3]) // 請求令牌數量 如今固定是1 local requested = tonumber(ARGV[4]) // 存滿桶耗時 local fill_time = capacity/rate // 過時時間 local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) // 上次請求的信息 存在redis local last_tokens = tonumber(redis.call("get", tokens_key)) // 可任務初始化桶是滿的 if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) // 上次請求的時間 local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) // 如今距離上次請求時間的差值 也就是距離上次請求過去了多久 local delta = math.max(0, now-last_refreshed) // 這個桶此時能提供的令牌是上次剩餘的令牌數加上此次時間間隔按速率能產生的令牌數 最多不能超過片桶大小 local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) // 能提供的令牌和要求的令牌比較 local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then // 消耗調令牌 new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) // 存儲剩餘的令牌 redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
1,在請求令牌時計算上次請求和此刻的時間間隔能產生的令牌,來刷新桶中的令牌數,而後將令牌提供出去。
2,若是令牌不足沒有等待,直接返回。
3,實現的方式是將每一個請求的時間間隔爲最小單位,只要小於這個單位的請求就會拒絕,好比100qps的配置,1容許10ms一個,若是兩個請求比較靠近小於10ms,後一個會被拒絕。網絡
一個Guava實現的令牌桶限流器。
RateLimiter就是一個根據配置的rate分發permits的工具。每一次調用acquire()都會阻塞到得到permit爲止,一個permit只能用一次。RateLimiter是併發安全的,它將限制所有線程的所有請求速率,可是RateLimiter並不保證是公平的。
限速器常常被使用在限制訪問物理或邏輯資源的速率,和java.util.concurrent.Semaphore對比,Semaphore用於限制同時訪問的數量,而它限制的是訪問的速率(二者有必定關聯:就是little's law(律特法則))。
RateLimiter主要由發放許可證的速度來定義。若是沒有其餘配置,許可證將以固定的速率分發,以每秒許可證的形式定義。許可證將平穩分發,各許可證之間的延遲將被調整到配置的速率。(SmoothBursty)
也能夠將RateLimiter配置爲有一個預熱期,在此期間,每秒鐘發出的許可穩步增長,直到達到穩定的速率。(SmoothWarmingUp)
一個例子:咱們一個任務列表須要執行,可是每秒提交的任務個數不能超過2個。
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second" void submitTasks(List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } }
另外一個例子:咱們生產一個數據流想以5kb每秒的速度傳輸,這個能夠經過將一個permit對應一個byte,並定義5000/s的速率。
final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 permits per second void submitPacket(byte[] packet) { rateLimiter.acquire(packet.length); networkService.send(packet); }
有一點須要注意,請求令牌的數量不會影響請求自身的節流,可是會影響下一次請求,若是一個須要大量令牌的任務到達時,將立刻授予,可是下一次請求將會爲上次昂貴任務付出代價。
對於RateLimiter來講最關鍵的特性是:在通常狀況下容許的最大速率-「恆定速率」。這須要對傳入的請求強制執行即計算「節流」,也須要在適當的節流時間,讓請求線程等待。
維護QPS的速率最簡單的方式就是保留最後准許請求的時間戳,確保能計算從那是到如今流逝的時間和QPS。舉個例子,一個QPS爲5(5個token每秒)的速率,若是咱們確保請求被准許時比最後的請求早200ms,咱們就達到了想要的速率。若是最後准許的請求只過去100ms,那麼咱們須要等待100ms。在這個速率下,提供新的15個許可證(即調用acquire(15))天然須要3s。
RateLimiter只記住最後一次請求的時間戳,只是對過去很是淺的記憶,意識到這點很重要。假如很長一段時間沒有使用RateLimiter,而後來了一個請求,是立刻准許嗎?這樣的RateLimiter將立刻忘記剛剛過去的利用不足的時間。最終,由於現實請求速率的不規則致使要麼利用不足或溢出的結果。過去利用不足意味着存在過剩可用資源,因此RateLimiter想要利用這些資源,就須要提速一下子。在計算機網絡中應用的速率(帶寬限制),那些過去利用不足的通常會轉換成「幾乎爲空的緩衝區」,能夠立刻用於後續流量。
另外一方面,過去利用不足也意味着「服務器對將來請求的處理的準備變得愈來愈少」,也就是說,緩存變過時,請求將更有可能觸發昂貴的操做(一個更極端的例子,當服務啓動後,它一般忙於提升本身的速度)。
爲了處理這類場景,咱們增長了一個額外的維度,將過去利用不足建模到storedPermits字段。沒有利用不足狀況時這個字段的值是0,隨着逐漸到達充分利用不足,這個字段的值會增大到一個最大值maxStoredPermits。
因此,當執行acquire(permits)方法來請求許可證從兩方面獲取:
下面用一個例子來解釋工做原理:假設RateLimiter每秒生成一個token,每一秒過去而RateLimiter沒有被使用的話,就會把storedPermits加1。咱們不使用RateLimiter10秒(即,咱們指望在一個X時間有一個請求,但是實際在X+10秒後請求才到達;這個和最後一個段落的結論有關),storedPermits會變成10(假定maxStoredPermits>=10)。在那時一個調用acquire(3)的請求到達。咱們使用storedPermits將它降到7來響應這個請求(how this is translated to throttling time is discussed later),以後acquire(10)的請求立刻到達。咱們使用storedPermits中所有剩餘的7個permits,剩下的3個使用RateLimiter生產的新permits。咱們已經知道,得到三個fresh permits須要多少時間:若是速率是「1秒一個token」,那麼須要3秒時間。可是提供7個存儲的permits意味着什麼?前面的解釋中沒有惟一的答案。若是主要關注處理利用不足的狀況,咱們存儲permits爲了給出比fresh permits快,由於利用不足=空閒資源。若是主要關注overflow的場景,存儲permits能夠比fresh permits慢給出。因此,咱們須要一個方法將storedPermits轉換成節流時間(throttling time)。
而storedPermitsToWaitTime(double storedPermits, double permitsToTake)這個方法就扮演了這個轉換的角色。它的基礎模型是一個連續函數映射存儲令牌(從0到maxStoredPermits)在1/Rate的積分。咱們利用空閒時間來存儲令牌,因此storedPermits本質上是度量了空閒時間(unused time)。Rate=permits/time,1/Rate=time/permits,因此,1/Rate和permits相乘就能夠算出時間。即處理必定量的令牌請求時,對這個函數進行積分就是對應於後續請求的最小間隔。
關於storedPermitsToWaitTime的一個例子:若是storedPermits==10,咱們先須要3個從storedPermits中拿,storedPermits降到7,使用storedPermitsToWaitTime(storedPermits=10, permitsToToken=3)計算節流時間,它將求出函數從7到10的積分值。
使用積分保證了一個單獨的acquire(3)和拆分紅{acquire(1),acquire(1),acquire(1)}或{acquire(2),acquire(1)}都是同樣的,另外,無論函數是怎麼樣的,對於函數[7,10]的求積分是等於[7,8],[8,9],[9,10]的總和的。這就保證了咱們能夠正確處理不一樣權重(permits不一樣)的請求,無論函數是什麼,所以咱們能夠自由調整函數的算法(很顯然,只有一個要求:能夠被求積分)。注意,若是這個函數畫的是個數值恰好是1/QPS的水平線,那麼這個函數就失去做用了,由於它表示存儲令牌的速率和生產新令牌的速率是一致的,後續中會用到這個技巧。若是這個函數的值在水平線的下面,也就是f(x)<1/Rate,表示咱們減小了積分的面積,也就是相同存儲的令牌數映射的節流時間相對於正常速率產生的時間變少了,也表明RateLimiter在一段時間空閒後變快了。相反的,若是函數的值在水平線的上面,表示增長了積分的面積,得到存儲令牌的消耗要大於新生產令牌,那就意味着RateLimiter在一段時間空閒後變慢了。
最後但也重要,若是RateLimiter採用QPS=1的速度,那麼開銷較大的acquire(100)到達時,那是沒有必要等到100s纔開始實際任務,爲何不在等待的時候作點什麼呢?更好的辦法是咱們能夠立刻先開始執行任務(就像它是acquire(1)一個個請求的樣子),須要把將來的請求推後。
在這個版本,咱們容許立刻執行任務,並把將來的請求推後100s,因此,咱們容許在這段時間裏完成工做,而不是空等。
這就有了一個很重要的結論:RateLimiter並須要不記住最後請求的時間,而只須要記住指望下一個請求到來的時間。由於咱們一直堅持這一點,如此咱們就能夠立刻識別出一個請求(tryAcquire(timeout))超時時間是否知足下一個預期請求到達的時間點。經過這個概念咱們能夠從新定義「一個空閒的RateLimiter」:當咱們觀察到「指望下一個請求達到時間」實際已通過去,而且差值(now-past)也就是大量時間被轉換成storedPermits。(咱們把在空閒時間生產的令牌增長storedPermits)。因此,若是Rate=1 permit/s,且到達時間剛好比前一次請求晚一秒,那麼storedPermits將永遠不會增長—咱們只會在到達時間晚於一秒時增長它。
SmoothWarmingUp實現原理:顧名思義,這裏想要實現的是一個有預熱能力的RateLimiter,做者在註釋中還畫了這幅圖:
在進入詳細實現前,讓咱們先記住幾個基本原則:
總之,向左移動K個permits所須要花費的節流時間等於長度爲K的函數的面積。假設RateLimiter的storedPermits飽和狀態,從maxPermits到thresholdPermits就等於warmupPeriod。從thresholdPermits到0的時間是warmupPeriod/2(緣由是爲了維護之前的實現,其中coldFactor硬編碼爲3)
計算thresholdPermits和maxPermits的公式:
從thresholdPermits到0花費的節流時間等於函數從0到thresholdPermits的積分,也就是thresholdPermits*stableIntervals。也等於warmupPeriod/2。
thresholdPermits=0.5*warmupPeriod/stableInterval
從maxPermits到thresholdPermits就是函數從thresholdPermits到maxPermits的積分,也就是梯型部分的面積,它等於0.5(stableInterval+coldInterval)(maxPermits - thresholdPermits),也就是warmupPeriod
maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
這裏源碼先從SmoothBursty入手,首先是RateLimiter類裏的建立:
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { // maxBurstSeconds表明這個桶能存儲的令牌數換算的秒數,經過這個秒數就能夠知道能存儲的令牌數,也就表示這個桶的大小。 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
咱們看到外界沒法自定義SmoothBursty的桶大小,因此咱們不管是建立什麼速率的RateLimiter,桶的大小就必然是rate*1
的大小,那麼就有人經過反射的方式在知足本身想要修改桶大小的需求:https://github.com/vipshop/vjtools/commit/9eacb861960df0c41b2323ce14da037a9fdc0629
setRate方法:
public final void setRate(double permitsPerSecond) { checkArgument( permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); // 須要全局同步 synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } private volatile Object mutexDoNotUseDirectly; // 產生一個同步使用的對象,適應雙重檢查鎖保證這個對象是個單例 private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex; }
互斥鎖來保證線程的安全,具體實現代碼中使用volatile修飾的mutexDoNotUseDirectly字段和雙重校驗同步鎖來保證生成單例,從而保證每次調用mutex()都是鎖同一個對象。這在後續的獲取令牌中也須要用到。
doSetRate方法是子類SmoothRateLimiter實現:
// 當前存儲的令牌 double storedPermits; // 最大可以存儲的令牌 double maxPermits; // 兩次獲取令牌的固定間隔時間 double stableIntervalMicros; // 下一個請求能被授予的時間,一次請求後這個值就會推後,一個大請求推後的時間比一個小請求推後的時間要多 // 這和預消費有關係,上一次消費的令牌 private long nextFreeTicketMicros = 0L; final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); // 計算出請求授予的間隔時間 double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } // 更新storedPermits和nextFreeTicketMicros void resync(long nowMicros) { // 當前時間大於下一個請求時間 說明此次請求不用等待 若是不是就不會出現增長令牌的操做 if (nowMicros > nextFreeTicketMicros) { // 根據上次請求和此次請求間隔計算可以增長的令牌 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 存儲的令牌不能超過maxPermits storedPermits = min(maxPermits, storedPermits + newPermits); // 修改nextFreeTicketMicros爲當前時間 nextFreeTicketMicros = nowMicros; } } // 子類實現 abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); // 返回冷卻期間須要等待得到新許可證的微秒數。子類實現 abstract double coolDownIntervalMicros();
SmoothBursty的實現
static final class SmoothBursty extends SmoothRateLimiter { /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */ final double maxBurstSeconds; SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; // 計算出桶的最大值 maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { // 初始化爲0 後續從新設置時按新maxPermits和老maxPermits的比例計算storedPermits storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; } // 對於SmoothBursty 沒有什麼冷卻時期,因此始終返回的是stableIntervalMicros @Override double coolDownIntervalMicros() { return stableIntervalMicros; } }
而後看下獲取令牌:
public double acquire() { return acquire(1); } public double acquire(int permits) { long microsToWait = reserve(permits); // 等待microsToWait時間 控制速率 stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } // 得到須要等待的時間 final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } // 子類SmoothRateLimiter實現 abstract long reserveEarliestAvailable(int permits, long nowMicros);
reserveEarliestAvailable方法,刷新下一個請求能被授予的時間
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { // 每次acquire都會觸發resync resync(nowMicros); // 返回值就是下一個請求能被授予的時間 long returnValue = nextFreeTicketMicros; // 選出請求令牌數和存儲令牌數較小的一個 也就是從存儲的令牌中須要消耗的數量 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 計算本次請求中須要行建立的令牌 double freshPermits = requiredPermits - storedPermitsToSpend; // 計算須要等待的時間 就是存儲令牌消耗的時間+新令牌產生須要的時間 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 刷新下一個請求能被授予的時間 是將此次等待的時間加上原先的值 就是把此次請求須要產生的等待時間延遲給下一次請求 這就是一個大請求會立刻授予 但後續的請求會被等待長時間 因此這裏的思路核心就是再每次請求時都是在預測下一次請求到來的時間 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 刷新存儲令牌 this.storedPermits -= storedPermitsToSpend; return returnValue; } // 這個是用來計算存儲令牌在消耗時的節流時間 也就是經過這個方法子類能夠控存儲令牌的速率 咱們看到的SmoothBursty的實現是始終返回0 表示消耗存儲的令牌不須要額外的等待時間 咱們在預熱的實現中能夠看到不同的實現 abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
再來看一下請求令牌方法帶超時時間的方法:
public boolean tryAcquire(long timeout, TimeUnit unit) { return tryAcquire(1, timeout, unit); } public boolean tryAcquire(int permits) { return tryAcquire(permits, 0, MICROSECONDS); } public boolean tryAcquire() { return tryAcquire(1, 0, MICROSECONDS); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); // 斷定設置的超時時間是否足夠等待下一個令牌的給予,等不了,就直接失敗 if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { // 得到須要等待的時間 microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } // 等待 stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; }
以上能夠基本理解一個普通的限流器的實現方式,能夠看到實現中能夠經過doSetRate
,storedPermitsToWaitTime
,coolDownIntervalMicros
方法進行定製本身的限流策略。
那麼這裏的SmoothBursty的策略是:
maxPermits = maxBurstSeconds * permitsPerSecond;
咱們繼續看稍微複雜點的SmoothWarmingUp,畢竟爲了說明它人家做者都用註釋畫了示意圖。
static final class SmoothWarmingUp extends SmoothRateLimiter { // 預熱累計消耗時間 private final long warmupPeriodMicros; /** * The slope of the line from the stable interval (when permits == 0), to the cold interval * (when permits == maxPermits) */ private double slope; private double thresholdPermits; // 冷卻因子 固定是3 意思是經過這個因子能夠計算在令牌桶滿的時候,消耗令牌須要的最大時間 private double coldFactor; SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); this.coldFactor = coldFactor; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; // 經過coldFactor就能夠算出coldInterval的最高點 即stableIntervalMicros的3倍 也就是說圖中的梯形最高點是固定的了 double coldIntervalMicros = stableIntervalMicros * coldFactor; // 根據warmupPeriodMicros*2=thresholdPermits*stableIntervalMicros換算thresholdPermits 由於咱們看到梯形最高點是固定的 那麼經過設置warmupPeriod是能夠控制thresholdPermits,從而控制maxPermits的值 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; // 也是根據上面提到的公式計算maxPermits maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); // 傾斜角度 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = 0.0; } else { storedPermits = (oldMaxPermits == 0.0) ? maxPermits // 這裏的初始值是maxPermits : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { // 超過thresholdPermits的存儲令牌 double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // measuring the integral on the right part of the function (the climbing line) if (availablePermitsAboveThreshold > 0.0) { double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); // 這裏就開始算這個梯形的面積了 梯形面積=(上底+下底)*高/2 double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } // measuring the integral on the left part of the function (the horizontal line) micros += (long) (stableIntervalMicros * permitsToTake); return micros; } private double permitsToTime(double permits) { return stableIntervalMicros + permits * slope; } // 爲了確保從0到maxpermit所花費的時間等於預熱時間 能夠看下resync方法中對coolDownIntervalMicros方法的使用 @Override double coolDownIntervalMicros() { return warmupPeriodMicros / maxPermits; } }
1,Guava在對RateLimiter設計中採用的是令牌桶的算法,提供了普通和預熱兩種模型,在存儲令牌增長和消耗的設計思路是計算函數積分的方式。
2,對於第一個新來的請求,不管請求多少令牌,不阻塞。
3,內部使用線程sleep方式達到線程等待,沒超時時間會一直等到有令牌
4,令牌存儲發生在請求令牌時,不須要單獨線程實現不斷產生令牌的算法
5,內部設計的類一些參數並不開放外部配置
原理如圖:
咱們有一個固定的桶,這個桶的出水速率是固定的,流量不斷往桶中放水,進水速度比出水速度快的時候,能夠在桶中有一個緩衝,但是到達必定量超出桶的容量,就會溢出桶,沒法接受新的請求。
這個思路不就是阻塞隊列嘛,只要在消費的放保持固定速率便可。
實現相似以下代碼(示意使用):
public class LeakyBucketLimiter { BlockingQueue<String> leakyBucket; long rate; public LeakyBucketLimiter(int capacity, long rate) { this.rate = rate; this.leakyBucket = new ArrayBlockingQueue<String>(capacity); } public boolean offer(){ return leakyBucket.offer(""); } class Consumer implements Runnable{ public void run() { try { while (true){ Thread.sleep(1000/rate); leakyBucket.take(); } } catch (InterruptedException e) { } } } }
和令牌桶容許必定突發請求時的高速率,以及空閒後下降速率不一樣的是,漏桶算法是必然保證速率不變的。
一個思路是監控系統的狀態,好比cpu,內存,io等,依據這些信息來開關限流器。
在分佈式系統中,若是想用分佈式限流,就須要公用存儲的載體,好比redis,zk等。還須要考量分佈式存儲載體失效,不能影響正常業務功能。
http://www.cs.ucc.ie/~gprovan/CS6323/2014/L11-Congesion-Control.pdf(網上不少桶算法限流的圖都來自這裏)