Go語言之併發

Go語言直接支持內置支持併發。當一個函數建立爲goroutine時,Go會將其視爲一個獨立的工做單元。這個單元會被調度到可用的邏輯處理器上執行。安全

Go語言運行時的調度器是一個複雜的軟件,這個調度器在操做系統之上。操做系統的線程與語言運行時的邏輯處理器綁定,並在邏輯處理器上運行goroutine。併發

Go語言的併發同步邏輯來自一個叫作通訊順訊進程(CSP)的範型。CSP是一種消息傳遞模型,經過在goroutine之間傳遞數據來傳遞消息,而不是經過對數據進行加鎖來實現同步訪問。這種數據的類型叫作通道(channel) 。函數

併發與並行

在操做系統中,一個應用程序就能夠看做一個進程,而每一個進程至少包含一個線程。每一個進程的初始線程被稱爲主線程。post

操做系統會在物理處理器(CPU)上調度線程來運行,而Go語言會在邏輯處理器來調度goroutine來運行。1.5版本之上,Go語言的運行時默認會爲每一個可用的物理處理器分配一個邏輯處理器。1.5以前,默認給整個應用程序只分配一個邏輯處理器。性能

以下圖,在運行時把goroutine調度到邏輯處理器上運行,邏輯處理器綁定到惟一的操做系統線程。ui

表

當goroutine執行了一個阻塞的系統調用(就是一個非純CPU的任務)時,調度器會將這個線程與處理器分離,並建立一個新線程來運行這個處理器上提供的服務。atom

表2

語言運行默認限制每一個程序最多建立10000個線程。spa

注意併發≠並行!並行須要至少2個邏輯處理器。操作系統

goroutine

以併發的形式分別顯示大寫和小寫的英文字母線程

  1: package main
  2: 
  3: import (
  4: 	"fmt"
  5: 	"runtime"
  6: 	"sync"
  7: )
  8: 
  9: func main() {
 10: 	// 分配一個邏輯處理器給調度器使用
 11: 	runtime.GOMAXPROCS(1)
 12: 	// wg用來等待程序完成
 13: 	var wg sync.WaitGroup
 14: 	// 計數器加2,表示要等待兩個goroutine
 15: 	wg.Add(2)
 16: 	fmt.Println("Start!")
 17: 	// 聲明一個匿名函數,並建立一個goroutime
 18: 	go func() {
 19: 		// 通知main函數工做已經完成
 20: 		defer wg.Done()
 21: 		// 顯示字母表3次
 22: 		for count:=0; count<3;count++ {
 23: 			for char:='a';char<'a'+26;char++ {
 24: 				fmt.Printf("%c ", char)
 25: 			}
 26: 		}
 27: 	}()
 28: 	// 同上
 29: 	go func() {
 30: 		// 通知main函數工做已經完成
 31: 		defer wg.Done()
 32: 		// 顯示字母表3次
 33: 		for count:=0; count<3;count++ {
 34: 			for char:='A';char<'A'+26;char++ {
 35: 				fmt.Printf("%c ", char)
 36: 			}
 37: 		}
 38: 	}()
 39: 	// 等待goroutine結束
 40: 	fmt.Println("Waiting!")
 41: 	wg.Wait()
 42: 	fmt.Println("\nFinish!")
 43: }

運行結果後,能夠看到先輸出的是全部的大寫字母,最後纔是小寫字母。是由於第一個goroutine完成全部顯示須要花時間過短了,以致於在調度器切換到第二個goroutine以前,就完成了全部任務。

調度器爲了防止某個goroutine長時間佔用邏輯處理器,會中止當前正運行的goroutine,運行其餘可運行的goroutine運行的機會。

建立兩個相同的長時間才能完成其工做的goroutine就能夠看到,好比說顯示5000之內的素數值。

代碼結構以下

  1: go printPrime("A")
  2: go printPrime("B")
  3: 
  4: func printPrime(prefix string) {
  5: 	...
  6: }

結果相似

  1: B:2
  2: B:3
  3: ...
  4: B:4591
  5: A:3
  6: A:5
  7: ...
  8: A:4561
  9: A:4567
 10: B:4603
 11: B:4621
 12: ...
 13: // Completed B
 14: A:4457
 15: ...
 16: // Completed A

如何修改邏輯處理器的數量

  1: runtime.GOMAXPROCS(runtime.NUMCPU())

稍微改動下上面的代碼,結果就會大不一樣

  1: package main
  2: 
  3: import (
  4: "fmt"
  5: "runtime"
  6: "sync"
  7: )
  8: 
  9: func main() {
 10: 	// 分配兩個邏輯處理器給調度器使用
 11: 	runtime.GOMAXPROCS(2)
 12: 	// wg用來等待程序完成
 13: 	var wg sync.WaitGroup
 14: 	// 計數器加2,表示要等待兩個goroutine
 15: 	wg.Add(2)
 16: 	fmt.Println("Start!")
 17: 	// 聲明一個匿名函數,並建立一個goroutime
 18: 	go func() {
 19: 		// 通知main函數工做已經完成
 20: 		defer wg.Done()
 21: 		// 顯示字母表3次
 22: 		for count:=0; count<10;count++ {
 23: 			for char:='a';char<'a'+26;char++ {
 24: 				fmt.Printf("%c ", char)
 25: 			}
 26: 		}
 27: 	}()
 28: 	// 同上
 29: 	go func() {
 30: 		// 通知main函數工做已經完成
 31: 		defer wg.Done()
 32: 		// 顯示字母表3次
 33: 		for count:=0; count<10;count++ {
 34: 			for char:='A';char<'A'+26;char++ {
 35: 				fmt.Printf("%c ", char)
 36: 			}
 37: 		}
 38: 	}()
 39: 	// 等待goroutine結束
 40: 	fmt.Println("Waiting!")
 41: 	wg.Wait()
 42: 	fmt.Println("\nFinish!")
 43: }

結果相似下面的(根據CPU單核的性能結果可能結果稍微不同)

  1: Start!
  2: Waiting!
  3: a b c d e f g h i j k l m n o A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g
  4: h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
  5: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s
  6: t u v w x y z M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X
  7: Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q
  8: R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
  9: Finish!
能夠發現,goroutine是並行運行的。

只有在有多個邏輯處理器且能夠同時讓每一個goroutine運行在一個可用的物理處理器上的時候,goroutine纔會並行運行。

競爭狀態

若是兩個或者多個goroutine在沒有互相同步的狀況下,訪問某個共享的資源,而且試圖同時讀和寫這個資源,就處於相互競爭的狀態。

在競爭狀態,每一個goroutine都會覆蓋另外一個goroutine的工做。這種覆蓋發生在goroutine發生切換的時候。

每一個goroutien都會創造本身的共享變量副本。當切換到領另外一個goroutine時,若是這個變量的值在上一個goroutine發生改變,這個goroutine再次運行時,雖然變量的值改變了,可是因爲這個goroutine沒有更新本身的那個副本的值,而是繼續使用,而且將其存回變量的值,從而覆蓋上一個goroutine 的工做。

go build –race用來競爭檢測器標誌來編譯程序

鎖住共享資源

原子函數

原子函數可以以底層的枷鎖機制來同步訪問整型變量和指針。省略部分代碼以下:

  1: var counter int64
  2: go incCounter(1)
  3: go incCounter(2)
  4: func incCounter(id int) {
  5: 	for count:=0;count<2;count++{
  6: 		//安全地對counter加1
  7: 		atomic.AddInt64(&counter, 1)
  8: 		//當前goroutine從線程退出,並放回隊列
  9: 		runtime.Gosched()
 10: 	}
 11: }

使用atmoic包的AddInt64函數。這些goroutine都會自動根據所引用的變量作同步處理。

另外兩個原子函數是LoadInt64和StoreInt64。用法以下:

  1: // shutdown是通知正在執行的goroutine中止工做的標誌
  2: var shutdown int64
  3: var wg sync.WaitGroup
  4: // 該中止工做了,安全地設置shutdown標誌
  5: atomic.StoreInt64(&shutdown, 1)
  6: // 等待goroutine結束
  7: wg.Wait()
  8: // 檢測是否中止工做,若是shutdown==1那麼goroutine就會終止
  9: if atomic.LoadInt64(&shutdown) == 1 {
 10: 	break
 11: }
 12: 

互斥鎖

另外一種同步訪問共享資源的方式是互斥鎖。主要代碼以下:

  1: var (
  2: 	// counter是全部goroutine都要增長其值的變量
  3: 	counter int
  4: 	wg sync.WaitGroup
  5: 	// mutex用來定義一段代碼臨界區
  6: 	mutex sync.Mutex
  7: )
  8: func main...
  9: // 業務代碼
 10: func incCounter(id int) {
 11: 	defer wg.Done()
 12: 	for i:=0;i<2;i++ {
 13: 		//同一時期只容許一個goroutine進入
 14: 		mutex.Lock()
 15: 		//大括號並非必須的
 16: 		{
 17: 			//捕獲counter的值
 18: 			value := counter
 19: 			//當前goroutine從線程退出,並返回到隊列
 20: 			runtime.Gosched()
 21: 			//增長本地value變量的值
 22: 			value++
 23: 			//將該值保存回counter
 24: 			counter = value
 25: 		}
 26: 		// 釋放鎖,容許其餘正在等待的goroutine
 27: 		mutex.Unlock()
 28: 	}
 29: }

通道

通道在goroutine之間架起了一個管道,並提供了確保同步交換數據的機制。聲明通道時,須要指定將要被共享的數據的類型。

能夠經過通道共享內置類型,命名類型,結構類型和引用類型的值或者指針。

go語言須要使用make來建立一個通道,chan是關鍵字:

  1: // 無緩衝的整型通道
  2: unbuffered := make(chan int)
  3: // 有緩衝的字符串通道
  4: buffered := make(chan string, 10)
向通道發送值
  1: buffered := make(chan string, 10)
  2: // 經過通道發送一個字符串
  3: buffered <- "Gopher"
  4: // 從通道接收一個字符串
  5: value := <-buffered

無緩衝的通道是指在接收前沒有能力保存任何值的通道。發送goroutine和接收goroutine同時準備好,才能完成發送和接收操做。若是沒有準備好,通道會致使goroutine阻塞等待。因此無緩衝通道保證了goroutine之間同一時間進行數據交換。

  1: // 四個goroutine間的接力比賽
  2: package main
  3: 
  4: import (
  5: 	"fmt"
  6: 	"sync"
  7: 	"time"
  8: )
  9: 
 10: var wg sync.WaitGroup
 11: 
 12: func main()  {
 13: 	//建立一個無緩衝的通道
 14: 	baton := make(chan int)
 15: 	wg.Add(1)
 16: 	// 第一步跑步者持有接力棒
 17: 	go Runner(baton)
 18: 	// 開始比賽
 19: 	baton <- 1
 20: 	// 等待比賽結束
 21: 	wg.Wait()
 22: }
 23: 
 24: // Ruuner模擬接力比賽中的一位跑步者
 25: func Runner(baton chan int) {
 26: 	var newRunner int
 27: 	// 等待接力棒
 28: 	runner := <-baton
 29: 	// 開始跑步
 30: 	fmt.Printf("運動員%d帶着Baton跑\n", runner)
 31: 	// 建立下一步跑步者
 32: 	if runner != 4{
 33: 		newRunner = runner + 1
 34: 		fmt.Printf("運動員%d上線\n", newRunner)
 35: 		go Runner(baton)
 36: 	}
 37: 	// 圍繞跑到跑
 38: 	time.Sleep(100 * time.Millisecond)
 39: 	// 比賽結束了嗎?
 40: 	if runner == 4{
 41: 		fmt.Printf("運動員%d完成,比賽結束\n", runner)
 42: 		wg.Done()
 43: 		return
 44: 	}
 45: 	// 將接力棒交給下一位跑步者
 46: 	fmt.Printf("運動員%d與運動員%d交換\n", runner, newRunner)
 47: 	baton <- newRunner
 48: }

結果

  1: 運動員1帶着Baton跑
  2: 運動員2上線
  3: 運動員1與運動員2交換
  4: 運動員2帶着Baton跑
  5: 運動員3上線
  6: 運動員2與運動員3交換
  7: 運動員3帶着Baton跑
  8: 運動員4上線
  9: 運動員3與運動員4交換
 10: 運動員4帶着Baton跑
 11: 運動員4完成,比賽結束

有緩衝的通道則能在接收前能存儲一個或者多個值的通道。這種類型的通道並不強制要求goroutine之間必須同時完成發送和接收。只有在通道沒有可用緩衝區或者沒有要接收的值時,發送或者接收纔會阻塞。

  1: package main
  2: 
  3: import (
  4: 	"fmt"
  5: 	"math/rand"
  6: 	"sync"
  7: 	"time"
  8: )
  9: 
 10: const (
 11: 	// goroutine的數量
 12: 	numberGoroutines = 4
 13: 	// 工做的數量
 14: 	taskLoad = 10
 15: )
 16: 
 17: var wg sync.WaitGroup
 18: 
 19: // 初始化隨機數種子
 20: func init() {
 21: 	rand.Seed(time.Now().Unix())
 22: }
 23: func main() {
 24: 	// 建立一個有緩衝的通道來管理工做
 25: 	tasks := make(chan string, taskLoad)
 26: 	wg.Add(numberGoroutines)
 27: 	// 增長一組要完成的工做
 28: 	for post:=1;post<taskLoad;post++ {
 29: 		tasks <- fmt.Sprintf("Task:%d", post)
 30: 	}
 31: 	// 啓動goroutine來處理工做
 32: 	for i:=1;i<numberGoroutines+1;i++ {
 33: 		go worker(tasks, i)
 34: 	}
 35: 	// 全部工做處理完時關閉通道
 36: 	close(tasks)
 37: 
 38: 	wg.Wait()
 39: 	fmt.Printf("all finished!")
 40: 
 41: }
 42: 
 43: func worker(tasks chan string, worker_id int) {
 44: 	defer wg.Done()
 45: 
 46: 	for {
 47: 		//等待分配工做
 48: 		task, ok := <-tasks
 49: 		if !ok {
 50: 			//通道變空
 51: 			fmt.Printf("Worker%d shut down\n", worker_id)
 52: 			return
 53: 		}
 54: 		// 開始工做
 55: 		fmt.Printf("Worker%d start %s\n", worker_id, task)
 56: 
 57: 		// 隨機等待一段時間
 58: 		sleep := rand.Int63n(100)
 59: 		time.Sleep(time.Duration(sleep)*time.Millisecond)
 60: 		// 顯示完成了工做
 61: 		fmt.Printf("Worker%d Completed %s\n", worker_id, task)
 62: 	}
 63: }
輸出結果:
  1: Worker4 start Task:1
  2: Worker1 start Task:2
  3: Worker2 start Task:3
  4: Worker3 start Task:4
  5: Worker3 Completed Task:4
  6: Worker3 start Task:5
  7: Worker4 Completed Task:1
  8: Worker4 start Task:6
  9: Worker2 Completed Task:3
 10: Worker2 start Task:7
 11: Worker3 Completed Task:5
 12: Worker3 start Task:8
 13: Worker2 Completed Task:7
 14: Worker2 start Task:9
 15: Worker3 Completed Task:8
 16: Worker3 shut down
 17: Worker4 Completed Task:6
 18: Worker4 shut down
 19: Worker1 Completed Task:2
 20: Worker1 shut down
 21: Worker2 Completed Task:9
 22: Worker2 shut down
 23: all finished!

因爲程序和Go語言的調度器有隨機的成分,結果每次都會不同。不過總流程不會大變。

當通道關閉後,goroutine依舊從通道里的緩衝區獲取數據,可是不能再向通道里發送數據。從一個已經關閉且沒有數據的通道里獲取數據,總會馬上返回,兵返回一個通道類型的零值。

關於實際工程裏的併發模式,下一篇再講。

相關文章
相關標籤/搜索