16 Go語言併發2——Channel

Go語言併發2——Channel

一、什麼是channel

channel 是一種架設在goroutine之間進行 通訊的管道,相似隊列。channel是引用類型,類型爲chan,能夠經過make關鍵字進行建立指定類型的channel。channel存在的意義是讓goroutine經過通訊來共享內存,一個往通道發送數據,一個從通道獲取數據,來實現數據同步。安全

二、channel的建立和傳遞

聲明通道時,須要指定將要被共享的數據的類型。能夠經過通道共享內置類型、命名類型、結構類型和引用類型的值或者指針。併發

2.1 make關鍵字建立

ch:=make(chan int) //建立一個int類型的channel,因此這個channel只能發送接收int型的數據
ch1:=make(chan int 10)//這個是有緩衝的buffer,這個下面會解釋

2.2 <- 運算符 讀和取

ch <- 2 //發送數值2給這個通道
x:=<-ch //從通道里讀取值,並把讀取的值賦值給x變量
<-ch //從通道里讀取值,而後忽略

2.3 close 函數關閉channel

close(ch)   //可使用內置函數close關閉通道

若是一個通道被關閉了,咱們就不能往這個通道里發送數據了,若是發送的話,會引發painc異常。可是,咱們還能夠接收通道里的數據,若是通道里沒有數據的話,接收的數據是類型零值。函數

package  main
import "fmt"
func main() {
    ch:=make(chan int)//建立int 型的無緩衝的channel
    go func() { 
        sum:=0
        for i:=0;i<100;i++{
            sum+=i
        }
        ch<-sum  //將goroutine 算出的數放進通道
    }()
    fmt.Println(<-ch) //從通道獲取數據,退出main函數,若是channel尚未存入數據,就會阻塞等待
}

上面這個簡單的例子, 執行順序是先建立一個channel,開啓一個goroutine進行計算,而後打印從channel取出的數。會先執行fmt.Println(<-ch),這時候goroutine尚未往裏面寫數據,此時main函數進入等待。一直到goroutine運算完將sum發送給channel,這時候main函數立刻收到數據,打印完退出。測試

三、無緩衝的channel

無緩衝的通道指的是通道的大小爲0,也就是說,這種類型的通道在接收前沒有能力保存任何值,它要求發送goroutine和接收goroutine同時準備好,才能夠完成發送和接收操做。ui

接下來看一個小案例,channel 比如是乒乓球的球桌,球比如數據,數據在channel通訊就比如乒乓球在球桌上來回彈,而兩個goroutine就是兩位選手,這個就是無緩衝的channel的例子指針

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
var wg sync.WaitGroup
func main() {
    ch := make(chan int) //乒乓球檯看作通道
    wg.Add(2)            //2個goroutine
    //至關於兩個選手對打
    go player("張繼科", ch)
    go player("馬龍", ch)
    ch <- 1   //發球
    wg.Wait() //等待比賽結束

}
func player(name string, ch chan int) {
    defer  wg.Done()//一方輸了就告訴main函數,裁判不要等了
    for {
        ball, ok := <-ch
        if !ok { //若是通道關閉
            fmt.Printf("%s贏了!!\n", name)
            return
        }
        n := rand.Intn(100)
        if n%13 == 0 { //隨機數來決定本身是否失誤
            fmt.Printf("%s輸了\n", name)
            close(ch) //輸了就得關閉通道
            return
        }
        fmt.Printf("%s擊球第%d次\n", name, ball)
        ball++
        ch <- ball

    }
}

從上面的例子看出,通道是球桌,球在球桌上來回傳遞,統計次數。兩個選手用for循環持續的在接收和發送數據,也就是要麼接球要麼發球。之因此他們在相互等待對方是由於這個channel是一個無緩衝的channel,也就是球不能放在球桌上,球桌只管傳遞,不能存儲。再看一個例子:code

package main
import (
    "fmt"
    "sync"
    "time"
)
var wg sync.WaitGroup
func main() {//4x100米接力比賽
    ch:=make(chan int) //接力棒
    wg.Add(1) //須要等待的是最後一棒
    go runing(ch)  
    ch<-1
    fmt.Println("比賽開始")
    wg.Wait()
}
func runing(ch chan int){
    var newRunner int
    runner:=<-ch   //接棒
    fmt.Printf("第%d棒正在跑\n",runner)

    time.Sleep(time.Second)//跑步中
    if runner==4 {//第四棒
        fmt.Printf("跑完了\n")
        wg.Done()
        return
    }else{//沒跑完,建立下一棒
        newRunner=runner+1
        fmt.Printf("第%d棒準備就緒\n",newRunner)
        go runing(ch)//等待接棒
        fmt.Printf("接力棒傳遞給第%d棒\n",newRunner)
        ch<-newRunner //接力
    }
}

上面的例子頗有趣,模擬4x100米接力,咱們建立一個無緩衝的channel,比做接力棒,只有雙方都準備好接收和發送,接力纔會發生,否則一方就會處於等待期。 傳遞給channel 一個數字1,表示比賽開始,第一棒取出runner,只要不是第4棒就須要往下一棒傳遞,因此,就建立了第二棒,讓他準備繼續接力。知道runner==4,比賽結束。隊列

總結下:無緩衝channel,可存儲的大小爲0,它保證進行發送和接收的 goroutine 會在同一時間進行數據交換 。若是發送方沒有準備好發送,接收方會進入阻塞,等待發送。內存

四、有緩衝的channel

有緩衝通道,實際上是一個隊列,這個隊列的最大容量就是咱們使用make函數建立通道時,經過第二個參數指定的。同步

ch := make(chan int, 5)

這裏建立容量爲5的,有緩衝的通道。對於有緩衝的通道,向其發送操做就是向隊列的尾部插入元素,接收操做則是從隊列的頭部刪除元素,並返回這個剛剛刪除的元素。

當隊列滿的時候,發送操做會阻塞;當隊列空的時候,接收操做會阻塞。有緩衝的通道,不要求發送和接收操做時同步的,相反能夠解耦發送和接收操做。

// cap  和 len 函數一樣對於有緩衝的channel可用,
cap(ch)        //channel 容量
len(ch)        //當前channel內的數量

看代碼:

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
} 
func request(hostname string) (response string) { /* ... */ }

咱們定義了一個容量爲3的通道responses,而後同時發起3個併發goroutine向這三個鏡像獲取數據,獲取到的數據發送到通道responses中,最後咱們使用return <-responses返回獲取到的第一個數據,也就是最快返回的那個鏡像的數據。

五、單項通道

爲了不channel混亂使用,還能夠在定義的時候定義這個channel是單項的,即只能發送數據,或者只能接受數據。好比:

var send chan<- int //只能發送——只能往channel裏發數據
var receive <-chan int //只能接收——只能從channel中取

//咱們定義函數的時候,能夠明確聲明接受的參數
func  test(ch chan<- int){
    //接受的是一個只能發送數據的channel,
}

區別主要在於 <-符號的位置,在後面,往裏發,,在前面,從裏面取。好像一列車穿過隧道。

注意:不能把單項channel 轉換爲普通channel

d := (chan int)(send) // Error: cannot convert type chan<- int to type chan int
d := (chan int)(receive) // Error: cannot convert type <-chan int to type chan int

六、forange 迭代

package main

import (
    "fmt"
)

func main() {
    data := make(chan int) // 數據交換隊列
    exit := make(chan bool) // 退出通知
    go func() {
        for d := range data { // 從隊列迭代接收數據,直到 close 。
            fmt.Println(d)
        }
        fmt.Println("recv over.")
        exit <- true // 發出退出通知。
    }()
    data <- 1 // 發送數據。
    data <- 2
    data <- 3
    close(data) // 關閉隊列。
    fmt.Println("send over.")
    <-exit // 等待退出通知。
}

forange 用於channel 有一個特色,就是一直進行迭代,無論channel有沒有數據,直到channel (close)關閉。這樣既安全又便利,當channel關閉時,for循環會自動退出,無需主動監測channel是否關閉,能夠防止讀取已經關閉的channel,形成讀到數據爲通道所存儲的數據類型的零值。

七、select 關鍵字

以前的例子都是使用1個channel進行通訊,當咱們使用多個channel進行通訊時,就須要用到 select關鍵字來進行管理。

  • 可處理一個或多個channel的發送和接收
  • 同時又多個可用的channel的按隨機順序處理
  • 可用空的select來阻塞main函數
  • 可設置超時
  • 當通道爲nil時,對應的case永遠爲阻塞,不管讀寫。特殊關注:而普通狀況下,對nil的通道寫操做是要panic的

7.1 處理多個channel發送和接收

package main

import (
    "fmt"
    "os"
)
func main() {
    a, b := make(chan int, 3), make(chan int)
    go func() {
        v, ok, s := 0, false, ""
        for {
            select { // 隨機選擇可用 channel,接收數據。
            case v, ok = <-a:
                s = "a"
            case v, ok = <-b:
                s = "b"
            }
            if ok {
                fmt.Println(s, v)
            } else {
                os.Exit(0)
            }
        }
    }()
    for i := 0; i < 5; i++ {
        select { // 隨機選擇可用 channel,發送數據。
        case a <- i:
        case b <- i:
        }
    }
    close(a)
    select {} // 沒有可用 channel,阻塞 main goroutine。
}

咱們的例子中,使用select有點像switch, 它能夠管理多個channel,隨機的發送也能夠隨機的獲取數據。最後咱們用select{} 很巧妙的阻塞了main goroutine ,由於沒有可用的channel,它進入阻塞直到channel 關閉,執行了os.Exit(0) main函數才推出。

7.2 設置超時

package main

import (
    "fmt"
    "time"
)
func main() {
    exit := make(chan bool)
    c1 := make(chan int, 2)
    c2 := make(chan string, 2)

    go func() {
        select {
        case vi := <-c1:
            fmt.Println(vi)
        case vs := <-c2:
            fmt.Println(vs)
        case <-time.After(time.Second * 3):
            fmt.Println("timeout.")
        }

        exit <- true
    }()
    //咱們先把發送數據代碼註釋掉。
    //這裏咱們並無向c1 和c2 發送任何數據,select 超時後就會打印 timeout
    //c1<-10
    //c2<-"加油"
    <-exit
}

固然select 還有default ,可是在循環中使用default必定要當心,當心,當心。

八、channel 總結

channel存在3種狀態

  • nil,未初始化的狀態,只進行了聲明,或者手動賦值爲nil
  • active,正常的channel,可讀或者可寫
  • closed,已關閉的channel。 關閉的channel存儲的是類型零值。
操做 nil通道 closed 關閉的通道 active正常通道
close panic panic 成功
ch<- 死鎖 panic 阻塞或成功
<-ch 死鎖 零值 阻塞或成功

對於nil通道的狀況,也並不是徹底遵循上表,有1個特殊場景nil的通道在select的某個case中時,這個case會阻塞,但不會形成死鎖。

  • channel分爲有緩衝和無緩衝
  • channel 有單項channel
  • 可使用forange迭代,直到channel 關閉
  • 能夠用select 管理多個channel,隨機處理讀和寫
  • 能夠用select{}阻塞main函數
  • select 能夠設置超時

九、channel 應用場景小結

9.1 forange 迭代,無需關注channel 是否關閉

場景:在須要不斷從channel 取數據時,而不用關心channel是否關閉

for x := range ch{
    fmt.Println(x)
}
//會一直迭代,直到channel 關閉

9.2 使用_,ok判斷channel是否關閉

場景:在不肯定channel是否關閉時,使用

if v, ok := <- ch; ok {
    fmt.Println(v)
}

ok的含義:

  • true:讀到數據,而且通道沒有關閉。
  • false:通道關閉,無數據讀到。

9.3使用select處理多個channel

場景:須要對多個通道進行同時處理,但只處理最早發生的channel時,見上面的例子

注意:當通道爲nil時,對應的case永遠爲阻塞,不管讀寫。特殊關注:普通狀況下,對nil的通道寫操做是要panic的

9.4 使用channel 傳遞結構體時,用指針

場景:channel 傳遞的數據是結構體時,最好用指針。

channel本質上傳遞的是數據的拷貝,拷貝的數據越小傳輸效率越高,傳遞結構體指針,比傳遞結構體更高效

9.5 簡單⼯⼚模式打包併發任務和 channel

package main
import (
    "math/rand"
    "time"
)
func NewTest() chan int { //簡單工廠方法返回一個channel
    c := make(chan int)
    rand.Seed(time.Now().UnixNano())
    go func() {
        time.Sleep(time.Second)
        c <- rand.Int()  
    }()
    //而且返回的channel 是已經準備好發送的,阻塞中,只要接收方一準備好,立馬數據就傳遞出去了
    return c
}
func main() {
    t := NewTest()
    println(<-t) // 等待 goroutine 結束返回。
}

9.6 ⽤channel 實現信號量 (semaphore)

簡單解釋下信號量,也叫信號燈,是能夠用來保證兩個或多個關鍵代碼段不被併發調用。信號量有四種操做,一、初始化,二、等信號三、發信號四、清理

package main
import (
    "fmt"
    "sync"
)
func main() {
    wg := sync.WaitGroup{}
    wg.Add(3)
    sem := make(chan int,1)
    for i := 0; i < 3; i++ {
        go func(id int) {
            defer wg.Done()
            sem <- 1 // 向 sem 發送數據,阻塞或者成功。
            fmt.Printf("第%d個\n",id)
            for x := 0; x < 3; x++ {
                fmt.Println(id, x)
            }
            <-sem // 接收數據,使得其餘阻塞 goroutine 能夠發送數據。
        }(i)
    }
    wg.Wait()
}
//輸出
第2個
2 0
2 1
2 2
第0個
0 0
0 1
0 2
第1個
1 0
1 1
1 2

這裏的channel 是一個容量爲1的有緩衝的通道。也就是說,它只能存一個信號,好比開了3個goroutine,只有一個能發送進去,其餘的都會阻塞,等到這個goroutine處理完本身的事情,將數據取出<-,那麼第二個goroutine就會發送,執行,而後取出。接着是第三個goroutine。 這樣就實現了信號量,保證goroutine一個個的執行。

9.7 利用從closed channel取值,發出退出的通知

應用場景:關閉全部下游的goroutine

nil 的channel在select 中是永久阻塞的,case是不會走的,可是關閉了的channel,就會走。

從關閉了的channel中取值 <- 是不會引起panic,會取出零值。

實現思路就是: 在channel取值,是阻塞的,只要一關閉channel,取值就是零值,而後執行退出就能夠了。

經過將nil channel關閉,使select的 阻塞channel 變爲取出零值, case退出代碼執行,全部讀取這個channel的goroutine就會執行關閉代碼。

package main
import (
    "sync"
    "time"
)
func main() {
    var wg sync.WaitGroup
    quit := make(chan bool)
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case <-quit: // closed channel 不會阻塞,會取出零值,所以可用做退出通知。
                    return
                default: // 執行正常任務。
                    func() {
                        println(id, time.Now().Nanosecond())
                        time.Sleep(time.Second)
                    }()
                }
            }
        }(i)
    }
    time.Sleep(time.Second * 5) // 讓測試 goroutine 運⾏⼀會。
    close(quit)                 // 發出退出通知。
    wg.Wait()
}

9.8 channel做爲結構成員

channel 能夠作未結構體的成員,能夠封裝的更好。

package main
import (
    "fmt"
)

type Request struct {
    data []int
    ret  chan int
}
func NewRequest(data ...int) *Request {
    return &Request{data, make(chan int, 1)}
}
//使用結構體指針,效率更高。
func Process(req *Request) {
    x := 0
    for _, i := range req.data {
        x += i
    }
    req.ret <- x
}
func main() {
    req := NewRequest(10, 20, 30)
    Process(req)
    fmt.Println(<-req.ret)
}
相關文章
相關標籤/搜索