《Go 語言程序設計》讀書筆記 (五) 協程與通道

Goroutines

  • 在Go語言中,每個併發的執行單元叫做goroutine。設想一個程序中有兩個函數,假設兩個函數沒有相互之間的調用關係。一個線性的程序會先調用其中的一個函數,而後再調用另外一個。若是程序中包含多個goroutine,對兩個函數的調用則可能發生在同一時刻。
  • 當一個程序啓動時,其main函數即在一個單獨的goroutine中運行,咱們叫它main goroutine。新的goroutine會用go語句來建立。在語法上,go語句是在一個普通的函數或方法調用前加上關鍵字go。go語句會使其語句中的函數在一個新建立的goroutine中運行。而go語句自己會迅速地完成。
f()    // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
  • 主函數返回時,全部的goroutine都會被直接打斷,程序退出。除了從主函數退出或者直接終止程序以外,沒有其它的編程方法可以讓一個goroutine來打斷另外一個的執行,可是以後能夠看到一種方式來實現這個目的,經過goroutine之間的通訊來讓一個goroutine請求其它的goroutine,並被請求的goroutine自行結束執行。

Channel

  • 若是說goroutine是Go語言程序的併發體的話,那麼channels它們之間的通訊機制。它可讓一個goroutine經過它給另外一個goroutine發送值信息。每一個channel都有一個特殊的類型,也就是channel可發送數據的類型。一個能夠發送int類型數據的channel通常寫爲chan int。
  • 使用內置的make函數,咱們能夠建立一個channel:
ch := make(chan int)
  • 和map相似,channel也一個對make函數建立的底層數據結構的引用。當咱們複製一個channel或把 channel用於函數參數傳遞時,咱們只是拷貝了一個channel引用,所以調用者和被調用者將引用同一個channel對象。和其它的引用類型同樣,channel的零值也是nil。
  • channel有發送和接收兩個主要操做,都是通訊行爲。一個發送語句將一個值從一個goroutine經過channel發送到另外一個執行接收操做的goroutine。發送和接收兩個操做都是用<-運算符。在發送語句中,<-運算符分割channel和要發送的值。在接收語句中,<-運算符寫在channel對象以前。一個不使用接收結果的接收操做也是合法的。
ch <- x  // 發送消息
x = <-ch // 從 channel 中接收消息
<-ch     // 從 channel 接收並丟棄消息
  • Channel還支持close操做,用於關閉channel,隨後對基於該channel的任何發送操做都將致使panic異常。對一個已經被close過的channel執行接收操做依然能夠接收到以前已經成功發送的數據;若是channel中已經沒有數據的話將產生一個零值的數據。

    使用內置的close函數就能夠關閉一個channel:編程

close(ch)

以最簡單方式調用make函數建立的時一個無緩衝的channel,可是咱們也能夠指定第二個整形參數,對應channel的容量。若是channel的容量大於零,那麼該channel就是帶緩衝的channel。緩存

ch = make(chan int)    // unbuffered channel
  ch = make(chan int, 0) // unbuffered channel
  ch = make(chan int, 3) // buffered channel with capacity 3

無緩衝 channel

  • 一個基於無緩衝Channel的發送操做將致使發送者goroutine阻塞,直到另外一個goroutine在相同的Channel上執行接收操做,當發送的值經過Channel成功傳輸以後,兩個goroutine能夠繼續執行後面的語句。反之,若是接收操做先發生,那麼接收者goroutine也將阻塞,直到有另外一個goroutine在相同的Channel上執行發送操做。
  • 下面的程序在 main 函數的 goroutine 中將標準輸入複製到server,所以當客戶端程序關閉標準輸入時,後臺goroutine可能依然在工做。咱們須要讓主goroutine等待後臺goroutine完成工做後再退出,咱們使用了一個channel來同步兩個goroutine,在後臺goroutine返回以前,它先打印一個日誌信息,而後向done對應的channel發送一個值。主goroutine在退出前先等待從done對應的channel接收一個值。所以,老是能夠在程序退出前正確輸出「done」消息。
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}
  • 基於channel發送消息有兩個重要方面。首先每一個消息都有一個值,可是有時候通信的事實和發生的時刻也一樣重要。當咱們更但願強調通信發生的時刻時,咱們將它稱爲消息事件。有些消息事件並不攜帶額外的信息,它僅僅是用做兩個goroutine之間的同步,這時候咱們能夠用struct{}空結構體做爲channels元素的類型,雖然也可使用bool或int類型實現一樣的功能,done <- 1語句也比done <- struct{}{}更短。
  • 若是發送者知道,沒有更多的值須要發送到channel的話,那麼讓接收者也能及時知道沒有多餘的值可接收將是有用的,由於接收者能夠中止沒必要要的接收等待。這能夠經過內置的close函數來關閉channel實現:
close(naturals)
  • 當一個channel被關閉後,再向該channel發送數據將致使panic異常。當一個被關閉的channel中已經發送的數據都被成功接收後,後續的接收操做將再也不阻塞,它們會當即返回一個零值。
  • 接收 channel 語句中能夠額外增長第二個值,標識 chnnel 是否已經關閉
x, ok := <-naturals
  • Go語言的range循環可直接在channels上面迭代。使用range循環是上面處理模式的簡潔語法,它依次從channel接收數據,當channel被關閉而且沒有值可接收時跳出循環。

在下面的程序中,咱們的計數器goroutine只生成100個含數字的序列,而後關閉naturals對應的channel,這將致使計算平方數的squarer對應的goroutine能夠正常終止循環並關閉squares對應的channel。(在一個更復雜的程序中,能夠經過defer語句關閉對應的channel。)最後,主goroutine也能夠正常終止循環並退出程序。安全

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    // Counter
    go func() {
        for x := 0; x < 100; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    // Squarer
    go func() {
        for x := range naturals {
            squares <- x * x
        }
        close(squares)
    }()

    // Printer (in main goroutine)
    for x := range squares {
        fmt.Println(x)
    }
}
  • 試圖重複關閉一個channel將致使panic異常,試圖關閉一個nil值的channel也將致使panic異常。關閉一個channels還會觸發一個廣播機制,咱們將在後面討論。

單方向的 channel

  • 當一個channel做爲一個函數參數是,它通常老是被專門用於只發送或者只接收。

    爲了代表這種意圖並防止被濫用,Go語言的類型系統提供了單方向的channel類型,分別用於只發送或只接收的channel。類型chan<- int表示一個只發送int的channel,只能發送不能接收。相反,類型<-chan int表示一個只接收int的channel,只能接收不能發送。(箭頭<-和關鍵字chan的相對位置代表了channel的方向。)這種限制將在編譯期檢測。性能優化

  • 由於關閉操做只用於斷言再也不向channel發送新的數據,因此只有在發送者所在的goroutine纔會調用close函數,所以對一個只接收的channel調用close將是一個編譯錯誤。

這是改進的版本,這一次參數使用了單方向channel類型:數據結構

func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

調用counter(naturals)將致使將chan int類型的naturals隱式地轉換爲chan<- int類型只發送型的channel。調用printer(squares)也會致使類似的隱式轉換,這一次是轉換爲<-chan int類型只接收型的channel。任何雙向channel向單向channel變量的賦值操做都將致使該隱式轉換。併發

帶緩衝的 channel

帶緩存的Channel內部持有一個元素隊列。隊列的最大容量是在調用make函數建立channel時經過第二個參數指定的。下面的語句建立了一個能夠持有三個字符串元素的帶緩存Channel。圖8.2是ch變量對應的channel的圖形表示形式。tcp

ch = make(chan string, 3)

img

向緩存Channel的發送操做就是向內部緩存隊列的尾部插入元素,接收操做則是從隊列的頭部刪除元素。若是內部緩存隊列是滿的,那麼發送操做將阻塞直到因另外一個goroutine執行接收操做而釋放了新的隊列空間。相反,若是channel是空的,接收操做將阻塞直到有另外一個goroutine執行發送操做而向隊列插入元素。函數

咱們能夠在無阻塞的狀況下連續向新建立的channel發送三個值:性能

ch <- "A"
ch <- "B"
ch <- "C"

此刻,channel的內部緩存隊列將是滿的(圖8.3),若是有第四個發送操做將發生阻塞。優化

img

若是咱們接收一個值,

fmt.Println(<-ch) // "A"

那麼channel的緩存隊列將不是滿的也不是空的(圖8.4),所以對該channel執行的發送或接收操做都不會發送阻塞。經過這種方式,channel的緩存隊列解耦了接收和發送的goroutine。

img

在某些特殊狀況下,程序可能須要知道channel內部緩存的容量,能夠用內置的cap函數獲取:

fmt.Println(cap(ch)) // "3"

一樣,對於內置的len函數,若是傳入的是channel,那麼將返回channel內部緩存隊列中有效元素的個數。由於在併發程序中該信息會隨着接收操做而失效,可是它對某些故障診斷和性能優化會有幫助。

fmt.Println(len(ch)) // "2"

在繼續執行兩次接收操做後channel內部的緩存隊列將又成爲空的,若是有第四個接收操做將發生阻塞:

fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"

下面的例子展現了一個使用了帶緩存channel的應用。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不一樣的地理位置。它們分別將收到的響應發送到帶緩存channel,最後接收者只接收第一個收到的響應,也就是最快的那個響應。所以mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應以前就返回告終果。(順便說一下,多個goroutines併發地向同一個channel發送數據,或從同一個channel接收數據都是常見的用法。)

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ }

若是咱們使用了無緩存的channel,那麼兩個慢的goroutines將會由於沒有人接收而被永遠卡住。這種狀況,稱爲goroutines泄漏,這將是一個BUG。和垃圾變量不一樣,泄漏的goroutines並不會被自動回收,所以確保每一個再也不須要的goroutine能正常退出是重要的。

關於無緩存或帶緩存channel之間的選擇,或者是帶緩存channel的容量大小的選擇,均可能影響程序的正確性。無緩存channel更強地保證了每一個發送操做與相應的同步接收操做;可是對於帶緩存channel,這些操做是解耦的。一樣,即便咱們知道將要發送到一個channel的信息的數量上限,建立一個對應容量大小帶緩存channel也是不現實的,由於這要求在執行任何接收操做以前緩存全部已經發送的值。若是未能分配足夠的緩衝將致使程序死鎖。

用帶緩衝的channel 控制併發數量

此外對於buffered channel,咱們能夠用一個有容量限制的buffered channel來控制併發,這相似於操做系統裏的計數信號量概念。從概念上講,channel裏的n個空槽表明n個能夠處理內容的token(通行證),從channel裏接收一個值會釋放其中的一個token,而且生成一個新的空槽位。這樣保證了在沒有接收介入時最多有n個發送操做。(這裏可能咱們拿channel裏填充的槽來作token更直觀一些,不過仍是這樣吧~)。因爲channel裏的元素類型並不重要,咱們用一個零值的struct{}來做爲其元素。

下面的crawl函數,將對links.Extract的調用操做用獲取、釋放token的操做包裹起來,來確保同一時間對其只有20個調用。信號量數量和其能操做的IO資源數量應保持接近。

// goroutine獲取token後,能夠進行抓取操做,若是滿20了
// 那麼 goroutine 會等到有獲取 token 後再去執行
var tokens = make(chan struct{}, 20)

func crawl(url string) []string {
    fmt.Println(url)
    tokens <- struct{}{} // 獲取 token
    list, err := links.Extract(url)
    <-tokens // 釋放 token
    if err != nil {
        log.Print(err)
    }
    return list
}

併發循環的一個典型示例

在併發循環中爲了知道最後一個goroutine何時結束(最後一個結束並不必定是最後一個開始),咱們須要一個遞增的計數器,在每個goroutine啓動時加一,在goroutine退出時減一。這須要一種特殊的計數器,這個計數器須要在多個goroutine操做時作到安全而且提供在其減爲零以前一直等待的一種方法。這種計數類型被稱爲sync.WaitGroup,下面的代碼就用到了這種方法:

// makeThumbnails6爲從通道中接收到的每一個文件建立縮略圖。
// 返回每一個建立的縮略圖所佔的本身數。
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

注意Add和Done方法的不對策。Add是爲計數器加一,必須在worker goroutine開始以前調用,而不是在goroutine中;不然的話咱們沒辦法肯定Add是在"closer" goroutine調用Wait以前被調用。而且Add還有一個參數,但Done卻沒有任何參數;其實它和Add(-1)是等價的。咱們使用defer來確保計數器即便是在出錯的狀況下依然可以正確地被減掉。上面的程序代碼結構是當咱們使用併發循環,但又不知道迭代次數時很一般並且很地道的寫法。

select多通道複用

select語句的通常形式,和switch語句稍微有點類似。也會有幾個case和最後的default選擇支。每個case表明一個通訊操做(在某個channel上進行發送或者接收)而且會包含一些語句組成的一個語句塊。

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

一個接收表達式可能只包含接收表達式自身(譯註:不把接收到的值賦值給變量什麼的),就像上面的第一個case,或者包含在一個簡短的變量聲明中,像第二個case裏同樣;第二種形式讓你可以在當前 case 塊中引用接收到的值。

select會等待case中有可以執行的case時去執行。當條件知足時,select纔會去通訊並執行case以後的語句;這時候其它通訊是不會執行的。一個沒有任何case的select語句寫做select{},會永遠地等待下去。

下面這個例子更微秒。ch這個channel的buffer大小是1,因此會交替的爲空或爲滿,因此只有一個case能夠進行下去,不管i是奇數或者偶數,它都會打印0 2 4 6 8。

ch := make(chan int, 1)
for i := 0; i < 10; i++ {
    select {
    case x := <-ch:
        fmt.Println(x) // "0" "2" "4" "6" "8"
    case ch <- i:
    }
}

若是多個case同時就緒時,select會隨機地選擇一個執行,這樣來保證每個channel都有平等的被select的機會。增長上面例子的buffer大小會使其輸出變得不肯定,由於當buffer既不爲滿也不爲空時,select語句的執行狀況就像是拋硬幣的行爲同樣是隨機的。
tWbHIMFsM3.png

相關文章
相關標籤/搜索