Golang 高效實踐之併發實踐channel篇

前言

在我前面一篇文章Golang受歡迎的緣由中已經提到,Golang是在語言層面(runtime)就支持了併發模型。那麼做爲編程人員,咱們在實踐Golang的併發編程時,又有什麼須要注意的點呢?下面我會跟你們詳細的介紹一些在實際生產編程中很容易踩坑的知識點。html

CSP

在介紹Golang的併發實踐前,有必要先介紹簡單介紹一下CSP理論。CSP,全稱是Communicating sequential processes,翻譯爲通訊順序進程,又翻譯爲交換消息的順序程序,用來描述併發性系統的交互模式。CSP有如下三個特色:git

1.每一個程序是爲了順序執行而建立的github

2.數據經過管道來通訊,而不是經過共享內存golang

3.經過增長相同的程序來擴容編程

Golang的併發模型基於CSP理論,Golang併發的口號是:不用經過共享內存來通訊,而是經過通訊來共享內存。多線程

Golang併發模式

Golang用來支持併發的元素集:併發

  • goroutines
  • channels
  • select
  • sync package

其中goroutines,channels和select 對應於實現CSP理論,即經過通訊來共享內存。這幾乎能解決Golang併發的90%問題,另外的10%場景須要經過同步原語來解決,即sync包相關的結構。app

看圖識channel

如上圖所示,咱們從一個簡單的沙桶傳遞小遊戲來認識Golang中的channel。其中藍色的Gopher爲發送方,紫色的Gopher爲接受方,中間的灰色Gopher表明channel的緩衝區大小。異步

 

 

channel介紹

阻塞channel

不帶buffer的channel阻塞狀況socket

unbuffered := make(chan int)

a := <- unbuffered // 阻塞

unbuffered  := make(chan int) 

// 1) 阻塞

a := <- unbuffered

// 2) 阻塞

unbuffered <- 1

// 3) 同步

go func() { <-unbuffered }()

unbuffered <- 1

帶buffer的channel阻塞狀況

buffered := make(chan int, 1)

// 4) 阻塞

a := <- buffered

// 5) 不阻塞

buffered <-1

// 6) buffer滿,阻塞

buffered <-2

上述狀況其實概括起來很簡單:無論有無緩衝區channel,寫滿或者讀空都會阻塞。

不帶buffer和帶buffer的channel用途:

  • 不帶buffer的channel:用於同步通訊。
  • 帶buffer的channel:用於異步通訊。

關閉channel

c := make(chan int)

close(c)

fmt.Println(<-c) //接收並輸出chan類型的零值,這裏int是0 

須要特殊說明的是,channel不像socket或者文件,不須要經過close來釋放資源。須要close的惟一狀況是,經過close觸發channel讀事件,comma,ok := <- c 中ok爲false,表示channel已經關閉。只能在發送端close channel,由於channel關閉接收端能感知到,可是發送端感知不到,只能主動關閉。往已經關閉的channel發送信息將會觸發panic。

select

相似switch語句,只不過case都是channel的讀或者寫操做,也多是default。case的順序一點都不重要,不要依賴case的前後來定義優先級,第一個非阻塞(send and/or receive)的case將會被選中。

使channel不阻塞

func TryReceive(c <-chan int) (data int, more, ok bool) {

  select {

  case data, more = <- c:

    return data, more, true

  }

  default:

    return 0, true, false

}

當select中的case都處於阻塞狀態時,就會選中default分支。

或者超時返回:

func TryReceiveWithTimeout(c <-chan int, duration time.Duration) (data int, more, ok bool) {

  select {

  case data, more = <-c:

    return data, more, true

  case <- time.After(duration):

    return 0, true, false
  }
}

time.After(duration)會返回一個channel,當duration到期時會觸發channel的讀事件。

Channel的缺點:

1.Channel可能會致使死鎖(循環阻塞)

2.channel中傳遞的都是數據的拷貝,可能會影響性能

3.channel中傳遞指針會致使數據競態問題(data race/ race conditions)

第三點中提到了數據競態問題,也就是一般所說data race。在接着往下講以前有必要先簡單講解下data race的危害。data race 指的是多線程併發讀寫一個變量,對應到Golang中就是多個goroutine同時讀寫一個變量,這種行爲是未定義的,也就是說讀變量出來的值頗有可能不是寫入的值,這個值是任意值都有可能。 

例以下面這段代碼:

package main

import (
    "fmt"
    "runtime"
    "time"
)

var i int64 = 0

func main() {
    runtime.GOMAXPROCS(2)
    go func() {
        for {
            fmt.Println("i is", i)
            time.Sleep(time.Second)
        }
    }()

    for {
        i += 1
    }
}

在我mac本地環境會不斷的輸出0。全局變量i被兩個goroutine同時讀寫,也就是咱們所說的data race,致使了i的值是未定義的。若是讀寫的是一塊動態伸縮的內存,頗有可能會致使panic。例如多goroutine讀寫map。幸運的是,Golang針對data race有專門的內置工具,例如把上面的代碼保存爲main.go,執行 go run -race main.go 會把相關的data race輸出:

==================

WARNING: DATA RACE

Read at 0x00000121e848 by goroutine 6:

  main.main.func1()

      /Users/saas/src/awesomeProject/datarace/main.go:15 +0x3e

 

Previous write at 0x00000121e848 by main goroutine:

  main.main()

      /Users/saas/src/awesomeProject/datarace/main.go:21 +0x7b

 

Goroutine 6 (running) created at:

  main.main()

      /Users/saas/src/awesomeProject/datarace/main.go:13 +0x4f

==================

那要怎麼改良這個程序呢?改法很簡單,也有不少種。上面咱們已經提到了Golang併發的口號是:不要經過共享內存來通訊,而是經過通訊來共享內存。先來看下經過共享內存來通訊的改良版:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

var i int64 = 0

func main() {
    runtime.GOMAXPROCS(2)
    var m sync.Mutex
    go func() {
        for {
            m.Lock()
            fmt.Println("i is", i)
            m.Unlock()
            time.Sleep(time.Second)
        }
    }()

    for {
        m.Lock()
        i += 1
        m.Unlock()
    }
}

經過加鎖互斥訪問(共享)變量i,也就是上面所說的經過共享內存來通訊。那麼經過通訊來共享內存也是怎麼實施的呢?答案是用channel:

package main

import (
    "fmt"
    "runtime"
    "time"
)

var i int64 = 0

func main() {
    runtime.GOMAXPROCS(2)
    c := make(chan int64)
    go func() {
        for {
            fmt.Println("i is", <-c)
            time.Sleep(time.Second)
        }
    }()

    for {
        i += 1
        c<-i
    }
}

上面提到了一些channel的缺點,文章一開始我也提到了channel能解決Golang併發編程的90%問題,那剩下的一些少數併發狀況用什麼更優的方案呢?

鎖會不會是個更優的解決方案呢?

鎖就像廁所的坑位同樣,你佔用的時間越長,等待的人排的隊就會越長。讀寫鎖只會減緩這種狀況。另外使用多個鎖很容易致使死鎖。總而言之,鎖不是咱們只在尋找的方案。 

原子操做

原子操做是這10%場景有限考慮的解決方案。原子操做是在CPU層面保證了原子性。不用編程人員加鎖。Golang對應的操做在sync.atomic 包。Store, Load,  Add, Swap 和 CompareAndSwap方法。

CompareAndSwap 方法

type Spinlock struct {

  state *int32

}

const free = int32(0)

func (l *Spinlock) Lock() {

  for !atomic.CompareAndSwapInt32(l.state, free, 42) { //若是state等於0就賦值爲42

    runtime.Gosched() //讓出CPU

  }

}

func (l *Spinlock) Unlock(){

  atomic.StoreInt32(l.state, free)  // 全部操做state變量的操做都應該是原子的

}

 

基於上面的一些併發實踐的建議是:

1.避免阻塞,避免數據競態

2.用channel避免共享內存,用select管理channel

3.當channel不適用於你的場景時,儘可能用sync包的原子操做,若是實在須要用到鎖,儘可能縮小鎖的粒度(鎖住儘可能少的代碼)。

 

併發程序找錯 

根據前面介紹的內容,咱們來看看下面的這個例子有沒有什麼問題:

func restore(repos []string) error {
    errChan := make(chan error, 1)
    sem := make(chan int, 4) // four jobs at once
    var wg sync.WaitGroup
    wg.Add(len(repos))
    for _, repo := range repos {
        sem <- 1
        go func() {
            defer func() {
                wg.Done()
                <- sem    
            }()
            if err := fetch(repo); err != nil {
                errChan <- err
            }
        }()
    }
    wg.Wait()
    close(sem)
    close(errChan)
    return <- errChan
}

Bug1. sem無需關閉

Bug2.go和匿名函數觸發的bug,repo不斷在更新,fetch拿到的repo是未定義的。有data race問題。

Bug3.sem<-1放在go func外面啓動同時有4個goroutine在運行,並不能很好的控制同時有4個fetch任務。

Bug4. errChan的緩衝區大小爲1,當多個fetch產生err時,將會致使程序死鎖。

改良後的程序:

func restore(repos []string) error {
    errChan := make(chan error, 1)
    sem := make(chan int, 4) // four jobs at once
    var wg sync.WaitGroup
    wg.Add(len(repos))
    for _, repo := range repos {
        go worker(repo, sem, &wg, errChan)
    }
    wg.Wait()
    close(errChan)
    return <- errChan
}

Func worker(repo string, sem chan int, wg *sync.WaitGroup, errChan chan err) {
    defer wg.Done()
    sem <- 1        
    if err := fetch(repo); err != nil {
        select {
        case errChan <- err:
            // we are the first worker to fail
        default:
            // some other failure has already happened, drop this one
        }
    }
    <- sem    
}

最後思考:爲何errChan必定要close?

由於最後的return<-errChan,若是fetch的err都爲nil,那麼errChan就是空,<-errChan是個永久阻塞的操做,close(sem)會觸發讀事件,返回chan累心的零值,這裏是nil。

基於上面的一些併發實踐的建議是:

1.channel不是socket和file這種資源,不須要經過close來釋放資源

2.避免將goroutine和匿名函數一塊兒使用

3.在你啓動一個goroutine以前,必定要清楚它會在何時,什麼狀況下會退出。

 

總結

本文介紹了Golang併發編程的一些高效實踐建議,旨在讓你們在Golang併發實踐中少踩坑。其中data race問題和goroutine退出的時機尤其重要。

 

參考

https://www.youtube.com/watch?v=YEKjSzIwAdA

https://www.youtube.com/watch?v=yKQOunhhf4A

https://www.youtube.com/watch?v=QDDwwePbDtw

https://ms2008.github.io/2019/05/12/golang-data-race/

相關文章
相關標籤/搜索