golang goroutine和channel

goroutine

順序通訊進程 CSP

「順序通訊進程」(communicating sequential processes)或被簡稱爲CSP。CSP是一種現代的併發編程模型,在這種編程模型中值會在不一樣的運行實例(goroutine)中傳遞,儘管大多數狀況下仍然是被限制在單一實例中。express

當一個程序啓動時,其主函數即在一個單獨的goroutine中運行,咱們叫它main goroutine。新的goroutine會用go語句來建立。在語法上,go語句是一個普通的函數或方法調用前加上關鍵字go。go語句會使其語句中的函數在一個新建立的goroutine中運行。而go語句自己會迅速地完成。編程

示例 斐波那契數列

計算斐波那契數列+可見的標識來代表程序在正常運行緩存

// animation
func spinner(delay time.Duration) {
	for {
		for _, r := range `-\|/`{
			fmt.Printf("\r%c", r)
			time.Sleep(delay)
		}
	}
}

// get fibonacci of Nth
func fib(x int) int {
	if x < 2 {
		return x
	}
	return fib(x-1) + fib(x-2)
}

func main() {
	go spinner(100 * time.Millisecond)
	const n = 40
	fibN := fib(n) // slow
	fmt.Printf("\rfibonacci(%d) = %d\n", n, fibN)
}

程序分析

main()函數開始運行建立了一個main goroutine性能優化

go spinner建立一個子協程,子協程中存在死循環,因此只有在main goroutine退出時,子協程纔會結束。在子協程printsleep時,子協程會主動交出控制權,而後主協程繼續運行;服務器

主協程在計算斐波那契數列也會花費較多的時間,在函數遞歸調用時,也會交出控制權,讓其子他協程運行;網絡

主函數返回時,全部的goroutine都會被直接打斷,程序退出。數據結構

主協程、子協程交替取得控制權,就造成了併發的效果:一邊進行顯示輸出,一邊進行斐波那契計算。併發

示例 併發的Clock服務

例子是一個順序執行的時鐘服務器,它會每隔一秒鐘將當前時間寫到客戶端。tcp

clock服務器每個鏈接都會起一個goroutine。函數

func main() {
	listener, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Print(err) // e.g., connection aborted
			continue
		}
		handleConn(conn) // handle one connection at a time
	}
}

func handleConn(c net.Conn) {
	defer c.Close()
	for {
		_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
		if err != nil {
			return // e.g., client disconnected
		}
		time.Sleep(1 * time.Second)
	}
}

程序分析

Listen函數建立了一個net.Listener的對象,這個對象會監聽一個網絡端口上到來的鏈接,listener對象的Accept方法會直接阻塞,直到 一個新的鏈接被建立,而後會返回一個net.Conn對象來表示這個鏈接。

handleConn函數會處理一個完整的客戶端鏈接。在一個for死循環中,將當前的時候用 time.Now()函數獲得,而後寫到客戶端。因爲net.Conn實現了io.Writer接口,咱們能夠直接向 其寫入內容。這個死循環會一直執行,直到寫入失敗。最可能的緣由是客戶端主動斷開鏈接。這種狀況下handleConn函數會用defer調用關閉服務器側的鏈接,而後返回到主函數,繼續等待下一個鏈接請求。

這裏能夠對服務端程序作一點小改動, 使其支持併發: 在handleConn函數調用的地方增長go關鍵字,讓每一次handleConn的調用都 進入一個獨立的goroutine。

模擬簡單的telnet程序

func main() {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	mustCopy(os.Stdout, conn)
}

func mustCopy(dst io.Writer, src io.Reader) {
	if _, err := io.Copy(dst, src); err != nil {
		log.Fatal(err)
	}
}

程序分析

這個程序會從鏈接中讀取數據,並將讀到的內容寫到標準輸出中,直到遇到end of file的條件 或者發生錯誤。

回聲程序

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"strings"
	"time"
)

//
func echo(c net.Conn, shout string, delay time.Duration) {
	fmt.Fprintln(c, strings.ToUpper(shout), time.Now())
	time.Sleep(delay)
	fmt.Fprintln(c, shout, time.Now())
	time.Sleep(delay)
	fmt.Fprintln(c, strings.ToLower(shout), time.Now())
}

func handleConn(c net.Conn) {
	input := bufio.NewScanner(c)
	for input.Scan() {
		if input.Text() == "quit" {
			c.Close()
		} else {
			echo(c, input.Text(), 1*time.Second)
		}
	}
	// NOTE: ignoring potential errors from input.Err()
	c.Close()
}

func main() {
	listen, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listen.Accept()
		if err != nil {
			log.Print(err) // e.g., connection aborted
			continue
		}
		go handleConn(conn)
	}
}

Channels

goroutine是Go語音程序的併發機制,channels是goroutine間的通訊機制。

一個 channels是一個通訊機制,它可讓一個goroutine經過它給另外一個goroutine發送值信息。每 個channel都有一個特殊的類型,也就是channels可發送數據的類型。一個能夠發送int類型數據的channel通常寫爲chan int。

建立 channel

ch := make(chan int)
ch = make(chan int) // unbuffered channel 
ch = make(chan int, 0) // unbuffered channel 
ch = make(chan int, 3) // buffered channel with capacity 3

和map相似,channel也一個對應make建立的底層數據結構的引用。channel的零值也是nil。

一個channel有發送和接受兩個主要操做,都是通訊行爲。一個發送語句將一個值從一個 goroutine經過channel發送到另外一個執行接收操做的goroutine。

ch <- x  // a send statement 
x = <-ch // a receive expression in an assignment statement 
<-ch     // a receive statement; result is discarded

消息事件

有些消息事件並不攜帶額外的信息,它僅僅是用做兩個goroutine之間的同步,這時候能夠用struct{}空結構體做爲channels元素的類型。

什麼時候關閉channel

Channel還支持close(ch)操做,用於關閉channel。隨後對基於該channel的任何發送操做都將致使panic異常。對一個已經被close過的channel之行接收操做依然能夠接受到以前已經成功發送的數據;若是channel中已經沒有數據的話講產生一個零值的數據。

只有當須要告訴接收者goroutine,全部的數據已經所有發送時才須要關閉channel。無論一個channel是否被關閉,當它沒有被引用時將會被Go語言的垃圾自動回收器回收。試圖重複關閉一個channel將致使panic異常,試圖關閉一個nil值的channel也將致使panic異常。關閉一個channels還會觸發一個廣播機制。

串聯的channel和單方向的Channel

Channels也能夠用於將多個goroutine鏈接在一塊兒,一個Channel的輸出做爲下一個Channel的輸入。這種串聯的Channels就是所謂的管道(pipeline)。

22-57-08-V7hU7g

第一個goroutine是一個計數器,用於生成0、一、二、……形式的整數序列,而後經過channel將該整數序列發送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每一個整數求平方,而後將平方後的結果經過第二個channel發送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每一個整數。

關閉channel

// Squarer
go func() {
    for {
        x, ok := <-naturals
        if ok != nil {
            break // channel was closed and drained
        }
        squares <- x * x
    }
    close(squares)
}()

單方向的channel

package main

import "fmt"

func counter(out chan<- int) {
	for x := 0; x < 100; x++ {
		out <- x
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

func printer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	naturals := make(chan int)
	squares := make(chan int)
	go counter(naturals)
	go squarer(squares, naturals)
	printer(squares)
}

調用counter(naturals)時,naturals的類型將隱式地從chan int轉換成chan<- int。任何雙向channel向單向channel變量的賦值操做都將致使該隱式轉換。這裏並無反向轉換的語法:也就是不能將一個相似chan<- int類型的單向型的channel轉換爲chan int類型的雙向型的channel。

帶緩存的Channels

帶緩存的Channel內部持有一個元素隊列。隊列的最大容量是在調用make函數建立channel時經過第二個參數指定的。下面的語句建立了一個能夠持有三個字符串元素的帶緩存Channel。

ch = make(chan string, 3)

23-16-15-l5TFVg

向緩存Channel的發送操做就是向內部緩存隊列的尾部插入元素,接收操做則是從隊列的頭部刪除元素。若是內部緩存隊列是滿的,那麼發送操做將阻塞直到因另外一個goroutine執行接收操做而釋放了新的隊列空間。相反,若是channel是空的,接收操做將阻塞直到有另外一個goroutine執行發送操做而向隊列插入元素。

當channel的緩存隊列將不是滿的也不是空的,對該channel執行的發送或接收操做都不會發生阻塞。經過這種方式,channel的緩存隊列解耦了接收和發送的goroutine。

在某些特殊狀況下,程序可能須要知道channel內部緩存的容量,能夠用內置的cap函數獲取:

fmt.Println(cap(ch)) // "3"

一樣,對於內置的len函數,若是傳入的是channel,那麼將返回channel內部緩存隊列中有效元素的個數。由於在併發程序中該信息會隨着接收操做而失效,可是它對某些故障診斷和性能優化會有幫助。

fmt.Println(len(ch)) // "2"

Go語言新手有時候會將一個帶緩存的channel看成同一個goroutine中的隊列使用,雖然語法看似簡單,但實際上這是一個錯誤。Channel和goroutine的調度器機制是緊密相連的,一個發送操做——或許是整個程序——可能會永遠阻塞。若是你只是須要一個簡單的隊列,使用slice就能夠了。

示例--返回最快的請求

例子展現了一個使用了帶緩存channel的應用。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不一樣的地理位置。它們分別將收到的響應發送到帶緩存channel,最後接收者只接收第一個收到的響應,也就是最快的那個響應。所以mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應以前就返回告終果。(順便說一下,多個goroutines併發地向同一個channel發送數據,或從同一個channel接收數據都是常見的用法。)

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

return以後,沒結束的goroutine也會結束嗎?

goroutines泄漏

若是咱們使用了無緩存的channel,那麼兩個慢的goroutines將會由於沒有人接收而被永遠卡住。這種狀況,稱爲goroutines泄漏,這將是一個BUG。和垃圾變量不一樣,泄漏的goroutines並不會被自動回收,所以確保每一個再也不須要的goroutine能正常退出是重要的。

channel的選擇--緩存和不帶緩存

關於無緩存或帶緩存channels之間的選擇,或者是帶緩存channels的容量大小的選擇,均可能影響程序的正確性。無緩存channel更強地保證了每一個發送操做與相應的同步接收操做;可是對於帶緩存channel,這些操做是解耦的。一樣,即便咱們知道將要發送到一個channel的信息的數量上限,建立一個對應容量大小的帶緩存channel也是不現實的,由於這要求在執行任何接收操做以前緩存全部已經發送的值。若是未能分配足夠的緩衝將致使程序死鎖。

蛋糕生產線的比喻

Channel的緩存也可能影響程序的性能。想象一家蛋糕店有三個廚師,一個烘焙,一個上糖衣,還有一個將每一個蛋糕傳遞到它下一個廚師在生產線。在狹小的廚房空間環境,每一個廚師在完成蛋糕後必須等待下一個廚師已經準備好接受它;這相似於在一個無緩存的channel上進行溝通。

若是在每一個廚師之間有一個放置一個蛋糕的額外空間,那麼每一個廚師就能夠將一個完成的蛋糕臨時放在那裏而立刻進入下一個蛋糕在製做中;這相似於將channel的緩存隊列的容量設置爲1。只要每一個廚師的平均工做效率相近,那麼其中大部分的傳輸工做將是迅速的,個體之間細小的效率差別將在交接過程當中彌補。若是廚師之間有更大的額外空間——也是就更大容量的緩存隊列——將能夠在不中止生產線的前提下消除更大的效率波動,例如一個廚師能夠短暫地休息,而後再加快遇上進度而不影響其餘人。

另外一方面,若是生產線的前期階段一直快於後續階段,那麼它們之間的緩存在大部分時間都將是滿的。相反,若是後續階段比前期階段更快,那麼它們之間的緩存在大部分時間都將是空的。對於這類場景,額外的緩存並無帶來任何好處。

生產線的隱喻對於理解channels和goroutines的工做機制是頗有幫助的。例如,若是第二階段是須要精心製做的複雜操做,一個廚師可能沒法跟上第一個廚師的進度,或者是沒法知足第三階段廚師的需求。要解決這個問題,咱們能夠僱傭另外一個廚師來幫助完成第二階段的工做,他執行相同的任務可是獨立工做。這相似於基於相同的channels建立另外一個獨立的goroutine。

相關文章
相關標籤/搜索