前面咱們爲了解決go程同步的問題咱們使用了channel, 可是go也提供了傳統的同步工具.git
它們都在go的標準庫代碼包 sync
和 sync/atomic
中.編程
下面咱們來看一下鎖的應用.併發
什麼是鎖呢? 就是某個協程(線程)在訪問某個資源時先鎖住, 防止其餘協程的訪問, 等訪問完畢解鎖後其餘協程再來加鎖進行訪問.函數
這和咱們生活中加鎖使用公共資源類似, 例如: 公共衛生間.工具
死鎖是指兩個或者兩個以上的進程在執行過程當中, 因爲競爭資源或者因爲彼此通訊而形成的一種阻塞的現象, 若無外力做用, 它們都將沒法推動下去. 此時稱系統處於死鎖狀態或系統產生了死鎖.性能
死鎖不是鎖的一種! 它是一種錯誤使用鎖致使的現象.網站
讀寫鎖
講到)單go程本身死鎖 示例代碼:atom
package main import "fmt" // 單go程本身死鎖 func main() { ch := make(chan int) ch <- 789 num := <- ch fmt.Println(num) }
上面這段乍一看有可能會以爲沒有什麼問題, 但是仔細一看就會發現這個 ch
是一個無緩衝的channel, 當789寫入緩衝區時, 這時讀端尚未準備好. 因此, 寫端 會發生阻塞, 後面的代碼再也不運行.操作系統
因此能夠得出一個結論: channel應該在至少2個及以上的go程進行通訊, 不然會形成死鎖.線程
咱們繼續看 go程間channel訪問順序致使死鎖 的例子:
package main import "fmt" // go程間channel訪問順序致使死鎖 func main(){ ch := make(chan int) num := <- ch fmt.Println("num = ", num) go func() { ch <- 789 }() }
在代碼運行到 num := <- ch
時, 發生阻塞, 而且下面的代碼不會執行, 因此發生死鎖.
正確應該這樣寫:
package main import "fmt" func main(){ ch := make(chan int) go func() { ch <- 789 }() num := <- ch fmt.Println("num = ", num) }
因此, 在使用channel一端讀(寫)時, 要保證另外一端寫(讀)操做有機會執行.
咱們再來看下 多go程, 多channel交叉死鎖 的示例代碼:
package main import "fmt" // 多go程, 多channel交叉死鎖 func main(){ ch1 := make(chan int) ch2 := make(chan int) go func() { for { select { case num := <- ch1: ch2 <- num } } }() for { select { case num := <- ch2: ch1 <- num } } }
每一個資源都對應於一個可稱爲"互斥鎖"的標記, 這個標記用來保證在任意時刻, 只能有一個協程(線程)訪問該資源, 其它的協程只能等待.
互斥鎖是傳統併發編程對共享資源進行訪問控制的主要手段, 它由標準庫 sync
中的 Mutex
結構體類型表示.
sync.Mutex
類型只有兩個公開的指針方法, Lock 和 Unlock.
Lock鎖定當前的共享資源, Unlock進行解鎖.
在使用互斥鎖時, 必定要注意, 對資源操做完成後, 必定要解鎖, 不然會出現流程執行異常, 死鎖等問題, 一般藉助defer. 鎖定後, 當即使用 defer
語句保證互斥鎖及時解鎖. 以下所示:
var mutex sync.Mutex // 定義互斥鎖變量: mutex func write() { mutex.Lock() defer mutex.Unlock() }
咱們先來回顧一下channel是怎麼樣完成數據同步的.
package main import ( "fmt" "time" ) var ch = make(chan int) func printer(str string) { for _, s := range str { fmt.Printf("%c ", s) time.Sleep(time.Millisecond * 300) } } func person1() { // 先 printer("hello") ch <- 666 } func person2() { // 後 <-ch printer("world") } func main() { go person1() go person2() time.Sleep(5 * time.Second) }
一樣可使用互斥鎖來解決, 以下所示:
package main import ( "fmt" "sync" "time" ) // 使用傳統的 "鎖" 完成同步 -- 互斥鎖 var mutex sync.Mutex // 建立一個互斥鎖(互斥量), 新建的互斥鎖狀態爲0 -> 未加鎖狀態. 鎖只有一把. func printer(str string) { mutex.Lock() // 訪問共享數據以前, 加鎖 for _, s := range str { fmt.Printf("%c ", s) time.Sleep(time.Millisecond * 300) } mutex.Unlock() // 共享數據訪問結束, 解鎖 } func person1() { printer("hello") } func person2() { printer("world") } func main() { go person1() go person2() time.Sleep(5 * time.Second) }
這種鎖爲建議鎖: 操做系統提供, 建議你在編程時使用.
強制鎖只會在底層操做系統本身用到, 咱們在寫代碼時用不到.
person1與person2兩個go程共同訪問共享數據, 因爲CPU調度隨機, 須要對 共享數據訪問順序加以限定(同步).
建立mutex(互斥鎖), 訪問共享數據以前, 加鎖; 訪問結束, 解鎖.
在person1的go程加鎖期間, person2的go程加鎖會失敗 --> 阻塞.
直至person1的go程解鎖mutext, person2從阻塞處, 恢復執行.
互斥鎖的本質是當一個goroutine訪問的時候, 其它goroutine都不能訪問. 這樣在資源同步, 避免競爭的同時, 也下降了程序的併發性能, 程序由原來的並行執行變成了串行執行.
其實, 當咱們對一個不會變化的數據只作讀操做的話, 是不存在資源競爭的問題的. 由於數據是不變的, 無論怎麼讀取, 多少goroutine同時讀取, 都是能夠的.
因此問題不是出在讀上, 主要是修改, 也就是寫. 修改的數據要同步, 這樣其它goroutine才能夠感知到. 因此真正的互斥應該是讀取和修改、修改和修改之間, 讀和讀是沒有互斥操做的必要的.
所以, 衍生出另一種鎖, 叫作讀寫鎖.
讀寫鎖可讓多個讀操做併發, 同時讀取, 可是對於寫操做是徹底互斥的. 也就是說, 當一個goroutine進行寫操做的時候, 其它goroutine既不能進行讀操做, 也不能進行寫操做.
Go中的讀寫鎖由結構體類型 sync.RWMutex
表示. 此類型的方法集合中包含兩對方法:
一組是對寫操做的鎖定和解鎖, 簡稱爲: 寫鎖定 和 寫解鎖.
func (*RWMutex) Lock() func (*RWMutex) Unlock()
另外一組表示對讀操做的鎖定和解鎖, 簡稱爲: 讀鎖定 和 讀解鎖.
func (*RWMutex) RLock() func (*RWMutex) RUnlock()
咱們先來看一下沒有使用讀寫鎖的狀況下會發生什麼:
package main import ( "fmt" "math/rand" "time" ) func readGo(in <-chan int, idx int){ for { num := <- in fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num) } } func writeGo(out chan<- int, idx int){ for { // 生成隨機數 num := rand.Intn(1000) out <- num fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num) time.Sleep(time.Millisecond * 300) } } func main() { // 隨機數種子 rand.Seed(time.Now().UnixNano()) ch := make(chan int) for i:=0; i<5; i++ { go readGo(ch, i+1) } for i:=0; i<5; i++ { go writeGo(ch, i+1) } time.Sleep(time.Second * 3) }
結果(截取部分):
...... 第4個寫go程, 寫入: 763 ----第1個讀go程, 讀入: 998 第1個寫go程, 寫入: 238 第3個寫go程, 寫入: 998 ...... 第5個寫go程, 寫入: 607 第4個寫go程, 寫入: 151 ----第1個讀go程, 讀入: 992 ----第2個讀go程, 讀入: 151 ......
經過結果咱們能夠知道, 當寫入 763
時, 因爲建立的是無緩衝的channel, 應該先把這個數讀出來, 而後才能夠繼續寫數據, 可是結果顯示, 讀到的是 998
, 998
在下面才顯示寫入啊, 怎麼會先讀出來呢? 出現這個狀況的問題在於, 當運行到 num := <- in
時, 已經把 998
寫進去了, 可是這個時候尚未來得及打印, 就失去了CPU, 失去CPU以後, 緩衝區中的數據就會被覆蓋掉, 這時被 763
所覆蓋.
這是第一個錯誤現象, 咱們再來看一下第二個錯誤現象.
既然都是對數據進行讀操做, 相鄰的讀入應該都是相同的數, 好比說----第1個讀go程, 讀入: 992 ----第2個讀go程, 讀入: 151
, 這兩個應該讀到的數都是同樣的, 可是結果顯示倒是不一樣的.
那麼加了讀寫鎖以後, 先來看一下錯誤代碼, 你們能夠想一下爲何會出現這種錯誤.
package main import ( "fmt" "math/rand" "sync" "time" ) var rwMutex sync.RWMutex func readGo(in <-chan int, idx int){ for { rwMutex.RLock() // 以讀模式加鎖 num := <- in fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num) rwMutex.RUnlock() // 以讀模式解鎖 } } func writeGo(out chan<- int, idx int){ for { // 生成隨機數 num := rand.Intn(1000) rwMutex.Lock() // 以寫模式加鎖 out <- num fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num) time.Sleep(time.Millisecond * 300) rwMutex.Unlock() // 以寫模式解鎖 } } func main() { // 隨機數種子 rand.Seed(time.Now().UnixNano()) ch := make(chan int) for i:=0; i<5; i++ { go readGo(ch, i+1) } for i:=0; i<5; i++ { go writeGo(ch, i+1) } time.Sleep(time.Second * 3) }
上面代碼的結果會一直阻塞, 沒有輸出, 你們能夠簡單想一下出現這種狀況的緣由是什麼?
代碼看得仔細的應該均可以看出來, 這上面的代碼中, 好比說讀操做先搶到了CPU, 運行代碼 rwMutex.RLock()
讀加鎖, 而後運行到 num := <- in
時, 會要求寫端同時在線, 不然就會發生阻塞, 可是這時寫端不可能在線, 由於讀加鎖了. 因此就會一直在這發生阻塞.
這也就是咱們以前在死鎖部分中提到的 隱性死鎖 (不報錯).
那麼解決辦法有兩種: 一種是不混用, 另外一種是使用條件變量(以後會講到)
咱們先看一下不混用讀寫鎖與channel的解決辦法(只使用讀寫鎖, 若是隻使用channel達不到想要的效果):
package main import ( "fmt" "math/rand" "sync" "time" ) var rwMutex2 sync.RWMutex // 鎖只有一把, 兩個屬性: r w var value int // 定義全局變量, 模擬共享數據 func readGo2(in <-chan int, idx int){ for { rwMutex2.RLock() // 以讀模式加鎖 num := value fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num) rwMutex2.RUnlock() // 以讀模式解鎖 } } func writeGo2(out chan<- int, idx int){ for { // 生成隨機數 num := rand.Intn(1000) rwMutex2.Lock() // 以寫模式加鎖 value = num fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num) time.Sleep(time.Millisecond * 300) rwMutex2.Unlock() // 以寫模式解鎖 } } func main() { // 隨機數種子 rand.Seed(time.Now().UnixNano()) ch := make(chan int) for i:=0; i<5; i++ { go readGo2(ch, i+1) } for i:=0; i<5; i++ { go writeGo2(ch, i+1) } time.Sleep(time.Second * 3) }
結果:
...... 第5個寫go程, 寫入: 363 ----第4個讀go程, 讀入: 363 ----第4個讀go程, 讀入: 363 ----第4個讀go程, 讀入: 363 ----第4個讀go程, 讀入: 363 ----第2個讀go程, 讀入: 363 第5個寫go程, 寫入: 726 ----第5個讀go程, 讀入: 726 ----第4個讀go程, 讀入: 726 ----第2個讀go程, 讀入: 726 ----第1個讀go程, 讀入: 726 ----第3個讀go程, 讀入: 726 第1個寫go程, 寫入: 764 ----第5個讀go程, 讀入: 764 ----第2個讀go程, 讀入: 764 ----第5個讀go程, 讀入: 764 ----第1個讀go程, 讀入: 764 ----第3個讀go程, 讀入: 764 ......
處於讀鎖定狀態, 那麼針對它的寫鎖定操做將永遠不會成功, 且相應的goroutine也會被一直阻塞, 由於它們是互斥的.
總結: 讀寫鎖控制下的多個寫操做之間都是互斥的, 而且寫操做與讀操做之間也都是互斥的. 可是多個讀操做之間不存在互斥關係.
從互斥鎖和讀寫鎖的源碼能夠看出, 它們是同源的. 讀寫鎖的內部用互斥鎖來實現寫鎖定操做之間的互斥. 能夠把讀寫鎖看做是互斥鎖的一種擴展.
在講條件變量以前, 咱們先來回顧一下以前的生產者消費者模型:
package main import ( "fmt" "time" ) func producer(out chan <- int) { for i:=0; i<5; i++ { fmt.Println("生產者, 生產: ", i) out <- i } close(out) } func consumer(in <- chan int) { for num := range in { fmt.Println("---消費者, 消費: ", num) } } func main() { ch := make(chan int) go producer(ch) go consumer(ch) time.Sleep(5 * time.Second) }
以前都是一個生產者與一個消費者, 那麼若是是多個生產者與多個消費者的狀況呢?
package main import ( "fmt" "math/rand" "time" ) func producer(out chan <- int, idx int) { for i:=0; i<10; i++ { num := rand.Intn(800) fmt.Printf("第%d個生產者, 生產: %d\n", idx, num) out <- num } } func consumer(in <- chan int, idx int) { for num := range in { fmt.Printf("---第%d個消費者, 消費: %d\n", idx, num) } } func main() { ch := make(chan int) rand.Seed(time.Now().UnixNano()) for i := 0; i < 5; i++ { go producer(ch, i + 1) } for i := 0; i < 5; i++ { go consumer(ch, i + 1) } time.Sleep(5 * time.Second) }
若是是按照上面的代碼寫的話, 就又會出現以前的錯誤.
上面已經說過了, 解決這種錯誤有兩種方法: 用鎖或者用條件變量.
此次就用條件變量來解決一下.
首先, 強調一下. 條件變量自己不是鎖!! 可是常常與鎖結合使用!!
還有另一個問題, 若是消費者比生產者多, 倉庫中就會出現沒有數據的狀況. 咱們須要不斷的經過循環來判斷倉庫隊列中是否有數據, 這樣會形成cpu的浪費. 反之, 若是生產者比較多, 倉庫很容易滿, 滿了就不能繼續添加數據, 也須要循環判斷倉庫滿這一事件, 一樣也會形成cpu的浪費.
咱們但願當倉庫滿時, 生產者中止生產, 等待消費者消費; 同理, 若是倉庫空了, 咱們但願消費者停下來等待生產者生產. 爲了達到這個目的, 這裏就引入了條件變量. (須要注意, 若是倉庫隊列用channel, 是不存在以上狀況的, 由於channel被填滿後就阻塞了, 或者channel中沒有數據也會阻塞).
條件變量: 條件變量的做用並不保證在同一時刻僅有一個協程(線程)訪問某個共享的數據資源, 而是在對應的共享數據的狀態發生變化時, 通知阻塞在某個條件上的協程(線程). 條件變量不是鎖, 在併發中不能達到同步的目的, 所以條件變量老是與鎖一塊使用.
例如, 咱們上面說的, 若是倉庫隊列滿了, 咱們可使用條件變量讓生產者對應的goroutine暫停(阻塞), 可是當消費者消費了某個產品後, 倉庫就再也不滿了, 應該喚醒(發送通知給)阻塞的生產者goroutine繼續生產產品.
Go標準庫中的 sync.Cond
類型表明了條件變量. 條件變量要與鎖(互斥鎖或者讀寫鎖)一塊兒使用. 成員變量L表明與條件變量搭配使用的鎖.
type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker }
對應的有3個經常使用的方法, Wait
, Signal
, Broadcast
該函數的做用可概括爲以下三點:
Wait()
函數返回時, 解除阻塞並從新獲取互斥鎖. 至關於cond.L.Lock()單發通知, 給一個正等待(阻塞)在該條件變量上的goroutine(線程)發送通知.
廣播通知, 給正在等待(阻塞)在該條件變量上的全部goroutine(線程)發送通知
下面, 咱們就用條件變量來寫一個生產者消費者模型.
package main import ( "fmt" "math/rand" "sync" "time" ) var cond sync.Cond // 定義全局變量 func producer2(out chan<- int, idx int) { for { // 先加鎖 cond.L.Lock() // 判斷緩衝區是否滿 for len(out) == 3 { cond.Wait() } num := rand.Intn(800) out <- num fmt.Printf("第%d個生產者, 生產: %d\n", idx, num) // 訪問公共區結束, 而且打印結束, 解鎖 cond.L.Unlock() // 喚醒阻塞在條件變量上的 消費者 cond.Signal() } } func consumer2(in <- chan int, idx int) { for { // 先加鎖 cond.L.Lock() // 判斷緩衝區是否爲 空 for len(in) == 0 { cond.Wait() } num := <- in fmt.Printf("---第%d個消費者, 消費: %d\n", idx, num) // 訪問公共區結束後, 解鎖 cond.L.Unlock() // 喚醒阻塞在條件變量上的生產者 cond.Signal() } } func main() { // 設置隨機種子數 rand.Seed(time.Now().UnixNano()) ch := make(chan int, 3) cond.L = new(sync.Mutex) for i := 0; i < 5; i++ { go producer2(ch, i + 1) } for i := 0; i < 5; i++ { go consumer2(ch, i + 1) } time.Sleep(time.Second * 1) }
1)定義 ch
做爲隊列, 生產者產生數據保存至隊列中, 最多存儲3個數據, 消費者從中取出數據模擬消費
2)條件變量要與鎖一塊兒使用, 這裏定義全局條件變量 cond
, 它有一個屬性: L Locker
, 是一個互斥鎖.
3)開啓5個消費者go程, 開啓5個生產者go程.
4)producer2
生產者, 在該方法中開啓互斥鎖, 保證數據完整性. 而且判斷隊列是否滿, 若是已滿, 調用 cond.Wait()
讓該goroutine阻塞. 當消費者取出數據後執行 cond.Signal()
, 會喚醒該goroutine, 繼續產生數據.
5)consumer2
消費者, 一樣開啓互斥鎖, 保證數據完整性. 判斷隊列是否爲空, 若是爲空, 調用 cond.Wait()
使得當前goroutine阻塞. 當生產者產生數據並添加到隊列, 執行 cond.Signal()
喚醒該goroutine.
條件變量使用流程:
for len(ch) == cap(ch) { cond.Wait() } 或者 for len(ch) == 0 { cond.Wait() } 1) 阻塞 2)解鎖 3)加鎖
歡迎訪問個人我的網站:
李培冠博客:lpgit.com