Go 併發模式

基本概念

瞭解併發和並行

併發:強調一段時間作多件事編程

並行:強調同一時間作多件事segmentfault

CSP vs Actor 模型

Actor

Actor 模型是一個通用的併發編程模型,能夠應用在幾乎任何一種編程語言中,典型的是 Erlang。多個 actor(進程) 能夠同時運行、不共享狀態、經過向與進程綁定的消息隊列(也稱爲信箱)異步發送消息來進行通訊。設計模式

actor-1 與 actor-2 進程通訊依賴一個消息隊列,並且消息隊列與進程互相耦合綁定。actor-1 在發送完消息以後,在 actor-2 沒有處理該消息的狀況下,能夠繼續執行其餘任務,這說明 actor 進程之間的通訊是異步的。網絡

優勢
  • 消息傳輸和封裝,多個 Actor 能夠同時運行,但不共享狀態,並且單個 actor 中的事件是串行執行(這歸功於隊列)
  • Actor 模型支持共享內存模型,也支持分佈式內存模型
缺點
  • 儘管 Actor 模型比使用線程和鎖模型的程序更易 debug,可是也會存在死鎖的問題,並且還須要擔憂綁定進程的隊列溢出的問題
  • 沒有對並行提供直接支持,須要經過併發的技術來構造並行方案
CSP

CSP即通訊順序進程(communicating sequential processes),與 Actor 模型相似,該模型也是由獨立的、併發執行的實體所組成,實體之間經過發送消息進行通訊。go 中的 csp 模型 channel 對於goroutine來講是匿名的,不須要和 gid 綁定,經過 channel 完成 goroutine 之間的通訊。(channel 在 CSP 表明通道的概念,這裏只討論 Go 相關,channel 等價於 Go 中的 channel)併發

優勢
  • 與 Actor 相比,CSP 最大的優勢是靈活性。Actor 模型,負責通訊的媒介和執行單元是耦合的。而 CSP 中,channel 是第一類對象,能夠被獨立創造、寫入、獨處數據,也能夠在不一樣執行單元中傳遞。
缺點
  • CSP 模型也易受死鎖影響,且沒有提供直接的並行支持。並行須要創建在併發基礎上,引入了不肯定性。
區別
  • Actor 模型重在參與交流的實體(即進程),而 CSP 重在交流的通道,如 Go 中的 channel
  • CSP 模型不關注發送消息的進程,而是關注發送消息時使用的 channel,而 channel 不像 Actor 模型那樣進程與隊列緊耦合。而是能夠單首創建和讀寫,並在進程 (goroutine) 之間傳遞。
GO 中的併發模型

Go 是採用 SCP 的思想的,channel 是 go 在併發編程通訊的推薦手段,Go 的設計者 Rob Pike有一句經典的名言,app

Do not communicate by sharing memory; instead, share memory by communicating.異步

這句話是說「不要使用共享內存通訊,而是應該使用通訊取共享內存」,Go 語言推薦咱們使用通訊來進行進程間同步消息。這樣作有三點好處,來源於 draveness 的博客文章。編程語言

  1. 首先,使用發送消息來同步信息相比於直接使用共享內存和互斥鎖是一種更高級的抽象,使用更高級的抽象可以爲咱們在程序設計上提供更好的封裝,讓程序的邏輯更加清晰;
  2. 其次,消息發送在解耦方面與共享內存相比也有必定優點,咱們能夠將線程的職責分紅生產者和消費者,並經過消息傳遞的方式將它們解耦,不須要再依賴共享內存;
  3. 最後,Go 語言選擇消息發送的方式,經過保證同一時間只有一個活躍的線程可以訪問數據,可以從設計上自然地避免線程競爭和數據衝突的問題;

併發設計模式

上文介紹了 Go 中使用的併發模型,而在這種併發模型下面 channel 是一個重要的概念,而下面每一種模式的設計都依賴於 channel,因此有必要了解一下。分佈式

Barrier 模式

barrier 屏障模式故名思義就是一種屏障,用來阻塞直到聚合全部 goroutine 返回結果。 可使用 channel 來實現。函數

使用場景
  • 多個網絡請求併發,聚合結果
  • 粗粒度任務拆分併發執行,聚合結果

代碼實現
/* * Barrier */
type barrierResp struct {
    Err error
    Resp string
    Status int
}

// 構造請求
func makeRequest(out chan<- barrierResp, url string)  {
    res := barrierResp{}

    client := http.Client{
        Timeout: time.Duration(2*time.Microsecond),
    }

    resp, err := client.Get(url)
    if resp != nil {
        res.Status = resp.StatusCode
    }
    if err != nil {
        res.Err = err
        out <- res
        return
    }

    byt, err := ioutil.ReadAll(resp.Body)
    defer resp.Body.Close()
    if err != nil {
        res.Err = err
        out <- res
        return
    }

    res.Resp = string(byt)
    out <- res
}

// 合併結果
func barrier(endpoints ...string) {
    requestNumber := len(endpoints)

    in := make(chan barrierResp, requestNumber)
    response := make([]barrierResp, requestNumber)

    defer close(in)

    for _, endpoints := range endpoints {
        go makeRequest(in, endpoints)
    }

    var hasError bool
    for i := 0; i < requestNumber; i++ {
        resp := <-in
        if resp.Err != nil {
            fmt.Println("ERROR: ", resp.Err, resp.Status)
            hasError = true
        }
        response[i] = resp
    }
    if !hasError {
        for _, resp := range response {
            fmt.Println(resp.Status)
        }
    }
}

func main() {
    barrier([]string{"https://www.baidu.com", "http://www.sina.com", "https://segmentfault.com/"}...)
}
複製代碼
Tips

Barrier 模式也可使用 errgroup 擴展庫來實現,這樣更加簡單明瞭。這個包有點相似於 sync.WaitGroup,可是區別是當其中一個任務發生錯誤時,能夠返回該錯誤。而這也知足咱們 Barrier 模式的需求。

func barrier(endpoints ...string) {
    var g errgroup.Group
    var mu sync.Mutex
  
    response := make([]barrierResp, len(endpoints))

    for i, endpoint := range endpoints {
        i, endpoint := i, endpoint // create locals for closure below
        g.Go(func() error {
            res := barrierResp{}
            resp, err := http.Get(endpoint)
            if err != nil {
                return err
            }

            byt, err := ioutil.ReadAll(resp.Body)
            defer resp.Body.Close()
            if err != nil {
                return err
            }

            res.Resp = string(byt)
            mu.Lock()
            response[i] = res
            mu.Unlock()
            return err
        })
    }
    if err := g.Wait(); err != nil {
       fmt.Println(err)
    }
    for _, resp := range response {
        fmt.Println(resp.Status)
    }
}
複製代碼

Future 模式

future 即將來,來自將來的模式(手動狗頭)。這個模式經常使用在異步處理也稱爲 Promise 模式,採用一種 fire-and-forget 的方式,是指主 goroutine 不等子 goroutine 執行完就直接返回了,而後等到將來執行完的時候再去取結果。在 Go 中因爲 goroutine 的存在,實現這種模式是挺簡單的。

使用場景
  • 異步

代碼實現
/* * Future */
type Function func(string) (string, error) type Future interface {
    SuccessCallback() error
    FailCallback()    error
    Execute(Function) (bool, chan struct{})
}

type AccountCache struct {
    Name string
}

func (a *AccountCache) SuccessCallback() error {
    fmt.Println("It's success~")
    return nil
}

func (a *AccountCache) FailCallback() error {
    fmt.Println("It's fail~")
    return nil
}

func (a *AccountCache) Execute(f Function) (bool, chan struct{}){
    done := make(chan struct{})
    go func(a *AccountCache) {
        _, err := f(a.Name)
        if err != nil {
            _ = a.FailCallback()
        } else {
            _ = a.SuccessCallback()
        }
        done <- struct{}{}
    }(a)
    return true, done
}

func NewAccountCache(name string) *AccountCache {
    return &AccountCache{
        name,
    }
}

func testFuture() {
    var future Future
    future = NewAccountCache("Tom")
    updateFunc := func(name string) (string, error){
        fmt.Println("cache update:", name)
        return name, nil
    }
    _, done := future.Execute(updateFunc)
    defer func() {
        <-done
    }()
}

func main() {
    var future Future
    future = NewAccountCache("Tom")
    updateFunc := func(name string) (string, error){
        fmt.Println("cache update:", name)
        return name, nil
    }
    _, done := future.Execute(updateFunc)
    defer func() {
        <-done
    }()
    // do something
}
複製代碼

這裏有一個技巧:爲何使用 struct 類型做爲 channel 的通知?

不少開源代碼都是使用這種方式來做爲信號通知機制,主要是由於空 struct 在 Go 中佔的內存是最少的。

Pipeline 模式

使用場景
  • 能夠利用多核的優點把一段粗粒度邏輯分解成多個 goroutine 執行

Pipeline 自己翻譯過來就是管道的意思,注意和 Barrire 模式不一樣的是,它是按順序的,相似於流水線。

這個圖不是很能表達並行的概念,其實三個 goroutine 是同時執行的,經過 buffer channel 將三者串起來,只要前序 goroutine 處理完一部分數據,就往下傳遞,達到並行的目的。

代碼實現

實現一個功能,給定一個切片,而後求它的子項的平方和。

例如,[1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14。

正常的邏輯,遍歷切片,而後求平方累加。使用 pipeline 模式,能夠把求和和求平方拆分出來並行計算。

/* * Pipeline 模式 */

func generator(max int) <-chan int{
    out := make(chan int, 100)
    go func() {
        for i := 1; i <= max; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

func power(in <-chan int) <-chan int{
    out := make(chan int, 100)
    go func() {
        for v := range in {
            out <- v * v
        }
        close(out)
    }()
    return out
}

func sum(in <-chan int) <-chan int{
    out := make(chan int, 100)
    go func() {
        var sum int
        for v := range in {
            sum += v
        }
        out <- sum
        close(out)
    }()
    return out
}

func main() {
    // [1, 2, 3]
    fmt.Println(<-sum(power(generator(3))))
}
複製代碼

Workers Pool 模式

使用場景
  • 高併發任務

在 Go 中 goroutine 已經足夠輕量,甚至 net/http server 的處理方式也是 goroutine-per-connection 的,因此比起其餘語言來講可能場景稍微少一些。每一個 goroutine 的初始內存消耗在 2~8kb,當咱們有大批量任務的時候,須要起不少 goroutine 來處理,這會給系統代理很大的內存開銷和 GC 壓力,這個時候就能夠考慮一下協程池。

代碼實現
/* * Worker pool */
type TaskHandler func(interface{}) type Task struct {
    Param   interface{}
    Handler TaskHandler
}

type WorkerPoolImpl interface {
    AddWorker()                  // 增長 worker
    SendTask(Task)               // 發送任務
    Release()                    // 釋放
}

type WorkerPool struct {
    wg   sync.WaitGroup
    inCh chan Task
}

func (d *WorkerPool) AddWorker() {
    d.wg.Add(1)
    go func(){
        for task := range d.inCh {
            task.Handler(task.Param)
        }
        d.wg.Done()
    }()
}

func (d *WorkerPool) Release() {
    close(d.inCh)
    d.wg.Wait()
}

func (d *WorkerPool) SendTask(t Task) {
    d.inCh <- t
}

func NewWorkerPool(buffer int) WorkerPoolImpl {
    return &WorkerPool{
        inCh: make(chan Task, buffer),
    }
}

func main() {
    bufferSize := 100
    var workerPool = NewWorkerPool(bufferSize)
    workers := 4
    for i := 0; i < workers; i++ {
        workerPool.AddWorker()
    }

    var sum int32
    testFunc := func (i interface{}) {
        n := i.(int32)
        atomic.AddInt32(&sum, n)
    }
    var i, n int32
    n = 1000
    for ; i < n; i++ {
        task := Task{
            i,
            testFunc,
        }
        workerPool.SendTask(task)
    }
    workerPool.Release()
    fmt.Println(sum)
}
複製代碼

協程池使用了反射來獲取執行的函數及參數,在 Go 中可能有點讓人有點膈應。可是若是批量執行的函數是已知的,能夠優化成一種只執行指定函數的協程池,可以提高性能。

Pub/Sub 模式

發佈訂閱模式是一種消息通知模式,發佈者發送消息,訂閱者接收消息。

使用場景
  • 消息隊列

代碼實現
/* * Pub/Sub */
type Subscriber struct {
    in     chan interface{}
    id     int
    topic  string
    stop   chan struct{}
}

func (s *Subscriber) Close() {
    s.stop <- struct{}{}
    close(s.in)
}

func (s *Subscriber) Notify(msg interface{}) (err error) {
    defer func() {
        if rec := recover(); rec != nil {
            err = fmt.Errorf("%#v", rec)
        }
    }()
    select {
    case s.in <-msg:
    case <-time.After(time.Second):
        err = fmt.Errorf("Timeout\n")
    }
    return
}

func NewSubscriber(id int) SubscriberImpl {
    s := &Subscriber{
        id: id,
        in: make(chan interface{}),
        stop: make(chan struct{}),
    }
    go func() {
        for{
            select {
            case <-s.stop:
                close(s.stop)
                return
            default:
                for msg := range s.in {
                    fmt.Printf("(W%d): %v\n", s.id, msg)
                }
            }
    }}()
    return s
}

// 訂閱者須要實現的方法
type SubscriberImpl interface {
    Notify(interface{}) error
    Close()
}

// sub 訂閱 pub
func Register(sub Subscriber, pub *publisher){
    pub.addSubCh <- sub
    return
}

// pub 結果定義
type publisher struct {
    subscribers []SubscriberImpl          
    addSubCh    chan SubscriberImpl
    removeSubCh chan SubscriberImpl
    in          chan interface{}
    stop        chan struct{}
}

// 實例化
func NewPublisher () *publisher{
    return &publisher{
        addSubCh: make(chan SubscriberImpl),
        removeSubCh: make(chan SubscriberImpl),
        in: make(chan interface{}),
        stop: make(chan struct{}),
    }
}

// 監聽
func (p *publisher) start() {
    for {
        select {
        // pub 發送消息
        case msg := <-p.in:
            for _, sub := range p.subscribers{
                _ = sub.Notify(msg)
            }
        // 移除指定 sub
        case sub := <-p.removeSubCh:
            for i, candidate := range p.subscribers {
                if candidate == sub {
                    p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
                    candidate.Close()
                    break
                }
            }
        // 增長一個 sub
        case sub := <-p.addSubCh:
            p.subscribers = append(p.subscribers, sub)
        // 關閉 pub
        case <-p.stop:
            for _, sub := range p.subscribers {
                sub.Close()
            }
            close(p.addSubCh)
            close(p.in)
            close(p.removeSubCh)
            return
        }
    }
}

func main() {
    // 測試代碼
    pub := NewPublisher()
    go pub.start()

    sub1 := NewSubscriber(1)
    Register(sub1, pub)

    sub2 := NewSubscriber(2)
    Register(sub2, pub)

    commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
    for _, c := range commands {
        pub.in <- c
    }

    pub.stop <- struct{}{}
    time.Sleep(time.Second*1)
}
複製代碼

注意事項

  • 同步問題,尤爲同步原語和 channel 一塊兒用時,容易出現死鎖
  • goroutine 崩潰問題,若是子 goroutine panic 沒有 recover 會引發主 goroutine 異常退出
  • goroutine 泄漏問題,確保 goroutine 能正常關閉

參考

  1. 《go design pattern》書
  2. 《七週七併發模型》書
  3. 爲何使用通訊來共享內存?· Why's THE Design?
  4. advanced-go-concurrency
相關文章
相關標籤/搜索