分佈式系統高可用實戰之限流器(Go 版本實現)

限流器,顧名思義用來對高併發的請求進行流量限制的組件。html

限流包括 Nginx 層面的限流以及業務代碼邏輯上的限流。流量的限制在衆多微服務和 service mesh 中多有應用。限流主要有三種算法:信號量、漏桶算法和令牌桶算法。下面依次介紹這三種算法。git

筆者在本文的程序示例均以 Go 語言實現。github

1. 問題描述

用戶增加過快、熱門業務或者爬蟲等惡意攻擊行爲導致請求量忽然增大,好比學校的教務系統,到了查分之日,請求量漲到以前的 100 倍都不止,沒多久該接口幾乎不可以使用,並引起連鎖反應致使整個系統崩潰。如何應對這種狀況呢?生活給了咱們答案:好比老式電閘都安裝了保險絲,一旦有人使用超大功率的設備,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理咱們的接口也須要安裝上「保險絲」,以防止非預期的請求對系統壓力過大而引發的系統癱瘓,當流量過大時,能夠採起拒絕或者引流等機制。redis

後端服務因爲各個業務的不一樣和複雜性,各自在容器部署的時候均可能會有單臺的瓶頸,超過瓶頸會致使內存或者 cpu 的瓶頸,進而致使發生服務不可用或者單臺容器直接掛掉或重啓。算法

2. 信號量限流

信號量在衆多開發語言中都會有相關信號量的設計。如 Java 中的Semaphore 是一個計數信號量。經常使用於限制獲取某資源的線程數量,可基於 Java 的 concurrent 併發包實現。編程

信號量兩個重要方法 Acquire() 和 Release()。經過acquire()方法獲取許可,該方法會阻塞,直到獲取許可爲止。經過release()方法釋放許可。後端

筆者在閱讀一些語言開源實現後,總結出信號量的主要有非阻塞和阻塞兩種。安全

2.1 阻塞方式

採用鎖或者阻塞隊列方式,以 Go 語言爲示例以下:bash

// 採用channel做爲底層數據結構,從而達到阻塞的獲取和使用信號量
type Semaphore struct {
	innerChan chan struct{}
}
// 初始化信號量,本質初始化一個channel,channel的初始化大小爲 信號量數值
func NewSemaphore(num uint64) *Semaphore {
	return &Semaphore{
		innerChan: make(chan struct{}, num),
	}
}
// 獲取信號量,本質是 向channel放入元素,若是同時有不少協程併發獲取信號量,則channel則會full阻塞,從而達到控制併發協程數的目的,也便是信號量的控制
func (s *Semaphore) Acquire() {
	for {
		select {
		case s.innerChan <- struct{}{}:
			return
		default:
			log.Error("semaphore acquire is blocking")
			time.Sleep(100 * time.Millisecond)
		}
	}
}
// 釋放信號量 本質是 從channel中獲取元素,因爲有acquire的放入元素,因此此處必定能回去到元素 也就能釋放成功,default只要是出於安全編程的目的
func (s *Semaphore) Release() {
	select {
	case <-s.innerChan:
		return
	default:
		return
	}
}
複製代碼

在實現中,定義了 Semaphore 結構體。初始化信號量,本質是初始化一個channel,channel 的初始化大小爲信號量數值;獲取信號量,本質是向channel放入元素,若是同時有不少協程併發獲取信號量,則 channel 則會 full 阻塞,從而達到控制併發協程數的目的,也便是信號量的控制;釋放信號量的本質是從channel中獲取元素,因爲有acquire的放入元素,因此此處必定能回去到元素 也就能釋放成功,default只要是出於安全編程的目的。數據結構

2.2 非阻塞方式

以併發安全的計數方式好比採用原子 atomic 加減進行。

3. 限流算法

主流的限流算法分爲兩種漏桶算法和令牌桶算法,關於這兩個算法有不少文章和論文都給出了詳細的講解。從原理上看,令牌桶算法和漏桶算法是相反的,一個 進水,一個是 漏水。值得一提的是 Google Guava 開源和 Uber 開源限流組件均採用漏桶算法。

3.1 漏桶算法

漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裏,漏桶以必定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率)而後就拒絕請求。能夠看出漏桶算法能強行限制數據的傳輸速率。示意圖以下:

可見這裏有兩個變量,一個是桶的大小,支持流量突發增多時能夠存多少的水(burst),另外一個是水桶漏洞的大小(rate)。

漏桶算法可使用 redis 隊列來實現,生產者發送消息前先檢查隊列長度是否超過閾值,超過閾值則丟棄消息,不然發送消息到 Redis 隊列中;消費者以固定速率從 Redis 隊列中取消息。Redis 隊列在這裏起到了一個緩衝池的做用,起到削峯填谷、流量整形的做用。

3.2 令牌桶算法

對於不少應用場景來講,除了要求可以限制數據的平均傳輸速率外,還要求容許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更爲適合。令牌桶算法的原理是系統會以一個恆定的速度往桶裏放入令牌,而若是請求須要被處理,則須要先從桶裏獲取一個令牌,當桶裏沒有令牌可取時,則拒絕服務。桶裏可以存放令牌的最高數量,就是容許的突發傳輸量。

放令牌這個動做是持續不斷的進行,若是桶中令牌數達到上限,就丟棄令牌,因此就存在這種狀況,桶中一直有大量的可用令牌,這時進來的請求就能夠直接拿到令牌執行,好比設置qps爲100,那麼限流器初始化完成一秒後,桶中就已經有100個令牌了,等啓動完成對外提供服務時,該限流器能夠抵擋瞬時的100個請求。因此,只有桶中沒有令牌時,請求才會進行等待,最後至關於以必定的速率執行。

能夠準備一個隊列,用來保存令牌,另外經過一個線程池按期生成令牌放到隊列中,每來一個請求,就從隊列中獲取一個令牌,並繼續執行。

3.3 漏桶算法的實現

因此此處筆者開門見山,直接展現此算法的 Go 語言版本的實現,代碼以下:

// 此處截取自研的熔斷器代碼中的限流實現,這是非阻塞的實現
func (sp *servicePanel) incLimit() error {
	// 若是大於限制的條件則返回錯誤
	if sp.currentLimitCount.Load() > sp.currLimitFunc(nil) {
		return ErrCurrentLimit
	}
	sp.currentLimitCount.Inc()
	return nil
}

func (sp *servicePanel) clearLimit() {
	// 按期每秒重置計數器,從而達到每秒限制的併發數
	// 好比限制1000req/s,在這裏指每秒清理1000的計數值
// 令牌桶是按期放,這裏是逆思惟,每秒清空,實現不只佔用內存低並且效率高
	t := time.NewTicker(time.Second)
	for {
		select {
		case <-t.C:
			sp.currentLimitCount.Store(0)
		}
	}
}
複製代碼

上述的實現實際是比較粗糙的實現,沒有嚴格按照每一個請求方按照某個固定速率進行,而是以秒爲單位,粗粒度的進行計數清零,這其實會形成某個瞬間雙倍的每秒限流個數,雖然看上去不知足要求,可是在這個瞬間實際上是隻是一個雙倍值,正常系統都應該會應付一瞬間雙倍限流個數的請求量。

改進

若是要嚴格的按照每一個請求按照某個固定數值進行,那麼能夠改進時間的粗力度,具體作法以下:

func (sp *servicePanel) incLimit() error {
	// 若是大於1則返回錯誤
	if sp.currentLimitCount.Load() > 1 {
		return ErrCurrentLimit
	}
	sp.currentLimitCount.Inc()
	return nil
}

func (sp *servicePanel) clearLimit() {
	// 1s除以每秒限流個數
	t := time.NewTicker(time.Second/time.Duration(sp.currLimitFunc(nil)))
	for {
		select {
		case <-t.C:
			sp.currentLimitCount.Store(0)
		}
	}
}
複製代碼

讀者能夠自行嘗試一下改進以後的漏斗算法。

4. Uber 開源實現 RateLimit 深刻解析

uber 在 Github 上開源了一套用於服務限流的 go 語言庫 ratelimit, 該組件基於 Leaky Bucket(漏桶)實現。

4.1 引入方式

#初版本
go get github.com/uber-go/ratelimit@v0.1.0
#改進版本
go get github.com/uber-go/ratelimit@master
複製代碼

4.2 使用

首先強調一點,跟筆者自研的限流器最大的不一樣的是,這是一個阻塞調用者的限流組件。限流速率通常表示爲 rate/s 即一秒內 rate 個請求。先很少說,進行一下用法示例:

func ExampleRatelimit() {
	rl := ratelimit.New(100) // per second

	prev := time.Now()
	for i := 0; i < 10; i++ {
		now := rl.Take()
		if i > 0 {
			fmt.Println(i, now.Sub(prev))
		}
		prev = now
	}
}
複製代碼

預期的結果以下:

// Output:
	// 1 10ms
	// 2 10ms
	// 3 10ms
	// 4 10ms
	// 5 10ms
	// 6 10ms
	// 7 10ms
	// 8 10ms
	// 9 10ms
複製代碼

測試結果徹底符合預期。在這個例子中,咱們給定限流器每秒能夠經過100個請求,也就是平均每一個請求間隔10ms。所以,最終會每10ms打印一行數據。

4.3 實現細節

構造限流器

首先是構造一個Limiter 裏面有一個 perRequest 這是關鍵的一個變量,表示每一個請求之間相差的間隔時間,這是此組件的算法核心思想,也就是說將請求排隊,一秒以內有rate個請求,將這些請求排隊,挨個來,每一個請求的間隔就是1s/rate 歷來達到 1s內rate個請求的概念,從而達到限流的目的。

// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
	l := &limiter{
		perRequest: time.Second / time.Duration(rate),
		maxSlack:   -10 * time.Second / time.Duration(rate),
	}
	for _, opt := range opts {
		opt(l)
	}
	if l.clock == nil {
		l.clock = clock.New()
	}
	return l
}
複製代碼
限流器Take() 阻塞方法

Take() 方法 每次請求前使用,用來獲取批准 返回批准時刻的時間。

初版本

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()

	now := t.clock.Now()

	// If this is our first request, then we allow it.
	if t.last.IsZero() {
		t.last = now
		return t.last
	}

	// sleepFor calculates how much time we should sleep based on
	// the perRequest budget and how long the last request took.
	// Since the request may take longer than the budget, this number
	// can get negative, and is summed across requests.
	t.sleepFor += t.perRequest - now.Sub(t.last)

	// We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if t.sleepFor < t.maxSlack { t.sleepFor = t.maxSlack } // If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { t.clock.Sleep(t.sleepFor) t.last = now.Add(t.sleepFor) t.sleepFor = 0 } else { t.last = now } return t.last } 複製代碼

在實現方面,能夠看到初版本採用了 Go 的 lock,而後排隊 sleep,完成 sleep 以後,請求之間的間隔時間恆定,單位時間以內有設定好的請求數,實現限流的目的。

第二版本

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
	newState := state{}
	taken := false
	for !taken {
		now := t.clock.Now()

		previousStatePointer := atomic.LoadPointer(&t.state)
		oldState := (*state)(previousStatePointer)

		newState = state{}
		newState.last = now

		// If this is our first request, then we allow it.
		if oldState.last.IsZero() {
			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
			continue
		}

		// sleepFor calculates how much time we should sleep based on
		// the perRequest budget and how long the last request took.
		// Since the request may take longer than the budget, this number
		// can get negative, and is summed across requests.
		newState.sleepFor += t.perRequest - now.Sub(oldState.last)
		// We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(newState.sleepFor) return newState.last } 複製代碼

第二版本採用原子操做+for的自旋操做來替代lock操做,這樣作的目的是減小協程鎖競爭。 兩個版本不論是用鎖仍是原子操做本質都是讓請求排隊,初版本存在鎖競爭,而後排隊sleep,第二版本避免鎖競爭,可是全部協程可能很快跳出for循環而後都會在sleep處sleep。

5. 小結

保障服務穩定的三大利器:熔斷降級、服務限流和故障模擬。本文主要講解了分佈式系統中高可用的經常使用策略:限流。限流一般有三種實現:信號量(計數器)、漏桶、令牌桶。本文基於漏桶算法實現了一個限流小插件。最後分析了 uber 開源的 uber-go,限流器 Take() 阻塞方法的第二版本對協程鎖競爭更加友好。

優質圖書推薦

歡迎購買筆者的圖書,現已出版上市:

原創不易,但願你們多多支持,期待與各位的交流學習。

參考

高併發系統限流-漏桶算法和令牌桶算法

相關文章
相關標籤/搜索