Golang 鎖和條件變量

前言

前面咱們爲了解決go程同步的問題咱們使用了channel, 可是go也提供了傳統的同步工具.git

它們都在go的標準庫代碼包 syncsync/atomic 中.編程

下面咱們來看一下鎖的應用.併發

什麼是鎖呢? 就是某個協程(線程)在訪問某個資源時先鎖住, 防止其餘協程的訪問, 等訪問完畢解鎖後其餘協程再來加鎖進行訪問.函數

這和咱們生活中加鎖使用公共資源類似, 例如: 公共衛生間.工具

死鎖

死鎖是指兩個或者兩個以上的進程在執行過程當中, 因爲競爭資源或者因爲彼此通訊而形成的一種阻塞的現象, 若無外力做用, 它們都將沒法推動下去. 此時稱系統處於死鎖狀態系統產生了死鎖.性能

死鎖不是鎖的一種! 它是一種錯誤使用鎖致使的現象.網站

產生死鎖的幾種狀況

  • 單go程本身死鎖
  • go程間channel訪問順序致使死鎖
  • 多go程, 多channel交叉死鎖
  • 將 互斥鎖、讀寫鎖與channel混用 -- 隱性死鎖(在 讀寫鎖 講到)

單go程本身死鎖 示例代碼:atom

package main

import "fmt"

// 單go程本身死鎖
func main() {
	ch := make(chan int)
	ch <- 789
	num := <- ch
	fmt.Println(num)
}

上面這段乍一看有可能會以爲沒有什麼問題, 但是仔細一看就會發現這個 ch 是一個無緩衝的channel, 當789寫入緩衝區時, 這時讀端尚未準備好. 因此, 寫端 會發生阻塞, 後面的代碼再也不運行.操作系統

因此能夠得出一個結論: channel應該在至少2個及以上的go程進行通訊, 不然會形成死鎖.線程

咱們繼續看 go程間channel訪問順序致使死鎖 的例子:

package main

import "fmt"

// go程間channel訪問順序致使死鎖
func main(){
	ch := make(chan int)
	num := <- ch
	fmt.Println("num = ", num)
	go func() {
		ch <- 789
	}()
}

在代碼運行到 num := <- ch 時, 發生阻塞, 而且下面的代碼不會執行, 因此發生死鎖.

正確應該這樣寫:

package main

import "fmt"

func main(){
	ch := make(chan int)
	go func() {
		ch <- 789
	}()
	num := <- ch
	fmt.Println("num = ", num)
}

因此, 在使用channel一端讀(寫)時, 要保證另外一端寫(讀)操做有機會執行.

咱們再來看下 多go程, 多channel交叉死鎖 的示例代碼:

package main

import "fmt"

// 多go程, 多channel交叉死鎖
func main(){
	ch1 := make(chan int)
	ch2 := make(chan int)

	go func() {
		for {
			select {
			case num := <- ch1:
				ch2 <- num
			}
		}
	}()

	for {
		select {
		case num := <- ch2:
			ch1 <- num
		}
	}
}

互斥鎖(互斥量)

每一個資源都對應於一個可稱爲"互斥鎖"的標記, 這個標記用來保證在任意時刻, 只能有一個協程(線程)訪問該資源, 其它的協程只能等待.

互斥鎖是傳統併發編程對共享資源進行訪問控制的主要手段, 它由標準庫 sync 中的 Mutex 結構體類型表示.

sync.Mutex 類型只有兩個公開的指針方法, LockUnlock.

Lock鎖定當前的共享資源, Unlock進行解鎖.

在使用互斥鎖時, 必定要注意, 對資源操做完成後, 必定要解鎖, 不然會出現流程執行異常, 死鎖等問題, 一般藉助defer. 鎖定後, 當即使用 defer 語句保證互斥鎖及時解鎖. 以下所示:

var mutex sync.Mutex  // 定義互斥鎖變量: mutex

func write() {
    mutex.Lock()
    defer mutex.Unlock()
}

咱們先來回顧一下channel是怎麼樣完成數據同步的.

package main

import (
	"fmt"
	"time"
)

var ch = make(chan int)

func printer(str string) {
	for _, s := range str {
		fmt.Printf("%c ", s)
		time.Sleep(time.Millisecond * 300)
	}
}

func person1() {        // 先
	printer("hello")
	ch <- 666
}

func person2() {        // 後
	<-ch
	printer("world")
}

func main() {
	go person1()
	go person2()
	time.Sleep(5 * time.Second)
}

一樣可使用互斥鎖來解決, 以下所示:

package main

import (
	"fmt"
	"sync"
	"time"
)

// 使用傳統的 "鎖" 完成同步  -- 互斥鎖
var mutex sync.Mutex  // 建立一個互斥鎖(互斥量), 新建的互斥鎖狀態爲0 -> 未加鎖狀態. 鎖只有一把.
func printer(str string) {
	mutex.Lock()        // 訪問共享數據以前, 加鎖
	for _, s := range str {
		fmt.Printf("%c ", s)
		time.Sleep(time.Millisecond * 300)
	}
	mutex.Unlock()  // 共享數據訪問結束, 解鎖
}

func person1() {
	printer("hello")
}

func person2() {
	printer("world")
}

func main() {
	go person1()
	go person2()
	time.Sleep(5 * time.Second)
}

這種鎖爲建議鎖: 操做系統提供, 建議你在編程時使用.

強制鎖只會在底層操做系統本身用到, 咱們在寫代碼時用不到.

person1與person2兩個go程共同訪問共享數據, 因爲CPU調度隨機, 須要對 共享數據訪問順序加以限定(同步).

建立mutex(互斥鎖), 訪問共享數據以前, 加鎖; 訪問結束, 解鎖.

在person1的go程加鎖期間, person2的go程加鎖會失敗 --> 阻塞.

直至person1的go程解鎖mutext, person2從阻塞處, 恢復執行.

讀寫鎖

互斥鎖的本質是當一個goroutine訪問的時候, 其它goroutine都不能訪問. 這樣在資源同步, 避免競爭的同時, 也下降了程序的併發性能, 程序由原來的並行執行變成了串行執行.

其實, 當咱們對一個不會變化的數據只作操做的話, 是不存在資源競爭的問題的. 由於數據是不變的, 無論怎麼讀取, 多少goroutine同時讀取, 都是能夠的.

因此問題不是出在上, 主要是修改, 也就是. 修改的數據要同步, 這樣其它goroutine才能夠感知到. 因此真正的互斥應該是讀取和修改、修改和修改之間, 讀和讀是沒有互斥操做的必要的.

所以, 衍生出另一種鎖, 叫作讀寫鎖.

讀寫鎖可讓多個讀操做併發, 同時讀取, 可是對於寫操做是徹底互斥的. 也就是說, 當一個goroutine進行寫操做的時候, 其它goroutine既不能進行讀操做, 也不能進行寫操做.

Go中的讀寫鎖由結構體類型 sync.RWMutex 表示. 此類型的方法集合中包含兩對方法:

一組是對寫操做的鎖定和解鎖, 簡稱爲: 寫鎖定寫解鎖.

func (*RWMutex) Lock()
func (*RWMutex) Unlock()

另外一組表示對讀操做的鎖定和解鎖, 簡稱爲: 讀鎖定讀解鎖.

func (*RWMutex) RLock()
func (*RWMutex) RUnlock()

咱們先來看一下沒有使用讀寫鎖的狀況下會發生什麼:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func readGo(in <-chan int, idx int){
	for {
		num := <- in
		fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num)
	}

}

func writeGo(out chan<- int, idx int){
	for {
		// 生成隨機數
		num := rand.Intn(1000)
		out <- num
		fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num)
		time.Sleep(time.Millisecond * 300)
	}

}

func main() {
	// 隨機數種子
	rand.Seed(time.Now().UnixNano())

	ch := make(chan int)

	for i:=0; i<5; i++ {
		go readGo(ch, i+1)
	}

	for i:=0; i<5; i++ {
		go writeGo(ch, i+1)
	}
	time.Sleep(time.Second * 3)
}

結果(截取部分):

......
第4個寫go程, 寫入: 763
----第1個讀go程, 讀入: 998
第1個寫go程, 寫入: 238
第3個寫go程, 寫入: 998
......
第5個寫go程, 寫入: 607
第4個寫go程, 寫入: 151
----第1個讀go程, 讀入: 992
----第2個讀go程, 讀入: 151
......

經過結果咱們能夠知道, 當寫入 763 時, 因爲建立的是無緩衝的channel, 應該先把這個數讀出來, 而後才能夠繼續寫數據, 可是結果顯示, 讀到的是 998, 998 在下面才顯示寫入啊, 怎麼會先讀出來呢? 出現這個狀況的問題在於, 當運行到 num := <- in 時, 已經把 998 寫進去了, 可是這個時候尚未來得及打印, 就失去了CPU, 失去CPU以後, 緩衝區中的數據就會被覆蓋掉, 這時被 763 所覆蓋.

這是第一個錯誤現象, 咱們再來看一下第二個錯誤現象.

既然都是對數據進行讀操做, 相鄰的讀入應該都是相同的數, 好比說----第1個讀go程, 讀入: 992 ----第2個讀go程, 讀入: 151, 這兩個應該讀到的數都是同樣的, 可是結果顯示倒是不一樣的.

那麼加了讀寫鎖以後, 先來看一下錯誤代碼, 你們能夠想一下爲何會出現這種錯誤.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var rwMutex sync.RWMutex

func readGo(in <-chan int, idx int){
	for {
		rwMutex.RLock()    // 以讀模式加鎖
		num := <- in
		fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num)
		rwMutex.RUnlock()    // 以讀模式解鎖
	}
}

func writeGo(out chan<- int, idx int){

	for {
		// 生成隨機數
		num := rand.Intn(1000)
		rwMutex.Lock()    // 以寫模式加鎖
		out <- num
		fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num)
		time.Sleep(time.Millisecond * 300)
		rwMutex.Unlock()    // 以寫模式解鎖
	}
}

func main() {
	// 隨機數種子
	rand.Seed(time.Now().UnixNano())

	ch := make(chan int)

	for i:=0; i<5; i++ {
		go readGo(ch, i+1)
	}

	for i:=0; i<5; i++ {
		go writeGo(ch, i+1)
	}
	time.Sleep(time.Second * 3)
}

上面代碼的結果會一直阻塞, 沒有輸出, 你們能夠簡單想一下出現這種狀況的緣由是什麼?

代碼看得仔細的應該均可以看出來, 這上面的代碼中, 好比說讀操做先搶到了CPU, 運行代碼 rwMutex.RLock() 讀加鎖, 而後運行到 num := <- in 時, 會要求寫端同時在線, 不然就會發生阻塞, 可是這時寫端不可能在線, 由於讀加鎖了. 因此就會一直在這發生阻塞.

這也就是咱們以前在死鎖部分中提到的 隱性死鎖 (不報錯).

那麼解決辦法有兩種: 一種是不混用, 另外一種是使用條件變量(以後會講到)

咱們先看一下不混用讀寫鎖與channel的解決辦法(只使用讀寫鎖, 若是隻使用channel達不到想要的效果):

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var rwMutex2 sync.RWMutex    // 鎖只有一把, 兩個屬性: r w

var value int    // 定義全局變量, 模擬共享數據

func readGo2(in <-chan int, idx int){
	for {
		rwMutex2.RLock()    // 以讀模式加鎖
		num := value
		fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num)
		rwMutex2.RUnlock()    // 以讀模式解鎖
	}
}

func writeGo2(out chan<- int, idx int){
	for {
		// 生成隨機數
		num := rand.Intn(1000)
		rwMutex2.Lock()    // 以寫模式加鎖
		value = num
		fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num)
		time.Sleep(time.Millisecond * 300)
		rwMutex2.Unlock()    // 以寫模式解鎖
	}
}

func main() {
	// 隨機數種子
	rand.Seed(time.Now().UnixNano())

	ch := make(chan int)

	for i:=0; i<5; i++ {
		go readGo2(ch, i+1)
	}

	for i:=0; i<5; i++ {
		go writeGo2(ch, i+1)
	}
	time.Sleep(time.Second * 3)
}

結果:

......
第5個寫go程, 寫入: 363
----第4個讀go程, 讀入: 363
----第4個讀go程, 讀入: 363
----第4個讀go程, 讀入: 363
----第4個讀go程, 讀入: 363
----第2個讀go程, 讀入: 363
第5個寫go程, 寫入: 726
----第5個讀go程, 讀入: 726
----第4個讀go程, 讀入: 726
----第2個讀go程, 讀入: 726
----第1個讀go程, 讀入: 726
----第3個讀go程, 讀入: 726
第1個寫go程, 寫入: 764
----第5個讀go程, 讀入: 764
----第2個讀go程, 讀入: 764
----第5個讀go程, 讀入: 764
----第1個讀go程, 讀入: 764
----第3個讀go程, 讀入: 764
......

處於讀鎖定狀態, 那麼針對它的寫鎖定操做將永遠不會成功, 且相應的goroutine也會被一直阻塞, 由於它們是互斥的.

總結: 讀寫鎖控制下的多個寫操做之間都是互斥的, 而且寫操做與讀操做之間也都是互斥的. 可是多個讀操做之間不存在互斥關係.

從互斥鎖和讀寫鎖的源碼能夠看出, 它們是同源的. 讀寫鎖的內部用互斥鎖來實現寫鎖定操做之間的互斥. 能夠把讀寫鎖看做是互斥鎖的一種擴展.

條件變量

在講條件變量以前, 咱們先來回顧一下以前的生產者消費者模型:

package main

import (
	"fmt"
	"time"
)

func producer(out chan <- int) {
	for i:=0; i<5; i++ {
		fmt.Println("生產者, 生產: ", i)
		out <- i
	}
	close(out)
}

func consumer(in <- chan int) {
	for num := range in {
		fmt.Println("---消費者, 消費: ", num)
	}
}

func main() {
	ch := make(chan int)
	go producer(ch)
	go consumer(ch)
	time.Sleep(5 * time.Second)
}

以前都是一個生產者與一個消費者, 那麼若是是多個生產者與多個消費者的狀況呢?

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func producer(out chan <- int, idx int) {
	for i:=0; i<10; i++ {
		num := rand.Intn(800)
		fmt.Printf("第%d個生產者, 生產: %d\n", idx, num)
		out <- num
	}
}

func consumer(in <- chan int, idx int) {
	for num := range in {
		fmt.Printf("---第%d個消費者, 消費: %d\n", idx, num)
	}
}

func main() {
	ch := make(chan int)
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 5; i++ {
		go producer(ch, i + 1)
	}
	for i := 0; i < 5; i++ {
		go consumer(ch, i + 1)
	}
	time.Sleep(5 * time.Second)
}

若是是按照上面的代碼寫的話, 就又會出現以前的錯誤.

上面已經說過了, 解決這種錯誤有兩種方法: 用鎖或者用條件變量.

此次就用條件變量來解決一下.

首先, 強調一下. 條件變量自己不是鎖!! 可是常常與鎖結合使用!!

還有另一個問題, 若是消費者比生產者多, 倉庫中就會出現沒有數據的狀況. 咱們須要不斷的經過循環來判斷倉庫隊列中是否有數據, 這樣會形成cpu的浪費. 反之, 若是生產者比較多, 倉庫很容易滿, 滿了就不能繼續添加數據, 也須要循環判斷倉庫滿這一事件, 一樣也會形成cpu的浪費.

咱們但願當倉庫滿時, 生產者中止生產, 等待消費者消費; 同理, 若是倉庫空了, 咱們但願消費者停下來等待生產者生產. 爲了達到這個目的, 這裏就引入了條件變量. (須要注意, 若是倉庫隊列用channel, 是不存在以上狀況的, 由於channel被填滿後就阻塞了, 或者channel中沒有數據也會阻塞).

條件變量: 條件變量的做用並不保證在同一時刻僅有一個協程(線程)訪問某個共享的數據資源, 而是在對應的共享數據的狀態發生變化時, 通知阻塞在某個條件上的協程(線程). 條件變量不是鎖, 在併發中不能達到同步的目的, 所以條件變量老是與鎖一塊使用.

例如, 咱們上面說的, 若是倉庫隊列滿了, 咱們可使用條件變量讓生產者對應的goroutine暫停(阻塞), 可是當消費者消費了某個產品後, 倉庫就再也不滿了, 應該喚醒(發送通知給)阻塞的生產者goroutine繼續生產產品.

Go標準庫中的 sync.Cond 類型表明了條件變量. 條件變量要與鎖(互斥鎖或者讀寫鎖)一塊兒使用. 成員變量L表明與條件變量搭配使用的鎖.

type Cond struct {
    noCopy noCopy
    L Locker
    notify notifyList
    checker copyChecker
}

對應的有3個經常使用的方法, Wait, Signal, Broadcast

  1. func (c *Cond) Wait()

該函數的做用可概括爲以下三點:

  • 阻塞等待條件變量知足
  • 釋放已掌握的互斥鎖至關於cond.L.Unlock()。注意: 兩步爲一個原子操做(第一步與第二步操做不可再分).
  • 當被喚醒時, Wait() 函數返回時, 解除阻塞並從新獲取互斥鎖. 至關於cond.L.Lock()
  1. func (c *Cond) Signal()

單發通知, 給一個正等待(阻塞)在該條件變量上的goroutine(線程)發送通知.

  1. func (c *Cond) Broadcast()

廣播通知, 給正在等待(阻塞)在該條件變量上的全部goroutine(線程)發送通知

下面, 咱們就用條件變量來寫一個生產者消費者模型.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var cond sync.Cond  // 定義全局變量

func producer2(out chan<- int, idx int) {
	for {
		// 先加鎖
		cond.L.Lock()
		// 判斷緩衝區是否滿
		for len(out) == 3 {
			cond.Wait()
		}
		num := rand.Intn(800)
		out <- num
		fmt.Printf("第%d個生產者, 生產: %d\n", idx, num)
		// 訪問公共區結束, 而且打印結束, 解鎖
		cond.L.Unlock()
		// 喚醒阻塞在條件變量上的 消費者
		cond.Signal()
	}
}

func consumer2(in <- chan int, idx int) {
	for {
		// 先加鎖
		cond.L.Lock()
		// 判斷緩衝區是否爲 空
		for len(in) == 0 {
			cond.Wait()
		}
		num := <- in
		fmt.Printf("---第%d個消費者, 消費: %d\n", idx, num)
		// 訪問公共區結束後, 解鎖
		cond.L.Unlock()
		// 喚醒阻塞在條件變量上的生產者
		cond.Signal()
	}
}

func main() {
	// 設置隨機種子數
	rand.Seed(time.Now().UnixNano())

	ch := make(chan int, 3)

	cond.L = new(sync.Mutex)

	for i := 0; i < 5; i++ {
		go producer2(ch, i + 1)
	}
	for i := 0; i < 5; i++ {
		go consumer2(ch, i + 1)
	}
	time.Sleep(time.Second * 1)
}

1)定義 ch 做爲隊列, 生產者產生數據保存至隊列中, 最多存儲3個數據, 消費者從中取出數據模擬消費

2)條件變量要與一塊兒使用, 這裏定義全局條件變量 cond, 它有一個屬性: L Locker, 是一個互斥鎖.

3)開啓5個消費者go程, 開啓5個生產者go程.

4)producer2 生產者, 在該方法中開啓互斥鎖, 保證數據完整性. 而且判斷隊列是否滿, 若是已滿, 調用 cond.Wait() 讓該goroutine阻塞. 當消費者取出數據後執行 cond.Signal(), 會喚醒該goroutine, 繼續產生數據.

5)consumer2 消費者, 一樣開啓互斥鎖, 保證數據完整性. 判斷隊列是否爲空, 若是爲空, 調用 cond.Wait() 使得當前goroutine阻塞. 當生產者產生數據並添加到隊列, 執行 cond.Signal() 喚醒該goroutine.

條件變量使用流程:

  1. 建立條件變量: var cond sync.Cond
  2. 指定條件變量用的: cond.L = new(sync.Mutex)
  3. 給公共區加鎖(互斥鎖): cond.L.Lock()
  4. 判斷是否到達阻塞條件(緩衝區滿/空) --> for循環判斷
    for len(ch) == cap(ch) { cond.Wait() }
    或者 for len(ch) == 0 { cond.Wait() }
    1) 阻塞 2)解鎖 3)加鎖
  5. 訪問公共區 --> 讀、寫數據、打印
  6. 解鎖條件變量用的: cond.L.Unlock()
  7. 喚醒阻塞在條件變量上的對端: cond.Signal() cond.Broadcast()

李培冠博客

歡迎訪問個人我的網站:

李培冠博客:lpgit.com

相關文章
相關標籤/搜索