Golang併發(四)- buffered channel 和 Worker Pools

What you are wasting today is tomorrow for those who died yesterday; what you hate now is the future you can not go back.git

你所浪費的今天是昨天死去的人奢望的明天; 你所厭惡的如今是將來的你回不去的曾經。數據結構

 

Buffered channel

    以前咱們說的channel都是不帶緩衝的,不管發送和接收都會致使阻塞。dom

    緩衝Channel的特色是:只有當發送至緩衝區存滿後致使阻塞, 接受也是如此。spa

    建立方式: ch:= make(chan Type , capacity)指針

    capacity 容量, 當capacity = 0 時, 爲無緩衝channel,一般省略而已。code

package main

import (
	"fmt"
)


func main() {
	ch := make(chan string, 2)
	ch <- "naveen"
	ch <- "paul"
	fmt.Println(<- ch)
	fmt.Println(<- ch)  // 註釋此行,會不會deadlock???
}

 

下這個例子請認真思考,有助於理解buffered channel:協程

package main

import (  
    "fmt"
    "time"
)

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}

解釋:ci

    當main程建立一個有容量爲2的channel,而後在goroutine中循環寫入, 在寫入兩次後, goroutine阻塞, main程同時也進入了sleep中,當range開始接收後,goroutine發現又能夠繼續寫入。input

輸出:string

successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch

一開始寫入兩次,是由於channel容量爲2, 不須要讀取就可寫入。 

最後連續兩次讀,是由於當range讀取一次後, goroutine馬上寫入一次,因此channel中始終保持2個數據。

 

概念: 長度與容量

    容量是指channel最大的存儲長度。 長度是指當前channel中正在排隊的數據長度。

  代碼說明:

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 3)
    ch <- "數據1"
	ch <- "數據2"

    //容量爲3, 可是其中數據只有2個
    fmt.Println("capacity is", cap(ch)) 

    //數據長度爲2
    fmt.Println("length is", len(ch)) 

    //讀取一次
    fmt.Println("read value", <-ch)

   //數據長度爲1, 可是容量仍是3
    fmt.Println("new length is", len(ch)) 
}

輸出:

capacity is 3
length is 2
read value 數據1
new length is 1

 

WaitGroup

工做池的實現離不開WaitGroup, 下面講一下關於WariGroup。

若是一個main程中有三個goroutine, 要想得到這三個goroutine的輸出,那麼 須要使用WaitGroup阻塞main程,等待全部goroutine結束。

package main

import (
	"fmt"
	"sync"
	"time"
)

func ProcessEcho( i int , w *sync.WaitGroup){
	fmt.Println("協程", i , "開始")
	time.Sleep(1*time.Second)
	fmt.Println("協程", i , "結束")
	w.Done()
}

func main(){
	var w sync.WaitGroup
	Max := 10
	for i:= 0; i<Max ;i++  {
		w.Add(1)
		go ProcessEcho(i, &w)
	}
	w.Wait()

	fmt.Println("main執行完成並退出。")
}

解釋:

    main程啓動10個協程, 天天啓動都高速WaitGroup來添加一個監聽,每一個goroutine結束都標記一次結束。 main程中等待全部標記完成,結束阻塞。

注意點:

1. 爲何go ProcessEcho中使用的是w的指針?!

2. goroutine的輸出是沒有規律的。

 

 

細看工做池的實現吧:

package main

import (
	"fmt"
	"sync"
	"time"
)
//任務結構
type Job struct {
	id       int
	randomno int
}

//接受數據結構
type Result struct {
	job         Job
	sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {

	time.Sleep(2 * time.Second)
	return number
}
func worker(i int , wg *sync.WaitGroup) {
	for job := range jobs {
		output := Result{job, digits(i)}
		results <- output
	}
	wg.Done()
}
func createWorkerPool(noOfWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < noOfWorkers; i++ {
		wg.Add(1)
		go worker(i,&wg)
	}
	wg.Wait()
	close(results)
}
func allocate(noOfJobs int) {
	for i := 0; i < noOfJobs; i++ {
		randomno := i
		job := Job{i, randomno}
		jobs <- job
	}
	close(jobs)
}
func result(done chan bool) {
	for result := range results {
		fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
	}
	done <- true
}
func main() {
	startTime := time.Now()
	noOfJobs := 12 // 任務數
	go allocate(noOfJobs)
	done := make(chan bool)
	go result(done)
	noOfWorkers := 3 // 執行者
	createWorkerPool(noOfWorkers)
	<-done
	endTime := time.Now()
	diff := endTime.Sub(startTime)
	fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
相關文章
相關標籤/搜索