Goroutine是Go中最基本的執行單元。事實上每個Go程序至少有一個goroutine:主goroutine。當程序啓動時,它會自動建立。html
事實上goroutine採用了一種fork-join的模型。 golang
sayHello := func() {
fmt.Println("hello")
}
go sayHello()
複製代碼
那咱們如何來join goroutine呢?須要引入wait操做:安全
var wg sync.WaitGroup()
sayHello := func() {
defer wg.Done()
fmt.Println("hello")
}
wg.Add(1)
go sayHello()
wa.Wait()
複製代碼
goroutine是Go語言的基本調度單位,而channels則是它們之間的通訊機制。操做符<-用來指定管道的方向,發送或接收。若是未指定方向,則爲雙向管道。bash
// 建立一個雙向channel
ch := make(chan interface{})
複製代碼
interface{}表示chan能夠爲任何類型閉包
channel有發送和接受兩個主要操做。發送和接收兩個操做都使用<-運算符。在發送語句中,channel放<-運算符左邊。在接收語句中,channel放<-運算符右邊。一個不使用接收結果的接收操做也是合法的。併發
// 發送操做
ch <- x
// 接收操做
x = <-ch
// 忽略接收到的值,合法
<-ch
複製代碼
咱們不能弄錯channel的方向:函數
writeStream := make(chan<- interface{})
readStream := make(<-chan interface{})
<-writeStream
readStream <- struct{}{}
複製代碼
上面的語句會產生以下錯誤:ui
invalid operation: <-writeStream (receive from send-only type chan<- interface {}) invalid operation: readStream <- struct {} literal (send to receive-only type <-chan interface {})
複製代碼
Channel支持close操做,用於關閉channel,後面對該channel的任何發送操做都將致使panic異常。對一個已經被close過的channel進行接收操做依然能夠接受到以前已經成功發送的數據;若是channel中已經沒有數據的話將產生一個零值的數據。spa
從已經關閉的channel中讀:設計
intStream := make(chan int)
close(intStream)
integer, ok := <- intStream
fmt.Pritf("(%v): %v", ok, integer)
// (false): 0
複製代碼
上面例子中經過返回值ok來判斷channel是否關閉,咱們還能夠經過range這種更優雅的方式來處理已經關閉的channel:
intStream := make(chan int)
go func() {
defer close(intStream)
for i:=1; i<=5; i++{
intStream <- i
}
}()
for integer := range intStream {
fmt.Printf("%v ", integer)
}
// 1 2 3 4 5
複製代碼
建立了一個能夠持有三個字符串元素的帶緩衝Channel:
ch = make(chan string, 3)
複製代碼
咱們能夠在無阻塞的狀況下連續向新建立的channel發送三個值:
ch <- "A"
ch <- "B"
ch <- "C"
複製代碼
此刻,channel的內部緩衝隊列將是滿的,若是有第四個發送操做將發生阻塞。
若是咱們接收一個值:
fmt.Println(<-ch) // "A"
複製代碼
那麼channel的緩衝隊列將不是滿的也不是空的,所以對該channel執行的發送或接收操做都不會發生阻塞。經過這種方式,channel的緩衝隊列緩衝解耦了接收和發送的goroutine。
帶緩衝的信道可被用做信號量,例如限制吞吐量。在此例中,進入的請求會被傳遞給 handle,它從信道中接收值,處理請求後將值發回該信道中,以便讓該 「信號量」 準備迎接下一次請求。信道緩衝區的容量決定了同時調用 process 的數量上限。
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
sem <- 1 // 等待活動隊列清空。
process(r) // 可能須要很長時間。
<-sem // 完成;使下一個請求能夠運行。
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // 無需等待 handle 結束。
}
}
複製代碼
然而,它卻有個設計問題:儘管只有 MaxOutstanding 個 goroutine 能同時運行,但 Serve 仍是爲每一個進入的請求都建立了新的 goroutine。其結果就是,若請求來得很快, 該程序就會無限地消耗資源。爲了彌補這種不足,咱們能夠經過修改 Serve 來限制建立 Go 程,這是個明顯的解決方案,但要小心咱們修復後出現的 Bug。
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req) // 這兒有 Bug,解釋見下。
<-sem
}()
}
}
複製代碼
Bug 出如今 Go 的 for 循環中,該循環變量在每次迭代時會被重用,所以 req 變量會在全部的 goroutine 間共享,這不是咱們想要的。咱們須要確保 req 對於每一個 goroutine 來講都是惟一的。有一種方法可以作到,就是將 req 的值做爲實參傳入到該 goroutine 的閉包中:
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}
複製代碼
另外一種解決方案就是以相同的名字建立新的變量,如例中所示:
func Serve(queue chan *Request) {
for req := range queue {
req := req // 爲該 Go 程建立 req 的新實例。
sem <- 1
go func() {
process(req)
<-sem
}()
}
}
複製代碼
下面再看一個Go語言聖經的例子。它併發地向三個鏡像站點發出請求,三個鏡像站點分散在不一樣的地理位置。它們分別將收到的響應發送到帶緩衝channel,最後接收者只接收第一個收到的響應,也就是最快的那個響應。所以mirroredQuery函數可能在另外兩個響應慢的鏡像站點響應以前就返回告終果。
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
// 僅僅返回最快的那個response
return <-responses
}
func request(hostname string) (response string) { /* ... */ }
複製代碼
若是咱們使用了無緩衝的channel,那麼兩個慢的goroutines將會由於沒有人接收而被永遠卡住。這種狀況,稱爲goroutines泄漏,這將是一個BUG。和垃圾變量不一樣,泄漏的goroutines並不會被自動回收,所以確保每一個再也不須要的goroutine能正常退出是重要的。
Go 最重要的特性就是信道是first-class value,它能夠被分配並像其它值處處傳遞。 這種特性一般被用來實現安全、並行的多路分解。
咱們能夠利用這個特性來實現一個簡單的RPC。 如下爲 Request 類型的大概定義。
type Request struct {
args []int
f func([]int) int resultChan chan int } 複製代碼
客戶端提供了一個函數及其實參,此外在請求對象中還有個接收應答的信道。
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 發送請求
clientRequests <- request
// 等待迴應
fmt.Printf("answer: %d\n", <-request.resultChan)
複製代碼
服務端的handler函數:
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
複製代碼
Channels也能夠用於將多個goroutine鏈接在一塊兒,一個Channel的輸出做爲下一個Channel的輸入。這種串聯的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯起來:
第一個goroutine是一個計數器,用於生成0、一、二、……形式的整數序列,而後經過channel將該整數序列發送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每一個整數求平方,而後將平方後的結果經過第二個channel發送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每一個整數。
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
複製代碼
select用於從一組可能的通信中選擇一個進一步處理。若是任意一個通信均可以進一步處理,則從中隨機選擇一個,執行對應的語句。不然,若是又沒有默認分支(default case),select語句則會阻塞,直到其中一個通信完成。
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
複製代碼
如何使用select語句爲一個操做設置一個時間限制。代碼會輸出變量news的值或者超時消息,具體依賴於兩個接收語句哪一個先執行:
select {
case news := <-NewsAgency:
fmt.Println(news)
case <-time.After(time.Minute):
fmt.Println("Time out: no news in one minute.")
}
複製代碼
下面的select語句會在abort channel中有值時,從其中接收值;無值時什麼都不作。這是一個非阻塞的接收操做;反覆地作這樣的操做叫作「輪詢channel」。
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}
複製代碼
參考資料。