併發是編程裏面一個很是重要的概念,Go語言在語言層面天生支持併發,這也是Go語言流行的一個很重要的緣由。html
併發:同一時間段內執行多個任務(你在用微信和兩個女友聊天)。java
並行:同一時刻執行多個任務(你和你朋友都在用微信和女友聊天)。c++
Go語言的併發經過goroutine
實現。goroutine
相似於線程,屬於用戶態的線程,咱們能夠根據須要建立成千上萬個goroutine
併發工做。goroutine
是由Go語言的運行時(runtime)調度完成,而線程是由操做系統調度完成。git
Go語言還提供channel
在多個goroutine
間進行通訊。goroutine
和channel
是 Go 語言秉承的 CSP(Communicating Sequential Process)併發模式的重要實現基礎。程序員
在java/c++中咱們要實現併發編程的時候,咱們一般須要本身維護一個線程池,而且須要本身去包裝一個又一個的任務,同時須要本身去調度線程執行任務並維護上下文切換,這一切一般會耗費程序員大量的心智。那麼能不能有一種機制,程序員只須要定義不少個任務,讓系統去幫助咱們把這些任務分配到CPU上實現併發執行呢?github
Go語言中的goroutine
就是這樣一種機制,goroutine
的概念相似於線程,但 goroutine
是由Go的運行時(runtime)調度和管理的。Go程序會智能地將 goroutine 中的任務合理地分配給每一個CPU。Go語言之因此被稱爲現代化的編程語言,就是由於它在語言層面已經內置了調度和上下文切換的機制。算法
在Go語言編程中你不須要去本身寫進程、線程、協程,你的技能包裏只有一個技能–goroutine
,當你須要讓某個任務併發執行的時候,你只須要把這個任務包裝成一個函數,開啓一個goroutine
去執行這個函數就能夠了,就是這麼簡單粗暴。編程
Go語言中使用goroutine
很是簡單,只須要在調用函數的時候在前面加上go
關鍵字,就能夠爲一個函數建立一個goroutine
。安全
一個goroutine
一定對應一個函數,能夠建立多個goroutine
去執行相同的函數。bash
啓動goroutine的方式很是簡單,只須要在調用的函數(普通函數和匿名函數)前面加上一個go
關鍵字。
舉個例子以下:
func hello() { fmt.Println("Hello Goroutine!") } func main() { hello() fmt.Println("main goroutine done!") }
這個示例中hello函數和下面的語句是串行的,執行的結果是打印完Hello Goroutine!
後打印main goroutine done!
。
接下來咱們在調用hello函數前面加上關鍵字go
,也就是啓動一個goroutine去執行hello這個函數。
func main() { go hello() // 啓動另一個goroutine去執行hello函數 fmt.Println("main goroutine done!") }
這一次的執行結果只打印了main goroutine done!
,並無打印Hello Goroutine!
。爲何呢?
在程序啓動時,Go程序就會爲main()
函數建立一個默認的goroutine
。
當main()函數返回的時候該goroutine
就結束了,全部在main()
函數中啓動的goroutine
會一同結束,main
函數所在的goroutine
就像是權利的遊戲中的夜王,其餘的goroutine
都是異鬼,夜王一死它轉化的那些異鬼也就所有GG了。
因此咱們要想辦法讓main函數等一等hello函數,最簡單粗暴的方式就是time.Sleep
了。
func main() { go hello() // 啓動另一個goroutine去執行hello函數 fmt.Println("main goroutine done!") time.Sleep(time.Second) }
執行上面的代碼你會發現,這一次先打印main goroutine done!
,而後緊接着打印Hello Goroutine!
。
首先爲何會先打印main goroutine done!
是由於咱們在建立新的goroutine的時候須要花費一些時間,而此時main函數所在的goroutine
是繼續執行的。
在Go語言中實現併發就是這樣簡單,咱們還能夠啓動多個goroutine
。讓咱們再來一個例子: (這裏使用了sync.WaitGroup
來實現goroutine的同步)
var wg sync.WaitGroup func hello(i int) { defer wg.Done() // goroutine結束就登記-1 fmt.Println("Hello Goroutine!", i) } func main() { for i := 0; i < 10; i++ { wg.Add(1) // 啓動一個goroutine就登記+1 go hello(i) } wg.Wait() // 等待全部登記的goroutine都結束 }
屢次執行上面的代碼,會發現每次打印的數字的順序都不一致。這是由於10個goroutine
是併發執行的,而goroutine
的調度是隨機的。
OS線程(操做系統線程)通常都有固定的棧內存(一般爲2MB),一個goroutine
的棧在其生命週期開始時只有很小的棧(典型狀況下2KB),goroutine
的棧不是固定的,他能夠按需增大和縮小,goroutine
的棧大小限制能夠達到1GB,雖然極少會用到這個大。因此在Go語言中一次建立十萬左右的goroutine
也是能夠的。
GPM
是Go語言運行時(runtime)層面的實現,是go語言本身實現的一套調度系統。區別於操做系統調度OS線程。
G
很好理解,就是個goroutine的,裏面除了存放本goroutine信息外 還有與所在P的綁定等信息。P
管理着一組goroutine隊列,P裏面會存儲當前goroutine運行的上下文環境(函數指針,堆棧地址及地址邊界),P會對本身管理的goroutine隊列作一些調度(好比把佔用CPU時間較長的goroutine暫停、運行後續的goroutine等等)當本身的隊列消費完了就去全局隊列裏取,若是全局隊列裏也消費完了會去其餘P的隊列裏搶任務。M(machine)
是Go運行時(runtime)對操做系統內核線程的虛擬, M與內核線程通常是一一映射的關係, 一個groutine最終是要放到M上執行的;P與M通常也是一一對應的。他們關係是: P管理着一組G掛載在M上運行。當一個G長久阻塞在一個M上時,runtime會新建一個M,阻塞G所在的P會把其餘的G 掛載在新建的M上。當舊的G阻塞完成或者認爲其已經死掉時 回收舊的M。
P的個數是經過runtime.GOMAXPROCS
設定(最大256),Go1.5版本以後默認爲物理線程數。 在併發量大的時候會增長一些P和M,但不會太多,切換太頻繁的話得不償失。
單從線程調度講,Go語言相比起其餘語言的優點在於OS線程是由OS內核來調度的,goroutine
則是由Go運行時(runtime)本身的調度器調度的,這個調度器使用一個稱爲m:n調度的技術(複用/調度m個goroutine到n個OS線程)。 其一大特色是goroutine的調度是在用戶態下完成的, 不涉及內核態與用戶態之間的頻繁切換,包括內存的分配與釋放,都是在用戶態維護着一塊大的內存池, 不直接調用系統的malloc函數(除非內存池須要改變),成本比調度OS線程低不少。 另外一方面充分利用了多核的硬件資源,近似的把若干goroutine均分在物理線程上, 再加上自己goroutine的超輕量,以上種種保證了go調度方面的性能。
Go運行時的調度器使用GOMAXPROCS
參數來肯定須要使用多少個OS線程來同時執行Go代碼。默認值是機器上的CPU核心數。例如在一個8核心的機器上,調度器會把Go代碼同時調度到8個OS線程上(GOMAXPROCS是m:n調度中的n)。
Go語言中能夠經過runtime.GOMAXPROCS()
函數設置當前程序併發時佔用的CPU邏輯核心數。
Go1.5版本以前,默認使用的是單核心執行。Go1.5版本以後,默認使用所有的CPU邏輯核心數。
咱們能夠經過將任務分配到不一樣的CPU邏輯核心上實現並行的效果,這裏舉個例子:
func a() { for i := 1; i < 10; i++ { fmt.Println("A:", i) } } func b() { for i := 1; i < 10; i++ { fmt.Println("B:", i) } } func main() { runtime.GOMAXPROCS(1) go a() go b() time.Sleep(time.Second) }
兩個任務只有一個邏輯核心,此時是作完一個任務再作另外一個任務。 將邏輯核心數設爲2,此時兩個任務並行執行,代碼以下。
func a() { for i := 1; i < 10; i++ { fmt.Println("A:", i) } } func b() { for i := 1; i < 10; i++ { fmt.Println("B:", i) } } func main() { runtime.GOMAXPROCS(2) go a() go b() time.Sleep(time.Second) }
Go語言中的操做系統線程和goroutine的關係:
單純地將函數併發執行是沒有意義的。函數與函數間須要交換數據才能體現併發執行函數的意義。
雖然可使用共享內存進行數據交換,可是共享內存在不一樣的goroutine
中容易發生競態問題。爲了保證數據交換的正確性,必須使用互斥量對內存進行加鎖,這種作法勢必形成性能問題。
Go語言的併發模型是CSP(Communicating Sequential Processes)
,提倡經過通訊共享內存而不是經過共享內存而實現通訊。
若是說goroutine
是Go程序併發的執行體,channel
就是它們之間的鏈接。channel
是可讓一個goroutine
發送特定值到另外一個goroutine
的通訊機制。
Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,老是遵循先入先出(First In First Out)的規則,保證收發數據的順序。每個通道都是一個具體類型的導管,也就是聲明channel的時候須要爲其指定元素類型。
channel
是一種類型,一種引用類型。聲明通道類型的格式以下:
var 變量 chan 元素類型
舉幾個例子:
var ch1 chan int // 聲明一個傳遞整型的通道 var ch2 chan bool // 聲明一個傳遞布爾型的通道 var ch3 chan []int // 聲明一個傳遞int切片的通道
通道是引用類型,通道類型的空值是nil
。
var ch chan int fmt.Println(ch) // <nil>
聲明的通道後須要使用make
函數初始化以後才能使用。
建立channel的格式以下:
make(chan 元素類型, [緩衝大小])
channel的緩衝大小是可選的。
舉幾個例子:
ch4 := make(chan int) ch5 := make(chan bool) ch6 := make(chan []int)
通道有發送(send)、接收(receive)和關閉(close)三種操做。
發送和接收都使用<-
符號。
如今咱們先使用如下語句定義一個通道:
ch := make(chan int)
將一個值發送到通道中。
ch <- 10 // 把10發送到ch中
從一個通道中接收值。
x := <- ch // 從ch中接收值並賦值給變量x <-ch // 從ch中接收值,忽略結果
咱們經過調用內置的close
函數來關閉通道。
close(ch)
關於關閉通道須要注意的事情是,只有在通知接收方goroutine全部的數據都發送完畢的時候才須要關閉通道。通道是能夠被垃圾回收機制回收的,它和關閉文件是不同的,在結束操做以後關閉文件是必需要作的,但關閉通道不是必須的。
關閉後的通道有如下特色:
無緩衝的通道又稱爲阻塞的通道。咱們來看一下下面的代碼:
func main() { ch := make(chan int) ch <- 10 fmt.Println("發送成功") }
上面這段代碼可以經過編譯,可是執行的時候會出現如下錯誤:
fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() .../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54
爲何會出現deadlock
錯誤呢?
由於咱們使用ch := make(chan int)
建立的是無緩衝的通道,無緩衝的通道只有在有人接收值的時候才能發送值。就像你住的小區沒有快遞櫃和代收點,快遞員給你打電話必需要把這個物品送到你的手中,簡單來講就是無緩衝的通道必須有接收才能發送。
上面的代碼會阻塞在ch <- 10
這一行代碼造成死鎖,那如何解決這個問題呢?
一種方法是啓用一個goroutine
去接收值,例如:
func recv(c chan int) { ret := <-c fmt.Println("接收成功", ret) } func main() { ch := make(chan int) go recv(ch) // 啓用goroutine從通道接收值 ch <- 10 fmt.Println("發送成功") }
無緩衝通道上的發送操做會阻塞,直到另外一個goroutine
在該通道上執行接收操做,這時值才能發送成功,兩個goroutine
將繼續執行。相反,若是接收操做先執行,接收方的goroutine將阻塞,直到另外一個goroutine
在該通道上發送一個值。
使用無緩衝通道進行通訊將致使發送和接收的goroutine
同步化。所以,無緩衝通道也被稱爲同步通道
。
解決上面問題的方法還有一種就是使用有緩衝區的通道。咱們能夠在使用make函數初始化通道的時候爲其指定通道的容量,例如:
func main() { ch := make(chan int, 1) // 建立一個容量爲1的有緩衝區通道 ch <- 10 fmt.Println("發送成功") }
只要通道的容量大於零,那麼該通道就是有緩衝的通道,通道的容量表示通道中能存放元素的數量。就像你小區的快遞櫃只有那麼個多格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個快遞員就能往裏面放一個。
咱們可使用內置的len
函數獲取通道內元素的數量,使用cap
函數獲取通道的容量,雖然咱們不多會這麼作。
當經過通道發送有限的數據時,咱們能夠經過close
函數關閉通道來告知從該通道接收值的goroutine
中止等待。當通道被關閉時,往該通道發送值會引起panic,從該通道里接收的值一直都是類型零值。那如何判斷一個通道是否被關閉了呢?
咱們來看下面這個例子:
// channel 練習 func main() { ch1 := make(chan int) ch2 := make(chan int) // 開啓goroutine將0~100的數發送到ch1中 go func() { for i := 0; i < 100; i++ { ch1 <- i } close(ch1) }() // 開啓goroutine從ch1中接收值,並將該值的平方發送到ch2中 go func() { for { i, ok := <-ch1 // 通道關閉後再取值ok=false if !ok { break } ch2 <- i * i } close(ch2) }() // 在主goroutine中從ch2中接收值打印 for i := range ch2 { // 通道關閉後會退出for range循環 fmt.Println(i) } }
從上面的例子中咱們看到有兩種方式在接收值的時候判斷通道是否被關閉,咱們一般使用的是for range
的方式。
有的時候咱們會將通道做爲參數在多個任務函數間傳遞,不少時候咱們在不一樣的任務函數中使用通道都會對其進行限制,好比限制通道在函數中只能發送或只能接收。
Go語言中提供了單向通道來處理這種狀況。例如,咱們把上面的例子改造以下:
func counter(out chan<- int) { for i := 0; i < 100; i++ { out <- i } close(out) } func squarer(out chan<- int, in <-chan int) { for i := range in { out <- i * i } close(out) } func printer(in <-chan int) { for i := range in { fmt.Println(i) } } func main() { ch1 := make(chan int) ch2 := make(chan int) go counter(ch1) go squarer(ch2, ch1) printer(ch2) }
其中,
chan<- int
是一個只能發送的通道,能夠發送可是不能接收;<-chan int
是一個只能接收的通道,能夠接收可是不能發送。在函數傳參及任何賦值操做中將雙向通道轉換爲單向通道是能夠的,但反過來是不能夠的。
channel
常見的異常總結,以下圖:[channel異常總結]
關閉已經關閉的channel
也會引起panic
。
在工做中咱們一般會使用能夠指定啓動的goroutine數量–worker pool
模式,控制goroutine
的數量,防止goroutine
泄漏和暴漲。
一個簡易的work pool
示例代碼以下:
func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Printf("worker:%d start job:%d\n", id, j) time.Sleep(time.Second) fmt.Printf("worker:%d end job:%d\n", id, j) results <- j * 2 } } func main() { jobs := make(chan int, 100) results := make(chan int, 100) // 開啓3個goroutine for w := 1; w <= 3; w++ { go worker(w, jobs, results) } // 5個任務 for j := 1; j <= 5; j++ { jobs <- j } close(jobs) // 輸出結果 for a := 1; a <= 5; a++ { <-results } }
在某些場景下咱們須要同時從多個通道接收數據。通道在接收數據時,若是沒有數據能夠接收將會發生阻塞。你也許會寫出以下代碼使用遍歷的方式來實現:
for{ // 嘗試從ch1接收值 data, ok := <-ch1 // 嘗試從ch2接收值 data, ok := <-ch2 … }
這種方式雖然能夠實現從多個通道接收值的需求,可是運行性能會差不少。爲了應對這種場景,Go內置了select
關鍵字,能夠同時響應多個通道的操做。
select
的使用相似於switch語句,它有一系列case分支和一個默認的分支。每一個case會對應一個通道的通訊(接收或發送)過程。select
會一直等待,直到某個case
的通訊操做完成時,就會執行case
分支對應的語句。具體格式以下:
select{ case <-ch1: ... case data := <-ch2: ... case ch3<-data: ... default: 默認操做 }
舉個小例子來演示下select
的使用:
func main() { ch := make(chan int, 1) for i := 0; i < 10; i++ { select { case x := <-ch: fmt.Println(x) case ch <- i: } } }
使用select
語句能提升代碼的可讀性。
case
同時知足,select
會隨機選擇一個。case
的select{}
會一直等待,可用於阻塞main函數。有時候在Go代碼中可能會存在多個goroutine
同時操做一個資源(臨界區),這種狀況會發生競態問題
(數據競態)。類比現實生活中的例子有十字路口被各個方向的的汽車競爭;還有火車上的衛生間被車箱裏的人競爭。
舉個例子:
var x int64 var wg sync.WaitGroup func add() { for i := 0; i < 5000; i++ { x = x + 1 } wg.Done() } func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x) }
上面的代碼中咱們開啓了兩個goroutine
去累加變量x的值,這兩個goroutine
在訪問和修改x
變量的時候就會存在數據競爭,致使最後的結果與期待的不符。
互斥鎖是一種經常使用的控制共享資源訪問的方法,它可以保證同時只有一個goroutine
能夠訪問共享資源。Go語言中使用sync
包的Mutex
類型來實現互斥鎖。 使用互斥鎖來修復上面代碼的問題:
var x int64 var wg sync.WaitGroup var lock sync.Mutex func add() { for i := 0; i < 5000; i++ { lock.Lock() // 加鎖 x = x + 1 lock.Unlock() // 解鎖 } wg.Done() } func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x) }
使用互斥鎖可以保證同一時間有且只有一個goroutine
進入臨界區,其餘的goroutine
則在等待鎖;當互斥鎖釋放後,等待的goroutine
才能夠獲取鎖進入臨界區,多個goroutine
同時等待一個鎖時,喚醒的策略是隨機的。
互斥鎖是徹底互斥的,可是有不少實際的場景下是讀多寫少的,當咱們併發的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用sync
包中的RWMutex
類型。
讀寫鎖分爲兩種:讀鎖和寫鎖。當一個goroutine獲取讀鎖以後,其餘的goroutine
若是是獲取讀鎖會繼續得到鎖,若是是獲取寫鎖就會等待;當一個goroutine
獲取寫鎖以後,其餘的goroutine
不管是獲取讀鎖仍是寫鎖都會等待。
讀寫鎖示例:
var ( x int64 wg sync.WaitGroup lock sync.Mutex rwlock sync.RWMutex ) func write() { // lock.Lock() // 加互斥鎖 rwlock.Lock() // 加寫鎖 x = x + 1 time.Sleep(10 * time.Millisecond) // 假設讀操做耗時10毫秒 rwlock.Unlock() // 解寫鎖 // lock.Unlock() // 解互斥鎖 wg.Done() } func read() { // lock.Lock() // 加互斥鎖 rwlock.RLock() // 加讀鎖 time.Sleep(time.Millisecond) // 假設讀操做耗時1毫秒 rwlock.RUnlock() // 解讀鎖 // lock.Unlock() // 解互斥鎖 wg.Done() } func main() { start := time.Now() for i := 0; i < 10; i++ { wg.Add(1) go write() } for i := 0; i < 1000; i++ { wg.Add(1) go read() } wg.Wait() end := time.Now() fmt.Println(end.Sub(start)) }
須要注意的是讀寫鎖很是適合讀多寫少的場景,若是讀和寫的操做差異不大,讀寫鎖的優點就發揮不出來。
在代碼中生硬的使用time.Sleep
確定是不合適的,Go語言中可使用sync.WaitGroup
來實現併發任務的同步。 sync.WaitGroup
有如下幾個方法:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 計數器+delta |
(wg *WaitGroup) Done() | 計數器-1 |
(wg *WaitGroup) Wait() | 阻塞直到計數器變爲0 |
sync.WaitGroup
內部維護着一個計數器,計數器的值能夠增長和減小。例如當咱們啓動了N 個併發任務時,就將計數器值增長N。每一個任務完成時經過調用Done()方法將計數器減1。經過調用Wait()來等待併發任務執行完,當計數器值爲0時,表示全部併發任務已經完成。
咱們利用sync.WaitGroup
將上面的代碼優化一下:
var wg sync.WaitGroup func hello() { defer wg.Done() fmt.Println("Hello Goroutine!") } func main() { wg.Add(1) go hello() // 啓動另一個goroutine去執行hello函數 fmt.Println("main goroutine done!") wg.Wait() }
須要注意sync.WaitGroup
是一個結構體,傳遞的時候要傳遞指針。
說在前面的話:這是一個進階知識點。
在編程的不少場景下咱們須要確保某些操做在高併發的場景下只執行一次,例如只加載一次配置文件、只關閉一次通道等。
Go語言中的sync
包中提供了一個針對只執行一次場景的解決方案–sync.Once
。
sync.Once
只有一個Do
方法,其簽名以下:
func (o *Once) Do(f func()) {}
備註:若是要執行的函數f須要傳遞參數就須要搭配閉包來使用。
延遲一個開銷很大的初始化操做到真正用到它的時候再執行是一個很好的實踐。由於預先初始化一個變量(好比在init函數中完成初始化)會增長程序的啓動耗時,並且有可能實際執行過程當中這個變量沒有用上,那麼這個初始化操做就不是必需要作的。咱們來看一個例子:
var icons map[string]image.Image func loadIcons() { icons = map[string]image.Image{ "left": loadIcon("left.png"), "up": loadIcon("up.png"), "right": loadIcon("right.png"), "down": loadIcon("down.png"), } } // Icon 被多個goroutine調用時不是併發安全的 func Icon(name string) image.Image { if icons == nil { loadIcons() } return icons[name] }
多個goroutine
併發調用Icon函數時不是併發安全的,現代的編譯器和CPU可能會在保證每一個goroutine
都知足串行一致的基礎上自由地重排訪問內存的順序。loadIcons函數可能會被重排爲如下結果:
func loadIcons() { icons = make(map[string]image.Image) icons["left"] = loadIcon("left.png") icons["up"] = loadIcon("up.png") icons["right"] = loadIcon("right.png") icons["down"] = loadIcon("down.png") }
在這種狀況下就會出現即便判斷了icons
不是nil也不意味着變量初始化完成了。考慮到這種狀況,咱們能想到的辦法就是添加互斥鎖,保證初始化icons
的時候不會被其餘的goroutine
操做,可是這樣作又會引起性能問題。
使用sync.Once
改造的示例代碼以下:
var icons map[string]image.Image var loadIconsOnce sync.Once func loadIcons() { icons = map[string]image.Image{ "left": loadIcon("left.png"), "up": loadIcon("up.png"), "right": loadIcon("right.png"), "down": loadIcon("down.png"), } } // Icon 是併發安全的 func Icon(name string) image.Image { loadIconsOnce.Do(loadIcons) return icons[name] }
var wg sync.WaitGroup var once sync.Once func f1(ch1 chan<- int) { defer wg.Done() for i := 0; i < 100; i++ { ch1 <- i } close(ch1) } func f2(ch1 <-chan int, ch2 chan<- int) { defer wg.Done() for { x, ok := <-ch1 if !ok { break } ch2 <- x * x } once.Do(func() { close(ch2) }) // 確保某個操做只執行一次 } func main() { a := make(chan int, 100) b := make(chan int, 100) wg.Add(3) go f1(a) go f2(a, b) go f2(a, b) wg.Wait() for ret := range b { fmt.Println(ret) } }
sync.Once
其實內部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和數據的安全,而布爾值用來記錄初始化是否完成。這樣設計就能保證初始化操做的時候是併發安全的而且初始化操做也不會被執行屢次。
Go語言中內置的map不是併發安全的。請看下面的示例:
var m = make(map[string]int) func get(key string) int { return m[key] } func set(key string, value int) { m[key] = value } func main() { wg := sync.WaitGroup{} for i := 0; i < 20; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) set(key, n) fmt.Printf("k=:%v,v:=%v\n", key, get(key)) wg.Done() }(i) } wg.Wait() }
上面的代碼開啓少許幾個goroutine
的時候可能沒什麼問題,當併發多了以後執行上面的代碼就會報fatal error: concurrent map writes
錯誤。
像這種場景下就須要爲map加鎖來保證併發的安全性了,Go語言的sync
包中提供了一個開箱即用的併發安全版map–sync.Map
。開箱即用表示不用像內置的map同樣使用make函數初始化就能直接使用。同時sync.Map
內置了諸如Store
、Load
、LoadOrStore
、Delete
、Range
等操做方法。
var m = sync.Map{} func main() { wg := sync.WaitGroup{} for i := 0; i < 20; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) m.Store(key, n) value, _ := m.Load(key) fmt.Printf("k=:%v,v:=%v\n", key, value) wg.Done() }(i) } wg.Wait() }
代碼中的加鎖操做由於涉及內核態的上下文切換會比較耗時、代價比較高。針對基本數據類型咱們還可使用原子操做來保證併發安全,由於原子操做是Go語言提供的方法它在用戶態就能夠完成,所以性能比加鎖操做更好。Go語言中原子操做由內置的標準庫sync/atomic
提供。
方法 | 解釋 |
---|---|
func LoadInt32(addr int32) (val int32) func LoadInt64(addr int64) (val int64) func LoadUint32(addr uint32) (val uint32) func LoadUint64(addr uint64) (val uint64) func LoadUintptr(addr uintptr) (val uintptr) func LoadPointer(addr unsafe.Pointer) (val unsafe.Pointer) | 讀取操做 |
func StoreInt32(addr int32, val int32) func StoreInt64(addr int64, val int64) func StoreUint32(addr uint32, val uint32) func StoreUint64(addr uint64, val uint64) func StoreUintptr(addr uintptr, val uintptr) func StorePointer(addr unsafe.Pointer, val unsafe.Pointer) | 寫入操做 |
func AddInt32(addr int32, delta int32) (new int32) func AddInt64(addr int64, delta int64) (new int64) func AddUint32(addr uint32, delta uint32) (new uint32) func AddUint64(addr uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) | 修改操做 |
func SwapInt32(addr int32, new int32) (old int32) func SwapInt64(addr int64, new int64) (old int64) func SwapUint32(addr uint32, new uint32) (old uint32) func SwapUint64(addr uint64, new uint64) (old uint64) func SwapUintptr(addr uintptr, new uintptr) (old uintptr) func SwapPointer(addr unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) | 交換操做 |
func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) | 比較並交換操做 |
咱們填寫一個示例來比較下互斥鎖和原子操做的性能。
var x int64 var l sync.Mutex var wg sync.WaitGroup // 普通版加函數 func add() { // x = x + 1 x++ // 等價於上面的操做 wg.Done() } // 互斥鎖版加函數 func mutexAdd() { l.Lock() x++ l.Unlock() wg.Done() } // 原子操做版加函數 func atomicAdd() { atomic.AddInt64(&x, 1) wg.Done() } func main() { start := time.Now() for i := 0; i < 10000; i++ { wg.Add(1) // go add() // 普通版add函數 不是併發安全的 // go mutexAdd() // 加鎖版add函數 是併發安全的,可是加鎖性能開銷大 go atomicAdd() // 原子操做版add函數 是併發安全,性能優於加鎖版 } wg.Wait() end := time.Now() fmt.Println(x) fmt.Println(end.Sub(start)) }
atomic
包提供了底層的原子級內存操做,對於同步算法的實現頗有用。這些函數必須謹慎地保證正確使用。除了某些特殊的底層應用,使用通道或者sync包的函數/類型實現同步更好。