Go基礎系列:channel入門

Go channel系列html

channel基礎

channel用於goroutines之間的通訊,讓它們之間能夠進行數據交換。像管道同樣,一個goroutine_A向channel_A中放數據,另外一個goroutine_B從channel_A取數據golang

channel是指針類型的數據類型,經過make來分配內存。例如:bash

ch := make(chan int)

這表示建立一個channel,這個channel中只能保存int類型的數據。也就是說一端只能向此channel中放進int類型的值,另外一端只能今後channel中讀出int類型的值。併發

須要注意,chan TYPE才表示channel的類型。因此其做爲參數或返回值時,需指定爲xxx chan int相似的格式。dom

向ch這個channel放數據的操做形式爲:異步

ch <- VALUE

從ch這個channel讀數據的操做形式爲:函數

<-ch             // 從ch中讀取一個值
val = <-ch
val := <-ch      // 從ch中讀取一個值並保存到val變量中
val,ok = <-ch    // 從ch讀取一個值,判斷是否讀取成功,若是成功則保存到val變量中

其實很簡單,當ch出如今<-的左邊表示send,當ch出如今<-的右邊表示recv。ui

例如:命令行

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go sender(ch)         // sender goroutine
    go recver(ch)         // recver goroutine
    time.Sleep(1e9)
}

func sender(ch chan string) {
    ch <- "malongshuai"
    ch <- "gaoxiaofang"
    ch <- "wugui"
    ch <- "tuner"
}

func recver(ch chan string) {
    var recv string
    for {
        recv = <-ch
        fmt.Println(recv)
    }
}

輸出結果:指針

malongshuai
gaoxiaofang
wugui
tuner

上面激活了一個goroutine用於執行sender()函數,該函數每次向channel ch中發送一個字符串。同時還激活了另外一個goroutine用於執行recver()函數,該函數每次從channel ch中讀取一個字符串。

注意上面的recv = <-ch,當channel中沒有數據可讀時,recver goroutine將會阻塞在此行。因爲recver中讀取channel的操做放在了無限for循環中,表示recver goroutine將一直阻塞,直到從channel ch中讀取到數據,讀取到數據後進入下一輪循環由被阻塞在recv = <-ch上。直到main中的time.Sleep()指定的時間到了,main程序終止,全部的goroutine將所有被強制終止。

由於receiver要不斷從channel中讀取可能存在的數據,因此receiver通常都使用一個無限循環來讀取channel,避免sender發送的數據被丟棄。

channel的屬性和分類

channel的3種操做

每一個channel都有3種操做:send、receive和close

  • send:表示sender端的goroutine向channel中投放數據
  • receive:表示receiver端的goroutine從channel中讀取數據
  • close:表示關閉channel
    • 關閉channel後,send操做將致使painc
    • 關閉channel後,recv操做將返回對應類型的0值以及一個狀態碼false
    • close並不是強制須要使用close(ch)來關閉channel,在某些時候能夠自動被關閉
    • 若是使用close(),建議條件容許的狀況下加上defer
    • 只在sender端上顯式使用close()關閉channel。由於關閉通道意味着沒有數據再須要發送

例如,判斷channel是否被關閉:

val, ok := <-counter
if ok {
    fmt.Println(val)
}

由於關閉通道也會讓recv成功讀取(只不過讀取到的值爲類型的空值),使得本來阻塞在recv操做上的goroutine變得不阻塞,藉此技巧能夠實現goroutine的執行前後順序。具體示例見後文:指定goroutine的執行順序

channel的兩種分類

channel分爲兩種:unbuffered channel和buffered channel

  • unbuffered channel:阻塞、同步模式
    • sender端向channel中send一個數據,而後阻塞,直到receiver端將此數據receive
    • receiver端一直阻塞,直到sender端向channel發送了一個數據
  • buffered channel:非阻塞、異步模式
    • sender端能夠向channel中send多個數據(只要channel容量未滿),容量滿以前不會阻塞
    • receiver端按照隊列的方式(FIFO,先進先出)從buffered channel中按序receive其中數據

能夠認爲阻塞和不阻塞是由channel控制的,不管是send仍是recv操做,都是在向channel發送請求

  • 對於unbuffered channel,sender發送一個數據,channel暫時不會向sender的請求返回ok消息,而是等到receiver準備接收channel數據了,channel纔會向sender和receiver雙方發送ok消息。在sender和receiver接收到ok消息以前,二者一直處於阻塞。
  • 對於buffered channel,sender每發送一個數據,只要channel容量未滿,channel都會向sender的請求直接返回一個ok消息,使得sender不會阻塞,直到channel容量已滿,channel不會向sender返回ok,因而sender被阻塞。對於receiver也同樣,只要channel非空,receiver每次請求channel時,channel都會向其返回ok消息,直到channel爲空,channel不會返回ok消息,receiver被阻塞。

buffered channel的兩個屬性

buffered channel有兩個屬性:容量和長度:和slice的capacity和length的概念是同樣的

  • capacity:表示bufffered channel最多能夠緩衝多少個數據
  • length:表示buffered channel當前已緩衝多少個數據
  • 建立buffered channel的方式爲make(chan TYPE,CAP)

unbuffered channel能夠認爲是容量爲0的buffered channel,因此每發送一個數據就被阻塞。注意,不是容量爲1的buffered channel,由於容量爲1的channel,是在channel中已有一個數據,併發送第二個數據的時候才被阻塞。

換句話說,send被阻塞的時候,實際上是沒有發送成功的,只有被另外一端讀走一個數據以後纔算是send成功。對於unbuffered channel來講,這是send/recv的同步模式。而buffered channel則是在每次發送數據到通道的時候,(通道)都向發送者返回一個消息,容量未滿的時候返回成功的消息,發送者所以而不會阻塞,容量已滿的時候由於已滿而遲遲不返回消息,使得發送者被阻塞

實際上,當向一個channel進行send的時候,先關閉了channel,再讀取channel時會發現錯誤在send,而不是recv。它會提示向已經關閉了的channel發送數據。

func main() {
    counter := make(chan int)
    go func() {
        counter <- 32
    }()
    close(counter)
    fmt.Println(<-counter)
}

輸出報錯:

panic: send on closed channel

因此,在Go的內部行爲中,send和recv是一個總體行爲,數據未讀就表示未send成功

兩種特殊的channel

有兩種特殊的channel:nil channel和channal類型的channel。

當未爲channel分配內存時,channel就是nil channel,例如var ch1 chan int。nil channel會永遠阻塞對該channel的讀、寫操做。

nil channel在某些時候有些妙用,例如在select(關於select,見後文)的某個case分支A將其它某case分支B所操做的channel忽然設置爲nil,這將會禁用case分支B。

當channel的類型爲一個channel時,就是channel的channel,也就是雙層通道。例如:

var chch1 chan chan int

channel的channel是指通道里的數據是通道,能夠認爲通道里面嵌套了一個或多個通道:只能將整個通道發送到外層通道,讀取外層通道時獲取到的是內層通道,而後能夠操做內層通道。

// 發送通道給外層通道
chch1 <-ch1
chch1 <-ch2

// 從外層通道取出內層通道
c <-chch1

// 操做取出的內層通道
c <-123
val := <-c

channel of channel的妙用之一是將外層通道做爲通道的加工廠:在某個goroutine中不斷生成通道,在其它goroutine能夠不斷取出通道來操做。

死鎖(deadlock)

當channel的某一端(sender/receiver)期待另外一端的(receiver/sender)操做,另外一端正好在期待本端的操做時,也就是說兩端都由於對方而使得本身當前處於阻塞狀態,這時將會出現死鎖問題。

更通俗地說,只要全部goroutine都被阻塞,就會出現死鎖

好比,在main函數中,它有一個默認的goroutine,若是在此goroutine中建立一個unbuffered channel,並在main goroutine中向此channel中發送數據並直接receive數據,將會出現死鎖:

package main 

import (
    "fmt"
)

func main (){
    goo(32)
}

func goo(s int) {
    counter := make(chan int)
    counter <- s
    fmt.Println(<-counter)
}

在上面的示例中,向unbuffered channel中send數據的操做counter <- s是在main goroutine中進行的,今後channel中recv的操做<-counter也是在main goroutine中進行的。send的時候會直接阻塞main goroutine,使得recv操做沒法被執行,go將探測到此問題,並報錯:

fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:

要修復此問題,只需將send操做放在另外一個goroutine中執行便可:

package main

import (
    "fmt"
)

func main() {
    goo(32)
}

func goo(s int) {
    counter := make(chan int)
    go func() {
        counter <- s
    }()
    fmt.Println(<-counter)
}

或者,將counter設置爲一個容量爲1的buffered channel:

counter := make(chan int,1)

這樣放完一個數據後send不會阻塞(被recv以前放第二個數據纔會阻塞),能夠執行到recv操做。

unbuffered channel同步通訊示例

下面經過sync.WaitGroup類型來等待程序的結束,分析多個goroutine之間通訊時狀態的轉換。由於建立的channel是unbuffered類型的,因此send和recv都是阻塞的。

package main

import (
    "fmt"
    "sync"
)

// wg用於等待程序執行完成
var wg sync.WaitGroup

func main() {
    count := make(chan int)

    // 增長兩個待等待的goroutines
    wg.Add(2)
    fmt.Println("Start Goroutines")

    // 激活一個goroutine,label:"Goroutine-1"
    go printCounts("Goroutine-1", count)
    // 激活另外一個goroutine,label:"Goroutine-2"
    go printCounts("Goroutine-2", count)

    fmt.Println("Communication of channel begins")
    // 向channel中發送初始數據
    count <- 1

    // 等待goroutines都執行完成
    fmt.Println("Waiting To Finish")
    wg.Wait()
    fmt.Println("\nTerminating the Program")
}
func printCounts(label string, count chan int) {
    // goroutine執行完成時,wg的計數器減1
    defer wg.Done()
    for {
        // 從channel中接收數據
        // 若是無數據可recv,則goroutine阻塞在此
        val, ok := <-count
        if !ok {
            fmt.Println("Channel was closed:",label)
            return
        }
        fmt.Printf("Count: %d received from %s \n", val, label)
        if val == 10 {
            fmt.Printf("Channel Closed from %s \n", label)
            // Close the channel
            close(count)
            return
        }
        // 輸出接收到的數據後,加1,並從新將其send到channel中
        val++
        count <- val
    }
}

上面的程序中,激活了兩個goroutine,激活這兩個goroutine後,向channel中發送一個初始數據值1,而後main goroutine將由於wg.Wait()等待2個goroutine都執行完成而被阻塞。

再看這兩個goroutine,這兩個goroutine執行徹底同樣的函數代碼,它們都接收count這個channel的數據,但多是goroutine1先接收到channel中的初始值1,也多是goroutine2先接收到初始值1。接收到數據後輸出值,並在輸出後對數據加1,而後將加1後的數據再次send到channel,每次send都會將本身這個goroutine阻塞(由於unbuffered channel),此時另外一個goroutine由於等待recv而執行。當加1後發送給channel的數據爲10以後,某goroutine將關閉count channel,該goroutine將退出,wg的計數器減1,另外一個goroutine因等待recv而阻塞的狀態將由於channel的關閉而失敗,ok狀態碼將讓該goroutine退出,因而wg的計數器減爲0,main goroutine由於wg.Wait()而繼續執行後面的代碼。

使用for range迭代channel

前面都是在for無限循環中讀取channel中的數據,但也可使用range來迭代channel,它會返回每次迭代過程當中所讀取的數據,直到channel被關閉。必須注意,只要channel未關閉,range迭代channel就會一直被阻塞。

例如,將上面示例中的printCounts()改成for-range的循環形式。

func printCounts(label string, count chan int) {
    defer wg.Done()
    for val := range count {
        fmt.Printf("Count: %d received from %s \n", val, label)
        if val == 10 {
            fmt.Printf("Channel Closed from %s \n", label)
            close(count)
            return
        }
        val++
        count <- val
    }
}

多個"管道":輸出做爲輸入

channel是goroutine與goroutine之間通訊的基礎,一邊產生數據放進channel,另外一邊從channel讀取放進來的數據。能夠藉此實現多個goroutine之間的數據交換,例如goroutine_1->goroutine_2->goroutine_3,就像bash的管道同樣,上一個命令的輸出能夠不斷傳遞給下一個命令的輸入,只不過golang藉助channel能夠在多個goroutine(如函數的執行)之間傳,而bash是在命令之間傳。

如下是一個示例,第一個函數getRandNum()用於生成隨機整數,並將生成的整數放進第一個channel ch1中,第二個函數addRandNum()用於接收ch1中的數據(來自第一個函數),將其輸出,而後對接收的值加1後放進第二個channel ch2中,第三個函數printRes接收ch2中的數據並將其輸出。

若是將函數認爲是Linux的命令,則相似於下面的命令行:ch1至關於第一個管道,ch2至關於第二個管道

getRandNum | addRandNum | printRes

如下是代碼部分:

package main

import (
    "fmt"
    "math/rand"
    "sync"
)

var wg sync.WaitGroup

func main() {
    wg.Add(3)
    // 建立兩個channel
    ch1 := make(chan int)
    ch2 := make(chan int)

    // 3個goroutine並行
    go getRandNum(ch1)
    go addRandNum(ch1, ch2)
    go printRes(ch2)

    wg.Wait()
}

func getRandNum(out chan int) {
    // defer the wg.Done()
    defer wg.Done()

    var random int
    // 總共生成10個隨機數
    for i := 0; i < 10; i++ {
        // 生成[0,30)之間的隨機整數並放進channel out
        random = rand.Intn(30)
        out <- random
    }
    close(out)
}

func addRandNum(in,out chan int) {
    defer wg.Done()
    for v := range in {
        // 輸出從第一個channel中讀取到的數據
        // 並將值+1後放進第二個channel中
        fmt.Println("before +1:",v)
        out <- (v + 1)
    }
    close(out)
}

func printRes(in chan int){
    defer wg.Done()
    for v := range in {
        fmt.Println("after +1:",v)
    }
}

指定channel的方向

上面經過兩個channel將3個goroutine鏈接起來,其中起鏈接做用的是第二個函數addRandNum()。在這個函數中使用了兩個channel做爲參數:一個channel用於接收、一個channel用於發送。

其實channel類的參數變量能夠指定數據流向:

  • in <-chan int:表示channel in通道只用於接收數據
  • out chan<- int:表示channel out通道只用於發送數據

只用於接收數據的通道<-chan不可被關閉,由於關閉通道是針對發送數據而言的,表示無數據再需發送。對於recv來講,關閉通道是沒有意義的。

因此,上面示例中三個函數可改寫爲:

func getRandNum(out chan<- int) {
    ...
}

func addRandNum(in <-chan int, out chan<- int) {
    ...
}

func printRes(in <-chan int){
    ...
}

buffered channel異步隊列請求示例

下面是使用buffered channel實現異步處理請求的示例。

在此示例中:

  • 有(最多)3個worker,每一個worker是一個goroutine,它們有worker ID。
  • 每一個worker都從一個buffered channel中取出待執行的任務,每一個任務是一個struct結構,包含了任務id(JobID),當前任務的隊列號(ID)以及任務的狀態(worker是否執行完成該任務)。
  • 在main goroutine中將每一個任務struct發送到buffered channel中,這個buffered channel的容量爲10,也就是最多隻容許10個任務進行排隊。
  • worker每次取出任務後,輸出任務號,而後執行任務(run),最後輸出任務id已完成。
  • 每一個worker執行任務的方式很簡單:隨機睡眠0-1秒鐘,並將任務標記爲完成。

如下是代碼部分:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Task struct {
    ID         int
    JobID      int
    Status     string
    CreateTime time.Time
}

func (t *Task) run() {
    sleep := rand.Intn(1000)
    time.Sleep(time.Duration(sleep) * time.Millisecond)
    t.Status = "Completed"
}

var wg sync.WaitGroup

// worker的數量,即便用多少goroutine執行任務
const workerNum = 3

func main() {
    wg.Add(workerNum)

    // 建立容量爲10的buffered channel
    taskQueue := make(chan *Task, 10)

    // 激活goroutine,執行任務
    for workID := 0; workID <= workerNum; workID++ {
        go worker(taskQueue, workID)
    }
    // 將待執行任務放進buffered channel,共15個任務
    for i := 1; i <= 15; i++ {
        taskQueue <- &Task{
            ID:         i,
            JobID:      100 + i,
            CreateTime: time.Now(),
        }
    }
    close(taskQueue)
    wg.Wait()
}

// 從buffered channel中讀取任務,並執行任務
func worker(in <-chan *Task, workID int) {
    defer wg.Done()
    for v := range in {
        fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
        v.run()
        fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
    }
}

select多路監聽

不少時候想要同時操做多個channel,好比從ch一、ch2讀數據。Go提供了一個select語句塊,它像switch同樣工做,裏面放一些case語句塊,用來輪詢每一個case語句塊的send或recv狀況。

select

用法格式示例:

select {
    // ch1有數據時,讀取到v1變量中
    case v1 := <-ch1:
        ...
    // ch2有數據時,讀取到v2變量中
    case v2 := <-ch2:
        ...
    // 全部case都不知足條件時,執行default
    default:
        ...
}

defalut語句是可選的,不容許fall through行爲,但容許case語句塊爲空塊。select會被return、break關鍵字中斷:return是退出整個函數,break是退出當前select。

select的行爲模式主要是對channel是否可讀進行輪詢,但也能夠用來向channel發送數據。它的行爲以下:

  • 若是全部的case語句塊評估時都被阻塞,則阻塞直到某個語句塊能夠被處理
  • 若是多個case同時知足條件,則隨機選擇一個進行處理,對於這一次的選擇,其它的case都不會被阻塞,而是處理完被選中的case後進入下一輪select(若是select在循環中)或者結束select(若是select不在循環中或循環次數結束)
  • 若是存在default且其它case都不知足條件,則執行default。因此default必需要可執行而不能阻塞

若是有所疑惑,後文的"select超時時間"有更有助於理解select的說明和示例。

全部的case塊都是按源代碼書寫順序進行評估的。當select未在循環中時,它將只對全部case評估一次,此次結束後就結束select。某次評估過程當中若是有知足條件的case,則全部其它case都直接結束評估,並退出這次select

其實若是注意到select語句是在某一個goroutine中評估的,就不難理解只有全部case都不知足條件時,select所在goroutine纔會被阻塞,只要有一個case知足條件,本次select就不會出現阻塞的狀況。

須要注意的是,若是在select中執行send操做,則可能會永遠被send阻塞。因此,在使用send的時候,應該也使用defalut語句塊,保證send不會被阻塞。若是沒有default,或者能確保select不阻塞的語句塊,則早晚會被send阻塞。在後文有一個select中send永久阻塞的分析:雙層channel的一個示例

通常來講,select會放在一個無限循環語句中,一直輪詢channel的可讀事件。

下面是一個示例,pump1()和pump2()都用於產生數據(一個產生偶數,一個產生奇數),並將數據分別放進ch1和ch2兩個通道,suck()則從ch1和ch2中讀取數據。而後在無限循環中使用select輪詢這兩個通道是否可讀,最後main goroutine在1秒後強制中斷全部goroutine。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go pump1(ch1)
    go pump2(ch2)
    go suck(ch1, ch2)
    time.Sleep(1e9)
}
func pump1(ch chan int) {
    for i := 0; i <= 30; i++ {
        if i%2 == 0 {
            ch <- i
        }
    }
}
func pump2(ch chan int) {
    for i := 0; i <= 30; i++ {
        if i%2 == 1 {
            ch <- i
        }
    }
}
func suck(ch1 chan int, ch2 chan int) {
    for {
        select {
        case v := <-ch1:
            fmt.Printf("Recv on ch1: %d\n", v)
        case v := <-ch2:
            fmt.Printf("Recv on ch2: %d\n", v)
        }
    }
}
相關文章
相關標籤/搜索