Today that you are wasting is the unattainable tomorrow to someone who expired yesterday. This very moment that you detest is the unreturnable experience to your future self.
sync.Cond
實現了一個條件變量,用於等待一個或一組goroutines知足條件後喚醒的場景。每一個
Cond關聯一個
Locker一般是一個
*Mutex或
RWMutex\`根據需求初始化不一樣的鎖。數據結構
老規矩正式剖析源碼前,先來看看sync.Cond
如何使用。好比咱們實現一個FIFO
的隊列app
`package main` `import (` `"fmt"` `"math/rand"` `"os"` `"os/signal"` `"sync"` `"time"` `)` `type FIFO struct {` `lock sync.Mutex` `cond *sync.Cond` `queue []int` `}` `type Queue interface {` `Pop() int` `Offer(num int) error` `}` `func (f *FIFO) Offer(num int) error {` `f.lock.Lock()` `defer f.lock.Unlock()` `f.queue = append(f.queue, num)` `f.cond.Broadcast()` `return nil` `}` `func (f *FIFO) Pop() int {` `f.lock.Lock()` `defer f.lock.Unlock()` `for {` `for len(f.queue) == 0 {` `f.cond.Wait()` `}` `item := f.queue[0]` `f.queue = f.queue[1:]` `return item` `}` `}` `func main() {` `l := sync.Mutex{}` `fifo := &FIFO{` `lock: l,` `cond: sync.NewCond(&l),` `queue: []int{},` `}` `go func() {` `for {` `fifo.Offer(rand.Int())` `}` `}()` `time.Sleep(time.Second)` `go func() {` `for {` `fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))` `}` `}()` `go func() {` `for {` `fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))` `}` `}()` `ch := make(chan os.Signal, 1)` `signal.Notify(ch, os.Interrupt)` `<-ch` `}`
咱們定一個FIFO
隊列有Offer
和Pop
兩個操做,咱們起一個gorountine
不斷向隊列投放數據,另外兩個gorountine
不斷取拿數據。less
Pop
操做會判斷若是隊列裏沒有數據len(f.queue) == 0
則調用f.cond.Wait()
將goroutine
掛起。Offer
操做投放數據成功,裏面調用f.cond.Broadcast()
來喚醒全部掛起在這個mutex
上的goroutine
。固然sync.Cond
也提供了一個Signal()
,有點兒相似Java中的notify()
和notifyAll()
的意思 主要是喚醒一個和喚醒所有的區別。總結一下sync.Mutex
的大體用法ide
mutex
,這裏sync.Mutex
/sync.RWMutex
可根據實際狀況選用sync.NewCond(l Locker) *Cond
使用1中的mutex
做爲入參 注意 這裏傳入的是指針 爲了不c.L.Lock()
、c.L.Unlock()
調用頻繁複制鎖 致使死鎖cond.Wait()
掛起goroutine
cond.Broadcast()
喚起全部掛起的gorotune
另外一個方法cond.Signal()
喚醒一個最早掛起的goroutine
須要注意的是cond.wait()
的使用須要參照以下模版 具體爲啥咱們後續分析源碼分析
`c.L.Lock()` `for !condition() {` `c.Wait()` `}` `... make use of condition ...` `c.L.Unlock()`
分析具體方法前,咱們先來了解下sync.Cond
的數據結構。具體源碼以下:ui
`type Cond struct {` `noCopy noCopy // Cond使用後不容許拷貝` `// L is held while observing or changing the condition` `L Locker` `//通知列表調用wait()方法的goroutine會被放到notifyList中` `notify notifyList` `checker copyChecker //檢查Cond實例是否被複制` `}`
noCopy
以前講過 不清楚的能夠看下《你真的瞭解mutex嗎》,除此以外,Locker
是咱們剛剛談到的mutex
,copyChecker
是用來檢查Cond實例是否被複制的,就有一個方法 :this
`func (c *copyChecker) check() {` `if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&` `!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&` `uintptr(*c) != uintptr(unsafe.Pointer(c)) {` `panic("sync.Cond is copied")` `}` `}`
大體意思是說,初始type copyChecker uintptr
默認爲0,當第一次調用check()
會將copyChecker
自身的地址複製給本身,至於爲何uintptr(*c) != uintptr(unsafe.Pointer(c))
會被調用2次,由於期間goroutine
可能已經改變copyChecker
。二次調用若是不相等,則說明sync.Cond
被複制,從新分配了內存地址。atom
sync.Cond
比較有意思的是notifyList
spa
`type notifyList struct {` `// wait is the ticket number of the next waiter. It is atomically` `// incremented outside the lock.` `wait uint32 // 等待goroutine操做的數量` `// notify is the ticket number of the next waiter to be notified. It can` `// be read outside the lock, but is only written to with lock held.` `//` `// Both wait & notify can wrap around, and such cases will be correctly` `// handled as long as their "unwrapped" difference is bounded by 2^31.` `// For this not to be the case, we'd need to have 2^31+ goroutines` `// blocked on the same condvar, which is currently not possible.` `notify uint32 // 喚醒goroutine操做的數量` `// List of parked waiters.` `lock mutex` `head *sudog` `tail *sudog` `}`
包含了3類字段:指針
wait
和notify
兩個無符號整型,分別表示了Wait()
操做的次數和goroutine
被喚醒的次數,wait
應該是恆大於等於notify
lock mutex
這個跟sync.Mutex
咱們分析信號量阻塞隊列時semaRoot
裏的mutex
同樣,並非Go
提供開發者使用的sync.Mutex
,而是系統內部運行時實現的一個簡單版本的互斥鎖。head
和tail
看名字,咱們就能腦補出跟鏈表很像 沒錯這裏就是維護了阻塞在當前sync.Cond
上的goroutine
構成的鏈表總體來說sync.Cond
大致結構爲:
cond architecture
`func (c *Cond) Wait() {` `//1. 檢查cond是否被拷貝` `c.checker.check()` `//2. notifyList.wait+1` `t := runtime_notifyListAdd(&c.notify)` `//3. 釋放鎖 讓出資源給其餘goroutine` `c.L.Unlock()` `//4. 掛起goroutine` `runtime_notifyListWait(&c.notify, t)` `//5. 嘗試得到鎖` `c.L.Lock()` `}`
從Wait()
方法源碼很容易看出它的操做大概分了5步:
copyChecker.check()
保證sync.Cond
不會被拷貝Wait()
會將sync.Cond.notifyList.wait
屬性進行加一操做,這也是它完成FIFO
的基石,根據wait
來判斷\`goroutine1等待的順序`//go:linkname notifyListAdd sync.runtime_notifyListAdd` `func notifyListAdd(l *notifyList) uint32 {` `// This may be called concurrently, for example, when called from` `// sync.Cond.Wait while holding a RWMutex in read mode.` `return atomic.Xadd(&l.wait, 1) - 1` `}`
c.L.Unlock()
釋放鎖,由於當前goroutine
即將被gopark
,讓出鎖給其餘goroutine
避免死鎖runtime_notifyListWait(&c.notify, t)
可能稍微複雜一點兒`// notifyListWait waits for a notification. If one has been sent since` `// notifyListAdd was called, it returns immediately. Otherwise, it blocks.` `//go:linkname notifyListWait sync.runtime_notifyListWait` `func notifyListWait(l *notifyList, t uint32) {` `lockWithRank(&l.lock, lockRankNotifyList)` `// 若是已經被喚醒 則當即返回` `if less(t, l.notify) {` `unlock(&l.lock)` `return` `}` `// Enqueue itself.` `s := acquireSudog()` `s.g = getg()` `// 把等待遞增序號賦值給s.ticket 爲FIFO打基礎` `s.ticket = t` `s.releasetime = 0` `t0 := int64(0)` `if blockprofilerate > 0 {` `t0 = cputicks()` `s.releasetime = -1` `}` `// 將當前goroutine插入到notifyList鏈表中` `if l.tail == nil {` `l.head = s` `} else {` `l.tail.next = s` `}` `l.tail = s` `// 最終調用gopark掛起當前goroutine` `goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)` `if t0 != 0 {` `blockevent(s.releasetime-t0, 2)` `}` `// goroutine被喚醒後釋放sudog` `releaseSudog(s)` `}`
主要完成兩個任務:
Signal
或Broadcast
方法,當前goroutine
被喚醒後 再次嘗試得到鎖Signal
喚醒一個等待時間最長的goroutine
,調用時不要求持有鎖。
`func (c *Cond) Signal() {` `c.checker.check()` `runtime_notifyListNotifyOne(&c.notify)` `}`
具體實現也不復雜,先判斷sync.Cond
是否被複制,而後調用runtime_notifyListNotifyOne
`//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne` `func notifyListNotifyOne(l *notifyList) {` `// wait==notify 說明沒有等待的goroutine了` `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {` `return` `}` `lockWithRank(&l.lock, lockRankNotifyList)` `// 鎖下二次檢查` `t := l.notify` `if t == atomic.Load(&l.wait) {` `unlock(&l.lock)` `return` `}` `// 更新下一個須要被喚醒的ticket number` `atomic.Store(&l.notify, t+1)` `// Try to find the g that needs to be notified.` `// If it hasn't made it to the list yet we won't find it,` `// but it won't park itself once it sees the new notify number.` `//` `// This scan looks linear but essentially always stops quickly.` `// Because g's queue separately from taking numbers,` `// there may be minor reorderings in the list, but we` `// expect the g we're looking for to be near the front.` `// The g has others in front of it on the list only to the` `// extent that it lost the race, so the iteration will not` `// be too long. This applies even when the g is missing:` `// it hasn't yet gotten to sleep and has lost the race to` `// the (few) other g's that we find on the list.` `//這裏是FIFO實現的核心 其實就是遍歷鏈表 sudog.ticket查找指定須要喚醒的節點` `for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {` `if s.ticket == t {` `n := s.next` `if p != nil {` `p.next = n` `} else {` `l.head = n` `}` `if n == nil {` `l.tail = p` `}` `unlock(&l.lock)` `s.next = nil` `readyWithTime(s, 4)` `return` `}` `}` `unlock(&l.lock)` `}`
主要邏輯:
notify
屬性,由於是根據notify
和sudog.ticket
匹配來查找須要喚醒的goroutine
,由於其是遞增生成的,故而有了FIFO
語義。head
開始依據next
指針依次遍歷。這個過程是線性的,故而時間複雜度爲O(n),不過官方說法這個過程實際比較快This scan looks linear but essentially always stops quickly.
有個小細節:還記得咱們Wait()
操做中,wait
屬性原子更新和goroutine插入等待鏈表是兩個單獨的步驟,因此存在競爭的狀況下,鏈表中的節點可能會輕微的亂序產生。可是不要擔憂,由於ticket是原子遞增的 因此喚醒順序不會亂。
Broadcast()
與Singal()
區別主要是它能夠喚醒所有等待的goroutine
,並直接將wait
屬性的值賦值給notify
。
`func (c *Cond) Broadcast() {` `c.checker.check()` `runtime_notifyListNotifyAll(&c.notify)` `}` `// notifyListNotifyAll notifies all entries in the list.` `//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll` `func notifyListNotifyAll(l *notifyList) {` `// Fast-path 無等待goroutine直接返回` `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {` `return` `}` `lockWithRank(&l.lock, lockRankNotifyList)` `s := l.head` `l.head = nil` `l.tail = nil` `// 直接更新notify=wait` `atomic.Store(&l.notify, atomic.Load(&l.wait))` `unlock(&l.lock)` `// 依次調用goready喚醒goroutine` `for s != nil {` `next := s.next` `s.next = nil` `readyWithTime(s, 4)` `s = next` `}` `}`
邏輯比較簡單再也不贅述
sync.Cond
一旦建立使用 不容許被拷貝,由noCopy
和copyChecker
來限制保護。Wait()
操做先是遞增notifyList.wait
屬性 而後將goroutine
封裝進sudog
,將notifyList.wait
賦值給sudog.ticket
,而後將sudog
插入notifyList
鏈表中Singal()
實際是按照notifyList.notify
跟notifyList
鏈表中節點的ticket
匹配 來肯定喚醒的goroutine,由於notifyList.notify
和notifyList.wait
都是原子遞增的,故而有了FIFO
的語義Broadcast()
相對簡單 就是喚醒所有等待的goroutine
若是閱讀過程當中發現本文存疑或錯誤的地方,能夠關注公衆號留言。若是以爲還能夠 幫忙點個在看😁