Go語言sync包的應用詳解

在併發編程中同步原語也就是咱們一般說的鎖的主要做用是保證多個線程或者 goroutine在訪問同一片內存時不會出現混亂的問題。Go語言的sync包提供了常見的併發編程同步原語,上一期轉載的文章《Golang 併發編程之同步原語》中也詳述了 MutexRWMutexWaitGroupOnceCond 這些同步原語的實現原理。今天的文章裏讓咱們回到應用層,聚焦sync包裏這些同步原語的應用場景,同時也會介紹sync包中的PoolMap的應用場景和使用方法。話很少說,讓咱們開始吧。shell

sync.Mutex

sync.Mutex多是sync包中使用最普遍的原語。它容許在共享資源上互斥訪問(不能同時訪問):數據庫

mutex := &sync.Mutex{}

mutex.Lock()
// Update共享變量 (好比切片,結構體指針等)
mutex.Unlock()
複製代碼

必須指出的是,在第一次被使用後,不能再對sync.Mutex進行復制。(sync包的全部原語都同樣)。若是結構體具備同步原語字段,則必須經過指針傳遞它。編程

sync.RWMutex

sync.RWMutex是一個讀寫互斥鎖,它提供了咱們上面的剛剛看到的sync.MutexLockUnLock方法(由於這兩個結構都實現了sync.Locker接口)。可是,它還容許使用RLockRUnlock方法進行併發讀取:緩存

mutex := &sync.RWMutex{}

mutex.Lock()
// Update 共享變量
mutex.Unlock()

mutex.RLock()
// Read 共享變量
mutex.RUnlock()
複製代碼

sync.RWMutex容許至少一個讀鎖或一個寫鎖存在,而sync.Mutex容許一個讀鎖或一個寫鎖存在。安全

經過基準測試來比較這幾個方法的性能:bash

BenchmarkMutexLock-4       83497579         17.7 ns/op
BenchmarkRWMutexLock-4     35286374         44.3 ns/op
BenchmarkRWMutexRLock-4    89403342         15.3 ns/op
複製代碼

能夠看到鎖定/解鎖sync.RWMutex讀鎖的速度比鎖定/解鎖sync.Mutex更快,另外一方面,在sync.RWMutex上調用Lock()/ Unlock()是最慢的操做。併發

所以,只有在頻繁讀取和不頻繁寫入的場景裏,才應該使用sync.RWMutex函數

sync.WaitGroup

sync.WaitGroup也是一個常常會用到的同步原語,它的使用場景是在一個goroutine等待一組goroutine執行完成。性能

sync.WaitGroup擁有一個內部計數器。當計數器等於0時,則Wait()方法會當即返回。不然它將阻塞執行Wait()方法的goroutine直到計數器等於0時爲止。測試

要增長計數器,咱們必須使用Add(int)方法。要減小它,咱們可使用Done()(將計數器減1),也能夠傳遞負數給Add方法把計數器減小指定大小,Done()方法底層就是經過Add(-1)實現的。

在如下示例中,咱們將啓動八個goroutine,並等待他們完成:

wg := &sync.WaitGroup{}

for i := 0; i < 8; i++ {
  wg.Add(1)
  go func() {
    // Do something
    wg.Done()
  }()
}

wg.Wait()
// 繼續往下執行...
複製代碼

每次建立goroutine時,咱們都會使用wg.Add(1)來增長wg的內部計數器。咱們也能夠在for循環以前調用wg.Add(8)

與此同時,每一個goroutine完成時,都會使用wg.Done()減小wg的內部計數器。

main goroutine會在八個goroutine都執行wg.Done()將計數器變爲0後才能繼續執行。

sync.Map

sync.Map是一個併發版本的Go語言的map,咱們能夠:

  • 使用Store(interface {},interface {})添加元素。
  • 使用Load(interface {}) interface {}檢索元素。
  • 使用Delete(interface {})刪除元素。
  • 使用LoadOrStore(interface {},interface {}) (interface {},bool)檢索或添加以前不存在的元素。若是鍵以前在map中存在,則返回的布爾值爲true
  • 使用Range遍歷元素。
m := &sync.Map{}

// 添加元素
m.Store(1, "one")
m.Store(2, "two")

// 獲取元素1
value, contains := m.Load(1)
if contains {
  fmt.Printf("%s\n", value.(string))
}

// 返回已存value,不然把指定的鍵值存儲到map中
value, loaded := m.LoadOrStore(3, "three")
if !loaded {
  fmt.Printf("%s\n", value.(string))
}

m.Delete(3)

// 迭代全部元素
m.Range(func(key, value interface{}) bool {
  fmt.Printf("%d: %s\n", key.(int), value.(string))
  return true
})
複製代碼

上面的程序會輸出:

one
three
1: one
2: two
複製代碼

如你所見,Range方法接收一個類型爲func(key,value interface {})bool的函數參數。若是函數返回了false,則中止迭代。有趣的事實是,即便咱們在恆定時間後返回false,最壞狀況下的時間複雜度仍爲O(n)

咱們應該在何時使用sync.Map而不是在普通的map上使用sync.Mutex

  • 當咱們對map有頻繁的讀取和不頻繁的寫入時。
  • 當多個goroutine讀取,寫入和覆蓋不相交的鍵時。具體是什麼意思呢?例如,若是咱們有一個分片實現,其中包含一組4個goroutine,每一個goroutine負責25%的鍵(每一個負責的鍵不衝突)。在這種狀況下,sync.Map是首選。

sync.Pool

sync.Pool是一個併發池,負責安全地保存一組對象。它有兩個導出方法:

  • Get() interface{} 用來從併發池中取出元素。
  • Put(interface{}) 將一個對象加入併發池。
pool := &sync.Pool{}

pool.Put(NewConnection(1))
pool.Put(NewConnection(2))
pool.Put(NewConnection(3))

connection := pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
複製代碼

輸出:

1
3
2
複製代碼

須要注意的是Get()方法會從併發池中隨機取出對象,沒法保證以固定的順序獲取併發池中存儲的對象。

還能夠爲sync.Pool指定一個建立者方法:

pool := &sync.Pool{
  New: func() interface{} {
    return NewConnection()
  },
}

connection := pool.Get().(*Connection)
複製代碼

這樣每次調用Get()時,將返回由在pool.New中指定的函數建立的對象(在本例中爲指針)。

那麼何時使用sync.Pool?有兩個用例:

第一個是當咱們必須重用共享的和長期存在的對象(例如,數據庫鏈接)時。第二個是用於優化內存分配。

讓咱們考慮一個寫入緩衝區並將結果持久保存到文件中的函數示例。使用sync.Pool,咱們能夠經過在不一樣的函數調用之間重用同一對象來重用爲緩衝區分配的空間。 第一步是檢索先前分配的緩衝區(若是是第一個調用,則建立一個緩衝區,但這是抽象的)。而後,defer操做是將緩衝區放回sync.Pool中。

func writeFile(pool *sync.Pool, filename string) error {
	buf := pool.Get().(*bytes.Buffer)

  defer pool.Put(buf)

	// Reset 緩存區,否則會鏈接上次調用時保存在緩存區裏的字符串foo
	// 編程foofoo 以此類推
	buf.Reset()

	buf.WriteString("foo")

	return ioutil.WriteFile(filename, buf.Bytes(), 0644)
}
複製代碼

sync.Once

sync.Once是一個簡單而強大的原語,可確保一個函數僅執行一次。在下面的示例中,只有一個goroutine會顯示輸出消息:

once := &sync.Once{}
for i := 0; i < 4; i++ {
	i := i
	go func() {
		once.Do(func() {
			fmt.Printf("first %d\n", i)
		})
	}()
}
複製代碼

咱們使用了Do(func ())方法來指定只能被調用一次的部分。

sync.Cond

sync.Cond多是sync包提供的同步原語中最不經常使用的一個,它用於發出信號(一對一)或廣播信號(一對多)到goroutine。讓咱們考慮一個場景,咱們必須向一個goroutine指示共享切片的第一個元素已更新。建立sync.Cond須要sync.Locker對象(sync.Mutexsync.RWMutex):

cond := sync.NewCond(&sync.Mutex{})
複製代碼

而後,讓咱們編寫負責顯示切片的第一個元素的函數:

func printFirstElement(s []int, cond *sync.Cond) {
	cond.L.Lock()
	cond.Wait()
	fmt.Printf("%d\n", s[0])
	cond.L.Unlock()
}
複製代碼

咱們可使用cond.L訪問內部的互斥鎖。一旦得到了鎖,咱們將調用cond.Wait(),這會讓當前goroutine在收到信號前一直處於阻塞狀態。

讓咱們回到main goroutine。咱們將經過傳遞共享切片和先前建立的sync.Cond來建立printFirstElement池。而後咱們調用get()函數,將結果存儲在s[0]中併發出信號:

s := make([]int, 1)
for i := 0; i < runtime.NumCPU(); i++ {
	go printFirstElement(s, cond)
}

i := get()
cond.L.Lock()
s[0] = i
cond.Signal()
cond.L.Unlock()
複製代碼

這個信號會解除一個goroutine的阻塞狀態,解除阻塞的goroutine將會顯示s[0]中存儲的值。

可是,有的人可能會爭辯說咱們的代碼破壞了Go的最基本原則之一:

不要經過共享內存進行通訊;而是經過通訊共享內存。

確實,在這個示例中,最好使用channel來傳遞get()返回的值。可是咱們也提到了sync.Cond也能夠用於廣播信號。咱們修改一下上面的示例,把Signal()調用改成調用Broadcast()

i := get()
cond.L.Lock()
s[0] = i
cond.Broadcast()
cond.L.Unlock()
複製代碼

在這種狀況下,全部goroutine都將被觸發。 衆所周知,channel裏的元素只會由一個goroutine接收到。經過channel模擬廣播的惟一方法是關閉channel

當一個channel被關閉後,channel中已經發送的數據都被成功接收後,後續的接收操做將再也不阻塞,它們會當即返回一個零值。

可是這種方式只能廣播一次。所以,儘管存在很大爭議,但這無疑是sync.Cond的一個有趣的功能。

推薦閱讀

學會使用context取消goroutine執行的方法

使用SecureCookie實現客戶端Session管理

Go Web編程--解析JSON請求和生成JSON響應

相關文章
相關標籤/搜索