通道(channel)設計模式
單純地將函數併發執行是沒有意義的。函數與函數間須要交換數據才能體現併發執行函數的意義。雖然可使用共享內存進行數據交換,可是共享內存在不一樣的goroutine中容易發生競態問題。爲了保證數據交換的正確性,必須使用互斥量對內存進行加鎖,這種作法勢必形成性能問題。bash
Go語言提倡使用通訊的方法代替共享內存,這裏通訊的方法就是使用通道(channel),如圖1-1所示所示。服務器
圖1-1 goroutine與channel的通訊網絡
通道的特性併發
Go 語言中的通道(channel)是一種特殊的類型。在任什麼時候候,同時只能有一個 goroutine 訪問通道進行發送和獲取數據。goroutine 間經過通道就能夠通訊。通道像一個傳送帶或者隊列,老是遵循先入先出(First In First Out)的規則,保證收發數據的順序。異步
聲明通道類型socket
通道自己須要一個類型進行修飾,就像切片類型須要標識元素類型。通道的元素類型就是在其內部傳輸的數據類型,聲明以下:函數
var 通道變量 chan 通道類型
chan 類型的空值是 nil,聲明後須要配合 make 後才能使用。性能
建立通道ui
通道是引用類型,須要使用 make 進行建立,格式以下:
通道實例 := make(chan 數據類型)
例如:
ch1 := make(chan int) // 建立一個整型類型的通道 ch2 := make(chan interface{}) // 建立一個空接口類型的通道, 能夠存聽任意格式 type Equip struct{ /* 一些字段 */ } ch2 := make(chan *Equip) // 建立Equip指針類型的通道, 能夠存放*Equip
使用通道發送數據
通道建立後,就可使用通道進行發送和接收操做。
1.通道發送數據的格式
通道的發送使用特殊的操做符「<-」,將數據經過通道發送的格式爲:通道變量 <- 值。
2.經過通道發送數據的例子
使用 make 建立一個通道後,就可使用<-向通道發送數據,代碼以下:
// 建立一個空接口通道 ch := make(chan interface{}) // 將0放入通道中 ch <- 0 // 將hello字符串放入通道中 ch <- "hello"
3.發送將持續阻塞直到數據被接收
把數據往通道中發送時,若是接收方一直都沒有接收,那麼發送操做將持續阻塞。Go 程序運行時能智能地發現一些永遠沒法發送成功的語句並作出提示,代碼以下:
package main func main() { // 建立一個整型通道 ch := make(chan int) // 嘗試將0經過通道發送 ch <- 0 }
運行代碼,報錯:
fatal error: all goroutines are asleep - deadlock!
報錯的意思是:運行時發現全部的goroutine(包括main)都處於等待goroutine。也就是說所goroutine中的channel並無造成發送和接收對應的代碼。
使用通道接收數據
通道接收一樣使用<-操做符,通道接收有以下特性:
通道的數據接收一共有如下 4 種寫法。
1.阻塞接收數據
阻塞模式接收數據時,將接收變量做爲<-操做符的左值,格式以下:
data := <-ch
執行該語句時將會阻塞,直到接收到數據並賦值給 data 變量。
2.非阻塞接收數據
使用非阻塞方式從通道接收數據時,語句不會發生阻塞,格式以下:
data, ok := <-ch
data:表示接收到的數據。未接收到數據時,data 爲通道類型的零值。
ok:表示是否接收到數據。
非阻塞的通道接收方法可能形成高的 CPU 佔用,所以使用很是少。若是須要實現接收超時檢測,能夠配合 select 和計時器 channel 進行,後面還會再介紹。
3.接收任意數據,忽略接收的數據
阻塞接收數據後,忽略從通道返回的數據,格式以下:
<-ch
執行該語句時將會發生阻塞,直到接收到數據,但接收到的數據會被忽略。這個方式實際上只是經過通道在 goroutine 間阻塞收發實現併發同步。
使用通道作併發同步的寫法,能夠參考下面的例子:
package main import ( "fmt" ) func main() { // 構建一個通道 ch := make(chan int) // 開啓一個併發匿名函數 go func() { fmt.Println("start goroutine") // 經過通道通知main的goroutine ch <- 0 fmt.Println("exit goroutine") }() fmt.Println("wait goroutine") // 等待匿名goroutine <-ch fmt.Println("all done") }
代碼說明以下:
執行代碼,輸出以下:
wait goroutine start goroutine exit goroutine all done
4.循環接收
通道的數據接收能夠借用for range語句進行多個元素的接收操做,格式以下:
for data := range ch { }
通道ch 是能夠進行遍歷的,遍歷的結果就是接收到的數據。數據類型就是通道的數據類型。經過for遍歷得到的變量只有一個,即上面例子中的data。
遍歷通道數據的例子請參考下面的代碼。
使用 for 從通道中接收數據:
package main import ( "fmt" "time" ) func main() { // 構建一個通道 ch := make(chan int) // 開啓一個併發匿名函數 go func() { // 從3循環到0 for i := 3; i >= 0; i-- { // 發送3到0之間的數值 ch <- i // 每次發送完時等待 time.Sleep(time.Second) } }() // 遍歷接收通道數據 for data := range ch { // 打印通道數據 fmt.Println(data) // 當遇到數據0時, 退出接收循環 if data == 0 { break } } }
代碼說明以下:
執行代碼,輸出以下:
3 2 1 0
併發打印
上面的例子建立的都是無緩衝通道。使用無緩衝通道往裏面裝入數據時,裝入方將被阻塞,直到另外通道在另一個goroutine中被取出。一樣,若是通道中沒有放入任何數據,接收方試圖從通道中獲取數據時,一樣也是阻塞。發送和接收的操做是同步完成的。
下面經過一個併發打印的例子,將goroutine和channel放在一塊兒展現它們的用法。
package main import ( "fmt" ) func printer(c chan int) { // 開始無限循環等待數據 for { // 從channel中獲取一個數據 data := <-c // 將0視爲數據結束 if data == 0 { break } // 打印數據 fmt.Println(data) } // 通知main已經結束循環(我搞定了!) c <- 0 } func main() { // 建立一個channel c := make(chan int) // 併發執行printer, 傳入channel go printer(c) for i := 1; i <= 10; i++ { // 將數據經過channel投送給printer c <- i } // 通知併發的printer結束循環(沒數據啦!) c <- 0 // 等待printer結束(搞定喊我!) <-c }
代碼說明以下:
代碼說明以下:
1 2 3 4 5 6 7 8 9 10
本例的設計模式就是典型的生產者和消費者。生產者是第37行的循環,而消費者是printer()函數。整個例子使用了兩個goroutine,一個是main(),一個是經過第35行printer()函數建立的goroutine。兩個goroutine經過第32行建立的通道進行通訊。這個通道有下面兩重功能。
數據傳送:第40行中發送數據和第13行接收數據。
控制指令:相似於信號量的功能。同步goroutine的操做。功能簡單描述爲:
單向通道
Go的通道能夠在聲明時約束其操做方向,如只發送或只接收。這種被約束方向的通道被稱做單向通道。
1.單向通道的聲明格式
只能發送的通道類型爲chan<-,只能接收的通道類型爲<-chan,格式以下:
var 通道實例 chan<- 元素類型 // 只能發送通道 var 通道實例 <-chan 元素類型 // 只能接收通道
2.單向通道的使用例子
示例代碼以下:
ch := make(chan int) // 聲明一個只能發送的通道類型, 並賦值爲ch var chSendOnly chan<- int = ch //聲明一個只能接收的通道類型, 並賦值爲ch var chRecvOnly <-chan int = ch
上面的例子中,chSendOnly只能發送數據,若是嘗試接收數據,將會出現以下報錯:
invalid operation: <-chSendOnly (receive from send-only type chan<- int)
同理,chRecvOnly也是不能發送的。固然,使用make建立通道時,也能夠建立一個只發送或只讀取的通道:
ch := make(<-chan int) var chReadOnly <-chan int = ch <-chReadOnly
上面代碼編譯正常,運行也是正確的。可是,一個不能填充數據(發送)只能讀取的通道是毫無心義的。
time包中的單向通道
time包中的計時器會返回一個timer實例,代碼以下:
timer := time.NewTimer(time.Second)
timer的Timer類型定義以下:
type Timer struct { C <-chan Time r runtimeTimer }
第2行中C通道的類型就是一種只能接收的單向通道。若是此處不進行通道方向約束,一旦外部向通道發送數據,將會形成其餘使用到計時器的地方邏輯產生混亂。所以,單向通道有利於代碼接口的嚴謹性。
Go語言帶緩衝的通道
在無緩衝通道的基礎上,爲通道增長一個有限大小的存儲空間造成帶緩衝通道。帶緩衝通道在發送時無需等待接收方接收便可完成發送過程,而且不會發生阻塞,只有當存儲空間滿時纔會發生阻塞。同理,若是緩衝通道中有數據,接收時將不會發生阻塞,直到通道中沒有數據可讀時,通道將會再度阻塞。
無緩衝通道保證收發過程同步。無緩衝收發過程相似於快遞員給你電話讓你下樓取快遞,整個遞交快遞的過程是同步發生的,你和快遞員不見不散。但這樣作快遞員就必須等待全部人下樓完成操做後才能完成全部投遞工做。若是快遞員將快遞放入快遞櫃中,並通知用戶來取,快遞員和用戶就成了異步收發過程,效率能夠有明顯的提高。帶緩衝的通道就是這樣的一個「快遞櫃」。
1.建立帶緩衝通道
如何建立帶緩衝的通道呢?參見以下代碼:
通道實例 := make(chan 通道類型, 緩衝大小)
下面經過一個例子中來理解帶緩衝通道的用法,參見下面的代碼:
package main import "fmt" func main() { // 建立一個3個元素緩衝大小的整型通道 ch := make(chan int, 3) // 查看當前通道的大小 fmt.Println(len(ch)) // 發送3個整型元素到通道 ch <- 1 ch <- 2 ch <- 3 // 查看當前通道的大小 fmt.Println(len(ch)) }
代碼說明以下:
代碼輸出以下:
0 3
2.阻塞條件
帶緩衝通道在不少特性上和無緩衝通道是相似的。無緩衝通道能夠看做是長度永遠爲0的帶緩衝通道。所以根據這個特性,帶緩衝通道在下面列舉的狀況下依然會發生阻塞:
爲何Go語言對通道要限制長度而不提供無限長度的通道?
咱們知道通道(channel)是在兩個goroutine間通訊的橋樑。使用goroutine的代碼必然有一方提供數據,一方消費數據。當提供數據一方的數據供給速度大於消費方的數據處理速度時,若是通道不限制長度,那麼內存將不斷膨脹直到應用崩潰。所以,限制通道的長度有利於約束數據提供方的供給速度,供給數據量必須在消費方處理量+通道長度的範圍內,才能正常地處理數據。
Go語言通道的多路複用
多路複用是通訊和網絡中的一個專業術語。多路複用一般表示在一個信道上傳輸多路信號或數據流的過程和技術。
提示:報話機同一時刻只能有一邊進行收或者發的單邊通訊,報話機須要遵照的通訊流程以下:
電話能夠在說話的同時聽到對方說話,因此電話是一種多路複用的設備,一條通訊線路上能夠同時接收或者發送數據。一樣的,網線、光纖也都是基於多路複用模式來設計的,網線、光纖不只可支持同時收發數據,還支持多我的同時收發數據。
在使用通道時,想同時接收多個通道的數據是一件困難的事情。通道在接收數據時,若是沒有數據能夠接收將會發生阻塞。雖然可使用以下模式進行遍歷,但運行性能會很是差。
for{ // 嘗試接收ch1通道 data, ok := <-ch1 // 嘗試接收ch2通道 data, ok := <-ch2 // 接收後續通道 … }
Go語言中提供了select關鍵字,能夠同時響應多個通道的操做。select的每一個case都會對應一個通道的收發過程。當收發完成時,就會觸發case中響應的語句。多個操做在每次select中挑選一個進行響應。格式以下:
select{ case 操做1: 響應操做1 case 操做2: 響應操做2 … default: 沒有操做狀況 }
操做一、操做2:包含通道收發語句,請參考表1-1:
操做 | 語句示例 |
接收任意數據 | case <-ch; |
接收變量 | case d :=<-ch; |
發送數據 | case ch <-100; |
響應操做一、響應操做2:當操做發生時,會執行對應 case 的響應操做。default:當沒有任何操做時,默認執行 default 中的語句。
Go語言RPC
服務器開發中會使用RPC(Remote Procedure Call,遠程過程調用)簡化進程間通訊的過程。RPC 能有效地封裝通訊過程,讓遠程的數據收發通訊過程看起來就像本地的函數調用同樣。
本例中,使用通道代替socket實現RPC的過程。客戶端與服務器運行在同一個進程,服務器和客戶端在兩個goroutine中運行。
1.客戶端請求和接收封裝
下面的代碼封裝了向服務器請求數據,等待服務器返回數據,若是請求方超時,該函數還會處理超時邏輯。
// 模擬RPC客戶端的請求和接收消息封裝 func RPCClient(ch chan string, req string) (string, error) { // 向服務器發送請求 ch <- req // 等待服務器返回 select { case ack := <-ch: // 接收到服務器返回數據 return ack, nil case <-time.After(time.Second): // 超時 return "", errors.New("Time out") } }
代碼說明以下:
RPCClient()函數中,執行到select語句時,第9行和第11行的通道操做會同時開啓。若是第9行的通道先返回,則執行第10行邏輯,表示正常接收到服務器數據;若是第11行的通道先返回,則執行第12行的邏輯,表示請求超時,返回錯誤。
2.服務器接收和反饋數據
服務器接收到客戶端的任意數據後,先打印再經過通道返回給客戶端一個固定字符串,表示服務器已經收到請求。
// 模擬RPC服務器端接收客戶端請求和迴應 func RPCServer(ch chan string) { for { // 接收客戶端請求 data := <-ch // 打印接收到的數據 fmt.Println("server received:", data) //向客戶端反饋已收到 ch <- "roger" } }
代碼說明以下:
運行整個程序,客戶端能夠正確收到服務器返回的數據,客戶端RPCClient()函數的代碼按下面代碼中第三行分支執行。
// 等待服務器返回 select { case ack := <-ch: // 接收到服務器返回數據 return ack, nil case <-time.After(time.Second): // 超時 return "", errors.New("Time out") }
程序輸出以下:
server received: hi client received roger
3.模擬超時
上面的例子雖然有客戶端超時處理,可是永遠不會觸發,由於服務器的處理速度很快,也沒有真正的網絡延時或者「服務器宕機」的狀況。所以,爲了展現select中超時的處理,在服務器邏輯中增長一條語句,故意讓服務器延時處理一段時間,形成客戶端請求超時,代碼以下:
// 模擬RPC服務器端接收客戶端請求和迴應 func RPCServer(ch chan string) { for { // 接收客戶端請求 data := <-ch // 打印接收到的數據 fmt.Println("server received:", data) // 經過睡眠函數讓程序執行阻塞2秒的任務 time.Sleep(time.Second * 2) // 反饋給客戶端收到 ch <- "roger" } }
第11行中,time.Sleep()函數會讓goroutine執行暫停2秒。使用這種方法模擬服務器延時,形成客戶端超時。客戶端處理超時1秒時通道就會返回:
// 等待服務器返回 select { case ack := <-ch: // 接收到服務器返回數據 return ack, nil case <-time.After(time.Second): // 超時 return "", errors.New("Time out") }
4.主流程
主流程中會建立一個無緩衝的字符串格式通道。將通道傳給服務器的RPCServer()函數,這個函數併發執行。使用RPCClient()函數經過ch對服務器發出RPC請求,同時接收服務器反饋數據或者等待超時。參考下面代碼:
func main() { // 建立一個無緩衝字符串通道 ch := make(chan string) // 併發執行服務器邏輯 go RPCServer(ch) // 客戶端請求數據和接收數據 recv, err := RPCClient(ch, "hi") if err != nil { // 發生錯誤打印 fmt.Println(err) } else { // 正常接收到數據 fmt.Println("client received", recv) } }
代碼說明以下:
完成代碼:
package main import ( "errors" "fmt" "time" ) // 模擬RPC客戶端的請求和接收消息封裝 func RPCClient(ch chan string, req string) (string, error) { // 向服務器發送請求 ch <- req // 等待服務器返回 select { case ack := <-ch: // 接收到服務器返回數據 return ack, nil case <-time.After(time.Second): // 超時 return "", errors.New("Time out") } } // 模擬RPC服務器端接收客戶端請求和迴應 func RPCServer(ch chan string) { for { // 接收客戶端請求 data := <-ch // 打印接收到的數據 fmt.Println("server received:", data) // 反饋給客戶端收到 ch <- "roger" } } func main() { // 建立一個無緩衝字符串通道 ch := make(chan string) // 併發執行服務器邏輯 go RPCServer(ch) // 客戶端請求數據和接收數據 recv, err := RPCClient(ch, "hi") if err != nil { // 發生錯誤打印 fmt.Println(err) } else { // 正常接收到數據 fmt.Println("client received", recv) } }
使用通道響應計時器的事件
Go語言中的time包提供了計時器的封裝。因爲Go語言中的通道和goroutine的設計,定時任務能夠在goroutine中經過同步的方式完成,也能夠經過在goroutine中異步回調完成。這裏將分兩種用法進行例子展現。
1.一段時間以後(time.After)
package main import ( "fmt" "time" ) func main() { // 聲明一個退出用的通道 exit := make(chan int) // 打印開始 fmt.Println("start") // 過1秒後, 調用匿名函數 time.AfterFunc(time.Second, func() { // 1秒後, 打印結果 fmt.Println("one second after") // 通知main()的goroutine已經結束 exit <- 0 }) // 等待結束 <-exit }
代碼說明以下:
time.AfterFunc()函數是在time.After基礎上增長了到時的回調,方便使用。而time.After()函數又是在time.NewTimer()函數上進行的封裝,下面的例子展現如何使用timer.NewTimer()和time.NewTicker()。
2.定點計時
計時器(Timer)的原理和倒計時鬧鐘相似,都是給定多少時間後觸發。打點器(Ticker)的原理和鐘錶相似,鐘錶每到整點就會觸發。這兩種方法建立後會返回time.Ticker對象和time.Timer對象,裏面經過一個C成員,類型是隻能接收的時間通道(<-chanTime),使用這個通道就能夠得到時間觸發的通知。
下面代碼建立一個打點器,每500毫秒觸發一塊兒;建立一個計時器,2秒後觸發,只觸發一次。
package main import ( "fmt" "time" ) func main() { // 建立一個打點器, 每500毫秒觸發一次 ticker := time.NewTicker(time.Millisecond * 500) // 建立一個計時器, 2秒後觸發 stopper := time.NewTimer(time.Second * 2) // 聲明計數變量 var i int // 不斷地檢查通道狀況 for { // 多路複用通道 select { case <-stopper.C: // 計時器到時了 fmt.Println("stop") // 跳出循環 goto StopHere case <-ticker.C: // 打點器觸發了 // 記錄觸發了多少次 i++ fmt.Println("tick", i) } } // 退出的標籤, 使用goto跳轉 StopHere: fmt.Println("done") }
代碼說明以下:
關閉通道後繼續使用通道
通道是一個引用對象,和map相似。map在沒有任何外部引用時,Go程序在運行時(runtime)會自動對內存進行垃圾回收(GarbageCollection,GC)。相似的,通道也能夠被垃圾回收,可是通道也能夠被主動關閉。
1.格式
使用 close() 來關閉一個通道:
close(ch)
關閉的通道依然能夠被訪問,訪問被關閉的通道將會發生一些問題。
2.給被關閉通道發送數據將會觸發panic
被關閉的通道不會被置爲 nil。若是嘗試對已經關閉的通道進行發送,將會觸發宕機,代碼以下:
package main import "fmt" func main() { // 建立一個整型的通道 ch := make(chan int) // 關閉通道 close(ch) // 打印通道的指針, 容量和長度 fmt.Printf("ptr:%p cap:%d len:%d\n", ch, cap(ch), len(ch)) // 給關閉的通道發送數據 ch <- 1 }
代碼說明以下:
代碼運行後觸發宕機:
ptr:0xc042052060 cap:0 len:0 panic: send on closed channel
提示觸發宕機的緣由是給一個已經關閉的通道發送數據。
3.從已關閉的通道接收數據時將不會發生阻塞
從已經關閉的通道接收數據或者正在接收數據時,將會接收到通道類型的零值,而後中止阻塞並返回。
操做關閉後的通道:
package main import "fmt" func main() { // 建立一個整型帶兩個緩衝的通道 ch := make(chan int, 2) // 給通道放入兩個數據 ch <- 0 ch <- 1 // 關閉緩衝 close(ch) // 遍歷緩衝全部數據, 且多遍歷1個 for i := 0; i < cap(ch)+1; i++ { // 從通道中取出數據 v, ok := <-ch // 打印取出數據的狀態 fmt.Println(v, ok) } }
代碼說明以下:
代碼運行結果以下:
0 true 1 true 0 false
運行結果前兩行正確輸出帶緩衝通道的數據,代表緩衝通道在關閉後依然能夠訪問內部的數據。
運行結果第三行的「0false」表示通道在關閉狀態下取出的值。0表示這個通道的默認值,false表示沒有獲取成功,由於此時通道已經空了。咱們發現,在通道關閉後,即使通道沒有數據,在獲取時也不會發生阻塞,但此時取出數據會失敗。