1、限流的關鍵做用node
對於大型互聯網架構中,限流的設計是必不可少的一個環節。在給定的時間內, 客戶端請求次數過多, 服務器就會攔截掉部分請求,避免請求流量過大形成數據庫負載高的問題。nginx
2、常見限流算法利弊分析golang
計數器限流redis
計數器限流主要有固定窗口計數器和滑動窗口計數器。固定窗口計數器即:在單位時間內請求數達到了所限定的數量時,請求如需不被攔截則須要等待下一個單位時間開始;滑動窗口計數器:在單位時間內請求數達到了所限定的數量時,當前時刻的請求會被攔截,隨時間窗口的滑動計數器數量會變化,當計數器數量小於限定數量時請求正常執行。算法
常規計數器限流數據庫
常規計數器限流是指在一個時間段內容許必定數量的請求執行,超過最大限制則會阻止請求直到超過當前時間段爲止。如上圖所示,10s內限制1000個請求,在第11s的時候計數器會從0從新開始計數。 後端
常規計數器限流瞬時流量問題安全
如上圖所示,常規計數器模式下,在第9.9s的時候執行了1000請求,在第10.1s時計數器已清0,此時又有1000請求到來,這樣至關於在0.2s的時間內有2000請求,顯然違背了限流的初衷。bash
滑動窗口計數器限流服務器
滑動窗口計數器限流是在計數器限流的基礎上將固定的時間段劃分爲若干個時間窗口,隨着時間的推移,保持時間段內的滑動窗口個數,在常規計數器限流的基礎上避免了瞬時流量對服務器的壓力。如上圖所示,0-3s內有600請求,8-13s有700請求,當第16s時新增500請求會觸發限流。
計數器限流突刺現象
特色:若是在當前時間窗口最後半秒請求數忽然達到最大限制,半秒後進入下一個時間窗口開始,若是請求繼續在半秒內達到週期上限,則至關於1秒內請求達到2倍的限制請求數;若是60s爲一個週期,在第10s的時候服務器已處理完請求,在計數器限流模式下會使得服務器空閒50s沒法處理請求。
計數器限流原理簡單,實現比較容易,可是也有一個痛點問題就是它的突刺現象,如上圖所示,10s限制1000請求,到第2s時已達請求上限,那麼在第3-10s內的請求將會持續拒絕,在服務器資源空閒的狀態下會形成極大的浪費。
漏桶限流
請求進入到漏桶中,漏桶以固定的速度流出,當訪問頻率超過接口響應頻率流速過大時拒絕請求,能夠看到漏桶至關於一個隊列,進隊的速率不受限制,出隊是固定速率。
特色:因爲出水速率是固定的,當突發大流量時會致使大量請求被限制,沒法處理。
漏桶限流,請求進入漏桶不受限制,並以固定的速率流出,當桶滿而且當前流入的請求大於當前流出的請求時,限制請求。漏桶限流解決了計數器限流模式下流量突刺的問題,當服務器處理完請求後,只要能從漏桶中流出請求則能繼續處理,不會形成長時間等待拒絕請求。
漏桶限流突發流量問題
漏桶限流突發流量問題,如上圖所示,漏桶滿後此時大量請求到來,因爲服務器已擴容能夠知足請求處理,可是漏桶會拒絕大量請求,致使沒法應對突發流量問題。
令牌桶限流
令牌token以固定的速率向桶中放入令牌直至桶滿,在執行請求前須要先從桶中獲取令牌,形式上也至關於隊列,入隊以固定速率,出對不受限制,這點與漏桶恰好相反。
特色:能夠應對突發流量,只要桶中有令牌便可執行請求。
3、golang語言層面實現限流算法
簡單計數器限流
package main import ( "fmt" "sync" "time" ) // CounterLimiter 簡單計數器限流 type CounterLimiter struct { Interval int64 // 從新計數時間 LastTime time.Time // 上一次請求時間 MaxCount int // 最大計數 Lck *sync.Mutex ReqCount int // 目前的請求數 } // NewCounterLimiter 初始化簡單計數器限流 func NewCounterLimiter(interval int64, maxCount int) *CounterLimiter { return &CounterLimiter{ Interval: interval, LastTime: time.Now(), MaxCount: maxCount, Lck: new(sync.Mutex), ReqCount: 0, } } // counterLimit 簡單計數器限流實現 func (r *CounterLimiter) counterLimit() bool { r.Lck.Lock() defer r.Lck.Unlock() now := time.Now() if now.Unix()-r.LastTime.Unix() > r.Interval { r.LastTime = now r.ReqCount = 0 } if r.ReqCount < r.MaxCount { r.ReqCount += 1 return true } return false } func main() { // 定義1秒最多5個請求 r := NewCounterLimiter(1, 5) for i := 0; i < 20; i++ { ok := r.counterLimit() if ok { fmt.Println("pass ", i) } else { fmt.Println("limit ", i) } time.Sleep(100 * time.Millisecond) } }
滑動窗口計數器限流
package main import ( "fmt" "sync" "time" ) // SlidingWindowLimiter 滑動窗口計數器限流 type SlidingWindowLimiter struct { Interval int64 // 總計數時間 LastTime time.Time // 上一個窗口時間 Lck *sync.Mutex // 鎖 WinCount []int64 // 窗口中請求當前數量 TicketSize int64 // 窗口最大容量 TicketCount int64 // 窗口個數 CurIndex int64 // 目前使用的窗口下標 } // NewSlidingWindowLimiter 初始化滑動窗口計數器限流 func NewSlidingWindowLimiter(interval int64, ticketCount int64, ticketSize int64) *SlidingWindowLimiter { return &SlidingWindowLimiter{ Interval: interval, LastTime: time.Now(), TicketSize: ticketSize, TicketCount: ticketCount, WinCount: make([]int64, ticketSize, ticketSize), CurIndex: 0, Lck: new(sync.Mutex), } } // slidingCounterLimit 滑動窗口計數器限流實現 func (r *SlidingWindowLimiter) slidingCounterLimit() bool { r.Lck.Lock() defer r.Lck.Unlock() eachTicketTime := r.Interval / r.TicketCount now := time.Now() // 若是間隔時間超過一個窗口的時間 當前窗口置0 指向下一個窗口 if now.Unix()-r.LastTime.Unix() > eachTicketTime { r.WinCount[r.CurIndex] = 0 r.CurIndex = (r.CurIndex + 1) % r.TicketCount r.LastTime = now } fmt.Println("當前窗口:", r.CurIndex) // 當前窗口未滿則正常計數 if r.WinCount[r.CurIndex] < r.TicketSize { r.WinCount[r.CurIndex]++ return true } return false } func main() { // 定義1秒10個時間窗口 每一個窗口大小爲1 即1秒10個請求 r := NewSlidingWindowLimiter(1, 10, 1) for i := 0; i < 20; i++ { ok := r.slidingCounterLimit() if ok { fmt.Println("pass ", i) } else { fmt.Println("limit ", i) } time.Sleep(100 * time.Millisecond) } }
漏桶限流
package main import ( "fmt" "sync" "time" ) // BucketLimiter 定義漏桶算法struct type BucketLimiter struct { Lck *sync.Mutex // 鎖 Rate float64 //最大速率限制 Balance float64 //漏桶的餘量 Cap float64 //漏桶的最大容量限制 LastTime time.Time //上次檢查的時間 } // NewBucketLimiter 初始化BucketLimiter func NewBucketLimiter(rate int, cap int) *BucketLimiter { return &BucketLimiter{ Lck: new(sync.Mutex), Rate: float64(rate), Balance: float64(cap), Cap: float64(cap), LastTime: time.Now(), } } // leakyBucket 漏桶算法實現 func (r *BucketLimiter) leakyBucket() bool { ok := false r.Lck.Lock() defer r.Lck.Unlock() now := time.Now() dur := now.Sub(r.LastTime).Seconds() //當前時間與上一次檢查時間差 r.LastTime = now water := dur * r.Rate //計算這段時間內漏桶流出水的流量water r.Balance += water //漏桶流出water容量的水,天然漏桶的餘量多出water if r.Balance > r.Cap { r.Balance = r.Cap } if r.Balance >= 1 { //漏桶餘量足夠容下當前的請求 r.Balance -= 1 ok = true } return ok } func main() { // 初始化 限制每秒2個請求 漏洞容量爲5 r := NewBucketLimiter(2, 5) for i := 0; i < 20; i++ { ok := r.leakyBucket() if ok { fmt.Println("pass ", i) } else { fmt.Println("limit ", i) } time.Sleep(100 * time.Millisecond) } }
令牌桶限流
package main import ( "fmt" "math" "sync" "time" ) // TokenBucket 定義令牌桶結構 type TokenBucket struct { LastTime time.Time // 當前請求時間 Capacity float64 // 桶的容量(存放令牌的最大量) Rate float64 // 令牌放入速度 Tokens float64 // 當前令牌總量 Lck *sync.Mutex } // NewTokenBucket 初始化TokenBucket func NewTokenBucket(rate int, cap int) *TokenBucket { return &TokenBucket{ LastTime: time.Now(), Capacity: float64(cap), Rate: float64(rate), Tokens: float64(cap), Lck: new(sync.Mutex), } } // getToken 判斷是否獲取令牌(若能獲取,則處理請求) func (r *TokenBucket) getToken() bool { now := time.Now() r.Lck.Lock() defer r.Lck.Unlock() // 先添加令牌 tokens := math.Min(r.Capacity, r.Tokens+now.Sub(r.LastTime).Seconds()*r.Rate) r.Tokens = tokens if tokens < 1 { // 若桶中一個令牌都沒有了,則拒絕 return false } else { // 桶中還有令牌,領取令牌 r.Tokens -= 1 r.LastTime = now return true } } func main() { // 初始化 限制每秒2個請求 令牌桶容量爲5 r := NewTokenBucket(2, 5) for i := 0; i < 20; i++ { ok := r.getToken() if ok { fmt.Println("pass ", i) } else { fmt.Println("limit ", i) } time.Sleep(100 * time.Millisecond) } }
4、nginx限流及實現
Nginx 提供了兩種限流手段:一是控制速率,二是控制併發鏈接數。
1. 控制速率
咱們須要使用 limit_req_zone 用來限制單位時間內的請求數,即速率限制,示例配置以下:
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit; } }
limit_req zone=mylimit方案:
以上配置表示,限制每一個 IP 訪問的速度爲 2r/s,由於 Nginx 的限流統計是基於毫秒的,咱們設置的速度是 2r/s,轉換一下就是 500ms 內單個 IP 只容許經過 1 個請求,從 501ms 開始才容許經過第 2 個請求。
咱們使用單 IP 在 10ms 內發併發送了 6 個請求的執行結果以下:
從以上結果能夠看出他的執行符合咱們的預期,只有 1 個執行成功了,其餘的 5 個被拒絕了(第 2 個在 501ms 纔會被正常執行)。
表現爲對收到的請求無延時 超過訪問頻率則503
limit_req zone=mylimit burst=3方案:
上面的速率控制雖然很精準可是應用於真實環境未免太苛刻了,真實狀況下咱們應該控制一個 IP 單位總時間內的總訪問次數,而不是像上面那麼精確但毫秒,咱們可使用 burst 關鍵字開啓此設置,示例配置以下:
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit burst=3; } }
burst=3 表示每一個 IP 最多容許3個突發請求,若是單個 IP 在 10ms 內發送 6 次請求的結果以下:
從以上結果能夠看出,有 1 個請求被當即處理了,3 個請求被放到 burst 隊列裏排隊執行了,另外 2被丟棄了。
超過了burst緩衝隊列長度和rate處理能力的請求被直接丟棄
表現爲對收到的請求有延時 全部請求排隊
limit_req zone=mylimit burst=3 nodelay方案: server { location / { limit_req zone=mylimit burst=3 nodelay; } }
若是單個 IP 在 10ms 內發送 6 次請求的結果以下:
依照在limit_req_zone中配置的rate來處理請求,同時設置了一個大小爲3的緩衝隊列,
當請求到來時,會爆發出一個峯值處理能力,表示這3個請求馬上處理,對於峯值處理數量以外的請求,直接丟棄
緩衝隊列按rate來釋放
表現爲對收到的請求無延時 緩衝已滿則503
2. 控制併發數
這個模塊用來限制單個IP的請求數。並不是全部的鏈接都被計數,只有在服務器處理了請求而且已經讀取了整個請求頭時,鏈接才被計數。
利用 limit_conn_zone 和 limit_conn 兩個指令便可控制併發數,示例配置以下:
limit_conn_zone $binary_remote_addr zone=perip:10m; limit_conn_zone $server_name zone=perserver:10m; server { ... limit_conn perip 10; limit_conn perserver 100; }
其中 limit_conn perip 10 表示限制單個 IP 同時最多能持有 10 個鏈接;limit_conn perserver 100 表示 server 同時能處理併發鏈接的總數爲 100 個。
只有當 request header 被後端處理後,這個鏈接才進行計數。
5、基於redis實現限流算法
對於上述限流算法目前已有不少成熟的第三方庫實現了,可是對於分佈式系統來講沒法起到嚴格意義上的限流,所以基於redis以gin中間件的方式實現上述限流算法。
滑動窗口計數器限流
func Limiter(ctx *gin.Context) { now := time.Now().UnixNano() username, exists := ctx.Get("username") if !exists { ctx.JSON(http.StatusBadRequest, gin.H{"message": "username獲取失敗"}) } key := fmt.Sprintf(redis.KeyLimitArticleUser, username) c, err := redis.Client.RedisCon.Dial() if err != nil || c == nil { ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis鏈接失敗"}) return } //限制五秒一次請求 var limit int64 = 1 dura := time.Second * 5 //刪除有序集合中的五秒以前的數據 _, err = c.Do("ZREMRANGEBYSCORE", key, "0", fmt.Sprint(now-(dura.Nanoseconds()))) if err != nil { ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis操做ZREMRANGEBYSCORE失敗"}) } reqs, _ := redisPool.Int64(c.Do("ZCARD", key)) if reqs >= limit { ctx.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{ "status": http.StatusTooManyRequests, "message": "too many request", }) return } ctx.Next() _, err = c.Do("ZADD", key, float64(now), float64(now)) if err != nil { ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis操做ZADD失敗"}) } _, err = c.Do("EXPIRE", key, dura) if err != nil { ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis操做EXPIRE失敗"}) } }
漏桶限流
// LeakyBucket redis實現漏桶限流 func LeakyBucket(ctx *gin.Context) { username, exists := ctx.Get("username") if !exists { ctx.JSON(http.StatusBadRequest, gin.H{"message": "username獲取失敗"}) } key := fmt.Sprintf(redis.KeyLeakyBucketArticleUser, username) c, err := redis.Client.RedisCon.Dial() if err != nil || c == nil { ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis鏈接失敗"}) return } rate := 2 // 每秒2個請求 capacity := 5 // 桶容量 lastTime, err := redisPool.Int64(c.Do("hget", key, "lastTime")) // 上次請求時間 now := time.Now().Unix() water := int(now-lastTime) * rate // 通過一段時間後桶流出的請求 balance, err := redisPool.Int(c.Do("hget", key, "balance")) // 上一次桶的餘量 balance += water // 當前桶的餘量 if balance > capacity { balance = capacity } if balance >= 1 { balance-- lastTime = now // 記錄當前請求時間 秒爲單位 c.Do("hset", key, "lastTime", lastTime) c.Do("hset", key, "balance", balance) return } // 無空閒balance可用時 429狀態碼限流提示 ctx.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{ "status": http.StatusTooManyRequests, "message": "too many request", }) }
令牌桶限流
// BucketLimit redis實現令牌桶限流 func BucketLimit(ctx *gin.Context) { username, exists := ctx.Get("username") if !exists { ctx.JSON(http.StatusBadRequest, gin.H{"message": "username獲取失敗"}) } key := fmt.Sprintf(redis.KeyBucketLimitArticleUser, username) c, err := redis.Client.RedisCon.Dial() if err != nil || c == nil { ctx.JSON(http.StatusBadRequest, gin.H{"message": "redis鏈接失敗"}) return } rate := 1 // 令牌生成速度 每秒1個token capacity := 1 // 桶容量 tokens, err := redisPool.Int(c.Do("hget", key, "tokens")) // 桶中的令牌數 lastTime, err := redisPool.Int64(c.Do("hget", key, "lastTime")) // 上次令牌生成時間 now := time.Now().Unix() // 初始狀態下 令牌數量爲桶的容量 existKey, err := redisPool.Int(c.Do("exists", key)) if existKey != 1 { tokens = capacity c.Do("hset", key, "lastTime", now) } deltaTokens := int(now-lastTime) * rate // 通過一段時間後生成的令牌 if deltaTokens > 1 { tokens = tokens + deltaTokens // 增長令牌 } if tokens > capacity { tokens = capacity } if tokens >= 1 { tokens-- // 請求進來了,令牌就減小1 c.Do("hset", key, "lastTime", now) c.Do("hset", key, "tokens", tokens) return } // 無空閒token可用時 429狀態碼限流提示 ctx.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{ "status": http.StatusTooManyRequests, "message": "too many request", }) }
redis+lua實現線程安全的分佈式限流算法
以令牌桶算法爲例:
實現流程圖
定義lua腳本
// lua腳本實現令牌桶算法限流 ScriptTokenLimit = ` local rateLimit = redis.pcall('HMGET',KEYS[1],'lastTime','tokens') local lastTime = rateLimit[1] local tokens = tonumber(rateLimit[2]) local capacity = tonumber(ARGV[1]) local rate = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) if tokens == nil then tokens = capacity else local deltaTokens = math.floor((now-lastTime)*rate) tokens = tokens+deltaTokens if tokens>capacity then tokens = capacity end end local result = false lastTime = now if(tokens>0) then result = true tokens = tokens-1 end redis.call('HMSET',KEYS[1],'lastTime',lastTime,'tokens',tokens) return result `
經過lua腳本實現令牌桶算法限流
// LuaTokenBucket 經過lua腳本實現令牌桶算法限流 func LuaTokenBucket(c redis.Conn, key string, capacity, rate, now int64) (bool, error) { defer c.Close() lua := redis.NewScript(1, ScriptTokenLimit) // lua腳本中的參數爲key和value res, err := redis.Bool(lua.Do(c, key, capacity, rate, now)) if err != nil { return false, err } return res, nil }
限流中間件
// LuaTokenBucket 經過lua腳本實現令牌桶算法限流 func LuaTokenBucket(c redis.Conn, key string, capacity, rate, now int64) (bool, error) { defer c.Close() lua := redis.NewScript(1, ScriptTokenLimit) // lua腳本中的參數爲key和value res, err := redis.Bool(lua.Do(c, key, capacity, rate, now)) if err != nil { return false, err } return res, nil }
6、總結
計數器、漏桶、令牌桶算法限流有各自的特色及應用場景,不能單一維度地判斷哪一個算法最好。計數器算法實現簡單,適用於對接口頻次的限制,如防惡意刷帖限制等;漏桶限流適用於處理流量突刺現象,由於只要桶爲空就能夠接受請求;而令牌桶限流適用於應對突發流量,也是目前互聯網架構中最經常使用的一種限流方式,只要能取到令牌便可處理請求。
nginx限流控制接口頻次其實現方式實質上是用到了漏桶算法,若是是http請求而且使用了nginx做爲反向代理,那麼可使用nginx做爲流量入口限制的第一關。
在分佈式場景下,通常選擇使用redis來實現限流算法,配合lua腳本使得限流的判斷是一個原子操做。