在併發編程中同步原語也就是咱們一般說的鎖的主要做用是保證多個線程或者 goroutine
在訪問同一片內存時不會出現混亂的問題。Go
語言的sync
包提供了常見的併發編程同步原語,上一期轉載的文章《Golang 併發編程之同步原語》中也詳述了 Mutex
、RWMutex
、WaitGroup
、Once
和 Cond
這些同步原語的實現原理。今天的文章裏讓咱們回到應用層,聚焦sync
包裏這些同步原語的應用場景,同時也會介紹sync
包中的Pool
和Map
的應用場景和使用方法。話很少說,讓咱們開始吧。shell
sync.Mutex
多是sync
包中使用最普遍的原語。它容許在共享資源上互斥訪問(不能同時訪問):數據庫
mutex := &sync.Mutex{} mutex.Lock() // Update共享變量 (好比切片,結構體指針等) mutex.Unlock()
必須指出的是,在第一次被使用後,不能再對sync.Mutex
進行復制。(sync
包的全部原語都同樣)。若是結構體具備同步原語字段,則必須經過指針傳遞它。編程
sync.RWMutex
是一個讀寫互斥鎖,它提供了咱們上面的剛剛看到的sync.Mutex
的Lock
和UnLock
方法(由於這兩個結構都實現了sync.Locker
接口)。可是,它還容許使用RLock
和RUnlock
方法進行併發讀取:緩存
mutex := &sync.RWMutex{} mutex.Lock() // Update 共享變量 mutex.Unlock() mutex.RLock() // Read 共享變量 mutex.RUnlock()
sync.RWMutex
容許至少一個讀鎖或一個寫鎖存在,而sync.Mutex
容許一個讀鎖或一個寫鎖存在。安全
經過基準測試來比較這幾個方法的性能:併發
BenchmarkMutexLock-4 83497579 17.7 ns/op BenchmarkRWMutexLock-4 35286374 44.3 ns/op BenchmarkRWMutexRLock-4 89403342 15.3 ns/op
能夠看到鎖定/解鎖sync.RWMutex
讀鎖的速度比鎖定/解鎖sync.Mutex
更快,另外一方面,在sync.RWMutex
上調用Lock()
/ Unlock()
是最慢的操做。函數
所以,只有在頻繁讀取和不頻繁寫入的場景裏,才應該使用sync.RWMutex
。性能
sync.WaitGroup
也是一個常常會用到的同步原語,它的使用場景是在一個goroutine
等待一組goroutine
執行完成。測試
sync.WaitGroup
擁有一個內部計數器。當計數器等於0
時,則Wait()
方法會當即返回。不然它將阻塞執行Wait()
方法的goroutine
直到計數器等於0
時爲止。優化
要增長計數器,咱們必須使用Add(int)
方法。要減小它,咱們可使用Done()
(將計數器減1
),也能夠傳遞負數給Add
方法把計數器減小指定大小,Done()
方法底層就是經過Add(-1)
實現的。
在如下示例中,咱們將啓動八個goroutine
,並等待他們完成:
wg := &sync.WaitGroup{} for i := 0; i < 8; i++ { wg.Add(1) go func() { // Do something wg.Done() }() } wg.Wait() // 繼續往下執行...
每次建立goroutine
時,咱們都會使用wg.Add(1)
來增長wg
的內部計數器。咱們也能夠在for
循環以前調用wg.Add(8)
。
與此同時,每一個goroutine
完成時,都會使用wg.Done()
減小wg
的內部計數器。
main goroutine
會在八個goroutine
都執行wg.Done()
將計數器變爲0
後才能繼續執行。
sync.Map
是一個併發版本的Go
語言的map
,咱們能夠:
Store(interface {},interface {})
添加元素。Load(interface {}) interface {}
檢索元素。Delete(interface {})
刪除元素。LoadOrStore(interface {},interface {}) (interface {},bool)
檢索或添加以前不存在的元素。若是鍵以前在map
中存在,則返回的布爾值爲true
。Range
遍歷元素。m := &sync.Map{} // 添加元素 m.Store(1, "one") m.Store(2, "two") // 獲取元素1 value, contains := m.Load(1) if contains { fmt.Printf("%s\n", value.(string)) } // 返回已存value,不然把指定的鍵值存儲到map中 value, loaded := m.LoadOrStore(3, "three") if !loaded { fmt.Printf("%s\n", value.(string)) } m.Delete(3) // 迭代全部元素 m.Range(func(key, value interface{}) bool { fmt.Printf("%d: %s\n", key.(int), value.(string)) return true })
上面的程序會輸出:
one three 1: one 2: two
如你所見,Range
方法接收一個類型爲func(key,value interface {})bool
的函數參數。若是函數返回了false
,則中止迭代。有趣的事實是,即便咱們在恆定時間後返回false
,最壞狀況下的時間複雜度仍爲O(n)
。
咱們應該在何時使用sync.Map
而不是在普通的map
上使用sync.Mutex
?
map
有頻繁的讀取和不頻繁的寫入時。goroutine
讀取,寫入和覆蓋不相交的鍵時。具體是什麼意思呢?例如,若是咱們有一個分片實現,其中包含一組4個goroutine
,每一個goroutine
負責25%的鍵(每一個負責的鍵不衝突)。在這種狀況下,sync.Map
是首選。sync.Pool
是一個併發池,負責安全地保存一組對象。它有兩個導出方法:
Get() interface{}
用來從併發池中取出元素。Put(interface{})
將一個對象加入併發池。pool := &sync.Pool{} pool.Put(NewConnection(1)) pool.Put(NewConnection(2)) pool.Put(NewConnection(3)) connection := pool.Get().(*Connection) fmt.Printf("%d\n", connection.id) connection = pool.Get().(*Connection) fmt.Printf("%d\n", connection.id) connection = pool.Get().(*Connection) fmt.Printf("%d\n", connection.id)
輸出:
1 3 2
須要注意的是Get()
方法會從併發池中隨機取出對象,沒法保證以固定的順序獲取併發池中存儲的對象。
還能夠爲sync.Pool
指定一個建立者方法:
pool := &sync.Pool{ New: func() interface{} { return NewConnection() }, } connection := pool.Get().(*Connection)
這樣每次調用Get()
時,將返回由在pool.New
中指定的函數建立的對象(在本例中爲指針)。
那麼何時使用sync.Pool?有兩個用例:
第一個是當咱們必須重用共享的和長期存在的對象(例如,數據庫鏈接)時。第二個是用於優化內存分配。
讓咱們考慮一個寫入緩衝區並將結果持久保存到文件中的函數示例。使用sync.Pool
,咱們能夠經過在不一樣的函數調用之間重用同一對象來重用爲緩衝區分配的空間。
第一步是檢索先前分配的緩衝區(若是是第一個調用,則建立一個緩衝區,但這是抽象的)。而後,defer
操做是將緩衝區放回sync.Pool
中。
func writeFile(pool *sync.Pool, filename string) error { buf := pool.Get().(*bytes.Buffer) defer pool.Put(buf) // Reset 緩存區,否則會鏈接上次調用時保存在緩存區裏的字符串foo // 編程foofoo 以此類推 buf.Reset() buf.WriteString("foo") return ioutil.WriteFile(filename, buf.Bytes(), 0644) }
sync.Once
是一個簡單而強大的原語,可確保一個函數僅執行一次。在下面的示例中,只有一個goroutine
會顯示輸出消息:
once := &sync.Once{} for i := 0; i < 4; i++ { i := i go func() { once.Do(func() { fmt.Printf("first %d\n", i) }) }() }
咱們使用了Do(func ())
方法來指定只能被調用一次的部分。
sync.Cond
多是sync
包提供的同步原語中最不經常使用的一個,它用於發出信號(一對一)或廣播信號(一對多)到goroutine
。讓咱們考慮一個場景,咱們必須向一個goroutine
指示共享切片的第一個元素已更新。建立sync.Cond
須要sync.Locker
對象(sync.Mutex
或sync.RWMutex
):
cond := sync.NewCond(&sync.Mutex{})
而後,讓咱們編寫負責顯示切片的第一個元素的函數:
func printFirstElement(s []int, cond *sync.Cond) { cond.L.Lock() cond.Wait() fmt.Printf("%d\n", s[0]) cond.L.Unlock() }
咱們可使用cond.L
訪問內部的互斥鎖。一旦得到了鎖,咱們將調用cond.Wait()
,這會讓當前goroutine
在收到信號前一直處於阻塞狀態。
讓咱們回到main goroutine
。咱們將經過傳遞共享切片和先前建立的sync.Cond
來建立printFirstElement
池。而後咱們調用get()
函數,將結果存儲在s[0]
中併發出信號:
s := make([]int, 1) for i := 0; i < runtime.NumCPU(); i++ { go printFirstElement(s, cond) } i := get() cond.L.Lock() s[0] = i cond.Signal() cond.L.Unlock()
這個信號會解除一個goroutine
的阻塞狀態,解除阻塞的goroutine
將會顯示s[0]
中存儲的值。
可是,有的人可能會爭辯說咱們的代碼破壞了Go
的最基本原則之一:
不要經過共享內存進行通訊;而是經過通訊共享內存。
確實,在這個示例中,最好使用channel
來傳遞get()
返回的值。可是咱們也提到了sync.Cond
也能夠用於廣播信號。咱們修改一下上面的示例,把Signal()
調用改成調用Broadcast()
。
i := get() cond.L.Lock() s[0] = i cond.Broadcast() cond.L.Unlock()
在這種狀況下,全部goroutine都將被觸發。
衆所周知,channel
裏的元素只會由一個goroutine
接收到。經過channel
模擬廣播的惟一方法是關閉channel
。
當一個channel被關閉後,channel中已經發送的數據都被成功接收後,後續的接收操做將再也不阻塞,它們會當即返回一個零值。
可是這種方式只能廣播一次。所以,儘管存在很大爭議,但這無疑是sync.Cond
的一個有趣的功能。