任何系統的性能都有一個上限,當併發量超過這個上限以後,可能會對系統形成毀滅性地打擊。所以在任什麼時候刻咱們都必須保護系統的併發請求數量不能超過某個閾值,限流就是爲了完成這一目的。node
Guava RateLimiter 是一個谷歌提供的限流工具,RateLimiter 基於令牌桶算法實現,說明文檔見:http://ifeve.com/guava-rateli...
使用示例以下:nginx
@Test public void rateLimiterTest() { //建立一個RateLimiter,指定每秒放0.5個令牌(2秒放1個令牌) val rateLimiter = RateLimiter.create(0.5); int[] a = {1,6,2}; for(int i = 0; i < a.length; ++i) { //acquire(x) 從RateLimiter獲取x個令牌,該方法會被阻塞直到獲取到請求 System.out.println(System.currentTimeMillis() + " acq " + a[i] + ": wait " + rateLimiter.acquire(a[i]) + "s"); } }
輸出結果以下:git
1552389443244 acq 1: wait 0.0s 1552389443245 acq 6: wait 1.998468s 1552389445249 acq 2: wait 11.99443s
從輸出結果能夠看出,RateLimiter 具備預消費的能力:github
即:RateLimiter 經過限制後面請求的等待時間,來支持必定程度的突發請求 (預消費)算法
RateLimiter 有兩種限流模式,一種爲穩定模式 (SmoothBursty: 令牌生成速度恆定),一種爲漸進模式 (SmoothWarmingUp: 令牌生成速度緩慢提高直到維持在一個穩定值)。緩存
RateLimiter 核心思想主要有:
響應本次請求以後,動態計算下一次能夠服務的時間,若是下一次請求在這個時間以前則須要進行等待。SmoothRateLimiter 類中的 nextFreeTicketMicros 屬性表示下一次能夠響應的時間。例如,若是咱們設置 QPS 爲 1,本次請求處理完以後,那麼下一次最先的可以響應請求的時間一秒鐘以後。
RateLimiter 的子類 SmoothBursty 支持處理突發流量請求,例如,咱們設置 QPS 爲 1,在十秒鐘以內沒有請求,那麼令牌桶中會有 10 個(假設設置的最大令牌數大於 10)空閒令牌,若是下一次請求是 acquire(20) ,則不須要等待 20 秒鐘,由於令牌桶中已經有 10 個空閒的令牌。SmoothRateLimiter 類中的 storedPermits 就是用來表示當前令牌桶中的空閒令牌數。
SmoothWarmingUp 提出一種 「熱身模型」 和 「冷卻期」 的概念後面會詳細介紹併發
SmoothRateLimiter 主要屬性
SmoothRateLimiter 是抽象類,其定義了一些關鍵的參數,咱們先來看一下這些參數:dom
接下來看一下 SmoothBursty 中幾個重要的方法ide
create(double permitsPerSecond) 根據指定的 QPS 數值建立 RateLimiter,底層調用方法以下:
SmoothBursty 的 maxBurstSeconds 構造函數參數主要用於計算 maxPermits :maxPermits = maxBurstSeconds * permitsPerSecond;。函數
@VisibleForTesting static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) { //1.建立SmoothBursty限流器 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); //2.設置限流速率 rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
再看 setRate 的方法,RateLimiter 中 setRate 方法最終後調用 doSetRate 方法,doSetRate 是一個抽象方法,SmoothRateLimiter 抽象類中覆蓋了 RateLimiter 的 doSetRate 方法:
//// SmoothRateLimiter類中的doSetRate方法,覆蓋了 RateLimiter 類中的 doSetRate 方法,此方法再委託下面的 doSetRate 方法作處理。 @Override final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); }
實現以下
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; //設置最大令牌數 if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 : storedPermits * maxPermits / oldMaxPermits; } }
acquire(int permits) 從 RateLimiter 獲取 x 個令牌,該方法會被阻塞直到獲取到請求;主要作了三件事
public double acquire(int permits) { long microsToWait = reserve(permits);//1.獲取當前請求須要等待的時間(惰性計算 ) stopwatch.sleepMicrosUninterruptibly(microsToWait); //2.sleep microsToWait 時間窗口 return 1.0 * microsToWait / SECONDS.toMicros(1L);//3.返回microsToWait對應的秒級時間 } final long reserve(int permits) { checkPermits(permits); //檢查參數是否>0 synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); //計算須要等待的時間 } }
該方法返回須要等待的時間,是 RateLimiter 的核心接口
RateLimiter 支持突發流量的本質就是,將當前須要的令牌數量 requiredPermits 拆分紅 storedPermitsToSpend(持有令牌中可用的數量)和 freshPermits(須要預支的令牌數量);分別計算須要等待的時間,而後更新 nextFreeTicketMicros 下次獲取令牌的時間
什麼意思呢?舉個例子:
當前 RateLimiter 持有 4 個令牌,當前請求須要 6 個令牌;則 6 個令牌中 4 個是能夠從持有的令牌中直接獲取,而另外兩個須要預支的令牌則須要單獨計算時間;
僞代碼:getReqWaitTime(6) = getWaitTime(4) + getFreshWait(6 - 4)
而在 SmoothBursty 模式中, getWaitTime(4) 是能夠直接獲取的,即 time=0;getFreshWait(6 - 4) 則等於 freshPermits stableIntervalMicros (預支令牌數 生成一個令牌須要的時間)
@Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); //1.根據當前時間和預計下一秒時間判斷有無新令牌產生,有則更新持有令牌數storedPermits 和 下次請求時間nextFreeTicketMicros long returnValue = nextFreeTicketMicros; //2.如下兩句,根據請求須要的令牌數requiredPermits和storedPermits當前持有的令牌數storedPermits分別計算 持有令牌中可用的數量storedPermitsToSpend和須要預支的令牌數量freshPermits double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)//3.分別計算storedPermitsToSpend和freshPermits的等待時間 + (long) (freshPermits * stableIntervalMicros); try { this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros); //4.更新nextFreeTicketMicros } catch (ArithmeticException e) { this.nextFreeTicketMicros = Long.MAX_VALUE; } this.storedPermits -= storedPermitsToSpend; //4.更新storedPermits return returnValue; }
WarmingUp 是 RateLimiter 的另外一種實例不一樣於 SmoothBursty ,它存在一個 「熱身」 的概念。即:若是當前系統處於 「 冷卻期」( 即一段時間沒有獲取令牌,即:當前持有的令牌數量大於某個閾值),則下一次獲取令牌須要等待的時間比 SmoothBursty 模式下的線性時間要大,而且逐步降低到一個穩定的數值。
大體原理:將 storedPermits 分紅兩個區間值:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。當請求進來時,若是當前系統處於 "cold" 的冷卻期狀態,從 [thresholdPermits, maxPermits] 區間去拿令牌,所須要等待的時間會長於從區間 [0, thresholdPermits) 拿相同令牌所須要等待的時間。當請求增多,storedPermits 減小到 thresholdPermits 如下時,此時拿令牌所須要等待的時間趨於穩定。這也就是所謂 「熱身」 的過程。
反應到代碼上,和 SmoothBursty 的不一樣有兩點
其餘部分原理相似。
WarmingUp 模式的限流器使用示例以下:
@Test public void rateLimiterTest2() throws InterruptedException { val rateLimiter = RateLimiter.create(5, 4000, TimeUnit.MILLISECONDS);//預熱模式,設置預熱時間和QPS,即在正式acquire前,限流器已經持有5*4=20個令牌 for(int i = 1; i < 50; i++) { System.out.println(System.currentTimeMillis() + " acq " + i + ": wait " + rateLimiter.acquire() + "s"); if(i == 15) { Thread.sleep(2000); System.out.println(System.currentTimeMillis() + " acq " + 15 + ": wait " + rateLimiter.acquire() + "s"); } } }
輸出結果以下:
1552395652026 acq 1: wait 0.0s 1552395652028 acq 2: wait 0.578357s 1552395652612 acq 3: wait 0.533835s 1552395653151 acq 4: wait 0.495191s 1552395653649 acq 5: wait 0.457239s 1552395654110 acq 6: wait 0.41631s 1552395654528 acq 7: wait 0.377524s 1552395654912 acq 8: wait 0.334018s 1552395655248 acq 9: wait 0.298249s 1552395655550 acq 10: wait 0.256165s 1552395655808 acq 11: wait 0.217752s 1552395656028 acq 12: wait 0.197672s 1552395656231 acq 13: wait 0.19451s 1552395656429 acq 14: wait 0.196465s 1552395656630 acq 15: wait 0.195714s 1552395658834 acq 15: wait 0.0s 1552395658834 acq 16: wait 0.34158s 1552395659180 acq 17: wait 0.296628s 1552395659482 acq 18: wait 0.256914s 1552395659744 acq 19: wait 0.216517s 1552395659965 acq 20: wait 0.195077s 1552395660164 acq 21: wait 0.195953s 1552395660365 acq 22: wait 0.195196s 1552395660564 acq 23: wait 0.196015s 1552395660764 acq 24: wait 0.195972s
從輸出結果能夠看出,RateLimiter 具備預消費的能力:
SmoothWarmingUp 是 SmoothRateLimiter 的子類,它相對於 SmoothRateLimiter 多了幾個屬性:
SmoothRateLimiter 類的註釋文檔中有對預熱模型的詳細解釋
橫座標:是當前令牌桶中的令牌 storedPermits,前面說過 SmoothWarmingUp 將 storedPermits 分爲兩個區間:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。
縱座標:請求的間隔時間,stableInterval 就是 1 / QPS,例如設置的 QPS 爲 5,則 stableInterval 就是 200ms,coldInterval = stableInterval * coldFactor,這裏的 coldFactor 硬編碼寫死的是 3。
當系統請求增多,圖像會像左移動,直到 storedPermits 爲 0。等待一段時間後,隨着令牌的生成當系統進入 cold 階段時,圖像會向右移,直到 storedPermits 等於 maxPermits。
create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根據指定的 QPS 和預熱期來建立 RateLimiter,在這段預熱時間內,RateLimiter 每秒分配的許可數會平穩地增加直到預熱期結束時達到其最大速率。
@VisibleForTesting static RateLimiter create( SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);//1.建立SmoothWarmingUp限流器 rateLimiter.setRate(permitsPerSecond);//2.設置限流速率 return rateLimiter; } public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod); return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit, 3.0); } SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); //1.設置預熱時間 this.coldFactor = coldFactor;//3.設置coldFactor爲3 } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; double coldIntervalMicros = stableIntervalMicros * coldFactor; //1.設置冷卻期等待時間數值coldIntervalMicros thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;//2.設置冷卻期的閾值,thresholdPermits等於預熱期產生令牌數的一半 maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);//3.設置持有令牌的最大值,爲thresholdPermits的2倍 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);//4.設置預熱區的斜率;縱座標之差/橫座標之差 if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = 0.0; } else { storedPermits = (oldMaxPermits == 0.0) ? maxPermits : storedPermits * maxPermits / oldMaxPermits; } }
前面說到,SmoothWarmingUp 和 SmoothBursty 的一個重要區別就在於 「獲取當前令牌中可用令牌的等待時間」storedPermitsToWaitTime 方法, 而 「獲取預支令牌的等待時間」 和以前一致。
@Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits;//1.獲取當前持有令牌數和閾值的差值availablePermitsAboveThreshold long micros = 0; if (availablePermitsAboveThreshold > 0.0) {//2.若是availablePermitsAboveThreshold>0,即當前持有令牌數>閾值,即到達冷區期;計算等待時間 double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);//3.計算WARM UP PERIOD部分計算的方法,這部分是一個梯形,梯形的面積計算公式是 「(上底 + 下底) * 高 / 2」 micros = (long) (permitsAboveThresholdToTake * (permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0); permitsToTake -= permitsAboveThresholdToTake;//4.剩餘的令牌從 stable部分拿 } micros += (stableIntervalMicros * permitsToTake);//5.stable 部分令牌獲取花費的時間 return micros; }
如何理解這個方法?
舉個例子:
建立限流器時 create(5, 4000, TimeUnit.MILLISECONDS);預熱了 20 個令牌
場景 1:
當前持有 20 個令牌,請求一個令牌;須要等待的時間爲:
場景 2:
當前持有 18 個令牌,請求 1 個令牌;須要等待的時間爲:
場景 3:
當前持有 20 個令牌,一次性請求 11 個令牌;須要等待的時間爲:
場景 4:
當前持有 10 個令牌,一次性請求 1 個令牌;須要等待的時間爲:
小結:
總結一下 SmoothWarmingUp 和 SmoothBursty 的建立和使用令牌的過程:
SmoothBursty
SmoothWarmingUp
最後
SmoothWarmingUp 和 SmoothBursty 的最大區別就在於,「獲取已持有令牌中可用令牌的等待時間」 不一樣,SmoothBursty 是直接返回的,SmoothWarmingUp 則是基於 「熱身模型」 和 「冷卻期」(即一段時間沒有獲取令牌,衡量指標:當前持有的令牌數量大於某個閾值)的機制進行動態調整(冷卻期按照梯形區域返回,不然按照矩形區域返回)
預支令牌的等待時間算法一致,waitTime = 預支令牌數量 * 生成一個令牌須要的時間(1/QPS)
SmoothWarmingUp 爲系統提供一種冷啓動的可能,例如:某系統底層使用緩存中間件,假如沒有 「熱身」,突發流量極可能形成緩存擊穿等問題;WarmingUp 讓系統應對突發流量有一個 「漸進準備資源」 的過程
Rhino 使用的令牌桶的平滑限流,即 WarmingUp 模式:Rhino C++ SDK 說明文檔
nginx 有兩個限流模塊,從 github 上 clone 代碼,位置在 nginx/src/http/modules 目錄下:
二者都是按照 IP 或者域名限制的
本次調研僅聚焦其限流原理,相關配置參考: limit_req 官方說明 limit_conn 官方說明
ngx_http_limit_req_module 限流核心思想:
通俗來說:就是建立一個令牌的時間,只能接收並處理一個請求,其餘的排隊或者直接丟棄
用戶可能同時配置若干限流,所以對於 HTTP 請求,nginx 須要遍歷全部限流策略,判斷是否須要限流;
ngx_http_limit_req_lookup 方法實現了漏桶算法,方法返回 3 種結果:
//limit,限流策略;hash,記錄key的hash值;data,記錄key的數據內容;len,記錄key的數據長度;ep,待處理請求數目;account,是不是最後一條限流策略 static ngx_int_t ngx_http_limit_req_lookup(ngx_http_limit_req_limit_t *limit, ngx_uint_t hash, u_char *data, size_t len, ngx_uint_t *ep, ngx_uint_t account) { //紅黑樹查找指定界定,sentinel表明紅黑樹的NULL節點 while (node != sentinel) { if (hash < node->key) { node = node->left; continue; } if (hash > node->key) { node = node->right; continue; } //hash值相等,比較數據是否相等 lr = (ngx_http_limit_req_node_t *) &node->color; rc = ngx_memn2cmp(data, lr->data, len, (size_t) lr->len); //查找到 if (rc == 0) { ngx_queue_remove(&lr->queue); ngx_queue_insert_head(&ctx->sh->queue, &lr->queue); //將記錄移動到LRU隊列頭部 ms = (ngx_msec_int_t) (now - lr->last); //當前時間減去上次訪問時間 if (ms < -60000) { ms = 1; } else if (ms < 0) { ms = 0; } //漏桶算法 excess = lr->excess - ctx->rate * ms / 1000 + 1000; //待處理請求書-限流速率*時間段+1個請求(速率,請求數等都乘以1000了) if (excess < 0) { excess = 0; } *ep = excess; //待處理數目超過burst(等待隊列大小),返回NGX_BUSY拒絕請求(沒有配置burst時,值爲0) if ((ngx_uint_t) excess > limit->burst) { return NGX_BUSY; } if (account) { //若是是最後一條限流策略,則更新上次訪問時間,待處理請求數目,返回NGX_OK lr->excess = excess; lr->last = now; return NGX_OK; } //訪問次數遞增 lr->count++; ctx->node = lr; return NGX_AGAIN; //非最後一條限流策略,返回NGX_AGAIN,繼續校驗下一條限流策略 } node = (rc < 0) ? node->left : node->right; } //假如沒有查找到節點,須要新建一條記錄 *ep = 0; size = offsetof(ngx_rbtree_node_t, color) + offsetof(ngx_http_limit_req_node_t, data) + len; //嘗試淘汰記錄(LRU) ngx_http_limit_req_expire(ctx, 1); node = ngx_slab_alloc_locked(ctx->shpool, size);//分配空間 if (node == NULL) { //空間不足,分配失敗 ngx_http_limit_req_expire(ctx, 0); //強制淘汰記錄 node = ngx_slab_alloc_locked(ctx->shpool, size); //分配空間 if (node == NULL) { //分配失敗,返回NGX_ERROR return NGX_ERROR; } } node->key = hash; //賦值 lr = (ngx_http_limit_req_node_t *) &node->color; lr->len = (u_char) len; lr->excess = 0; ngx_memcpy(lr->data, data, len); ngx_rbtree_insert(&ctx->sh->rbtree, node); //插入記錄到紅黑樹與LRU隊列 ngx_queue_insert_head(&ctx->sh->queue, &lr->queue); if (account) { //若是是最後一條限流策略,則更新上次訪問時間,待處理請求數目,返回NGX_OK lr->last = now; lr->count = 0; return NGX_OK; } lr->last = 0; lr->count = 1; ctx->node = lr; return NGX_AGAIN; //非最後一條限流策略,返回NGX_AGAIN,繼續校驗下一條限流策略 }
當一個新請求進入 Nginx 的限流流程大體以下:
解釋一下相關的變量:
excess:積壓等待處理的請求數量(也就是桶中積壓的令牌數量) 乘 1000(nginx 計算的時候單位換算乘 1000)
ctx->rate:限流的速率乘 1000(例如:設置當前的 IP 限流速率爲 5 / 秒,則 rate 等於 5000;乘 1000 是 Nginx 內部的單位換算)
ms 是當前請求和上次成功請求時間的差值,單位毫秒
怎麼理解這個表達式呢?
假設場景:
ms / 1000 的意思是本次請求在 1s 中的佔比,ctx->rate * ms / 1000 意思是這段時間能夠流過的請求數
1000 表明當次請求,即爲 1(nginx 計算的時候單位換算乘 1000)
lr->excess - ctx->rate * ms / 1000 + 1000:的意思就是 當前積壓令牌數 = 上次積壓令牌數 - 這段時間能夠產生的令牌數 + 本次請求(1 個令牌)
漏斗的本質:當 excess > limit->burst;即積壓令牌 excess > 桶的最大容量,拒絕當前請求
舉個例子,假設:
lr->excess 初始化爲 0*1000
burst(桶最大容量)爲 0*1000
令牌產生週期爲 T,請求以下圖所示
limit_conn 模塊用來限制某個 IP 的併發鏈接數。它的實現與 limit_req 模塊相似,總體邏輯和實現更爲簡單。limit_conn 模塊也將某個 IP 的信息存儲在紅黑樹的節點中。
涉及兩個核心方法:
源代碼詳見:https://github.com/nginx/ngin...
conn_handler 方法處理請求的大體流程以下:
請求處理完成後,對當前節點鏈接數減 1,若當前節點鏈接數減至 0,析構當前節點,回收內存
ngx_http_limit_req_module 源碼分析
ngx_http_limit_conn_module 源碼分析