[譯] part23: golang 緩衝 channel 和協程池

什麼是緩衝channel

咱們在上一個教程中討論的全部channel基本上都是無緩衝的。正如咱們在channel教程中詳細討論的那樣,發送和接收到無緩衝的channel都是阻塞的。git

可使用緩衝區建立channel。僅當緩衝區已滿時纔會阻塞對緩衝channel的發送。相似地,僅當緩衝區爲空時才阻塞從緩衝channel接收。golang

能夠經過添加一個capacity參數傳遞給make函數來建立緩衝channel,該函數指定緩衝區的大小。併發

ch := make(chan type, capacity)
複製代碼

對於具備緩衝區的channel,上述語法中的容量應大於 0。默認狀況下,無緩衝通道的容量爲 0,所以在上一個教程中建立通道時省略了容量參數。dom

咱們來建立一個緩衝channel函數

package main

import (
    "fmt"
)


func main() {
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}
複製代碼

Run in playgroud測試

在上面的程序中,第 9 行咱們建立一個容量爲 2 的緩衝channel。因爲channel的容量爲 2,所以能夠將 2 個字符串寫入而不會被阻塞。咱們在第 10 和 11 行寫入 2 個字符串,隨後讀取了寫入的字符串並打印,ui

naveen
paul
複製代碼

另外一個例子

讓咱們再看一個緩衝channel的例子,其中channel的值寫入Goroutine並從main Goroutine讀取。這個例子將幫助咱們更好地理解什麼時候寫入緩衝的channelspa

package main

import (
    "fmt"
    "time"
)

func write(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}
複製代碼

Run in playgroudcode

上面的程序中,在第 16 行建立了容量爲 2 的緩衝channel chmain Goroutinech傳給write Goroutine,而後main Goroutine休眠 2 秒鐘。在此期間,write Goroutine在運行。write Goroutine有一個 for 循環,它將 0 到 4 的數字循環寫入channel ch。因爲容量爲 2,所以可以將值 0 和 1 寫入,而後阻塞直到從channel ch讀取至少一個值。因此這個程序會當即打印如下 2 行,協程

successfully wrote 0 to ch
successfully wrote 1 to ch
複製代碼

在打印上述兩行以後,write Goroutine中的寫入被阻塞,直到channel ch的數據被讀取。因爲main Goroutine會休眠 2 秒,所以程序在接下來的 2 秒內不會打印任何內容。當main Goroutine在被喚醒後,使用for range循環開始從channel ch讀取並打印讀取值,而後再次休眠 2 秒,此循環繼續,直到 ch 關閉。所以程序將在 2 秒後打印如下行,

read value 0 from ch
successfully wrote 2 to ch
複製代碼

而後繼續直到全部值都寫入並在關閉 channel。最終的輸出是,

successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
複製代碼

死鎖

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}
複製代碼

Run in playgroud 在上面的程序中,咱們將 3 個字符串寫入容量爲 2 的緩衝channel。當第三個字符串寫入的時候已超過其容量,所以寫入操做被阻塞。如今必須等待其餘Goroutinechannel讀取數據才能繼續寫入,但在上述代碼中並無從該channel讀取數據的Goroutine。所以會出現死鎖,程序將在運行時打印如下內容,

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /tmp/sandbox274756028/main.go:11 +0x100
複製代碼

長度 VS 容量

容量是channel能夠容納的值的數量。這是咱們使用make函數建立時指定的值。

長度是當前在channel中的元素數量。

一個程序會讓理解變得簡單😀

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}
複製代碼

Run in playgroud

在上面的程序中,建立的channel容量爲 3,即它能夠容納 3 個字符串。而後咱們分別寫入 2 個字符串,如今該channel有 2 個字符串,所以其長度爲 2。 咱們從channel中讀取一個字符串。如今channel只有一個字符串了,所以它的長度變爲 1。這個程序將打印,

capacity is 3
length is 2
read value naveen
new length is 1
複製代碼

WaitGroup

本教程的下一部分是關於Worker Pools。要了解工做池,咱們首先須要瞭解WaitGroup,由於它將用於工做池的實現。

WaitGroup用於阻塞main Goroutines直到全部Goroutines完成執行。好比說咱們有 3 個從main Goroutine生成的Goroutines須要併發執行。main Goroutines須要等待其餘 3 個Goroutines完成才能終止,不然可能在main Goroutines終止時,其他的Goroutines還沒能得當執行,這種場景下可使用WaitGroup來完成。

中止理論上代碼😀

package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}
複製代碼

Run in playgroud

WaitGroup是一種結構類型,咱們在第 18 行建立一個WaitGroup類型的空值變量。 WaitGroup的工做方式是使用計數器。當咱們在WaitGroup上調用int型參數調用Add 方法,計數器會增長傳遞給Add的值。遞減計數器的方法是在WaitGroup上調用Done方法。 Wait方法阻塞調用它的Goroutine,直到計數器變爲零。

在上面的程序中,咱們在第 20 行調用wg.Add(1)循環迭代 3 次。因此計數器的值如今變成了 3。 for 循環也產生 3 個Goroutinesmain Goroutines在第 23 行調用了wg.Wait()以阻塞直到計數器變爲零。在Goroutine中,經過調用wg.Done來減小計數器的值。 一旦全部 3 個生成的Goroutines完成執行,也就是wg.Done()被調用三次,計數器被清零,main Goroutine被解除阻塞,程序執行完成,輸出,

started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
複製代碼

你們的輸出可能與個人不一樣,由於Goroutines的執行順序會有所不一樣:)。

協程池的實現

緩衝channel的一個重要用途是協程池的實現。

一般,協程池是一組協程,它們等待分配給它們任務。一旦完成分配的任務,他們就會再次等待下一個任務。

咱們將使用緩衝channel實現協程池。咱們的協程池將執行查找輸入數字的數字之和的任務。例如,若是傳遞 234,則輸出將爲 9(9 = 2 + 3 + 4)。協程池的輸入將是僞隨機整數列表。

如下是咱們協程池的核心功能

  • 建立一個Goroutines池,用於監聽緩衝jobs channel,等待任務分配
  • jobs channel添加任務
  • 任務完成後,將結果寫入緩衝results channel
  • results channel讀取和打印結果

咱們將逐步編寫此程序,以便更容易理解。

第一步是建立表示任務和結果的結構。

type Job struct {
    id       int
    randomno int
}

type Result struct {
    job         Job
    sumofdigits int
}
複製代碼

每一個Job結構都有一個idrandomno,用來計算各個數字的總和。

Result結構有一個job字段和sumofdigits字段,sumofdigits字段用來保存job各個數字之和的結果。

下一步是建立用於接收任務和存儲結果的緩衝channel

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
複製代碼

worker Goroutines在任務緩衝channel上偵聽新任務。一旦任務完成,把結果寫入結果緩衝channel

digits函數執行查找整數的各個數字之和並返回它的。咱們爲此函數添加 2 秒的休眠,以模擬此函數計算結果須要一些時間的場景。

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
複製代碼

接下來將編寫一個建立worker Goroutine的函數。

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
複製代碼

上面的函數建立了一個worker,它從jobs channel讀取任務,使用當前任務和digits函數的返回值建立Result結構,而後將結果寫入結果緩衝channel。此函數將WaitGroup wg做爲參數,在全部任務完成後,它將調用Done方法結束當前Goroutine的阻塞。

createWorkerPool函數將建立一個Goroutines池。

func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
複製代碼

上面的函數將要建立的worker數量做爲參數。它在建立Goroutine以前調用了wg.Add(1)來增長WaitGroup計數器。而後它經過將WaitGroup wg的地址傳遞給worker函數來建立worker Goroutines。在建立了所需的worker Goroutines以後,它經過調用wg.Wait()來阻塞當前協程直到全部Goroutines完成執行後,關閉results channel,由於全部的Goroutines都已完成執行,沒有結果被寫入該results channel

如今咱們已經寫好了協程池,讓咱們繼續編寫將任務分配給協程的功能。

func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
複製代碼

上面的allocate函數將要建立的任務數做爲輸入參數,生成最大值爲 998 的僞隨機數,使用隨機數建立Job結構,並將 for 循環計數器的i做爲id,而後將它們寫入jobs channel。它在寫完全部任務後關閉了jobs channel

下一步是建立一個函數讀取results channel並打印輸出。

func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
複製代碼

result函數讀取results channel並打印任務 ID,輸入隨機數和隨機數的總和。result函數在打印全部結果後,將true寫入done channel

萬事俱備,讓咱們把上面全部的功能用main函數串聯起來。

func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
複製代碼

第 2 行咱們首先將程序的執行開始時間存儲起來,在最後一行(第 12 行),咱們計算endTimestartTime之間的時間差,並顯示程序的總運行時間。這是必要的,由於咱們將經過改變Goroutines的數量來作一些基準測試。

noOfJobs設置爲 100,而後調用allocate以將任務添加到jobs channel

而後建立done channel並將其傳遞給results channel,以便它能夠開始打印輸出並在打印完全部內容後通知。

最後,經過調用createWorkerPool函數建立了一個 10 個work Goroutines的池,而後main阻塞直到done channel寫入true值,最後打印全部結果。

下面是完整的代碼。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}
type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
複製代碼

Run in playgroud

請在本地計算機上運行此程序,以便計算的總時間更準確。

程序將打印,

Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken  20.01081009 seconds
複製代碼

對應於 100 個任務,將打印總共 100 行,將在最後一行打印該程序運行所花費的總時間。您的輸出將與個人不一樣,由於Goroutines能夠按任何順序運行,總時間也會因硬件而異。在個人狀況下,程序完成大約須要 20 秒。

如今讓咱們將main函數中的noOfWorkers增長到 20。咱們將worker的數量增長了一倍。因爲work Goroutines已經增長,程序完成所需的總時間應該減小。在個人狀況下,它變成 10.004364685 秒,程序打印,

...
total time taken  10.004364685 seconds
複製代碼

如今咱們瞭解到了隨着work Goroutines數量的增長,完成任務所需的總時間減小了。我把它留做練習,讓你在主函數中使用不一樣的noOfJobsnoOfWorkers的值執行並分析結果。

相關文章
相關標籤/搜索