目前在作一個消息中臺,提供給業務方各類消息通道能力。咱們在系統設計過程當中,除了有對業務方在使用時作 Quota 限制;也有對請求作流量控制(幾w+ QPS),防止併發流量上來時打垮服務。下面是我的在調研流量控制方案的一些梳理和總結。html
併發一般是指併發訪問,也就是在某個時間點,有多少個訪問請求同時到來。機器的性能是有限的,若是這個量級達到必定程度,就會形成系統壓力,影響系統性能。前端
應對高併發流量的幾種解決方案:nginx
高併發最有效和經常使用的解決方案是流量控制 ,也就是限流。爲應對服務的高可用,經過對大流量的請求進行限流,攔截掉大部分請求,只容許一部分請求真正進入後端服務器,這樣就能夠防止大量請求形成系統壓力過大致使系統崩潰的狀況,從而保護服務正常可用。git
經常使用的限流算法github
計數器是一種比較簡單的限流算法,用途比較普遍,在接口層面,不少地方使用這種方式限流。在一段時間內,進行計數,與閥值進行比較,到了時間臨界點,再將計數器清0。算法
package counter import ( "fmt" "time" ) func CounterDemo() { // init rate limiter limitCount := int64(100) interval := 60 * time.Second rateLimiter := NewRateLimiter(limitCount, interval) for i := 0; i < 800; i++ { if rateLimiter.Grant() { fmt.Println("Continue to process") continue } fmt.Println("Exceed rate limit") } } type RateLimiter struct { limitCount int64 interval time.Duration requestCount int64 startAt time.Time } func NewRateLimiter(limitCount int64, interval time.Duration) *RateLimiter { return &RateLimiter{ startAt: time.Now(), interval: interval, limitCount: limitCount, } } func (rl *RateLimiter) Grant() bool { now := time.Now() if now.Before(rl.startAt.Add(rl.interval)) { if rl.requestCount < rl.limitCount { rl.requestCount++ return true } return false } rl.startAt = time.Now() rl.requestCount = 0 return false }
這種實現方式存在一個時間臨界點問題:若是在單位時間 1min 內的前 1s ,已經經過了 100 個請求,那後面的 59s ,只能把請求拒絕,這種現象稱爲 突刺現象。數據庫
因爲計數器存在突刺現象,可使用漏桶算法來解決。漏桶提供了一種簡單、直觀的方法,經過隊列來限制速率,能夠把隊列看做是一個存放請求的桶。當一個請求被註冊時,會被加到隊列的末端。每隔一段時間,隊列中的第一個事件就會被處理。這也被稱爲先進先出(FIFO)隊列。若是隊列已滿,那麼額外的請求就會被丟棄(或泄露)。windows
package leakyBucket import ( "fmt" "time" ) func LeakyBucketDemo() { // init rate limiter rate := int64(5) size := int64(10) rateLimiter := NewRateLimiter(rate, size) for i := 0; i < 800; i++ { if rateLimiter.Grant() { fmt.Println("Continue to process") continue } fmt.Println("Exceed rate limit") } } type RateLimiter struct { startAt time.Time // bucket size size int64 // now the water in bucket water int64 // rater discharge rate rate int64 } func NewRateLimiter(rate, size int64) *RateLimiter { return &RateLimiter{ startAt: time.Now(), rate: rate, // rate of processing requests, request/s size: size, } } func (rl *RateLimiter) Grant() bool { // calculating water output now := time.Now() out := int64(now.Sub(rl.startAt).Milliseconds()) * rl.rate // remain water after the leak rl.water = max(0, rl.water-out) rl.startAt = now if rl.water+1 < rl.size { rl.water++ return true } return false } func max(a, b int64) int64 { if a > b { return a } return b }
漏桶算法的優勢是,它能將突發的請求平滑化,並以近似平均的速度處理。可是,瞬間高併發的流量可能會使請求佔滿隊列,使最新的請求沒法獲得處理,也不能保證請求在固定時間內獲得處理。後端
令牌桶算法是對漏桶算法的一種改進,桶算法可以限制請求調用的速率,而令牌桶算法可以在限制調用的平均速率的同時還容許必定程度的突發調用。緩存
該算法的基本原理也很容易理解。就是有一個桶,裏面有一個最大數量的 Token(容量)。每當一個消費者想要調用一個服務或消費一個資源時,他就會取出一個或多個 Token。只有當消費者可以取出所需數量的 Token 時,他才能消費一項服務。若是桶中沒有所需數量的令牌,他須要等待,直到桶中有足夠的令牌。
package tokenBucket import ( "fmt" "time" ) func tokenBucketDemo() { tokenRate := int64(5) size := int64(10) rateLimiter := NewRateLimiter(tokenRate, size) for i := 0; i < 800; i++ { if rateLimiter.Grant() { fmt.Println("Continue to process") continue } fmt.Println("Exceed rate limit") } } type RateLimiter struct { startAt time.Time size int64 tokens int64 tokenRate int64 } func NewRateLimiter(tokenRate, size int64) *RateLimiter { return &RateLimiter{ startAt: time.Now(), tokenRate: tokenRate, size: size, } } func (rl *RateLimiter) Grant() bool { now := time.Now() in := now.Sub(rl.startAt).Milliseconds() * rl.tokenRate rl.tokens = min(rl.size, rl.tokens+in) rl.startAt = now if rl.tokens > 0 { rl.tokens-- return true } return false } func min(a, b int64) int64 { if a > b { return b } return a }
漏桶和令牌桶算法存在兩個缺點:
這裏推薦一種更優秀的限流算法:滑動窗口。它能夠靈活地擴展速率限制,而且性能良好。 能較好解決上面這兩個缺陷,同時避免了漏桶的瞬時大流量問題,以及計數器實現的突刺現象。
滑動窗口是把固定時間片,進行劃分,而且隨着時間進行移動,這樣就巧妙的避開了計數器的突刺現象。也就是說這些固定數量的能夠移動的格子,將會進行計數判斷閥值,所以格子的數量影響着滑動窗口算法的精度。
package slidingWindow import ( "fmt" "sync" "time" ) func SlidingWindowDemo() { // allow 10 requests per second rateLimiter := NewRateLimiter(time.Second, 10, func() Window { return NewLocalWindow() }) if rateLimiter.Grant() { fmt.Println("Continue to process") } else { fmt.Println("Exceed rate limit") } } // Window represents a fixed-window type Window interface { // Start returns the start boundary Start() time.Time // Count returns the accumulated count Count() int64 // AddCount increments the accumulated count by n AddCount(n int64) // Reset sets the state of the window with the given settings Reset(s time.Time, c int64) } type NewWindow func() Window type LocalWindow struct { start int64 count int64 } func NewLocalWindow() *LocalWindow { return &LocalWindow{} } func (w *LocalWindow) Start() time.Time { return time.Unix(0, w.start) } func (w *LocalWindow) Count() int64 { return w.count } func (w *LocalWindow) AddCount(n int64) { w.count += n } func (w *LocalWindow) Reset(s time.Time, c int64) { w.start = s.UnixNano() w.count = c } type RateLimiter struct { size time.Duration limit int64 mu sync.Mutex curr Window prev Window } func NewRateLimiter(size time.Duration, limit int64, newWindow NewWindow) *RateLimiter { currWin := newWindow() // The previous window is static (i.e. no add changes will happen within it), // so we always create it as an instance of LocalWindow prevWin := NewLocalWindow() return &RateLimiter{ size: size, limit: limit, curr: currWin, prev: prevWin, } } // Size returns the time duration of one window size func (rl *RateLimiter) Size() time.Duration { return rl.size } // Limit returns the maximum events permitted to happen during one window size func (rl *RateLimiter) Limit() int64 { rl.mu.Lock() defer rl.mu.Unlock() return rl.limit } func (rl *RateLimiter) SetLimit(limit int64) { rl.mu.Lock() defer rl.mu.Unlock() rl.limit = limit } // shorthand for GrantN(time.Now(), 1) func (rl *RateLimiter) Grant() bool { return rl.GrantN(time.Now(), 1) } // reports whether n events may happen at time now func (rl *RateLimiter) GrantN(now time.Time, n int64) bool { rl.mu.Lock() defer rl.mu.Unlock() rl.advance(now) elapsed := now.Sub(rl.curr.Start()) weight := float64(rl.size-elapsed) / float64(rl.size) count := int64(weight*float64(rl.prev.Count())) + rl.curr.Count() if count+n > rl.limit { return false } rl.curr.AddCount(n) return true } // advance updates the current/previous windows resulting from the passage of time func (rl *RateLimiter) advance(now time.Time) { // Calculate the start boundary of the expected current-window. newCurrStart := now.Truncate(rl.size) diffSize := newCurrStart.Sub(rl.curr.Start()) / rl.size if diffSize >= 1 { // The current-window is at least one-window-size behind the expected one. newPrevCount := int64(0) if diffSize == 1 { // The new previous-window will overlap with the old current-window, // so it inherits the count. newPrevCount = rl.curr.Count() } rl.prev.Reset(newCurrStart.Add(-rl.size), newPrevCount) // The new current-window always has zero count. rl.curr.Reset(newCurrStart, 0) } }
上面的4種限流方式,更可能是針對單實例下的併發場景, 下面介紹幾種服務集羣的限流方案:
Nginx 官方提供的限速模塊使用的是 漏桶算法,保證請求的實時處理速度不會超過預設的閾值,主要有兩個設置:
經過 Redis 提供的 incr 命令,在規定的時間窗口,容許經過的最大請求數
Kong 官方提供了一種分佈式滑動窗口算法的設計, 目前支持在 Kong 上作集羣限流配置。它經過集中存儲每一個滑動窗口和 consumer 的計數,從而支持集羣場景。這裏推薦一個 Go 版本的實現: slidingwindow
另外業界在分佈式場景下,也有 經過 Nginx+Lua 和 Redis+Lua 等方式來實現限流
本文主要是本身在學習和調研高併發場景下的限流方案的總結。目前業界流行的限流算法包括計數器、漏桶、令牌桶和滑動窗口, 每種算法都有本身的優點,實際應用中能夠根據本身業務場景作選擇,而分佈式場景下的限流方案,也基本經過以上限流算法來實現。在高併發下流量控制的一個原則是:先讓請求先到隊列,並作流量控制,不讓流量直接打到系統上。