令牌桶是一種常見用於控制速率的控流算法。原理於 Wikipedia 上描述以下:git
每秒會有 r 個令牌被放入桶中,即每 1 / r 秒向桶中放入一個令牌。github
一個桶最多能夠存放 b 個令牌。當令牌被放入桶時,若桶已滿,則令牌被直接丟棄。算法
當一個 n 字節的數據包抵達時,消耗 n 個令牌,而後放行之。緩存
若桶中的令牌不足 n ,則該數據包要麼被緩存要麼被丟棄。安全
下面咱們便根據上述描述,使用 Go 語言,基於多 goroutine ,來實現是一個併發安全的令牌桶。後述代碼的完整實現的倉庫地址在:https://github.com/DavidCai19... 。併發
最基本的結構即是,定義一個令牌桶 struct ,該 struct 每個新生成的令牌桶實例,各自帶有一個 goroutine ,像守護進程同樣以固定時間向實例桶中放入令牌:異步
type TokenBucket struct { interval time.Duration // 時間間隔 ticker *time.Ticker // 定時器 timer // ... cap int64 // 桶總容量 avail int64 // 桶內現有令牌數 } func (tb *TokenBucket) adjustDaemon() { for now := range tb.ticker.C { var _ = now if tb.avail < tb.cap { tb.avail++ } } } func New(interval time.Duration, cap int64) *TokenBucket { tb := &TokenBucket{ // ... } go tb.adjustDaemon() return tb }
該 struct 最終會提供如下 API :設計
TryTake(count int64) bool
: 嘗試從桶中取出 n
個令牌。馬上返回,返回值表示該次取出是否成功。code
Take(count int64)
:嘗試從桶中取出 n
個令牌,若當前桶中的令牌數不足,則保持等待,直至桶內令牌數量達標而後取出。token
TakeMaxDuration(count int64, max time.Duration) bool
:嘗試從桶中取出 n
個令牌,若當前桶中的令牌數不足,則保持等待,直至桶內令牌數量達標而後取出。不過設置了一個超時時間 max
,若超時,則再也不等待馬上返回,返回值表示該次取出是否成功。
Wait(count int64)
:保持等待直至桶內令牌數大於等於 n
。
WaitMaxDuration(count int64, max time.Duration) bool
保持等待直至桶內令牌數大於等於 n
,但設置了一個超時時間 max
。
TryTake
: 一次性取出嘗試TryTake(count int64) bool
這樣的一次性取出嘗試,便可返回,實現起來最爲簡易。惟一須要注意的問題爲當前咱們在一個多 goroutine 環境下,令牌是咱們的共享資源,爲了防止競爭條件,最簡單的解決方案即爲存取都加上鎖。Go 語言自帶的 sync.Mutex
類提供了鎖的實現。
type TokenBucket struct { // ... tokenMutex *sync.Mutex // 令牌鎖 } func (tb *TokenBucket) tryTake(count int64) bool { tb.tokenMutex.Lock() // 檢查共享資源,加鎖 defer tb.tokenMutex.Unlock() if count <= tb.avail { tb.avail -= count return true } return false } func (tb *TokenBucket) adjustDaemon() { for now := range tb.ticker.C { var _ = now tb.tokenMutex.Lock() // 檢查共享資源,加鎖 if tb.avail < tb.cap { tb.avail++ } tb.tokenMutex.Unlock() } }
Take
,TakeMaxDuration
等待型取出(嘗試)對於 Take(count int64)
和 TakeMaxDuration(count int64, max time.Duration) bool
這樣的等待型取出(嘗試),狀況別就有所不一樣了:
因爲這兩個操做都是須要進行等待被通知,故本來的主動加鎖檢查共享資源的方案已再也不適合。
因爲可能存在多個正在等待的操做,爲了不混亂,咱們須要有個先來後到,最先等待的操做,首先獲取令牌。
咱們可使用 Go 語言提供的第二種共享多 goroutine 間共享資源的方式:channel 來解決第一個問題。channel 能夠是雙向的,徹底符合咱們須要被動通知的場景。而面對第二個問題,咱們須要爲等待的操做維護一個隊列。這裏咱們使用的是 list.List
來模擬 FIFO 隊列,不過值得留意的是,這樣一來,隊列自己也成了一個共享資源,咱們也須要爲了它,來配一把鎖。
跟着上述思路,咱們先來實現 Take(count int64)
:
type TokenBucket struct { // ... waitingQuqueMutex: &sync.Mutex{}, // 等到操做的隊列 waitingQuque: list.New(), // 列隊的鎖 } type waitingJob struct { ch chan struct{} count int64 } func (tb *TokenBucket) Take(count int64) { w := &waitingJob{ ch: make(chan struct{}), count: count, } tb.addWaitingJob(w) // 將 w 放入列隊,需爲隊列加鎖。 <-w.ch close(w.ch) } func (tb *TokenBucket) adjustDaemon() { var waitingJobNow *waitingJob for now := range tb.ticker.C { var _ = now tb.tokenMutex.Lock() // 檢查共享資源,加鎖 if tb.avail < tb.cap { tb.avail++ } element := tb.getFrontWaitingJob() // 取出隊列頭,需爲隊列加鎖。 if element != nil { if waitingJobNow == nil { waitingJobNow = element.Value.(*waitingJob) tb.removeWaitingJob(element) // 移除隊列頭,需爲隊列加鎖。 } if tb.avail >= waitingJobNow.need { tb.avail -= waitingJobNow.count waitingJobNow.ch <- struct{}{} waitingJobNow = nil } } tb.tokenMutex.Unlock() } }
接着咱們來實現 TakeMaxDuration(count int64, max time.Duration) bool
,該操做的超時部分,咱們可使用 Go 自帶的 select
關鍵字結合定時器 channel 來實現。而且爲 waitingJob
加上一個標識字段來代表該操做是否已超時被棄用。因爲檢查棄用的操做會在 adjustDaemon
中進行,而標識棄用的操做會在 TakeMaxDuration
內的 select
中,爲了再次避免競爭狀態,咱們將使用的令牌的操做從 adjustDaemon
內經過 channel 返回給 select
中,並阻塞,來避免了競爭條件而且享受了令牌鎖的保護:
func (tb *TokenBucket) TakeMaxDuration(count int64, max time.Duration) bool { w := &waitingJob{ ch: make(chan struct{}), count: count, abandoned: false, // 超時棄置標識 } defer close(w.ch) tb.addWaitingJob(w) select { case <-w.ch: tb.avail -= use w.ch <- struct{}{} return true case <-time.After(max): w.abandoned = true return false } } func (tb *TokenBucket) adjustDaemon() { // ... if element != nil { if waitingJobNow == nil || waitingJobNow.abandoned { waitingJobNow = element.Value.(*waitingJob) tb.removeWaitingJob(element) } if tb.avail >= waitingJobNow.need && !waitingJobNow.abandoned { waitingJobNow.ch <- struct{}{} <-waitingJobNow.ch waitingJobNow = nil } } // ... }
最後總結一些關鍵點:
對於共享資源的存取,要麼使用鎖,要麼使用 channel ,視場景選擇最好用的用之。
channel 可被動等待共享資源,而鎖則使用十分簡易。
異步的多個等待操做,可以使用隊列進行協調。
能夠在鎖的保護下,結合 channel 來對共享資源實現一個處理 pipeline ,結合二者優點,十分好用。