f() // call f(); wait for it to return go f() // create a new goroutine that calls f(); don't wait
ch := make(chan int)
<-
運算符。在發送語句中,<-
運算符分割channel和要發送的值。在接收語句中,<-
運算符寫在channel對象以前。一個不使用接收結果的接收操做也是合法的。ch <- x // 發送消息 x = <-ch // 從 channel 中接收消息 <-ch // 從 channel 接收並丟棄消息
使用內置的close函數就能夠關閉一個channel:編程
close(ch)
以最簡單方式調用make函數建立的時一個無緩衝的channel,可是咱們也能夠指定第二個整形參數,對應channel的容量。若是channel的容量大於零,那麼該channel就是帶緩衝的channel。緩存
ch = make(chan int) // unbuffered channel ch = make(chan int, 0) // unbuffered channel ch = make(chan int, 3) // buffered channel with capacity 3
func main() { conn, err := net.Dial("tcp", "localhost:8000") if err != nil { log.Fatal(err) } done := make(chan struct{}) go func() { io.Copy(os.Stdout, conn) // NOTE: ignoring errors log.Println("done") done <- struct{}{} // signal the main goroutine }() mustCopy(conn, os.Stdin) conn.Close() <-done // wait for background goroutine to finish }
struct{}
空結構體做爲channels元素的類型,雖然也可使用bool或int類型實現一樣的功能,done <- 1
語句也比done <- struct{}{}
更短。close(naturals)
x, ok := <-naturals
在下面的程序中,咱們的計數器goroutine只生成100個含數字的序列,而後關閉naturals對應的channel,這將致使計算平方數的squarer對應的goroutine能夠正常終止循環並關閉squares對應的channel。(在一個更復雜的程序中,能夠經過defer語句關閉對應的channel。)最後,主goroutine也能夠正常終止循環並退出程序。安全
func main() { naturals := make(chan int) squares := make(chan int) // Counter go func() { for x := 0; x < 100; x++ { naturals <- x } close(naturals) }() // Squarer go func() { for x := range naturals { squares <- x * x } close(squares) }() // Printer (in main goroutine) for x := range squares { fmt.Println(x) } }
爲了代表這種意圖並防止被濫用,Go語言的類型系統提供了單方向的channel類型,分別用於只發送或只接收的channel。類型chan<- int
表示一個只發送int的channel,只能發送不能接收。相反,類型<-chan int
表示一個只接收int的channel,只能接收不能發送。(箭頭<-
和關鍵字chan的相對位置代表了channel的方向。)這種限制將在編譯期檢測。性能優化
這是改進的版本,這一次參數使用了單方向channel類型:數據結構
func counter(out chan<- int) { for x := 0; x < 100; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { naturals := make(chan int) squares := make(chan int) go counter(naturals) go squarer(squares, naturals) printer(squares) }
調用counter(naturals)將致使將chan int
類型的naturals隱式地轉換爲chan<- int
類型只發送型的channel。調用printer(squares)也會致使類似的隱式轉換,這一次是轉換爲<-chan int
類型只接收型的channel。任何雙向channel向單向channel變量的賦值操做都將致使該隱式轉換。併發
帶緩存的Channel內部持有一個元素隊列。隊列的最大容量是在調用make函數建立channel時經過第二個參數指定的。下面的語句建立了一個能夠持有三個字符串元素的帶緩存Channel。圖8.2是ch變量對應的channel的圖形表示形式。tcp
ch = make(chan string, 3)
向緩存Channel的發送操做就是向內部緩存隊列的尾部插入元素,接收操做則是從隊列的頭部刪除元素。若是內部緩存隊列是滿的,那麼發送操做將阻塞直到因另外一個goroutine執行接收操做而釋放了新的隊列空間。相反,若是channel是空的,接收操做將阻塞直到有另外一個goroutine執行發送操做而向隊列插入元素。函數
咱們能夠在無阻塞的狀況下連續向新建立的channel發送三個值:性能
ch <- "A" ch <- "B" ch <- "C"
此刻,channel的內部緩存隊列將是滿的(圖8.3),若是有第四個發送操做將發生阻塞。優化
若是咱們接收一個值,
fmt.Println(<-ch) // "A"
那麼channel的緩存隊列將不是滿的也不是空的(圖8.4),所以對該channel執行的發送或接收操做都不會發送阻塞。經過這種方式,channel的緩存隊列解耦了接收和發送的goroutine。
在某些特殊狀況下,程序可能須要知道channel內部緩存的容量,能夠用內置的cap函數獲取:
fmt.Println(cap(ch)) // "3"
一樣,對於內置的len函數,若是傳入的是channel,那麼將返回channel內部緩存隊列中有效元素的個數。由於在併發程序中該信息會隨着接收操做而失效,可是它對某些故障診斷和性能優化會有幫助。
fmt.Println(len(ch)) // "2"
在繼續執行兩次接收操做後channel內部的緩存隊列將又成爲空的,若是有第四個接收操做將發生阻塞:
fmt.Println(<-ch) // "B" fmt.Println(<-ch) // "C"
下面的例子展現了一個使用了帶緩存channel的應用。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不一樣的地理位置。它們分別將收到的響應發送到帶緩存channel,最後接收者只接收第一個收到的響應,也就是最快的那個響應。所以mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應以前就返回告終果。(順便說一下,多個goroutines併發地向同一個channel發送數據,或從同一個channel接收數據都是常見的用法。)
func mirroredQuery() string { responses := make(chan string, 3) go func() { responses <- request("asia.gopl.io") }() go func() { responses <- request("europe.gopl.io") }() go func() { responses <- request("americas.gopl.io") }() return <-responses // return the quickest response } func request(hostname string) (response string) { /* ... */ }
若是咱們使用了無緩存的channel,那麼兩個慢的goroutines將會由於沒有人接收而被永遠卡住。這種狀況,稱爲goroutines泄漏,這將是一個BUG。和垃圾變量不一樣,泄漏的goroutines並不會被自動回收,所以確保每一個再也不須要的goroutine能正常退出是重要的。
關於無緩存或帶緩存channel之間的選擇,或者是帶緩存channel的容量大小的選擇,均可能影響程序的正確性。無緩存channel更強地保證了每一個發送操做與相應的同步接收操做;可是對於帶緩存channel,這些操做是解耦的。一樣,即便咱們知道將要發送到一個channel的信息的數量上限,建立一個對應容量大小帶緩存channel也是不現實的,由於這要求在執行任何接收操做以前緩存全部已經發送的值。若是未能分配足夠的緩衝將致使程序死鎖。
此外對於buffered channel,咱們能夠用一個有容量限制的buffered channel來控制併發,這相似於操做系統裏的計數信號量概念。從概念上講,channel裏的n個空槽表明n個能夠處理內容的token(通行證),從channel裏接收一個值會釋放其中的一個token,而且生成一個新的空槽位。這樣保證了在沒有接收介入時最多有n個發送操做。(這裏可能咱們拿channel裏填充的槽來作token更直觀一些,不過仍是這樣吧~)。因爲channel裏的元素類型並不重要,咱們用一個零值的struct{}來做爲其元素。
下面的crawl
函數,將對links.Extract
的調用操做用獲取、釋放token的操做包裹起來,來確保同一時間對其只有20個調用。信號量數量和其能操做的IO
資源數量應保持接近。
// goroutine獲取token後,能夠進行抓取操做,若是滿20了 // 那麼 goroutine 會等到有獲取 token 後再去執行 var tokens = make(chan struct{}, 20) func crawl(url string) []string { fmt.Println(url) tokens <- struct{}{} // 獲取 token list, err := links.Extract(url) <-tokens // 釋放 token if err != nil { log.Print(err) } return list }
在併發循環中爲了知道最後一個goroutine何時結束(最後一個結束並不必定是最後一個開始),咱們須要一個遞增的計數器,在每個goroutine啓動時加一,在goroutine退出時減一。這須要一種特殊的計數器,這個計數器須要在多個goroutine操做時作到安全而且提供在其減爲零以前一直等待的一種方法。這種計數類型被稱爲sync.WaitGroup,下面的代碼就用到了這種方法:
// makeThumbnails6爲從通道中接收到的每一個文件建立縮略圖。 // 返回每一個建立的縮略圖所佔的本身數。 func makeThumbnails6(filenames <-chan string) int64 { sizes := make(chan int64) var wg sync.WaitGroup // number of working goroutines for f := range filenames { wg.Add(1) // worker go func(f string) { defer wg.Done() thumb, err := thumbnail.ImageFile(f) if err != nil { log.Println(err) return } info, _ := os.Stat(thumb) // OK to ignore error sizes <- info.Size() }(f) } // closer go func() { wg.Wait() close(sizes) }() var total int64 for size := range sizes { total += size } return total }
注意Add和Done方法的不對策。Add是爲計數器加一,必須在worker goroutine開始以前調用,而不是在goroutine中;不然的話咱們沒辦法肯定Add是在"closer" goroutine調用Wait以前被調用。而且Add還有一個參數,但Done卻沒有任何參數;其實它和Add(-1)是等價的。咱們使用defer來確保計數器即便是在出錯的狀況下依然可以正確地被減掉。上面的程序代碼結構是當咱們使用併發循環,但又不知道迭代次數時很一般並且很地道的寫法。
select語句的通常形式,和switch語句稍微有點類似。也會有幾個case和最後的default選擇支。每個case表明一個通訊操做(在某個channel上進行發送或者接收)而且會包含一些語句組成的一個語句塊。
select { case <-ch1: // ... case x := <-ch2: // ...use x... case ch3 <- y: // ... default: // ... }
一個接收表達式可能只包含接收表達式自身(譯註:不把接收到的值賦值給變量什麼的),就像上面的第一個case,或者包含在一個簡短的變量聲明中,像第二個case裏同樣;第二種形式讓你可以在當前 case 塊中引用接收到的值。
select會等待case中有可以執行的case時去執行。當條件知足時,select纔會去通訊並執行case以後的語句;這時候其它通訊是不會執行的。一個沒有任何case的select語句寫做select{},會永遠地等待下去。
下面這個例子更微秒。ch這個channel的buffer大小是1,因此會交替的爲空或爲滿,因此只有一個case能夠進行下去,不管i是奇數或者偶數,它都會打印0 2 4 6 8。
ch := make(chan int, 1) for i := 0; i < 10; i++ { select { case x := <-ch: fmt.Println(x) // "0" "2" "4" "6" "8" case ch <- i: } }
若是多個case同時就緒時,select會隨機地選擇一個執行,這樣來保證每個channel都有平等的被select的機會。增長上面例子的buffer大小會使其輸出變得不肯定,由於當buffer既不爲滿也不爲空時,select語句的執行狀況就像是拋硬幣的行爲同樣是隨機的。