channel 是 Go 語言中的一個核心類型,能夠把它當作管道。併發核心單元經過它就能夠發送或者接收數據進行通信,這在必定程度上又進一步下降了編程的難度。git
channel 是一個數據類型,主要用來解決 go 程的同步問題以及 go 程之間數據共享(數據傳遞)的問題。編程
goroutine 運行在相同的地址空間,所以訪問共享內存必須作好同步。goroutine 奉行經過通訊來共享內存,而不是共享內存來通訊。緩存
引⽤類型 channel 可用於多個 goroutine 通信。其內部實現了同步,確保併發安全(經過 CSP)。安全
強調一下:數據結構
channel 是一個數據類型,對應一個「管道(通道)」。併發
和 map 相似,channel 也是一個對應 make
建立的底層數據結構的引用。異步
既然是引用, 那麼咱們在傳參的時候就能完成在 A 函數棧幀內修改 B 函數棧幀數據的目的. 說白了就是傳的地址.函數
當咱們複製一個 channel 或用於函數參數傳遞時,咱們只是拷貝了一個 channel 引用,所以調用者和被調用者將引用同一個 channel 對象。 和其它的引用類型同樣,channel 的零值也是 nil。高併發
定義一個 channel 時,也須要定義發送到 channel 的值的類型。channel 可使用內置的 make()
函數來建立:網站
make(chan Type) // 等價於 make(chan Type, 0) make(chan Type, capacity)
當參數 capacity = 0
時,channel 是無緩衝阻塞讀寫的;當 capacity > 0
時,channel 有緩衝、是非阻塞的,直到寫滿 capacity 個元素才阻塞寫入。
channel 很是像生活中的管道,一邊能夠存放東西,另外一邊能夠取出東西。channel 經過操做符 <-
來接收和發送數據,發送和接收數據語法:
channel <- value // 發送 value 到 channel <- channel // 接收並將其丟棄 x := <- channel // 從 channel 中接收數據, 並賦值給 x x, ok := <- channel // 功能同上, 同時檢查通道是否已關閉或者是否爲空
默認狀況下,channel 接收和發送數據都是阻塞的,除非另外一端已經準備好,這樣就使得 goroutine 同步變的更加的簡單,而不須要顯式的 lock。
咱們先看一下沒有用 channel 的例子:
package main import ( "fmt" "time" ) // 定義一個打印機 func printer(s string) { for _, value := range s { fmt.Printf("%c", value) time.Sleep(time.Millisecond * 300) } } /* 定義兩我的使用打印機 */ func person1() { printer("hello") } func person2() { printer("world") } func main() { go person1() go person2() time.Sleep(time.Second * 5) // 注意,只寫上面兩行會直接運行完畢,想想 go 程的特性 }
結果:
hwoelrllod
那麼,怎麼用 channel 實現來保證順序輸出呢?
由於,person1 與 person2 都須要用一個 channel,因此要在全局定義一個 channel。具體代碼以下:
PS:你要傳的什麼類型數據與 channel 中定義的類型沒有必然的聯繫。
package main import ( "fmt" "time" ) // 全局定義一個 channel,用來完成數據同步 var ch = make(chan int) // 傳的什麼類型數據與 channel 中定義的類型沒有必然的聯繫 // 定義一個打印機 func printer(s string) { for _, value := range s { fmt.Printf("%c", value) time.Sleep(time.Millisecond * 300) } } /* 定義兩我的使用打印機 */ func person1() { printer("hello") ch <- 777 } func person2() { <-ch printer("world") } func main() { go person1() go person2() time.Sleep(time.Second * 3) // 注意,只寫上面兩行會直接運行完畢,想想 go 程的特性 }
這個時候,當運行 person2
函數時,會阻塞在 <-ch
處,運行 person1
函數時,打印完 「hello」,會在 ch <- 777
處阻塞。
可是這時,ch <- 777
對應這寫端已經準備好了,同時 <-ch
對應讀端也已經準備好了,因此代碼就會繼續執行,接下來就會打印 「world」。
咱們再來看一段代碼:
package main import "fmt" func main() { c := make(chan int) go func() { defer fmt.Println("子 go 程結束") fmt.Println("子 go 程正在運行 ...") c <- 666 /// 把 666 發送到 c }() num := <-c // 從 c 中接收數據, 並賦值給 num fmt.Println("num = ", num) fmt.Println("main go 程結束") }
運行結果:
子 go 程正在運行 ... 子 go 程結束 num = 666 main go 程結束
以上咱們都是用 channel 用來作數據同步,並無用到 channel 中的數據,下面咱們看一個用 channel 完成數據傳遞的例子:
package main import "fmt" func main() { ch := make(chan string) // len(ch): channel 中剩餘未讀取的數據個數; cap(ch): channel 的容量 fmt.Println("len(ch) = ", len(ch), "cap(ch) = ", cap(ch)) go func() { for i := 0; i < 2; i++ { fmt.Println("i = ", i) } ch <- "子 go 程打印完畢" }() str := <-ch fmt.Println(str) }
注意:len(ch): channel 中剩餘未讀取的數據個數; cap(ch): channel 的容量
運行結果:
len(ch) = 0 cap(ch) = 0 i = 0 i = 1 子 go 程打印完畢
強調一下:
channel 有兩個端:
要求:讀端和寫端必須同時知足條件(讀端有數據可讀,寫端有數據可寫),才能在 channel 中完成數據流動。不然,阻塞。
【補充知識點】
每當有一個進程啓動時,系統會自動打開三個文件:標準輸入、標準輸出、標準錯誤,對應三個文件:stdin、stdout、stderr。
當進程運行結束時,系統會自動關閉這三個文件。
無緩衝的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。
這種類型的通道要求發送 goroutine 和接收 goroutine 同時準備好,才能完成發送和接收操做。不然,通道會致使先執行發送或接收操做的 goroutine 阻塞等待。
這種對通道進行發送和接收的交互行爲自己就是同步的。其中任意一個操做都沒法離開另外一個操做單獨存在。
阻塞:因爲某種緣由數據沒有到達,當前協程(線程)持續處於等待狀態,直到條件知足,才接觸阻塞。
同步:在兩個或多個協程(線程)間,保持數據內容一致性的機制。
下圖展現兩個 goroutine 如何利用無緩衝的通道來共享一個值:
簡單說明:
無緩衝的 channel 建立格式:
make(chan Type) // 等價於 make(chan Type, 0)
若是沒有指定緩衝區容量,那麼該通道就是同步的,所以會阻塞到發送者準備好發送和接收者準備好接收。
例如:
package main import ( "fmt" "time" ) func main() { // 建立無緩衝的 channel ch := make(chan int, 0) go func() { defer fmt.Println("子 go 程結束") for i := 0; i < 3; i++ { fmt.Println("子 go 程正在運行, i = ", i) ch <- i } }() time.Sleep(time.Second) // 延時一秒 for i := 0; i < 3; i++ { // 從 ch 中接收數據, 並賦值給 num num := <-ch fmt.Println("num = ", num) } fmt.Println("main go程結束") }
運行結果:
子 go 程正在運行, i = 0 num = 0 子 go 程正在運行, i = 1 子 go 程正在運行, i = 2 num = 1 num = 2 main go程結束
強調一下:
無緩衝 channel 的容量爲0。
channel 至少應用於兩個 go 程中:一個讀、另外一個寫。
具有同步能力。讀、寫同步。(好比 打電話)
有緩衝的通道(buffered channel)是一種在被接收前能存儲一個或者多個數據值的通道。
這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動做的條件也不一樣。
只有通道中沒有要接收的值時,接收動做纔會阻塞。
只有通道沒有可用緩衝區容納被髮送的值時,發送動做纔會阻塞。
這致使有緩衝的通道和無緩衝的通道之間的一個很大的不一樣:無緩衝的通道保證進行發送和接收的 goroutine 會在同一時間進行數據交換;有緩衝的通道沒有這種保證。
使用有緩衝channel在goroutine之間同步的示例圖:
有緩衝的 channel 建立格式:
make(chan Type, capacity)
若是給定了一個緩衝區容量,通道就是異步的。只要緩衝區有未使用空間用於發送數據,或還包含能夠接收的數據,那麼其通訊就會無阻塞地進行。
請看如下代碼:
package main import ( "fmt" "time" ) func main() { // 建立一個有緩衝的 channel ch := make(chan int, 3) // 存滿 3 個元素以前不會阻塞 // 查看一下 channel 的未被讀取的緩衝元素數量以及 channel 容量 fmt.Printf("len(ch) = %d, cap(ch) = %d\n", len(ch), cap(ch)) go func() { defer fmt.Println("子 go 程結束") for i := 0; i < 5; i++ { ch <- i fmt.Println("子 go 程正在運行, i = ", i) } }() time.Sleep(time.Second) for i := 0; i < 5; i++ { num := <-ch fmt.Println("num = ", num) } fmt.Println("main go 程結束") }
運行結果:
len(ch) = 0, cap(ch) = 3 子 go 程正在運行, i = 0 子 go 程正在運行, i = 1 子 go 程正在運行, i = 2 num = 0 num = 1 num = 2 num = 3 子 go 程正在運行, i = 3 子 go 程正在運行, i = 4 子 go 程結束 num = 4 main go 程結束
強調一下:
有緩衝 channel 的容量大於 0。
channel 應用於兩個 go 程中:一個讀、另外一個寫。
緩衝區能夠進行數據存儲,存儲至容量上限才阻塞。
具有異步的能力,不須要同時操做 channel 緩衝區。(好比發短信)
若是發送者知道,沒有更多的值須要發送到 channel 的話,那麼讓接收者也能及時知道沒有多餘的值可接收將是有用的,由於接收者能夠中止沒必要要的接收等待。
這能夠經過內置的 close
函數來關閉 channel 實現。當咱們肯定再也不向對端發送、接收數據時,咱們能夠關閉 channel。(通常關閉發送端)
對端能夠判斷 channel 是否關閉:
if num, ok := <- ch; ok { // 對端沒有關閉,num 保存讀到的數據 } else { // 對端已經關閉,num 保存對應類型的零值 }
例如:
package main import "fmt" func main() { ch := make(chan int) go func() { for i := 0; i < 5; i++ { ch <- i } // 若是沒有 close(ch), 那麼當程序打印完 0 1 2 3 4 時, 會由於沒有寫端 channel 形成死鎖 close(ch) // 寫端,寫完數據主動關閉 channel }() // 從 channel 中讀取數據,可是不知道讀多少次,咱們能夠判斷當 channel 關閉時意味着讀取數據完畢 for true { // ok 爲 true說明 channel 沒有關閉, 爲 false 說明 channel 已經關閉 if data, ok := <-ch; ok { fmt.Println("寫端沒有關閉,data = ", data) } else { fmt.Println("寫端關閉,data = ", data) break } } fmt.Println("結束.") }
運行結果:
寫端沒有關閉,data = 0 寫端沒有關閉,data = 1 寫端沒有關閉,data = 2 寫端沒有關閉,data = 3 寫端沒有關閉,data = 4 寫端關閉,data = 0 結束.
咱們也能夠用 for range
獲取 channel 中的數據:
package main import ( "fmt" "time" ) func main() { ch := make(chan int, 5) go func() { for i := 0; i < 5; i++ { ch <- i } // 若是沒有 close(ch), 那麼當程序打印完 0 1 2 3 4 時, 會由於沒有寫端 channel 形成死鎖 close(ch) // 寫端,寫完數據主動關閉 channel fmt.Println("子 go 程結束") }() time.Sleep(time.Second) // 使用 for range 循環讀取 channel 的數據,注意這裏前面只接收一個變量 for num := range ch { fmt.Println(num) } fmt.Println("結束.") }
運行結果:
子 go 程結束 0 1 2 3 4 結束.
強調一下:
for num := range ch{} // 注意形式,不是 <-ch
默認狀況下,通道 channel 是雙向的,也就是,既能夠往裏面發送數據也能夠同裏面接收數據。
可是,咱們常常見一個通道做爲參數進行傳遞而只但願對方是單向使用的,要麼只讓它發送數據,要麼只讓它接收數據,這時候咱們能夠指定通道的方向。
單向 channel 變量的聲明很是簡單,以下:
var ch1 chan int // ch1 是一個正常的 channel,是雙向的 var ch2 chan<- float64 // ch2 是一個單向 channel,只能用於寫 float64 數據 var ch3 <-chan int // ch3 是一個單向 channel,只能用於讀 int 數據
chan<-
表示數據進入管道,要把數據寫進管道,對於調用者就是輸出。<-chan
表示數據從管道出來,對於調用者就是獲得管道的數據,固然就是輸入。能夠將 channel 隱式轉換爲單向隊列,只收或只發,不能將單向 channel 轉換爲雙向 channel:
ch := make(chan int, 3) var sendCh chan<- int = ch // 只寫 var recvCh <-chan int // 只讀
來看一下單向 channel 的簡單示例(記住了,channel 是傳引用):
package main import "fmt" // 只寫 func send(sendCh chan<- int) { sendCh <- 777 close(sendCh) } // 只讀 func recv(recvCh <-chan int) { num := <-recvCh fmt.Println("num = ", num) } func main() { ch := make(chan int) go send(ch) recv(ch) }
運行結果:
num = 777
單向 channel 最典型的應用是: 生產者消費者模型.
所謂生產者消費者模型: 某個模塊(函數等)負責產生數據, 這些數據由另外一個模塊來負責處理(此處的模塊是廣義的, 能夠是類, 函數, 協程, 線程, 進程等). 產生數據的模塊, 就形象地稱爲生產者; 而處理數據的模塊, 就稱爲消費者.
單單抽象出生產者和消費者, 還夠不上是生產者消費者模型. 該模式還須要有一個緩衝區處於生產者和消費者之間, 做爲一箇中介. 生產者把數據放入緩衝區, 而消費者從緩衝區取出數據. 以下圖所示
能夠這樣理解, 假設你要寄一封信, 大體過程以下:
那麼, 這個緩衝區有什麼用呢? 爲何不讓生產者直接調用消費者的某個函數, 直接把數據傳遞過去, 而去設置一個緩衝區呢?
緩衝區的好處大概以下:
1: 解耦 ( 下降 生產者 和 消費者 之間的耦合度 )
假設生產者和消費者分別是兩個類. 若是讓生產者直接調用消費者的某個方法, 那麼生產者對於消費者就會產生依賴(也就是耦合). 未來若是消費者的代碼發生變化, 可能會直接影響到生產者. 而若是二者都依賴某個緩衝區, 二者之間不直接依賴, 耦合度也就相應下降了.
依然用寄信的例子簡單說一下, 假設生產者就是你, 你負責寫信, 若是沒有郵筒(即緩衝區), 你就須要直接把信給郵遞員(消費者). 可是, 過了幾個月, 郵遞員換人了, 你想要寄信就必須再認識新的郵遞員, 你剛和新的郵遞員熟悉以後, 又換了一個郵遞員, 你又要從新認識... 這就顯得很麻煩, 就是想寄個信而已, 不想認識那麼多郵遞員...
可是若是有郵筒(緩衝區)呢, 不管郵遞員怎麼更換, 這個與你無關, 我依然是把信放入郵筒就能夠了. 這樣一來, 就簡單多了.
2: 提升併發能力 ( 生產者與消費者數量不對等時, 能保持正常通訊 )
生產者直接調用消費者的某個方法, 還有另外一個弊端
因爲函數調用是同步的(或者叫阻塞的), 在消費者的方法沒有返回以前, 生產者只好一直等在那邊. 萬一消費者處理數據很慢, 生產者只能白白浪費時間.
使用了生產者/消費者模式以後, 生產者和消費者能夠是兩個獨立的併發主體.
生產者把製造出來的數據放入緩衝區, 就能夠再去生產下一個數據. 基本上不用依賴消費者的處理速度.
其實最初這個生產者消費者模式, 主要就是用來處理併發問題的.
從寄信的例子來看, 若是沒有郵筒, 你得拿着信傻站在路口等郵遞員過來收(至關於生產者阻塞); 又或者郵遞員得挨家挨戶問, 誰要寄信(至關於消費者輪詢).
3: 緩存 ( 生產者與消費者數據處理速度不一致時, 暫存數據 )
若是生產者製造數據的速度時快時慢, 緩衝區的好處就體現出來了.
當數據製造快的時候, 消費者來不及處理, 未處理的數據能夠暫時存在緩衝區中. 等生產者的製造速度慢下來, 消費者再慢慢處理掉.
再拿寄信的例子舉例, 假設郵遞員一次只能帶走1000封信. 萬一某次碰上情人節送賀卡, 須要寄出的信超過1000封, 這時候郵筒這個緩衝區就派上用場了. 郵遞員把來不及帶走的信暫存在郵筒中, 等下次過來時再拿走.
先來看一下無緩衝的例子
package main import "fmt" // 生產者 func producer(ch chan<- int) { for i := 0; i < 5; i++ { fmt.Println("生產者寫入數據, num = ", i) ch <- i } close(ch) } // 消費者 func consumer(ch <-chan int) { for num := range ch { fmt.Println("消費者拿到數據, num = ", num) } } func main() { // 無緩衝 channel ch := make(chan int) go producer(ch) // 子 go 程,生產者 consumer(ch) // 主 go 程,消費者 }
運行結果:
生產者寫入數據, num = 0 生產者寫入數據, num = 1 消費者拿到數據, num = 0 消費者拿到數據, num = 1 生產者寫入數據, num = 2 生產者寫入數據, num = 3 消費者拿到數據, num = 2 消費者拿到數據, num = 3 生產者寫入數據, num = 4 消費者拿到數據, num = 4
再來看一下有緩衝的例子 二者對比結果
package main import "fmt" // 生產者 func producer(ch chan<- int) { for i := 0; i < 5; i++ { fmt.Println("生產者寫入數據, num = ", i) ch <- i } close(ch) } // 消費者 func consumer(ch <-chan int) { for num := range ch { fmt.Println("消費者拿到數據, num = ", num) } } func main() { // 有緩衝 channel ch := make(chan int, 2) go producer(ch) // 子 go 程,生產者 consumer(ch) // 主 go 程,消費者 }
運行結果:
生產者寫入數據, num = 0 生產者寫入數據, num = 1 生產者寫入數據, num = 2 生產者寫入數據, num = 3 消費者拿到數據, num = 0 消費者拿到數據, num = 1 消費者拿到數據, num = 2 消費者拿到數據, num = 3 生產者寫入數據, num = 4 消費者拿到數據, num = 4
簡單說明
首先建立一個雙向的 channel, 而後開啓一個新的 goroutine, 把雙向通道做爲參數傳遞到 producer 方法中, 同時轉成只寫通道. 子 go 程開始執行循環, 向只寫通道中添加數據, 這就是生產者.
主 go 程直接調用 consumer 方法, 該方法將雙向通道轉成只讀通道, 經過循環每次從通道中讀取數據, 這就是消費者.
注意, channel 做爲參數傳遞, 是引用傳遞.
在實際的開發中, 生產者消費者模式應用也很是的普遍.
例如, 在電商網站中, 訂單處理, 就是很是典型的生產者消費者模式.
當不少用戶單擊下訂單按鈕後, 訂單生產的數據所有放到緩衝區(隊列)中, 而後消費者將隊列中的數據取出來發送至倉庫管理等系統.
經過生產者消費者模式, 將訂單系統與倉庫管理系統隔離開, 且用戶能夠隨時下單(生產數據). 若是訂單系統直接調用倉庫系統, 那麼用戶單擊下訂單按鈕後, 要等到倉庫系統的結果返回, 這樣速度很慢.
接下來咱們就來模擬一下訂單處理的過程.
package main import "fmt" type OrderInfo struct { id int } func producer2(out chan<- OrderInfo) { // 生成訂單 -- 生產者 for i:=0; i < 10; i++ { // 循環生成10個訂單 order := OrderInfo{id: i+1} fmt.Println("生成的訂單ID: ", order.id) out <- order } close(out) // 寫完, 關閉channel } func consumer2(in <-chan OrderInfo) { // 處理訂單 -- 消費者 for order := range in { // 從channel取出訂單 fmt.Println("訂單ID爲: ", order.id) // 模擬處理訂單 } } func main() { ch := make(chan OrderInfo, 5) go producer2(ch) consumer2(ch) }
簡單說明: OrderInfo
爲訂單信息, 這裏爲了簡單隻定義了一個訂單編號屬性, 而後生產者模擬生成10個訂單, 消費者對產生的訂單進行處理.
Timer 是一個定時器. 表明將來的一個單一事件, 你能夠告訴 Timer 你要等待多長時間.
type Timer struct { C <- chan Time r runtimeTimer }
它提供一個channel, 在定時時間到達以前, 沒有數據寫入 Timer.C
會一直阻塞. 直到定時時間到, 系統會自動向 Timer.C
這個channel中寫入當前時間, 阻塞即被解除.
示例代碼:
package main import ( "fmt" "time" ) func main() { fmt.Println("當前時間: ", time.Now()) // 建立定時器, 指定定時時長 myTimer := time.NewTimer(time.Second * 2) // 定時到達後, 系統會自動向定時器的成員 C 寫入系統當前系統時間 //讀取 myTimer.C 獲得定時後的系統時間, 並完成一次chan的讀操做. nowTime := <- myTimer.C fmt.Println("當前時間: ", nowTime) }
1. Sleep time.Sleep(time.Second) 2. Time.C fmt.Println("當前時間: ", time.Now()) myTimer := time.NewTimer(time.Second * 2) nowTime := <- myTimer.C fmt.Println("如今時間: ", nowTime) 3. time.After fmt.Println("當前時間: ", time.Now()) nowTime := <- time.After(time.Second * 2) fmt.Println("如今時間: ", nowTime)
package main import ( "fmt" "time" ) func main(){ myTimer := time.NewTimer(time.Second * 3) // 建立定時器 go func() { <- myTimer.C fmt.Println("子go程, 定時完畢") }() myTimer.Stop() // 設置定時器中止 for { ; } }
死循環只是爲了方便查看結果.
package main import ( "fmt" "time" ) func main() { myTimer := time.NewTimer(time.Second * 10) myTimer.Reset(time.Second * 2) // 重置定時時長爲 2 秒 go func(){ <- myTimer.C fmt.Println("子go程, 定時完畢") }() for { ; } }
Ticker是一個週期觸發定時的計時器, 它會按照一個時間間隔往channel發送系統當前時間, 而channel的接受者能夠以固定的時間間隔從channel中讀取.
type Ticker struct { C <- chan Time r runtimeTimer }
package main import ( "fmt" "time" ) func main() { myTicker := time.NewTicker(time.Second) // 定義一個週期定時器 go func() { for { nowTime := <- myTicker.C fmt.Println("如今時間: ", nowTime) } }() // 死循環, 特意不讓main goroutine結束 for { ; } }
package main import ( "fmt" "time" ) func main(){ quit := make(chan bool) // 建立一個判斷是否終止的channel myTicker := time.NewTicker(time.Second) // 定義一個週期定時器 go func() { i := 0 for { nowTime := <- myTicker.C i++ fmt.Println("如今時間: ", nowTime) if i == 5 { quit <- true // 解除 主go程阻塞 } } }() <- quit // 在子go程循環獲取 <- myTicker.C 期間, 一直阻塞 }
歡迎訪問個人我的網站:
李培冠博客:lpgit.com