Golang package sync 剖析(三):sync.Cond

1、前言

Go語言在設計上對同步(Synchronization,數據同步和線程同步)提供大量的支持,好比 goroutine和channel同步原語,庫層面有
- sync:提供基本的同步原語(好比Mutex、RWMutex、Locker)和 工具類(Once、WaitGroup、Cond、Pool、Map)
- sync/atomic:提供變量的原子操做(基於硬件指令 compare-and-swap)

-- 引用自《Golang package sync 剖析(二): sync.WaitGroup》git

上一期中,咱們介紹瞭如何使用 sync.WaitGroup 提升程序的並行度。本期文章咱們介紹 package sync 下的另外一個工具類:sync.Condgithub

sync.Cond 對標 同步原語「條件變量」,它能夠阻塞一個,或同時阻塞多個線程,直到另外一個線程 1) 修改了條件變量; 2)通知一個(或全部)等待的線程。golang

注:Go語言裏沒有線程,只有更輕量級的協程。本文中,「線程」均代指「協程」(goroutine)。segmentfault

相對於 sync.Once 和 sync.WaitGroup, sync.Cond 比較難以理解,使用門檻也很高,在 Google 上搜一下,排名前10結果中有這樣幾個:數組

sync_Cond_issue.jpeg

很是神奇的是:一篇名爲 「如何正確使用sync.Cond」 的帖子居然有 16k 的瀏覽量!微信

sync_Cond_issue_detail.jpeg

到底是條件變量這個概念難以理解,仍是 sync.Cond 的設計太反人類,咱們一探究竟。併發

2、sync.Cond 怎麼用

開篇咱們就提到了條件變量的應用場景,咱們回顧一下:函數

sync.Cond 對標 同步原語「條件變量」,它能夠阻塞一個,或同時阻塞多個線程,直到另外一個線程 
1) 修改了共享變量; 
2)通知該條件變量。

首先,咱們把概念搞清楚,條件變量的做用是控制多個線程對一個共享變量的讀寫。咱們有三類主體:工具

  1. 共享變量:條件變量控制多個線程對該變量的讀寫;
  2. 等待線程:被條件變量阻塞的線程,有一個或多個;
  3. 更新線程:更新共享變量,並喚起一個或多個等待線程。

其次,咱們看看 sync.Cond 的說明書:atom

// 建立一個 sync.Cond 對象
func NewCond(l Locker) *Cond

// 阻塞當前線程,並等待條件觸發
func (c *Cond) Wait()

// 喚醒全部等待線程
func (c *Cond) Broadcast()

// 喚起一個等待線程
// 沒有等待線程也不會報錯
func (c *Cond) Signal()

你們看完這段代碼,腦子裏第一個問題大概是:NewCond 要一把鎖是幹嗎用的?爲了便於理解,咱們以 kubernetes 源碼裏 FIFO 隊列爲例,一步一步說 sync.Cond 的用法:

type FIFO struct {
  // lock 控制對象讀寫
  lock sync.RWMutex
  // 阻塞Pop操做,Add成功後激活被阻塞線程
  cond sync.Cond
  // items 存儲數據
  items map[string]interface{}
  // queue 存儲key
  queue []string
  // keyFunc是hash函數
  keyFunc KeyFunc

  // 維護items和queue同步
  populated bool
  initialPopulationCount int

  // 隊列狀態:是否已經關閉
  closed     bool
  closedLock sync.Mutex
}

首先,這是一個 FIFO 隊列,問題又來了:go 內置的 channel 不香嗎?還真的是不夠香。

FIFO 具有一些額外的特性:

  1. 支持自定義處理函數,並保障每一個元素只被處理一次(exactly once);
  2. 支持元素去重,版本更新,並只處理最新版本,而不是每次更新都處理一次;
  3. 支持元素刪除,刪除的元素不進行處理;
  4. 支持 list 全部元素。

really_xiang_warning.jpeg

FIFO 的成員函數有:

// 從隊頭取一個元素,沒有則會被阻塞
Pop(PopProcessFunc) (interface{}, error)
// 向隊尾加一個元素,若是已經存在,則不作任何操做
Add(obj interface{}) error
AddIfNotPresent(interface{}) error
// 更新元素
Update(obj interface{}) error
// 刪除元素
Delete(obj interface{}) error
// 關閉隊列
Close()
// 讀取全部元素
List() []interface{}
// 讀取全部 key
ListKeys() []string
// 經過元素讀取元素(經過 keyFunc 映射到一樣的 key)
Get(obj interface{}) (item interface{}, exists bool, err error)
// 經過key讀取元素
GetByKey(key string) (item interface{}, exists bool, err error)
// 用傳入的數組替換隊列內容
Replace([]interface{}, string) error
// 同步items和queue
Resync() error
// items和queue是否同步
HasSynced() bool

回到本文的主題 sync.Cond, 在上面這個例子中

  • 一個 FIFO 實例就是一個共享變量
  • 調用 Pop 的線程是等待線程
  • 調用 Add 的線程是更新線程

lock sync.RWMutex 用於控制對共享變量的併發訪問,本質上是控制對 queueitems 兩個字段的併發訪問。

因爲條件變量 cond sync.Cond 在實現 Wait 時,把鎖操做也包含進去了,因此初始化時須要傳入一個鎖變量。在使用時,是這樣的:

// 初始化一個 FIFO
func NewFIFO(keyFunc KeyFunc) *FIFO {
  // lock 和 cond 均是默認值
  f := &FIFO{
    items:   map[string]interface{}{},
    queue:   []string{},
    keyFunc: keyFunc,
  }
  // 將 lock 共享給 cond
  f.cond.L = &f.lock
  return f
}

// Pop 操做
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
  // 鎖住共享變量
  f.lock.Lock()
  defer f.lock.Unlock()
  for {
    for len(f.queue) == 0 {
      // 隊列已關閉
      if f.IsClosed() {
        return nil, ErrFIFOClosed
      }
      // 隊列爲空,等待數據
      f.cond.Wait()
    }
    
    // 此處省略一段代碼...
    // 從 items 和 queue 刪除元素
  }
}

// Add 操做
func (f *FIFO) Add(obj interface{}) error {
  id, err := f.keyFunc(obj)
  if err != nil {
    return KeyError{obj, err}
  }
  
  // 鎖住共享變量
  f.lock.Lock()
  defer f.lock.Unlock()
  
  // 此處省略一段代碼 ...
  // 添加元素到 items 和 queue

  // 通知等待線程
  f.cond.Broadcast()
  return nil
}

上面的代碼中,等待線程作的是:

  1. 給共享變量加鎖
  2. 有數據,就返回數據;沒有數據就調用 Wait 等數據

更新線程作的是:

  1. 給共享變量加鎖
  2. 寫入數據,調用 Broadcast

看起來很簡單,Ok? 可是你品一品,你細品,發現事情沒那麼簡單。

zhoudongyu_easy_thing.jpg

等待線程 加鎖之後,更新線程 要更新共享變量,怎麼會取到鎖呢?

咱們先看看官方文檔對 Wait 的解釋:

Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning.

大概意思是: Wait 首先會解鎖 c.L,而後阻塞當前的協程;後續協程被 Broadcast/Signal 喚醒之後,在對 c.L 加鎖,而後 return。

因此,cond sync.Cond 的初始化須要一把鎖,而且和 FIFO 實例用同一把鎖。

3、sync.Cond 實現

若是不考慮 runtime 如何實現阻塞和激活,sync.Cond 自己的實現邏輯仍是比較簡單的。咱們看下源碼(刪減版):

type Cond struct {
  noCopy noCopy

  // 共享變量被訪問前,必須取到鎖 L
  L Locker

  notify  notifyList
  checker copyChecker
}

// Wait 
func (c *Cond) Wait() {
  // 給當前協程分配一張船票
  t := runtime_notifyListAdd(&c.notify)
  // 解鎖
  c.L.Unlock()
  // 暫定當前協程的執行,等通知
  runtime_notifyListWait(&c.notify, t)
  // 加鎖
  c.L.Lock()
}

// Signal 喚醒被 c 阻塞的一個協程(若是有)
func (c *Cond) Signal() {
  runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast 喚醒全部被 c 阻塞的協程
func (c *Cond) Broadcast() {
  runtime_notifyListNotifyAll(&c.notify)
}

這裏着重說下 runtime_* 函數的功能:

  1. runtime_notifyListAdd 將當前線程添加到通知列表,以可以接收通知;
  2. runtime_notifyListWait 將當前協程休眠,接收到通知之後纔會被喚醒;
  3. runtime_notifyListNotifyOne 發送通知,喚醒 notify 列表裏一個協程
  4. runtime_notifyListNotifyAll 發送通知,喚醒 notify 列表裏全部協程

4、總結

sync.Cond 是Go語言對條件變量的一個實現方式,但不是惟一的方式。本質上,sync.Once 和 channel 也是條件變量的實現。

  1. sync.Once 裏鎖和原子操做用於控制共享變量的讀寫;
  2. channel 經過 close(ch) 能夠通知其餘協程讀取數據;

sync.Once 和 channel 有一個明顯的缺點是:它們都只能保證第一次知足條件變量,而 sync.Cond 能夠提供持續的保障。

因爲 sync.Cond 的複雜性(我認爲是 godoc 寫的太差了),且應用場景相對較少,其出現頻次低於 sync.Oncesync.WaitGroup。不過在合適的應用場景出現時,它就會展現出本身的不可替代性。

References

  1. C++ std::condition_variable
  2. kubernetes FIFO queue

掃碼關注微信公衆號「深刻Go語言」

圖片描述

相關文章
相關標籤/搜索