手摸手Go 深刻理解sync.Cond

Today that you are wasting is the unattainable tomorrow to someone who expired yesterday. This very moment that you detest is the unreturnable experience to your future self.

sync.Cond實現了一個條件變量,用於等待一個或一組goroutines知足條件後喚醒的場景。每一個Cond關聯一個Locker一般是一個*MutexRWMutex\`根據需求初始化不一樣的鎖。數據結構

基本用法

老規矩正式剖析源碼前,先來看看sync.Cond如何使用。好比咱們實現一個FIFO的隊列app

`package main`
`import (`
 `"fmt"`
 `"math/rand"`
 `"os"`
 `"os/signal"`
 `"sync"`
 `"time"`
`)`
`type FIFO struct {`
 `lock  sync.Mutex`
 `cond  *sync.Cond`
 `queue []int`
`}`
`type Queue interface {`
 `Pop() int`
 `Offer(num int) error`
`}`
`func (f *FIFO) Offer(num int) error {`
 `f.lock.Lock()`
 `defer f.lock.Unlock()`
 `f.queue = append(f.queue, num)`
 `f.cond.Broadcast()`
 `return nil`
`}`
`func (f *FIFO) Pop() int {`
 `f.lock.Lock()`
 `defer f.lock.Unlock()`
 `for {`
 `for len(f.queue) == 0 {`
 `f.cond.Wait()`
 `}`
 `item := f.queue[0]`
 `f.queue = f.queue[1:]`
 `return item`
 `}`
`}`
`func main() {`
 `l := sync.Mutex{}`
 `fifo := &FIFO{`
 `lock:  l,`
 `cond:  sync.NewCond(&l),`
 `queue: []int{},`
 `}`
 `go func() {`
 `for {`
 `fifo.Offer(rand.Int())`
 `}`
 `}()`
 `time.Sleep(time.Second)`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))`
 `}`
 `}()`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))`
 `}`
 `}()`
 `ch := make(chan os.Signal, 1)`
 `signal.Notify(ch, os.Interrupt)`
 `<-ch`
`}`

咱們定一個FIFO 隊列有OfferPop兩個操做,咱們起一個gorountine不斷向隊列投放數據,另外兩個gorountine不斷取拿數據。less

  1. Pop操做會判斷若是隊列裏沒有數據len(f.queue) == 0則調用f.cond.Wait()goroutine掛起。
  2. 等到Offer操做投放數據成功,裏面調用f.cond.Broadcast()來喚醒全部掛起在這個mutex上的goroutine。固然sync.Cond也提供了一個Signal(),有點兒相似Java中的notify()notifyAll()的意思 主要是喚醒一個和喚醒所有的區別。

總結一下sync.Mutex的大體用法ide

  1. 首先聲明一個mutex,這裏sync.Mutex/sync.RWMutex可根據實際狀況選用
  2. 調用sync.NewCond(l Locker) *Cond 使用1中的mutex做爲入參 注意 這裏傳入的是指針 爲了不c.L.Lock()c.L.Unlock()調用頻繁複制鎖 致使死鎖
  3. 根據業務條件 知足則調用cond.Wait()掛起goroutine
  4. cond.Broadcast()喚起全部掛起的gorotune 另外一個方法cond.Signal()喚醒一個最早掛起的goroutine

須要注意的是cond.wait()的使用須要參照以下模版 具體爲啥咱們後續分析源碼分析

`c.L.Lock()`
 `for !condition() {`
 `c.Wait()`
 `}`
 `... make use of condition ...`
 `c.L.Unlock()`

源碼分析

數據結構

分析具體方法前,咱們先來了解下sync.Cond的數據結構。具體源碼以下:ui

`type Cond struct {`
 `noCopy noCopy // Cond使用後不容許拷貝`
 `// L is held while observing or changing the condition`
 `L Locker`
 `//通知列表調用wait()方法的goroutine會被放到notifyList中`
 `notify  notifyList`
 `checker copyChecker //檢查Cond實例是否被複制`
`}`

noCopy以前講過 不清楚的能夠看下《你真的瞭解mutex嗎》,除此以外,Locker是咱們剛剛談到的mutexcopyChecker是用來檢查Cond實例是否被複制的,就有一個方法 :this

`func (c *copyChecker) check() {`
 `if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&`
 `!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&`
 `uintptr(*c) != uintptr(unsafe.Pointer(c)) {`
 `panic("sync.Cond is copied")`
 `}`
`}`

大體意思是說,初始type copyChecker uintptr默認爲0,當第一次調用check()會將copyChecker自身的地址複製給本身,至於爲何uintptr(*c) != uintptr(unsafe.Pointer(c))會被調用2次,由於期間goroutine可能已經改變copyChecker。二次調用若是不相等,則說明sync.Cond被複制,從新分配了內存地址。atom

sync.Cond比較有意思的是notifyListspa

`type notifyList struct {`
 `// wait is the ticket number of the next waiter. It is atomically`
 `// incremented outside the lock.`
 `wait uint32 // 等待goroutine操做的數量`
 `// notify is the ticket number of the next waiter to be notified. It can`
 `// be read outside the lock, but is only written to with lock held.`
 `//`
 `// Both wait & notify can wrap around, and such cases will be correctly`
 `// handled as long as their "unwrapped" difference is bounded by 2^31.`
 `// For this not to be the case, we'd need to have 2^31+ goroutines`
 `// blocked on the same condvar, which is currently not possible.`
 `notify uint32 // 喚醒goroutine操做的數量`
 `// List of parked waiters.`
 `lock mutex`
 `head *sudog`
 `tail *sudog`
`}`

包含了3類字段:指針

  • waitnotify兩個無符號整型,分別表示了Wait()操做的次數和goroutine被喚醒的次數,wait應該是恆大於等於notify
  • lock mutex 這個跟sync.Mutex咱們分析信號量阻塞隊列時semaRoot裏的mutex同樣,並非Go提供開發者使用的sync.Mutex,而是系統內部運行時實現的一個簡單版本的互斥鎖。
  • headtail看名字,咱們就能腦補出跟鏈表很像 沒錯這裏就是維護了阻塞在當前sync.Cond上的goroutine構成的鏈表

總體來說sync.Cond大致結構爲:

圖片

cond architecture

操做方法

Wait()操做

`func (c *Cond) Wait() {`
 `//1. 檢查cond是否被拷貝`
 `c.checker.check()`
 `//2. notifyList.wait+1`
 `t := runtime_notifyListAdd(&c.notify)`
 `//3. 釋放鎖 讓出資源給其餘goroutine`
 `c.L.Unlock()`
 `//4. 掛起goroutine`
 `runtime_notifyListWait(&c.notify, t)`
 `//5. 嘗試得到鎖`
 `c.L.Lock()`
`}`

Wait()方法源碼很容易看出它的操做大概分了5步:

  1. 調用copyChecker.check()保證sync.Cond不會被拷貝
  2. 每次調用Wait()會將sync.Cond.notifyList.wait屬性進行加一操做,這也是它完成FIFO的基石,根據wait來判斷\`goroutine1等待的順序
`//go:linkname notifyListAdd sync.runtime_notifyListAdd`
`func notifyListAdd(l *notifyList) uint32 {`
 `// This may be called concurrently, for example, when called from`
 `// sync.Cond.Wait while holding a RWMutex in read mode.`
 `return atomic.Xadd(&l.wait, 1) - 1`
`}`
  1. 調用c.L.Unlock()釋放鎖,由於當前goroutine即將被gopark,讓出鎖給其餘goroutine避免死鎖
  2. 調用runtime_notifyListWait(&c.notify, t)可能稍微複雜一點兒
`// notifyListWait waits for a notification. If one has been sent since`
`// notifyListAdd was called, it returns immediately. Otherwise, it blocks.`
`//go:linkname notifyListWait sync.runtime_notifyListWait`
`func notifyListWait(l *notifyList, t uint32) {`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 若是已經被喚醒 則當即返回`
 `if less(t, l.notify) {`
 `unlock(&l.lock)`
 `return`
 `}`
 `// Enqueue itself.`
 `s := acquireSudog()`
 `s.g = getg()`
 `// 把等待遞增序號賦值給s.ticket 爲FIFO打基礎`
 `s.ticket = t`
 `s.releasetime = 0`
 `t0 := int64(0)`
 `if blockprofilerate > 0 {`
 `t0 = cputicks()`
 `s.releasetime = -1`
 `}`
 `// 將當前goroutine插入到notifyList鏈表中`
 `if l.tail == nil {`
 `l.head = s`
 `} else {`
 `l.tail.next = s`
 `}`
 `l.tail = s`
 `// 最終調用gopark掛起當前goroutine`
 `goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)`
 `if t0 != 0 {`
 `blockevent(s.releasetime-t0, 2)`
 `}`
 `// goroutine被喚醒後釋放sudog`
 `releaseSudog(s)`
`}`

主要完成兩個任務:

  • 將當前goroutine插入到notifyList鏈表中
  • 調用gopark將當前goroutine掛起
  1. 當其餘goroutine調用了SignalBroadcast方法,當前goroutine被喚醒後 再次嘗試得到鎖

Signal操做

Signal喚醒一個等待時間最長的goroutine,調用時不要求持有鎖。

`func (c *Cond) Signal() {`
 `c.checker.check()`
 `runtime_notifyListNotifyOne(&c.notify)`
`}`

具體實現也不復雜,先判斷sync.Cond是否被複制,而後調用runtime_notifyListNotifyOne

`//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne`
`func notifyListNotifyOne(l *notifyList) {`
 `// wait==notify 說明沒有等待的goroutine了`
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `return`
 `}`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 鎖下二次檢查`
 `t := l.notify`
 `if t == atomic.Load(&l.wait) {`
 `unlock(&l.lock)`
 `return`
 `}`
 `// 更新下一個須要被喚醒的ticket number`
 `atomic.Store(&l.notify, t+1)`
 `// Try to find the g that needs to be notified.`
 `// If it hasn't made it to the list yet we won't find it,`
 `// but it won't park itself once it sees the new notify number.`
 `//`
 `// This scan looks linear but essentially always stops quickly.`
 `// Because g's queue separately from taking numbers,`
 `// there may be minor reorderings in the list, but we`
 `// expect the g we're looking for to be near the front.`
 `// The g has others in front of it on the list only to the`
 `// extent that it lost the race, so the iteration will not`
 `// be too long. This applies even when the g is missing:`
 `// it hasn't yet gotten to sleep and has lost the race to`
 `// the (few) other g's that we find on the list.`
 `//這裏是FIFO實現的核心 其實就是遍歷鏈表 sudog.ticket查找指定須要喚醒的節點`
 `for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {`
 `if s.ticket == t {`
 `n := s.next`
 `if p != nil {`
 `p.next = n`
 `} else {`
 `l.head = n`
 `}`
 `if n == nil {`
 `l.tail = p`
 `}`
 `unlock(&l.lock)`
 `s.next = nil`
 `readyWithTime(s, 4)`
 `return`
 `}`
 `}`
 `unlock(&l.lock)`
`}`

主要邏輯:

  1. 判斷是否存在等待須要被喚醒的goroutine 沒有直接返回
  2. 遞增notify屬性,由於是根據notifysudog.ticket匹配來查找須要喚醒的goroutine,由於其是遞增生成的,故而有了FIFO語義。
  3. 遍歷notifyList持有的鏈表,從head開始依據next指針依次遍歷。這個過程是線性的,故而時間複雜度爲O(n),不過官方說法這個過程實際比較快This scan looks linear but essentially always stops quickly.

有個小細節:還記得咱們Wait()操做中,wait屬性原子更新和goroutine插入等待鏈表是兩個單獨的步驟,因此存在競爭的狀況下,鏈表中的節點可能會輕微的亂序產生。可是不要擔憂,由於ticket是原子遞增的 因此喚醒順序不會亂。

Broadcast操做

Broadcast()Singal()區別主要是它能夠喚醒所有等待的goroutine,並直接將wait屬性的值賦值給notify

`func (c *Cond) Broadcast() {`
 `c.checker.check()`
 `runtime_notifyListNotifyAll(&c.notify)`
`}`
`// notifyListNotifyAll notifies all entries in the list.`
`//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll`
`func notifyListNotifyAll(l *notifyList) {`
 `// Fast-path 無等待goroutine直接返回`
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `return`
 `}`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `s := l.head`
 `l.head = nil`
 `l.tail = nil`
 `// 直接更新notify=wait`
 `atomic.Store(&l.notify, atomic.Load(&l.wait))`
 `unlock(&l.lock)`
 `// 依次調用goready喚醒goroutine`
 `for s != nil {`
 `next := s.next`
 `s.next = nil`
 `readyWithTime(s, 4)`
 `s = next`
 `}`
`}`

邏輯比較簡單再也不贅述

總結

  1. sync.Cond一旦建立使用 不容許被拷貝,由noCopycopyChecker來限制保護。
  2. Wait()操做先是遞增notifyList.wait屬性 而後將goroutine封裝進sudog,將notifyList.wait賦值給sudog.ticket,而後將sudog插入notifyList鏈表中
  3. Singal()實際是按照notifyList.notifynotifyList鏈表中節點的ticket匹配 來肯定喚醒的goroutine,由於notifyList.notifynotifyList.wait都是原子遞增的,故而有了FIFO的語義
  4. Broadcast()相對簡單 就是喚醒所有等待的goroutine

若是閱讀過程當中發現本文存疑或錯誤的地方,能夠關注公衆號留言。若是以爲還能夠 幫忙點個在看😁

圖片

相關文章
相關標籤/搜索