Go中sync包學習

前面剛講到goroutine和channel,經過goroutine啓動一個協程,經過channel的方式在多個goroutine中傳遞消息來保證併發安全。今天咱們來學習sync包,這個包是Go提供的基礎包,提供了鎖的支持。可是Go官方給的建議是:不要以共享內存的方式來通訊,而是要以通訊的手段來共享內存。因此他們是提倡使用channel的方式來實現併發控制。web

學過Java的同窗對鎖的概念確定不陌生,在Java中提供Sychronized關鍵字提供獨佔鎖,Lock類提供讀寫鎖。在sync包中實現的功能也是與鎖相關,包中主要包含的對象有:安全

  • Locker:提供了加鎖和解鎖的接口
  • Cond:條件等待經過 Wait 讓例程等待,經過 Signal 讓一個等待的例程繼續,經過 Broadcast 讓全部等待的例程繼續。
  • Map:線程安全的map ,同時被多個goroutines調用是安全的。
  • Mutex:互斥鎖,用來保證在任一時刻,只能有一個例程訪問某對象。實現了Locker接口。Mutex 的初始值爲解鎖狀態,Mutex 一般做爲其它結構體的匿名字段使用,使該結構體具備 Lock 和 Unlock 方法
  • Once:Once 是一個能夠被屢次調用可是隻執行一次,若每次調用Do時傳入參數f不一樣,可是隻有第一個纔會被執行。
  • Pool:用於存儲臨時對象,它將使用完畢的對象存入對象池中,在須要的時候取出來重複使用,其中存放的臨時對象隨時可能被 GC 回收掉若是該對象再也不被其它變量引用
  • RWMutex:讀寫互斥鎖,RWMutex 比 Mutex 多了一個「讀鎖定」和「讀解鎖」,可讓多個例程同時讀取某對象。RWMutex 的初始值爲解鎖狀態。RWMutex 一般做爲其它結構體的匿名字段使用。
  • WaitGroup :用於等待一組例程的結束。主例程在建立每一個子例程的時候先調用 Add 增長等待計數,每一個子例程在結束時調用 Done 減小例程計數。以後主例程經過 Wait 方法開始等待,直到計數器歸零才繼續執行。

1. Mutex 互斥鎖使用

咱們先用Go寫一段經典的併發場景:併發

package main

import (
	"fmt"
	"time"
)

func main() {
	var a = 0
	for i := 0;i<1000;i++{
		go func(i int) {
			a += 1
			fmt.Println(a)
		}(i)
	}
	time.Sleep(time.Second)
}

運行這段程序,你會發現最後輸出的不是1000。app

這個時候你可使用Mutex:svg

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var a = 0
	var lock sync.Mutex
	for i := 0;i<1000;i++{
		go func(i int) {
			lock.Lock()
			a += 1
			fmt.Println(a)
			lock.Unlock()
		}(i)
	}
	time.Sleep(time.Second)
}

Mutex實現了Locker接口,因此他有Lock()方法和Unlock()方法。只須要在須要同步的代碼塊上下使用這兩個方法就好。函數

Mutex等同於Java中的Synchronized關鍵字或者Lock。學習

2. 讀寫鎖-RWMutex

相似於Java中的ReadWriteLock。讀寫鎖有以下四個方法:ui

寫操做的鎖定和解鎖
* func (*RWMutex) Lock
* func (*RWMutex) Unlock
讀操做的鎖定和解鎖
* func (*RWMutex) Rlock
* func (*RWMutex) RUnlock

當有一個 goroutine 得到寫鎖定,其它不管是讀鎖定仍是寫鎖定都將阻塞直到寫解鎖;spa

當有一個 goroutine 得到讀鎖定,其它讀鎖定仍然能夠繼續 ;線程

當有一個或任意多個讀鎖定,寫鎖定將等待全部讀鎖定解鎖以後纔可以進行寫鎖定 。

總結上面的三句話能夠得出結論:

  1. 同時只能有一個 goroutine 可以得到寫鎖定;
  2. 同時能夠有任意多個 goroutine 得到讀鎖定;
  3. 同時只能存在寫鎖定或讀鎖定(讀和寫互斥)。

看一個讀寫鎖的例子:

package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

var (
	rwLock sync.RWMutex
	data = ""
)

func read(ran int) {
	time.Sleep(time.Duration(ran) * time.Microsecond)
	rwLock.RLock()
	fmt.Printf("讀操做開始:%s\n",data)
	data = ""
	rwLock.RUnlock()
}

func write(subData string)  {
	rwLock.Lock()
	data = subData
	fmt.Printf("寫操做開始:%s\n",data)
	rwLock.Unlock()
}

func deduce()  {
	for i:=0;i<10;i++ {
		go write(strconv.Itoa(i))
	}
	for i:=0;i<10;i++ {
		go read(i * 100)
	}
}


func main() {
	deduce()
	time.Sleep(2*time.Second)
}

運行上面的程序,會發現寫操做都執行了,可是讀操做不是將全部寫的數字都讀出來了。這是由於讀操做是能夠同時有多個goroutine獲取鎖的,可是寫操做只能同時有一個goroutine執行。

3. WaitGroup

WaitGroup 用於等待一組 goroutine 結束,它有三個方法:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

與Java中類比的話,類似與CountDownLatch。

package main

import (
	"fmt"
	"sync"
	"time"
)

func goWithMountain(p int,wg *sync.WaitGroup)  {
	defer wg.Done()
	fmt.Printf("%d,我已經上來了\n",p)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(10)
	for i:=0;i<10;i++ {
		go goWithMountain(i,&wg)
	}
	wg.Wait()
	time.Sleep(2*time.Second)
	fmt.Printf("=爬山結束\n")
}

輸出:
0,我已經上來了
9,我已經上來了
3,我已經上來了
7,我已經上來了
8,我已經上來了
6,我已經上來了
2,我已經上來了
4,我已經上來了
5,我已經上來了
1,我已經上來了
=爬山結束

是否是有同樣同樣的呢。

4. Cond條件變量

與互斥量不一樣,條件變量的做用並非保證在同一時刻僅有一個線程訪問某一個共享數據,而是在對應的共享數據的狀態發生變化時,通知其餘所以而被阻塞的線程。條件變量老是與互斥量組合使用。互斥量爲共享數據的訪問提供互斥支持,而條件變量能夠就共享數據的狀態的變化向相關線程發出通知。 下面給出主要的幾個函數:

func NewCond(l Locker) *Cond:用於建立條件,根據實際狀況傳入sync.Mutex或者sync.RWMutex的指針,必定要是指針,不然會發生複製致使鎖的失效
func (c *Cond) Broadcast():喚醒條件上的全部goroutine
func (c *Cond) Signal():隨機喚醒等待隊列上的goroutine,隨機的方式效率更高
func (c *Cond) Wait():掛起goroutine的操做

看一個讀寫操做的例子:

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
	"time"
)

type MyDataBucket struct {
	br     *bytes.Buffer
	gmutex *sync.RWMutex
	rcond  *sync.Cond //讀操做須要用到的條件變量
}

func NewDataBucket() *MyDataBucket {
	buf := make([]byte, 0)
	db := &MyDataBucket{
		br:     bytes.NewBuffer(buf),
		gmutex: new(sync.RWMutex),
	}
	db.rcond = sync.NewCond(db.gmutex.RLocker())
	return db
}

func (db *MyDataBucket) Read(i int) {
	db.gmutex.RLock()
	defer db.gmutex.RUnlock()
	var data []byte
	var d byte
	var err error
	for {
		//讀取一個字節
		if d, err = db.br.ReadByte(); err != nil {
			if err == io.EOF {
				if string(data) != "" {
					fmt.Printf("reader-%d: %s\n", i, data)
				}
				db.rcond.Wait()
				data = data[:0]
				continue
			}
		}
		data = append(data, d)
	}
}

func (db *MyDataBucket) Put(d []byte) (int, error) {
	db.gmutex.Lock()
	defer db.gmutex.Unlock()
	//寫入一個數據塊
	n, err := db.br.Write(d)
	db.rcond.Broadcast()
	return n, err
}

func main() {
	db := NewDataBucket()
	go db.Read(1)
	go db.Read(2)
	for i := 0; i < 10; i++ {
		go func(i int) {
			d := fmt.Sprintf("data-%d", i)
			db.Put([]byte(d))
		}(i)
		time.Sleep(100 * time.Millisecond)
	}
}

上例中,讀操做必依賴於寫操做先寫入數據才能開始讀。當讀取的數據爲空的時候,會先調用wait()方法阻塞當前方法,在Put方法中寫完數據以後會調用Broadcast()去廣播,告訴阻塞者能夠開始了。

5.Pool 臨時對象池

Pool 用於存儲臨時對象,它將使用完畢的對象存入對象池中,在須要的時候取出來重複使用,目的是爲了不重複建立相同的對象形成 GC 負擔太重。從 Pool 中取出對象時,若是 Pool 中沒有對象,將返回 nil,可是若是給 Pool.New 字段指定了一個函數的話,Pool 將使用該函數建立一個新對象返回。

sync.Pool能夠安全被多個線程同時使用,保證線程安全。這個Pool和咱們通常意義上的Pool不太同樣 ,Pool沒法設置大小,因此理論上只受限於系統內存大小。Pool中的對象不支持自定義過時時間及策略,究其緣由,Pool並非一個Cache。

看一個小例子:

package main

import (
	"fmt"
	"sync"
)


func main() {
	//咱們建立一個Pool,並實現New()函數
	sp := sync.Pool{
		New: func() interface{} {
			return make([]int, 16)
		},
	}
	item := sp.Get()
	fmt.Println("item : ", item)
	//咱們對item進行操做
	//New()返回的是interface{},咱們須要經過類型斷言來轉換
	for i := 0; i < len(item.([]int)); i++ {
		item.([]int)[i] = i
	}
	fmt.Println("item : ", item)

	//使用完後,咱們把item放回池中,讓對象能夠重用
	sp.Put(item)

	//再次從池中獲取對象
	item2 := sp.Get()
	//注意這裏獲取的對象就是上面咱們放回池中的對象
	fmt.Println("item2 : ", item2)
	//咱們再次獲取對象
	item3 := sp.Get()
	//由於池中的對象已經沒有了,因此又從新經過New()建立一個新對象,放入池中,而後返回
	//因此item3是大小爲16的空[]int
	fmt.Println("item3 : ", item3)
}

輸出:
item :  [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
item :  [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item2 :  [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item3 :  [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]

6. Once 執行一次

Once 的做用是屢次調用但只執行一次,Once 只有一個方法,Once.Do(),向 Do 傳入一個函數,這個函數在第一次執行 Once.Do() 的時候會被調用,之後再執行 Once.Do() 將沒有任何動做,即便傳入了其它的函數,也不會被執行,若是要執行其它函數,須要從新建立一個 Once 對象。

看一個很簡單的例子:

package main

import (
	"fmt"
	"sync"
)


func main() {
	var once sync.Once
	onceBody := func() {
		fmt.Println("我只會出現一次")
	}
	done := make(chan bool)
	for i := 0; i < 3; i++ {
		go func() {
			once.Do(onceBody)
			done <- true
		}()
	}
	for i := 0; i < 3; i++ {
		<-done
	}
}
相關文章
相關標籤/搜索