本次的代碼是基於go version go1.13.15 darwin/amd64
less
Go語言標準庫中的條件變量sync.Cond
,它可讓一組的Goroutine
都在知足特定條件時被喚醒。函數
每一個Cond
都會關聯一個Lock(*sync.Mutex or *sync.RWMutex)
ui
var ( locker = new(sync.Mutex) cond = sync.NewCond(locker) ) func listen(x int) { // 獲取鎖 cond.L.Lock() // 等待通知 暫時阻塞 cond.Wait() fmt.Println(x) // 釋放鎖 cond.L.Unlock() } func main() { // 啓動60個被cond阻塞的線程 for i := 1; i <= 60; i++ { go listen(i) } fmt.Println("start all") // 3秒以後 下發一個通知給已經獲取鎖的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++one Signal") cond.Signal() // 3秒以後 下發一個通知給已經獲取鎖的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++one Signal") cond.Signal() // 3秒以後 下發廣播給全部等待的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++begin broadcast") cond.Broadcast() // 阻塞直到全部的所有輸出 time.Sleep(time.Second * 60) }
上面是個簡單的例子,咱們啓動了60個線程,而後都被cond
阻塞,主函數經過Signal()
通知一個goroutine
接觸阻塞,經過Broadcast()
通知全部被阻塞的所有解除阻塞。atom
// Wait 原子式的 unlock c.L, 並暫停執行調用的 goroutine。 // 在稍後執行後,Wait 會在返回前 lock c.L. 與其餘系統不一樣, // 除非被 Broadcast 或 Signal 喚醒,不然等待沒法返回。 // // 由於等待第一次 resume 時 c.L 沒有被鎖定,因此當 Wait 返回時, // 調用者一般不能認爲條件爲真。相反,調用者應該在循環中使用 Wait(): // // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // type Cond struct { // 用於保證結構體不會在編譯期間拷貝 noCopy noCopy // 鎖 L Locker // goroutine鏈表,維護等待喚醒的goroutine隊列 notify notifyList // 保證運行期間不會發生copy checker copyChecker }
重點分析下:notifyList
和copyChecker
線程
type notifyList struct { // 總共須要等待的數量 wait uint32 // 已經通知的數量 notify uint32 // 鎖 lock uintptr // 指向鏈表頭部 head *sudog // 指向鏈表尾部 tail *sudog }
這個是核心,全部wait
的goroutine
都會被加入到這個鏈表中,而後在通知的時候再從這個鏈表中獲取。code
保證運行期間不會發生copy對象
type copyChecker uintptr // copyChecker holds back pointer to itself to detect object copying func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } }
func (c *Cond) Wait() { // 監測是否複製 c.checker.check() // 更新 notifyList中須要等待的wait的數量 // 返回當前須要插入鏈表節點ticket t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 爲當前的加入的waiter構建一個鏈表的節點,插入鏈表的尾部 runtime_notifyListWait(&c.notify, t) c.L.Lock() } // go/src/runtime/sema.go // 更新 notifyList中須要等待的wait的數量 // 同時返回當前的加入的 waiter 的 ticket 編號,從0開始 //go:linkname notifyListAdd sync.runtime_notifyListAdd func notifyListAdd(l *notifyList) uint32 { // 使用atomic原子的對wait字段進行加一操做 return atomic.Xadd(&l.wait, 1) - 1 } // go/src/runtime/sema.go // 爲當前的加入的waiter構建一個鏈表的節點,插入鏈表的尾部 //go:linkname notifyListWait sync.runtime_notifyListWait func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) // 當t小於notifyList中的notify,說明當前節點已經被通知了 if less(t, l.notify) { unlock(&l.lock) return } // 構建當前節點 s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64(0) if blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } // 頭結點沒構建,插入頭結點 if l.tail == nil { l.head = s } else { // 插入到尾節點 l.tail.next = s } l.tail = s // 將當前goroutine置於等待狀態並解鎖 // 經過調用goready(gp),可使goroutine再次可運行。 // 也就是將 M/P/G 解綁,並將 G 調整爲等待狀態,放入 sudog 等待隊列中 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) if t0 != 0 { blockevent(s.releasetime-t0, 2) } releaseSudog(s) }
梳理流程blog
一、首先檢測對象的複製行爲,若是有複製發生直接拋出panic;隊列
二、而後調用runtime_notifyListAdd
對notifynotifyListList
中的wait
(須要等待的數量)進行加一操做,同時返回一個ticket
,用來做爲當前wait
的編號,這個編號,會和notifyList
中的notify
對應起來;get
三、而後調用runtime_notifyListWait
把當前的wait
封裝成鏈表的一個節點,插入到notifyList
維護的鏈表的尾部。
// 喚醒一個被wait的goroutine func (c *Cond) Signal() { // 監測是否複製 c.checker.check() runtime_notifyListNotifyOne(&c.notify) } // go/src/runtime/sema.go // 通知鏈表中的第一個 //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne func notifyListNotifyOne(l *notifyList) { // wait和notify,說明已經所有通知到了 if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } lock(&l.lock) // 這裏作了二次的確認 // wait和notify,說明已經所有通知到了 t := l.notify if t == atomic.Load(&l.wait) { unlock(&l.lock) return } // 原子的對notify執行+1操做 atomic.Store(&l.notify, t+1) // 嘗試找到須要被通知的 g // 若是目前還沒來得及入隊,是沒法找到的 // 可是,當它看到通知編號已經發生改變是不會被 park 的 // // 這個查找過程看起來是線性複雜度,但實際上很快就停了 // 由於 g 的隊列與獲取編號不一樣,於是隊列中會出現少許重排,但咱們但願找到靠前的 g // 而 g 只有在再也不 race 後纔會排在靠前的位置,所以這個迭代也不會過久, // 同時,即使找不到 g,這個狀況也成立: // 它尚未休眠,而且已經失去了咱們在隊列上找到的(少數)其餘 g 的 race。 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { // 順序拿到一個節點的ticket,會和上面會和notifyList中的notify作比較,相同才進行後續的操做 // 這個咱們分析了,notifyList中的notify和鏈表節點中的ticket是一一對應的 if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil // 經過goready掉起在上面經過goparkunlock掛起的goroutine readyWithTime(s, 4) return } } unlock(&l.lock) }
梳理下流程:
一、首先檢測對象的複製行爲,若是有複製發生直接拋出panic
;
二、判斷wait
和notify
,若是二者相同說明已經已經所有通知到了;
三、調用notifyListNotifyOne
,經過for循環,依次遍歷這個鏈表,直到找到和notifyList
中的notify
,相匹配的ticket
的節點;
四、掉起goroutine
,完成通知。
// 喚醒全部被wait的goroutine func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } // go/src/runtime/sema.go // notifyListNotifyAll notifies all entries in the list. //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll func notifyListNotifyAll(l *notifyList) { // wait和notify,說明已經所有通知到了 if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } // 加鎖 lock(&l.lock) s := l.head l.head = nil l.tail = nil // 這個很粗暴,直接將notify的值置換成wait atomic.Store(&l.notify, atomic.Load(&l.wait)) unlock(&l.lock) // 循環鏈表,一個個喚醒goroutine for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
梳理下流程:
一、首先檢測對象的複製行爲,若是有複製發生直接拋出panic;
二、判斷wait
和notify
,若是二者相同說明已經已經所有通知到了;
三、notifyListNotifyAll
,就相對簡單了,直接將notify
的值置爲wait
,標註這個已經所有通知了;
四、循環鏈表,一個個喚醒goroutine
。
sync.Cond
不是一個經常使用的同步機制,可是在條件長時間沒法知足時,與使用for {}
進行忙碌等待相比,sync.Cond
可以讓出處理器的使用權,提供CPU
的利用率。使用時咱們也須要注意如下問題:
一、sync.Cond.Wait
在調用以前必定要使用獲取互斥鎖,不然會觸發程序崩潰;
二、sync.Cond.Signal
喚醒的 Goroutine
都是隊列最前面、等待最久的Goroutine
;
三、sync.Cond.Broadcast
會按照必定順序廣播通知等待的所有 Goroutine
。