不少文章介紹channel的時候都和併發揉在一塊兒,這裏我想把它當作一種數據結構來單獨介紹它的實現原理。golang
channel,通道。golang中用於數據傳遞的一種數據結構。是golang中一種傳遞數據的方式,也可用做事件通知。數組
使用chan關鍵字聲明一個通道,在使用前必須先建立,操做符 <-
用於指定通道的方向,發送或接收。若是未指定方向,則爲雙向通道。緩存
1 //聲明和建立 2 var ch chan int // 聲明一個傳遞int類型的channel 3 ch := make(chan int) // 使用內置函數make()定義一個channel 4 ch2 := make(chan interface{}) // 建立一個空接口類型的通道, 能夠存聽任意格式 5 6 type Equip struct{ /* 一些字段 */ } 7 ch2 := make(chan *Equip) // 建立Equip指針類型的通道, 能夠存放*Equip 8 9 //傳值 10 ch <- value // 將一個數據value寫入至channel,這會致使阻塞,直到有其餘goroutine從這個channel中讀取數據 11 value := <-ch // 從channel中讀取數據,若是channel以前沒有寫入數據,也會致使阻塞,直到channel中被寫入數據爲止 12 13 ch := make(chan interface{}) // 建立一個空接口通道 14 ch <- 0 // 將0放入通道中 15 ch <- "hello" // 將hello字符串放入通道中 16 17 //關閉 18 close(ch) // 關閉channel
把數據往通道中發送時,若是接收方一直都沒有接收,那麼發送操做將持續阻塞。Go 程序運行時能智能地發現一些永遠沒法發送成功的語句並報錯:數據結構
fatal error: all goroutines are asleep - deadlock! //運行時發現全部的 goroutine(包括main)都處於等待 goroutine。
通道默認是無緩衝的,無緩衝通道上的發送操做將會被阻塞,直到有其餘goroutine從對應的通道上執行接收操做,數據傳送完成,通道繼續工做。併發
package main import ( "fmt" "time" ) var done chan bool func HelloWorld() { fmt.Println("Hello world goroutine") time.Sleep(1*time.Second) done <- true } func main() { done = make(chan bool) // 建立一個channel go HelloWorld() <-done }
1 //輸出 2 //Hello world goroutine
因爲main不會等goroutine執行結束才返回,前文專門加了sleep輸出爲了能夠看到goroutine的輸出內容,那麼在這裏因爲是阻塞的,因此無需sleep。異步
將代碼中」done <- true」和」<-done」,去掉再執行,沒有上面的輸出內容。socket
通道能夠用來鏈接goroutine,這樣一個的輸出是另外一個輸入。這就叫作管道:函數
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 var echo chan string 8 var receive chan string 9 10 // 定義goroutine 1 11 func Echo() { 12 time.Sleep(1*time.Second) 13 echo <- "這是一次測試" 14 } 15 16 // 定義goroutine 2 17 func Receive() { 18 temp := <- echo // 阻塞等待echo的通道的返回 19 receive <- temp 20 } 21 22 23 func main() { 24 echo = make(chan string) 25 receive = make(chan string) 26 27 go Echo() 28 go Receive() 29 30 getStr := <-receive // 接收goroutine 2的返回 31 32 fmt.Println(getStr) 33 }
輸出字符串:"這是一次測試"。測試
在這裏不必定要去關閉channel,由於底層的垃圾回收機制會根據它是否能夠訪問來決定是否自動回收它。(這裏不是根據channel是否關閉來決定的)ui
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 8 // 定義goroutine 1 9 func Echo(out chan<- string) { // 定義輸出通道類型 10 time.Sleep(1*time.Second) 11 out <- "這又是一次測試" 12 close(out) 13 } 14 15 // 定義goroutine 2 16 func Receive(out chan<- string, in <-chan string) { // 定義輸出通道類型和輸入類型 17 temp := <-in // 阻塞等待echo的通道的返回 18 out <- temp 19 close(out) 20 } 21 22 23 func main() { 24 echo := make(chan string) 25 receive := make(chan string) 26 27 go Echo(echo) 28 go Receive(receive, echo) 29 30 getStr := <-receive // 接收goroutine 2的返回 31 32 fmt.Println(getStr) 33 }
輸出:這又是一次測試。
goroutine的通道默認是是阻塞的,那麼有什麼辦法能夠緩解阻塞? 答案是:加一個緩衝區。
建立一個緩衝通道:
1 ch := make(chan string, 3) // 建立了緩衝區爲3的通道 2 3 //== 4 len(ch) // 長度計算 5 cap(ch) // 容量計算
緩衝通道傳遞數據示意圖:
Go語言channel是first-class的,意味着它能夠被存儲到變量中,能夠做爲參數傳遞給函數,也能夠做爲函數的返回值返回。做爲Go語言的核心特徵之一,雖然channel看上去很高端,可是其實channel僅僅就是一個數據結構而已,具體定義在 $GOROOT/src/runtime/chan.go
裏。以下:
1 type hchan struct { 2 qcount uint // 隊列中的總數據 3 dataqsiz uint // 循環隊列的大小 4 buf unsafe.Pointer // 指向dataqsiz元素數組 5 elemsize uint16 // 6 closed uint32 7 elemtype *_type // 元素類型 8 sendx uint // 發送索引 9 recvx uint // 接收索引 10 recvq waitq // 接待員名單, 因recv而阻塞的等待隊列。 11 sendq waitq // 發送服務員列表, 因send而阻塞的等待隊列。 12 //鎖定保護hchan中的全部字段,以及幾個在此通道上阻止的sudogs中的字段。 13 //按住此鎖定時不要更改另外一個G的狀態(尤爲是不要準備G),由於這可能會致使死鎖堆棧縮小。 14 lock mutex 15 }
其中一個核心的部分是存放channel數據的環形隊列,由qcount和elemsize分別指定了隊列的容量和當前使用量。dataqsize是隊列的大小。elemalg是元素操做的一個Alg結構體,記錄下元素的操做,如copy函數,equal函數,hash函數等。
若是是帶緩衝區的chan,則緩衝區數據其實是緊接着Hchan結構體中分配的。不帶緩衝的 channel ,環形隊列 size 則爲 0。
1 c = (Hchan*)runtime.mal(n + hint*elem->size);
另外一重要部分就是recvq和sendq兩個鏈表,一個是因讀這個通道而致使阻塞的goroutine,另外一個是由於寫這個通道而阻塞的goroutine。若是一個goroutine阻塞於channel了,那麼它就被掛在recvq或sendq中。WaitQ是鏈表的定義,包含一個頭結點和一個尾結點:
1 struct WaitQ 2 { 3 SudoG* first; 4 SudoG* last; 5 };
隊列中的每一個成員是一個SudoG結構體變量:
1 struct SudoG 2 { 3 G* g; // g和selgen構成 4 uint32 selgen; // 指向g的弱指針 5 SudoG* link; 6 int64 releasetime; 7 byte* elem; // 數據元素 8 };
該結構中主要的就是一個g和一個elem。elem用於存儲goroutine的數據。讀通道時,數據會從Hchan的隊列中拷貝到SudoG的elem域。寫通道時,數據則是由SudoG的elem域拷貝到Hchan的隊列中。
基本的寫channel操做,在底層運行時庫中對應的是一個runtime.chansend函數。
1 c <- v
在運行時庫中會執行:
1 void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
其中c就是channel,ep是取變量v的地址。這裏的傳值約定是調用者負責分配好ep的空間,僅須要簡單的取變量地址就夠了。pres參數是在select中的通道操做使用的。
這個函數首先會區分是同步仍是異步。同步是指chan是不帶緩衝區的,所以可能寫阻塞,而異步是指chan帶緩衝區,只有緩衝區滿才阻塞。在同步的狀況下,因爲channel自己是不帶數據緩存的,這時首先會查看Hchan結構體中的recvq鏈表時否爲空,便是否有由於讀該管道而阻塞的goroutine。若是有則能夠正常寫channel,不然操做會阻塞。
recvq不爲空的狀況下,將一個SudoG結構體出隊列,將傳給通道的數據(函數參數ep)拷貝到SudoG結構體中的elem域,並將SudoG中的g放到就緒隊列中,狀態置爲ready,而後函數返回。若是recvq爲空,不然要將當前goroutine阻塞。此時將一個SudoG結構體,掛到通道的sendq鏈表中,這個SudoG中的elem域是參數eq,SudoG中的g是當前的goroutine。當前goroutine會被設置爲waiting狀態並掛到等待隊列中。
在異步的狀況,若是緩衝區滿了,也是要將當前goroutine和數據一塊兒做爲SudoG結構體掛在sendq隊列中,表示因寫channel而阻塞。不然也是先看有沒有recvq鏈表是否爲空,有就喚醒。跟同步不一樣的是在channel緩衝區不滿的狀況,這裏不會阻塞寫者,而是將數據放到channel的緩衝區中,調用者返回。
讀channel的操做也是相似的,對應的函數是runtime.chansend。一個是收一個是發,基本的過程都是差很少的。
當協程嘗試從未關閉的 channel 中讀取數據時,內部的操做以下:
<- ch
未阻塞;<- ch
未阻塞;<- ch
阻塞。相似的,當協程嘗試往未關閉的 channel 中寫入數據時,內部的操做以下:
ch <-
未阻塞;ch <-
未阻塞;ch <-
阻塞。當關閉 non-nil channel 時,內部的操做以下:
空通道是指將一個channel賦值爲nil,或者定義後不調用make進行初始化。按照Go語言的語言規範,讀寫空通道是永遠阻塞的。其實在函數runtime.chansend和runtime.chanrecv開頭就有判斷這類狀況,若是發現參數c是空的,則直接將當前的goroutine放到等待隊列,狀態設置爲waiting。
讀一個關閉的通道,永遠不會阻塞,會返回一個通道數據類型的零值。這個實現也很簡單,將零值複製到調用函數的參數ep中。寫一個關閉的通道,則會panic。關閉一個空通道,也會致使panic。
類型於 POSIX 接口中線程通知其餘線程某個事件發生的條件變量,channel 的特性也能夠用來當成協程之間同步的條件變量。由於 channel 只是用來通知,因此 channel 中具體的數據類型和值並不重要,這種場景通常用 strct {}
做爲 channel 的類型。
相似 pthread_cond_signal()
的功能,用來在一個協程中通知另個某一個協程事件發生:
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 8 func main() { 9 ch := make(chan struct{}) 10 nums := make([]int, 100) 11 12 go func() { 13 time.Sleep(time.Second) 14 for i := 0; i < len(nums); i++ { 15 nums[i] = i 16 } 17 // send a finish signal 18 ch <- struct{}{} 19 }() 20 21 // wait for finish signal 22 <-ch 23 fmt.Println(nums) 24 }
相似 pthread_cond_broadcast()
的功能。利用從已關閉的 channel 讀取數據時老是非阻塞的特性,能夠實如今一個協程中向其餘多個協程廣播某個事件發生的通知:
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 ) 7 8 func main() { 9 N := 10 10 exit := make(chan struct{}) 11 done := make(chan struct{}, N) 12 13 // start N worker goroutines 14 for i := 0; i < N; i++ { 15 go func(n int) { 16 for { 17 select { 18 // wait for exit signal 19 case <-exit: 20 fmt.Printf("worker goroutine #%d exit\n", n) 21 done <- struct{}{} 22 return 23 case <-time.After(time.Second): 24 fmt.Printf("worker goroutine #%d is working...\n", n) 25 } 26 } 27 }(i) 28 } 29 30 time.Sleep(3 * time.Second) 31 // broadcast exit signal 32 close(exit) 33 // wait for all worker goroutines exit 34 for i := 0; i < N; i++ { 35 <-done 36 } 37 fmt.Println("main goroutine exit") 38 }
channel 的讀/寫至關於信號量的 P / V 操做,下面的示例程序中 channel 至關於信號量:
1 package main 2 3 import ( 4 "log" 5 "math/rand" 6 "time" 7 ) 8 9 type Seat int 10 type Bar chan Seat 11 12 func (bar Bar) ServeConsumer(customerId int) { 13 log.Print("-> consumer#", customerId, " enters the bar") 14 seat := <-bar // need a seat to drink 15 log.Print("consumer#", customerId, " drinks at seat#", seat) 16 time.Sleep(time.Second * time.Duration(2+rand.Intn(6))) 17 log.Print("<- consumer#", customerId, " frees seat#", seat) 18 bar <- seat // free the seat and leave the bar 19 } 20 21 func main() { 22 rand.Seed(time.Now().UnixNano()) 23 24 bar24x7 := make(Bar, 10) // the bar has 10 seats 25 // Place seats in an bar. 26 for seatId := 0; seatId < cap(bar24x7); seatId++ { 27 bar24x7 <- Seat(seatId) // none of the sends will block 28 } 29 30 // a new consumer try to enter the bar for each second 31 for customerId := 0; ; customerId++ { 32 time.Sleep(time.Second) 33 go bar24x7.ServeConsumer(customerId) 34 } 35 }
互斥量至關於二元信號裏,因此 cap 爲 1 的 channel 能夠當成互斥量使用:
1 package main 2 3 import "fmt" 4 5 func main() { 6 mutex := make(chan struct{}, 1) // the capacity must be one 7 8 counter := 0 9 increase := func() { 10 mutex <- struct{}{} // lock 11 counter++ 12 <-mutex // unlock 13 } 14 15 increase1000 := func(done chan<- struct{}) { 16 for i := 0; i < 1000; i++ { 17 increase() 18 } 19 done <- struct{}{} 20 } 21 22 done := make(chan struct{}) 23 go increase1000(done) 24 go increase1000(done) 25 <-done; <-done 26 fmt.Println(counter) // 2000 27 }
關閉再也不須要使用的 channel 並非必須的。跟其餘資源好比打開的文件、socket 鏈接不同,這類資源使用完後不關閉後會形成句柄泄露,channel 使用完後不關閉也沒有關係,channel 沒有被任何協程用到後最終會被 GC 回收。關閉 channel 通常是用來通知其餘協程某個任務已經完成了。golang 也沒有直接提供判斷 channel 是否已經關閉的接口,雖然能夠用其餘不太優雅的方式本身實現一個:
1 func isClosed(ch chan int) bool { 2 select { 3 case <-ch: 4 return true 5 default: 6 } 7 return false 8 }
不過實現一個這樣的接口也沒什麼必要。由於就算經過 isClosed()
獲得當前 channel 當前還未關閉,若是試圖往 channel 裏寫數據,仍然可能會發生 panic ,由於在調用 isClosed()
後,其餘協程可能已經把 channel 關閉了。
關閉 channel 時應該注意如下準則:
關閉 channel 粗暴一點的作法是隨意關閉,若是產生了 panic 就用 recover 避免進程掛掉。稍好一點的方案是使用標準庫的 sync
包來作關閉 channel 時的協程同步,不過使用起來也稍微複雜些。下面介紹一種優雅些的作法。
這種場景下這個惟一的寫入端能夠關閉 channel 用來通知讀取端全部數據都已經寫入完成了。讀取端只須要用 for range
把 channel 中數據遍歷完就能夠了,當 channel 關閉時,for range
仍然會將 channel 緩衝中的數據所有遍歷完而後再退出循環:
1 package main 2 3 import ( 4 "fmt" 5 "sync" 6 ) 7 8 func main() { 9 wg := &sync.WaitGroup{} 10 ch := make(chan int, 100) 11 12 send := func() { 13 for i := 0; i < 100; i++ { 14 ch <- i 15 } 16 // signal sending finish 17 close(ch) 18 } 19 20 recv := func(id int) { 21 defer wg.Done() 22 for i := range ch { 23 fmt.Printf("receiver #%d get %d\n", id, i) 24 } 25 fmt.Printf("receiver #%d exit\n", id) 26 } 27 28 wg.Add(3) 29 go recv(0) 30 go recv(1) 31 go recv(2) 32 send() 33 34 wg.Wait() 35 }
這種場景下雖然能夠用 sync.Once
來解決多個寫入端重複關閉 channel 的問題,但更優雅的辦法設置一個額外的 channel ,由讀取端經過關閉來通知寫入端任務完成不要再繼續再寫入數據了:
1 package main 2 3 import ( 4 "fmt" 5 "sync" 6 ) 7 8 func main() { 9 wg := &sync.WaitGroup{} 10 ch := make(chan int, 100) 11 done := make(chan struct{}) 12 13 send := func(id int) { 14 defer wg.Done() 15 for i := 0; ; i++ { 16 select { 17 case <-done: 18 // get exit signal 19 fmt.Printf("sender #%d exit\n", id) 20 return 21 case ch <- id*1000 + i: 22 } 23 } 24 } 25 26 recv := func() { 27 count := 0 28 for i := range ch { 29 fmt.Printf("receiver get %d\n", i) 30 count++ 31 if count >= 1000 { 32 // signal recving finish 33 close(done) 34 return 35 } 36 } 37 } 38 39 wg.Add(3) 40 go send(0) 41 go send(1) 42 go send(2) 43 recv() 44 45 wg.Wait() 46 }
這種場景稍微複雜,和上面的例子同樣,也須要設置一個額外 channel 用來通知多個寫入端和讀取端。另外須要起一個額外的協程來經過關閉這個 channel 來廣播通知:
1 package main 2 3 import ( 4 "fmt" 5 "sync" 6 "time" 7 ) 8 9 func main() { 10 wg := &sync.WaitGroup{} 11 ch := make(chan int, 100) 12 done := make(chan struct{}) 13 14 send := func(id int) { 15 defer wg.Done() 16 for i := 0; ; i++ { 17 select { 18 case <-done: 19 // get exit signal 20 fmt.Printf("sender #%d exit\n", id) 21 return 22 case ch <- id*1000 + i: 23 } 24 } 25 } 26 27 recv := func(id int) { 28 defer wg.Done() 29 for { 30 select { 31 case <-done: 32 // get exit signal 33 fmt.Printf("receiver #%d exit\n", id) 34 return 35 case i := <-ch: 36 fmt.Printf("receiver #%d get %d\n", id, i) 37 time.Sleep(time.Millisecond) 38 } 39 } 40 } 41 42 wg.Add(6) 43 go send(0) 44 go send(1) 45 go send(2) 46 go recv(0) 47 go recv(1) 48 go recv(2) 49 50 time.Sleep(time.Second) 51 // signal finish 52 close(done) 53 // wait all sender and receiver exit 54 wg.Wait() 55 }
channle 做爲 golang 最重要的特性,用起來仍是比較方便的。傳統的 C 裏要實現相似的功能的話,通常須要用到 socket 或者 FIFO 來實現,另外還要考慮數據包的完整性與併發衝突的問題,channel 則屏蔽了這些底層細節,使用者只須要考慮讀寫就能夠了。 channel 是引用類型,瞭解一下 channel 底層的機制對更好的使用 channel 仍是很用必要的。雖然操做原語簡單,但涉及到阻塞的問題,使用不當可能會形成死鎖或者無限制的協程建立最終致使進程掛掉。