goroutine和通道程序員
goroutine入門github
管道golang
用於描述兩個獨立的併發實體經過共享的通信 channel(管道)進行通訊的併發模型安全
Golang 就是借用CSP模型的一些概念爲之實現併發進行理論支持數據結構
process是在go語言上的表現就是 goroutine 是實際併發執行的實體,每一個實體之間是經過channel通信來實現數據共享。併發
MPG模型編程語言
M: 操做系統的主線程
P: 協程執行所須要的上下文
G:協程函數
一、Go主線程(線程或者叫進程):一個Go主線程,能夠起多個協程,協程就是輕量級的線程
二、Go協程的特色
有獨立的棧空間共享程序堆空間
調度由用戶控制
協程是輕量級的線程【編譯器作優化】
問題:
什麼是棧空間&堆空間?
棧空間?
編譯器自動分配釋放,存放函數的參數值,局部變量的值等,其操做方式相似於數據結構的棧。堆空間?
通常是由程序員分配釋放,若程序員不釋放的話,程序結束時可能由OS回收,值得注意的是他與數據結構的堆是兩回事,分配方式卻是相似於數據結構的鏈表
怎麼理解這段話?
注意咱們此處談到的堆和棧是對操做系統中的,這個和數據結構中的堆和棧仍是又必定區別的。
棧: 能夠簡單得理解成一次函數調用內部申請到的內存,它們會隨着函數的返回把內存還給系統。
func F() { temp := make([]int, 0, 20) ... }
相似於上面代碼裏面的temp變量,只是內函數內部申請的臨時變量,並不會做爲返回值返回,它就是被編譯器申請到棧裏面。
申請到 棧內存 好處:函數返回直接釋放,不會引發垃圾回收,對性能沒有影響。
再來看看堆得狀況之一以下代碼:
func F() []int{ a := make([]int, 0, 20) return a }
而上面這段代碼,申請的代碼如出一轍,可是申請後做爲返回值返回了,編譯器會認爲變量以後還會被使用,當函數返回以後並不會將其內存歸還,那麼它就會被申請到 堆 上面了。
申請到堆上面的內存纔會引發垃圾回收,若是這個過程(特指垃圾回收不斷被觸發)過於高頻就會致使 gc 壓力過大,程序性能出問題。
參考文獻:
後面我會單獨出一章介紹Golang 堆空間&棧空間理解
例子:
package main import ( "fmt" "strconv" "time" ) func test() { for i := 1; i <= 10; i++ { fmt.Println("test () hello world" + strconv.Itoa(i)) time.Sleep(time.Second) } } func main() { // test() go test() //開啓一個協程 for i := 1; i <= 10; i++ { fmt.Println("main () hello golang" + strconv.Itoa(i)) time.Sleep(time.Second) } }
運行結果:
main () hello golang1 test () hello world1 test () hello world2 main () hello golang2 test () hello world3 main () hello golang3 main () hello golang4 test () hello world4 test () hello world5 main () hello golang5 main () hello golang6 test () hello world6 test () hello world7 main () hello golang7 main () hello golang8 test () hello world8 main () hello golang9 test () hello world9 main () hello golang10 test () hello world10
運行結果: 說明main這個主線程和test協程同時運行
能夠畫個邏輯圖來講明這個狀況:
邏輯圖講解:
一、主線程是一個物理線程、直接做用在CPU上、是重量級的,很是耗費CPU資源
二、協程是主線程開啓的,是輕量級的線程,是邏輯態,對資源消耗相對小
三、Golang的協程機制是重要的特色,能夠輕鬆開啓上萬個協程
其餘編程語言的開發機制通常基於線程,開啓過多的線程,資源耗費大
這裏就凸顯了golang在併發上的優點了
注意:
一、Go1.8以前 要進行設置下 能夠更高效的利用CPU二、GO1.8以後 默認讓程序運行在多個核上 能夠不用設置
這裏使用的是go version go1.13.1
package main import ( "fmt" "runtime" ) func main() { cpuNum := runtime.NumCPU() fmt.Println("cpunum:", cpuNum) //能夠本身設置使用多個CPU runtime.GOMAXPROCS(cpuNum - 1) fmt.Println("ok") }
看一個例子來解釋爲何要用到管道這個技術?
package main import ( "fmt" "time" ) var ( myMap = make(map[int]int, 10) ) func test(n int) { res := 1 for i := 1; i <= n; i++ { res *= i } myMap[n] = res } func main() { for i := 1; i <= 200; i++ { go test(i) } time.Sleep(time.Second * 10) //遍歷結果 for i, v := range myMap { fmt.Printf("map[%d]=%d\n", i, v) } }
運行結果:
map[76]=0 map[81]=0 map[104]=0 map[117]=0 map[118]=0 map[124]=0 map[139]=0 map[153]=0 map[162]=0 map[2]=2 map[16]=20922789888000 ....
發現的問題:
多個協程 同時寫 會出現資源競爭
解決思路:
全局變量加鎖同步
沒有對全局變量加鎖,會出現資源競爭問題,代碼會報錯: concurrent map writes
加入互斥鎖
package main import ( "fmt" "sync" "time" ) var ( myMap = make(map[int]int, 10) //聲明全局互斥鎖 //lock 是一個全局互斥鎖 //sync 表示同步 //Mutex 表示互斥 lock sync.Mutex ) func test(n int) { res := 1 for i := 1; i <= n; i++ { res *= i } //加鎖 lock.Lock() myMap[n] = res //解鎖 lock.Unlock() } func main() { for i := 1; i <= 200; i++ { go test(i) } //休眠幾秒合適? time.Sleep(time.Second * 10) //遍歷結果 lock.Lock() for i, v := range myMap { fmt.Printf("map[%d]=%d\n", i, v) } lock.Unlock() }
遍歷結果也要加入鎖機制, 緣由:
程序從設計上能夠指定10秒執行了全部協程,可是主線程並不知道,所以底層可能仍然出現資源爭奪
前面使用全局變量加鎖解決 但不完美:
主要有三個地方:
1)主線程在等待全部gorouting所有完成的時間很難肯定,這裏設置了10秒,僅僅是估算2)若是主線程休眠時間長了,會加長等待時間
若是等待時間短了,可能還有goroutine處於工做狀態, 這時會隨着主線程的退出而銷燬
3)經過全局變量加鎖,也並不利用協程對全局變量的讀寫操做(不知道在哪裏加鎖、釋放鎖)
1.主要有下面幾個特色:
1.Channel本質就是一個數據結構 -隊列2.數據是先進先出
3.線程安全,多goroutine訪問時,不須要加鎖,就是說channel自己就是線程安全
4.channel是有類型的,一個string的channel只能存放string類型數據
2.基本使用:
定義 /聲明 channel var 變量 chan 數據類型 var intChan chan int 說明: 1)channel是引用類型 2)channel必須初始化才能寫入數據、即make後才能使用
3.例子
package main import ( "fmt" ) func main() { var intChan chan int intChan = make(chan int, 3) fmt.Printf("intChan的值=%v\n", intChan) //intChan的值=0xc00001a100 }
4.管道寫入
例子1:
package main import ( "fmt" ) func main() { var intChan chan int intChan = make(chan int, 3) fmt.Println() //管道寫入 intChan <- 10 num := 211 intChan <- num //管道長度和容量 fmt.Printf("channel len=%v cap=%v", len(intChan), cap(intChan)) }
例子2:
package main import ( "fmt" ) func main() { var intChan chan int intChan = make(chan int, 3) fmt.Println() //管道寫入 intChan <- 10 num := 211 intChan <- num //當寫入數據不能超過容量,超過報錯 intChan <- 50 intChan <- 98 //管道長度和容量 fmt.Printf("channel len=%v cap=%v", len(intChan), cap(intChan)) }
例子3:
package main import ( "fmt" ) func main() { var intChan chan int intChan = make(chan int, 3) fmt.Println() //管道寫入 intChan <- 10 num := 211 intChan <- num //當寫入數據不能超過容量 intChan <- 50 //管道長度和容量 fmt.Printf("channel len=%v cap=%v\n", len(intChan), cap(intChan)) //讀數據 var num2 int num2 = <-intChan fmt.Println("num2=", num2) fmt.Printf("channel len=%v cap=%v\n", len(intChan), cap(intChan)) //在沒有使用協程的狀況下,管道數據已經所有取出,再取就會報錯deadlock num3 := <-intChan num4 := <-intChan num5 := <-intChan fmt.Println("num3=", num3, "num4=", num4, "num5=", num5) }
5.管道細節總結:
1.channel只能存放指定的數據類型2.channel的數據放滿後,就不能再放入了
3.若是從channel取出數據後,能夠繼續放入
4.在沒有使用協程的狀況下,若是channel數據取完了再取, 就會報deadlock
6.channel的關閉
使用內置函數close能夠關閉channel,當channel關閉後 就不能再向channel寫數據
可是能夠從channel讀取數據
package main func main() { intChan := make(chan int, 3) intChan <- 100 intChan <- 200 close(intChan) intChan <- 300 //panic: send on closed channel }
7.channel的遍歷
支持for-range的方式來遍歷:
1.在遍歷時,若是channel沒有關閉,則出現deadlock
2.在遍歷時,若是channel已經關閉,會正常遍歷數據,遍歷完後會退出遍歷
package main import "fmt" func main() { intChan := make(chan int, 100) for i := 0; i < 100; i++ { intChan <- i * 2 } //遍歷,不能使用普通的for循環,取出來的不是值 // for i := 0; i < len(intChan); i++ { // fmt.Println("i=", i) // } //使用for-range循環,取出來的是值 close(intChan) for v := range intChan { fmt.Println("v=", v) } }
看一個例子:
package main import ( "fmt" ) //write data func writeData(intChan chan int) { for i := 1; i <= 50; i++ { intChan <- i fmt.Printf("writeData寫數據=%v\n", i) // time.Sleep(time.Second) } close(intChan) } //read data func readData(intChan chan int, exitChan chan bool) { for { v, ok := <-intChan if !ok { break } // time.Sleep(time.Second) fmt.Printf("readData 讀到數據=%v\n", v) } //任務完成 exitChan <- true close(exitChan) } func main() { //建立兩個管道 intChan := make(chan int, 10) exitChan := make(chan bool, 1) go readData(intChan, exitChan) go writeData(intChan) // time.Sleep(time.Second * 10) for { _, ok := <-exitChan if !ok { break } } }
再看一個例子:
package main import ( "fmt" ) func putNum(intChan chan int) { for i := 1; i <= 80; i++ { intChan <- i } //關閉intChan close(intChan) } func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) { var flag bool for { num, ok := <-intChan //intChan取不到 if !ok { break } flag = true //判斷是否是素數 for i := 2; i < num; i++ { //說明num不是素數 if num%i == 0 { flag = false break } } if flag { //放入primeChan primeChan <- num } } fmt.Println("有一個primeNum 協程由於取不到數據退出") //還不能關閉primeChan //向exitChan寫入true exitChan <- true } func main() { intChan := make(chan int, 1000) primeChan := make(chan int, 2000) //放入結果 exitChan := make(chan bool, 4) //退出管道 //開啓一個協程,向intChan寫入1-8000 go putNum(intChan) //開啓4個協程,從intChan取出數據,並判斷是否爲素數 //若是是,就放入到primeChan for i := 0; i < 4; i++ { go primeNum(intChan, primeChan, exitChan) } //主線程處理 go func() { for i := 0; i < 4; i++ { <-exitChan } //關閉primeChan close(primeChan) }() //遍歷primeChan for { res, ok := <-primeChan if !ok { break } //結果輸出 fmt.Printf("素數=%d\n", res) } fmt.Println("main主線程退出") }
運行結果:
有一個primeNum 協程由於取不到數據退出 有一個primeNum 協程由於取不到數據退出 有一個primeNum 協程由於取不到數據退出 有一個primeNum 協程由於取不到數據退出 素數=1 素數=2 素數=3 素數=5 素數=7 素數=11 素數=13 素數=17 素數=19 素數=23 素數=29 素數=31 素數=37 素數=41 素數=43 素數=47 素數=53 素數=59 素數=61 素數=67 素數=71 素數=73 素數=79 main主線程退出
這裏有個問題,就是結果顯示不對:
代碼裏面增長休眠時間
修改後:
package main import ( "fmt" "time" ) func putNum(intChan chan int) { for i := 1; i <= 80; i++ { intChan <- i } //關閉intChan close(intChan) } func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) { var flag bool for { time.Sleep(time.Millisecond) num, ok := <-intChan //intChan取不到 if !ok { break } flag = true //判斷是否是素數 for i := 2; i < num; i++ { //說明num不是素數 if num%i == 0 { flag = false break } } if flag { //放入primeChan primeChan <- num } } fmt.Println("有一個primeNum 協程由於取不到數據退出") //還不能關閉primeChan //向exitChan寫入true exitChan <- true } func main() { intChan := make(chan int, 1000) primeChan := make(chan int, 2000) //放入結果 exitChan := make(chan bool, 4) //退出管道 //開啓一個協程,向intChan寫入1-8000 go putNum(intChan) //開啓4個協程,從intChan取出數據,並判斷是否爲素數 //若是是,就放入到primeChan for i := 0; i < 4; i++ { go primeNum(intChan, primeChan, exitChan) } //主線程處理 go func() { for i := 0; i < 4; i++ { <-exitChan } //關閉primeChan close(primeChan) }() //遍歷primeChan for { res, ok := <-primeChan if !ok { break } //結果輸出 fmt.Printf("素數=%d\n", res) } fmt.Println("main主線程退出") }
運行結果:
素數=1 素數=2 素數=3 素數=5 素數=7 素數=11 素數=13 素數=17 素數=19 素數=23 素數=29 素數=31 素數=37 素數=41 素數=43 素數=47 素數=53 素數=59 素數=61 素數=67 素數=71 素數=73 素數=79 有一個primeNum 協程由於取不到數據退出 有一個primeNum 協程由於取不到數據退出 有一個primeNum 協程由於取不到數據退出 有一個primeNum 協程由於取不到數據退出 main主線程退出
1.普通方法
package main import ( "fmt" "time" ) func main() { start := time.Now().Unix() for num := 1; num <= 80000; num++ { flag := true //判斷是否是素數 for i := 2; i < num; i++ { //說明num不是素數 if num%i == 0 { flag = false break } } if flag { } } end := time.Now().Unix() fmt.Println("普通方法耗時=", end-start) //普通方法耗時= 3 }
2.使用了協程+管道
package main import ( "fmt" "time" ) func putNum(intChan chan int) { for i := 1; i <= 80000; i++ { intChan <- i } //關閉intChan close(intChan) } func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) { var flag bool for { // time.Sleep(time.Millisecond) num, ok := <-intChan //intChan取不到 if !ok { break } flag = true //判斷是否是素數 for i := 2; i < num; i++ { //說明num不是素數 if num%i == 0 { flag = false break } } if flag { //放入primeChan primeChan <- num } } fmt.Println("有一個primeNum 協程由於取不到數據退出") //還不能關閉primeChan //向exitChan寫入true exitChan <- true } func main() { intChan := make(chan int, 1000) primeChan := make(chan int, 20000) //放入結果 exitChan := make(chan bool, 4) //退出管道 start := time.Now().Unix() //開啓一個協程,向intChan寫入1-8000 go putNum(intChan) //開啓4個協程,從intChan取出數據,並判斷是否爲素數 //若是是,就放入到primeChan for i := 0; i < 4; i++ { go primeNum(intChan, primeChan, exitChan) } //主線程處理 go func() { for i := 0; i < 4; i++ { <-exitChan } end := time.Now().Unix() fmt.Println("使用協程耗時=", end-start) //使用協程耗時= 1 //關閉primeChan close(primeChan) }() //遍歷primeChan for { _, ok := <-primeChan // res, ok := <-primeChan if !ok { break } //結果輸出 // fmt.Printf("素數=%d\n", res) } fmt.Println("main主線程退出") }
3.優化版
在運行某個程序時,如何指定是否存在資源競爭問題?
方法很簡單,在編譯程序時,增長一個參數 -race
package main import ( "fmt" ) func main() { //管道能夠聲明只讀或者只寫 //1.在默認狀況下,管道是雙向 //var chan1 chan int //可讀可寫 //2 聲明爲只寫 var chan2 chan<- int chan2 = make(chan int, 3) chan2 <- 20 // num := <-chan2 //error //3 聲明爲只讀 var chan3 <-chan int chan3 = make(chan int, 3) // chan3 <- 20//error num := <-chan3 fmt.Println("chan2=", chan2) }
package main import ( "fmt" ) func main() { //使用select 能夠解決從管道取數據的阻塞問題 //1.定義一個管道 10個數據int intChan := make(chan int, 10) for i := 0; i < 10; i++ { intChan <- i } //2.定義一個管道 5個數據string StringChan := make(chan string, 5) for i := 0; i < 5; i++ { StringChan <- "hello" + fmt.Sprintf("%d", i) } //傳統方法在遍歷管道時候 若是不關閉會阻塞會致使deadlock //問題在實際開發中可能咱們很差肯定何時關閉管道 //可使用select 方法解決 // label: for { select { //注意:這裏若是intChan一直沒有關閉不會一直阻塞而deadlock //會自動到下一個case匹配 case v := <-intChan: fmt.Printf("從intChan讀取數據%d\n", v) case v := <-StringChan: fmt.Printf("從StringChan讀取數據%s\n", v) default: fmt.Printf("都取不到\n") // break label //跟label配合使用 return } } }
package main import ( "fmt" "time" ) func sayHello() { for i := 0; i < 10; i++ { time.Sleep(time.Second) fmt.Println("hello world") } } func test() { //使用defer+recover defer func() { //捕獲拋出的panic if err := recover(); err != nil { fmt.Println("test()發生錯誤", err) } }() var myMap map[int]string myMap[0] = "golang" } func main() { go sayHello() go test() for i := 0; i < 10; i++ { fmt.Println("main() ok=", i) } }