Go語言基礎之12--Channel

 1、不一樣goroutine之間如何進行通信?

一、全局變量和鎖同步web

缺點:多個goroutine要通訊時,定義太多的全局變量(每一個全局變量功能不同),很差維護安全

二、Channel異步

2、channel概念

a. 相似unix中管道(pipe)函數

b. 先進先出spa

c. 線程安全,多個goroutine同時訪問,不須要加鎖線程

d. channel是有類型的,好比說:一個整數(int)的channel只能存放整數(int)3d

 

注意:channel是引用類型,channel的零值也是nilunix

3、channel聲明

var 變量名 chan 類型指針

var test chan int日誌

var test chan string

var test chan map[string]string

var test chan stu

var test chan *stu

4、channel初始化

channel是引用類型,channel的零值也是nil,因此須要使用make進行初始化,好比:

var test chan int
test = make(chan int, 10) //第1個參數chan是聲明爲channel,第2個參數是channel的類型,第3個參數是channel的長度

 

上述channel長度爲10,channel最多隻能存10個元素,當第11個元素插入時,channel也不會擴容,此時channel已經滿了,隊列只能阻塞住了,除非第1個被取走了,第11個才能進去channel。

 

總結:channel隊列兩種狀況會阻塞

第一種:channel爲空時,取數據會阻塞;

第二種:channel滿了,再往channel中插入數據,也會阻塞

注意:若是初始化channel時不定義隊列長度(無緩衝區(長度爲0)channel),channel就至關於沒有長度,也就至關於沒有空間去存放元素。可是也有解決辦法:

就是程序中:一個去放,還有一個去取,至關於立馬取出來。

代碼示例以下:

package main

import (
    "fmt"
    "time"
)

func main() {
    var intChan chan int = make(chan int) //channel沒有長度
    fmt.Printf("%p\n", intChan)
    go func() {
        intChan <- 100 //放100進沒有長度的channel
        fmt.Printf("insert item end\n")
    }()
    go func() {
        fmt.Printf("start\n")
        time.Sleep(time.Second * 3)
        var a int
        a = <-intChan //讀取100,至關於立馬取
        fmt.Printf("a=%d\n", a)

    }()

    time.Sleep(time.Second * 5)
}

 執行結果以下:

 

再來看一個定義了channel(帶緩衝區channel)長度的實例:

package main

import (
    "fmt"
    "time"
)

func main() {
    var intChan chan int = make(chan int, 1) //channel長度爲1
    fmt.Printf("%p\n", intChan)
    go func() {
        intChan <- 100 //放100進channel,以後隨時能夠讀取channel
        fmt.Printf("insert item end\n")
    }()
    go func() {
        fmt.Printf("start\n")
        time.Sleep(time.Second * 3)
        var a int
        a = <-intChan //a讀取channel中的元素
        fmt.Printf("a=%d\n", a)

    }()

    time.Sleep(time.Second * 5)
}

 執行結果以下:

5、channel的基本操做

一、 從channel讀取數據:

var testChan chan int
testChan = make(chan int, 10)
var a int
a = <- testChan  //至關於從testChan中讀取出來數據並賦值給a

 

二、爲channel寫入數據:

var testChan chan int
testChan = make(chan int, 10)
var a int = 10
testChan <- a  //寫入數據10給管道testChan

intChan <- 100

intChan是一個channel類型的變量,根據箭頭方向來判斷,很形象,此處就表示將100插入到管道(channel)intChan中去。

a <- intChan

a爲新定義的變量,表示a讀取管道intChan中的值。

6、goroutine與channel結合

代碼實例:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string) //不帶緩衝區的channel
    go sendData(ch)
    go getData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) { //該goroutine函數爲channel中插入數據,相似於生產者
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
}
func getData(ch chan string) { //該goroutine函數讀取channel中數據,相似於消費者
    var input string
    for {
        input = <-ch
        fmt.Println(input)
    }
}

執行結果以下:

7、channel阻塞(無緩衝區)

總結:channel隊列兩種狀況會阻塞

第一種:channel爲空時,取數據會阻塞;

第二種:channel滿了,再往channel中插入數據,也會阻塞

 

實例以下:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string) //定義一個無緩衝區(長度爲0)channel
    go sendData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) { //往channel中插入數據,可是沒有人取,只能阻塞了
    var i int
    for {
        var str string
        str = fmt.Sprintf("stu %d", i)
        fmt.Println("write:", str)
        ch <- str
        i++
    }
}

 執行結果:

 

解釋:

能夠看到只有寫入沒有讀取,阻塞住了

8、帶緩衝區channel

一、以下所示, testChan長度爲0:

var testChan chan int
testChan = make(chan int)
var a int
a = <- testChan

 

二、以下所示, testChan是帶緩衝區的chan,一次能夠放10個元素:

var testChan chan int
testChan = make(chan int, 10)
var a int = 10
testChan <- a

 

9、channel之間的同步

代碼實例:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go sendData(ch)
    go getData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) {
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
}
func getData(ch chan string) {
    var input string
    for {
        input = <-ch
        fmt.Println(input)
    }
}

會有一個問題,若是sleep時間都結束了,可是sendData和getdata所在的函數還沒執行完,那麼也會被中斷執行,如何解決呢:

解決辦法:

一、死循環:( 缺點:有時生產者和消費者已經執行完,卻依然還在死循環,退不出。)

二、標識位,也就是全局變量和加鎖(缺點:比較麻煩,若是有100個goroutine,也要寫100個標識位)

上述2個辦法都太麻煩不可取,能夠pass掉了,下面咱們有更好辦法:

9.1 方法1:channel

代碼實例:

package main

import (
    "fmt"
    //  "time"
)

func main() {
    ch := make(chan string)
    exitChan := make(chan bool, 3) //此例咱們有3個goroutine,因此咱們定義一個長度爲3的channel,當個人channel中能夠讀取到3個元素時,即表示3個goroutine都執行完畢了。
    go sendData(ch, exitChan) //每個goroutine執行結束時,往channel中插入一個數據
    go getData(ch, exitChan)
    go getData2(ch, exitChan)

    //等待其餘goroutine退出,當goroutine都執行完畢退出以後,channel中有3個元素,咱們能夠作一個取3次的操做,當3次都取完了,表示全部goroutine都退出了
    <-exitChan  //從channel中取出來元素並未賦值給任何變量,就至關於丟棄了
    <-exitChan
    <-exitChan
    fmt.Printf("main goroutine exited\n")
}

func sendData(ch chan string, exitCh chan bool) {
    ch <- "aaa"
    ch <- "bbb"
    ch <- "ccc"
    ch <- "ddd"
    ch <- "eee"
    close(ch) //插入數據結束後,關閉管道channnel
    fmt.Printf("send data exited")
    exitCh <- true //此時已經往goroutine中插入數據結束,goroutine退出以前,往咱們定義的channel中插入一個數據true,至關於告知我已經執行完成
}

func getData(ch chan string, exitCh chan bool) {
    //var input string
    for {
        //input = <- ch
        input, ok := <-ch  //檢查管道是否被關閉
        if !ok {  //若是被關閉了,ok=false,咱們就break退出
            break
        }
        // 此處 打印出來的順序 和寫入的順序 是一致的
        // 遵循隊列的原則: 先入先出
        fmt.Printf("getData中的input值:%s\n", input)
    }
    fmt.Printf("get data exited\n")
    exitCh <- true
}

func getData2(ch chan string, exitCh chan bool) {
    //var input2 string
    for {
        //input2 = <- ch
        input2, ok := <-ch
        if !ok {
            break
        }
        // 此處 打印出來的順序 和寫入的順序 是一致的
        // 遵循隊列的原則: 先入先出
        fmt.Printf("getData2中的input值:%s\n", input2)
    }
    fmt.Printf("get data2 exited\n")
    exitCh <- true
}

 執行結果以下:

注意:當咱們爲channel中放入10個元素,而後把channel關閉,這些元素仍是在channel中的,不會消失的,以後想取仍是能夠取出來的。

經過以下實例來證實:

package main

import (
    "fmt"
    "time"
)

func main() {
    var intChan chan int
    intChan = make(chan int, 10)
    for i := 0; i < 10; i++ {
        intChan <- i
    }

    close(intChan)
    time.Sleep(time.Second * 10)
    for i := 0; i < 10; i++ {
        var a int
        a = <-intChan
        fmt.Printf("a=%d\n", a)
    }
}

執行結果以下圖:

解釋:

能夠看到在爲channel中放入10個元素以後,就關閉了channel,以後依然能夠取出來。

9.1 方法2:(推薦

針對大批量goroutine,用sync包中的waitGroup方法,其自己是一個結構體,該方法的本質在底層就是一個計數。

代碼實例以下:

package main

import (
    "fmt"
    "sync"
    //  "time"
)

func main() {
    var wg sync.WaitGroup //定義一個waitgroup(結構體)類型的變量,針對大批量goroutine時比較方便。
    ch := make(chan string)
    wg.Add(3) //3個goroutine,就傳入3,Add方法至關於計數
    go sendData(ch, &wg) //,至關於goroutine執行完,Add計數就減1,因此咱們將wg傳入,但注意結構體必需要傳入一個地址進去
    go getData(ch, &wg)
    go getData2(ch, &wg)

    wg.Wait() //只要Add中計數依然存在,就一直Wait,除非爲0
    fmt.Printf("main goroutine exited\n")
}

func sendData(ch chan string, waitGroup *sync.WaitGroup) {
    ch <- "aaa"
    ch <- "bbb"
    ch <- "ccc"
    ch <- "ddd"
    ch <- "eee"
    close(ch)
    fmt.Printf("send data exited")
    waitGroup.Done()  //goroutine退出時,計數減1,因此這裏用Done方法來通知Add方法
}

func getData(ch chan string, waitGroup *sync.WaitGroup) {
    //var input string
    for {
        //input = <- ch
        input, ok := <-ch
        if !ok {
            break
        }
        // 此處 打印出來的順序 和寫入的順序 是一致的
        // 遵循隊列的原則: 先入先出
        fmt.Printf("getData中的input值:%s\n", input)
    }
    fmt.Printf("get data exited\n")
    waitGroup.Done()
}

func getData2(ch chan string, waitGroup *sync.WaitGroup) {
    //var input2 string
    for {
        //input2 = <- ch
        input2, ok := <-ch
        if !ok {
            break
        }
        // 此處 打印出來的順序 和寫入的順序 是一致的
        // 遵循隊列的原則: 先入先出
        fmt.Printf("getData2中的input值:%s\n", input2)
    }
    fmt.Printf("get data2 exited\n")
    waitGroup.Done()
}

 執行結果以下:

10、for range 遍歷channel

for range遍歷channel的好處,channel關閉了,for range循環會自動退出

for range結束判斷的標準也是看channel是否close關閉,否則就會阻塞,具體可看以下例子:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go sendData(ch)
    go getData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) {
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
    close(ch)
}
func getData(ch chan string) {
    for input := range ch {
        fmt.Println(input)
    }
}

 執行結果以下:

 

下面再看一個有channel關閉的例子,for range執行完會自動退出

實例以下:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    ch := make(chan string)
    wg.Add(2)
    go sendData(ch, &wg)
    go getData(ch, &wg)

    wg.Wait()
    fmt.Printf("main goroutine exited\n")
}

func sendData(ch chan string, waitGroup *sync.WaitGroup) {
    ch <- "aaa"
    ch <- "bbb"
    ch <- "ccc"
    ch <- "ddd"
    ch <- "eee"
    close(ch)
    fmt.Printf("send data exited")
    waitGroup.Done()
}

func getData(ch chan string, waitGroup *sync.WaitGroup) {
    //var input string
    for {
        //input = <- ch
        input, ok := <-ch
        if !ok {
            break
        }
        // 此處 打印出來的順序 和寫入的順序 是一致的
        // 遵循隊列的原則: 先入先出
        fmt.Printf("getData中的input值:%s\n", input)
    }
    fmt.Printf("get data exited\n")
    waitGroup.Done()
}

 執行結果以下:

11、channel的關閉

1. 使用內置函數close進行關閉, chan關閉以後, for range遍歷chan中

已經存在的元素後結束

2. 使用內置函數close進行關閉, chan關閉以後,沒有使用for range的寫法

須要使用, v, ok := <- ch進行判斷chan是否關閉

12、channel的只讀和只寫

a. 只讀chan的聲明

Var 變量的名字 <-chan int

Var readChan <- chan int

 

只讀實例:

package main

func main() {
    var intChan <-chan int = make(chan int, 100)
    intChan <- 100
}

 執行結果以下:

解釋:

只讀實例進行寫入,能夠看見編譯時直接報錯。

 

b. 只寫chan的聲明

Var 變量的名字 chan<- int

Var writeChan chan<- int

 

只寫實例:

package main

func main() {
    var ch chan<- int = make(chan int, 100)
    <-ch
}

執行結果:

解釋:

能夠看見只寫實例進行讀取channel時,也是編譯時直接報錯。

 

應用場景:

好比說寫一個第三方的自定義包,暴露channel給別人去掉用,這個時候就能夠控制返回給別人channel的權限控制,來防止誤操做。

十3、對channel進行select操做

13.1 場景:

假如channel中有數據或無數據,咱們是經過一個阻塞的讀或者阻塞的寫去操做數據,若是程序是去阻塞的讀,那麼至關於程序直接是阻塞的了,這種形式是很差的,好比說處理一個web請求,不可以阻塞的,這時候就有一種機制select操做,經過判斷channel中有沒有數據,若是沒有數據,則當即返回。

13.2 爲何要用select操做channel?

經過select語句來監測channel究竟是滿了仍是空了,來避免程序阻塞,可是若是沒有加default分支,程序依然仍是會被阻塞。

 

補充:

1)select語句的形式其實和switch語句有點相似,這裏每一個case表明一個通訊操做;

2)在某個channel上發送或者接收,而且會包含一些語句組成的一個語句塊 ;

3)select中的default來設置當 其它的操做都不可以立刻被處理時程序須要執行哪些邏輯;

4)channel 的零值是nil,  而且對nil的channel 發送或者接收操做都會永遠阻塞,在select語句中操做nil的channel永遠都不會被select到。因此咱們能夠用nil來激活或者禁用case,來達成處理其餘輸出或者輸出時間超時和取消的邏輯

13.3 聲明語法

語法以下:

select {
    case u := <- ch1:  //channel有數據,該分支就會被激活
    case e := <- ch2: //channel有數據,該分支也會被激活
    default: //若是上述分支都未被激活,則進入default分支
}

注意:不一樣的case分支調度整體來講是平衡的,不是說永遠只執行第1個分支,而不執行第2個分支。

13.4 實例

實例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var intChan chan int = make(chan int, 10)       //定義1個int類型channel,給10個空間
    var strChan chan string = make(chan string, 10) //定義1個string類型channel,給10個空間

    var wg sync.WaitGroup //經過waitgroup來控制goroutine的同步
    wg.Add(2)
    //插入數據,空間滿了,channel也會阻塞,因此經過select解決
    go func() {
        var count int //由於目前for循環是一個死循環,因此須要有一個限制條件來break
        for count < 15 {
            count++
            select {
            case intChan <- 10: //插入一個10進去
                fmt.Printf("write to int chan succ\n")
            case strChan <- "hello": //插入一個hello進去
                fmt.Printf("write to str chan succ\n")
            default: //當上述全部case分支對應的管道都被被插滿數據後,會走到以下default分支
                fmt.Printf("all chan is full\n")
                time.Sleep(time.Second)
            }
        }
        wg.Done() //for循環結束就能夠退出了
    }()

    //讀取數據
    go func() {
        var count int
        for count < 15 {
            count++
            select {
            case a := <-intChan: //讀取intChan中的數據
                fmt.Printf("read to int chan succ a:%d\n", a)
            case <-strChan: //若是隻想讀出來strChan中的數據,並不賦值,能夠這麼寫,但實際數據仍是讀出來了
                fmt.Printf("read to str chan succ\n")
            default: //當取完上述case分支對應的全部channel數據後,其會走以下的default分支
                fmt.Printf("all chan is empty\n")
                time.Sleep(time.Second)
            }
        }
        wg.Done() //for循環結束就能夠退出了
    }()
    wg.Wait()
}

 執行結果:

解釋:

如上圖爲插入數據匿名函數執行結果(往channel中插入數據)咱們能夠看到當前兩個分支都寫滿以後,就會進入default分支,能夠看到程序是不會阻塞的

十4、定時器使用

14.1 定時器的使用

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.NewTicker(time.Second)
    for v := range t.C {  //定時器Newticker會返回一個時間的channel
        fmt.Println("hello, ", v)
    }
}

  執行結果:

解釋:

定時器C的方法實際上是一個只讀的channel,裏面放的是時間。

由於是channel,因此咱們能夠用for range去遍歷。

14.2 一次性定時器

package main

import (
    "fmt"
    "time"
)

func main() {
    select {
    case <-time.After(time.Second): //用time.After方法,到了1秒以後,就會觸發這個分支
        fmt.Println("after")
    }
}

 執行結果以下:

14.3 超時定時器

場景

線上進行DB查詢時,若是超過必定時間沒有返回,那麼咱們就應該給調用方返回一個值,不能一直在乾等着吧,因此咱們就須要有一個超時控制。好比說:查詢結果1秒沒有返回,就返回一個錯誤給調用方。

 

如何作一個超時控制呢?

經過select來實現。

 

實例:

package main

import (
    "fmt"
    "time"
)

func queryDb(ch chan int) {
    time.Sleep(time.Second)
    ch <- 100
}
func main() {
    ch := make(chan int)
    go queryDb(ch) //起了1個goroutine,異步查詢db,傳入一個channel進去(異步的線程查詢完,會將結果放入到channel中)。
    t := time.NewTicker(time.Second)
    select { //主線程進行查詢,若是channel中有數據,就會去指定分支,若是沒有也會去指定分支
    case v := <-ch:
        fmt.Println("result", v)
    case <-t.C: //超過1秒,就會觸發該分支,上面channel中還有數據的話,就會走以下分支,也就是超時了。
        fmt.Println("timeout")
    }
}

執行結果:

 

十5、goroutine中使用recover

應用場景,若是某個goroutine panic了,並且這個goroutine裏面沒有捕獲(recover), 那麼整個進程就會掛掉。因此,好的習慣是每當go產生一個goroutine,就須要寫下recover。

首先咱們來模擬一下這種狀況:

實例:

package main

import (
    "fmt"
    "time"
)

func main() {
    go func() {
        var p *int
        *p = 1000
        fmt.Printf("hello")
    }()

    var i int
    for {
        fmt.Printf("%d\n", i)
        time.Sleep(time.Second)
    }
}

 執行結果:

解釋:

咱們能夠看到匿名函數所在的goroutine線程由於打印了一個空指針致使panic了,進而最終致使主線程也panic了。

 

因此咱們該如何取捕獲(recover)子線程的panic,使其不影響主線程的運行呢?

如何解決是很重要的,好比web應用場景,不能由於一個web請求掛掉而影響其餘的web請求致使服務崩掉。下面咱們來看一看解決方案:

解決方案:

經過recover函數來捕獲goroutine內的任何異常。

實例以下:

package main

import (
    "fmt"
    "time"
)

func main() {
    go func() {
        defer func() { //捕獲異常
            err := recover() //調用recover函數來作
            if err != nil {
                fmt.Printf("catch panic exception err:%v\n", err)
            }
        }()
        var p *int
        *p = 1000
        fmt.Printf("hello")
    }()

    var i int
    for {
        fmt.Printf("%d\n", i)
        time.Sleep(time.Second)
    }
}

 執行結果:

能夠看到經過捕獲(recover)goroutine的panic異常後,只會影響panic的goroutine,並不會影響到其餘goroutine和主線程。

總結:

因此以後咱們須要養成一個好的習慣,每起一個goroutine時,須要捕獲一下異常,至關於記一個日誌錯誤,這樣咱們也能夠經過這個錯誤日誌知道程序出問題在哪裏,也能夠去修復了

相關文章
相關標籤/搜索