- 原文地址:Part 23: Buffered Channels and Worker Pools
- 原文做者:Naveen R
- 譯者:咔嘰咔嘰 轉載請註明出處。
channel
咱們在上一個教程中討論的全部channel
基本上都是無緩衝的。正如咱們在channel
教程中詳細討論的那樣,發送和接收到無緩衝的channel
都是阻塞的。git
可使用緩衝區建立channel
。僅當緩衝區已滿時纔會阻塞對緩衝channel
的發送。相似地,僅當緩衝區爲空時才阻塞從緩衝channel
接收。golang
能夠經過添加一個capacity
參數傳遞給make
函數來建立緩衝channel
,該函數指定緩衝區的大小。併發
ch := make(chan type, capacity)
複製代碼
對於具備緩衝區的channel
,上述語法中的容量應大於 0。默認狀況下,無緩衝通道的容量爲 0,所以在上一個教程中建立通道時省略了容量參數。dom
咱們來建立一個緩衝channel
,函數
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}
複製代碼
在上面的程序中,第 9 行咱們建立一個容量爲 2 的緩衝channel
。因爲channel
的容量爲 2,所以能夠將 2 個字符串寫入而不會被阻塞。咱們在第 10 和 11 行寫入 2 個字符串,隨後讀取了寫入的字符串並打印,ui
naveen
paul
複製代碼
讓咱們再看一個緩衝channel
的例子,其中channel
的值寫入Goroutine
並從main Goroutine
讀取。這個例子將幫助咱們更好地理解什麼時候寫入緩衝的channel
。spa
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
複製代碼
Run in playgroudcode
上面的程序中,在第 16 行建立了容量爲 2 的緩衝channel ch
。main Goroutine
將ch
傳給write Goroutine
,而後main Goroutine
休眠 2 秒鐘。在此期間,write Goroutine
在運行。write Goroutine
有一個 for 循環,它將 0 到 4 的數字循環寫入channel ch
。因爲容量爲 2,所以可以將值 0 和 1 寫入,而後阻塞直到從channel ch
讀取至少一個值。因此這個程序會當即打印如下 2 行,協程
successfully wrote 0 to ch
successfully wrote 1 to ch
複製代碼
在打印上述兩行以後,write Goroutine
中的寫入被阻塞,直到channel ch
的數據被讀取。因爲main Goroutine
會休眠 2 秒,所以程序在接下來的 2 秒內不會打印任何內容。當main Goroutine
在被喚醒後,使用for range
循環開始從channel ch
讀取並打印讀取值,而後再次休眠 2 秒,此循環繼續,直到 ch 關閉。所以程序將在 2 秒後打印如下行,
read value 0 from ch
successfully wrote 2 to ch
複製代碼
而後繼續直到全部值都寫入並在關閉 channel
。最終的輸出是,
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
複製代碼
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve"
fmt.Println(<-ch)
fmt.Println(<-ch)
}
複製代碼
Run in playgroud 在上面的程序中,咱們將 3 個字符串寫入容量爲 2 的緩衝channel
。當第三個字符串寫入的時候已超過其容量,所以寫入操做被阻塞。如今必須等待其餘Goroutine
從channel
讀取數據才能繼續寫入,但在上述代碼中並無從該channel
讀取數據的Goroutine
。所以會出現死鎖,程序將在運行時打印如下內容,
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox274756028/main.go:11 +0x100
複製代碼
容量是channel
能夠容納的值的數量。這是咱們使用make
函數建立時指定的值。
長度是當前在channel
中的元素數量。
一個程序會讓理解變得簡單😀
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
複製代碼
在上面的程序中,建立的channel
容量爲 3,即它能夠容納 3 個字符串。而後咱們分別寫入 2 個字符串,如今該channel
有 2 個字符串,所以其長度爲 2。 咱們從channel
中讀取一個字符串。如今channel
只有一個字符串了,所以它的長度變爲 1。這個程序將打印,
capacity is 3
length is 2
read value naveen
new length is 1
複製代碼
本教程的下一部分是關於Worker Pools
。要了解工做池,咱們首先須要瞭解WaitGroup
,由於它將用於工做池的實現。
WaitGroup
用於阻塞main Goroutines
直到全部Goroutines
完成執行。好比說咱們有 3 個從main Goroutine
生成的Goroutines
須要併發執行。main Goroutines
須要等待其餘 3 個Goroutines
完成才能終止,不然可能在main Goroutines
終止時,其他的Goroutines
還沒能得當執行,這種場景下可使用WaitGroup
來完成。
中止理論上代碼😀
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
複製代碼
WaitGroup
是一種結構類型,咱們在第 18 行建立一個WaitGroup
類型的空值變量。 WaitGroup
的工做方式是使用計數器。當咱們在WaitGroup
上調用int
型參數調用Add
方法,計數器會增長傳遞給Add
的值。遞減計數器的方法是在WaitGroup上
調用Done
方法。 Wait
方法阻塞調用它的Goroutine
,直到計數器變爲零。
在上面的程序中,咱們在第 20 行調用wg.Add(1)
循環迭代 3 次。因此計數器的值如今變成了 3。 for 循環也產生 3 個Goroutines
,main Goroutines
在第 23 行調用了wg.Wait()
以阻塞直到計數器變爲零。在Goroutine
中,經過調用wg.Done
來減小計數器的值。 一旦全部 3 個生成的Goroutines
完成執行,也就是wg.Done()
被調用三次,計數器被清零,main Goroutine
被解除阻塞,程序執行完成,輸出,
started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
複製代碼
你們的輸出可能與個人不一樣,由於Goroutines
的執行順序會有所不一樣:)。
緩衝channel
的一個重要用途是協程池的實現。
一般,協程池是一組協程,它們等待分配給它們任務。一旦完成分配的任務,他們就會再次等待下一個任務。
咱們將使用緩衝channel
實現協程池。咱們的協程池將執行查找輸入數字的數字之和的任務。例如,若是傳遞 234,則輸出將爲 9(9 = 2 + 3 + 4)。協程池的輸入將是僞隨機整數列表。
如下是咱們協程池的核心功能
Goroutines
池,用於監聽緩衝jobs channel
,等待任務分配jobs channel
添加任務results channel
results channel
讀取和打印結果咱們將逐步編寫此程序,以便更容易理解。
第一步是建立表示任務和結果的結構。
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
複製代碼
每一個Job
結構都有一個id
和randomno
,用來計算各個數字的總和。
Result
結構有一個job
字段和sumofdigits
字段,sumofdigits
字段用來保存job
各個數字之和的結果。
下一步是建立用於接收任務和存儲結果的緩衝channel
。
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
複製代碼
worker Goroutines
在任務緩衝channel
上偵聽新任務。一旦任務完成,把結果寫入結果緩衝channel
。
digits
函數執行查找整數的各個數字之和並返回它的。咱們爲此函數添加 2 秒的休眠,以模擬此函數計算結果須要一些時間的場景。
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
複製代碼
接下來將編寫一個建立worker Goroutine
的函數。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
複製代碼
上面的函數建立了一個worker
,它從jobs channel
讀取任務,使用當前任務和digits
函數的返回值建立Result
結構,而後將結果寫入結果緩衝channel
。此函數將WaitGroup wg
做爲參數,在全部任務完成後,它將調用Done
方法結束當前Goroutine
的阻塞。
createWorkerPool
函數將建立一個Goroutines
池。
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
複製代碼
上面的函數將要建立的worker
數量做爲參數。它在建立Goroutine
以前調用了wg.Add(1)
來增長WaitGroup
計數器。而後它經過將WaitGroup wg
的地址傳遞給worker
函數來建立worker Goroutines
。在建立了所需的worker Goroutines
以後,它經過調用wg.Wait()
來阻塞當前協程直到全部Goroutines
完成執行後,關閉results channel
,由於全部的Goroutines
都已完成執行,沒有結果被寫入該results channel
。
如今咱們已經寫好了協程池,讓咱們繼續編寫將任務分配給協程的功能。
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
複製代碼
上面的allocate
函數將要建立的任務數做爲輸入參數,生成最大值爲 998 的僞隨機數,使用隨機數建立Job
結構,並將 for 循環計數器的i
做爲id
,而後將它們寫入jobs channel
。它在寫完全部任務後關閉了jobs channel
。
下一步是建立一個函數讀取results channel
並打印輸出。
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
複製代碼
result
函數讀取results channel
並打印任務 ID,輸入隨機數和隨機數的總和。result
函數在打印全部結果後,將true
寫入done channel
。
萬事俱備,讓咱們把上面全部的功能用main
函數串聯起來。
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
複製代碼
第 2 行咱們首先將程序的執行開始時間存儲起來,在最後一行(第 12 行),咱們計算endTime
和startTime
之間的時間差,並顯示程序的總運行時間。這是必要的,由於咱們將經過改變Goroutines
的數量來作一些基準測試。
noOfJobs
設置爲 100,而後調用allocate
以將任務添加到jobs channel
。
而後建立done channel
並將其傳遞給results channel
,以便它能夠開始打印輸出並在打印完全部內容後通知。
最後,經過調用createWorkerPool
函數建立了一個 10 個work Goroutines
的池,而後main
阻塞直到done channel
寫入true
值,最後打印全部結果。
下面是完整的代碼。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
複製代碼
請在本地計算機上運行此程序,以便計算的總時間更準確。
程序將打印,
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.01081009 seconds
複製代碼
對應於 100 個任務,將打印總共 100 行,將在最後一行打印該程序運行所花費的總時間。您的輸出將與個人不一樣,由於Goroutines
能夠按任何順序運行,總時間也會因硬件而異。在個人狀況下,程序完成大約須要 20 秒。
如今讓咱們將main
函數中的noOfWorkers
增長到 20。咱們將worker
的數量增長了一倍。因爲work Goroutines
已經增長,程序完成所需的總時間應該減小。在個人狀況下,它變成 10.004364685 秒,程序打印,
...
total time taken 10.004364685 seconds
複製代碼
如今咱們瞭解到了隨着work Goroutines
數量的增長,完成任務所需的總時間減小了。我把它留做練習,讓你在主函數中使用不一樣的noOfJobs
和noOfWorkers
的值執行並分析結果。