高併發下的流量控制

背景

目前在作一個消息中臺,提供給業務方各類消息通道能力。咱們在系統設計過程當中,除了有對業務方在使用時作 Quota 限制;也有對請求作流量控制(幾w+ QPS),防止併發流量上來時打垮服務。下面是我的在調研流量控制方案的一些梳理和總結。html

高併發解決方案概述

併發一般是指併發訪問,也就是在某個時間點,有多少個訪問請求同時到來。機器的性能是有限的,若是這個量級達到必定程度,就會形成系統壓力,影響系統性能。前端

應對高併發流量的幾種解決方案:nginx

  • 流量優化: 防盜鏈處理
  • 前端優化: 減小 HTTP 請求,合併 CSS 或 js, 添加異步請求,啓用流量器緩存和文件壓縮,CDN 加速,創建獨立圖片服務器
  • 服務端優化: 頁面靜態化,併發處理,隊列處理
  • 數據庫優化: 數據庫緩存,分庫分表,分區操做,讀寫分離,
  • Web 服務器優化: 負載均衡, nginx 反向代理
  • 服務降級: 若是不是核心鏈路,就把這個服務去掉
  • 流量控制:限流

流量控制

高併發最有效和經常使用的解決方案是流量控制 ,也就是限流。爲應對服務的高可用,經過對大流量的請求進行限流,攔截掉大部分請求,只容許一部分請求真正進入後端服務器,這樣就能夠防止大量請求形成系統壓力過大致使系統崩潰的狀況,從而保護服務正常可用。git

經常使用的限流算法github

  • 計算器
  • 漏桶
  • 令牌桶
  • 滑動窗口

計數器

計數器是一種比較簡單的限流算法,用途比較普遍,在接口層面,不少地方使用這種方式限流。在一段時間內,進行計數,與閥值進行比較,到了時間臨界點,再將計數器清0。算法

counter

package counter

import (
    "fmt"
    "time"
)

func CounterDemo() {
    // init rate limiter
    limitCount := int64(100)
    interval := 60 * time.Second
    rateLimiter := NewRateLimiter(limitCount, interval)

    for i := 0; i < 800; i++ {
        if rateLimiter.Grant() {
            fmt.Println("Continue to process")
            continue
        }
        fmt.Println("Exceed rate limit")
    }
}

type RateLimiter struct {
    limitCount   int64
    interval     time.Duration
    requestCount int64
    startAt      time.Time
}

func NewRateLimiter(limitCount int64, interval time.Duration) *RateLimiter {
    return &RateLimiter{
        startAt:    time.Now(),
        interval:   interval,
        limitCount: limitCount,
    }
}

func (rl *RateLimiter) Grant() bool {
    now := time.Now()
    if now.Before(rl.startAt.Add(rl.interval)) {
        if rl.requestCount < rl.limitCount {
            rl.requestCount++
            return true
        }
        return false
    }

    rl.startAt = time.Now()
    rl.requestCount = 0
    return false
}

這種實現方式存在一個時間臨界點問題:若是在單位時間 1min 內的前 1s ,已經經過了 100 個請求,那後面的 59s ,只能把請求拒絕,這種現象稱爲 突刺現象數據庫

漏桶

因爲計數器存在突刺現象,可使用漏桶算法來解決。漏桶提供了一種簡單、直觀的方法,經過隊列來限制速率,能夠把隊列看做是一個存放請求的桶。當一個請求被註冊時,會被加到隊列的末端。每隔一段時間,隊列中的第一個事件就會被處理。這也被稱爲先進先出(FIFO)隊列。若是隊列已滿,那麼額外的請求就會被丟棄(或泄露)。windows

leakyBucket

package leakyBucket

import (
    "fmt"
    "time"
)

func LeakyBucketDemo() {
    // init rate limiter
    rate := int64(5)
    size := int64(10)
    rateLimiter := NewRateLimiter(rate, size)
    for i := 0; i < 800; i++ {
        if rateLimiter.Grant() {
            fmt.Println("Continue to process")
            continue
        }
        fmt.Println("Exceed rate limit")
    }
}

type RateLimiter struct {
    startAt time.Time
    // bucket size
    size int64
    // now the water in bucket
    water int64
    // rater discharge rate
    rate int64
}

func NewRateLimiter(rate, size int64) *RateLimiter {
    return &RateLimiter{
        startAt: time.Now(),
        rate:    rate, // rate of processing requests, request/s
        size:    size,
    }
}

func (rl *RateLimiter) Grant() bool {
    // calculating water output
    now := time.Now()
    out := int64(now.Sub(rl.startAt).Milliseconds()) * rl.rate

    // remain water after the leak
    rl.water = max(0, rl.water-out)

    rl.startAt = now
    if rl.water+1 < rl.size {
        rl.water++
        return true
    }
    return false
}

func max(a, b int64) int64 {
    if a > b {
        return a
    }
    return b
}

漏桶算法的優勢是,它能將突發的請求平滑化,並以近似平均的速度處理。可是,瞬間高併發的流量可能會使請求佔滿隊列,使最新的請求沒法獲得處理,也不能保證請求在固定時間內獲得處理。後端

令牌桶

令牌桶算法是對漏桶算法的一種改進,桶算法可以限制請求調用的速率,而令牌桶算法可以在限制調用的平均速率的同時還容許必定程度的突發調用。緩存

該算法的基本原理也很容易理解。就是有一個桶,裏面有一個最大數量的 Token(容量)。每當一個消費者想要調用一個服務或消費一個資源時,他就會取出一個或多個 Token。只有當消費者可以取出所需數量的 Token 時,他才能消費一項服務。若是桶中沒有所需數量的令牌,他須要等待,直到桶中有足夠的令牌。

token-bucket

package tokenBucket

import (
    "fmt"
    "time"
)

func tokenBucketDemo() {
    tokenRate := int64(5)
    size := int64(10)
    rateLimiter := NewRateLimiter(tokenRate, size)
    for i := 0; i < 800; i++ {
        if rateLimiter.Grant() {
            fmt.Println("Continue to process")
            continue
        }
        fmt.Println("Exceed rate limit")
    }
}

type RateLimiter struct {
    startAt   time.Time
    size      int64
    tokens    int64
    tokenRate int64
}

func NewRateLimiter(tokenRate, size int64) *RateLimiter {
    return &RateLimiter{
        startAt:   time.Now(),
        tokenRate: tokenRate,
        size:      size,
    }
}

func (rl *RateLimiter) Grant() bool {
    now := time.Now()
    in := now.Sub(rl.startAt).Milliseconds() * rl.tokenRate

    rl.tokens = min(rl.size, rl.tokens+in)
    rl.startAt = now

    if rl.tokens > 0 {
        rl.tokens--
        return true
    }
    return false
}

func min(a, b int64) int64 {
    if a > b {
        return b
    }
    return a
}

滑動窗口

漏桶和令牌桶算法存在兩個缺點:

  1. 須要設置兩個參數(平均速率和閾值),不必定容易調試好
  2. 涉及多個不一樣操做(好比漏桶算法,每次校驗都須要再更新開始時間),沒法原子化完成這些操做

這裏推薦一種更優秀的限流算法:滑動窗口。它能夠靈活地擴展速率限制,而且性能良好。 能較好解決上面這兩個缺陷,同時避免了漏桶的瞬時大流量問題,以及計數器實現的突刺現象。

rollingwindows

滑動窗口是把固定時間片,進行劃分,而且隨着時間進行移動,這樣就巧妙的避開了計數器的突刺現象。也就是說這些固定數量的能夠移動的格子,將會進行計數判斷閥值,所以格子的數量影響着滑動窗口算法的精度。

package slidingWindow

import (
    "fmt"
    "sync"
    "time"
)

func SlidingWindowDemo() {
    // allow 10 requests per second
    rateLimiter := NewRateLimiter(time.Second, 10, func() Window {
        return NewLocalWindow()
    })

    if rateLimiter.Grant() {
        fmt.Println("Continue to process")
    } else {
        fmt.Println("Exceed rate limit")
    }

}

// Window represents a fixed-window
type Window interface {
    // Start returns the start boundary
    Start() time.Time

    // Count returns the accumulated count
    Count() int64

    // AddCount increments the accumulated count by n
    AddCount(n int64)

    // Reset sets the state of the window with the given settings
    Reset(s time.Time, c int64)
}

type NewWindow func() Window

type LocalWindow struct {
    start int64
    count int64
}

func NewLocalWindow() *LocalWindow {
    return &LocalWindow{}
}

func (w *LocalWindow) Start() time.Time {
    return time.Unix(0, w.start)
}

func (w *LocalWindow) Count() int64 {
    return w.count
}

func (w *LocalWindow) AddCount(n int64) {
    w.count += n
}

func (w *LocalWindow) Reset(s time.Time, c int64) {
    w.start = s.UnixNano()
    w.count = c
}

type RateLimiter struct {
    size  time.Duration
    limit int64

    mu sync.Mutex

    curr Window
    prev Window
}

func NewRateLimiter(size time.Duration, limit int64, newWindow NewWindow) *RateLimiter {
    currWin := newWindow()

    // The previous window is static (i.e. no add changes will happen within it),
    // so we always create it as an instance of LocalWindow
    prevWin := NewLocalWindow()

    return &RateLimiter{
        size:  size,
        limit: limit,
        curr:  currWin,
        prev:  prevWin,
    }

}

// Size returns the time duration of one window size
func (rl *RateLimiter) Size() time.Duration {
    return rl.size
}

// Limit returns the maximum events permitted to happen during one window size
func (rl *RateLimiter) Limit() int64 {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    return rl.limit
}

func (rl *RateLimiter) SetLimit(limit int64) {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    rl.limit = limit
}

// shorthand for GrantN(time.Now(), 1)
func (rl *RateLimiter) Grant() bool {
    return rl.GrantN(time.Now(), 1)
}

// reports whether n events may happen at time now
func (rl *RateLimiter) GrantN(now time.Time, n int64) bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    rl.advance(now)

    elapsed := now.Sub(rl.curr.Start())
    weight := float64(rl.size-elapsed) / float64(rl.size)
    count := int64(weight*float64(rl.prev.Count())) + rl.curr.Count()

    if count+n > rl.limit {
        return false
    }

    rl.curr.AddCount(n)
    return true
}

// advance updates the current/previous windows resulting from the passage of time
func (rl *RateLimiter) advance(now time.Time) {
    // Calculate the start boundary of the expected current-window.
    newCurrStart := now.Truncate(rl.size)

    diffSize := newCurrStart.Sub(rl.curr.Start()) / rl.size
    if diffSize >= 1 {
        // The current-window is at least one-window-size behind the expected one.
        newPrevCount := int64(0)
        if diffSize == 1 {
            // The new previous-window will overlap with the old current-window,
            // so it inherits the count.
            newPrevCount = rl.curr.Count()
        }

        rl.prev.Reset(newCurrStart.Add(-rl.size), newPrevCount)
        // The new current-window always has zero count.
        rl.curr.Reset(newCurrStart, 0)
    }
}

集羣限流

上面的4種限流方式,更可能是針對單實例下的併發場景, 下面介紹幾種服務集羣的限流方案:

Nginx 限流

Nginx 官方提供的限速模塊使用的是 漏桶算法,保證請求的實時處理速度不會超過預設的閾值,主要有兩個設置:

  • limit_req_zone: 限制 IP 在單位時間內的請求數
  • limit_req_conn: 限制同一時間鏈接數

Redis 限流

經過 Redis 提供的 incr 命令,在規定的時間窗口,容許經過的最大請求數

分佈式滑動窗口限流

Kong 官方提供了一種分佈式滑動窗口算法的設計, 目前支持在 Kong 上作集羣限流配置。它經過集中存儲每一個滑動窗口和 consumer 的計數,從而支持集羣場景。這裏推薦一個 Go 版本的實現: slidingwindow

其餘

另外業界在分佈式場景下,也有 經過 Nginx+Lua 和 Redis+Lua 等方式來實現限流

總結

本文主要是本身在學習和調研高併發場景下的限流方案的總結。目前業界流行的限流算法包括計數器、漏桶、令牌桶和滑動窗口, 每種算法都有本身的優點,實際應用中能夠根據本身業務場景作選擇,而分佈式場景下的限流方案,也基本經過以上限流算法來實現。在高併發下流量控制的一個原則是:先讓請求先到隊列,並作流量控制,不讓流量直接打到系統上。

參考

  1. 對高併發流量控制的一點思考
  2. How to Design a Scalable Rate Limiting Algorithm
  3. slidingwindow
  4. Token Bucket Rate Limiting
  5. Rate limiting Spring MVC endpoints with bucket4j
  6. How we built rate limiting capable of scaling to millions of domains
相關文章
相關標籤/搜索