【Go併發編程】第二篇 - Goroutines和Channels

Goroutines

Goroutine是Go中最基本的執行單元。事實上每個Go程序至少有一個goroutine:主goroutine。當程序啓動時,它會自動建立。html

事實上goroutine採用了一種fork-join的模型。 golang

sayHello := func() {
	fmt.Println("hello")
}

go sayHello()
複製代碼

那咱們如何來join goroutine呢?須要引入wait操做:安全

var wg sync.WaitGroup()
sayHello := func() {
	defer wg.Done()
	fmt.Println("hello")
}

wg.Add(1)
go sayHello()
wa.Wait()
複製代碼

Channel

讀寫channel

goroutine是Go語言的基本調度單位,而channels則是它們之間的通訊機制。操做符<-用來指定管道的方向,發送或接收。若是未指定方向,則爲雙向管道。bash

// 建立一個雙向channel
ch := make(chan interface{})
複製代碼

interface{}表示chan能夠爲任何類型閉包

channel有發送和接受兩個主要操做。發送和接收兩個操做都使用<-運算符。在發送語句中,channel放<-運算符左邊。在接收語句中,channel放<-運算符右邊。一個不使用接收結果的接收操做也是合法的。併發

// 發送操做
ch <- x 
// 接收操做
x = <-ch 
// 忽略接收到的值,合法
<-ch     
複製代碼

咱們不能弄錯channel的方向:函數

writeStream := make(chan<- interface{})
readStream := make(<-chan interface{})

<-writeStream
readStream <- struct{}{}
複製代碼

上面的語句會產生以下錯誤:ui

invalid operation: <-writeStream (receive from send-only type chan<- interface {}) invalid operation: readStream <- struct {} literal (send to receive-only type <-chan interface {})
複製代碼

關閉channel

Channel支持close操做,用於關閉channel,後面對該channel的任何發送操做都將致使panic異常。對一個已經被close過的channel進行接收操做依然能夠接受到以前已經成功發送的數據;若是channel中已經沒有數據的話將產生一個零值的數據。spa

從已經關閉的channel中讀:設計

intStream := make(chan int) 
close(intStream)
integer, ok := <- intStream
fmt.Pritf("(%v): %v", ok, integer)
// (false): 0
複製代碼

上面例子中經過返回值ok來判斷channel是否關閉,咱們還能夠經過range這種更優雅的方式來處理已經關閉的channel:

intStream := make(chan int) 
go func() {
	defer close(intStream) 
	for i:=1; i<=5; i++{ 
		intStream <- i 
	}
}()

for integer := range intStream { 
	fmt.Printf("%v ", integer)
}
// 1 2 3 4 5
複製代碼

帶緩衝(buffered)的channel

建立了一個能夠持有三個字符串元素的帶緩衝Channel:

ch = make(chan string, 3)
複製代碼

咱們能夠在無阻塞的狀況下連續向新建立的channel發送三個值:

ch <- "A"
ch <- "B"
ch <- "C"
複製代碼

此刻,channel的內部緩衝隊列將是滿的,若是有第四個發送操做將發生阻塞。

若是咱們接收一個值:

fmt.Println(<-ch) // "A"
複製代碼

那麼channel的緩衝隊列將不是滿的也不是空的,所以對該channel執行的發送或接收操做都不會發生阻塞。經過這種方式,channel的緩衝隊列緩衝解耦了接收和發送的goroutine。

帶緩衝的信道可被用做信號量,例如限制吞吐量。在此例中,進入的請求會被傳遞給 handle,它從信道中接收值,處理請求後將值發回該信道中,以便讓該 「信號量」 準備迎接下一次請求。信道緩衝區的容量決定了同時調用 process 的數量上限。

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1 // 等待活動隊列清空。
    process(r)  // 可能須要很長時間。
    <-sem    // 完成;使下一個請求能夠運行。
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // 無需等待 handle 結束。
    }
}
複製代碼

然而,它卻有個設計問題:儘管只有 MaxOutstanding 個 goroutine 能同時運行,但 Serve 仍是爲每一個進入的請求都建立了新的 goroutine。其結果就是,若請求來得很快, 該程序就會無限地消耗資源。爲了彌補這種不足,咱們能夠經過修改 Serve 來限制建立 Go 程,這是個明顯的解決方案,但要小心咱們修復後出現的 Bug。

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // 這兒有 Bug,解釋見下。
            <-sem
        }()
    }
}
複製代碼

Bug 出如今 Go 的 for 循環中,該循環變量在每次迭代時會被重用,所以 req 變量會在全部的 goroutine 間共享,這不是咱們想要的。咱們須要確保 req 對於每一個 goroutine 來講都是惟一的。有一種方法可以作到,就是將 req 的值做爲實參傳入到該 goroutine 的閉包中:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}
複製代碼

另外一種解決方案就是以相同的名字建立新的變量,如例中所示:

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // 爲該 Go 程建立 req 的新實例。
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}
複製代碼

下面再看一個Go語言聖經的例子。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不一樣的地理位置。它們分別將收到的響應發送到帶緩衝channel,最後接收者只接收第一個收到的響應,也就是最快的那個響應。所以mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應以前就返回告終果。

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    // 僅僅返回最快的那個response
    return <-responses 
}

func request(hostname string) (response string) { /* ... */ }
複製代碼

若是咱們使用了無緩衝的channel,那麼兩個慢的goroutines將會由於沒有人接收而被永遠卡住。這種狀況,稱爲goroutines泄漏,這將是一個BUG。和垃圾變量不一樣,泄漏的goroutines並不會被自動回收,所以確保每一個再也不須要的goroutine能正常退出是重要的。

Channels of channels

Go 最重要的特性就是信道是first-class value,它能夠被分配並像其它值處處傳遞。 這種特性一般被用來實現安全、並行的多路分解。

咱們能夠利用這個特性來實現一個簡單的RPC。 如下爲 Request 類型的大概定義。

type Request struct {
    args        []int
    f           func([]int) int resultChan chan int } 複製代碼

客戶端提供了一個函數及其實參,此外在請求對象中還有個接收應答的信道。

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 發送請求
clientRequests <- request
// 等待迴應
fmt.Printf("answer: %d\n", <-request.resultChan)
複製代碼

服務端的handler函數:

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}
複製代碼

Channels pipeline

Channels也能夠用於將多個goroutine鏈接在一塊兒,一個Channel的輸出做爲下一個Channel的輸入。這種串聯的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯起來:

第一個goroutine是一個計數器,用於生成0、一、二、……形式的整數序列,而後經過channel將該整數序列發送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每一個整數求平方,而後將平方後的結果經過第二個channel發送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每一個整數。

func counter(out chan<- int) {
	for x := 0; x < 100; x++ {
		out <- x
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

func printer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	go counter(naturals)
	go squarer(squares, naturals)
	printer(squares)
}
複製代碼

Select多路複用

select用於從一組可能的通信中選擇一個進一步處理。若是任意一個通信均可以進一步處理,則從中隨機選擇一個,執行對應的語句。不然,若是又沒有默認分支(default case),select語句則會阻塞,直到其中一個通信完成。

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}
複製代碼

如何使用select語句爲一個操做設置一個時間限制。代碼會輸出變量news的值或者超時消息,具體依賴於兩個接收語句哪一個先執行:

select {
case news := <-NewsAgency:
    fmt.Println(news)
case <-time.After(time.Minute):
    fmt.Println("Time out: no news in one minute.")
}
複製代碼

下面的select語句會在abort channel中有值時,從其中接收值;無值時什麼都不作。這是一個非阻塞的接收操做;反覆地作這樣的操做叫作「輪詢channel」。

select {
case <-abort:
    fmt.Printf("Launch aborted!\n")
    return
default:
    // do nothing
}
複製代碼

參考資料。

  1. Concurrency in Go
  2. gopl
  3. Effective Go
相關文章
相關標籤/搜索