Go channel系列:html
channel用於goroutines之間的通訊,讓它們之間能夠進行數據交換。像管道同樣,一個goroutine_A向channel_A中放數據,另外一個goroutine_B從channel_A取數據。golang
channel是指針類型的數據類型,經過make來分配內存。例如:bash
ch := make(chan int)
這表示建立一個channel,這個channel中只能保存int類型的數據。也就是說一端只能向此channel中放進int類型的值,另外一端只能今後channel中讀出int類型的值。併發
須要注意,chan TYPE
才表示channel的類型。因此其做爲參數或返回值時,需指定爲xxx chan int
相似的格式。dom
向ch這個channel放數據的操做形式爲:異步
ch <- VALUE
從ch這個channel讀數據的操做形式爲:函數
<-ch // 從ch中讀取一個值 val = <-ch val := <-ch // 從ch中讀取一個值並保存到val變量中 val,ok = <-ch // 從ch讀取一個值,判斷是否讀取成功,若是成功則保存到val變量中
其實很簡單,當ch出如今<-
的左邊表示send,當ch出如今<-
的右邊表示recv。ui
例如:命令行
package main import ( "fmt" "time" ) func main() { ch := make(chan string) go sender(ch) // sender goroutine go recver(ch) // recver goroutine time.Sleep(1e9) } func sender(ch chan string) { ch <- "malongshuai" ch <- "gaoxiaofang" ch <- "wugui" ch <- "tuner" } func recver(ch chan string) { var recv string for { recv = <-ch fmt.Println(recv) } }
輸出結果:指針
malongshuai gaoxiaofang wugui tuner
上面激活了一個goroutine用於執行sender()函數,該函數每次向channel ch中發送一個字符串。同時還激活了另外一個goroutine用於執行recver()函數,該函數每次從channel ch中讀取一個字符串。
注意上面的recv = <-ch
,當channel中沒有數據可讀時,recver goroutine將會阻塞在此行。因爲recver中讀取channel的操做放在了無限for循環中,表示recver goroutine將一直阻塞,直到從channel ch中讀取到數據,讀取到數據後進入下一輪循環由被阻塞在recv = <-ch
上。直到main中的time.Sleep()指定的時間到了,main程序終止,全部的goroutine將所有被強制終止。
由於receiver要不斷從channel中讀取可能存在的數據,因此receiver通常都使用一個無限循環來讀取channel,避免sender發送的數據被丟棄。
每一個channel都有3種操做:send、receive和close
例如,判斷channel是否被關閉:
val, ok := <-counter if ok { fmt.Println(val) }
由於關閉通道也會讓recv成功讀取(只不過讀取到的值爲類型的空值),使得本來阻塞在recv操做上的goroutine變得不阻塞,藉此技巧能夠實現goroutine的執行前後順序。具體示例見後文:指定goroutine的執行順序。
channel分爲兩種:unbuffered channel和buffered channel
能夠認爲阻塞和不阻塞是由channel控制的,不管是send仍是recv操做,都是在向channel發送請求:
buffered channel有兩個屬性:容量和長度:和slice的capacity和length的概念是同樣的
make(chan TYPE,CAP)
unbuffered channel能夠認爲是容量爲0的buffered channel,因此每發送一個數據就被阻塞。注意,不是容量爲1的buffered channel,由於容量爲1的channel,是在channel中已有一個數據,併發送第二個數據的時候才被阻塞。
換句話說,send被阻塞的時候,實際上是沒有發送成功的,只有被另外一端讀走一個數據以後纔算是send成功。對於unbuffered channel來講,這是send/recv的同步模式。而buffered channel則是在每次發送數據到通道的時候,(通道)都向發送者返回一個消息,容量未滿的時候返回成功的消息,發送者所以而不會阻塞,容量已滿的時候由於已滿而遲遲不返回消息,使得發送者被阻塞。
實際上,當向一個channel進行send的時候,先關閉了channel,再讀取channel時會發現錯誤在send,而不是recv。它會提示向已經關閉了的channel發送數據。
func main() { counter := make(chan int) go func() { counter <- 32 }() close(counter) fmt.Println(<-counter) }
輸出報錯:
panic: send on closed channel
因此,在Go的內部行爲中,send和recv是一個總體行爲,數據未讀就表示未send成功。
有兩種特殊的channel:nil channel和channal類型的channel。
當未爲channel分配內存時,channel就是nil channel,例如var ch1 chan int
。nil channel會永遠阻塞對該channel的讀、寫操做。
nil channel在某些時候有些妙用,例如在select(關於select,見後文)的某個case分支A將其它某case分支B所操做的channel忽然設置爲nil,這將會禁用case分支B。
當channel的類型爲一個channel時,就是channel的channel,也就是雙層通道。例如:
var chch1 chan chan int
channel的channel是指通道里的數據是通道,能夠認爲通道里面嵌套了一個或多個通道:只能將整個通道發送到外層通道,讀取外層通道時獲取到的是內層通道,而後能夠操做內層通道。
// 發送通道給外層通道 chch1 <-ch1 chch1 <-ch2 // 從外層通道取出內層通道 c <-chch1 // 操做取出的內層通道 c <-123 val := <-c
channel of channel的妙用之一是將外層通道做爲通道的加工廠:在某個goroutine中不斷生成通道,在其它goroutine能夠不斷取出通道來操做。
當channel的某一端(sender/receiver)期待另外一端的(receiver/sender)操做,另外一端正好在期待本端的操做時,也就是說兩端都由於對方而使得本身當前處於阻塞狀態,這時將會出現死鎖問題。
更通俗地說,只要全部goroutine都被阻塞,就會出現死鎖。
好比,在main函數中,它有一個默認的goroutine,若是在此goroutine中建立一個unbuffered channel,並在main goroutine中向此channel中發送數據並直接receive數據,將會出現死鎖:
package main import ( "fmt" ) func main (){ goo(32) } func goo(s int) { counter := make(chan int) counter <- s fmt.Println(<-counter) }
在上面的示例中,向unbuffered channel中send數據的操做counter <- s
是在main goroutine中進行的,今後channel中recv的操做<-counter
也是在main goroutine中進行的。send的時候會直接阻塞main goroutine,使得recv操做沒法被執行,go將探測到此問題,並報錯:
fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]:
要修復此問題,只需將send操做放在另外一個goroutine中執行便可:
package main import ( "fmt" ) func main() { goo(32) } func goo(s int) { counter := make(chan int) go func() { counter <- s }() fmt.Println(<-counter) }
或者,將counter設置爲一個容量爲1的buffered channel:
counter := make(chan int,1)
這樣放完一個數據後send不會阻塞(被recv以前放第二個數據纔會阻塞),能夠執行到recv操做。
下面經過sync.WaitGroup類型來等待程序的結束,分析多個goroutine之間通訊時狀態的轉換。由於建立的channel是unbuffered類型的,因此send和recv都是阻塞的。
package main import ( "fmt" "sync" ) // wg用於等待程序執行完成 var wg sync.WaitGroup func main() { count := make(chan int) // 增長兩個待等待的goroutines wg.Add(2) fmt.Println("Start Goroutines") // 激活一個goroutine,label:"Goroutine-1" go printCounts("Goroutine-1", count) // 激活另外一個goroutine,label:"Goroutine-2" go printCounts("Goroutine-2", count) fmt.Println("Communication of channel begins") // 向channel中發送初始數據 count <- 1 // 等待goroutines都執行完成 fmt.Println("Waiting To Finish") wg.Wait() fmt.Println("\nTerminating the Program") } func printCounts(label string, count chan int) { // goroutine執行完成時,wg的計數器減1 defer wg.Done() for { // 從channel中接收數據 // 若是無數據可recv,則goroutine阻塞在此 val, ok := <-count if !ok { fmt.Println("Channel was closed:",label) return } fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 { fmt.Printf("Channel Closed from %s \n", label) // Close the channel close(count) return } // 輸出接收到的數據後,加1,並從新將其send到channel中 val++ count <- val } }
上面的程序中,激活了兩個goroutine,激活這兩個goroutine後,向channel中發送一個初始數據值1,而後main goroutine將由於wg.Wait()等待2個goroutine都執行完成而被阻塞。
再看這兩個goroutine,這兩個goroutine執行徹底同樣的函數代碼,它們都接收count這個channel的數據,但多是goroutine1先接收到channel中的初始值1,也多是goroutine2先接收到初始值1。接收到數據後輸出值,並在輸出後對數據加1,而後將加1後的數據再次send到channel,每次send都會將本身這個goroutine阻塞(由於unbuffered channel),此時另外一個goroutine由於等待recv而執行。當加1後發送給channel的數據爲10以後,某goroutine將關閉count channel,該goroutine將退出,wg的計數器減1,另外一個goroutine因等待recv而阻塞的狀態將由於channel的關閉而失敗,ok狀態碼將讓該goroutine退出,因而wg的計數器減爲0,main goroutine由於wg.Wait()而繼續執行後面的代碼。
前面都是在for無限循環中讀取channel中的數據,但也可使用range來迭代channel,它會返回每次迭代過程當中所讀取的數據,直到channel被關閉。必須注意,只要channel未關閉,range迭代channel就會一直被阻塞。
例如,將上面示例中的printCounts()改成for-range的循環形式。
func printCounts(label string, count chan int) { defer wg.Done() for val := range count { fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 { fmt.Printf("Channel Closed from %s \n", label) close(count) return } val++ count <- val } }
channel是goroutine與goroutine之間通訊的基礎,一邊產生數據放進channel,另外一邊從channel讀取放進來的數據。能夠藉此實現多個goroutine之間的數據交換,例如goroutine_1->goroutine_2->goroutine_3
,就像bash的管道同樣,上一個命令的輸出能夠不斷傳遞給下一個命令的輸入,只不過golang藉助channel能夠在多個goroutine(如函數的執行)之間傳,而bash是在命令之間傳。
如下是一個示例,第一個函數getRandNum()用於生成隨機整數,並將生成的整數放進第一個channel ch1中,第二個函數addRandNum()用於接收ch1中的數據(來自第一個函數),將其輸出,而後對接收的值加1後放進第二個channel ch2中,第三個函數printRes接收ch2中的數據並將其輸出。
若是將函數認爲是Linux的命令,則相似於下面的命令行:ch1至關於第一個管道,ch2至關於第二個管道
getRandNum | addRandNum | printRes
如下是代碼部分:
package main import ( "fmt" "math/rand" "sync" ) var wg sync.WaitGroup func main() { wg.Add(3) // 建立兩個channel ch1 := make(chan int) ch2 := make(chan int) // 3個goroutine並行 go getRandNum(ch1) go addRandNum(ch1, ch2) go printRes(ch2) wg.Wait() } func getRandNum(out chan int) { // defer the wg.Done() defer wg.Done() var random int // 總共生成10個隨機數 for i := 0; i < 10; i++ { // 生成[0,30)之間的隨機整數並放進channel out random = rand.Intn(30) out <- random } close(out) } func addRandNum(in,out chan int) { defer wg.Done() for v := range in { // 輸出從第一個channel中讀取到的數據 // 並將值+1後放進第二個channel中 fmt.Println("before +1:",v) out <- (v + 1) } close(out) } func printRes(in chan int){ defer wg.Done() for v := range in { fmt.Println("after +1:",v) } }
上面經過兩個channel將3個goroutine鏈接起來,其中起鏈接做用的是第二個函數addRandNum()。在這個函數中使用了兩個channel做爲參數:一個channel用於接收、一個channel用於發送。
其實channel類的參數變量能夠指定數據流向:
in <-chan int
:表示channel in通道只用於接收數據out chan<- int
:表示channel out通道只用於發送數據只用於接收數據的通道<-chan
不可被關閉,由於關閉通道是針對發送數據而言的,表示無數據再需發送。對於recv來講,關閉通道是沒有意義的。
因此,上面示例中三個函數可改寫爲:
func getRandNum(out chan<- int) { ... } func addRandNum(in <-chan int, out chan<- int) { ... } func printRes(in <-chan int){ ... }
下面是使用buffered channel實現異步處理請求的示例。
在此示例中:
如下是代碼部分:
package main import ( "fmt" "math/rand" "sync" "time" ) type Task struct { ID int JobID int Status string CreateTime time.Time } func (t *Task) run() { sleep := rand.Intn(1000) time.Sleep(time.Duration(sleep) * time.Millisecond) t.Status = "Completed" } var wg sync.WaitGroup // worker的數量,即便用多少goroutine執行任務 const workerNum = 3 func main() { wg.Add(workerNum) // 建立容量爲10的buffered channel taskQueue := make(chan *Task, 10) // 激活goroutine,執行任務 for workID := 0; workID <= workerNum; workID++ { go worker(taskQueue, workID) } // 將待執行任務放進buffered channel,共15個任務 for i := 1; i <= 15; i++ { taskQueue <- &Task{ ID: i, JobID: 100 + i, CreateTime: time.Now(), } } close(taskQueue) wg.Wait() } // 從buffered channel中讀取任務,並執行任務 func worker(in <-chan *Task, workID int) { defer wg.Done() for v := range in { fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID) v.run() fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID) } }
不少時候想要同時操做多個channel,好比從ch一、ch2讀數據。Go提供了一個select語句塊,它像switch同樣工做,裏面放一些case語句塊,用來輪詢每一個case語句塊的send或recv狀況。
select
用法格式示例:
select { // ch1有數據時,讀取到v1變量中 case v1 := <-ch1: ... // ch2有數據時,讀取到v2變量中 case v2 := <-ch2: ... // 全部case都不知足條件時,執行default default: ... }
defalut語句是可選的,不容許fall through行爲,但容許case語句塊爲空塊。select會被return、break關鍵字中斷:return是退出整個函數,break是退出當前select。
select的行爲模式主要是對channel是否可讀進行輪詢,但也能夠用來向channel發送數據。它的行爲以下:
若是有所疑惑,後文的"select超時時間"有更有助於理解select的說明和示例。
全部的case塊都是按源代碼書寫順序進行評估的。當select未在循環中時,它將只對全部case評估一次,此次結束後就結束select。某次評估過程當中若是有知足條件的case,則全部其它case都直接結束評估,並退出這次select。
其實若是注意到select語句是在某一個goroutine中評估的,就不難理解只有全部case都不知足條件時,select所在goroutine纔會被阻塞,只要有一個case知足條件,本次select就不會出現阻塞的狀況。
須要注意的是,若是在select中執行send操做,則可能會永遠被send阻塞。因此,在使用send的時候,應該也使用defalut語句塊,保證send不會被阻塞。若是沒有default,或者能確保select不阻塞的語句塊,則早晚會被send阻塞。在後文有一個select中send永久阻塞的分析:雙層channel的一個示例。
通常來講,select會放在一個無限循環語句中,一直輪詢channel的可讀事件。
下面是一個示例,pump1()和pump2()都用於產生數據(一個產生偶數,一個產生奇數),並將數據分別放進ch1和ch2兩個通道,suck()則從ch1和ch2中讀取數據。而後在無限循環中使用select輪詢這兩個通道是否可讀,最後main goroutine在1秒後強制中斷全部goroutine。
package main import ( "fmt" "time" ) func main() { ch1 := make(chan int) ch2 := make(chan int) go pump1(ch1) go pump2(ch2) go suck(ch1, ch2) time.Sleep(1e9) } func pump1(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 0 { ch <- i } } } func pump2(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 1 { ch <- i } } } func suck(ch1 chan int, ch2 chan int) { for { select { case v := <-ch1: fmt.Printf("Recv on ch1: %d\n", v) case v := <-ch2: fmt.Printf("Recv on ch2: %d\n", v) } } }