cond是go語言sync提供的條件變量,經過cond能夠讓一系列的goroutine在觸發某個條件時才被喚醒。每個cond結構體都包含一個鎖L。cond提供了三個方法:less
建立40個goroutine都wait阻塞住。調用Signal則喚醒第一個goroutine。調用Broadcast則喚醒全部等待的goroutine。ide
package main import ( "fmt" "sync" "time" ) var locker = new(sync.Mutex) var cond = sync.NewCond(locker) func test(x int) { cond.L.Lock() //獲取鎖 cond.Wait() //等待通知 暫時阻塞 fmt.Println(x) time.Sleep(time.Second * 1) cond.L.Unlock() //釋放鎖 } func main() { for i := 0; i < 40; i++ { go test(i) } fmt.Println("start all") time.Sleep(time.Second * 3) fmt.Println("broadcast") cond.Signal() // 下發一個通知給已經獲取鎖的goroutine time.Sleep(time.Second * 3) cond.Signal() // 3秒以後 下發一個通知給已經獲取鎖的goroutine time.Sleep(time.Second * 3) cond.Broadcast() //3秒以後 下發廣播給全部等待的goroutine time.Sleep(time.Second * 60) }
type Cond struct { noCopy noCopy // 鎖的具體實現,一般爲 mutex 或者rwmutex L Locker // notifyList對象,維護等待喚醒的goroutine隊列,使用鏈表實現 notify notifyList checker copyChecker } // 新建cond初始化cond對象 func NewCond(l Locker) *Cond { return &Cond{L: l} } type notifyList struct { // 等待數量 wait uint32 // 通知數量 notify uint32 // 鎖對象 lock mutex // 鏈表頭 head *sudog // 鏈表尾 tail *sudog }
// 等待函數 func (c *Cond) Wait() { c.checker.check() // 等待計數器加1 看下面具體實現 t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // runtime_notifyListWait(&c.notify, t) c.L.Lock() } // 此函數在sema.go中控制計數器加1 func notifyListAdd(l *notifyList) uint32 { // This may be called concurrently, for example, when called from // sync.Cond.Wait while holding a RWMutex in read mode. return atomic.Xadd(&l.wait, 1) - 1 } // 此函數在sema.go中 // 獲取當前goroutine 添加到鏈表末端,而後goparkunlock函數休眠阻塞當前goroutine // goparkunlock函數會讓出當前處理器的使用權並等待調度器的喚醒 func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) // Return right away if this ticket has already been notified. if less(t, l.notify) { unlock(&l.lock) return } // Enqueue itself. s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64(0) if blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3) if t0 != 0 { blockevent(s.releasetime-t0, 2) } releaseSudog(s) }
喚醒鏈表中全部的阻塞中的goroutine,仍是使用readyWithTime來實現這個功能函數
func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } // 源代碼在sema.go中 func notifyListNotifyAll(l *notifyList) { // Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } // Pull the list out into a local variable, waiters will be readied // outside the lock. lock(&l.lock) s := l.head l.head = nil l.tail = nil // Update the next ticket to be notified. We can set it to the current // value of wait because any previous waiters are already in the list // or will notice that they have already been notified when trying to // add themselves to the list. atomic.Store(&l.notify, atomic.Load(&l.wait)) unlock(&l.lock) // Go through the local list and ready all waiters. for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } }
// 調用runtime_notifyListNotifyOne方法喚醒鏈表頭的goroutine func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } // runtime_notifyListNotifyOne具體實現 獲取鏈表頭部的G,而後調用readyWithTime喚醒goroutine // 源代碼在sema.go中 func notifyListNotifyOne(l *notifyList) { if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } lock(&l.lock) t := l.notify if t == atomic.Load(&l.wait) { unlock(&l.lock) return } atomic.Store(&l.notify, t+1) for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock) }