本文的主要內容是:算法
- 瞭解goroutine,使用它來運行程序
- 瞭解Go是如何檢測並修正競爭狀態的(解決資源互斥訪問的方式)
- 瞭解並使用通道chan來同步goroutine
Go的併發能力,是指讓某個函數獨立於其餘函數運行的能力。當爲一個函數建立goroutine
時,該函數將做爲一個獨立的工做單元,被 調度器 調度到可用的邏輯處理器上執行。Go的運行時調度器是個複雜的軟件,它作的工做大體是:緩存
參考The Go scheduler ,這裏較淺顯地說一下Go的運行時調度器。操做系統會在物理處理器上調度操做系統線程
來運行,而Go語言的運行時會在邏輯處理器
上調度goroutine
來運行,每一個邏輯處理器都分別綁定到單個操做系統線程上。這裏涉及到三個角色:安全
每一個P會維護一個全局運行隊列(稱爲runqueue),處於ready就緒狀態的goroutine
(灰色G)被放在這個隊列中等待被調度。在編寫程序時,每當go func
啓動一個goroutine
時,runqueue
便在尾部加入一個goroutine
。在下一個調度點上,P就從runqueue
中取出一個goroutine
出來執行(藍色G)。併發
當某個操做系統線程M阻塞的時候(好比goroutine
執行了阻塞的系統調用),P能夠綁定到另一個操做系統線程M上,讓運行隊列中的其餘goroutine
繼續執行:函數
上圖中G0執行了阻塞操做,M0被阻塞,P將在新的系統線程M1上繼續調度G執行。M1有多是被新建立的,或者是從線程緩存中取出。Go調度器保證有足夠的線程來運行全部的P,語言運行時默認限制每一個程序最多建立10000個線程,這個如今能夠經過調用runtime/debug包的SetMaxThreads
方法來更改。工具
Go能夠在在一個邏輯處理器P上實現併發,若是須要並行,必須使用多於1個的邏輯處理器。Go調度器會把goroutine
平等分配到每一個邏輯處理器上,此時goroutine
將在不一樣的線程上運行,不過前提是要求機器擁有多個物理處理器。ui
使用關鍵字go
來建立一個goroutine
,並讓全部的goroutine
都獲得執行:atom
//example1.go package main import ( "runtime" "sync" "fmt" ) var ( wg sync.WaitGroup ) func main() { //分配一個邏輯處理器P給調度器使用 runtime.GOMAXPROCS(1) //在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine wg.Add(2) //聲明1個匿名函數,並建立一個goroutine fmt.Printf("Begin Coroutines\n") go func() { //在函數退出時,wg計數器減1 defer wg.Done() //打印3次小寫字母表 for count := 0; count < 3; count++ { for char := 'a'; char < 'a'+26; char++ { fmt.Printf("%c ", char) } } }() //聲明1個匿名函數,並建立一個goroutine go func() { defer wg.Done() //打印大寫字母表3次 for count := 0; count < 3; count++ { for char := 'A'; char < 'A'+26; char++ { fmt.Printf("%c ", char) } } }() fmt.Printf("Waiting To Finish\n") //等待2個goroutine執行完畢 wg.Wait() }
這個程序使用runtime.GOMAXPROCS(1)
來分配一個邏輯處理器給調度器使用,兩個goroutine
將被該邏輯處理器調度併發執行。程序輸出:操作系統
Begin Coroutines Waiting To Finish A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
從輸出來看,是先執行完一個goroutine
,再接着執行第二個goroutine
的,大寫字母所有打印完後,再打印所有的小寫字母。那麼,有沒有辦法讓兩個goroutine
並行執行呢?爲程序指定兩個邏輯處理器便可:線程
//修改成2個邏輯處理器 runtime.GOMAXPROCS(2)
此時執行程序,輸出爲:
Begin Coroutines Waiting To Finish A B C D E a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c F G H I J K L M N O P Q R S T U V W X d e f g h i j k l m n o p q r s Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z t u v w x y z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
那若是隻有1個邏輯處理器,如何讓兩個goroutine交替被調度?實際上,若是goroutine
須要很長的時間才能運行完,調度器的內部算法會將當前運行的goroutine
讓出,防止某個goroutine
長時間佔用邏輯處理器。因爲示例程序中兩個goroutine
的執行時間都很短,在爲引發調度器調度以前已經執行完。不過,程序也可使用runtime.Gosched()
來將當前在邏輯處理器上運行的goruntine
讓出,讓另外一個goruntine
獲得執行:
//example2.go package main import ( "runtime" "sync" "fmt" ) var ( wg sync.WaitGroup ) func main() { //分配一個邏輯處理器P給調度器使用 runtime.GOMAXPROCS(1) //在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine wg.Add(2) //聲明1個匿名函數,並建立一個goroutine fmt.Printf("Begin Coroutines\n") go func() { //在函數退出時,wg計數器減1 defer wg.Done() //打印3次小寫字母表 for count := 0; count < 3; count++ { for char := 'a'; char < 'a'+26; char++ { if char=='k'{ runtime.Gosched() } fmt.Printf("%c ", char) } } }() //聲明1個匿名函數,並建立一個goroutine go func() { defer wg.Done() //打印大寫字母表3次 for count := 0; count < 3; count++ { for char := 'A'; char < 'A'+26; char++ { if char == 'K'{ runtime.Gosched() } fmt.Printf("%c ", char) } } }() fmt.Printf("Waiting To Finish\n") //等待2個goroutine執行完畢 wg.Wait() }
兩個goroutine
在循環的字符爲k/K的時候會讓出邏輯處理器,程序的輸出結果爲:
Begin Coroutines Waiting To Finish A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J a b c d e f g h i j K L M N O P Q R S T U V W X Y Z A B C D E F G H I J k l m n o p q r s t u v w x y z a b c d e f g h i j K L M N O P Q R S T U V W X Y Z k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
這裏大小寫字母果真是交替着輸出了。不過從輸出能夠看到,第一次輸出大寫字母時遇到K沒有讓出邏輯處理器,這是什麼緣由還不是很清楚,調度器的調度機制?
併發程序避免不了的一個問題是對資源的同步訪問。若是多個goroutine
在沒有互相同步的狀況下去訪問同一個資源,並進行讀寫操做,這時goroutine
就處於競爭狀態下:
//example3.go package main import ( "sync" "runtime" "fmt" ) var ( //counter爲訪問的資源 counter int64 wg sync.WaitGroup ) func addCount() { defer wg.Done() for count := 0; count < 2; count++ { value := counter //當前goroutine從線程退出 runtime.Gosched() value++ counter=value } } func main() { wg.Add(2) go addCount() go addCount() wg.Wait() fmt.Printf("counter: %d\n",counter) } //output: counter: 4 或者counter: 2
這段程序中,goroutine
對counter
的讀寫操做沒有進行同步,goroutine 1對counter的寫結果可能被goroutine 2所覆蓋。Go可經過以下方式來解決這個問題:
chan
有時候競爭狀態並不能一眼就看出來。Go 提供了一個很是有用的工具,用於檢測競爭狀態。使用方式是:
go build -race example4.go//用競爭檢測器標誌來編譯程序
./example4 //運行程序
工具檢測出了程序存在一處競爭狀態,並指出發生競爭狀態的幾行代碼是:
22 counter=value 18 value := counter 28 go addCount() 29 go addCount()
對整形變量或指針的同步訪問,可使用原子函數來進行。這裏使用原子函數來修復example4.go中的競爭狀態問題:
//example5.go package main import ( "sync" "runtime" "fmt" "sync/atomic" ) var ( //counter爲訪問的資源 counter int64 wg sync.WaitGroup ) func addCount() { defer wg.Done() for count := 0; count < 2; count++ { //使用原子操做來進行 atomic.AddInt64(&counter,1) //當前goroutine從線程退出 runtime.Gosched() } } func main() { wg.Add(2) go addCount() go addCount() wg.Wait() fmt.Printf("counter: %d\n",counter) } //output: counter: 4
這裏使用atomic.AddInt64
函數來對一個整形數據進行加操做,另一些有用的原子操做還有:
atomic.StoreInt64() //寫 atomic.LoadInt64() //讀
更多的原子操做函數請看atomic
包中的聲明。
對臨界區的訪問,可使用互斥鎖來進行。對於example4.go的競爭狀態,可使用互斥鎖來解決:
//example5.go package main import ( "sync" "runtime" "fmt" ) var ( //counter爲訪問的資源 counter int wg sync.WaitGroup mutex sync.Mutex ) func addCount() { defer wg.Done() for count := 0; count < 2; count++ { //加上鎖,進入臨界區域 mutex.Lock() { value := counter //當前goroutine從線程退出 runtime.Gosched() value++ counter = value } //離開臨界區,釋放互斥鎖 mutex.Unlock() } } func main() { wg.Add(2) go addCount() go addCount() wg.Wait() fmt.Printf("counter: %d\n", counter) } //output: counter: 4
使用Lock()
與Unlock()
函數調用來定義臨界區,在同一個時刻內,只有一個goroutine可以進入臨界區,直到調用Unlock()
函數後,其餘的goroutine纔可以進入臨界區。
在Go中解決共享資源安全訪問,更經常使用的使用通道chan。
Go語言採用CSP消息傳遞模型。經過在goroutine
之間傳遞數據來傳遞消息,而不是對數據進行加鎖來實現同步訪問。這裏就須要用到通道chan
這種特殊的數據類型。當一個資源須要在goroutine
中共享時,chan在goroutine
中間架起了一個通道。通道使用make
來建立:
unbuffered := make(char int) //建立無緩存通道,用於int類型數據共享 buffered := make(chan string,10)//建立有緩存通道,用於string類型數據共享 buffered<- "hello world" //向通道中寫入數據 value:= <-buffered //從通道buffered中接受數據
通道用於放置某一種類型的數據。建立通道時指定通道的大小,將建立有緩存的通道。無緩存通道是一種同步通訊機制,它要求發送goroutine
和接收goroutine
都應該準備好,不然會進入阻塞。
無緩存通道是同步的——一個goroutine
向channel寫入消息的操做會一直阻塞,直到另外一個goroutine
從通道中讀取消息。反過來也是,一個goroutine
從channel讀取消息的操做會一直阻塞,直到另外一個goroutine
向通道中寫入消息。《Go in action》中關於無緩存通道的解釋有一個很是棒的例子:網球比賽。在網球比賽中,兩位選手老是處在如下兩種狀態之一:要麼在等待接球,要麼在把球打向對方。球的傳遞可看爲通道中數據傳遞。下面這段代碼使用通道模擬了這個過程:
//example6.go package main import ( "sync" "fmt" "math/rand" "time" ) var wg sync.WaitGroup func player(name string, court chan int) { defer wg.Done() for { //若是通道關閉,那麼選手勝利 ball, ok := <-court if !ok { fmt.Printf("Player %s Won\n", name) return } n := rand.Intn(100) //隨機機率使某個選手Miss if n%13 == 0 { fmt.Printf("Player %s Missed\n", name) //關閉通道 close(court) return } fmt.Printf("Player %s Hit %d\n", name, ball) ball++ //不然選手進行擊球 court <- ball } } func main() { rand.Seed(time.Now().Unix()) court := make(chan int) //等待兩個goroutine都執行完 wg.Add(2) //選手1等待接球 go player("candy", court) //選手2等待接球 go player("luffic", court) //球進入球場(能夠開始比賽了) court <- 1 wg.Wait() } //output: Player luffic Hit 1 Player candy Hit 2 Player luffic Hit 3 Player candy Hit 4 Player luffic Hit 5 Player candy Missed Player luffic Won
有緩存的通道是一種在被接收前能存儲一個或者多個值的通道,它與無緩存通道的區別在於:無緩存的通道保證進行發送和接收的goroutine會在同一時間進行數據交換,有緩存的通道沒有這種保證。有緩存通道讓goroutine
阻塞的條件爲:通道中沒有數據可讀的時候,接收動做會被阻塞;通道中沒有區域容納更多數據時,發送動做阻塞。向已經關閉的通道中發送數據,會引起panic,可是goroutine
依舊能從通道中接收數據,可是不能再向通道里發送數據。因此,發送端應該負責把通道關閉,而不是由接收端來關閉通道。
(完)