不一樣的並行協程之間交流的方式有兩種,一種是經過共享變量,另外一種是經過隊列。Go 語言鼓勵使用隊列的形式來交流,它單獨爲協程之間的隊列數據交流定製了特殊的語法 —— 通道。java
通道是協程的輸入和輸出。做爲協程的輸出,通道是一個容器,它能夠容納數據。做爲協程的輸入,通道是一個生產者,它能夠向協程提供數據。通道做爲容器是有限定大小的,滿了就寫不進去,空了就讀不出來。通道還有它本身的類型,它能夠限定進入通道的數據的類型。 git
建立通道只有一種語法,那就是 make 全局函數,提供第一個類型參數限定通道能夠容納的數據類型,再提供第二個整數參數做爲通道的容器大小。大小參數是可選的,若是不填,那這個通道的容量爲零,叫着「非緩衝型通道」,非緩衝型通道必須確保有協程正在嘗試讀取當前通道,不然寫操做就會阻塞直到有其它協程來從通道中讀東西。非緩衝型通道老是處於既滿又空的狀態。與之對應的有限定大小的通道就是緩衝型通道。在 Go 語言裏不存在無界通道,每一個通道都是有限定最大容量的。github
// 緩衝型通道,裏面只能放整數
var bufferedChannel = make(chan int, 1024)
// 非緩衝型通道
var unbufferedChannel = make(chan int)
複製代碼
Go 語言爲通道的讀寫設計了特殊的箭頭語法糖 <-,讓咱們使用通道時很是方便。把箭頭寫在通道變量的右邊就是寫通道,把箭頭寫在通道的左邊就是讀通道。一次只能讀寫一個元素。數組
package main
import "fmt"
func main() {
var ch chan int = make(chan int, 4)
for i:=0; i<cap(ch); i++ {
ch <- i // 寫通道
}
for len(ch) > 0 {
var value int = <- ch // 讀通道
fmt.Println(value)
}
}
複製代碼
通道做爲容器,它能夠像切片同樣,使用 cap() 和 len() 全局函數得到通道的容量和當前內部的元素個數。通道通常做爲不一樣的協程交流的媒介,在同一個協程裏它也是可使用的。安全
通道滿了,寫操做就會阻塞,協程就會進入休眠,直到有其它協程讀通道挪出了空間,協程纔會被喚醒。若是有多個協程的寫操做都阻塞了,一個讀操做只會喚醒一個協程。bash
通道空了,讀操做就會阻塞,協程也會進入睡眠,直到有其它協程寫通道裝進了數據纔會被喚醒。若是有多個協程的讀操做阻塞了,一個寫操做也只會喚醒一個協程。數據結構
package main
import "fmt"
import "time"
import "math/rand"
func send(ch chan int) {
for {
var value = rand.Intn(100)
ch <- value
fmt.Printf("send %d\n", value)
}
}
func recv(ch chan int) {
for {
value := <- ch
fmt.Printf("recv %d\n", value)
time.Sleep(time.Second)
}
}
func main() {
var ch = make(chan int, 1)
// 子協程循環讀
go recv(ch)
// 主協程循環寫
send(ch)
}
--------
send 81
send 87
recv 81
recv 87
send 47
recv 47
send 59
複製代碼
Go 語言的通道有點像文件,不但支持讀寫操做, 還支持關閉。讀取一個已經關閉的通道會當即返回通道類型的「零值」,而寫一個已經關閉的通道會拋異常。若是通道里的元素是整型的,讀操做是不能經過返回值來肯定通道是否關閉的。多線程
package main
import "fmt"
func main() {
var ch = make(chan int, 4)
ch <- 1
ch <- 2
close(ch)
value := <- ch
fmt.Println(value)
value = <- ch
fmt.Println(value)
value = <- ch
fmt.Println(value)
}
-------
1
2
0
複製代碼
這時候就須要引入一個新的知識點 —— 使用 for range 語法糖來遍歷通道併發
for range 語法咱們已經見了不少次了,它是多功能的,除了能夠遍歷數組、切片、字典,還能夠遍歷通道,取代箭頭操做符。當通道空了,循環會暫停阻塞,當通道關閉時,阻塞中止,循環也跟着結束了。當循環結束時,咱們就知道通道已經關閉了。函數
package main
import "fmt"
func main() {
var ch = make(chan int, 4)
ch <- 1
ch <- 2
close(ch)
// for range 遍歷通道
for value := range ch {
fmt.Println(value)
}
}
------
1
2
複製代碼
通道若是沒有顯式關閉,當它再也不被程序使用的時候,會自動關閉被垃圾回收掉。不過優雅的程序應該將通道當作資源,顯式關閉每一個再也不使用的資源是一種良好的習慣。
上面提到向一個已經關閉的通道執行寫操做會拋出異常,這意味着咱們在寫通道時必定要確保通道沒有被關閉。
package main
import "fmt"
func send(ch chan int) {
i := 0
for {
i++
ch <- i
}
}
func recv(ch chan int) {
value := <- ch
fmt.Println(value)
value = <- ch
fmt.Println(value)
close(ch)
}
func main() {
var ch = make(chan int, 4)
go recv(ch)
send(ch)
}
---------
1
2
panic: send on closed channel
goroutine 1 [running]:
main.send(0xc42008a000)
/Users/qianwp/go/src/github.com/pyloque/practice/main.go:9 +0x44
main.main()
/Users/qianwp/go/src/github.com/pyloque/practice/main.go:24 +0x66
exit status 2
複製代碼
那如何確保呢?Go 語言並不存在一個內置函數能夠判斷出通道是否已經被關閉。即便存在這樣一個函數,當你判斷時通道沒有關閉,並不意味着當你往通道里寫數據時它就必定沒有被關閉,併發環境下,它是可能被其它協程隨時關閉的。
確保通道寫安全的最好方式是由負責寫通道的協程本身來關閉通道,讀通道的協程不要去關閉通道。
package main
import "fmt"
func send(ch chan int) {
ch <- 1
ch <- 2
ch <- 3
ch <- 4
close(ch)
}
func recv(ch chan int) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
var ch = make(chan int, 1)
go send(ch)
recv(ch)
}
-----------
1
2
3
4
複製代碼
這個方法確實能夠解決單寫多讀的場景,可要是趕上了多寫單讀的場合該怎麼辦呢?任意一個讀寫通道的協程都不能夠隨意關閉通道,不然會致使其它寫通道協程拋出異常。這時候就必須讓其它不相干的協程來幹這件事,這個協程須要等待全部的寫通道協程都結束運行後才能關閉通道。那其它協程要如何才能知道全部的寫通道已經結束運行了呢?這個就須要使用到內置 sync 包提供的 WaitGroup 對象,它使用計數來等待指定事件完成。
package main
import "fmt"
import "time"
import "sync"
func send(ch chan int, wg *sync.WaitGroup) {
defer wg.Done() // 計數值減一
i := 0
for i < 4 {
i++
ch <- i
}
}
func recv(ch chan int) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
var ch = make(chan int, 4)
var wg = new(sync.WaitGroup)
wg.Add(2) // 增長計數值
go send(ch, wg) // 寫
go send(ch, wg) // 寫
go recv(ch)
// Wait() 阻塞等待全部的寫通道協程結束
// 待計數值變成零,Wait() 纔會返回
wg.Wait()
// 關閉通道
close(ch)
time.Sleep(time.Second)
}
---------
1
2
3
4
1
2
3
4
複製代碼
在真實的世界中,還有一種消息傳遞場景,那就是消費者有多個消費來源,只要有一個來源生產了數據,消費者就能夠讀這個數據進行消費。這時候能夠將多個來源通道的數據匯聚到目標通道,而後統一在目標通道進行消費。
package main
import "fmt"
import "time"
// 每隔一會生產一個數
func send(ch chan int, gap time.Duration) {
i := 0
for {
i++
ch <- i
time.Sleep(gap)
}
}
// 將多個原通道內容拷貝到單一的目標通道
func collect(source chan int, target chan int) {
for v := range source {
target <- v
}
}
// 從目標通道消費數據
func recv(ch chan int) {
for v := range ch {
fmt.Printf("receive %d\n", v)
}
}
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
var ch3 = make(chan int)
go send(ch1, time.Second)
go send(ch2, 2 * time.Second)
go collect(ch1, ch3)
go collect(ch2, ch3)
recv(ch3)
}
---------
receive 1
receive 1
receive 2
receive 2
receive 3
receive 4
receive 3
receive 5
receive 6
receive 4
receive 7
receive 8
receive 5
receive 9
....
複製代碼
可是上面這種形式比較繁瑣,須要爲每一種消費來源都單獨啓動一個匯聚協程。Go 語言爲這種使用場景帶來了「多路複用」語法糖,也就是下面要講的 select 語句,它能夠同時管理多個通道讀寫,若是全部通道都不能讀寫,它就總體阻塞,只要有一個通道能夠讀寫,它就會繼續。下面咱們使用 select 語句來簡化上面的邏輯
package main
import "fmt"
import "time"
func send(ch chan int, gap time.Duration) {
i := 0
for {
i++
ch <- i
time.Sleep(gap)
}
}
func recv(ch1 chan int, ch2 chan int) {
for {
select {
case v := <- ch1:
fmt.Printf("recv %d from ch1\n", v)
case v := <- ch2:
fmt.Printf("recv %d from ch2\n", v)
}
}
}
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
go send(ch1, time.Second)
go send(ch2, 2 * time.Second)
recv(ch1, ch2)
}
------------
recv 1 from ch2
recv 1 from ch1
recv 2 from ch1
recv 3 from ch1
recv 2 from ch2
recv 4 from ch1
recv 3 from ch2
recv 5 from ch1
複製代碼
上面是多路複用 select 語句的讀通道形式,下面是它的寫通道形式,只要有一個通道能寫進去,它就會打破阻塞。
select {
case ch1 <- v:
fmt.Println("send to ch1")
case ch2 <- v:
fmt.Println("send to ch2")
}
複製代碼
前面咱們講的讀寫都是阻塞讀寫,Go 語言還提供了通道的非阻塞讀寫。當通道空時,讀操做不會阻塞,當通道滿時,寫操做也不會阻塞。非阻塞讀寫須要依靠 select 語句的 default 分支。當 select 語句全部通道都不可讀寫時,若是定義了 default 分支,那就會執行 default 分支邏輯,這樣就起到了不阻塞的效果。下面咱們演示一個單生產者多消費者的場景。生產者同時向兩個通道寫數據,寫不進去就丟棄。
package main
import "fmt"
import "time"
func send(ch1 chan int, ch2 chan int) {
i := 0
for {
i++
select {
case ch1 <- i:
fmt.Printf("send ch1 %d\n", i)
case ch2 <- i:
fmt.Printf("send ch2 %d\n", i)
default:
}
}
}
func recv(ch chan int, gap time.Duration, name string) {
for v := range ch {
fmt.Printf("receive %s %d\n", name, v)
time.Sleep(gap)
}
}
func main() {
// 無緩衝通道
var ch1 = make(chan int)
var ch2 = make(chan int)
// 兩個消費者的休眠時間不同,名稱不同
go recv(ch1, time.Second, "ch1")
go recv(ch2, 2 * time.Second, "ch2")
send(ch1, ch2)
}
------------
send ch1 27
send ch2 28
receive ch1 27
receive ch2 28
send ch1 6708984
receive ch1 6708984
send ch2 13347544
send ch1 13347775
receive ch2 13347544
receive ch1 13347775
send ch1 20101642
receive ch1 20101642
send ch2 26775795
receive ch2 26775795
...
複製代碼
從輸出中能夠明顯看出有不少的數據都丟棄了,消費者讀到的數據是不連續的。若是將 select 語句裏面的 default 分支幹掉,再運行一次,結果以下
send ch2 1
send ch1 2
receive ch1 2
receive ch2 1
receive ch1 3
send ch1 3
receive ch2 4
send ch2 4
send ch1 5
receive ch1 5
receive ch1 6
send ch1 6
receive ch1 7
複製代碼
能夠看到消費者讀到的數據都連續了,可是每一個數據只給了一個消費者。select 語句的 default 分支很是關鍵,它是決定通道讀寫操做阻塞與否的關鍵。
通道在其它語言裏面的表現形式是隊列,在 Java 語言裏,帶緩衝通道就是併發包內置的 java.util.concurrent.ArrayBlockingQueue,無緩衝通道也是併發包內置的 java.util.concurrent.SynchronousQueue。ArrayBlockingQueue 的內部實現形式是一個數組,多線程讀寫時須要使用鎖來控制併發訪問。不過像 Go 語言提供的多路複用效果,Java 語言就沒有內置的實現了。
Go 語言的通道內部結構是一個循環數組,經過讀寫偏移量來控制元素髮送和接受。它爲了保證線程安全,內部會有一個全局鎖來控制併發。對於發送和接受操做都會有一個隊列來容納處於阻塞狀態的協程。
type hchan struct {
qcount uint // 通道有效元素個數
dataqsize uint // 通道容量,循環數組總長度
buf unsafe.Pointer // 數組地址
elemsize uint16 // 內部元素的大小
closed uint32 // 是否已關閉 0或者1
elemtype *_type // 內部元素類型信息
sendx uint // 循環數組的寫偏移量
recvx uint // 循環數組的讀偏移量
recvq waitq // 阻塞在讀操做上的協程隊列
sendq waitq // 阻塞在寫操做上的協程隊列
lock mutex // 全局鎖
}
複製代碼
這個循環隊列和 Java 語言內置的 ArrayBlockingQueue 結構一模一樣。從這個數據結構中咱們也能夠得出結論:隊列在本質上是使用共享變量加鎖的方式來實現的,共享變量纔是並行交流的本質。
class ArrayBlockingQueue extends AbstractQueue {
Object[] items;
int takeIndex;
int putIndex;
int count;
ReentrantLock lock;
...
}
複製代碼
因此讀者不要認爲 Go 語言的通道很神奇,Go 語言只是對通道設計了一套便於使用的語法糖,讓這套數據結構顯的平易近人。它在內部實現上和其它語言的併發隊列大同小異。
閱讀《快學 Go 語言》更多章節,長按圖片識別二維碼關注公衆號「碼洞」