Golang 併發Groutine詳解

概述

一、並行和併發編程

        並行(parallel):指在同一時刻,有多條指令在多個處理器上同時執行。api

        併發(concurrency):指在同一時刻只能有一條指令執行,但多個進程指令被快速的輪換執行,使得在宏觀上具備多個進程同時執行的效果,但在微觀上並非同時執行的,只是把時間分紅若干段,使多個進程快速交替的執行。緩存

  • 並行是兩個隊列同時使用兩臺咖啡機
  • 併發是兩個隊列交替使用一臺咖啡機

二、go併發優點

        有人把Go比做21世紀的C語言,第一是由於Go語言設計簡單;第二,21世紀最重要的就是併發程序設計,而Go從語言層面就支持了併發。同時,併發程序的內存管理有時候是很是複雜的,而Go語言提供了自動垃圾回收機制。安全

        Go語言爲併發編程而內置的上層API基於CSP(communication sequential process,順序通訊進程)模型。這就意味着顯式鎖都是能夠避免的,由於Go語言經過安全的通道發送和接受數據以實現同步,這大大地簡化了併發程序的編寫。併發

        通常狀況下,一個普通的桌面計算機跑十幾二十個線程就有點負載過大了,可是一樣這臺機器卻能夠輕鬆地讓成百上千甚至過萬個goroutine進行資源競爭。異步

2.1 goroutine是什麼

    goroutine是Go併發設計的核心。goroutine說到底其實就是就是協程,可是它比線程更小,十幾個goroutine可能體如今底層就是五六個線程,Go語言內部幫你實現了這些goroutine之間的內存共享。執行goroutine只需極少的內存(大概是4~5KB),固然會根據相應的數據伸縮。也正由於如此,可同時運行成千上萬個併發任務。goroutine比thread更易用、更高效、更輕便。函數

2.2 建立goroutine

        只須要在函數調用語句前添加go關鍵字,就能夠建立併發執行單元。開發人員無需瞭解任何執行細節,調度器會自動將其安排到合適的系統線程上執行。ui

        在併發編程裏,咱們一般想將一個過程切分紅幾塊,而後讓每一個goroutine各自負責一塊工做。當一個程序啓動時,其主函數即在一個單獨的goroutine中運行,咱們叫它main goroutine。新的goroutine會用go語句來建立。this

示例:spa

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    go newTask()    //新建一個goroutine
    for {
        fmt.Println("this is a main goroutine.")
        time.Sleep(time.Second)
    }
}
 
func newTask(){
    for {
        fmt.Println("this is a new Task.")
        time.Sleep(time.Second)    //延時1s
    }
}

 以上實例運行結果爲:

this is a main goroutine.
this is a new Task.
this is a new Task.
this is a main goroutine.
this is a main goroutine.
this is a new Task.
.....

2.3 主goroutine先退出

        主協程退出了,其餘子協程也要跟着退出。

實例:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    go func (){
        i:=0
        for {
            fmt.Println("this is a new Task : ",i)
            time.Sleep(time.Second)
            i++
        }
    }()
 
    i := 0
    for {
        fmt.Println("this is a main goroutine :",i)
        time.Sleep(time.Second)
        i++
        if i==2 {
            break
        }
    }
}

 以上實例運行結果爲:

this is a main goroutine : 0
this is a new Task :  0
this is a new Task :  1
this is a main goroutine : 1

主協程先退出致使子協程沒有來得及調用:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    go func (){
        i:=0
        for {
            fmt.Println("this is a new Task : ",i)
            time.Sleep(time.Second)
            i++
        }
    }()
}

2.4 runtime包

 

Gosched

        runtime.Gosched()用於讓出CPU時間片,讓出當前goroutine的執行權限,調度器安排其餘等待的任務運行,並在下次某個時候從該位置恢復執行。

        這就像跑接力賽,A跑了一會碰到代碼runtime.Gosched()就把接力棒交給B了,A歇着了,B繼續跑。

實例:

package main
 
import (
    "fmt"
    "runtime"
)
 
func main(){
    go func (){
        for i:=0;i<5;i++{
            fmt.Println("Oh!")
        }
    }()
 
    for i:=0;i<2;i++{
        //讓出時間片,先讓別的協程執行,執行完了,再回來執行此協程
        runtime.Gosched()
        fmt.Println("Yeah!")
    }
}

Goexit

        調用runtime.Goexit()將當即終止當前goroutine執行,調度器確保全部已註冊defer延遲調用被執行

package main
 
import (
    "fmt"
    "runtime"
)
 
func main(){
    //建立協程
    go func(){
        fmt.Println("En...")
        //調用函數
        test()
        fmt.Println("Oops...")
    }()
 
    //不讓主協程結束
    for{}
}
 
func test() {
    defer fmt.Println("Yeah!")
    runtime.Goexit()    //終止所在的協程
    fmt.Println("Oh!")
}

GOMAXPROCS

        調用runtime.GOMAXPROCS()用來設置能夠並行計算的CPU核數的最大值,並返回以前的值。

package main
 
import (
    "fmt"
    "runtime"
)
 
func main(){
    n:=runtime.GOMAXPROCS(1)    //把參數改成2試一試
    fmt.Println("n=",n)
    for {
        go fmt.Print(0)
        fmt.Print(1)
    }
}

   在第一次執行(runtime.GOMAXPROCS(1))時,最多同時只能有一個goroutine被執行。因此會打印不少1。過了一段時間後,Go調度器會將其置爲休眠,並喚醒另外一個goroutine,這時候就開始打印不少0了,在打印的時候,goroutine是被調度到操做系統線程上的。

        在第二次執行(runtime.GOMAXPROCS(2))時,咱們使用了兩個CPU,因此兩個goroutine能夠一塊兒被執行,以一樣的頻率交替打印0和1。

 

多任務資源競爭問題:

package main
 
import (
    "fmt"
    "time"
)
 
func Printer(str string){
    for _,data:=range str {
        fmt.Printf("%c",data)
        time.Sleep(time.Second)
    }
    fmt.Printf("\n")
}
 
func person1(){
    Printer("Oh!")
}
 
func person2(){
    Printer("Yeah!")
}
 
func main() {
 
    //新建2個協程,表明2我的。兩我的共同使用打印機
    go person1()
    go person2()
 
    //不讓主協程結束
    for{}
}

三、channel

        goroutine運行在相同的地址空間,所以訪問共享內存必須作好同步。goroutine奉行經過通訊來共享內存,而不是共享內存來通訊。

        引用類型channel是CSP模式的具體實現,用於多個goroutine通信。其內部實現了同步,確保併發安全。

3.1 channel類型

        定義一個channel時,也須要定義發送到channel的值的類型。channel可使用內置的make()函數來建立:

make(chan Type)  //等價於make(chan Type,0)
make(chan Type,capacity)

 當capacity=0時,channel是無緩衝阻塞讀寫的;當capacity>0時,channel有緩衝、是非阻塞的,直到寫滿capacity個元素才阻塞寫入。

        channel經過操做符<-來接收和發送數據,發送和接收數據語法:

channel <- value  //發送value到channel
<- channel  //接收並將其丟棄
x := <-channel  //從channel中接收數據,並賦值給x
x,ok := <-channel  //功能同上,同時檢查通道是否已關閉或者是否爲空

  默認狀況下,channel接收和發送數據都是阻塞的,除非另外一端已經準備好,這樣就使得goroutine同步變得更加簡單,而不須要顯示的lock。

實例:

package main
 
import (
    "fmt"
    "time"
)
 
var ch = make(chan int)
 
func Printer(str string){
    for _,data:=range str {
        fmt.Printf("%c",data)
        time.Sleep(time.Second)
    }
    fmt.Printf("\n")
}
 
//person1執行完成,纔到person2執行
func person1(){
    Printer("Oh!")
    ch<-0    //給管道/通道寫數據,發送
}
 
func person2(){
    <-ch    //從管道取數據,接收,若是通道沒有數據它就會阻塞
    Printer("Yeah!")
}
 
func main() {
 
    //新建2個協程,表明2我的。兩我的共同使用打印機
    go person1()
    go person2()
 
    //不讓主協程結束
    for{}
}

 以上實例執行結果爲:

Oh!
Yeah!

        經過channel實現同步和數據交互。

實例:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
    defer fmt.Println("主協程結束。")
 
    ch := make(chan string)
 
    go func() {
        defer fmt.Println("子協程調用完畢。")
        for i := 0; i < 2; i++ {
            fmt.Println("子協程 i = ", i)
            time.Sleep(time.Second)
        }
        ch <- "子協程幹活兒了。" //把這行註釋掉再運行一下,看看什麼結果
    }()
 
    str := <-ch    //沒有數據前,阻塞
    fmt.Println("str = ", str)
}

以上實例執行結果爲:

子協程 i =  0
子協程 i =  1
子協程調用完畢。
str =  子協程幹活兒了。
主協程結束。

3.2 無緩衝的channel

        無緩衝的通道(unbuffersd channel)是指在接收前沒有能力保存任何值的通道。

        這種類型的通道要求發送goroutine和接收goroutine同時準備好,才能完成發送和接收操做。若是兩個goroutine沒有同時準備好,通道會致使先執行發送或接收操做的goroutine阻塞等待。

        這種對通道進行發送和接收的交互行爲自己就是同步的。其中任意一個操做都沒法離開另外一個操做單獨存在。

        下圖展現兩個goroutine如何利用無緩衝的通道來共享一個值:

  • 在第1步,兩個goroutine都到達通道,但哪一個都沒有開始執行發送或者接收。
  • 在第2步,左側的goroutine將它的手伸進了通道,這模擬了向通道發送數據的行爲。這時,這個goroutine會在通道中被鎖住,直到交換完成。
  • 在第3步,右側的goroutine將它的手放入通道,這模擬了從通道里接收數據。這個goroutine同樣也會在通道中被鎖住,直到交換完成。
  • 在第4步和第5步,進行交換,並最終在第6步,兩個goroutine都將它們的手從通道里拿出來,這模擬了被鎖住的goroutine獲得釋放。兩個goroutine如今均可以去作別的事情了。

        無緩衝的channel建立格式:

make(chan Type)  //等價於make(chan Type,0)

 若是沒有指定緩衝區容量,那麼該通道就是同步的,所以會阻塞到發送者準備好發送和接收者準備好接收。

實例:

package main
 
import (
    "fmt"
    "time"
    )
 
func main()  {
    //建立一個無緩存的channel
    ch := make(chan int,0)
 
    //len(ch)緩衝區剩餘數據個數,cap(ch)緩衝區大小
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n",len(ch),cap(ch))
 
    //新建協程
    go func() {
        for i:=0;i<3;i++{
            fmt.Println("子協程:i=",i)
            ch <- i
        }
    }()
 
    //延時
    time.Sleep(2*time.Second)
 
    for i:=0;i<3;i++{
        num := <-ch    //讀取管道中內容,沒有內容前,阻塞
        fmt.Println("num =",num)
    }
}

 以上實例執行結果爲:

len(ch)=0,cap(ch)=0
子協程:i= 0
num = 0
子協程:i= 1
子協程:i= 2
num = 1
num = 2

3.3 有緩衝的channel

        有緩衝的通道(buffered channel)是一種在被接收前能存儲一個或多個值的通道。

        這種類型的通道並不強制要求goroutine之間必須同時完成發送和接收。通道會阻塞發送和接收動做的條件也會不一樣。只有在通道中沒有要接收的值時,接收動做纔會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,發送動做纔會阻塞。

        這致使有緩衝的通道和無緩衝的通道之間的一個很大的不一樣:無緩衝的通道保證進行發送和接收的goroutine會在同一時間進行數據交換;有緩衝的通道沒有這種保證。

  • 在第1步,右側的goroutine正在從通道接收一個值。
  • 在第2步,右側的這個goroutine獨立完成了接收值的動做,而左側的goroutine正在發送一個新值到通道里。
  • 在第3步,左側的goroutine還在向通道發送新值,而右側的goroutine正在從通道接收另一個值。這個步驟裏的兩個操做既不是同步的,也不是互相阻塞。
  • 在第4步,全部的發送和接收都完成,而通道里還有幾個值,也有一些空間能夠存更多的值。

        有緩衝的channel建立格式:

make(chan Type,capicity)

   若是給定了一個緩衝區容量,通道就是異步的。只要緩衝區有未使用空間用於發送數據,或還包含能夠接收的數據,那麼其通訊就會無阻塞地進行。

實例:

package main
 
import "fmt"
 
func main() {
 
    //建立一個有緩存的channel,容量爲3
    ch := make(chan int, 3)
    fmt.Printf("len(ch)=%d,cap(ch)=%d", len(ch), cap(ch))
 
}

輸出結果爲:

len(ch)=0,cap(ch)=3

實例:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    //建立一個有緩存的channel,容量爲3
    ch := make(chan int, 3)
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch))
 
    //新建協程
    go func() {
        for i := 0; i < 3; i++ {    //改爲i<10試試
            ch <- i //不會阻塞,ch容量爲3
            fmt.Printf("子協程[%d]:len(ch)=%d,cap(ch)=%d\n", i, len(ch), cap(ch))
        }
    }()
 
    //延時
    time.Sleep(2 * time.Second)
 
    for i := 0; i < 3; i++ {    //改爲i<10試試
        num := <-ch //讀取管道中內容,沒有內容前,阻塞
        fmt.Println("num =", num)
    }
 
}

輸出結果爲:

len(ch)=0,cap(ch)=3
子協程[0]:len(ch)=1,cap(ch)=3
子協程[1]:len(ch)=2,cap(ch)=3
子協程[2]:len(ch)=3,cap(ch)=3
num = 0
num = 1
num = 2

3.4 range和close

        close的用法

package main
 
import (
    "fmt"
)
 
func main() {
 
    //建立一個無緩存的channel
    ch := make(chan int)
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch))
 
    //新建協程
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i //往通道寫數據
        }
        //不須要再寫數據,關閉channel
        close(ch)
                ch <- 5  //關閉channel後沒法再發送數據
    }()
 
    for {
        //若是ok爲true,說明通道沒有關閉
        if num,ok:=<-ch;ok==true{
            fmt.Println("num = ",num)
        }else {        //通道關閉
            //fmt.Println(num)
            break
        }
    }
 
}

 上述實例打印結果爲:

len(ch)=0,cap(ch)=0
num =  0
num =  1
num =  2
num =  3
num =  4

注意點:

  • channel不像文件同樣須要常常去關閉,只有當你肯定沒有任何發送數據了,或者你想顯式地結束range循環之類的,纔去關閉channel;
  • 關閉channel後,沒法向channel再發送數據(引起panic錯誤後致使接收當即返回零值);
  • 關閉channel後,能夠繼續從channel接收數據;
  • 對於nil channel,不管收發都會被阻塞。

        range的用法:

package main
 
import (
    "fmt"
)
 
func main() {
 
    //建立一個無緩存的channel
    ch := make(chan int)
    fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch))
 
    //新建協程
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i //往通道寫數據
        }
        //不須要再寫數據,關閉channel
        close(ch)
        //ch <- 5  //關閉channel後沒法再發送數據
    }()
 
    for num:=range ch{        //能夠自動跳出循環
        fmt.Println("num = ",num)
    }
 
}

上述實例打印結果爲:

len(ch)=0,cap(ch)=0
num =  0
num =  1
num =  2
num =  3
num =  4

3.5 單方向的channel

        默認狀況下,通道是雙向的,也就是,既能夠往裏面發送數據也能夠從裏面取出數據。

        可是,咱們常常見一個通道做爲參數進行值傳遞並且但願對方是單向使用的,要麼只讓它發送數據,要麼只讓它接收數據,這時候咱們能夠指定通道的方向。

        單向channel變量的聲明很是簡單,以下:

var ch1 chan int  //ch1是一個正常的channel,不是單向的
var ch2 chan<- float64  //ch2是單向channel,只用於寫float64數據
var ch3 <-chan int  //ch3是單向channel,只用於讀取int數據
  • chan<- 表示數據進入管道,要把數據寫進管道,對於調用者就是輸出。
  • <-chan 表示數據從管道出來,對於調用者就是獲得管道的數據,固然就是輸入。

 能夠將channel隱式轉換爲單向隊列,只收或只發,不能將單向channel轉換爲普通channel。

實例:

package main
 
import "fmt"
 
func main()  {
 
    //建立一個雙向通道
    ch := make(chan int)
 
    //生產者,生產數字,寫入channel
    //新開一個協程
    go producer(ch)        //channel傳參,引用傳遞
 
    //消費者,從channel讀內容
    consumer(ch)
 
}
 
//此channel只能寫
func producer(in chan<- int){
    for i:=0;i<10;i++{
        in<-i
    }
    close(in)
}
 
//此channel只能讀
func consumer(out <-chan int)  {
    for num := range out{
        fmt.Println("num = ",num)
    }
}

上述實例打印結果爲:

num =  0
num =  1
num =  2
num =  3
num =  4
num =  5
num =  6
num =  7
num =  8
num =  9

3.6 定時器

1.Timer

        Timer是一個定時器,表明將來的一個單一事件,你能夠告訴timer你要等待多長時間,它提供一個channel,在將來的那個時間那個channel提供了一個時間值。

        time.MewTimer()方法:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
    //建立一個定時器,設置時間爲2s,2s後往time通道寫內容(當前時間)
    timer := time.NewTimer(2*time.Second)
    fmt.Println("Current time :",time.Now())
 
    // 2s後,往timer.C寫數據,有數據後,就能夠讀取
    t := <-timer.C    //channel沒有數據先後阻塞
    fmt.Println("t = ",t)
}

上述實例打印結果爲:

Current time : 2018-05-25 19:06:32.3679043 +0800 CST m=+0.005014201
t =  2018-05-25 19:06:34.3681931 +0800 CST m=+2.005303101

 time.NewTimer()時間到了,只會響應一次:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
    //建立一個定時器,設置時間爲2s,2s後往time通道寫內容(當前時間)
    timer := time.NewTimer(2*time.Second)
 
    for {
        <-timer.C    //只會寫一次,而後就阻塞,死鎖報錯
        fmt.Println("Time out.")
    }
 
}

上述實例輸出結果爲:

time out.
fatal error: all goroutines are asleep - deadlock!

time.Sleep()方法

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    //延時2s後打印
    time.Sleep(2*time.Second)
    fmt.Println("Time out.")
 
}

2s後打印:

Time out.

 time.After()方法:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    <-time.After(2*time.Second)        //定時2s,阻塞2s,2s後產生一個事件,往channel寫內容
    fmt.Println("Time out.")
 
}

2s後打印

Time out.

 time的停用:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    timer := time.NewTimer(3*time.Second)
    
    go func() {
        <-timer.C
        fmt.Println("Time out.")
    }()
    
    timer.Stop()    //中止定時器
 
    for {
 
    }
 
}

  time的重置:

package main
 
import (
    "fmt"
    "time"
)
 
func main() {
 
    timer := time.NewTimer(3*time.Second)
    timer.Reset(1*time.Second)
 
    <-timer.C
    fmt.Println("Time out.")
 
}

2.Ticker

        Ticker是一個定時觸發的計時器,它會以一個間隔(interval)往channel發送一個事件(當前時間),而channel的接收者能夠以固定的時間間隔從channel中讀取事件。

實例:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    ticker := time.NewTicker(1*time.Second)
 
    i:=0
    for {
        <-ticker.C
        i++
        fmt.Println(i)
    }
}

 上述實例輸出結果爲:

1
2
3
4
5
6
7
8
9
...

        ticker的中止:

package main
 
import (
    "fmt"
    "time"
)
 
func main(){
    ticker := time.NewTicker(1*time.Second)
 
    i:=0
    for {
        <-ticker.C
        fmt.Println(i)
        i++
        if i==5{
            ticker.Stop()
            break
        }
    }
}

上述實例輸出結果爲:

0
1
2
3
4

四、select

        Go裏面提供了一個關鍵字select,經過select能夠監聽channel上的數據流動。

        select的用法與switch語言很是相似,由select開始一個新的選擇塊,每一個選擇條件由case語句來描述。

        與switch語句能夠選擇任何可以使用相等比較的條件相比,select有比較多的限制,其中最大的一條限制就是每一個case語句裏必須是一個IO操做,大體的結構以下:

select{
case <-chan1:
    //若是chan1成功讀到數據,則進行case處理語句
case chan2<-1:
    //若是成功向chan2寫入數據,則進行該case處理語句
default:
    //若是上面都沒有成功,則進入default處理流程
}

在一個select語句中,Go語言會按順序從頭到尾評估每個發送和接收的語句。

        若是其中的任意一語句能夠繼續執行(即沒有被阻塞),那麼就從那些能夠執行的語句中任意選擇一條來使用。

        若是沒有任意一條語句能夠執行(即全部的通道都被阻塞),那麼有兩種可能的狀況:

 

  • 若是給出了default語句,那麼就會執行default語句,同時程序的執行會從select語句後的語句中恢復。
  • 若是沒有default語句,那麼select語句將被阻塞,直到至少有一個通訊能夠進行下去。

有時候會出現goroutine阻塞的狀況,那麼咱們如何避免整個程序進入阻塞的狀況呢?咱們能夠利用select來設置超時,經過以下的方式實現:

package main
 
import (
    "fmt"
    "time"
)
 
func main()  {
 
    ch := make(chan int)
    quit := make(chan bool)
 
    // 新開一個協程
    go func() {
        for ; ;  {
            select {
            case v := <-ch:
                fmt.Println(v)
            case <-time.After(3*time.Second):
                fmt.Println("Timeout.")
                quit<-true
                break
            }
        }
    }()
 
    //往ch中存放數據
    for i:=0;i<5;i++{
        ch<-i
        time.Sleep(time.Second)
    }
 
    <-quit
    fmt.Println("It is the end of the program.")
}

  上述實例輸出結果爲:

0
1
2
3
4
Timeout.
It is the end of the program.

 

 https://blog.csdn.net/u012869599/article/details/80404057
相關文章
相關標籤/搜索