回顧上篇文章《Go併發編程之傳統同步—(1)互斥鎖》其中說到,同步最終是爲了達到如下兩種目的:linux
- 維持共享數據一致性,併發安全
- 控制流程管理,更好的協同工做
示例程序經過使用互斥鎖,達到了數據一致性目的,那麼流程管理應該怎麼作呢?git
上篇文章的示例程序,僅僅實現了累加功能,但在現實的工做場景中,需求每每不可能這麼簡單,如今擴展一下這個程序,給它加上累減的功能。github
加上了累減的示例程序,能夠抽象的理解爲一個固定容量的「儲水池」,能夠注水、排水。編程
當水注滿之後,中止注水,開始排水,當水排空之後,開始注水,反反覆覆...segmentfault
func TestDemo1(t *testing.T) { var mut sync.Mutex maxSize := 10 counter := 0 // 排水口 go func() { for { mut.Lock() if counter == maxSize { for i := 0; i < maxSize; i++ { counter-- log.Printf("OUTPUT counter = %d", counter) } } mut.Unlock() time.Sleep(1 * time.Second) } }() // 注水口 for { mut.Lock() if counter == 0 { for i := 0; i < maxSize; i++ { counter++ log.Printf(" INPUT counter = %d", counter) } } mut.Unlock() time.Sleep(1 * time.Second) } }
結果安全
=== RUN TestDemo1 ··· 2020/10/06 13:52:50 INPUT counter = 8 2020/10/06 13:52:50 INPUT counter = 9 2020/10/06 13:52:50 INPUT counter = 10 2020/10/06 13:52:50 OUTPUT counter = 9 2020/10/06 13:52:50 OUTPUT counter = 8 2020/10/06 13:52:50 OUTPUT counter = 7 ···
看着沒有什麼問題,一切正常,但就是這樣工做的策略效率過低。多線程
優化策略,不用等注滿水再排水,也不用放空以後,再注水,注水口和排水口一塊兒工做。併發
func TestDemo2(t *testing.T) { var mut sync.Mutex maxSize := 10 counter := 0 // 排水口 go func() { for { mut.Lock() if counter != 0 { counter-- } log.Printf("OUTPUT counter = %d", counter) mut.Unlock() time.Sleep(5 * time.Second) // 爲了演示效果,睡眠5秒 } }() // 注水口 for { mut.Lock() if counter != maxSize { counter++ } log.Printf(" INPUT counter = %d", counter) mut.Unlock() time.Sleep(1 * time.Second) // 爲了演示效果,睡眠1秒 } }
結果異步
=== RUN TestDemo2 ··· 2020/10/06 14:11:46 INPUT counter = 7 2020/10/06 14:11:47 INPUT counter = 8 2020/10/06 14:11:48 OUTPUT counter = 7 2020/10/06 14:11:48 INPUT counter = 8 2020/10/06 14:11:49 INPUT counter = 9 2020/10/06 14:11:50 INPUT counter = 10 2020/10/06 14:11:51 INPUT counter = 10 2020/10/06 14:11:52 INPUT counter = 10 2020/10/06 14:11:53 OUTPUT counter = 9 2020/10/06 14:11:53 INPUT counter = 10 2020/10/06 14:11:54 INPUT counter = 10 2020/10/06 14:11:55 INPUT counter = 10 2020/10/06 14:11:56 INPUT counter = 10 2020/10/06 14:11:57 INPUT counter = 10 2020/10/06 14:11:58 OUTPUT counter = 9 2020/10/06 14:11:58 INPUT counter = 10 2020/10/06 14:11:59 INPUT counter = 10 ···
經過日誌輸出,能夠看到程序達到了需求,運做正常。優化
可是,經過日誌輸出發現,當排水口效率低下的時候,注水口一直在輪詢,這裏頻繁的上鎖操做形成的開銷非常浪費。
那有沒有什麼好的辦法,省去沒必要要的輪詢?若是注水口和排水口能互相「通知」就行了!這個功能,條件變量能夠作到。
條件變量老是與互斥鎖組合使用,除了可使用 Lock、Unlock,還有以下三個方法:
- Wait 等待通知
- Signal 單發通知
- Broadcast 廣播通知
func TestDemo3(t *testing.T) { cond := sync.NewCond(new(sync.Mutex)) // 初始化條件變量 maxSize := 10 counter := 0 // 排水口 go func() { for { cond.L.Lock() // 上鎖 if counter == 0 { // 沒水了 cond.Wait() // 啥時候來水?等通知! } counter-- log.Printf("OUTPUT counter = %d", counter) cond.Signal() // 單發通知:已排水 cond.L.Unlock() // 解鎖 time.Sleep(5 * time.Second) // 爲了演示效果,睡眠5秒 } }() // 注水口 for { cond.L.Lock() // 上鎖 if counter == maxSize { // 水滿了 cond.Wait() // 啥時候排水?等待通知! } counter++ log.Printf(" INPUT counter = %d", counter) cond.Signal() // 單發通知:已來水 cond.L.Unlock() // 解鎖 time.Sleep(1 * time.Second) // 爲了演示效果,睡眠1秒 } }
結果
=== RUN TestDemo3 ··· 2020/10/06 14:51:22 INPUT counter = 7 2020/10/06 14:51:23 INPUT counter = 8 2020/10/06 14:51:24 OUTPUT counter = 7 2020/10/06 14:51:24 INPUT counter = 8 2020/10/06 14:51:25 INPUT counter = 9 2020/10/06 14:51:26 INPUT counter = 10 2020/10/06 14:51:29 OUTPUT counter = 9 2020/10/06 14:51:29 INPUT counter = 10 2020/10/06 14:51:34 OUTPUT counter = 9 2020/10/06 14:51:34 INPUT counter = 10 ···
經過日誌輸出,能夠看出來,注水口沒有一直輪詢了,而是等到排水口發通知後,再進行注水,注水口一直再等排水口。那麼新的問題又來了,如何提升排水口的效率呢?
多製造出一個排水口,提升排水效率。
那就不能繼續使用單發通知了(Signal),由於單發通知只會通知到一個等待(Wait),針對多等待的這種狀況,就須要使用廣播通知(Broadcast)。
func TestDemo4(t *testing.T) { cond := sync.NewCond(new(sync.Mutex)) // 初始化條件變量 maxSize := 10 counter := 0 // 排水口 1 go func() { for { cond.L.Lock() // 上鎖 if counter == 0 { // 沒水了 //for counter == 0 { // 沒水了 cond.Wait() // 啥時候來水?等通知! } counter-- log.Printf("OUTPUT A counter = %d", counter) cond.Broadcast() // 單發通知:已排水 cond.L.Unlock() // 解鎖 //time.Sleep(2 * time.Second) // 爲了演示效果,睡眠5秒 } }() // 排水口 2 go func() { for { cond.L.Lock() // 上鎖 if counter == 0 { // 沒水了 //for counter == 0 { // 沒水了 cond.Wait() // 啥時候來水?等通知! } counter-- log.Printf("OUTPUT B counter = %d", counter) cond.Broadcast() // 單發通知:已排水 cond.L.Unlock() // 解鎖 //time.Sleep(2 * time.Second) // 爲了演示效果,睡眠5秒 } }() // 注水口 for { cond.L.Lock() // 上鎖 if counter == maxSize { // 水滿了 //for counter == maxSize { // 水滿了 cond.Wait() // 啥時候排水?等待通知! } counter++ log.Printf(" INPUT counter = %d", counter) cond.Broadcast() // 單發通知:已來水 cond.L.Unlock() // 解鎖 //time.Sleep(1 * time.Second) // 爲了演示效果,睡眠1秒 } }
結果
=== RUN TestDemo4 ··· 2020/10/07 20:57:30 OUTPUT B counter = 2 2020/10/07 20:57:30 OUTPUT B counter = 1 2020/10/07 20:57:30 OUTPUT B counter = 0 2020/10/07 20:57:30 OUTPUT A counter = -1 2020/10/07 20:57:30 OUTPUT A counter = -2 2020/10/07 20:57:30 OUTPUT A counter = -3 2020/10/07 20:57:30 OUTPUT A counter = -4 ··· 2020/10/07 20:57:31 OUTPUT B counter = -7605 2020/10/07 20:57:31 INPUT counter = -7604 2020/10/07 20:57:31 OUTPUT A counter = -7605 2020/10/07 20:57:31 OUTPUT A counter = -7606 ···
經過日誌輸出能夠看到,剛開始的時候還很正常,到後面的時候就變成負值了,一直在負增加,What?
在《Go併發編程之傳統同步—(1)互斥鎖》文章中,程序由於沒有加上互斥鎖,出現過 counter 值異常的狀況。
但此次程序此次加了互斥鎖,按理說造成了一個臨界區應該是沒有問題了,因此問題應該不是出在臨界區上,難道問題出在 Wait 上?
經過IDE追蹤一下Wait的源碼
func (c *Cond) Wait() { // 檢查 c 是不是被複制的,若是是就 panic c.checker.check() // 將當前 goroutine 加入等待隊列 t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 等待當前 goroutine 被喚醒 runtime_notifyListWait(&c.notify, t) c.L.Lock() }
原來 Wait 內部的執行流程是,先執行了解鎖,而後進入等待狀態,接到通知以後,再執行加鎖操做。
那按照這個代碼邏輯結合輸出日誌,走一程序遍流程,看看能不能復現出 counter 爲負值的狀況:
- 注水口將 counter 累加到 10 以後,發送廣播通知(Broadcast)。
- goroutine A 在「第1步」以前的時候進入了等待通知(Wait),如今接收到了廣播通知(Broadcast),從 runtime_notifyListWait() 返回,而且成功執行了加鎖(Lock)操做。
- goroutine B 在「第1步」以前的時候進入了等待通知(Wait),如今接收到了廣播通知(Broadcast),從 runtime_notifyListWait() 返回,在執行加鎖(Lock)操做的時候,發現 goroutine A 先搶佔了臨界區,因此一直阻塞在 c.L.Lock()。
- goroutine A 雖然完成任務後會釋放鎖,可是每次也成功將鎖搶佔,因此就這樣 一直將 counter 減到了 0,而後發送廣播通知(Broadcast)、解鎖(Unlock)。
- goroutine B 在 goroutine A 解鎖後,成功得到鎖並從 Lock 方法中返回,接下來跳出 Wait 方法、跳出 if 判斷,執行 counter--(0--),這時候 counter 的值是 -1
圖示
問題就出如今第五步,只要 goroutine B 加鎖成功的時候,再判斷一下 counter 是否爲 0 就行了。
因此將 if counter == 0 改爲 for counter == 0,這樣上面的「第五步」就變成了
5.goroutine B 在 goroutine A 解鎖後,成功加鎖(Lock)並從阻塞總返回,接下來跳出 Wait 方法、再次進入 for 循環,判斷 counter == 0 結果爲真,再次進入等待(Wait)。
代碼作出相應的修改後,再執行看結果,沒有問題了。
等待通知(Wait)確定是要在臨界區裏面的,那發送通知(Signal、Broadcast)在哪裏更好呢?
Luck() Wait() Broadcast()// Signal() Unlock() // 或者 Luck() Wait() Unlock() Broadcast()// Signal() // 兩種寫法都不會報錯
在 go 的發送通知方法(Broadcast、Signal)上有這麼一段話:
// It is allowed but not required for the caller to hold c.L
// during the call.
在我以往的 C 多線程開發的時候,發送通知老是在鎖中的:
pthread_mutex_lock(&thread->mutex); // ... pthread_cond_signal(&thread->cond); pthread_mutex_unlock(&thread->mutex);
在 man 手冊中有寫到:
The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().
我的對此並無什麼看法,就不亂下定論了,有想法的小夥伴能夠在文章下面留言,一塊兒討論。
消息通知是有即時性的,若是沒有 goroutine 在等待通知,那麼此次通知直接被丟棄。
Sown專欄地址:https://segmentfault.com/blog/sown