限流器,顧名思義用來對高併發的請求進行流量限制的組件。html
限流包括 Nginx 層面的限流以及業務代碼邏輯上的限流。流量的限制在衆多微服務和 service mesh 中多有應用。限流主要有三種算法:信號量、漏桶算法和令牌桶算法。下面依次介紹這三種算法。git
筆者在本文的程序示例均以 Go 語言實現。github
用戶增加過快、熱門業務或者爬蟲等惡意攻擊行爲導致請求量忽然增大,好比學校的教務系統,到了查分之日,請求量漲到以前的 100 倍都不止,沒多久該接口幾乎不可以使用,並引起連鎖反應致使整個系統崩潰。如何應對這種狀況呢?生活給了咱們答案:好比老式電閘都安裝了保險絲,一旦有人使用超大功率的設備,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理咱們的接口也須要安裝上「保險絲」,以防止非預期的請求對系統壓力過大而引發的系統癱瘓,當流量過大時,能夠採起拒絕或者引流等機制。redis
後端服務因爲各個業務的不一樣和複雜性,各自在容器部署的時候均可能會有單臺的瓶頸,超過瓶頸會致使內存或者 cpu 的瓶頸,進而致使發生服務不可用或者單臺容器直接掛掉或重啓。算法
信號量在衆多開發語言中都會有相關信號量的設計。如 Java 中的Semaphore 是一個計數信號量。經常使用於限制獲取某資源的線程數量,可基於 Java 的 concurrent 併發包實現。編程
信號量兩個重要方法 Acquire() 和 Release()。經過acquire()方法獲取許可,該方法會阻塞,直到獲取許可爲止。經過release()方法釋放許可。後端
筆者在閱讀一些語言開源實現後,總結出信號量的主要有非阻塞和阻塞兩種。安全
採用鎖或者阻塞隊列方式,以 Go 語言爲示例以下:bash
// 採用channel做爲底層數據結構,從而達到阻塞的獲取和使用信號量
type Semaphore struct {
innerChan chan struct{}
}
// 初始化信號量,本質初始化一個channel,channel的初始化大小爲 信號量數值
func NewSemaphore(num uint64) *Semaphore {
return &Semaphore{
innerChan: make(chan struct{}, num),
}
}
// 獲取信號量,本質是 向channel放入元素,若是同時有不少協程併發獲取信號量,則channel則會full阻塞,從而達到控制併發協程數的目的,也便是信號量的控制
func (s *Semaphore) Acquire() {
for {
select {
case s.innerChan <- struct{}{}:
return
default:
log.Error("semaphore acquire is blocking")
time.Sleep(100 * time.Millisecond)
}
}
}
// 釋放信號量 本質是 從channel中獲取元素,因爲有acquire的放入元素,因此此處必定能回去到元素 也就能釋放成功,default只要是出於安全編程的目的
func (s *Semaphore) Release() {
select {
case <-s.innerChan:
return
default:
return
}
}
複製代碼
在實現中,定義了 Semaphore
結構體。初始化信號量,本質是初始化一個channel,channel 的初始化大小爲信號量數值;獲取信號量,本質是向channel放入元素,若是同時有不少協程併發獲取信號量,則 channel 則會 full 阻塞,從而達到控制併發協程數的目的,也便是信號量的控制;釋放信號量的本質是從channel中獲取元素,因爲有acquire的放入元素,因此此處必定能回去到元素 也就能釋放成功,default只要是出於安全編程的目的。數據結構
以併發安全的計數方式好比採用原子 atomic 加減進行。
主流的限流算法分爲兩種漏桶算法和令牌桶算法,關於這兩個算法有不少文章和論文都給出了詳細的講解。從原理上看,令牌桶算法和漏桶算法是相反的,一個 進水
,一個是 漏水
。值得一提的是 Google Guava 開源和 Uber 開源限流組件均採用漏桶算法。
漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率)而後就拒絕請求。能夠看出漏桶算法能強行限制數據的傳輸速率。示意圖以下:
可見這裏有兩個變量,一個是桶的大小,支持流量突發增多時能夠存多少的水(burst),另外一個是水桶漏洞的大小(rate)。
漏桶算法可使用 redis 隊列來實現,生產者發送消息前先檢查隊列長度是否超過閾值,超過閾值則丟棄消息,不然發送消息到 Redis 隊列中;消費者以固定速率從 Redis 隊列中取消息。Redis 隊列在這裏起到了一個緩衝池的做用,起到削峯填谷、流量整形的做用。
對於不少應用場景來講,除了要求可以限制數據的平均傳輸速率外,還要求容許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更爲適合。令牌桶算法的原理是系統會以一個恆定的速度往桶裏放入令牌,而若是請求須要被處理,則須要先從桶裏獲取一個令牌,當桶裏沒有令牌可取時,則拒絕服務。桶裏可以存放令牌的最高數量,就是容許的突發傳輸量。
放令牌這個動做是持續不斷的進行,若是桶中令牌數達到上限,就丟棄令牌,因此就存在這種狀況,桶中一直有大量的可用令牌,這時進來的請求就能夠直接拿到令牌執行,好比設置qps爲100,那麼限流器初始化完成一秒後,桶中就已經有100個令牌了,等啓動完成對外提供服務時,該限流器能夠抵擋瞬時的100個請求。因此,只有桶中沒有令牌時,請求才會進行等待,最後至關於以必定的速率執行。
能夠準備一個隊列,用來保存令牌,另外經過一個線程池按期生成令牌放到隊列中,每來一個請求,就從隊列中獲取一個令牌,並繼續執行。
因此此處筆者開門見山,直接展現此算法的 Go 語言版本的實現,代碼以下:
// 此處截取自研的熔斷器代碼中的限流實現,這是非阻塞的實現
func (sp *servicePanel) incLimit() error {
// 若是大於限制的條件則返回錯誤
if sp.currentLimitCount.Load() > sp.currLimitFunc(nil) {
return ErrCurrentLimit
}
sp.currentLimitCount.Inc()
return nil
}
func (sp *servicePanel) clearLimit() {
// 按期每秒重置計數器,從而達到每秒限制的併發數
// 好比限制1000req/s,在這裏指每秒清理1000的計數值
// 令牌桶是按期放,這裏是逆思惟,每秒清空,實現不只佔用內存低並且效率高
t := time.NewTicker(time.Second)
for {
select {
case <-t.C:
sp.currentLimitCount.Store(0)
}
}
}
複製代碼
上述的實現實際是比較粗糙的實現,沒有嚴格按照每一個請求方按照某個固定速率進行,而是以秒爲單位,粗粒度的進行計數清零,這其實會形成某個瞬間雙倍的每秒限流個數,雖然看上去不知足要求,可是在這個瞬間實際上是隻是一個雙倍值,正常系統都應該會應付一瞬間雙倍限流個數的請求量。
若是要嚴格的按照每一個請求按照某個固定數值進行,那麼能夠改進時間的粗力度,具體作法以下:
func (sp *servicePanel) incLimit() error {
// 若是大於1則返回錯誤
if sp.currentLimitCount.Load() > 1 {
return ErrCurrentLimit
}
sp.currentLimitCount.Inc()
return nil
}
func (sp *servicePanel) clearLimit() {
// 1s除以每秒限流個數
t := time.NewTicker(time.Second/time.Duration(sp.currLimitFunc(nil)))
for {
select {
case <-t.C:
sp.currentLimitCount.Store(0)
}
}
}
複製代碼
讀者能夠自行嘗試一下改進以後的漏斗算法。
uber 在 Github 上開源了一套用於服務限流的 go 語言庫 ratelimit, 該組件基於 Leaky Bucket(漏桶)實現。
#初版本
go get github.com/uber-go/ratelimit@v0.1.0
#改進版本
go get github.com/uber-go/ratelimit@master
複製代碼
首先強調一點,跟筆者自研的限流器最大的不一樣的是,這是一個阻塞調用者的限流組件。限流速率通常表示爲 rate/s 即一秒內 rate 個請求。先很少說,進行一下用法示例:
func ExampleRatelimit() {
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
if i > 0 {
fmt.Println(i, now.Sub(prev))
}
prev = now
}
}
複製代碼
預期的結果以下:
// Output:
// 1 10ms
// 2 10ms
// 3 10ms
// 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
複製代碼
測試結果徹底符合預期。在這個例子中,咱們給定限流器每秒能夠經過100個請求,也就是平均每一個請求間隔10ms。所以,最終會每10ms打印一行數據。
首先是構造一個Limiter 裏面有一個 perRequest 這是關鍵的一個變量,表示每一個請求之間相差的間隔時間,這是此組件的算法核心思想,也就是說將請求排隊,一秒以內有rate個請求,將這些請求排隊,挨個來,每一個請求的間隔就是1s/rate 歷來達到 1s內rate個請求的概念,從而達到限流的目的。
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
}
for _, opt := range opts {
opt(l)
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}
複製代碼
Take() 方法 每次請求前使用,用來獲取批准 返回批准時刻的時間。
初版本
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// If this is our first request, then we allow it.
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
t.sleepFor += t.perRequest - now.Sub(t.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if t.sleepFor < t.maxSlack { t.sleepFor = t.maxSlack } // If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { t.clock.Sleep(t.sleepFor) t.last = now.Add(t.sleepFor) t.sleepFor = 0 } else { t.last = now } return t.last } 複製代碼
在實現方面,能夠看到初版本採用了 Go 的 lock,而後排隊 sleep,完成 sleep 以後,請求之間的間隔時間恆定,單位時間以內有設定好的請求數,實現限流的目的。
第二版本
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
newState := state{}
taken := false
for !taken {
now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{}
newState.last = now
// If this is our first request, then we allow it.
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(newState.sleepFor) return newState.last } 複製代碼
第二版本採用原子操做+for的自旋操做來替代lock操做,這樣作的目的是減小協程鎖競爭。 兩個版本不論是用鎖仍是原子操做本質都是讓請求排隊,初版本存在鎖競爭,而後排隊sleep,第二版本避免鎖競爭,可是全部協程可能很快跳出for循環而後都會在sleep處sleep。
保障服務穩定的三大利器:熔斷降級、服務限流和故障模擬。本文主要講解了分佈式系統中高可用的經常使用策略:限流。限流一般有三種實現:信號量(計數器)、漏桶、令牌桶。本文基於漏桶算法實現了一個限流小插件。最後分析了 uber 開源的 uber-go
,限流器 Take() 阻塞方法的第二版本對協程鎖競爭更加友好。
歡迎購買筆者的圖書,現已出版上市:
原創不易,但願你們多多支持,期待與各位的交流學習。