[譯]Go併發編程中的那些事

併發編程

bouncing balls
bouncing balls

這篇文章將會以Go語言舉例介紹併發編程,包括如下內容html

  • 線程的併發執行(goroutines)
  • 基本的同步技術(channel和鎖)
  • Go中的基本併發模式
  • 死鎖和數據競爭
  • 並行計算

開始以前,你須要去了解怎樣寫最基本的 Go 程序。 若是你已經對 C/C++,Java 或者Python比較熟悉,A tour of go將會給你一些幫助。你也能夠看一下Go for C++ programmers 或者Go for Java programmers前端

1.多線程執行

goroutine 是 go 的一種調度機制。 Go 使用 go 進行聲明,以 goroutine 調度機制開啓一個新的執行線程。它會在新建立的 goroutine 執行程序。在單個程序中,全部goroutines都是共享相同的地址空間。java

相比於分配棧空間,goroutine 更加輕量,花銷更小。棧空間初始化很小,須要經過申請和釋放堆空間來擴展內存。Goroutines 內部是被複用在多個操做系統線程上。若是一個goroutine阻塞了一個操做系統線程,好比正在等待輸入,此時,這個線程中的其餘 goroutine 爲了保證繼續運行,將會遷移到其餘線程中,而你不須要去關心這些細節。react

下面的程序將會打印 "Hello from main goroutine". 是否打印"Hello from another goroutine",取決於兩個goroutines誰先完成.linux

func main() {

    go fmt.Println("Hello from another goroutine")
    fmt.Println("Hello from main goroutine")

    // 程序執行到這,全部活着的goroutines都會被殺掉

}複製代碼

goroutine1.goandroid

下一段程序 "Hello from main goroutine""Hello from another goroutine" 可能會以任何順序打印。但有一種可能性是第二個goroutine運行的很是慢,以致於到程序結束以前都不會打印。ios

func main() {
    go fmt.Println("Hello from another goroutine")
    fmt.Println("Hello from main goroutine")

    time.Sleep(time.Second) // 爲其餘goroutine完成等1秒鐘
}複製代碼

goroutine2.gogit

這有一個更實際的例子,咱們定義一個使用併發來推遲事件的函數。github

// 在指定時間過時後,文本會被打印到標準輸出
// 這不管如何都不會被阻塞
func Publish(text string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
    }() // 注意括號。咱們必須調用匿名函數
}複製代碼

publish1.gogolang

你可能用下面的方式調用 Publish 函數

func main() {
    Publish("A goroutine starts a new thread of execution.", 5*time.Second)
    fmt.Println("Let’s hope the news will published before I leave.")

    // 等待消息被髮布
    time.Sleep(10 * time.Second)

    fmt.Println("Ten seconds later: I’m leaving now.")
}複製代碼

publish1.go

該程序頗有可能按如下順序打印三行,每行輸出會間隔五秒鐘。

$ go run publish1.go
Let’s hope the news will published before I leave.
BREAKING NEWS: A goroutine starts a new thread of execution.
Ten seconds later: I’m leaving now.複製代碼

通常來講,咱們不可能讓線程休眠去等待對方。在下一節中, 咱們將會介紹 Go 的一種同步機制, channels 。而後演示如何使用channel來讓一個 goruntine 等待另外的 goruntine。

2. Channels

Sushi conveyor belt
Sushi conveyor belt

壽司輸送帶

channel 是一種 Go 語言結構,它經過傳遞特定元素類型的值來爲兩個 goroutines 提供同步執行和交流數據的機制
<- 標識符表示了channel的傳輸方向,接收或者發送。若是沒有指定方向。那麼 channel 就是雙向的。

chan Sushi      // 能被用於接收和發送 Sushi 類型的值
chan<- float64  // 只能被用於發送 float64 類型的值
<-chan int      // 只能被用於接收 int 類型的值複製代碼

Channels 是一種被 make 分配的引用類型

ic := make(chan int)        // 不帶緩存的  int channel
wc := make(chan *Work, 10)  // 帶緩衝工做的 channel複製代碼

經過 channel 發送值,可以使用 <- 做爲二元運算符。經過 channel 接收值,可以使用它做爲一元運算符。

ic <- 3       // 向channel中發送3
work := <-wc  // 從channel中接收指針到work複製代碼

若是 channel 是無緩衝的,發送者會一直阻塞直到有接收者從中接收值。若是是帶緩衝的,只有當值被拷貝到緩衝區且緩衝區已滿時,發送者纔會阻塞直到有接收者從中接收。接收者會一直阻塞直到 channel 中有值可被接收。

關閉

close 的做用是保證不能再向 channel 中發送值。 channel 被關閉後,仍然是能夠從中接收值的。接收操做會得到零值而不會阻塞。多值接收操做會額外返回一個布爾值,表示該值是否被髮送的。

ch := make(chan string)
go func() {
    ch <- "Hello!"
    close(ch)
}()
fmt.Println(<-ch)  // 打印 "Hello!"
fmt.Println(<-ch)  // 不阻塞的打印空值 ""
fmt.Println(<-ch)  // 再一次打印 ""
v, ok := <-ch      // v 的值是 "" , ok 的值是 false複製代碼

伴有 range 分句的 for 語句會連續讀取經過 channel 發送的值,直到 channel 被關閉

func main() {
    var ch <-chan Sushi = Producer()
    for s := range ch {
        fmt.Println("Consumed", s)
    }
}

func Producer() <-chan Sushi {
    ch := make(chan Sushi)
    go func() {
        ch <- Sushi("海老握り")  // Ebi nigiri
        ch <- Sushi("鮪とろ握り") // Toro nigiri
        close(ch)
    }()
    return ch
}複製代碼

sushi.go

3.同步

下一個例子中,Publish 函數返回一個channel,它會把發送的文本當作消息廣播出去。

// 指定時間過時後函數Publish將會打印文本到標準輸出.
// 當文本被髮布channel將會被關閉.
func Publish(text string, delay time.Duration) (wait <-chan struct{}) {
    ch := make(chan struct{})
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
        close(ch) // broadcast – a closed channel sends a zero value forever
    }()
    return ch
}複製代碼

publish2.go

注意咱們使用一個空結構的 channel : struct{}。 這代表該 channel 僅僅用於信號,而不是傳遞數據。

你可能會這樣使用該函數

func main() {
    wait := Publish("Channels let goroutines communicate.", 5*time.Second)
    fmt.Println("Waiting for the news...")
    <-wait
    fmt.Println("The news is out, time to leave.")
}複製代碼

publish2.go

程序將按給出的順序打印下列三行信息。在信息發送後,最後一行會馬上出現

$ go run publish2.go
Waiting for the news...
BREAKING NEWS: Channels let goroutines communicate.
The news is out, time to leave.複製代碼

4.死鎖

traffic jam
traffic jam

讓咱們去介紹 Publish 函數中的一個bug。

func Publish(text string, delay time.Duration) (wait <-chan struct{}) {
    ch := make(chan struct{})
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
        **//close(ch)**
    }()
    return ch
}複製代碼

這時由 Publish 函數開啓的 goroutine 打印重要信息而後退出,留下主 goroutine 繼續等待。

func main() {
    wait := Publish("Channels let goroutines communicate.", 5*time.Second)
    fmt.Println("Waiting for the news...")
    **<-wait**
    fmt.Println("The news is out, time to leave.")
}複製代碼

在某些狀況下,程序將不會有任何進展,這種狀況被稱爲死鎖。

deadlock 是線程之間相互等待而都不能繼續執行的一種狀況

在運行時,Go 對於運行時死鎖檢測具備良好支持。但在某種狀況下goroutine沒法取得任何進展,這時Go程序會提供一個詳細的錯誤信息. 下面就是咱們崩潰程序的日誌:

Waiting for the news...
BREAKING NEWS: Channels let goroutines communicate.
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
    .../goroutineStop.go:11 +0xf6

goroutine 2 [syscall]:
created by runtime.main
    .../go/src/pkg/runtime/proc.c:225

goroutine 4 [timer goroutine (idle)]:
created by addtimer
    .../go/src/pkg/runtime/ztime_linux_amd64.c:73複製代碼

多數狀況下下,在 Go 程序中很容易搞清楚是什麼致使了死鎖。接着就是如何去修復它了。

5. 數據競爭

死鎖可能聽起來很糟糕, 可是真正給併發編程帶來災難的是數據競爭。它們至關常見,並且難於調試。

一個 數據競爭 發生在當兩個線程併發訪問相同的變量,同時最少有一個訪問是在寫.

數據競爭是沒有規律的。舉個例子,打印數字1,嘗試找出它是如何發生的 — 一個可能的解釋是在代碼以後.

func race() {
    wait := make(chan struct{})
    n := 0
    go func() {
        **n++** // 一次操做:讀,增加,寫
        close(wait)
    }()
    **n++** // 另外一個衝突訪問
    <-wait
    fmt.Println(n) // 輸出: 不肯定
}複製代碼

datarace.go

兩個goroutines, g1g2, 在競爭過程當中,咱們沒法知道他們執行的順序.下面只是許多可能的結果性的一種.

  • g1n變量中讀取值0
  • g2n變量中讀取值0
  • g1 增長它的值從0變爲1
  • g1 把它的值把1賦值給n
  • g2 增長它的值從01
  • g2 把它的值把1賦值給n
  • 這段程序將會打印n的值,它的值爲1

"數據競爭」 的稱呼多少有些誤導,不只僅是他的執行順序沒法被設定,並且也沒法保證接下來會發生的狀況。編譯器和硬件時常會爲了更好的性能而調整代碼的順序。若是你仔細觀察一個正在運行的線程,那麼你纔可能會看到更多細節。

mid action
mid action

避免數據競爭的惟一方式是同步操做在線程間全部共享的可變數據。存在幾種方式,在Go中,可能最多使用 channel 或者 lock。較底層的操做可以使用 sync and sync/atomic 包,這裏再也不討論。

在Go中,處理併發數據訪問的首選方式是使用一個 channel,它將數據從一個goroutine傳遞到另外一個goroutine。有一句經典的話:"不要經過共享內存來傳遞數據;而要經過傳遞數據來共享內存"。

func sharingIsCaring() {
    ch := make(chan int)
    go func() {
        n := 0 // 局部變量只能對當前 goroutine 可見
        n++
        ch <- n // 數據經過 goroutine 傳遞
    }()
    n := <-ch   // ...從另一個 goroutine 中安全接受
    n++
    fmt.Println(n) // 輸出: 2
}複製代碼

datarace.go

在這份代碼中 channel 充當了雙重角色。它做爲一個同步點,在不一樣 goroutine 中傳遞數據。發送的 goroutine 將會等待其它的 goroutine 去接收數據,而接收的 goroutine 將會等待其餘的 goroutine 去發送數據。

Go內存模型 - 當一個 goroutine 在讀一個變量,另一個goroutine在寫相同的變量,這個過程其實是很是複雜的,可是隻要你用 channel 在不一樣goroutines中共享數據,那麼這個操做就是安全的。

6. 互斥鎖

lock
lock

有時經過直接鎖定來同步數據比使用 channel 更加方便。爲此,Go 標準庫提供了互斥鎖sync.Mutex

要讓這種類型的鎖正確工做,全部對於共享數據的操做(包括讀和寫)必須在一個 goroutine 持有該鎖時進行。這一點相當重要,goroutine 的一次錯誤就足以破壞程序和致使數據競爭。

所以你須要爲API去設計一種定製化的數據結構,而且確保全部同步操做都在內部執行。在這個例子中,咱們構建了一種安全易用的併發數據結構,AtomicInt,它存儲了單個整型,任何goroutines 都能安全的經過 AddValue 方法訪問數字。

// AtomicInt 是一種持有int類型的支持併發的數據結構。
// 它的初始化值爲0.
type AtomicInt struct {
    mu sync.Mutex // 同一時間只能有一個 goroutine 持有鎖。
    n  int
}

// Add adds n to the AtomicInt as a single atomic operation.
// 原子性的將n增長到AtomicInt中
func (a *AtomicInt) Add(n int) {
    a.mu.Lock() // 等待鎖被釋放而後獲取。
    a.n += n
    a.mu.Unlock() // 釋放鎖。
}

// 返回a的值.
func (a *AtomicInt) Value() int {
    a.mu.Lock()
    n := a.n
    a.mu.Unlock()
    return n
}

func lockItUp() {
    wait := make(chan struct{})
    var n AtomicInt
    go func() {
        n.Add(1) // one access
        close(wait)
    }()
    n.Add(1) // 另外一個併發訪問
    <-wait
    fmt.Println(n.Value()) // Output: 2
}複製代碼

datarace.go

7. 檢測數據競爭

競爭有時候難以檢測。當我執行這段存在數據競爭的程序,它打印55555。再試一次,可能會獲得不一樣的結果。 sync.WaitGroup是go標準庫的一部分;它等待一系列 goroutines 執行結束。

func race() {
    var wg sync.WaitGroup
    wg.Add(5)
    for i := 0; i < 5; **i++** {
        go func() {
            **fmt.Print(i)** // 局部變量i被6個goroutine共享
            wg.Done()
        }()
    }
    wg.Wait() // 等待5個goroutine執行結束
    fmt.Println()
}複製代碼

raceClosure.go

對於輸出 55555 較爲合理的解釋是執行 i++ 操做的 goroutine 在其餘 goroutines 打印以前就已經執行了5次。事實上,更新後的 i 對於其餘 goroutines 可見是隨機的。

一個很是簡單的解決辦法是經過使用本地變量做爲參數的方式去啓動另外的goroutine。

func correct() {
    var wg sync.WaitGroup
    wg.Add(5)
    for i := 0; i < 5; i++ {
        go func(n int) { // 局部變量。
            fmt.Print(n)
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println()
}複製代碼

raceClosure.go

這段代碼是正確的,他打印了指望的結果,24031。回想一下,在不一樣 goroutines 中,程序的執行順序是亂序的。

咱們仍然可使用閉包去避免數據競爭。可是咱們須要注意在每一個 goroutine 中須要有不一樣的變量。

func alsoCorrect() {
    var wg sync.WaitGroup
    wg.Add(5)
    for i := 0; i < 5; i++ {
        n := i // 爲每一個閉包建立單獨的變量
        go func() {
            fmt.Print(n)
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println()
}複製代碼

raceClosure.go

7. 自動競爭檢測

總的來講.咱們不可能自動的發現全部的數據競爭。可是 Go(從1.1版本開始) 提供了一個強大的數據競爭檢測器 data race detector

這個工具使用下來很是簡單: 僅僅增長 -racego 命令後。運行上述程序將會自動檢查而且打印出下面的輸出信息。

$ go run -race raceClosure.go 
Data race:
==================
WARNING: DATA RACE
Read at 0x00c420074168 by goroutine 6:
  main.race.func1()
      ../raceClosure.go:22 +0x3f

Previous write at 0x00c420074168 by main goroutine:
  main.race()
      ../raceClosure.go:20 +0x1bd
  main.main()
      ../raceClosure.go:10 +0x2f

Goroutine 6 (running) created at:
  main.race()
      ../raceClosure.go:24 +0x193
  main.main()
      ../raceClosure.go:10 +0x2f
==================
12355
Correct:
01234
Also correct:
01234
Found 1 data race(s)
exit status 66複製代碼

這個工具發如今程序20行存在數據競爭,一個goroutine向某個變量寫值,而22行存在另一個 goroutine 在不一樣步的讀取這個變量的值。

注意這個工具只能找到實際執行時發生的數據競爭。

8. Select 語句

在 Go 併發編程中,最後講的一個是 select 語句。它會挑選出一系列通訊操做中可以執行的操做。若是任意的通訊操做均可執行,則會隨機挑選一個並執行相關的語句。不然,若是也沒有默認執行語句的話,則會阻塞直到其中的任意一個通訊操做可以執行。

這有一個例子,顯示瞭如何用 select 去隨機生成數字.

// RandomBits 返回產生隨機位數的channel
func RandomBits() <-chan int {
    ch := make(chan int)
    go func() {
        for {
            select {
            case ch <- 0: // 沒有相關操做語句
            case ch <- 1:
            }
        }
    }()
    return ch
}複製代碼

randBits.go

更簡單,這裏 select 被用於設置超時。這段代碼只能打印 news 或者 time-out 消息,這取決於兩個接收語句中誰能夠執行.

select {
case news := <-NewsAgency:
    fmt.Println(news)
case <-time.After(time.Minute):
    fmt.Println("Time out: no news in one minute.")
}複製代碼

time.After是 go 標準庫的一部分;他等待特定時間過去,而後將當前時間發送到返回的 channel.

9. 最基本的併發實例

couples
couples

多花點時間仔細理解這個例子。當你徹底理解它,你將會完全的理解 Go 內部的併發工做機制。

程序演示了單個 channel 同時發送和接受多個 goroutines 的數據。它也展現了 select 語句如何從多個通訊操做中選擇執行。

func main() {
    people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"}
    match := make(chan string, 1) // 給未匹配的元素預留空間
    wg := new(sync.WaitGroup)
    for _, name := range people {
        wg.Add(1)
        go Seek(name, match, wg)
    }
    wg.Wait()
    select {
    case name := <-match:
        fmt.Printf("No one received %s’s message.\n", name)
    default:
        // 沒有待處理的發送操做.
    }
}

// 尋求發送或接收匹配上名稱名稱的通道,並在完成後通知等待組.
func Seek(name string, match chan string, wg *sync.WaitGroup) {
    select {
    case peer := <-match:
        fmt.Printf("%s received a message from %s.\n", name, peer)
    case match <- name:
        // 等待其餘人接受消息.
    }
    wg.Done()
}複製代碼

matching.go

實例輸出:

$ go run matching.go
Anna received a message from Eva.
Cody received a message from Bob.
No one received Dave’s message.複製代碼

10. 並行計算

CPUs
CPUs

具備併發特性應用會將一個大的計算劃分爲小的計算單元,每一個計算單元都會單獨的工做。

多 CPU 上的分佈式計算不只僅是一門科學,更是一門藝術。

  • 每一個計算單元執行時間大約在100us至1ms之間.若是這些單元過小,那麼分配問題和管理子模塊的開銷可能會增大。若是這些單元太大,整個的計算體系可能會被一個小的耗時操做阻塞。不少因素都會影響計算速度,好比調度,程序終端,內存佈局(注意工做單元的個數和 CPU 的個數無關)。

  • 儘可能減小數據共享的量。併發寫入是很是消耗性能的,特別是多個 goroutines 在不一樣CPU上執行時。共享數據讀操做對性能影響不是很大。

  • 數據的合理組織是一種高效的方式。若是數據保存在緩存中,數據的加載和存儲的速度將會大大加快。再次強調,這對寫操做來講是很是重要的。

下面的例子將會顯示如何將多個耗時計算分配到多個可用的 CPU 上。這就是咱們想要優化的代碼。

type Vector []float64

// Convolve computes w = u * v, where w[k] = Σ u[i]*v[j], i + j = k.
// Precondition: len(u) > 0, len(v) > 0.
func Convolve(u, v Vector) Vector {
    n := len(u) + len(v) - 1
    w := make(Vector, n)

    for k := 0; k < n; k++ {
        w[k] = mul(u, v, k)
    }
    return w
}

// mul returns Σ u[i]*v[j], i + j = k.
func mul(u, v Vector, k int) float64 {
    var res float64
    n := min(k+1, len(u))
    j := min(k, len(v)-1)
    for i := k - j; i < n; i, j = i+1, j-1 {
        res += u[i] * v[j]
    }
    return res
}複製代碼

這個想法很簡單:識別適合大小的工做單元,而後在單獨的 goroutine 中運行每一個工做單元. 這就是 Convolve 的併發版本.

func Convolve(u, v Vector) Vector {
    n := len(u) + len(v) - 1
    w := make(Vector, n)

    // 將w劃分爲多個將會計算100us-1ms時間計算的工做單元
    size := max(1, 1000000/n)

    var wg sync.WaitGroup
    for i, j := 0, size; i < n; i, j = j, j+size {
        if j > n {
            j = n
        }
        // goroutines只爲讀共享內存.
        wg.Add(1)
        go func(i, j int) {
            for k := i; k < j; k++ {
                w[k] = mul(u, v, k)
            }
            wg.Done()
        }(i, j)
    }
    wg.Wait()
    return w
}複製代碼

convolution.go

當定義好計算單元,一般最好將調度留給程序執行和操做系統。然而,在 Go1.*版本中,你須要指定 goroutines 的個數。

func init() {
    numcpu := runtime.NumCPU()
    runtime.GOMAXPROCS(numcpu) // 儘可能使用全部可用的 CPU
}複製代碼

Stefan Nilsson


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOSReact前端後端產品設計 等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄

相關文章
相關標籤/搜索