Condition Variables: A Source Code Analysis

1. Introduction to condition variables

Condition variables allow threads to wait until some event or condition has occurred.

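Condition variables are a classic tool in concurrent programming. There are two common kinds of implementation:

  1. Nonblocking condition variables: signal and continue.
  2. Blocking condition variables: signal and wait.

The difference is whether the signaling thread immediately gives up ownership of the monitor (its lock). With a blocking condition variable it does; with a nonblocking one it does not.

Most languages and base libraries ship their own condition variable. The Linux pthread library and C++'s std::condition_variable are both nonblocking implementations, and Go's sync.Cond is a typical nonblocking implementation as well.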
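Go's sync.Cond makes the "signal and continue" behaviour easy to observe. The sketch below is illustrative only (signalAndContinue and its event strings are made up for this example): the signaling goroutine calls Signal while still holding the lock and keeps running, and the waiter can return from Wait only after the signaler unlocks.

```go
package main

import (
	"fmt"
	"sync"
)

// signalAndContinue records the order of events around a Signal call.
// sync.Cond is nonblocking: the signaler keeps the lock after Signal returns,
// so the waiter can only resume once the signaler has unlocked.
func signalAndContinue() []string {
	var mu sync.Mutex
	c := sync.NewCond(&mu)
	ready := false
	var events []string // guarded by mu

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		mu.Lock()
		for !ready {
			c.Wait() // releases mu while parked, re-acquires it before returning
		}
		events = append(events, "waiter resumed")
		mu.Unlock()
	}()

	mu.Lock()
	ready = true
	c.Signal()
	events = append(events, "signal sent")
	// Still inside the critical section: the waiter cannot run yet.
	events = append(events, "signaler still holds the lock")
	mu.Unlock()

	wg.Wait()
	return events
}

func main() {
	for _, e := range signalAndContinue() {
		fmt.Println(e)
	}
}
```

Whichever goroutine grabs the lock first, the recorded order is the same: "signal sent", "signaler still holds the lock", then "waiter resumed".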

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.

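The comment above, taken from the Go source, is the official definition of Cond. In plain terms:

sync.Cond is Go's implementation of a condition variable, built around two roles:

  1. Goroutines that wait for the condition to become true and are suspended until then (the Wait operation).
  2. A goroutine that announces the event and makes the condition true (the Signal/Broadcast operation).

2. Usage

2.1 Basic usage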

package main

import (
	"fmt"
	"sync"
	"time"
)

// flag is the condition the waiters check
var flag = false

func CondTest(info string, c *sync.Cond) {
	// Lock before using the condition variable (Wait releases the lock
	// internally and re-acquires it before returning)
	c.L.Lock()
	for !flag {
		fmt.Println(info, "wait")
		// Suspend until woken
		c.Wait()
	}
	fmt.Println(info, flag)
	c.L.Unlock()
}

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	// add 2 waiters
	go CondTest("go one", c)
	go CondTest("go two", c)

	// main: give the waiters a moment to start (demo only; the flag loop stays correct either way)
	time.Sleep(time.Second)

	c.L.Lock()
	flag = true
	c.L.Unlock()
	c.Broadcast() // wake all waiters
	// c.Signal() // wake only one waiter; the other would stay blocked
	fmt.Println("main broadcast")

	time.Sleep(time.Second)
}
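The example above wakes everyone with Broadcast. To contrast with Signal, here is a minimal producer/consumer sketch built on a hypothetical unbounded queue type: each Put adds one item and calls Signal, so at most one sleeping consumer is woken per item. Consumers still re-check the queue length in a loop, because a woken consumer may find that another goroutine already took the item.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical unbounded FIFO whose consumers wait on a sync.Cond.
type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Put appends one item and wakes at most one waiting consumer.
func (q *queue) Put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal() // one new item, so waking one consumer is enough
}

// Get blocks until an item is available, then removes and returns it.
func (q *queue) Get() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

// consumeAll starts n consumers that each take one item, then produces 1..n.
func consumeAll(n int) int {
	q := newQueue()
	results := make(chan int, n)
	for i := 0; i < n; i++ {
		go func() { results <- q.Get() }()
	}
	for i := 1; i <= n; i++ {
		q.Put(i)
	}
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-results
	}
	return sum
}

func main() {
	fmt.Println(consumeAll(3)) // 6 (= 1 + 2 + 3)
}
```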

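2.2 Usage in open-source libraries

I couldn't really find any 😂. Almost every sync.Cond use case can also be handled with a chan.

2.3 When to use it

Imagine one worker receiving data asynchronously while n waiters must wait until it has finished before they can continue. Two approaches come to mind right away:

  1. A spin lock plus a global variable.

The drawback is that the waiters have to keep polling the variable to find out whether the condition holds, and notifying just one waiter is hard to support.

  2. select + chan.

Don't communicate by sharing memory, share memory by communicating.

Following Go's philosophy, chan is the preferred tool in concurrent code like this. With a chan, however, the worker has to know how many waiters there are: n waiters need n notifications (closing the channel also works, but a channel can only be closed once). sync.Cond simplifies this a lot: a single Broadcast call notifies every waiter. Cond also offers Signal, which wakes a single waiter, much like a single send on a chan.

In one sentence: sync.Cond fits any scenario with multiple waiters and a single worker.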
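The two options for the one-worker/n-waiters scenario can be sketched side by side. Both functions are hypothetical and only count how many waiters were released; the "received data" is reduced to a ready flag.

```go
package main

import (
	"fmt"
	"sync"
)

// waitWithCond: n waiters block until one worker marks the data as ready and
// calls Broadcast once. The worker never needs to know n.
func waitWithCond(n int) int {
	var mu sync.Mutex
	c := sync.NewCond(&mu)
	ready := false
	released := 0 // guarded by mu

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for !ready {
				c.Wait()
			}
			released++ // the data could be used here
			mu.Unlock()
		}()
	}

	// The single worker: finish "receiving", then wake every waiter.
	mu.Lock()
	ready = true
	mu.Unlock()
	c.Broadcast()

	wg.Wait()
	return released
}

// waitWithChan does the same one-shot notification with a channel.
// close wakes every receiver, but a channel can only be closed once;
// sending values instead would require exactly n sends.
func waitWithChan(n int) int {
	done := make(chan struct{})
	var mu sync.Mutex
	released := 0

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	close(done) // one close releases all n receivers
	wg.Wait()
	return released
}

func main() {
	fmt.Println(waitWithCond(5), waitWithChan(5)) // 5 5
}
```

A closed channel cannot be reopened, so every new event needs a fresh channel; a Cond can be signalled again, and Signal can wake just one waiter.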

3. Source code analysis

The code below is based on Go 1.16 on a 64-bit machine.

3.1 Memory layout


// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
// Every waiter gets a ticket, which can be thought of as its unique,
// monotonically increasing ID.
type notifyList struct {
	wait   uint32  // ticket to hand to the next waiter; monotonically increasing
	notify uint32  // ticket of the next waiter to wake (waiters with smaller tickets are already woken or about to be)
	lock   uintptr // key field of the mutex
	head   unsafe.Pointer
	tail   unsafe.Pointer
}

type copyChecker uintptr

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
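The struct comment says L is often a *Mutex or *RWMutex. With an RWMutex, one option is to give the Cond the read side via RLocker, so several waiters can hold the lock and call Wait at the same time while the goroutine that changes the condition takes the write lock. A sketch, with readersWaitTogether as a hypothetical name:

```go
package main

import (
	"fmt"
	"sync"
)

// readersWaitTogether starts n readers that wait on a Cond built on the read
// side of an RWMutex; a single writer flips the condition under the write lock.
func readersWaitTogether(n int) int {
	var rw sync.RWMutex
	c := sync.NewCond(rw.RLocker()) // c.L.Lock/Unlock call rw.RLock/RUnlock
	ready := false                  // written under rw.Lock, read under rw.RLock

	results := make(chan bool, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.L.Lock() // many readers can hold the read lock at once
			for !ready {
				c.Wait()
			}
			results <- ready
			c.L.Unlock()
		}()
	}

	rw.Lock() // the writer waits until no reader holds the read lock
	ready = true
	rw.Unlock()
	c.Broadcast()

	wg.Wait()
	close(results)
	released := 0
	for ok := range results {
		if ok {
			released++
		}
	}
	return released
}

func main() {
	fmt.Println(readersWaitTogether(4)) // 4
}
```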

3.2 Core interfaces

copy

Function signatures:

func (c *copyChecker) check()

func (*noCopy) Lock()

func (*noCopy) Unlock()

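Implementation:

// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr

// copyChecker stores its own address; comparing that stored value with its
// current address reveals whether the Cond has been copied
func (c *copyChecker) check() {
	// 1. Does the stored address differ from the checker's current address?
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		// 2. On the very first call *c is 0, so this CAS stores our own address
		//    and the whole condition becomes false. If the CAS fails, *c was non-zero
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		// 3. Re-check: the CAS may have failed only because another goroutine
		//    stored this same address concurrently on its own first call.
		//    Only a foreign address (i.e. a copy) gets past this test
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}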
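The three-part condition in check is compact, so here is a re-creation of the same idea outside the sync package, offered as a sketch. selfPtr, holder and demo are hypothetical names; copied runs the same three tests as copyChecker.check but returns a bool instead of panicking.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// selfPtr is a hypothetical re-creation of the copyChecker idea: on first use
// the value stores its own address, so a later copy (which lives at another
// address but carries the old value) can be recognised.
type selfPtr uintptr

// copied reports whether c is a copy of a checker that was already used.
func (c *selfPtr) copied() bool {
	self := uintptr(unsafe.Pointer(c))
	return uintptr(*c) != self && // stored address differs from ours...
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, self) && // ...and this is not the first use...
		uintptr(*c) != self // ...and no concurrent first use stored our own address
}

type holder struct {
	checker selfPtr
}

// Package-level variables never live on a goroutine stack (which the runtime
// may move), so their addresses stay fixed for this demo.
var original, duplicate holder

func demo() (firstUseCopied, copyCopied bool) {
	firstUseCopied = original.checker.copied() // first use records &original.checker
	duplicate = original                        // copy after first use
	copyCopied = duplicate.checker.copied()     // still holds &original.checker
	return
}

func main() {
	fmt.Println(demo()) // false true
}
```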

cond

Function signatures:

// Create a new Cond
func NewCond(l Locker) *Cond

// Wait for a notification
func (c *Cond) Wait()

// Wake one waiter
func (c *Cond) Signal()

// Wake all waiters
func (c *Cond) Broadcast()

Implementation:

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//	c.L.Lock()
//	for !condition() {
//		c.Wait()
//	}
//	... make use of condition ...
//	c.L.Unlock()
//
func (c *Cond) Wait() {
	// Check whether the Cond has been copied (panics if so)
	c.checker.check()
	// Take this waiter's ticket while c.L is still held
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// Enqueue this waiter and park until its ticket is notified
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
	// Check whether the Cond has been copied (panics if so)
	c.checker.check()
	// Wake one waiter from the queue (the runtime takes notifyList's own
	// internal lock; the caller does not need to hold c.L)
	runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
	// Check whether the Cond has been copied (panics if so)
	c.checker.check()
	// Wake every waiter in the queue (again under notifyList's internal
	// lock; holding c.L is optional)
	runtime_notifyListNotifyAll(&c.notify)
}
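The Signal doc comment says it wakes a waiting goroutine "if there is any". One consequence is worth seeing once: a Signal sent while nobody is waiting is not stored anywhere, which is why a waiter must check the shared condition under the lock before calling Wait instead of relying on a Signal being remembered. The sketch below (earlySignalIsLost is a hypothetical name) deliberately omits the predicate loop to expose this, and uses a 100 ms timeout purely for demonstration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// earlySignalIsLost returns true if a waiter that starts after a Signal
// is still blocked, i.e. the early Signal was dropped.
func earlySignalIsLost() bool {
	var mu sync.Mutex
	c := sync.NewCond(&mu)

	c.Signal() // no waiters yet: nothing happens

	done := make(chan struct{})
	go func() {
		mu.Lock()
		c.Wait() // deliberately no predicate loop
		mu.Unlock()
		close(done)
	}()

	stillBlocked := false
	select {
	case <-done:
	case <-time.After(100 * time.Millisecond):
		stillBlocked = true
	}

	// Release the waiter; retry in case it has not reached Wait yet.
	for {
		c.Signal()
		select {
		case <-done:
			return stillBlocked
		case <-time.After(10 * time.Millisecond):
		}
	}
}

func main() {
	fmt.Println(earlySignalIsLost()) // true
}
```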

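3.3 runtime implementation

sync/runtime.go

The code below omits some less important logic; omissions are marked with //...

As with sync.Mutex, the sync package only declares these functions. Their bodies live in runtime/sema.go and are bound to the sync declarations at link time via //go:linkname.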

// Think of it as a concurrency-safe ID generator: it hands each waiter a unique ticket
func runtime_notifyListAdd(l *notifyList) uint32

// Wait for an event (Signal/Broadcast); t is this waiter's ticket
func runtime_notifyListWait(l *notifyList, t uint32)

// Wake all current waiters
func runtime_notifyListNotifyAll(l *notifyList)

// Wake one waiter
func runtime_notifyListNotifyOne(l *notifyList)

// Memory-safety check: at init time, verify that the size of sync's
// notifyList equals the size of runtime's notifyList
func runtime_notifyListCheck(size uintptr)
func init() {
	var n notifyList
	runtime_notifyListCheck(unsafe.Sizeof(n))
}

runtime/sema.go

// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
// Hands out the waiter's ticket: monotonically increasing, and the basis of the FIFO order
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}

// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
// Start waiting until this ticket is woken
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Return right away if this ticket has already been notified.
	// A ticket below notify has already been woken, so there is no need to park
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Append the current goroutine to notifyList (a singly linked list, tail insertion)
	// sudog represents a g in a wait list
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	//...
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	// gopark: release l.lock and switch the goroutine from _Grunning to _Gwaiting
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	//...
	releaseSudog(s)
}

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock.
	// (notify == wait means every ticket handed out has already been notified)
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// Pull the list out into a local variable, waiters will be readied
	// outside the lock.
	lockWithRank(&l.lock, lockRankNotifyList)
	// Detach the whole list, leaving notifyList empty
	s := l.head
	l.head = nil
	l.tail = nil

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	// Setting notify to wait marks every ticket handed out so far as wakeable
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// Go through the local list and ready all waiters.
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	// (notify == wait means every ticket handed out has already been notified)
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock if we need to do anything.
	// (the classic pattern: check again after taking the lock)
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number.
	// Ticket t is the one woken now; t+1 becomes the next wakeable ticket
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	// Find the waiter holding ticket t, unlink it from the list, and wake it
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			// Switch the g from _Gwaiting to _Grunnable
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

//go:linkname notifyListCheck sync.runtime_notifyListCheck
// Check that sync.notifyList and runtime.notifyList have the same size
func notifyListCheck(sz uintptr) {
	if sz != unsafe.Sizeof(notifyList{}) {
		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
		throw("bad notifyList size")
	}
}

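4. Pitfalls

To be honest, I have never actually used sync.Cond, so these pitfalls were all collected from the web 😓

  1. Don't copy a Cond that has already been used: calling its methods on the copy panics at runtime.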
package main

import "sync"

func main() {
	// Create cond1 and use it once: the first Wait/Signal/Broadcast stores
	// the address of cond1's copyChecker inside that copyChecker
	cond1 := sync.NewCond(&sync.Mutex{})
	cond1.Signal()

	// Copy cond1 by value. This is a shallow copy: cond2.L points at the same
	// mutex, and cond2's copyChecker still holds cond1's address
	// (go vet's copylocks check reports this assignment)
	cond2 := new(sync.Cond)
	*cond2 = *cond1

	// Any method call on the copy fails the check: panic: sync.Cond is copied
	cond2.Signal()
}
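The usual way to avoid this pitfall is never to copy a value that contains a Cond: share pointers and use pointer receivers, and let go vet's copylocks check (the reason noCopy has no-op Lock/Unlock methods) catch accidental value copies. Counter below is a hypothetical example of that pattern, embedding the Cond and its mutex in one heap-allocated struct.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is a hypothetical type that embeds a Cond. It is always handled as
// *Counter and all methods use pointer receivers, so the Cond is never copied.
type Counter struct {
	mu   sync.Mutex
	cond sync.Cond
	n    int
}

func NewCounter() *Counter {
	c := &Counter{}
	c.cond.L = &c.mu // point the Cond at the embedded mutex
	return c
}

// Inc bumps the counter and wakes all waiters.
func (c *Counter) Inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
	c.cond.Broadcast()
}

// WaitFor blocks until the counter reaches at least target, then returns it.
func (c *Counter) WaitFor(target int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.n < target {
		c.cond.Wait()
	}
	return c.n
}

func main() {
	c := NewCounter()
	done := make(chan int)
	go func() { done <- c.WaitFor(3) }()
	for i := 0; i < 3; i++ {
		c.Inc()
	}
	fmt.Println(<-done) // 3
}
```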

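5. References

sync - The Go Programming Language (studygolang.com)

Monitor (synchronization) - Wikipedia, the free encyclopedia, Chinese edition (wikipedia.org)

Golang sync.Cond condition variable source code analysis | 編程沉思錄 (cyhone.com)

Linux condition variable pthread_condition details (why lock first, why pthread_cond_wait unlocks first and re-locks on return)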
