爲何須要goroutine同步html
gorotine同步概念、以及同步的幾種方式node
package main import ( "fmt" "sync" ) var A = 10 var wg = sync.WaitGroup{} func Add(){ defer wg.Done() for i:=0;i<1000000;i++{ A += 1 } } func main() { wg.Add(2) go Add() go Add() wg.Wait() fmt.Println(A) }
# output: 1061865 # 每運行一次結果都不同,可是都不是咱們預期的結果2000000
多goroutine【多任務】,有共享資源,且多goroutine修改共享資源,出現數據不安全問題【數據錯誤】,保證數據安全一致,須要goroutine同步瀏覽器
緩存
channel 【csp模型】安全
互斥鎖 【傳統同步機制】併發
讀寫鎖 【傳統同步機制】svg
條件變量 【傳統同步機制】函數
加鎖成功則操做資源,加鎖失敗則等待直至鎖加鎖成功----全部的goroutine互斥,一個獲得鎖其餘所有等待性能
解決了數據安全問題,下降了程序的性能,適用讀寫不太頻繁的場景單元測試
顆粒度是指,加鎖的範圍,哪裏使用資源哪裏加鎖,儘量減小加鎖範圍
單元測試基本使用流程
新建單元測試文件
編寫測試案例
gotest運行生成對應的prof文件
go tool 查看生成的prof文件
package main_test import ( "fmt" "sync" "testing" ) var A = 10 var wg = sync.WaitGroup{} var mux sync.Mutex func Add(){ defer wg.Done() for i:=0;i<1000000;i++{ mux.Lock() A += 1 mux.Unlock() } } /* // 加大鎖顆粒度 func Add(){ defer wg.Done() mux.Lock() for i:=0;i<1000000;i++{ A += 1 } mux.Unlock() }*/ // 單元測試格式, func TestMux(t *testing.T) { wg.Add(2) go Add() go Add() wg.Wait() fmt.Println(A) }
# 生成prof文件,-cpuprofile 參數指定生成什麼類型的prof cpu.prof指定生成profile文件名字 go test mutex_test.go -cpuprofile cpu.prof # 查看生成的prof文件,pprof 指定查看的文件類型 go tool pprof cpu.prof # 下面是輸出信息 Type: cpu Time: Jul 10, 2019 at 2:38pm (CST) Duration: 201.43ms, Total samples = 80ms (39.72%) Entering interactive mode (type "help" for commands, "o" for options) (pprof) top # 這裏使用top命令查看測試中cpu使用的信息 Showing nodes accounting for 80ms, 100% of 80ms total flat flat% sum% cum cum% 60ms 75.00% 75.00% 60ms 75.00% sync.(*Mutex).Unlock 20ms 25.00% 100% 20ms 25.00% sync.(*Mutex).Lock 0 0% 100% 80ms 100% command-line-arguments_test.Add (pprof) svg #svg 保存可視化文件,可使用瀏覽器可視化查看 (pprof) list Add # 查看對應函數的詳細時間消耗信息
注意:
// Once is an object that will perform exactly one action. type Once struct { m Mutex done uint32 // 標識是否已執行過任務,若是設置爲1 則說明任務已執行過了 } // DO 調用用戶執行的方法,僅調用一次 func (o *Once) Do(f func()) { // 原子操做判斷done,已被置成1,若是done是1 說明方法已被執行,直接返回 if atomic.LoadUint32(&o.done) == 1 { return } // 加鎖 o.m.Lock() defer o.m.Unlock() // done爲0則開始,調用用戶函數方法 if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }
讀寫互斥,讀者能夠重複加鎖。寫加鎖須要等待全部讀者解鎖,寫加鎖期間全部讀者wait【寫優先級高於讀,讀寫同時加鎖寫着加鎖先成功】
適用寫少讀多的場景,相比互斥鎖能夠必定程度提升程序性能
僅有讀者
條件變量的做用並不保證在同一時刻僅有一個協程(線程)訪問某個共享的數據資源,而是在對應的共享數據的狀態發生變化時,通知阻塞在某個條件上的協程(線程)。條件變量不是鎖,在併發中不能達到同步的目的,所以條件變量老是與鎖一塊使用,能夠認爲條件變量是對鎖的一種補充,某種程度上提升鎖機制帶來的效率低下的問題
建立條件變量 【建立後不能被被拷貝】
// 參數傳遞一把鎖,返回指針類型 cond:=sync.NewCond(&sync.Mutex{})
Cond.Wait() ,阻塞再條件變量上讓出cup資源
// 阻塞在條件變量上面,會把當前gorotine掛載到Cond隊列上面 cond.Wait() // 1. 釋放鎖,並把本身掛載到通知隊列,阻塞等待【原子操做】 // 2. 接收到喚醒信號,嘗試獲取鎖 // 3. 獲取鎖成功則 返回
Cond.Signal() 隨機喚醒一個阻塞在條件變量上的goroutine
// 喚醒阻塞在條件變量上的goroutine,處於wait【調用了cond.wait】狀態的goroutine // 隨機喚醒通知隊列上的一個線程,並從通知隊列移除 cond.Signal() // 發送喚醒信號
Cond.Broadcast() 廣播通知全部處於wait狀態的goroutine
// 廣播通知全部處於wait狀態的goroutine // 通知通知隊列上的全部的gorotine,而且把全部的goroutine從通知隊列 取下來 cond.Broadcast()
package main import "fmt" import "sync" import "math/rand" import "time" var cond sync.Cond // 建立全局條件變量 // 生產者 func producer(out chan<- int, idx int) { for { cond.L.Lock() // 條件變量對應互斥鎖加鎖 for len(out) == 3 { // 產品區滿 等待消費者消費 cond.Wait() // 掛起當前協程, 等待條件變量知足,被消費者喚醒 } num := rand.Intn(1000) // 產生一個隨機數 out <- num // 寫入到 channel 中 (生產) fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩餘%d個數據\n", idx, num, len(out)) cond.L.Unlock() // 生產結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 消費者 time.Sleep(time.Second) // 生產完休息一會,給其餘協程執行機會, 解決了死鎖機會的下降 } } //消費者 func consumer(in <-chan int, idx int) { for { cond.L.Lock() // 條件變量對應互斥鎖加鎖(與生產者是同一個) for len(in) == 0 { // 產品區爲空 等待生產者生產 cond.Wait() // 掛起當前協程, 等待條件變量知足,被生產者喚醒 } num := <-in // 將 channel 中的數據讀走 (消費) fmt.Printf("---- %dth 消費者, 消費數據 %3d,公共區剩餘%d個數據\n", idx, num, len(in)) cond.L.Unlock() // 消費結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 生產者 time.Sleep(time.Millisecond * 500) //消費完 休息一會,給其餘協程執行機會, 解決了死鎖機會的下降 } } func main() { rand.Seed(time.Now().UnixNano()) // 設置隨機數種子 quit := make(chan bool) // 建立用於結束通訊的 channel product := make(chan int, 3) // 產品區(公共區)使用channel 模擬 cond.L = new(sync.Mutex) // 建立互斥鎖和條件變量 for i := 0; i < 5; i++ { // 5個消費者 go producer(product, i+1) } for i := 0; i < 3; i++ { // 3個生產者 go consumer(product, i+1) } <-quit // 主協程阻塞 不結束 }
極端處理: 1個生產者 2 消費 channle 緩存1
因爲極端一些狀況,會致使全部的生產者與消費者都會進入到一個wait 狀態,沒有人喚醒
解決bug----單向喚醒,由生產者喚醒消費者
喚醒方向問題: 由速率低的一方喚醒速率高的一方
package main import ( "fmt" "runtime" ) import "sync" import "math/rand" import "time" var cond sync.Cond // 建立全局條件變量 // 生產者 func producer(out chan<- int, idx int) { for { num := rand.Intn(1000) // 產生一個隨機數 cond.L.Lock() // 條件變量對應互斥鎖加鎖 select { // 嘗試向channel寫入數據 case out <- num: fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩餘%d個數據\n", idx, num, len(out)) default: } cond.L.Unlock() // 生產結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 消費者 runtime.Gosched() // 給別更多的機會建立鎖 } } //消費者 func consumer(in <-chan int, idx int) { var num int for { cond.L.Lock() // 條件變量對應互斥鎖加鎖(與生產者是同一個) for len(in)==0{ cond.Wait() } num=<-in fmt.Printf("%dth 消費者,消費了 %d, 公共區剩餘%d個數據\n", idx, num, len(in)) cond.L.Unlock() // 消費結束,解鎖互斥鎖 } } func main() { rand.Seed(time.Now().UnixNano()) // 設置隨機數種子 quit := make(chan bool) // 建立用於結束通訊的 channel product := make(chan int, 3) // 產品區(公共區)使用channel 模擬 cond.L = new(sync.Mutex) // 建立互斥鎖和條件變量 for i := 0; i < 3; i++ { // 3個生產者 go producer(product, i+1) } for i := 0; i < 5; i++ { // 5個消費者 go consumer(product, i+1) } <-quit // 主協程阻塞 不結束 }
問題:
當咱們把條件變量取消,使用帶緩存的channel,一樣很好的完成生產者與消費者模型【channel空與非空主動阻塞等待,直至解除阻塞】,why use cond?
使用channel通知多個關注條件的goroutine問題?
關閉的channle 與廣播的做用,僅僅單次使用
當狀態多重狀況的時候,channel 不行了,使用cond廣播的方式進行狀態更新