Go併發編程之傳統同步—(2)條件變量

前言

回顧上篇文章《Go併發編程之傳統同步—(1)互斥鎖》其中說到,同步最終是爲了達到如下兩種目的:linux

  • 維持共享數據一致性,併發安全
  • 控制流程管理,更好的協同工做

示例程序經過使用互斥鎖,達到了數據一致性目的,那麼流程管理應該怎麼作呢?git

傳統同步

條件變量

上篇文章的示例程序,僅僅實現了累加功能,但在現實的工做場景中,需求每每不可能這麼簡單,如今擴展一下這個程序,給它加上累減的功能。github

加上了累減的示例程序,能夠抽象的理解爲一個固定容量的「儲水池」,能夠注水、排水。編程

僅用互斥鎖

當水注滿之後,中止注水,開始排水,當水排空之後,開始注水,反反覆覆...segmentfault

func TestDemo1(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter == maxSize {
                for i := 0; i < maxSize; i++ {
                    counter--
                    log.Printf("OUTPUT counter = %d", counter)
                }
            }
            mut.Unlock()
            time.Sleep(1 * time.Second)
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter == 0 {
            for i := 0; i < maxSize; i++ {
                counter++
                log.Printf(" INPUT counter = %d", counter)
            }
        }
        mut.Unlock()
        time.Sleep(1 * time.Second)
    }
}

結果安全

=== RUN   TestDemo1
                ···
2020/10/06 13:52:50  INPUT counter = 8
2020/10/06 13:52:50  INPUT counter = 9
2020/10/06 13:52:50  INPUT counter = 10
2020/10/06 13:52:50 OUTPUT counter = 9
2020/10/06 13:52:50 OUTPUT counter = 8
2020/10/06 13:52:50 OUTPUT counter = 7
                ···

看着沒有什麼問題,一切正常,但就是這樣工做的策略效率過低。多線程

優化互斥鎖

優化策略,不用等注滿水再排水,也不用放空以後,再注水,注水口和排水口一塊兒工做。併發

func TestDemo2(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter != 0 {
                counter--
            }
            log.Printf("OUTPUT counter = %d", counter)
            mut.Unlock()
            time.Sleep(5 * time.Second) // 爲了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter != maxSize {
            counter++
        }
        log.Printf(" INPUT counter = %d", counter)
        mut.Unlock()
        time.Sleep(1 * time.Second) // 爲了演示效果,睡眠1秒
    }
}

結果異步

=== RUN   TestDemo2
                ···
2020/10/06 14:11:46  INPUT counter = 7
2020/10/06 14:11:47  INPUT counter = 8
2020/10/06 14:11:48 OUTPUT counter = 7
2020/10/06 14:11:48  INPUT counter = 8
2020/10/06 14:11:49  INPUT counter = 9
2020/10/06 14:11:50  INPUT counter = 10
2020/10/06 14:11:51  INPUT counter = 10
2020/10/06 14:11:52  INPUT counter = 10
2020/10/06 14:11:53 OUTPUT counter = 9
2020/10/06 14:11:53  INPUT counter = 10
2020/10/06 14:11:54  INPUT counter = 10
2020/10/06 14:11:55  INPUT counter = 10
2020/10/06 14:11:56  INPUT counter = 10
2020/10/06 14:11:57  INPUT counter = 10
2020/10/06 14:11:58 OUTPUT counter = 9
2020/10/06 14:11:58  INPUT counter = 10
2020/10/06 14:11:59  INPUT counter = 10
                ···

經過日誌輸出,能夠看到程序達到了需求,運做正常。優化

可是,經過日誌輸出發現,當排水口效率低下的時候,注水口一直在輪詢,這裏頻繁的上鎖操做形成的開銷非常浪費。

條件變量:單發通知

那有沒有什麼好的辦法,省去沒必要要的輪詢?若是注水口和排水口能互相「通知」就行了!這個功能,條件變量能夠作到。

條件變量老是與互斥鎖組合使用,除了可使用 Lock、Unlock,還有以下三個方法:

  • Wait 等待通知
  • Signal 單發通知
  • Broadcast 廣播通知
func TestDemo3(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化條件變量
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            cond.L.Lock() // 上鎖
            if counter == 0 { // 沒水了
                cond.Wait() // 啥時候來水?等通知!
            }
            counter--
            log.Printf("OUTPUT counter = %d", counter)
            cond.Signal() // 單發通知:已排水
            cond.L.Unlock() // 解鎖
            time.Sleep(5 * time.Second) // 爲了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上鎖
        if counter == maxSize { // 水滿了
            cond.Wait() // 啥時候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT counter = %d", counter)
        cond.Signal() // 單發通知:已來水
        cond.L.Unlock() // 解鎖
        time.Sleep(1 * time.Second) // 爲了演示效果,睡眠1秒
    }
}

結果

=== RUN   TestDemo3
                ···
2020/10/06 14:51:22  INPUT counter = 7
2020/10/06 14:51:23  INPUT counter = 8
2020/10/06 14:51:24 OUTPUT counter = 7
2020/10/06 14:51:24  INPUT counter = 8
2020/10/06 14:51:25  INPUT counter = 9
2020/10/06 14:51:26  INPUT counter = 10
2020/10/06 14:51:29 OUTPUT counter = 9
2020/10/06 14:51:29  INPUT counter = 10
2020/10/06 14:51:34 OUTPUT counter = 9
2020/10/06 14:51:34  INPUT counter = 10
                ···

經過日誌輸出,能夠看出來,注水口沒有一直輪詢了,而是等到排水口發通知後,再進行注水,注水口一直再等排水口。那麼新的問題又來了,如何提升排水口的效率呢?

條件變量:廣播通知

多製造出一個排水口,提升排水效率。

那就不能繼續使用單發通知了(Signal),由於單發通知只會通知到一個等待(Wait),針對多等待的這種狀況,就須要使用廣播通知(Broadcast)。

func TestDemo4(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化條件變量
    maxSize := 10
    counter := 0

    // 排水口 1
    go func() {
        for {
            cond.L.Lock() // 上鎖
            if counter == 0 { // 沒水了
            //for counter == 0 { // 沒水了
                cond.Wait() // 啥時候來水?等通知!
            }
            counter--
            log.Printf("OUTPUT A counter = %d", counter)
            cond.Broadcast() // 單發通知:已排水
            cond.L.Unlock() // 解鎖
            //time.Sleep(2 * time.Second) // 爲了演示效果,睡眠5秒
        }
    }()

    // 排水口 2
    go func() {
        for {
            cond.L.Lock() // 上鎖
            if counter == 0 { // 沒水了
            //for counter == 0 { // 沒水了
                cond.Wait() // 啥時候來水?等通知!
            }
            counter--
            log.Printf("OUTPUT B counter = %d", counter)
            cond.Broadcast() // 單發通知:已排水
            cond.L.Unlock() // 解鎖
            //time.Sleep(2 * time.Second) // 爲了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上鎖
        if counter == maxSize { // 水滿了
        //for counter == maxSize { // 水滿了
            cond.Wait() // 啥時候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT   counter = %d", counter)
        cond.Broadcast() // 單發通知:已來水
        cond.L.Unlock() // 解鎖
        //time.Sleep(1 * time.Second) // 爲了演示效果,睡眠1秒
    }
}

結果

=== RUN   TestDemo4
                ···
2020/10/07 20:57:30 OUTPUT B counter = 2
2020/10/07 20:57:30 OUTPUT B counter = 1
2020/10/07 20:57:30 OUTPUT B counter = 0
2020/10/07 20:57:30 OUTPUT A counter = -1
2020/10/07 20:57:30 OUTPUT A counter = -2
2020/10/07 20:57:30 OUTPUT A counter = -3
2020/10/07 20:57:30 OUTPUT A counter = -4
                ···
2020/10/07 20:57:31 OUTPUT B counter = -7605
2020/10/07 20:57:31  INPUT   counter = -7604
2020/10/07 20:57:31 OUTPUT A counter = -7605
2020/10/07 20:57:31 OUTPUT A counter = -7606
                ···

經過日誌輸出能夠看到,剛開始的時候還很正常,到後面的時候就變成負值了,一直在負增加,What?

《Go併發編程之傳統同步—(1)互斥鎖》文章中,程序由於沒有加上互斥鎖,出現過 counter 值異常的狀況。

但此次程序此次加了互斥鎖,按理說造成了一個臨界區應該是沒有問題了,因此問題應該不是出在臨界區上,難道問題出在 Wait 上?

經過IDE追蹤一下Wait的源碼

func (c *Cond) Wait() {
    // 檢查 c 是不是被複制的,若是是就 panic
    c.checker.check()
    // 將當前 goroutine 加入等待隊列
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 等待當前 goroutine 被喚醒
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

原來 Wait 內部的執行流程是,先執行了解鎖,而後進入等待狀態,接到通知以後,再執行加鎖操做。

那按照這個代碼邏輯結合輸出日誌,走一程序遍流程,看看能不能復現出 counter 爲負值的狀況:

  1. 注水口將 counter 累加到 10 以後,發送廣播通知(Broadcast)。
  2. goroutine A 在「第1步」以前的時候進入了等待通知(Wait),如今接收到了廣播通知(Broadcast),從 runtime_notifyListWait() 返回,而且成功執行了加鎖(Lock)操做。
  3. goroutine B 在「第1步」以前的時候進入了等待通知(Wait),如今接收到了廣播通知(Broadcast),從 runtime_notifyListWait() 返回,在執行加鎖(Lock)操做的時候,發現 goroutine A 先搶佔了臨界區,因此一直阻塞在 c.L.Lock()。
  4. goroutine A 雖然完成任務後會釋放鎖,可是每次也成功將鎖搶佔,因此就這樣 一直將 counter 減到了 0,而後發送廣播通知(Broadcast)、解鎖(Unlock)。
  5. goroutine B 在 goroutine A 解鎖後,成功得到鎖並從 Lock 方法中返回,接下來跳出 Wait 方法、跳出 if 判斷,執行 counter--(0--),這時候 counter 的值是 -1

圖示

image

問題就出如今第五步,只要 goroutine B 加鎖成功的時候,再判斷一下 counter 是否爲 0 就行了。

因此將 if counter == 0 改爲 for counter == 0,這樣上面的「第五步」就變成了

5.goroutine B 在 goroutine A 解鎖後,成功加鎖(Lock)並從阻塞總返回,接下來跳出 Wait 方法、再次進入 for 循環,判斷 counter == 0 結果爲真,再次進入等待(Wait)。

代碼作出相應的修改後,再執行看結果,沒有問題了。

延伸

發送通知

等待通知(Wait)確定是要在臨界區裏面的,那發送通知(Signal、Broadcast)在哪裏更好呢?

Luck()
Wait()
Broadcast()// Signal()
Unlock()

// 或者

Luck()
Wait()
Unlock()
Broadcast()// Signal()

// 兩種寫法都不會報錯

在 go 的發送通知方法(Broadcast、Signal)上有這麼一段話:

// It is allowed but not required for the caller to hold c.L
// during the call.

在我以往的 C 多線程開發的時候,發送通知老是在鎖中的:

pthread_mutex_lock(&thread->mutex);
//              ...
pthread_cond_signal(&thread->cond);
pthread_mutex_unlock(&thread->mutex);

man 手冊中有寫到:

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

我的對此並無什麼看法,就不亂下定論了,有想法的小夥伴能夠在文章下面留言,一塊兒討論。

等待通知

消息通知是有即時性的,若是沒有 goroutine 在等待通知,那麼此次通知直接被丟棄。

kubernetes

https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/cache/fifo.go

總結

  1. Wait() 內會執行解鎖、等待、加鎖。
  2. Wait() 必須在 for 循環裏面。
  3. Wait() 方法會把當前的 goroutine 添加到通知隊列的隊尾。
  4. 單發通知,喚醒通知隊列第一個排隊的 goroutine。
  5. 廣播通知,喚醒通知隊列裏面所有的 goroutine。
  6. 程序示例只是爲了演示效果,實際的開發中,生產者和消費者應該是異步消費,不該該使用同一個互斥鎖。

文章示例代碼

Sown專欄地址:https://segmentfault.com/blog/sown

相關文章
相關標籤/搜索