Goroutine是Go中最基本的執行單元。事實上每個Go程序至少有一個goroutine:主goroutine。當程序啓動時,它會自動建立。html
事實上goroutine採用了一種fork-join的模型。golang
sayHello := func() { fmt.Println("hello") } go sayHello()
那咱們如何來join goroutine呢?須要引入wait操做:安全
var wg sync.WaitGroup() sayHello := func() { defer wg.Done() fmt.Println("hello") } wg.Add(1) go sayHello() wa.Wait()
goroutine是Go語言的基本調度單位,而channels則是它們之間的通訊機制。操做符<-用來指定管道的方向,發送或接收。若是未指定方向,則爲雙向管道。閉包
// 建立一個雙向channel ch := make(chan interface{})
interface{}表示chan能夠爲任何類型併發
channel有發送和接受兩個主要操做。發送和接收兩個操做都使用<-運算符。在發送語句中,channel放<-運算符左邊。在接收語句中,channel放<-運算符右邊。一個不使用接收結果的接收操做也是合法的。函數
// 發送操做 ch <- x // 接收操做 x = <-ch // 忽略接收到的值,合法 <-ch
咱們不能弄錯channel的方向:spa
writeStream := make(chan<- interface{}) readStream := make(<-chan interface{}) <-writeStream readStream <- struct{}{}
Channel支持close操做,用於關閉channel,後面對該channel的任何發送操做都將致使panic異常。對一個已經被close過的channel進行接收操做依然能夠接受到以前已經成功發送的數據;若是channel中已經沒有數據的話將產生一個零值的數據。設計
從已經關閉的channel中讀:3d
intStream := make(chan int) close(intStream) integer, ok := <- intStream fmt.Pritf("(%v): %v", ok, integer) // (false): 0
上面例子中經過返回值ok來判斷channel是否關閉,咱們還能夠經過range這種更優雅的方式來處理已經關閉的channel:code
intStream := make(chan int) go func() { defer close(intStream) for i:=1; i<=5; i++{ intStream <- i } }() for integer := range intStream { fmt.Printf("%v ", integer) } // 1 2 3 4 5
建立了一個能夠持有三個字符串元素的帶緩衝Channel:
ch = make(chan string, 3)
咱們能夠在無阻塞的狀況下連續向新建立的channel發送三個值:
ch <- "A" ch <- "B" ch <- "C"
此刻,channel的內部緩衝隊列將是滿的,若是有第四個發送操做將發生阻塞。
若是咱們接收一個值:
fmt.Println(<-ch) // "A"
那麼channel的緩衝隊列將不是滿的也不是空的,所以對該channel執行的發送或接收操做都不會發生阻塞。經過這種方式,channel的緩衝隊列緩衝解耦了接收和發送的goroutine。
帶緩衝的信道可被用做信號量,例如限制吞吐量。在此例中,進入的請求會被傳遞給 handle,它從信道中接收值,處理請求後將值發回該信道中,以便讓該 「信號量」 準備迎接下一次請求。信道緩衝區的容量決定了同時調用 process 的數量上限。
var sem = make(chan int, MaxOutstanding) func handle(r *Request) { sem <- 1 // 等待活動隊列清空。 process(r) // 可能須要很長時間。 <-sem // 完成;使下一個請求能夠運行。 } func Serve(queue chan *Request) { for { req := <-queue go handle(req) // 無需等待 handle 結束。 } }
然而,它卻有個設計問題:儘管只有 MaxOutstanding 個 goroutine 能同時運行,但 Serve 仍是爲每一個進入的請求都建立了新的 goroutine。其結果就是,若請求來得很快, 該程序就會無限地消耗資源。爲了彌補這種不足,咱們能夠經過修改 Serve 來限制建立 Go 程,這是個明顯的解決方案,但要小心咱們修復後出現的 Bug。
func Serve(queue chan *Request) { for req := range queue { sem <- 1 go func() { process(req) // 這兒有 Bug,解釋見下。 <-sem }() } }
Bug 出如今 Go 的 for 循環中,該循環變量在每次迭代時會被重用,所以 req 變量會在全部的 goroutine 間共享,這不是咱們想要的。咱們須要確保 req 對於每一個 goroutine 來講都是惟一的。有一種方法可以作到,就是將 req 的值做爲實參傳入到該 goroutine 的閉包中:
func Serve(queue chan *Request) { for req := range queue { sem <- 1 go func(req *Request) { process(req) <-sem }(req) } }
另外一種解決方案就是以相同的名字建立新的變量,如例中所示:
func Serve(queue chan *Request) { for req := range queue { req := req // 爲該 Go 程建立 req 的新實例。 sem <- 1 go func() { process(req) <-sem }() } }
下面再看一個Go語言聖經的例子。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不一樣的地理位置。它們分別將收到的響應發送到帶緩衝channel,最後接收者只接收第一個收到的響應,也就是最快的那個響應。所以mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應以前就返回告終果。
func mirroredQuery() string { responses := make(chan string, 3) go func() { responses <- request("asia.gopl.io") }() go func() { responses <- request("europe.gopl.io") }() go func() { responses <- request("americas.gopl.io") }() // 僅僅返回最快的那個response return <-responses } func request(hostname string) (response string) { /* ... */ }
若是咱們使用了無緩衝的channel,那麼兩個慢的goroutines將會由於沒有人接收而被永遠卡住。這種狀況,稱爲goroutines泄漏,這將是一個BUG。和垃圾變量不一樣,泄漏的goroutines並不會被自動回收,所以確保每一個再也不須要的goroutine能正常退出是重要的。
Go 最重要的特性就是信道是first-class value,它能夠被分配並像其它值處處傳遞。 這種特性一般被用來實現安全、並行的多路分解。
咱們能夠利用這個特性來實現一個簡單的RPC。
如下爲 Request 類型的大概定義。
type Request struct { args []int f func([]int) int resultChan chan int }
客戶端提供了一個函數及其實參,此外在請求對象中還有個接收應答的信道。
func sum(a []int) (s int) { for _, v := range a { s += v } return } request := &Request{[]int{3, 4, 5}, sum, make(chan int)} // 發送請求 clientRequests <- request // 等待迴應 fmt.Printf("answer: %d\n", <-request.resultChan)
服務端的handler函數:
func handle(queue chan *Request) { for req := range queue { req.resultChan <- req.f(req.args) } }
Channels也能夠用於將多個goroutine鏈接在一塊兒,一個Channel的輸出做爲下一個Channel的輸入。這種串聯的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯起來:
第一個goroutine是一個計數器,用於生成0、一、二、……形式的整數序列,而後經過channel將該整數序列發送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每一個整數求平方,而後將平方後的結果經過第二個channel發送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每一個整數。
func counter(out chan<- int) { for x := 0; x < 100; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals) go squarer(squares, naturals) printer(squares) }
select用於從一組可能的通信中選擇一個進一步處理。若是任意一個通信均可以進一步處理,則從中隨機選擇一個,執行對應的語句。不然,若是又沒有默認分支(default case),select語句則會阻塞,直到其中一個通信完成。
select { case <-ch1: // ... case x := <-ch2: // ...use x... case ch3 <- y: // ... default: // ... }
如何使用select語句爲一個操做設置一個時間限制。代碼會輸出變量news的值或者超時消息,具體依賴於兩個接收語句哪一個先執行:
select { case news := <-NewsAgency: fmt.Println(news) case <-time.After(time.Minute): fmt.Println("Time out: no news in one minute.") }
下面的select語句會在abort channel中有值時,從其中接收值;無值時什麼都不作。這是一個非阻塞的接收操做;反覆地作這樣的操做叫作「輪詢channel」。
select { case <-abort: fmt.Printf("Launch aborted!\n") return default: // do nothing }
參考資料。