go--併發

goroutine

在go語言中無須像其餘語言中同樣維護線程池、進程等,而是使用goroutine進行調度和管理。golang

使用goroutine

go程序中使用go關鍵字爲一個函數建立一個goroutine。一個函數能夠被建立爲多個goroutine,一個goroutine一定對應一個函數。安全

啓動單個goroutine

啓動goroutine的方式很是簡單,只須要在調用的函數(普通函數和匿名函數)前面加上一個go關鍵字。併發

舉個例子以下:函數

func hello() {
    fmt.Println("Hello goroutine")
}

func main() {
    hello()
    fmt.Println("main goroutine done!")
}

這個實例中hello函數和下面的語句是串行的,執行的結果是打印完hello goroutine後打印main goroutine done!性能

接下來咱們在調用hello函數前面加上關鍵字go,也就是啓動一個goroutine去執行hello這個函數。優化

func main () {
    go hello()
    fmt.Println("main goroutine done!")
}

此次執行結果只打印了main goroutine done!,並無打印hello goroutine,由於在程序啓動時,go程序就會爲main()函數建立一個默認的goroutine。當main()函數返回的時候該goroutine就結束了,全部在main()函數中啓動的goroutine會一同結束。線程

此時,先要讓main函數等一等hello函數,最簡單的方式是使用Sleepcode

func main() {
    go hello()
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}

sync.WaitGroup

在代碼中聲音的使用time.Sleep不是最好的辦法,go語言中可使用sync.WaitGroup來實現併發任務的同步。sync.WaitGroup方法以下:生命週期

方法 說明
(wg *WaitGroup) Add(delta int) 計數器+delta
(wg *WaitGroup) Done() 計數器-1
(wg *WaitGroup Wait()) 阻塞直到計數器變爲0

sync.WaitGroup內部維護着一個計數器,計數器的值能夠增減和減小。當計數器值爲0時,表示全部併發任務已經完成。隊列

咱們利用sync.WaitGroup將上面的代碼優化一下

var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println("Hello goroutine!")
}

func main() {
    wg.Add(1)
    go hello()
    fmt.Println("main goroutine done!")
    wg.Wait()
}

goroutine與線程

可增加的棧

OS線程通常都有固定的棧內存(一般爲2MB),一個goroutine的棧在其生命週期開始時只有很小的棧(典型狀況下爲2KB),goroutine的棧不是固定的,他能夠按需增大和縮小,goroutine的棧大小限制能夠達到1GB,雖然極少會用到這個大小。因此在go語言中一次建立十萬左右的goroutine也是能夠的。

goroutine調度

OS線程是由OS內核來調度的,goroutine則是由go運行時(runtime)本身的調度器調度的,這個調度器使用一個稱爲m:n調度的技術(複用/調度 m個goroutine到n個OS線程)。goroutine的調度不須要切換內核預警,因此調用一個goroutine比調度一個線程成本低不少。

GOMAXPROCS

go運行時的調度器使用GOMAXPROCS參數來肯定須要使用多少個OS線程來同時執行go代碼。默認值是機器上的CPU核心數。例如在一個8核心的機器上,調度器會把go代碼同時調度到8個OS線程上(GOMAXPROCS是m:n調度中的n)。

go語言中可使用runtime.GOMAXPROCS函數設置當前程序併發時佔用的CPU邏輯核心數。

go1.5版本以前,默認使用的是單核心執行。go1.5版本以後,默認是會用所有的CPU邏輯核心數。

channel

單純地將函數併發執行時沒有意義的。函數與函數之間須要交換數據才能體現併發執行函數的意義。

雖然可使用共享內存進行數據交換,可是共享內存在不一樣的goroutine中容易發生竟態問題。爲了保證數據交換的正確性,必須使用互斥量對內存進行加鎖,這種作法勢必形成性能問題。

go語言的併發模型是CSP,提倡經過通訊共享內存而不是經過共享內存而實現通訊。

Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,老是遵循先入先出(First In First Out)的規則,保證收發數據的順序。每個通道都是一個具體類型的導管,也就是聲明channel的時候須要爲其指定元素類型。

聲明channel

聲明通道類型的格式以下:

var 變量 chan 元素類型

var ch1 chan int
var ch2 chan bool
var ch3 chan []int

建立channel

通道是引用類型 ,通道類型的空值是nil

聲明通道後須要使用make函數初始化後才能使用,建立channel的格式以下:

make (chan 元素類型, [緩衝個數])

channel操做

通道有發送(send)、接收(receive)和關閉(close)三種操做。發送和接收都使用<-符號。

ch := make(chan int)

// 發送
ch <- 10 // 把10發送到ch中

// 接收
x := <- ch // 從ch中接收值並賦值給變量x
<-ch       // 從ch中接收值,忽略結果

// 關閉
close(ch)

關於關閉通道須要注意的事情是,只有在通知接收方goroutine全部的數據都發送完畢的時候才須要關閉通道。通道是能夠被垃圾回收機制回收的,它和關閉文件是不同的,在結束操做以後關閉文件是必需要作的,但關閉通道不是必須的。

關閉後的通道有如下特色:

  • 對一個關閉的通道再發送值就會致使panic。
  • 對一個關閉的通道進行接收會一直獲取值直到通道爲空。
  • 對一個關閉的而且沒有值的通道執行接收操做會獲得對應類型的零值。
  • 關閉一個已經關閉的通道會致使panic。

無緩衝的通道

無緩衝的通道又稱爲阻塞的通道。

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("發送成功")
}

上面這段代碼可以經過編譯,可是執行時會報deadlock的錯

爲何會出現死鎖呢?由於ch := make(chal int)建立的是無緩衝通道,無緩衝通道只有在有人接收值的時候才能發送值。上面的代碼會阻塞在ch := make(chan int)這一行造成死鎖,解決這個問題能夠用如下的方法:

// 啓用一個goroutine區接收值

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}

func main() {
    ch := make(chan int)
    go recv(ch)
    ch <- 10
    fmt.Println("發送成功")
}

無緩衝通道上發送操做會阻塞,直到另外一個goroutine在該通道上執行接收操做,這時值才能發送成功,兩個goroutine將繼續執行。相反,若是接收操做先執行,接收方的goroutine將阻塞,直到另外一個goroutine在該通道上發送一個值。

使用無緩衝通道進行通訊將致使發送和接收的goroutine同步化。所以,無緩衝通道也被稱爲同步通道

有緩衝的通道

解決上面的問題的方法還有一種就是使用有緩衝區的通道。有緩衝的通道即便用make函數初始化通道的時候爲其指定一個容量。

func main() {
    ch := make(chal int, 1)    // 建立一個容量爲1的有緩衝區的通道
    ch  <- 10
}

只要通道的容量大於0,那麼該通道就是有緩衝的通道,通道的容量表示通道中能存放元素的數量。

如何優雅的從通道循環取值

當經過通道發送有限的數據時,能夠經過close函數關閉通道來告知從該通道接收值的goroutine中止等待。當通道被關閉時,往該通道發送值會引起panic,從該通道里接收的值一直都是類型零值,如何判斷一個通道是否被關閉了呢?

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    // 開啓goroutine將0~100的數發送到ch1中
    go func() {
        for i := 0;i < 100;i++ {
            ch1 < i
        }
    }()

    // 開啓goroutine從ch1中接收值,並將該值的平方發送到ch2中
    go func() {
        for {
            i, ok := <- ch1
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()

    // 在主goroutine中從ch2接收值打印
    for i := range ch2 {
        fmt.Println(i)
    }
}

單向通道

func counter(out chan <- int) {
    for i := 0; i < 100; i ++ {
        out <- i
    }

    close(out)
}

func squarer(out chan <- int, in <- chan int) {
    for i := range in {
        out <- i * i
    }

    close(out)
}

func printer(in <- chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中:

  • chan <- int是一個只能發送的通道,能夠發送可是不能接收。
  • <- chan int是一個只能接收的通道,能夠接收可是不能發送。

worker pool(goroutine池)

在工做中咱們一般會使用能夠指定啓動的goroutine數量的worker pool模式,控制goroutine的數量,防止gouroutine泄露和暴漲。

func worker(id int, jobs <- chan int, results chan <- int) {
    fmt.Printf("start worder: %d\n", id)
    for j := range jobs {
        fmt.Printf("worker:%d start job:%d\n", id, j)
        time.Sleep(time.Second)
        fmt.Printf("worker:%d end job:%d\n", id, j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    fmt.Println("start fill")
    for j := 1; j <= 5;j ++ {
        jobs <- j
    }

    close(jobs)

    for a := 1; a <= 5; a ++ {
        <- results
    }
}

select多路複用

在某些場景下咱們須要同時從多個通道接收數據。通道在接收數據時,若是沒有數據能夠接收將會發生阻塞。你也許會寫出一下代碼使用遍從來實現:

for {
    data, ok := <- ch1
    data, ok := <- ch2
}

這種方式雖然能夠實現從多個通道接收值的需求,可是運行性能會差不少。爲了應對這種場景,go內置了select關鍵字,能夠同時響應多個通道的操做。

select的使用相似於switch語句,它有一系列case分支和一個默認的分支。每一個case會對應一個通道的通訊(接收或發送)過程。select會一直等待,知道某個case的通訊操做完成時,就會執行case分支對應的語句。格式以下:

select {
    case <- ch1:
        ...
    case data := <- ch2:
        ...
    case ch3 <- data:
        ...
    default:
        默認操做
}

舉個例子來演示:

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i ++ {
        select {
            case x := <- ch:
                fmt.Println(x)
            case ch <- i:
        }
    }
}

注:

  • 可處理一個活多個channel的發送/接收操做。
  • 若是多個case同時知足,select會隨機選擇一個。
  • 對於沒有case的select{}會一直等待,可用於阻塞main函數

併發安全和鎖

相關文章
相關標籤/搜索