What you are wasting today is tomorrow for those who died yesterday; what you hate now is the future you can not go back.git
你所浪費的今天是昨天死去的人奢望的明天; 你所厭惡的如今是將來的你回不去的曾經。數據結構
以前咱們說的channel都是不帶緩衝的,不管發送和接收都會致使阻塞。dom
緩衝Channel的特色是:只有當發送至緩衝區存滿後致使阻塞, 接受也是如此。spa
建立方式: ch:= make(chan Type , capacity)指針
capacity 容量, 當capacity = 0 時, 爲無緩衝channel,一般省略而已。code
package main import ( "fmt" ) func main() { ch := make(chan string, 2) ch <- "naveen" ch <- "paul" fmt.Println(<- ch) fmt.Println(<- ch) // 註釋此行,會不會deadlock??? }
下這個例子請認真思考,有助於理解buffered channel:協程
package main import ( "fmt" "time" ) func write(ch chan int) { for i := 0; i < 5; i++ { ch <- i fmt.Println("successfully wrote", i, "to ch") } close(ch) } func main() { ch := make(chan int, 2) go write(ch) time.Sleep(2 * time.Second) for v := range ch { fmt.Println("read value", v,"from ch") time.Sleep(2 * time.Second) } }
解釋:ci
當main程建立一個有容量爲2的channel,而後在goroutine中循環寫入, 在寫入兩次後, goroutine阻塞, main程同時也進入了sleep中,當range開始接收後,goroutine發現又能夠繼續寫入。input
輸出:string
successfully wrote 0 to ch successfully wrote 1 to ch read value 0 from ch successfully wrote 2 to ch read value 1 from ch successfully wrote 3 to ch read value 2 from ch successfully wrote 4 to ch read value 3 from ch read value 4 from ch
一開始寫入兩次,是由於channel容量爲2, 不須要讀取就可寫入。
最後連續兩次讀,是由於當range讀取一次後, goroutine馬上寫入一次,因此channel中始終保持2個數據。
容量是指channel最大的存儲長度。 長度是指當前channel中正在排隊的數據長度。
代碼說明:
package main import ( "fmt" ) func main() { ch := make(chan string, 3) ch <- "數據1" ch <- "數據2" //容量爲3, 可是其中數據只有2個 fmt.Println("capacity is", cap(ch)) //數據長度爲2 fmt.Println("length is", len(ch)) //讀取一次 fmt.Println("read value", <-ch) //數據長度爲1, 可是容量仍是3 fmt.Println("new length is", len(ch)) }
輸出:
capacity is 3 length is 2 read value 數據1 new length is 1
工做池的實現離不開WaitGroup, 下面講一下關於WariGroup。
若是一個main程中有三個goroutine, 要想得到這三個goroutine的輸出,那麼 須要使用WaitGroup阻塞main程,等待全部goroutine結束。
package main import ( "fmt" "sync" "time" ) func ProcessEcho( i int , w *sync.WaitGroup){ fmt.Println("協程", i , "開始") time.Sleep(1*time.Second) fmt.Println("協程", i , "結束") w.Done() } func main(){ var w sync.WaitGroup Max := 10 for i:= 0; i<Max ;i++ { w.Add(1) go ProcessEcho(i, &w) } w.Wait() fmt.Println("main執行完成並退出。") }
解釋:
main程啓動10個協程, 天天啓動都高速WaitGroup來添加一個監聽,每一個goroutine結束都標記一次結束。 main程中等待全部標記完成,結束阻塞。
注意點:
1. 爲何go ProcessEcho中使用的是w的指針?!
2. goroutine的輸出是沒有規律的。
細看工做池的實現吧:
package main import ( "fmt" "sync" "time" ) //任務結構 type Job struct { id int randomno int } //接受數據結構 type Result struct { job Job sumofdigits int } var jobs = make(chan Job, 10) var results = make(chan Result, 10) func digits(number int) int { time.Sleep(2 * time.Second) return number } func worker(i int , wg *sync.WaitGroup) { for job := range jobs { output := Result{job, digits(i)} results <- output } wg.Done() } func createWorkerPool(noOfWorkers int) { var wg sync.WaitGroup for i := 0; i < noOfWorkers; i++ { wg.Add(1) go worker(i,&wg) } wg.Wait() close(results) } func allocate(noOfJobs int) { for i := 0; i < noOfJobs; i++ { randomno := i job := Job{i, randomno} jobs <- job } close(jobs) } func result(done chan bool) { for result := range results { fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits) } done <- true } func main() { startTime := time.Now() noOfJobs := 12 // 任務數 go allocate(noOfJobs) done := make(chan bool) go result(done) noOfWorkers := 3 // 執行者 createWorkerPool(noOfWorkers) <-done endTime := time.Now() diff := endTime.Sub(startTime) fmt.Println("total time taken ", diff.Seconds(), "seconds") }