func get(data chan int){ for v,ok := range chan{ if !ok{ //channel 已經關閉 break } // do something with v } }
若是須要中止使用channel,須要手動將channel關閉緩存
close(data)
關閉後的channel還能獲取其中存在的數據,可是不能再增長數據。數據取完後ok
值爲false。併發
ch = make(chan int, 10) //....some code select{ case r,ok := <- ch: if !ok { //通道已空 而且已經關閉 } }
ch = make(chan int, 10) Fill: //爲循環設置tag for{ select { case ch <- 1: default: break Fill } }
for循環必須設置tag,否則select中的break沒法中止外部循環,會一直執行default,陷入死循環。app
//這段代碼會陷入死循環中,每次都執行default for{ select{ case <- time.After(10*time.Second): default: break } }
select{ case job <- jobList case <- time.After(10 * time.Second): //10秒後作超時處理 }
前些日子寫的限制請求次數,結果用的時候發現能夠更簡單實現。ide
需求:抓數據的網站限定1秒只能有10次請求函數
因爲發起併發請求幾乎是0耗時的,因此能夠選擇同時發完全部的請求,而後等到下一個週期。這樣控制週期內請求次數只須要一個ticker
就能搞定:發完請求就阻塞一個週期;網站
而控制同時最大併發只須要一個channel用來計數。計數不能用互斥鎖計數器,由於互斥鎖不能實現阻塞atom
package main import ( "fmt" "sync" "time" ) var ( working chan int //goroutine計數器 用於限制最大併發數 wg sync.WaitGroup ) func main() { jobList := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} //要工做的任務清單 //每秒3個請求 最大同時4個請求 duration := time.Second concurrency := 3 concurrencyMax := 4 ticker := time.NewTicker(duration) working = make(chan int, concurrencyMax) //經過限定1個週期派發3個任務來實現限制請求次數 k := 0 //用於控制週期內發送次數 for c, job := range jobList { working <- c //計數器+1 可能會發生阻塞 wg.Add(1) go work(job) k++ if k == concurrency { <-ticker.C //等待一個週期 可能會白等 k = 0 } } wg.Wait() } func work(j int64) { defer wg.Done() fmt.Println("doing work#", j) <-time.After(5 * time.Second) //假設5秒完成 //工做完成後計數器減1 <-working }
上面這個相對就省事不少了。
可是,若是計數器+1的時候發生阻塞,那麼下一個等待週期多是白等的。
一樣的緣由,若是發起請求的操做也有耗時,極可能這一批請求發完就已經進入下一個週期,因而不等就有超發的風險,等待有白等的風險。設計
所以上面的方法僅限於發起併發請求幾乎0耗時的操做。code
若是要避免白等,就還須要一個精確的週期計數器。兩種方案:token
不管哪一種方案都須要加鎖。
第一種方案加鎖是爲了不在發放令牌的時候遭遇通道關閉(會引起panic)。
第二種在+1和-1甚至比對的時候都要加鎖。
抓數據的網站限定1秒只能有10次請求,所以設計了一個令牌管理機制來控制請求數量。
設計思路以下:
package main import ( "errors" "fmt" "sync" "sync/atomic" "time" ) //節流器 type throttle struct { D time.Duration //週期是D, C int64 //限制一個週期最多操做C次 Mu sync.Mutex Token chan bool //令牌池 num int64 //當前的goroutine數量 maxNum int64 //容許工做goroutine最大數量 } //若是兩個週期後尚未申請到令牌,就報錯超時 //目前用不到,若是限制routine最大數量須要靠這來監控 var ErrApplyTimeout = errors.New("apply token time out") func NewThrottle(D time.Duration, C, maxNum int64) *throttle { instance := &throttle{ D: D, C: C, Token: make(chan bool, C), maxNum: maxNum, } go instance.reset() return instance } //每週期從新填充一次令牌池 func (t *throttle) reset() { ticker := time.NewTicker(t.D) for _ = range ticker.C { //goroutine數量不超過最大數量時再填充令牌池 if t.num >= t.maxNum { continue } t.Mu.Lock() supply := t.C - int64(len(t.Token)) fmt.Printf("reset token:%d\n", supply) for supply > 0 { t.Token <- true supply-- } t.Mu.Unlock() } } //申請令牌,若是過兩個週期還沒申請到就報超時退出 func (t *throttle) ApplyToken() (bool, error) { select { case <-t.Token: return true, nil case <-time.After(t.D * 2): return false, ErrApplyTimeout } } func (t *throttle) Work(job func()) { if ok, err := t.ApplyToken(); !ok { fmt.Println(err) return } go func() { atomic.AddInt64(&t.num, 1) defer atomic.AddInt64(&t.num, -1) job() }() } func main() { t := NewThrottle(time.Second, 10, 20) //每秒10次,同時最多20個routine存在 for { t.Work(doWork) } } //真正的工做函數 假設每一個須要執行5秒 func doWork() { fmt.Println(time.Now()) <-time.After(5 * time.Second) }