前面剛講到goroutine和channel,經過goroutine啓動一個協程,經過channel的方式在多個goroutine中傳遞消息來保證併發安全。今天咱們來學習sync包,這個包是Go提供的基礎包,提供了鎖的支持。可是Go官方給的建議是:不要以共享內存的方式來通訊,而是要以通訊的手段來共享內存。因此他們是提倡使用channel的方式來實現併發控制。web
學過Java的同窗對鎖的概念確定不陌生,在Java中提供Sychronized
關鍵字提供獨佔鎖,Lock
類提供讀寫鎖。在sync包中實現的功能也是與鎖相關,包中主要包含的對象有:安全
咱們先用Go寫一段經典的併發場景:併發
package main import ( "fmt" "time" ) func main() { var a = 0 for i := 0;i<1000;i++{ go func(i int) { a += 1 fmt.Println(a) }(i) } time.Sleep(time.Second) }
運行這段程序,你會發現最後輸出的不是1000。app
這個時候你可使用Mutex:svg
package main import ( "fmt" "sync" "time" ) func main() { var a = 0 var lock sync.Mutex for i := 0;i<1000;i++{ go func(i int) { lock.Lock() a += 1 fmt.Println(a) lock.Unlock() }(i) } time.Sleep(time.Second) }
Mutex實現了Locker接口,因此他有Lock()方法和Unlock()方法。只須要在須要同步的代碼塊上下使用這兩個方法就好。函數
Mutex等同於Java中的Synchronized關鍵字或者Lock。學習
相似於Java中的ReadWriteLock。讀寫鎖有以下四個方法:ui
寫操做的鎖定和解鎖 * func (*RWMutex) Lock * func (*RWMutex) Unlock 讀操做的鎖定和解鎖 * func (*RWMutex) Rlock * func (*RWMutex) RUnlock
當有一個 goroutine 得到寫鎖定,其它不管是讀鎖定仍是寫鎖定都將阻塞直到寫解鎖;spa
當有一個 goroutine 得到讀鎖定,其它讀鎖定仍然能夠繼續 ;線程
當有一個或任意多個讀鎖定,寫鎖定將等待全部讀鎖定解鎖以後纔可以進行寫鎖定 。
總結上面的三句話能夠得出結論:
看一個讀寫鎖的例子:
package main import ( "fmt" "strconv" "sync" "time" ) var ( rwLock sync.RWMutex data = "" ) func read(ran int) { time.Sleep(time.Duration(ran) * time.Microsecond) rwLock.RLock() fmt.Printf("讀操做開始:%s\n",data) data = "" rwLock.RUnlock() } func write(subData string) { rwLock.Lock() data = subData fmt.Printf("寫操做開始:%s\n",data) rwLock.Unlock() } func deduce() { for i:=0;i<10;i++ { go write(strconv.Itoa(i)) } for i:=0;i<10;i++ { go read(i * 100) } } func main() { deduce() time.Sleep(2*time.Second) }
運行上面的程序,會發現寫操做都執行了,可是讀操做不是將全部寫的數字都讀出來了。這是由於讀操做是能夠同時有多個goroutine獲取鎖的,可是寫操做只能同時有一個goroutine執行。
WaitGroup 用於等待一組 goroutine 結束,它有三個方法:
func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait()
與Java中類比的話,類似與CountDownLatch。
package main import ( "fmt" "sync" "time" ) func goWithMountain(p int,wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("%d,我已經上來了\n",p) } func main() { var wg sync.WaitGroup wg.Add(10) for i:=0;i<10;i++ { go goWithMountain(i,&wg) } wg.Wait() time.Sleep(2*time.Second) fmt.Printf("=爬山結束\n") } 輸出: 0,我已經上來了 9,我已經上來了 3,我已經上來了 7,我已經上來了 8,我已經上來了 6,我已經上來了 2,我已經上來了 4,我已經上來了 5,我已經上來了 1,我已經上來了 =爬山結束
是否是有同樣同樣的呢。
與互斥量不一樣,條件變量的做用並非保證在同一時刻僅有一個線程訪問某一個共享數據,而是在對應的共享數據的狀態發生變化時,通知其餘所以而被阻塞的線程。條件變量老是與互斥量組合使用。互斥量爲共享數據的訪問提供互斥支持,而條件變量能夠就共享數據的狀態的變化向相關線程發出通知。 下面給出主要的幾個函數:
func NewCond(l Locker) *Cond:用於建立條件,根據實際狀況傳入sync.Mutex或者sync.RWMutex的指針,必定要是指針,不然會發生複製致使鎖的失效 func (c *Cond) Broadcast():喚醒條件上的全部goroutine func (c *Cond) Signal():隨機喚醒等待隊列上的goroutine,隨機的方式效率更高 func (c *Cond) Wait():掛起goroutine的操做
看一個讀寫操做的例子:
package main import ( "bytes" "fmt" "io" "sync" "time" ) type MyDataBucket struct { br *bytes.Buffer gmutex *sync.RWMutex rcond *sync.Cond //讀操做須要用到的條件變量 } func NewDataBucket() *MyDataBucket { buf := make([]byte, 0) db := &MyDataBucket{ br: bytes.NewBuffer(buf), gmutex: new(sync.RWMutex), } db.rcond = sync.NewCond(db.gmutex.RLocker()) return db } func (db *MyDataBucket) Read(i int) { db.gmutex.RLock() defer db.gmutex.RUnlock() var data []byte var d byte var err error for { //讀取一個字節 if d, err = db.br.ReadByte(); err != nil { if err == io.EOF { if string(data) != "" { fmt.Printf("reader-%d: %s\n", i, data) } db.rcond.Wait() data = data[:0] continue } } data = append(data, d) } } func (db *MyDataBucket) Put(d []byte) (int, error) { db.gmutex.Lock() defer db.gmutex.Unlock() //寫入一個數據塊 n, err := db.br.Write(d) db.rcond.Broadcast() return n, err } func main() { db := NewDataBucket() go db.Read(1) go db.Read(2) for i := 0; i < 10; i++ { go func(i int) { d := fmt.Sprintf("data-%d", i) db.Put([]byte(d)) }(i) time.Sleep(100 * time.Millisecond) } }
上例中,讀操做必依賴於寫操做先寫入數據才能開始讀。當讀取的數據爲空的時候,會先調用wait()方法阻塞當前方法,在Put方法中寫完數據以後會調用Broadcast()去廣播,告訴阻塞者能夠開始了。
Pool 用於存儲臨時對象,它將使用完畢的對象存入對象池中,在須要的時候取出來重複使用,目的是爲了不重複建立相同的對象形成 GC 負擔太重。從 Pool 中取出對象時,若是 Pool 中沒有對象,將返回 nil,可是若是給 Pool.New 字段指定了一個函數的話,Pool 將使用該函數建立一個新對象返回。
sync.Pool能夠安全被多個線程同時使用,保證線程安全。這個Pool和咱們通常意義上的Pool不太同樣 ,Pool沒法設置大小,因此理論上只受限於系統內存大小。Pool中的對象不支持自定義過時時間及策略,究其緣由,Pool並非一個Cache。
看一個小例子:
package main import ( "fmt" "sync" ) func main() { //咱們建立一個Pool,並實現New()函數 sp := sync.Pool{ New: func() interface{} { return make([]int, 16) }, } item := sp.Get() fmt.Println("item : ", item) //咱們對item進行操做 //New()返回的是interface{},咱們須要經過類型斷言來轉換 for i := 0; i < len(item.([]int)); i++ { item.([]int)[i] = i } fmt.Println("item : ", item) //使用完後,咱們把item放回池中,讓對象能夠重用 sp.Put(item) //再次從池中獲取對象 item2 := sp.Get() //注意這裏獲取的對象就是上面咱們放回池中的對象 fmt.Println("item2 : ", item2) //咱們再次獲取對象 item3 := sp.Get() //由於池中的對象已經沒有了,因此又從新經過New()建立一個新對象,放入池中,而後返回 //因此item3是大小爲16的空[]int fmt.Println("item3 : ", item3) } 輸出: item : [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] item : [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15] item2 : [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15] item3 : [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
Once 的做用是屢次調用但只執行一次,Once 只有一個方法,Once.Do(),向 Do 傳入一個函數,這個函數在第一次執行 Once.Do() 的時候會被調用,之後再執行 Once.Do() 將沒有任何動做,即便傳入了其它的函數,也不會被執行,若是要執行其它函數,須要從新建立一個 Once 對象。
看一個很簡單的例子:
package main import ( "fmt" "sync" ) func main() { var once sync.Once onceBody := func() { fmt.Println("我只會出現一次") } done := make(chan bool) for i := 0; i < 3; i++ { go func() { once.Do(onceBody) done <- true }() } for i := 0; i < 3; i++ { <-done } }