go中sync.Cond源碼解讀

sync.Cond

前言

本次的代碼是基於go version go1.13.15 darwin/amd64less

什麼是sync.Cond

Go語言標準庫中的條件變量sync.Cond,它可讓一組的Goroutine都在知足特定條件時被喚醒。函數

每一個Cond都會關聯一個Lock(*sync.Mutex or *sync.RWMutex)ui

var (
	locker = new(sync.Mutex)
	cond   = sync.NewCond(locker)
)

func listen(x int) {
	// 獲取鎖
	cond.L.Lock()
	// 等待通知  暫時阻塞
	cond.Wait()
	fmt.Println(x)
	// 釋放鎖
	cond.L.Unlock()
}

func main() {
	// 啓動60個被cond阻塞的線程
	for i := 1; i <= 60; i++ {
		go listen(i)
	}

	fmt.Println("start all")

	// 3秒以後 下發一個通知給已經獲取鎖的goroutine	time.Sleep(time.Second * 3)
	fmt.Println("++++++++++++++++++++one Signal")
	cond.Signal()

	// 3秒以後 下發一個通知給已經獲取鎖的goroutine
	time.Sleep(time.Second * 3)
	fmt.Println("++++++++++++++++++++one Signal")
	cond.Signal()

	// 3秒以後 下發廣播給全部等待的goroutine
	time.Sleep(time.Second * 3)
	fmt.Println("++++++++++++++++++++begin broadcast")
	cond.Broadcast()
	// 阻塞直到全部的所有輸出
	time.Sleep(time.Second * 60)
}

上面是個簡單的例子,咱們啓動了60個線程,而後都被cond阻塞,主函數經過Signal()通知一個goroutine接觸阻塞,經過Broadcast()通知全部被阻塞的所有解除阻塞。atom

sync_cond

看下源碼

// Wait 原子式的 unlock c.L, 並暫停執行調用的 goroutine。
// 在稍後執行後,Wait 會在返回前 lock c.L. 與其餘系統不一樣,
// 除非被 Broadcast 或 Signal 喚醒,不然等待沒法返回。
//
// 由於等待第一次 resume 時 c.L 沒有被鎖定,因此當 Wait 返回時,
// 調用者一般不能認爲條件爲真。相反,調用者應該在循環中使用 Wait():
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
type Cond struct {
	// 用於保證結構體不會在編譯期間拷貝
	noCopy noCopy
	// 鎖
	L Locker
	// goroutine鏈表,維護等待喚醒的goroutine隊列
	notify  notifyList
	// 保證運行期間不會發生copy
	checker copyChecker
}

重點分析下:notifyListcopyChecker線程

  • notify
type notifyList struct {
	// 總共須要等待的數量
	wait   uint32
	// 已經通知的數量
	notify uint32
	// 鎖
	lock   uintptr
	// 指向鏈表頭部
	head   *sudog
	// 指向鏈表尾部
	tail   *sudog
}

這個是核心,全部waitgoroutine都會被加入到這個鏈表中,而後在通知的時候再從這個鏈表中獲取。code

  • copyChecker

保證運行期間不會發生copy對象

type copyChecker uintptr
// copyChecker holds back pointer to itself to detect object copying
func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

Wait

func (c *Cond) Wait() {
	// 監測是否複製
	c.checker.check()
	// 更新 notifyList中須要等待的wait的數量
	// 返回當前須要插入鏈表節點ticket
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// 爲當前的加入的waiter構建一個鏈表的節點,插入鏈表的尾部
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

// go/src/runtime/sema.go
// 更新 notifyList中須要等待的wait的數量
// 同時返回當前的加入的 waiter 的 ticket 編號,從0開始  
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
    // 使用atomic原子的對wait字段進行加一操做
	return atomic.Xadd(&l.wait, 1) - 1
}

// go/src/runtime/sema.go
// 爲當前的加入的waiter構建一個鏈表的節點,插入鏈表的尾部
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lock(&l.lock)

	// 當t小於notifyList中的notify,說明當前節點已經被通知了  
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 構建當前節點
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	// 頭結點沒構建,插入頭結點
	if l.tail == nil {
		l.head = s
	} else {
		// 插入到尾節點
		l.tail.next = s
	}
	l.tail = s
	// 將當前goroutine置於等待狀態並解鎖
	// 經過調用goready(gp),可使goroutine再次可運行。
	// 也就是將 M/P/G 解綁,並將 G 調整爲等待狀態,放入 sudog 等待隊列中
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	releaseSudog(s)
}

梳理流程blog

一、首先檢測對象的複製行爲,若是有複製發生直接拋出panic;隊列

二、而後調用runtime_notifyListAddnotifynotifyListList中的wait(須要等待的數量)進行加一操做,同時返回一個ticket,用來做爲當前wait的編號,這個編號,會和notifyList中的notify對應起來;get

三、而後調用runtime_notifyListWait把當前的wait封裝成鏈表的一個節點,插入到notifyList維護的鏈表的尾部。

sync_cond

Signal

// 喚醒一個被wait的goroutine
func (c *Cond) Signal() {
	// 監測是否複製
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

// go/src/runtime/sema.go
// 通知鏈表中的第一個
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// wait和notify,說明已經所有通知到了
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lock(&l.lock)

	// 這裏作了二次的確認
	// wait和notify,說明已經所有通知到了
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// 原子的對notify執行+1操做
	atomic.Store(&l.notify, t+1)

	// 嘗試找到須要被通知的 g
	// 若是目前還沒來得及入隊,是沒法找到的
	// 可是,當它看到通知編號已經發生改變是不會被 park 的
	//
	// 這個查找過程看起來是線性複雜度,但實際上很快就停了
	// 由於 g 的隊列與獲取編號不一樣,於是隊列中會出現少許重排,但咱們但願找到靠前的 g
	// 而 g 只有在再也不 race 後纔會排在靠前的位置,所以這個迭代也不會過久,
	// 同時,即使找不到 g,這個狀況也成立:
	// 它尚未休眠,而且已經失去了咱們在隊列上找到的(少數)其餘 g 的 race。
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		// 順序拿到一個節點的ticket,會和上面會和notifyList中的notify作比較,相同才進行後續的操做
		// 這個咱們分析了,notifyList中的notify和鏈表節點中的ticket是一一對應的  
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			// 經過goready掉起在上面經過goparkunlock掛起的goroutine
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

梳理下流程:

一、首先檢測對象的複製行爲,若是有複製發生直接拋出panic

二、判斷waitnotify,若是二者相同說明已經已經所有通知到了;

三、調用notifyListNotifyOne,經過for循環,依次遍歷這個鏈表,直到找到和notifyList中的notify,相匹配的ticket的節點;

四、掉起goroutine,完成通知。

sync_cond

Broadcast

// 喚醒全部被wait的goroutine
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

// go/src/runtime/sema.go
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// wait和notify,說明已經所有通知到了
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// 加鎖
	lock(&l.lock)
	s := l.head
	l.head = nil
	l.tail = nil

	// 這個很粗暴,直接將notify的值置換成wait
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// 循環鏈表,一個個喚醒goroutine
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

梳理下流程:

一、首先檢測對象的複製行爲,若是有複製發生直接拋出panic;

二、判斷waitnotify,若是二者相同說明已經已經所有通知到了;

三、notifyListNotifyAll,就相對簡單了,直接將notify的值置爲wait,標註這個已經所有通知了;

四、循環鏈表,一個個喚醒goroutine

sync_cond

總結

sync.Cond不是一個經常使用的同步機制,可是在條件長時間沒法知足時,與使用for {}進行忙碌等待相比,sync.Cond可以讓出處理器的使用權,提供CPU的利用率。使用時咱們也須要注意如下問題:

一、sync.Cond.Wait在調用以前必定要使用獲取互斥鎖,不然會觸發程序崩潰;

二、sync.Cond.Signal 喚醒的 Goroutine都是隊列最前面、等待最久的Goroutine

三、sync.Cond.Broadcast會按照必定順序廣播通知等待的所有 Goroutine

相關文章
相關標籤/搜索