Go語言在設計上對同步(Synchronization,數據同步和線程同步)提供大量的支持,好比 goroutine和channel同步原語,庫層面有 - sync:提供基本的同步原語(好比Mutex、RWMutex、Locker)和 工具類(Once、WaitGroup、Cond、Pool、Map) - sync/atomic:提供變量的原子操做(基於硬件指令 compare-and-swap)
-- 引用自《Golang package sync 剖析(二): sync.WaitGroup》git
上一期中,咱們介紹瞭如何使用 sync.WaitGroup
提升程序的並行度。本期文章咱們介紹 package sync
下的另外一個工具類:sync.Cond
。github
sync.Cond
對標 同步原語「條件變量」,它能夠阻塞一個,或同時阻塞多個線程,直到另外一個線程 1) 修改了條件變量; 2)通知一個(或全部)等待的線程。golang
注:Go語言裏沒有線程,只有更輕量級的協程。本文中,「線程」均代指「協程」(goroutine)。segmentfault
相對於 sync.Once 和 sync.WaitGroup, sync.Cond 比較難以理解,使用門檻也很高,在 Google 上搜一下,排名前10結果中有這樣幾個:數組
很是神奇的是:一篇名爲 「如何正確使用sync.Cond」 的帖子居然有 16k 的瀏覽量!微信
到底是條件變量這個概念難以理解,仍是 sync.Cond 的設計太反人類,咱們一探究竟。併發
開篇咱們就提到了條件變量的應用場景,咱們回顧一下:函數
sync.Cond 對標 同步原語「條件變量」,它能夠阻塞一個,或同時阻塞多個線程,直到另外一個線程 1) 修改了共享變量; 2)通知該條件變量。
首先,咱們把概念搞清楚,條件變量的做用是控制多個線程對一個共享變量的讀寫。咱們有三類主體:工具
其次,咱們看看 sync.Cond 的說明書:atom
// 建立一個 sync.Cond 對象 func NewCond(l Locker) *Cond // 阻塞當前線程,並等待條件觸發 func (c *Cond) Wait() // 喚醒全部等待線程 func (c *Cond) Broadcast() // 喚起一個等待線程 // 沒有等待線程也不會報錯 func (c *Cond) Signal()
你們看完這段代碼,腦子裏第一個問題大概是:NewCond
要一把鎖是幹嗎用的?爲了便於理解,咱們以 kubernetes 源碼裏 FIFO 隊列爲例,一步一步說 sync.Cond 的用法:
type FIFO struct { // lock 控制對象讀寫 lock sync.RWMutex // 阻塞Pop操做,Add成功後激活被阻塞線程 cond sync.Cond // items 存儲數據 items map[string]interface{} // queue 存儲key queue []string // keyFunc是hash函數 keyFunc KeyFunc // 維護items和queue同步 populated bool initialPopulationCount int // 隊列狀態:是否已經關閉 closed bool closedLock sync.Mutex }
首先,這是一個 FIFO 隊列,問題又來了:go 內置的 channel 不香嗎?還真的是不夠香。
FIFO 具有一些額外的特性:
FIFO 的成員函數有:
// 從隊頭取一個元素,沒有則會被阻塞 Pop(PopProcessFunc) (interface{}, error) // 向隊尾加一個元素,若是已經存在,則不作任何操做 Add(obj interface{}) error AddIfNotPresent(interface{}) error // 更新元素 Update(obj interface{}) error // 刪除元素 Delete(obj interface{}) error // 關閉隊列 Close() // 讀取全部元素 List() []interface{} // 讀取全部 key ListKeys() []string // 經過元素讀取元素(經過 keyFunc 映射到一樣的 key) Get(obj interface{}) (item interface{}, exists bool, err error) // 經過key讀取元素 GetByKey(key string) (item interface{}, exists bool, err error) // 用傳入的數組替換隊列內容 Replace([]interface{}, string) error // 同步items和queue Resync() error // items和queue是否同步 HasSynced() bool
回到本文的主題 sync.Cond, 在上面這個例子中
lock sync.RWMutex
用於控制對共享變量的併發訪問,本質上是控制對 queue
和 items
兩個字段的併發訪問。
因爲條件變量 cond sync.Cond
在實現 Wait
時,把鎖操做也包含進去了,因此初始化時須要傳入一個鎖變量。在使用時,是這樣的:
// 初始化一個 FIFO func NewFIFO(keyFunc KeyFunc) *FIFO { // lock 和 cond 均是默認值 f := &FIFO{ items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, } // 將 lock 共享給 cond f.cond.L = &f.lock return f } // Pop 操做 func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { // 鎖住共享變量 f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // 隊列已關閉 if f.IsClosed() { return nil, ErrFIFOClosed } // 隊列爲空,等待數據 f.cond.Wait() } // 此處省略一段代碼... // 從 items 和 queue 刪除元素 } } // Add 操做 func (f *FIFO) Add(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } // 鎖住共享變量 f.lock.Lock() defer f.lock.Unlock() // 此處省略一段代碼 ... // 添加元素到 items 和 queue // 通知等待線程 f.cond.Broadcast() return nil }
上面的代碼中,等待線程作的是:
Wait
等數據更新線程作的是:
看起來很簡單,Ok? 可是你品一品,你細品,發現事情沒那麼簡單。
等待線程 加鎖之後,更新線程 要更新共享變量,怎麼會取到鎖呢?
咱們先看看官方文檔對 Wait 的解釋:
Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning.
大概意思是: Wait
首先會解鎖 c.L,而後阻塞當前的協程;後續協程被 Broadcast/Signal 喚醒之後,在對 c.L 加鎖,而後 return。
因此,cond sync.Cond
的初始化須要一把鎖,而且和 FIFO 實例用同一把鎖。
若是不考慮 runtime 如何實現阻塞和激活,sync.Cond
自己的實現邏輯仍是比較簡單的。咱們看下源碼(刪減版):
type Cond struct { noCopy noCopy // 共享變量被訪問前,必須取到鎖 L L Locker notify notifyList checker copyChecker } // Wait func (c *Cond) Wait() { // 給當前協程分配一張船票 t := runtime_notifyListAdd(&c.notify) // 解鎖 c.L.Unlock() // 暫定當前協程的執行,等通知 runtime_notifyListWait(&c.notify, t) // 加鎖 c.L.Lock() } // Signal 喚醒被 c 阻塞的一個協程(若是有) func (c *Cond) Signal() { runtime_notifyListNotifyOne(&c.notify) } // Broadcast 喚醒全部被 c 阻塞的協程 func (c *Cond) Broadcast() { runtime_notifyListNotifyAll(&c.notify) }
這裏着重說下 runtime_* 函數的功能:
runtime_notifyListAdd
將當前線程添加到通知列表,以可以接收通知;runtime_notifyListWait
將當前協程休眠,接收到通知之後纔會被喚醒;runtime_notifyListNotifyOne
發送通知,喚醒 notify
列表裏一個協程runtime_notifyListNotifyAll
發送通知,喚醒 notify
列表裏全部協程sync.Cond
是Go語言對條件變量的一個實現方式,但不是惟一的方式。本質上,sync.Once
和 channel 也是條件變量的實現。
sync.Once
裏鎖和原子操做用於控制共享變量的讀寫;close(ch)
能夠通知其餘協程讀取數據;但 sync.Once
和 channel 有一個明顯的缺點是:它們都只能保證第一次知足條件變量,而 sync.Cond 能夠提供持續的保障。
因爲 sync.Cond
的複雜性(我認爲是 godoc 寫的太差了),且應用場景相對較少,其出現頻次低於 sync.Once
和 sync.WaitGroup
。不過在合適的應用場景出現時,它就會展現出本身的不可替代性。
掃碼關注微信公衆號「深刻Go語言」