基於多 goroutine 實現令牌桶

前言

令牌桶是一種常見用於控制速率的控流算法。原理於 Wikipedia 上描述以下:git

  • 每秒會有 r 個令牌被放入桶中,即每 1 / r 秒向桶中放入一個令牌。github

  • 一個桶最多能夠存放 b 個令牌。當令牌被放入桶時,若桶已滿,則令牌被直接丟棄。算法

  • 當一個 n 字節的數據包抵達時,消耗 n 個令牌,而後放行之。緩存

  • 若桶中的令牌不足 n ,則該數據包要麼被緩存要麼被丟棄。安全

下面咱們便根據上述描述,使用 Go 語言,基於多 goroutine ,來實現是一個併發安全的令牌桶。後述代碼的完整實現的倉庫地址在:https://github.com/DavidCai19...併發

基本設計

最基本的結構即是,定義一個令牌桶 struct ,該 struct 每個新生成的令牌桶實例,各自帶有一個 goroutine ,像守護進程同樣以固定時間向實例桶中放入令牌:異步

type TokenBucket struct {
    interval          time.Duration  // 時間間隔
    ticker            *time.Ticker   // 定時器 timer
  // ...
    cap               int64          // 桶總容量
    avail             int64          // 桶內現有令牌數
}

func (tb *TokenBucket) adjustDaemon() {
    for now := range tb.ticker.C {
        var _ = now

        if tb.avail < tb.cap {
            tb.avail++
        }
    }
}

func New(interval time.Duration, cap int64) *TokenBucket {
    tb := &TokenBucket{
    // ...
    }

    go tb.adjustDaemon()

    return tb
}

該 struct 最終會提供如下 API :設計

  • TryTake(count int64) bool: 嘗試從桶中取出 n 個令牌。馬上返回,返回值表示該次取出是否成功。code

  • Take(count int64):嘗試從桶中取出 n 個令牌,若當前桶中的令牌數不足,則保持等待,直至桶內令牌數量達標而後取出。token

  • TakeMaxDuration(count int64, max time.Duration) bool:嘗試從桶中取出 n 個令牌,若當前桶中的令牌數不足,則保持等待,直至桶內令牌數量達標而後取出。不過設置了一個超時時間 max ,若超時,則再也不等待馬上返回,返回值表示該次取出是否成功。

  • Wait(count int64):保持等待直至桶內令牌數大於等於 n

  • WaitMaxDuration(count int64, max time.Duration) bool 保持等待直至桶內令牌數大於等於 n ,但設置了一個超時時間 max

TryTake: 一次性取出嘗試

TryTake(count int64) bool 這樣的一次性取出嘗試,便可返回,實現起來最爲簡易。惟一須要注意的問題爲當前咱們在一個多 goroutine 環境下,令牌是咱們的共享資源,爲了防止競爭條件,最簡單的解決方案即爲存取都加上。Go 語言自帶的 sync.Mutex 類提供了鎖的實現。

type TokenBucket struct {
  // ...
    tokenMutex        *sync.Mutex // 令牌鎖
}

func (tb *TokenBucket) tryTake(count int64) bool {
    tb.tokenMutex.Lock() // 檢查共享資源,加鎖
    defer tb.tokenMutex.Unlock()

    if count <= tb.avail {
        tb.avail -= count

        return true
    }

    return false
}

func (tb *TokenBucket) adjustDaemon() {
    for now := range tb.ticker.C {
        var _ = now

    tb.tokenMutex.Lock() // 檢查共享資源,加鎖

        if tb.avail < tb.cap {
            tb.avail++
        }

    tb.tokenMutex.Unlock()
    }
}

TakeTakeMaxDuration 等待型取出(嘗試)

對於 Take(count int64)TakeMaxDuration(count int64, max time.Duration) bool 這樣的等待型取出(嘗試),狀況別就有所不一樣了:

  1. 因爲這兩個操做都是須要進行等待被通知,故本來的主動加鎖檢查共享資源的方案已再也不適合。

  2. 因爲可能存在多個正在等待的操做,爲了不混亂,咱們須要有個先來後到,最先等待的操做,首先獲取令牌。

咱們可使用 Go 語言提供的第二種共享多 goroutine 間共享資源的方式:channel 來解決第一個問題。channel 能夠是雙向的,徹底符合咱們須要被動通知的場景。而面對第二個問題,咱們須要爲等待的操做維護一個隊列。這裏咱們使用的是 list.List 來模擬 FIFO 隊列,不過值得留意的是,這樣一來,隊列自己也成了一個共享資源,咱們也須要爲了它,來配一把鎖。

跟着上述思路,咱們先來實現 Take(count int64)

type TokenBucket struct {
  // ...
    waitingQuqueMutex: &sync.Mutex{}, // 等到操做的隊列
    waitingQuque:      list.New(),    // 列隊的鎖
}

type waitingJob struct {
    ch        chan struct{}
    count     int64
}

func (tb *TokenBucket) Take(count int64) {
    w := &waitingJob{
        ch:    make(chan struct{}),
        count: count,
    }

    tb.addWaitingJob(w) // 將 w 放入列隊,需爲隊列加鎖。

    <-w.ch

    close(w.ch)
}

func (tb *TokenBucket) adjustDaemon() {
  var waitingJobNow *waitingJob

    for now := range tb.ticker.C {
        var _ = now

    tb.tokenMutex.Lock() // 檢查共享資源,加鎖

        if tb.avail < tb.cap {
            tb.avail++
        }

    element := tb.getFrontWaitingJob() // 取出隊列頭,需爲隊列加鎖。

    if element != nil {
            if waitingJobNow == nil {
                waitingJobNow = element.Value.(*waitingJob)

                tb.removeWaitingJob(element) // 移除隊列頭,需爲隊列加鎖。
            }

      if tb.avail >= waitingJobNow.need {
        tb.avail -= waitingJobNow.count
        waitingJobNow.ch <- struct{}{}

        waitingJobNow = nil
      }
        }

    tb.tokenMutex.Unlock()
    }
}

接着咱們來實現 TakeMaxDuration(count int64, max time.Duration) bool ,該操做的超時部分,咱們可使用 Go 自帶的 select 關鍵字結合定時器 channel 來實現。而且爲 waitingJob 加上一個標識字段來代表該操做是否已超時被棄用。因爲檢查棄用的操做會在 adjustDaemon 中進行,而標識棄用的操做會在 TakeMaxDuration 內的 select 中,爲了再次避免競爭狀態,咱們將使用的令牌的操做從 adjustDaemon 內經過 channel 返回給 select 中,並阻塞,來避免了競爭條件而且享受了令牌鎖的保護:

func (tb *TokenBucket) TakeMaxDuration(count int64, max time.Duration) bool {
    w := &waitingJob{
        ch:        make(chan struct{}),
        count:     count,
        abandoned: false, // 超時棄置標識
    }

    defer close(w.ch)

    tb.addWaitingJob(w)

    select {
    case <-w.ch:
        tb.avail -= use
        w.ch <- struct{}{}
        return true
    case <-time.After(max):
        w.abandoned = true
        return false
    }
}

func (tb *TokenBucket) adjustDaemon() {
    // ...

    if element != nil {
            if waitingJobNow == nil || waitingJobNow.abandoned {
                waitingJobNow = element.Value.(*waitingJob)

                tb.removeWaitingJob(element)
            }

            if tb.avail >= waitingJobNow.need && !waitingJobNow.abandoned {
                waitingJobNow.ch <- struct{}{}
                <-waitingJobNow.ch

                waitingJobNow = nil
            }
        }

    // ...
}

最後

最後總結一些關鍵點:

  • 對於共享資源的存取,要麼使用鎖,要麼使用 channel ,視場景選擇最好用的用之。

  • channel 可被動等待共享資源,而鎖則使用十分簡易。

  • 異步的多個等待操做,可以使用隊列進行協調。

  • 能夠在鎖的保護下,結合 channel 來對共享資源實現一個處理 pipeline ,結合二者優點,十分好用。

相關文章
相關標籤/搜索