原文:Fundamentals of concurrent programminghtml
譯者:youngsterxyfjava
本文是一篇併發編程方面的入門文章,以Go語言編寫示例代碼,內容涵蓋:linux
在開始閱讀本文以前,你應該知道如何編寫簡單的Go程序。若是你熟悉的是C/C++、Java或Python之類的語言,那麼 Go語言之旅 能提供全部必要的背景知識。也許你還有興趣讀一讀 爲C++程序員準備的Go語言教程 或 爲Java程序員準備的Go語言教程。git
Go容許使用go
語句開啓一個新的運行期線程,即 goroutine,以一個不一樣的、新建立的goroutine來執行一個函數。同一個程序中的全部goroutine共享同一個地址空間。程序員
Goroutine很是輕量,除了爲之分配的棧空間,其所佔用的內存空間微乎其微。而且其棧空間在開始時很是小,以後隨着堆存儲空間的按需分配或釋放而變化。內部實現上,goroutine會在多個操做系統線程上多路複用。若是一個goroutine阻塞了一個操做系統線程,例如:等待輸入,這個線程上的其餘goroutine就會遷移到其餘線程,這樣能繼續運行。開發者並不須要關心/擔憂這些細節。github
下面所示程序會輸出「Hello from main goroutine」。也可能會輸出「Hello from another goroutine」,具體依賴於兩個goroutine哪一個先結束。golang
func main() { go fmt.Println("Hello from another goroutine") fmt.Println("Hello from main goroutine") // 至此,程序運行結束, // 全部活躍的goroutine被殺死 }
接下來的這個程序,多數狀況下,會輸出「Hello from main goroutine」和「Hello from another goroutine」,輸出的順序不肯定。但還有另外一個可能性是:第二個goroutine運行得極其慢,在程序結束以前都沒來得及輸出相應的消息。c#
func main() { go fmt.Println("Hello from another goroutine") fmt.Println("Hello from main goroutine") time.Sleep(time.Second) // 等待1秒,等另外一個goroutine結束 }
下面則是一個相對更加實際的示例,其中定義了一個函數使用併發來推遲觸發一個事件。
// 函數Publish在給定時間過時後打印text字符串到標準輸出 // 該函數並不會阻塞而是當即返回 func Publish(text string, delay time.Duration) { go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) }() // 注意這裏的括號。必須調用匿名函數 }
你可能會這樣使用Publish
函數:
func main() { Publish("A goroutine starts a new thread of execution.", 5*time.Second) fmt.Println("Let’s hope the news will published before I leave.") // 等待發布新聞 time.Sleep(10 * time.Second) fmt.Println("Ten seconds later: I’m leaving now.") }
這個程序,絕大多數狀況下,會輸出如下三行,順序固定,每行輸出之間相隔5秒。
$ go run publish1.go Let’s hope the news will published before I leave. BREAKING NEWS: A goroutine starts a new thread of execution. Ten seconds later: I’m leaving now.
通常來講,經過睡眠的方式來編排線程之間相互等待是不太可能的。下一章節會介紹Go語言中的一種同步機制 - 管道,並演示如何使用管道讓一個goroutine等待另外一個goroutine。
管道是Go語言的一個構件,提供一種機制用於兩個goroutine之間經過傳遞一個指定類型的值來同步運行和通信。操做符<-
用於指定管道的方向,發送或接收。若是未指定方向,則爲雙向管道。
chan Sushi // 可用來發送和接收Sushi類型的值 chan<- float64 // 僅可用來發送float64類型的值 <-chan int // 僅可用來接收int類型的值
管道是引用類型,基於make函數來分配。
ic := make(chan int) // 不帶緩衝的int類型管道 wc := make(chan *Work, 10) // 帶緩衝的Work類型指針管道
若是經過管道發送一個值,則將<-
做爲二元操做符使用。經過管道接收一個值,則將其做爲一元操做符使用:
ic <- 3 // 往管道發送3 work := <-wc // 從管道接收一個指向Work類型值的指針
若是管道不帶緩衝,發送方會阻塞直到接收方從管道中接收了值。若是管道帶緩衝,發送方則會阻塞直到發送的值被拷貝到緩衝區內;若是緩衝區已滿,則意味着須要等待直到某個接收方獲取到一個值。接收方在有值能夠接收以前會一直阻塞。
關閉管道(Close)
close 函數標誌着不會再往某個管道發送值。在調用close
以後,而且在以前發送的值都被接收後,接收操做會返回一個零值,不會阻塞。一個多返回值的接收操做會額外返回一個布爾值用來指示返回的值是否發送操做傳遞的。
ch := make(chan string) go func() { ch <- "Hello!" close(ch) }() fmt.Println(<-ch) // 輸出字符串"Hello!" fmt.Println(<-ch) // 輸出零值 - 空字符串"",不會阻塞 fmt.Println(<-ch) // 再次打印輸出空字符串"" v, ok := <-ch // 變量v的值爲空字符串"",變量ok的值爲false
一個帶有range
子句的for
語句會依次讀取發往管道的值,直到該管道關閉:
func main() { // 譯註:要想運行該示例,須要先定義類型Sushi,如type Sushi string var ch <-chan Sushi = Producer() for s := range ch { fmt.Println("Consumed", s) } } func Producer() <-chan Sushi { ch := make(chan Sushi) go func(){ ch <- Sushi("海老握り") // Ebi nigiri ch <- Sushi("鮪とろ握り") // Toro nigiri close(ch) }() return ch }
下一個示例中,咱們讓Publish
函數返回一個管道 - 用於在發佈text變量值時廣播一條消息:
// 在給定時間過時時,Publish函數會打印text變量值到標準輸出 // 在text變量值發佈後,該函數會關閉管道wait func Publish(text string, delay time.Duration) (wait <-chan struct{}) { ch := make(chan struct{}) go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) close(ch) // 廣播 - 一個關閉的管道都會發送一個零值 }() return ch }
注意:咱們使用了一個空結構體的管道:struct{}
。這明確地指明該管道僅用於發信號,而不是傳遞數據。
咱們可能會這樣使用這個函數:
func main() { wait := Publish("Channels let goroutines communicate.", 5*time.Second) fmt.Println("Waiting for the news...") <-wait fmt.Println("The news is out, time to leave.") }
這個程序會按指定的順序輸出如下三行內容。最後一行在新聞(news)一出就會當即輸出。
$ go run publish2.go Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. The news is out, time to leave.
如今咱們在Publish
函數中引入一個bug:
func Publish(text string, delay time.Duration) (wait <-chan struct{}) { ch := make(chan struct{}) go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) // 譯註:注意這裏將close函數調用註釋掉了 //close(ch) }() return ch }
主程序仍是像以前同樣開始運行:輸出第一行,而後等待5秒,這時Publish
函數開啓的goroutine會輸出突發新聞(breaking news),而後退出,留下主goroutine獨自等待。
func main() { wait := Publish("Channels let goroutines communicate.", 5*time.Second) fmt.Println("Waiting for the news...") // 譯註:注意下面這一句 <-wait fmt.Println("The news is out, time to leave.") }
此刻以後,程序沒法再繼續往下執行。衆所周知,這種情形即爲死鎖。
死鎖是線程之間相互等待,其中任何一個都沒法向前運行的情形。
Go語言對於運行時的死鎖檢測具有良好的支持。當沒有任何goroutine可以往前執行的情形發生時,Go程序一般會提供詳細的錯誤信息。如下就是咱們的問題程序的輸出:
Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() .../goroutineStop.go:11 +0xf6 goroutine 2 [syscall]: created by runtime.main .../go/src/pkg/runtime/proc.c:225 goroutine 4 [timer goroutine (idle)]: created by addtimer .../go/src/pkg/runtime/ztime_linux_amd64.c:73
大多數狀況下找出Go程序中形成死鎖的緣由都比較容易,那麼剩下的就是如何解決這個bug了。
死鎖也許聽起來使人挺憂傷的,但伴隨併發編程真正災難性的錯誤實際上是數據競爭,至關常見,也可能很是難於調試。
當兩個線程併發地訪問同一個變量,而且其中至少一個訪問是寫操做時,數據競爭就發生了。
下面的這個函數就有數據競爭問題,其行爲是未定義的。例如,可能輸出數值1。代碼以後是一個可能性解釋,試圖搞清楚這一切是如何發生得。
func race() { wait := make(chan struct{}) n := 0 go func() { // 譯註:注意下面這一行 n++ // 一次訪問: 讀, 遞增, 寫 close(wait) }() // 譯註:注意下面這一行 n++ // 另外一次衝突的訪問 <-wait fmt.Println(n) // 輸出:未指定 }
代碼中的兩個goroutine(假設命名爲g1
和g2
)參與了一次競爭,咱們沒法獲知操做會以何種順序發生。如下是諸多可能中的一種:
g1
從 n
中獲取值0g2
從 n
中獲取值0g1
將值從0增大到1g1
將1寫到 n
g2
將值從0增大到1g2
將1寫到 n
「數據競爭(data race)」這名字有點誤導的嫌疑。不只操做的順序是未定義的,其實根本沒有任何保證(no guarantees whatsoever)。編譯器和硬件爲了獲得更好的性能,常常都會對代碼進行上下內外的順序變換。若是你看到一個線程處於中間行爲狀態時,那麼當時的場景可能就像下圖所示的同樣:
避免數據競爭的惟一方式是線程間同步訪問全部的共享可變數據。有幾種方式可以實現這一目標。Go語言中,一般是使用管道或者鎖。(sync和sync/atomic包中還有更低層次的機制可供使用,但本文中不作討論)。
Go語言中,處理併發數據訪問的推薦方式是使用管道從一個goroutine中往下一個goroutine傳遞實際的數據。有格言說得好:「不要經過共享內存來通信,而是經過通信來共享內存」。
func sharingIsCaring() { ch := make(chan int) go func() { n := 0 // 僅爲一個goroutine可見的局部變量. n++ ch <- n // 數據從一個goroutine離開... }() n := <-ch // ...而後安全到達另外一個goroutine. n++ fmt.Println(n) // 輸出: 2 }
以上代碼中的管道肩負雙重責任 - 從一個goroutine將數據傳遞到另外一個goroutine,而且起到同步的做用:發送方goroutine會等待另外一個goroutine接收數據,接收方goroutine也會等待另外一個goroutine發送數據。
Go語言內存模型 - 要保證一個goroutine中對一個變量的讀操做獲得的值正好是另外一個goroutine中對同一個變量寫操做產生的值,條件至關複雜,但goroutine之間只要經過管道來共享全部可變數據,那麼就能遠離數據競爭了。
有時,經過顯式加鎖,而不是使用管道,來同步數據訪問,可能更加便捷。Go語言標準庫爲這一目的提供了一個互斥鎖 - sync.Mutex。
要想這類加鎖起效的話,關鍵之處在於:全部對共享數據的訪問,無論讀寫,僅當goroutine持有鎖才能操做。一個goroutine出錯就足以破壞掉一個程序,引入數據競爭。
所以,應該設計一個自定義數據結構,具有明確的API,確保全部的同步都在數據結構內部完成。下例中,咱們構建了一個安全、易於使用的併發數據結構,AtomicInt
,用於存儲一個整型值。任意數量的goroutine都能經過Add
和Value
方法安全地訪問這個數值。
// AtomicInt是一個併發數據結構,持有一個整數值 // 該數據結構的零值爲0 type AtomicInt struct { mu sync.Mutex // 鎖,一次僅能被一個goroutine持有。 n int } // Add方法做爲一個原子操做將n加到AtomicInt func (a *AtomicInt) Add(n int) { a.mu.Lock() // 等待鎖釋放,而後持有它 a.n += n a.mu.Unlock() // 釋放鎖 } // Value方法返回a的值 func (a *AtomicInt) Value() int { a.mu.Lock() n := a.n a.mu.Unlock() return n } func lockItUp() { wait := make(chan struct{}) var n AtomicInt go func() { n.Add(1) // 一個訪問 close(wait) }() n.Add(1) // 另外一個併發訪問 <-wait fmt.Println(n.Value()) // 輸出: 2 }
競爭有時很是難於檢測。下例中的這個函數有一個數據競爭問題,執行這個程序時會輸出55555
。嘗試一下,也許你會獲得一個不一樣的結果。(sync.WaitGroup是Go語言標準庫的一部分;用於等待一組goroutine結束運行。)
func race() { var wg sync.WaitGroup wg.Add(5) // 譯註:注意下面這行代碼中的i++ for i := 0; i < 5; i++ { go func() { // 注意下一行代碼會輸出什麼?爲何? fmt.Print(i) // 6個goroutine共享變量i wg.Done() }() } wg.Wait() // 等待全部(5個)goroutine運行結束 fmt.Println() }
對於輸出55555
,一個貌似合理的解釋是:執行i++
的goroutine在其餘goroutine執行打印語句以前就完成了5次i++
操做。實際上變量i
更新後的值爲其餘goroutine所見純屬巧合。
一個簡單的解決方案是:使用一個局部變量,而後當開啓新的goroutine時,將數值做爲參數傳遞:
func correct() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { go func(n int) { // 使用局部變量 fmt.Print(n) wg.Done() }(i) } wg.Wait() fmt.Println() }
此次代碼就對了,程序會輸出指望的結果,如:24031
。注意:goroutine之間的運行順序是不肯定的。
仍舊使用閉包,但可以避免數據競爭也是可能的,必須當心翼翼地讓每一個goroutine使用一個獨有的變量。
func alsoCorrect() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { n := i // 爲每一個閉包建立一個獨有的變量 go func() { fmt.Print(n) wg.Done() }() } wg.Wait() fmt.Println() }
數據競爭自動檢測
通常來講,不太可能可以自動檢測發現全部可能的數據競爭狀況,但Go(從版本1.1開始)有一個強大的數據競爭檢測器。
這個工具用起來也很簡單:只要在使用go
命令時加上-race
標記便可。開啓檢測器運行上面的程序會給出清晰且信息量大的輸出:
$ go run -race raceClosure.go Race: ================== WARNING: DATA RACE Read by goroutine 2: main.func·001() ../raceClosure.go:22 +0x65 Previous write by goroutine 0: main.race() ../raceClosure.go:20 +0x19b main.main() ../raceClosure.go:10 +0x29 runtime.main() ../go/src/pkg/runtime/proc.c:248 +0x91 Goroutine 2 (running) created at: main.race() ../raceClosure.go:24 +0x18b main.main() ../raceClosure.go:10 +0x29 runtime.main() ../go/src/pkg/runtime/proc.c:248 +0x91 ================== 55555 Correct: 01234 Also correct: 01324 Found 1 data race(s) exit status 66
該工具發現一處數據競爭,包含:一個goroutine在第20行對一個變量進行寫操做,跟着另外一個goroutine在第22行對同一個變量進行了未同步的讀操做。
注意:競爭檢測器只能發如今運行期確實發生的數據競爭(譯註:我也不太理解這話,請指導)
select語句是Go語言併發工具集中的終極工具。select用於從一組可能的通信中選擇一個進一步處理。若是任意一個通信均可以進一步處理,則從中隨機選擇一個,執行對應的語句。不然,若是又沒有默認分支(default case),select語句則會阻塞,直到其中一個通信完成。
如下是一個玩具示例,演示select
語句如何用於實現一個隨機數生成器:
// RandomBits函數 返回一個管道,用於產生一個比特隨機序列 func RandomBits() <-chan int { ch := make(chan int) go func() { for { select { case ch <- 0: // 注意:分支沒有對應的處理語句 case ch <- 1: } } }() return ch }
下面是相對更加實際一點的例子:如何使用select語句爲一個操做設置一個時間限制。代碼會輸出變量news的值或者超時消息,具體依賴於兩個接收語句哪一個先執行:
select { case news := <-NewsAgency: fmt.Println(news) case <-time.After(time.Minute): fmt.Println("Time out: no news in one minute.") }
函數 time.After 是Go語言標準庫的一部分;它會在等待指定時間後將當前的時間發送到返回的管道中。
花點時間認真研究一下這個示例。若是你徹底理解,也就對Go語言中併發的應用方式有了全面的掌握。
這個程序演示瞭如何將管道用於被任意數量的goroutine發送和接收數據,也演示瞭如何將select語句用於從多個通信中選擇一個。
func main() { people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"} match := make(chan string, 1) // 爲一個未匹配的發送操做提供空間 wg := new(sync.WaitGroup) wg.Add(len(people)) for _, name := range people { go Seek(name, match, wg) } wg.Wait() select { case name := <-match: fmt.Printf("No one received %s’s message.\n", name) default: // 沒有待處理的發送操做 } } // 函數Seek 發送一個name到match管道或從match管道接收一個peer,結束時通知wait group func Seek(name string, match chan string, wg *sync.WaitGroup) { select { case peer := <-match: fmt.Printf("%s sent a message to %s.\n", peer, name) case match <- name: // 等待某個goroutine接收個人消息 } wg.Done() }
示例輸出:
$ go run matching.go Cody sent a message to Bob. Anna sent a message to Eva. No one received Dave’s message.
併發的一個應用是將一個大的計算切分紅一些工做單元,調度到不一樣的CPU上同時地計算。
將計算分佈到多個CPU上更可能是一門藝術,而不是一門科學。如下是一些經驗法則:
下面的這個示例展現如何切分一個開銷很大的計算並將其分佈在全部可用的CPU上進行計算。先看一下有待優化的代碼:
type Vector []float64 // 函數Convolve 計算 w = u * v,其中 w[k] = Σ u[i]*v[j], i + j = k // 先決條件:len(u) > 0, len(v) > 0 func Convolve(u, v Vector) (w Vector) { n := len(u) + len(v) - 1 w = make(Vector, n) for k := 0; k < n; k++ { w[k] = mul(u, v, k) } return } // 函數mul 返回 Σ u[i]*v[j], i + j = k. func mul(u, v Vector, k int) (res float64) { n := min(k+1, len(u)) j := min(k, len(v)-1) for i := k - j; i < n; i, j = i+1, j-1 { res += u[i] * v[j] } return }
思路很簡單:肯定合適大小的工做單元,而後在不一樣的goroutine中執行每一個工做單元。如下是併發版本的 Convolve
:
func Convolve(u, v Vector) (w Vector) { n := len(u) + len(v) - 1 w = make(Vector, n) // 將 w 切分紅花費 ~100μs-1ms 用於計算的工做單元 size := max(1, 1<<20/n) wg := new(sync.WaitGroup) wg.Add(1 + (n-1)/size) for i := 0; i < n && i >= 0; i += size { // 整型溢出後 i < 0 j := i + size if j > n || j < 0 { // 整型溢出後 j < 0 j = n } // 這些goroutine共享內存,可是隻讀 go func(i, j int) { for k := i; k < j; k++ { w[k] = mul(u, v, k) } wg.Done() }(i, j) } wg.Wait() return }
工做單元定義以後,一般狀況下最好將調度工做交給運行時和操做系統。然而,對於Go 1.* 你也許須要告訴運行時但願多少個goroutine來同時地運行代碼。
func init() { numcpu := runtime.NumCPU() runtime.GOMAXPROCS(numcpu) // 嘗試使用全部可用的CPU }