Condition variables allow threads to wait until some event
or condition has occurred.
複製代碼
條件變量是併發編程中很經典的一個手段,經常使用的條件變量有兩種實現。linux
兩種方式的區別是發出通知的線程是否會馬上失去全部權,阻塞式條件變量的實現是會馬上失去,非阻塞式條件變量的實現式不會馬上失去。c++
條件變量在各語言/基礎庫中都有本身的實現,linux的pthread
庫和c++的std::condition_variable
都是非阻塞式條件變量的實現,而golang sync.Cond
也是典型的非阻塞式條件變量的實現。golang
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
複製代碼
咱們再來看一下go官方對cond
的定義,上述內容是從go源碼中摘出來的。翻譯一下,意思以下:編程
sync.Cond
是golang對條件變量的實現,有兩個關鍵點:安全
wait
操做)。signal/broadcast
操做)。package main
import (
"fmt"
"sync"
"time"
)
// 喚醒檢測標誌
var flag = false
func CondTest(info string, c *sync.Cond) {
// 使用條件變量前須要先加鎖(wait()內部有釋放鎖操做)
c.L.Lock()
for flag == false {
fmt.Println(info, "wait")
// 掛起等待喚醒
c.Wait()
}
fmt.Println(info, flag)
c.L.Unlock()
}
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
// add 2 waiter
go CondTest("go one", c)
go CondTest("go two", c)
// main
time.Sleep(time.Second)
c.L.Lock()
flag = true
c.L.Unlock()
c.Broadcast() // 所有喚醒
// c.Signal() // 喚醒1個
fmt.Println("main broadcast")
time.Sleep(time.Second)
}
複製代碼
沒咋找到😂,sync.Cond的場景基本均可以被chan替換掉markdown
想象這麼一個場景,有1個worker在異步的接收數據,剩下的n個waiter必須等待這個worker接收完數據才能繼續下面的處理流程,這時咱們很容易想到兩種方案。併發
缺點是須要不斷輪詢對應的變量來判斷是否知足條件,且較難支持單waiter通知的操做。app
Don't communicate by sharing memory, share memory by communicating.
less
按照go的哲學,在這種併發的場景下,更推薦chan,可是若是使用chan
來操做,須要worker明確感知到等待的waiter數來進行處理,好比有n個waiter就須要notify n次(用close也能夠),而使用sync.Cond
就能夠極大的簡化這個操做,只須要調用Broadcast
便可完成多waiter的通知,除此以外cond也提供了相似於chan send
單信號的通知(Singal
)。異步
一句話總結:多waiter單worker的場景均可以使用sync.Cond
代碼部分基於golang 1.16 64位機
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
// 每個waiter都會有1個ticket,能夠理解爲waiter的惟一標識,單調遞增
type notifyList struct {
wait uint32 //下一個waiter的最大ticket,單調遞增
notify uint32 //下一個待喚醒的waiter的ticket值(ticket 小於該值的waiter都 即將/已經 處於喚醒態)
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
type copyChecker uintptr
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
複製代碼
函數定義:
func (c *copyChecker) check()
func (*noCopy) Lock()
func (*noCopy) Unlock()
複製代碼
實現:
// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
// copyChecker會存放copyChecker的地址,經過這個地址判斷是否被拷貝
func (c *copyChecker) check() {
// 檢測copyChecker的地址是否與copyChecker中存儲的相同
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
// 第一次調用會將copyChecker的地址存儲在copyChecker中
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
// 保證第一次調用check()不報錯
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
複製代碼
函數定義:
//新建cond
func NewCond(l Locker) *Cond
//等待
func (c *Cond) Wait()
//單waiter喚醒
func (c *Cond) Signal()
//全waiter喚醒
func (c *Cond) Broadcast()
複製代碼
實現:
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
// 檢測cond是否被拷貝(若是拷貝則會panic)
c.checker.check()
// 獲得waiter的惟一標識
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 將waiter惟一標識添加到等待通知的隊列中
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
// 檢測cond是否被拷貝(若是拷貝則會panic)
c.checker.check()
// 喚醒等待隊列中的一個waiter(喚醒前須要加鎖)
runtime_notifyListNotifyOne(&c.notify)
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
// 檢測cond是否被拷貝(若是拷貝則會panic)
c.checker.check()
// 喚醒等待隊列中全部的waiter(喚醒前須要加鎖)
runtime_notifyListNotifyAll(&c.notify)
}
複製代碼
如下代碼中省略了一些不重要的邏輯,已使用//...標識出來
和sync.Mutex同樣,sync.Cond在sync包內只有函數聲明,具體的函數實現會在連接時link到runtime/sema.go。
//能夠理解爲併發安全的id生成器,爲每個waiter生成1個惟一標識
func runtime_notifyListAdd(l *notifyList) uint32
//等待事件(Signal/Broadcast)的發生,t爲當前waiter的惟一標識
func runtime_notifyListWait(l *notifyList, t uint32)
//喚醒當前等待的所有waiter
func runtime_notifyListNotifyAll(l *notifyList)
//喚醒1個waiter
func runtime_notifyListNotifyOne(l *notifyList)
// 內存安全保證,在init時會執行檢查,保證sync包的notifyList結構大小等於runtime包的notifyList
func runtime_notifyListCheck(size uintptr)
func init() {
var n notifyList
runtime_notifyListCheck(unsafe.Sizeof(n))
}
複製代碼
// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
//獲取waiter的惟一標識,單調遞增,也是實現fifo的基礎
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
//開始wait,等待被喚醒
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
// 若是當前waiter的編號小於notify,則無須等待,直接返回便可
if less(t, l.notify) {
unlock(&l.lock)
return
}
//將當前goroutine加入到notifyList鏈表中,單鏈表,尾插法
//sudog represents a g in a wait list
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
//...
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
//調用gopark,將goroutine狀態由 _Grunning切換爲 _Gwaiting
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
//...
releaseSudog(s)
}
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock.
// fastpath,若是當前的notify和wait一致,則表明無新的waiter,直接返回便可
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// Pull the list out into a local variable, waiters will be readied
// outside the lock.
lockWithRank(&l.lock, lockRankNotifyList)
// 將notifyList鏈表清空
s := l.head
l.head = nil
l.tail = nil
// Update the next ticket to be notified. We can set it to the current
// value of wait because any previous waiters are already in the list
// or will notice that they have already been notified when trying to
// add themselves to the list.
//將notify的值置爲wait的值,意思將當前全部waiter都已經能夠被喚醒了
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// Go through the local list and ready all waiters.
// 遍歷鏈表,循環喚醒全部waiter
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
// notifyListNotifyOne notifies one entry in the list.
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock at all.
// fastpath,若是當前的notify和wait一致,則表明無新的waiter,直接返回便可
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// Re-check under the lock if we need to do anything.
// 很經典的操做,加鎖後再二次確認
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// Update the next notify ticket number.
// 標識下一個可喚醒的waiter
atomic.Store(&l.notify, t+1)
// Try to find the g that needs to be notified.
// If it hasn't made it to the list yet we won't find it,
// but it won't park itself once it sees the new notify number.
//
// This scan looks linear but essentially always stops quickly.
// Because g's queue separately from taking numbers,
// there may be minor reorderings in the list, but we
// expect the g we're looking for to be near the front.
// The g has others in front of it on the list only to the
// extent that it lost the race, so the iteration will not
// be too long. This applies even when the g is missing:
// it hasn't yet gotten to sleep and has lost the race to
// the (few) other g's that we find on the list.
// 從waiter鏈表中找到須要喚醒的waiter,將對應waiter喚醒,並從鏈表中移除
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
// 將g的狀態由 _Gwaiting切換到_Grunnable
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
//go:linkname notifyListCheck sync.runtime_notifyListCheck
// 檢查sync.notifyList和runtime.notifyList大小是否一致
func notifyListCheck(sz uintptr) {
if sz != unsafe.Sizeof(notifyList{}) {
print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
throw("bad notifyList size")
}
}
複製代碼
u1s1 sync.cond確實一次都沒用過,踩坑都是從網上扒的😓
func main() {
// 建立一個cond1並使用
cond1 := sync.NewCond(&sync.Mutex{})
go func() {
cond1.L.Lock()
cond1.Wait()
}()
cond1.Signal()
// 對cond1進行深拷貝
var cond2 = new(sync.Cond)
*cond2 = *cond1
f := func(cond *sync.Cond, v int) {
cond.L.Lock()
for {
fmt.Println(v)
cond.Wait()
}
}
go f(cond1, 1)
// 當使用cond2的時候會報錯(panic: sync.Cond is copied)
go f(cond2, 2)
time.Sleep(time.Second)
}
複製代碼
sync - The Go Programming Language (studygolang.com)
管程 - 維基百科,自由的百科全書 (wikipedia.org)
Golang sync.Cond 條件變量源碼分析 | 編程沉思錄 (cyhone.com)
Linux條件變量pthread_condition細節(爲什麼先加鎖,pthread_cond_wait爲什麼先解鎖,返回時又加鎖)