簡而言之,所謂併發編程是指在一臺處理器上「同時」處理多個任務。linux
隨着硬件的發展,併發程序變得愈來愈重要。Web服務器會一次處理成千上萬的請求。平板電腦和手機app在渲染用戶畫面同時還會在後臺執行各類計算任務和網絡請求。即便是傳統的批處理問題--讀取數據,計算,寫輸出--如今也會用併發來隱藏掉I/O的操做延遲以充分利用現代計算機設備的多個核心。計算機的性能每一年都在以非線性的速度增加。算法
宏觀的併發是指在一段時間內,有多個程序在同時運行。編程
併發在微觀上,是指在同一時刻只能有一條指令執行,但多個程序指令被快速的輪換執行,使得在宏觀上具備多個進程同時執行的效果,但在微觀上並非同時執行的,只是把時間分紅若干段,使多個程序快速交替的執行。安全
並行(parallel)
:指在同一時刻,有多條指令在多個處理器上同時執行。bash
併發(concurrency)
:指在同一時刻只能有一條指令執行,但多個進程指令被快速的輪換執行,使得在宏觀上具備多個進程同時執行的效果,但在微觀上並非同時執行的,只是把時間分紅若干段,經過cpu時間片輪轉使多個進程快速交替的執行。服務器
大師曾以咖啡機的例子來解釋並行和併發的區別。網絡
下面的兩張圖是國人作的一些不錯的比較並行與併發區別的圖。數據結構
程序,是指編譯好的二進制文件,在磁盤上,不佔用系統資源(cpu、內存、打開的文件、設備、鎖....)多線程
進程,是一個抽象的概念,與操做系統原理聯繫緊密。進程是活躍的程序,佔用系統資源。在內存中執行。(程序運行起來,產生一個進程)閉包
咱們能夠把程序比做劇本(紙),把進程比做一場戲(舞臺、演員、燈光、道具...)
同一個劇本能夠在多個舞臺同時上演。一樣,同一個程序也能夠加載爲不一樣的進程(彼此之間互不影響)
如:同時開兩個終端。各自都有一個bash但彼此ID不一樣。
進程基本的狀態有5種。分別爲初始態,就緒態,運行態,掛起態與終止態。其中初始態爲進程準備階段,常與就緒態結合來看。
在使用進程 實現併發時會出現什麼問題呢?
咱們知道在操做系統中,能夠產生不少的進程。在unix/linux系統中,正常狀況下,子進程是經過父進程fork建立的,子進程再建立新的進程。
而且父進程永遠沒法預測子進程 到底何時結束。 當一個 進程完成它的工做終止以後,它的父進程須要調用系統調用取得子進程的終止狀態。
孤兒進程
父進程先於子進程結束,則子進程成爲孤兒進程,子進程的父進程成爲init進程,稱爲init進程領養孤兒進程。
殭屍進程
進程終止,父進程還沒有回收,子進程殘留資源(PCB)存放於內核中,變成殭屍(Zombie)進程。
Windows下的進程和Linux下的進程是不同的,它比較懶惰,歷來不執行任何東西,只是爲線程提供執行環境。而後由線程負責執行包含在進程的地址空間中的代碼。當建立一個進程的時候,操做系統會自動建立這個進程的第一個線程,稱爲主線程。
LWP:light weight process 輕量級的進程,本質還是進程 (Linux下)
進程:獨立地址空間,擁有PCB
線程:有獨立的PCB,但沒有獨立的地址空間(共享)
區別:在因而否共享地址空間。獨居(進程);合租(線程)。
Windows系統下,能夠直接忽略進程的概念,只談線程。由於線程是最小的執行單位,是被系統獨立調度和分派的基本單位。而進程只是給線程提供執行環境。
同步即協同步調,按預約的前後次序運行。
線程同步,指一個線程發出某一功能調用時,在沒有獲得結果以前,該調用不返回。同時其它線程爲保證數據一致性,不能調用該功能。
舉例1: 銀行存款 5000。櫃檯,折:取3000;提款機,卡:取 3000。剩餘:2000
舉例2: 內存中100字節,線程T1欲填入全1, 線程T2欲填入全0。但若是T1執行了50個字節失去cpu,T2執行,會將T1寫過的內容覆蓋。當T1再次得到cpu繼續 從失去cpu的位置向後寫入1,當執行結束,內存中的100字節,既不是全1,也不是全0。
產生的現象叫作「與時間有關的錯誤」(time related)。爲了不這種數據混亂,線程須要同步。
「同步」的目的,是爲了不數據混亂,解決與時間有關的錯誤。實際上,不只線程間須要同步,進程間、信號間等等都須要同步機制。
所以,全部「多個控制流,共同操做一個共享資源」的狀況,都須要同步。
Linux中提供一把互斥鎖mutex(也稱之爲互斥量)。
每一個線程在對資源操做前都嘗試先加鎖,成功加鎖才能操做,操做結束解鎖。
資源仍是共享的,線程間也仍是競爭的,
但經過「鎖」就將資源的訪問變成互斥操做,然後與時間有關的錯誤也不會再產生了。
但,應注意:同一時刻,只能有一個線程持有該鎖。
當A線程對某個全局變量加鎖訪問,B在訪問前嘗試加鎖,拿不到鎖,B阻塞。C線程不去加鎖,而直接訪問該全局變量,依然可以訪問,但會出現數據混亂。
因此,互斥鎖實質上是操做系統提供的一把「建議鎖」(又稱「協同鎖」),建議程序中有多線程訪問共享資源的時候使用該機制。但,並無強制限定。
所以,即便有了mutex,若是有線程不按規則來訪問數據,依然會形成數據混亂。
與互斥量相似,但讀寫鎖容許更高的並行性。其特性爲:寫獨佔,讀共享。
讀寫鎖狀態:
特別強調:讀寫鎖只有一把,但其具有兩種狀態:
讀寫鎖特性:
讀寫鎖也叫共享-獨佔鎖。當讀寫鎖以讀模式鎖住時,它是以共享模式鎖住的;當它以寫模式鎖住時,它是以獨佔模式鎖住的。寫獨佔、讀共享。
讀寫鎖很是適合於對數據結構讀的次數遠大於寫的狀況。
協程:coroutine。也叫輕量級線程。
與傳統的系統級線程和進程相比,協程最大的優點在於「輕量級」。能夠輕鬆建立上萬個而不會致使系統資源衰竭。而線程和進程一般很難超過1萬個。這也是協程別稱「輕量級線程」的緣由。
一個線程中能夠有任意多個協程,但某一時刻只能有一個協程在運行,多個協程分享該線程分配到的計算機資源。
多數語言在語法層面並不直接支持協程,而是經過庫的方式支持,但用庫的方式支持的功能也並不完整,好比僅僅提供協程的建立、銷燬與切換等能力。若是在這樣的輕量級線程中調用一個同步 IO 操做,好比網絡通訊、本地文件讀寫,都會阻塞其餘的併發執行輕量級線程,從而沒法真正達到輕量級線程自己指望達到的目標。
在協程中,調用一個任務就像調用一個函數同樣,消耗的系統資源最少!但能達到進程、線程併發相同的效果。
在一次併發任務中,進程、線程、協程都可以實現。從系統資源消耗的角度出發來看,進程至關多,線程次之,協程最少。
Go 在語言級別支持協程,叫goroutine。Go 語言標準庫提供的全部系統調用操做(包括全部同步IO操做),都會出讓CPU給其餘goroutine。這讓輕量級線程的切換管理不依賴於系統的線程和進程,也不須要依賴於CPU的核心數量。
有人把Go比做21世紀的C語言。第一是由於Go語言設計簡單,第二,21世紀最重要的就是並行程序設計,而Go從語言層面就支持並行。同時,併發程序的內存管理有時候是很是複雜的,而Go語言提供了自動垃圾回收機制。
Go語言爲併發編程而內置的上層API基於順序通訊進程模型CSP(communicating sequential processes)。這就意味着顯式鎖都是能夠避免的,由於Go經過相對安全的通道發送和接受數據以實現同步,這大大地簡化了併發程序的編寫。
Go語言中的併發程序主要使用兩種手段來實現。goroutine和channel。
goroutine是Go並行設計的核心。goroutine說到底其實就是協程,它比線程更小,十幾個goroutine可能體如今底層就是五六個線程,Go語言內部幫你實現了這些goroutine之間的內存共享。執行goroutine只需極少的棧內存(大概是4~5KB),固然會根據相應的數據伸縮。也正由於如此,可同時運行成千上萬個併發任務。goroutine比thread更易用、更高效、更輕便。
通常狀況下,一個普通計算機跑幾十個線程就有點負載過大了,可是一樣的機器卻能夠輕鬆地讓成百上千個goroutine進行資源競爭。
只需在函數調⽤語句前添加 go 關鍵字,就可建立併發執⾏單元。開發⼈員無需瞭解任何執⾏細節,調度器會自動將其安排到合適的系統線程上執行。
在併發編程中,咱們一般想將一個過程切分紅幾塊,而後讓每一個goroutine各自負責一塊工做,當一個程序啓動時,主函數在一個單獨的goroutine中運行,咱們叫它main goroutine。新的goroutine會用go語句來建立。而go語言的併發設計,讓咱們很輕鬆就能夠達成這一目的。
示例代碼:
package main import ( "fmt" "time" ) func newTask() { i := 0 for { i++ fmt.Printf("new goroutine: i = %d\n", i) time.Sleep(time.Second) //延時1秒 } } func main() { //建立一個goroutine,啓動另一個任務 go newTask() //循環打印 for i := 0; i < 5; i++ { fmt.Printf("main goroutine: i = %d\n", i) time.Sleep(time.Second) //延時1秒 i++ } }
程序運行結果:
主goroutine退出後,其它的工做的子goroutine也會自動退出:
能夠看出,因爲主goroutine(main函數)執行太快了,因此致使newTask還沒執行,程序就退出了
package main import ( "fmt" "time" ) func newTask() { i := 0 for { i++ fmt.Printf("new goroutine: i = %d\n", i) time.Sleep(time.Second) //延時1秒 } } func main() { //建立一個goroutine,啓動另一個任務 go newTask() fmt.Println("hello world") }
程序運行結果:
經過運行結果(運行了三次)能夠看出來,其中有一次newTask獲得了執行,可是也只輸出了一次程序就退出了。另外兩次newTask徹底徹底沒有執行就退出程序了。
runtime.Gosched() 用於讓出CPU時間片,讓出當前goroutine的執行權限,調度器安排其餘等待的任務運行,並在下次再得到cpu時間輪片的時候,從該出讓cpu的位置恢復執行。
有點像跑接力賽,A跑了一會碰到代碼runtime.Gosched() 就把接力棒交給B了,A歇着了,B繼續跑。
示例代碼:
package main import ( "fmt" "runtime" ) func main() { //建立一個goroutine go func(s string) { for i := 0; i < 2; i++ { fmt.Println(s) } }("world") for i := 0; i < 2; i++ { runtime.Gosched() //import "runtime"包 fmt.Println("hello") } /* 屏蔽runtime.Gosched()運行結果以下: hello hello 沒有runtime.Gosched()運行結果以下: world world hello hello */ }
以上程序的執行過程以下:
主協程進入main()函數,進行代碼的執行。當執行到go func()匿名函數時,建立一個新的協程,開始執行匿名函數中的代碼,主協程繼續向下執行,執行到runtime.Gosched( )時會暫停向下執行,直到其它協程執行完後,再回到該位置,主協程繼續向下執行。
調用 runtime.Goexit() 將當即終止當前 goroutine 執⾏,調度器確保全部已註冊 defer延遲調用被執行。
示例代碼:
package main import ( "fmt" "runtime" ) func main() { go func() { defer fmt.Println("A.defer") func() { defer fmt.Println("B.defer") runtime.Goexit()//終止當前 goroutine, import "runtime" fmt.Println("B") //不會執行 }() fmt.Println("A") //不會執行 }() //死循環,目的不讓主goroutine結束 for {} }
程序運行結果:
調用 runtime.GOMAXPROCS() 用來設置能夠並行計算的CPU核數的最大值,並返回以前的值。(默認是跑滿整個CPU)
示例代碼:
package main import ( "fmt" "runtime" ) func main() { //n := runtime.GOMAXPROCS(1) //第一次 測試 //打印結果: 111111111111111111111111110000000000000000000000000.... n := runtime.GOMAXPROCS(2) //第二次 測試 //打印結果: 1111111111111111111111110000000000000011111110000100000000111100001111 fmt.Println(n) for { go fmt.Print(0) fmt.Print(1) } }
在第一次執行runtime.GOMAXPROCS(1) 時,最多同時只能有一個goroutine被執行。因此會打印不少1。過了一段時間後,GO調度器會將其置爲休眠,並喚醒另外一個goroutine,這時候就開始打印不少0了,在打印的時候,goroutine是被調度到操做系統線程上的。
在第二次執行runtime.GOMAXPROCS(2) 時, 咱們使用了兩個CPU,因此兩個goroutine能夠一塊兒被執行,以一樣的頻率交替打印0和1。
channel是Go語言中的一個核心類型,能夠把它當作管道。併發核心單元經過它就能夠發送或者接收數據進行通信,這在必定程度上又進一步下降了編程的難度。
channel是一個數據類型,主要用來解決協程的同步問題以及協程之間數據共享(數據傳遞)的問題。
goroutine運行在相同的地址空間,所以訪問共享內存必須作好同步。goroutine 奉行經過通訊來共享內存,而不是共享內存來通訊
。
引⽤類型 channel可用於多個 goroutine 通信。其內部實現了同步,確保併發安全。
和map相似,channel也一個對應make建立的底層數據結構的引用。
當咱們複製一個channel或用於函數參數傳遞時,咱們只是拷貝了一個channel引用,所以調用者和被調用者將引用同一個channel對象。和其它的引用類型同樣,channel的零值也是nil。
定義一個channel時,也須要定義發送到channel的值的類型。channel可使用內置的make()函數來建立:
chan是建立channel所需使用的關鍵字。Type 表明指定channel收發數據的類型。
make(chan Type) //等價於make(chan Type, 0) make(chan Type, capacity)
當咱們複製一個channel或用於函數參數傳遞時,咱們只是拷貝了一個channel引用,所以調用者和被調用者將引用同一個channel對象。和其它的引用類型同樣,channel的零值也是nil。
當 參數capacity= 0 時,channel 是無緩衝阻塞讀寫的;當capacity > 0 時,channel 有緩衝、是非阻塞的,直到寫滿 capacity個元素才阻塞寫入。
channel很是像生活中的管道,一邊能夠存放東西,另外一邊能夠取出東西。channel經過操做符 <- 來接收和發送數據,發送和接收數據語法:
channel <- value //發送value到channel <- channel //取出channel裏的一個值並丟棄 x := <-channel //從channel中接收數據,並賦值給x x, ok := <-channel //功能同上,同時檢查通道是否已關閉或者是否爲空
默認狀況下,無緩衝的channel接收和發送數據都是阻塞的,除非另外一端已經準備好,這樣就使得goroutine同步變的更加的簡單,而不須要顯式的lock。
示例代碼:
package main import "fmt" func main() { c := make(chan int) go func() { defer fmt.Println("子協程結束") fmt.Println("子協程正在運行.....") c <- 666 //666發送到c }() num := <-c //從c中接收數據,並賦值給num fmt.Println("num = ", num) fmt.Println("main協程結束") }
程序運行結果:
無緩衝的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。
這種類型的通道要求發送goroutine和接收goroutine同時準備好,才能完成發送和接收操做。不然,通道會致使先執行發送或接收操做的 goroutine 阻塞等待。
這種對通道進行發送和接收的交互行爲自己就是同步的。其中任意一個操做都沒法離開另外一個操做單獨存在。
阻塞:因爲某種緣由數據沒有到達,當前協程(線程)持續處於等待狀態,直到條件知足,才接觸阻塞。
同步:在兩個或多個協程(線程)間,保持數據內容一致性的機制。
下圖展現兩個 goroutine 如何利用無緩衝的通道來共享一個值:
無緩衝的channel建立格式:
make(chan Type) //等價於make(chan Type, 0)
若是沒有指定緩衝區容量,那麼該通道就是同步的,所以會阻塞到發送者準備好發送和接收者準備好接收。
示例代碼:
package main import ( "fmt" "time" ) func main() { c := make(chan int, 0) //建立無緩衝的通道c //內置函數len返回未被讀取的緩衝元素數量,cap返回緩衝區大小 fmt.Printf("len(c) = %d, cap(c) = %d\n", len(c), cap(c)) go func() { defer fmt.Println("子協程結束") for i := 0; i < 3; i++ { c <- i fmt.Printf("子協程正在運行[%d]: len(c) = %d, cap(c) = %d\n", i, len(c), cap(c)) } }() time.Sleep(time.Second * 2) //延時2s for i := 0; i < 3; i++ { num := <-c //從c中接收數據,並複製給num fmt.Printf("num = %d\n", num) } fmt.Println("main協程結束") }
有緩衝的通道(buffered channel)是一種在被接收前能存儲一個或者多個數據值的通道。
這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動做的條件也不一樣。
只有通道中沒有能夠接收的值時,接收動做纔會阻塞。
只有通道沒有可用緩衝區容納被髮送的值時,發送動做纔會阻塞。
這致使有緩衝的通道和無緩衝的通道之間的一個很大的不一樣:無緩衝的通道保證進行發送和接收的 goroutine 會在同一時間進行數據交換;有緩衝的通道沒有這種保證。
示例以下:
有緩衝的channel建立格式:
make(chan Type, capacity)
若是給定了一個緩衝區容量,通道就是異步的。只要緩衝區有未使用空間用於發送數據,或還包含能夠接收的數據,那麼其通訊就會無阻塞地進行。
示例代碼:
package main import ( "fmt" "time" ) func main() { c := make(chan int, 3) //建立無緩衝的通道c //內置函數len返回未被讀取的緩衝元素數量,cap返回緩衝區大小 fmt.Printf("len(c) = %d, cap(c) = %d\n", len(c), cap(c)) go func() { defer fmt.Println("子協程結束") for i := 0; i < 3; i++ { c <- i fmt.Printf("子協程正在運行[%d]: len(c) = %d, cap(c) = %d\n", i, len(c), cap(c)) } }() time.Sleep(time.Second * 2) //延時2s for i := 0; i < 3; i++ { num := <-c //從c中接收數據,並複製給num fmt.Printf("num = %d\n", num) } fmt.Println("main協程結束") }
若是發送者知道,沒有更多的值須要發送到channel的話,那麼讓接收者也能及時知道沒有多餘的值可接收將是有用的,由於接收者能夠中止沒必要要的接收等待。這能夠經過內置的close函數來關閉channel實現。
示例代碼:
注意:
可使用 range 來迭代不斷操做channel:
package main import "fmt" func main() { c := make(chan int) go func() { for i := 0; i < 5; i++ { c <- i } //把close(c)註釋掉,程序會死鎖 //close(c) }() for { //ok爲true說明channel沒有關閉,爲false說明channel已經關閉 if data, ok := <-c; ok { fmt.Println(data) } else { break } } fmt.Println("main協程結束") }
有的時候咱們會將通道做爲參數在多個任務函數間傳遞,不少時候咱們在不一樣的任務函數中使用通道都會對其進行限制,好比限制通道在函數中只能發送或只能接收。
Go語言中提供了單向通道
來處理這種狀況。例如,咱們把上面的例子改造以下:
package main import ( "fmt" ) func counter(in chan<- int) { for i := 0; i < 100; i++ { in <- i } close(in) } func squarer(in chan<- int, out <-chan int) { for i := range out { in <- i * i } close(in) } func printer(in <-chan int) { for i := range in { fmt.Println(i) } } func main() { ch1 := make(chan int) ch2 := make(chan int) go counter(ch1) go squarer(ch2, ch1) printer(ch2) }
其中
chan<- int
是一個只能發送的通道,能夠發送可是不能接收;<-chan int
是一個只能接收的通道,能夠接收可是不能發送。在函數傳參及任何賦值操做中將雙向通道轉換爲單向通道是能夠的,但反過來是不能夠的。
Timer是一個定時器,表明將來的一個單一事件,你能夠告訴timer你須要等待多長事件,它提供一個channel,在未來的那個時間那個channel提供了一個時間值。
它提供一個channel,在定時時間到達以前,沒有數據寫入timer.C會一直阻塞。直到定時時間到,向channel寫入值,阻塞解除,能夠從中讀取數據。
示例代碼:
package main import ( "fmt" "time" ) func main() { //建立定時器,2秒後,定時器就會向本身的C字段發送一個time.Time類型的元素值 timer1 := time.NewTimer(time.Second * 2) t1 := time.Now() //當前時間 fmt.Printf("t1: %v\n", t1) t2 := <-timer1.C fmt.Printf("t2: %v\n", t2) //若是隻是想單純的等待的話,可使用time.Sleep來實現 timer2 := time.NewTimer(time.Second * 2) <-timer2.C fmt.Println("2s後") time.Sleep(time.Second * 2) fmt.Println("再一次2s後") <-time.After(time.Second * 2) fmt.Println("再再一次2s後") timer3 := time.NewTimer(time.Second) go func() { <-timer3.C fmt.Println("Timer 3 expired") }() stop := timer3.Stop() //中止定時器 if stop { fmt.Println("Timer 3 stopped") } fmt.Println("before") timer4 := time.NewTimer(time.Second * 5) //原來設置5秒 timer4.Reset(time.Second * 1) //從新設置時間 <-timer4.C fmt.Println("after") }
定時器的經常使用操做:
(3) 延時2s後打印一句話
timer := time.NewTimer(time.Second * 2)
<-timer.C
fmt.Println("時間到")
定時器中止
timer := time.NewTimer(time.Second * 3) go func() { <-timer.C fmt.Println("子協程能夠打印了,由於定時器的時間到") }() timer.Stop() //中止定時器 for { }
定時器重置
timer := time.NewTimer(3 * time.Second) ok := timer.Reset(1 * time.Second) //從新設置爲1s fmt.Println("ok = ", ok) <-timer.C fmt.Println("時間到")
Ticker是一個定時觸發的計時器,它會以一個間隔(interval)往channel發送一個事件(當前時間),而channel的接受者能夠以固定的時間間隔從channel中讀取事件。
示例代碼:
package main import ( "fmt" "time" ) func main() { //建立定時器,每隔1s後,定時器就會給channel發送一個事件(當前時間) ticker := time.NewTicker(1 * time.Second) go func() { i := 0 for { //循環 <-ticker.C i++ fmt.Println("i =", i) if i == 5 { ticker.Stop() //中止定時器 } } }() //死循環,特意不讓main goroutine結束 for { } }
Go裏面提供了一個關鍵字select,經過select能夠監聽channel上的數據流動。
select的用法與switch語言很是相似,由select開始一個新的選擇塊,每一個選擇條件由case語句來描述。
與switch語句相比, select有比較多的限制,其中最大的一條限制就是每一個case語句裏必須是一個IO操做,大體的結構以下:
select { case <-chan1: // 若是chan1成功讀到數據,則進行該case處理語句 case chan2 <- 1: // 若是成功向chan2寫入數據,則進行該case處理語句 default: // 若是上面都沒有成功,則進入default處理流程 }
在一個select語句中,Go語言會按順序從頭到尾評估每個發送和接收的語句。
若是其中的任意一語句能夠繼續執行(即沒有被阻塞),那麼就從那些能夠執行的語句中任意選擇一條來使用。
若是沒有任意一條語句能夠執行(即全部的通道都被阻塞),那麼有兩種可能的狀況:
示例代碼:
package main import "fmt" func fibonacci(c, quit chan int) { x, y := 1, 1 for { select { case c <-x: x, y = y, x+y case <-quit: fmt.Println("quit") return } } } func main() { c := make(chan int) quit := make(chan int) go func() { for i := 0; i < 6; i++ { fmt.Println(<-c) } quit <- 0 }() fibonacci(c, quit) }
運行結果以下:
示例2:
package main import "fmt" func main() { var ch = make(chan int, 1) for i := 0; i < 10; i++ { select { case x := <-ch: fmt.Println(x) case ch <- i: } } }
使用select
語句能提升代碼的可讀性。
case
同時知足,select
會隨機選擇一個。case
的select{}
會一直等待,可用於阻塞main函數。有時候會出現goroutine阻塞的狀況,那麼咱們如何避免整個程序進入阻塞的狀況呢?咱們能夠利用select來設置超時,經過以下的方式實現:
package main import ( "fmt" "time" ) func main() { c := make(chan int) o := make(chan bool) go func() { for { select { case v := <-c: fmt.Println(v) case <-time.After(5 * time.Second): fmt.Println("timeout") o <- true break } } }() //c <- 666 //註釋掉,引起timeout <-o }
前面咱們爲了解決協程同步的問題咱們使用了channel,可是GO也提供了傳統的同步工具。
它們都在GO的標準庫代碼包sync和sync/atomic中。
什麼是鎖呢?就是某個協程(線程)在訪問某個資源時先鎖住,防止其它協程的訪問,等訪問完畢解鎖後其餘協程再來加鎖進行訪問。這和咱們生活中加鎖使用公共資源類似,例如:公共衛生間。
死鎖是指兩個或兩個以上的進程在執行過程當中,因爲競爭資源或者因爲彼此通訊而形成的一種阻塞的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖
示例代碼:
package main import "fmt" func main() { ch := make(chan int) ch <- 1 // I'm blocked because there is no channel read yet. fmt.Println("send") go func() { <-ch // I will never be called for the main routine is blocked! fmt.Println("received") }() fmt.Println("over") }
每一個資源都對應於一個可稱爲 "互斥鎖" 的標記,這個標記用來保證在任意時刻,只能有一個協程(線程)訪問該資源。其它的協程只能等待。
互斥鎖是傳統併發編程對共享資源進行訪問控制的主要手段,它由標準庫sync中的Mutex結構體類型表示。sync.Mutex類型只有兩個公開的指針方法,Lock和Unlock。Lock鎖定當前的共享資源,Unlock進行解鎖。
在使用互斥鎖時,必定要注意:對資源操做完成後,必定要解鎖,不然會出現流程執行異常,死鎖等問題。
有時候在Go代碼中可能會存在多個goroutine
同時操做一個資源(臨界區),這種狀況會發生競態問題
(數據競態)。類比現實生活中的例子有十字路口被各個方向的的汽車競爭;還有火車上的衛生間被車箱裏的人競爭。
舉個例子:
package main import ( "fmt" "sync" ) var ( x int64 wg sync.WaitGroup ) func add() { defer wg.Done() for i := 0; i < 5000; i++ { x = x + 1 } } func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x) }
上面的代碼中咱們開啓了兩個goroutine
去累加變量x的值,這兩個goroutine
在訪問和修改x
變量的時候就會存在數據競爭,致使最後的結果與期待的不符。
互斥鎖是一種經常使用的控制共享資源訪問的方法,它可以保證同時只有一個goroutine
能夠訪問共享資源。Go語言中使用sync
包的Mutex
類型來實現互斥鎖。 使用互斥鎖來修復上面代碼的問題:
package main import ( "fmt" "sync" ) var ( x int64 wg sync.WaitGroup lock sync.Mutex ) func add() { defer wg.Done() for i := 0; i < 5000; i++ { lock.Lock() //加鎖 x = x + 1 lock.Unlock() //解鎖 } } func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x) }
使用互斥鎖可以保證同一時間有且只有一個goroutine
進入臨界區,其餘的goroutine
則在等待鎖;當互斥鎖釋放後,等待的goroutine
才能夠獲取鎖進入臨界區,多個goroutine
同時等待一個鎖時,喚醒的策略是隨機的。
互斥鎖的本質是當一個goroutine訪問的時候,其餘goroutine都不能訪問。這樣在資源同步,避免競爭的同時也下降了程序的併發性能。程序由原來的並行執行變成了串行執行。
其實,當咱們對一個不會變化的數據只作「讀」操做的話,是不存在資源競爭的問題的。由於數據是不變的,無論怎麼讀取,多少goroutine同時讀取,都是能夠的。
因此問題不是出在「讀」上,主要是修改,也就是「寫」。修改的數據要同步,這樣其餘goroutine才能夠感知到。因此真正的互斥應該是讀取和修改、修改和修改之間,讀和讀是沒有互斥操做的必要的。
所以,衍生出另一種鎖,叫作讀寫鎖。
讀寫鎖可讓多個讀操做併發,同時讀取,可是對於寫操做是徹底互斥的。也就是說,當一個goroutine進行寫操做的時候,其餘goroutine既不能進行讀操做,也不能進行寫操做。
GO中的讀寫鎖由結構體類型sync.RWMutex
表示。此類型的方法集合中包含兩對方法:
一組是對寫操做的鎖定和解鎖,簡稱「寫鎖定」和「寫解鎖」:
func (*RWMutex)Lock() func (*RWMutex)Unlock()
另外一組表示對讀操做的鎖定和解鎖,簡稱爲「讀鎖定」與「讀解鎖」:
func (*RWMutex)RLock() func (*RWMutex)RUlock()
讀寫鎖基本示例:
package main import ( "fmt" "sync" "time" ) var ( x int64 wg sync.WaitGroup rwlock sync.RWMutex ) func write() { defer wg.Done() rwlock.Lock() //加寫鎖 x = x + 1 time.Sleep(time.Millisecond * 10) //假設寫操做耗時10毫秒 rwlock.Unlock() //解寫鎖 } func read() { defer wg.Done() rwlock.RLock() //加讀鎖 time.Sleep(time.Millisecond) //假設讀操做耗時一毫秒 rwlock.RUnlock() //解讀鎖 } func main() { var start = time.Now() for i := 0; i < 10; i++ { wg.Add(1) go write() } for i := 0; i < 1000; i++ { wg.Add(1) go read() } wg.Wait() var end = time.Now() fmt.Println(end.Sub(start)) }
咱們在read裏使用讀鎖,也就是RLock和RUnlock,寫鎖的方法名和咱們平時使用的同樣,是Lock和Unlock。這樣,咱們就使用了讀寫鎖,能夠併發地讀,可是同時只能有一個寫,而且寫的時候不能進行讀操做。
須要注意的是讀寫鎖很是適合讀多寫少的場景,若是讀和寫的操做差異不大,讀寫鎖的優點就發揮不出來。
總結:讀寫鎖控制下的多個寫操做之間都是互斥的,而且寫操做與讀操做之間也都是互斥的。可是,多個讀操做之間不存在互斥關係。
從互斥鎖和讀寫鎖的源碼能夠看出,它們是同源的。讀寫鎖的內部用互斥鎖來實現寫鎖定操做之間的互斥。能夠把讀寫鎖看做是互斥鎖的一種擴展。
在代碼中生硬的使用time.Sleep
確定是不合適的,Go語言中可使用sync.WaitGroup
來實現併發任務的同步。 sync.WaitGroup
有如下幾個方法:
方法名 | 功能 |
---|---|
(wg *WaitGroup) Add(delta int) | 計數器+delta |
(wg *WaitGroup) Done() | 計數器-1 |
(wg *WaitGroup) Wait() | 阻塞直到計數器變爲0 |
sync.WaitGroup
內部維護着一個計數器,計數器的值能夠增長和減小。例如當咱們啓動了N 個併發任務時,就將計數器值增長N。每一個任務完成時經過調用Done()方法將計數器減1。經過調用Wait()來等待併發任務執行完,當計數器值爲0時,表示全部併發任務已經完成。
咱們利用sync.WaitGroup
將上面的代碼優化一下:
package main import ( "fmt" "sync" ) func sayHello(wg *sync.WaitGroup) { defer wg.Done() fmt.Println("Hello") } func main() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go sayHello(&wg) } fmt.Println("main goroutine done!") wg.Wait() }
須要注意sync.WaitGroup
是一個結構體,傳遞的時候要傳遞指針。
說在前面的話:這是一個進階知識點。
在編程的不少場景下咱們須要確保某些操做在高併發的場景下只執行一次,例如只加載一次配置文件、只關閉一次通道等。
Go語言中的sync
包中提供了一個針對只執行一次場景的解決方案–sync.Once
。
sync.Once
只有一個Do
方法,其簽名以下:
func (o *Once) Do(f func()) {}
備註:若是要執行的函數f
須要傳遞參數就須要搭配閉包來使用。
延遲一個開銷很大的初始化操做到真正用到它的時候再執行是一個很好的實踐。由於預先初始化一個變量(好比在init函數中完成初始化)會增長程序的啓動耗時,並且有可能實際執行過程當中這個變量沒有用上,那麼這個初始化操做就不是必需要作的。咱們來看一個例子:
var icons map[string]image.Image func loadIcons() { icons = map[string]image.Image{ "left": loadIcon("left.png"), "up": loadIcon("up.png"), "right": loadIcon("right.png"), "down": loadIcon("down.png"), } } // Icon 被多個goroutine調用時不是併發安全的 func Icon(name string) image.Image { if icons == nil { loadIcons() } return icons[name] }
多個goroutine
併發調用Icon函數時不是併發安全的,現代的編譯器和CPU可能會在保證每一個goroutine
都知足串行一致的基礎上自由地重排訪問內存的順序。loadIcons函數可能會被重排爲如下結果:
func loadIcons() { icons = make(map[string]image.Image) icons["left"] = loadIcon("left.png") icons["up"] = loadIcon("up.png") icons["right"] = loadIcon("right.png") icons["down"] = loadIcon("down.png") }
在這種狀況下就會出現即便判斷了icons
不是nil也不意味着變量初始化完成了。考慮到這種狀況,咱們能想到的辦法就是添加互斥鎖,保證初始化icons
的時候不會被其餘的goroutine
操做,可是這樣作又會引起性能問題。
使用sync.Once
改造的示例代碼以下:
var icons map[string]image.Image var loadIconsOnce sync.Once func loadIcons() { icons = map[string]image.Image{ "left": loadIcon("left.png"), "up": loadIcon("up.png"), "right": loadIcon("right.png"), "down": loadIcon("down.png"), } } // Icon 是併發安全的 func Icon(name string) image.Image { loadIconsOnce.Do(loadIcons) return icons[name] }
下面是藉助sync.Once
實現的併發安全的單例模式:
package main import "sync" type singleton struct{} var ( instance *singleton once sync.Once ) func GetInstance() *singleton { once.Do(func() { instance = &singleton{} }) return instance }
sync.Once
其實內部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和數據的安全,而布爾值用來記錄初始化是否完成。這樣設計就能保證初始化操做的時候是併發安全的而且初始化操做也不會被執行屢次。
Go語言中內置的map不是併發安全的。請看下面的示例:
package main import ( "fmt" "strconv" "sync" ) var m = make(map[string]int) func get(key string) int { return m[key] } func set(key string, value int) { m[key] = value } func main() { var wg = sync.WaitGroup{} for i := 0; i < 20; i++ { wg.Add(1) go func(n int) { defer wg.Done() key := strconv.Itoa(n) set(key, n) fmt.Printf("k: %v, v: %v\n", key, get(key)) }(i) } wg.Wait() }
上面的代碼開啓少許幾個goroutine
的時候可能沒什麼問題,當併發多了以後執行上面的代碼就會報fatal error: concurrent map writes
錯誤。
像這種場景下就須要爲map加鎖來保證併發的安全性了,Go語言的sync
包中提供了一個開箱即用的併發安全版map–sync.Map
。開箱即用表示不用像內置的map同樣使用make函數初始化就能直接使用。同時sync.Map
內置了諸如Store
、Load
、LoadOrStore
、Delete
、Range
等操做方法。
package main import ( "fmt" "strconv" "sync" ) func main() { var m = sync.Map{} var wg = sync.WaitGroup{} for i := 0; i < 40; i++ { wg.Add(1) go func(n int) { defer wg.Done() key := strconv.Itoa(n) m.Store(key, n) value, _ := m.Load(key) fmt.Printf("k: %v, v: %v\n", key, value) }(i) } wg.Wait() }
代碼中的加鎖操做由於涉及內核態的上下文切換會比較耗時、代價比較高。針對基本數據類型咱們還可使用原子操做來保證併發安全,由於原子操做是Go語言提供的方法它在用戶態就能夠完成,所以性能比加鎖操做更好。Go語言中原子操做由內置的標準庫sync/atomic
提供。
方法 | 解釋 |
---|---|
func LoadInt32(addr int32) (val int32) func LoadInt64(addr int64) (val int64) func LoadUint32(addr uint32) (val uint32) func LoadUint64(addr uint64) (val uint64) func LoadUintptr(addr uintptr) (val uintptr) func LoadPointer(addr unsafe.Pointer) (val unsafe.Pointer) |
讀取操做 |
func StoreInt32(addr int32, val int32) func StoreInt64(addr int64, val int64) func StoreUint32(addr uint32, val uint32) func StoreUint64(addr uint64, val uint64) func StoreUintptr(addr uintptr, val uintptr) func StorePointer(addr unsafe.Pointer, val unsafe.Pointer) |
寫入操做 |
func AddInt32(addr int32, delta int32) (new int32) func AddInt64(addr int64, delta int64) (new int64) func AddUint32(addr uint32, delta uint32) (new uint32) func AddUint64(addr uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) |
修改操做 |
func SwapInt32(addr int32, new int32) (old int32) func SwapInt64(addr int64, new int64) (old int64) func SwapUint32(addr uint32, new uint32) (old uint32) func SwapUint64(addr uint64, new uint64) (old uint64) func SwapUintptr(addr uintptr, new uintptr) (old uintptr) func SwapPointer(addr unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) |
交換操做 |
func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) |
比較並交換操做 |
咱們經過一個示例來比較下互斥鎖和原子操做的性能。
package main import ( "fmt" "sync" "sync/atomic" "time" ) type Counter interface { Inc() Load() int64 } //普通版 type NormalCounter struct { x int64 } func (normal *NormalCounter) Inc() { normal.x++ } func (normal *NormalCounter) Load() int64 { return normal.x } //互斥鎖版 type MutexCounter struct { x int64 lock sync.Mutex } func (m *MutexCounter) Inc () { m.lock.Lock() m.x++ m.lock.Unlock() } func (m *MutexCounter) Load() int64 { m.lock.Lock() defer m.lock.Unlock() return m.x } //原子操做版 type AtomicCounter struct { x int64 } func (a *AtomicCounter) Inc() { atomic.AddInt64(&a.x, 1) } func (a *AtomicCounter) Load() int64 { return atomic.LoadInt64(&a.x) } func test(c Counter) { var wg sync.WaitGroup var start = time.Now() for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() c.Inc() }() } wg.Wait() var end = time.Now() fmt.Printf("執行用時: %v, 結果爲: %v\n", end.Sub(start), c.Load()) } func main() { var c1 = NormalCounter{} // 非併發安全 test(&c1) var c2 = MutexCounter{} // 使用互斥鎖實現併發安全 test(&c2) var c3 = AtomicCounter{} // 併發安全且比互斥鎖效率更高 test(&c3) }
atomic
包提供了底層的原子級內存操做,對於同步算法的實現頗有用。這些函數必須謹慎地保證正確使用。除了某些特殊的底層應用,使用通道或者sync包的函數/類型實現同步更好。