goroutiine同步/channel、互斥鎖、讀寫鎖、死鎖/條件變量

 

1. Goroutine同步【數據同步】

  • 爲何須要goroutine同步html

  • gorotine同步概念、以及同步的幾種方式node

1.1 爲何須要goroutine同步

package main

import (
	"fmt"
	"sync"
)

var A = 10
var wg = sync.WaitGroup{}

func Add(){
    defer wg.Done()
    for i:=0;i<1000000;i++{
        A += 1
    }
}

func main() {
	wg.Add(2)
    go Add()
    go Add()
	wg.Wait()
	fmt.Println(A)
}
# output:
1061865  # 每運行一次結果都不同,可是都不是咱們預期的結果2000000

 多goroutine【多任務】,有共享資源,且多goroutine修改共享資源,出現數據不安全問題【數據錯誤】,保證數據安全一致,須要goroutine同步瀏覽器

1.2 goroutine同步

goroutine按照約定的順序執行,解決數據不安全問題。緩存

 

1.3 goroutine同步方式

  • channel 【csp模型】安全

  • 互斥鎖 【傳統同步機制】併發

  • 讀寫鎖 【傳統同步機制】svg

  • 條件變量 【傳統同步機制】函數

 

2. 傳統同步機制

2.1 互斥鎖

2.1.1 特色

加鎖成功則操做資源,加鎖失敗則等待直至鎖加鎖成功----全部的goroutine互斥,一個獲得鎖其餘所有等待性能

解決了數據安全問題,下降了程序的性能,適用讀寫不太頻繁的場景單元測試

2.1.2 鎖顆粒度問題

顆粒度是指,加鎖的範圍,哪裏使用資源哪裏加鎖,儘量減小加鎖範圍

單元測試基本使用流程

  • 新建單元測試文件

  • 編寫測試案例

  • gotest運行生成對應的prof文件

  • go tool 查看生成的prof文件

package main_test
import (
	"fmt"
	"sync"
	"testing"
)

var A = 10
var wg = sync.WaitGroup{}
var mux sync.Mutex

func Add(){
	defer wg.Done()
	for i:=0;i<1000000;i++{
        mux.Lock()
		A += 1
        mux.Unlock()
	}
}
/*
// 加大鎖顆粒度
func Add(){
	defer wg.Done()
	mux.Lock()
	for i:=0;i<1000000;i++{
		A += 1
	}
	mux.Unlock()
}*/
// 單元測試格式, 
func TestMux(t *testing.T) {
	wg.Add(2)
	go Add()
	go Add()
	wg.Wait()
	fmt.Println(A)
}

 

# 生成prof文件,-cpuprofile 參數指定生成什麼類型的prof cpu.prof指定生成profile文件名字
go test mutex_test.go -cpuprofile cpu.prof

# 查看生成的prof文件,pprof 指定查看的文件類型
go tool pprof cpu.prof

# 下面是輸出信息
Type: cpu
Time: Jul 10, 2019 at 2:38pm (CST)
Duration: 201.43ms, Total samples = 80ms (39.72%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top  # 這裏使用top命令查看測試中cpu使用的信息
Showing nodes accounting for 80ms, 100% of 80ms total
      flat  flat%   sum%        cum   cum%
      60ms 75.00% 75.00%       60ms 75.00%  sync.(*Mutex).Unlock
      20ms 25.00%   100%       20ms 25.00%  sync.(*Mutex).Lock
         0     0%   100%       80ms   100%  command-line-arguments_test.Add
(pprof) svg  #svg 保存可視化文件,可使用瀏覽器可視化查看
(pprof) list Add  # 查看對應函數的詳細時間消耗信息

 

注意

當前得測試案例,是程序設計的錯誤【這種快速計算性的,一個goroutine已經能夠勝任,更多時候讀寫分離,互斥鎖不適合這種頻繁讀寫場景】,不是鎖使用的錯誤

2.1.3 sync.once 源碼閱讀

// Once is an object that will perform exactly one action.
type Once struct {
	m    Mutex
	done uint32  // 標識是否已執行過任務,若是設置爲1 則說明任務已執行過了
}

// DO 調用用戶執行的方法,僅調用一次
func (o *Once) Do(f func()) {
    // 原子操做判斷done,已被置成1,若是done是1 說明方法已被執行,直接返回
	if atomic.LoadUint32(&o.done) == 1 {
		return
	}
   
    // 加鎖
	o.m.Lock()
	defer o.m.Unlock()
    // done爲0則開始,調用用戶函數方法
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

 

2.2 讀寫鎖

讀寫互斥,讀者能夠重複加鎖。寫加鎖須要等待全部讀者解鎖,寫加鎖期間全部讀者wait【寫優先級高於讀,讀寫同時加鎖寫着加鎖先成功】

適用寫少讀多的場景,相比互斥鎖能夠必定程度提升程序性能

  • 僅有讀者

 

 

  • 寫着加鎖更新數據

 

 

 

 

 

 

 

2.3 條件變量

條件變量的做用並不保證在同一時刻僅有一個協程(線程)訪問某個共享的數據資源,而是在對應的共享數據的狀態發生變化時,通知阻塞在某個條件上的協程(線程)。條件變量不是鎖,在併發中不能達到同步的目的,所以條件變量老是與鎖一塊使用能夠認爲條件變量是對鎖的一種補充,某種程度上提升鎖機制帶來的效率低下的問題

2.3.1 條件變量API介紹

  • 建立條件變量 【建立後不能被被拷貝】

// 參數傳遞一把鎖,返回指針類型
cond:=sync.NewCond(&sync.Mutex{})

 Cond.Wait() ,阻塞再條件變量上讓出cup資源

// 阻塞在條件變量上面,會把當前gorotine掛載到Cond隊列上面
cond.Wait()
// 1. 釋放鎖,並把本身掛載到通知隊列,阻塞等待【原子操做】
// 2. 接收到喚醒信號,嘗試獲取鎖
// 3. 獲取鎖成功則  返回

 Cond.Signal() 隨機喚醒一個阻塞在條件變量上的goroutine

// 喚醒阻塞在條件變量上的goroutine,處於wait【調用了cond.wait】狀態的goroutine
// 隨機喚醒通知隊列上的一個線程,並從通知隊列移除
cond.Signal()  // 發送喚醒信號

Cond.Broadcast() 廣播通知全部處於wait狀態的goroutine

// 廣播通知全部處於wait狀態的goroutine
// 通知通知隊列上的全部的gorotine,而且把全部的goroutine從通知隊列 取下來
cond.Broadcast()

2.3.2 條件變量在生產者消費模型中使用

  • 潛在bug-->deadlock【生產者消費者都死鎖在cond.wait,沒有其餘的goroutine喚醒】

package main
import "fmt"
import "sync"
import "math/rand"
import "time"

var cond sync.Cond             // 建立全局條件變量

// 生產者
func producer(out chan<- int, idx int) {
   for {
      cond.L.Lock()           	// 條件變量對應互斥鎖加鎖
      for len(out) == 3 {          	// 產品區滿 等待消費者消費
         cond.Wait()             	// 掛起當前協程, 等待條件變量知足,被消費者喚醒
      }
      num := rand.Intn(1000) 	// 產生一個隨機數
      out <- num             	// 寫入到 channel 中 (生產)
      fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩餘%d個數據\n", idx, num, len(out))
      cond.L.Unlock()             	// 生產結束,解鎖互斥鎖
      cond.Signal()           	// 喚醒 阻塞的 消費者
      time.Sleep(time.Second)       // 生產完休息一會,給其餘協程執行機會, 解決了死鎖機會的下降
   }
}
//消費者
func consumer(in <-chan int, idx int) {
   for {
      cond.L.Lock()           	// 條件變量對應互斥鎖加鎖(與生產者是同一個)
      for len(in) == 0 {      	// 產品區爲空 等待生產者生產
         cond.Wait()             	// 掛起當前協程, 等待條件變量知足,被生產者喚醒
      }
      num := <-in                	// 將 channel 中的數據讀走 (消費)
      fmt.Printf("---- %dth 消費者, 消費數據 %3d,公共區剩餘%d個數據\n", idx, num, len(in))
      cond.L.Unlock()             	// 消費結束,解鎖互斥鎖
      cond.Signal()           	// 喚醒 阻塞的 生產者
      time.Sleep(time.Millisecond * 500)    	//消費完 休息一會,給其餘協程執行機會, 解決了死鎖機會的下降
   }
}
func main() {
   rand.Seed(time.Now().UnixNano())  // 設置隨機數種子
   quit := make(chan bool)           // 建立用於結束通訊的 channel

   product := make(chan int, 3)      // 產品區(公共區)使用channel 模擬
   cond.L = new(sync.Mutex)          // 建立互斥鎖和條件變量

   for i := 0; i < 5; i++ {          // 5個消費者
      go producer(product, i+1)
   }
   for i := 0; i < 3; i++ {          // 3個生產者
      go consumer(product, i+1)
   }
   <-quit                         	// 主協程阻塞 不結束
}
  • deadlock緣由剖析【極值法】

    1. 極端處理: 1個生產者 2 消費 channle 緩存1

    2. 因爲極端一些狀況,會致使全部的生產者與消費者都會進入到一個wait 狀態,沒有人喚醒

  • 解決bug----單向喚醒,由生產者喚醒消費者

    喚醒方向問題: 由速率低的一方喚醒速率高的一方

package main
import (
	"fmt"
	"runtime"
)
import "sync"
import "math/rand"
import "time"

var cond sync.Cond             // 建立全局條件變量

// 生產者
func producer(out chan<- int, idx int) {
	for {
		num := rand.Intn(1000) 	// 產生一個隨機數
		cond.L.Lock()           	// 條件變量對應互斥鎖加鎖
		select {
		// 嘗試向channel寫入數據
		case out <- num:
			fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩餘%d個數據\n", idx, num, len(out))
		default:
		}
		cond.L.Unlock()             	// 生產結束,解鎖互斥鎖
		cond.Signal()           	// 喚醒 阻塞的 消費者
		runtime.Gosched()			// 給別更多的機會建立鎖
	}
}
//消費者
func consumer(in <-chan int, idx int) {
	var num int
	for {
		cond.L.Lock()           	// 條件變量對應互斥鎖加鎖(與生產者是同一個)
		for len(in)==0{
			cond.Wait()
		}
		num=<-in
		fmt.Printf("%dth 消費者,消費了 %d, 公共區剩餘%d個數據\n", idx, num, len(in))
		cond.L.Unlock()             	// 消費結束,解鎖互斥鎖
	}
}
func main() {
	rand.Seed(time.Now().UnixNano())  // 設置隨機數種子
	quit := make(chan bool)           // 建立用於結束通訊的 channel

	product := make(chan int, 3)      // 產品區(公共區)使用channel 模擬
	cond.L = new(sync.Mutex)          // 建立互斥鎖和條件變量

	for i := 0; i < 3; i++ {          // 3個生產者
		go producer(product, i+1)
	}
	for i := 0; i < 5; i++ {          // 5個消費者 
		go consumer(product, i+1)  
}
	<-quit                         	// 主協程阻塞 不結束
}

 
問題:

當咱們把條件變量取消,使用帶緩存的channel,一樣很好的完成生產者與消費者模型【channel空與非空主動阻塞等待,直至解除阻塞】,why use cond?

2.3.3 channel vs sync.Cond

使用channel通知多個關注條件的goroutine問題?

關閉的channle 與廣播的做用,僅僅單次使用

當狀態多重狀況的時候,channel 不行了,使用cond廣播的方式進行狀態更新

相關文章
相關標籤/搜索