慕課go高級工程師實戰營

愛分享 愛生活 加油 2021

 

循環獲取channel

func get(data chan int){
    for v,ok := range chan{
        if !ok{
            //channel 已經關閉
            break
        }
        // do something with v
    }
}

若是須要中止使用channel,須要手動將channel關閉緩存

close(data)

關閉後的channel還能獲取其中存在的數據,可是不能再增長數據。數據取完後ok值爲false。併發

channel關閉的判斷

ch = make(chan int, 10)
//....some code
select{
    case r,ok := <- ch:
    if !ok  {
        //通道已空 而且已經關閉
    }
}

向有緩存的channel傳數據,滿了就中止,不阻塞

ch = make(chan int, 10)

Fill: //爲循環設置tag
for{
    select {
        case ch <- 1:
        default:
            break Fill
    }
}

for循環必須設置tag,否則select中的break沒法中止外部循環,會一直執行default,陷入死循環。app

//這段代碼會陷入死循環中,每次都執行default
for{
    select{
    case <- time.After(10*time.Second):
    default:
        break
    }
}

超時的使用

select{
    case job <- jobList
    case <- time.After(10 * time.Second):
    //10秒後作超時處理
}



前些日子寫的限制請求次數,結果用的時候發現能夠更簡單實現。ide

需求:抓數據的網站限定1秒只能有10次請求函數

因爲發起併發請求幾乎是0耗時的,因此能夠選擇同時發完全部的請求,而後等到下一個週期。這樣控制週期內請求次數只須要一個ticker就能搞定:發完請求就阻塞一個週期;網站

而控制同時最大併發只須要一個channel用來計數。計數不能用互斥鎖計數器,由於互斥鎖不能實現阻塞atom

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    working chan int //goroutine計數器 用於限制最大併發數
    wg      sync.WaitGroup
)

func main() {
    jobList := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} //要工做的任務清單

    //每秒3個請求 最大同時4個請求
    duration := time.Second
    concurrency := 3
    concurrencyMax := 4

    ticker := time.NewTicker(duration)
    working = make(chan int, concurrencyMax)

    //經過限定1個週期派發3個任務來實現限制請求次數
    k := 0 //用於控制週期內發送次數
    for c, job := range jobList {
        working <- c //計數器+1 可能會發生阻塞

        wg.Add(1)
        go work(job)

        k++
        if k == concurrency {
            <-ticker.C //等待一個週期 可能會白等
            k = 0
        }
    }
    wg.Wait()
}

func work(j int64) {
    defer wg.Done()

    fmt.Println("doing work#", j)
    <-time.After(5 * time.Second) //假設5秒完成

    //工做完成後計數器減1
    <-working
}

上面這個相對就省事不少了。
可是,若是計數器+1的時候發生阻塞,那麼下一個等待週期多是白等的。
一樣的緣由,若是發起請求的操做也有耗時,極可能這一批請求發完就已經進入下一個週期,因而不等就有超發的風險,等待有白等的風險。設計

所以上面的方法僅限於發起併發請求幾乎0耗時的操做。code

若是要避免白等,就還須要一個精確的週期計數器。兩種方案:token

  1. 相似令牌池,維持一個channel來發放令牌,週期性刷新。就像這裏令牌池的實現
  2. 維持一個計數器,週期性重置

不管哪一種方案都須要加鎖。
第一種方案加鎖是爲了不在發放令牌的時候遭遇通道關閉(會引起panic)。
第二種在+1和-1甚至比對的時候都要加鎖。

 

抓數據的網站限定1秒只能有10次請求,所以設計了一個令牌管理機制來控制請求數量。

設計思路以下:

  • 發請求前須要先獲取令牌
  • 限定某時間段內的發放的令牌數量
  • 任務執行完成後不能歸還令牌,只能使用定時器不斷重置令牌
  • 若是當前goroutine數量過多時也不重置令牌
package main

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

//節流器

type throttle struct {
    D      time.Duration //週期是D,
    C      int64         //限制一個週期最多操做C次
    Mu     sync.Mutex
    Token  chan bool //令牌池
    num    int64     //當前的goroutine數量
    maxNum int64     //容許工做goroutine最大數量
}

//若是兩個週期後尚未申請到令牌,就報錯超時
//目前用不到,若是限制routine最大數量須要靠這來監控
var ErrApplyTimeout = errors.New("apply token time out")

func NewThrottle(D time.Duration, C, maxNum int64) *throttle {
    instance := &throttle{
        D:      D,
        C:      C,
        Token:  make(chan bool, C),
        maxNum: maxNum,
    }
    go instance.reset()
    return instance
}

//每週期從新填充一次令牌池
func (t *throttle) reset() {
    ticker := time.NewTicker(t.D)
    for _ = range ticker.C {
        //goroutine數量不超過最大數量時再填充令牌池
        if t.num >= t.maxNum {
            continue
        }
        t.Mu.Lock()
        supply := t.C - int64(len(t.Token))
        fmt.Printf("reset token:%d\n", supply)
        for supply > 0 {
            t.Token <- true
            supply--
        }
        t.Mu.Unlock()
    }
}

//申請令牌,若是過兩個週期還沒申請到就報超時退出
func (t *throttle) ApplyToken() (bool, error) {
    select {
    case <-t.Token:
        return true, nil
    case <-time.After(t.D * 2):
        return false, ErrApplyTimeout
    }
}

func (t *throttle) Work(job func()) {
    if ok, err := t.ApplyToken(); !ok {
        fmt.Println(err)
        return
    }
    go func() {
        atomic.AddInt64(&t.num, 1)
        defer atomic.AddInt64(&t.num, -1)
        job()
    }()
}
func main() {
    t := NewThrottle(time.Second, 10, 20) //每秒10次,同時最多20個routine存在
    for {
        t.Work(doWork)
    }
}

//真正的工做函數 假設每一個須要執行5秒
func doWork() {
    fmt.Println(time.Now())
    <-time.After(5 * time.Second)
}
相關文章
相關標籤/搜索