多線程程序在單核上運行就是併發。golang
多線程程序在多核上運行就是並行。編程
Go主線程(有人直接稱爲線程/也能夠理解成進程):一個Go線程上,能夠起多個協程,協程是輕量級的線程[編譯器作優化]。緩存
Go協程的特色:有獨立的棧空間;共享程序堆空間;調度由用戶控制;協程是輕量級的線程。安全
請編寫一個程序,完成以下功能:
在主線程(能夠理解成進程)中,開啓一個goroutine, 該協程每隔1秒輸出 "hello,world"
在主線程中也每隔一秒輸出"hello,golang", 輸出10次後,退出程序
要求主線程和goroutine同時執行.
畫出主線程和協程執行流程圖服務器
package main import ( "fmt" "strconv" "time" ) func test() { for i := 1; i <= 10; i++ { fmt.Println("test() hello,world " + strconv.Itoa(i)) time.Sleep(time.Second) } } func main() { go test() //開協啓一個協程 for i := 1; i <= 10; i++ { fmt.Println(" main() hello,golang " + strconv.Itoa(i)) time.Sleep(time.Second) } }
主線程是一個物理線程,直接做用在cpu上的。是重量級的,很是耗費cpu資源。
協程從主線程開啓的,是輕量級的線程,是邏輯態。對資源消耗相對小。
Golang的協程機制是重要的特色,能夠輕鬆的開啓上萬個協程。其它編程語言的併發機制是通常基於線程的,開啓過多的線程,資源耗費大,這裏就突顯Golang在併發上的優點了。數據結構
M指的是Machine,一個M直接關聯了一個內核線程。由操做系統管理。 P指的是」processor」,表明了M所需的上下文環境,也是處理用戶級代碼邏輯的處理器。它負責銜接M和G的調度上下文,將等待執行的G與M對接。 G指的是Goroutine,其實本質上也是一種輕量級的線程。包括了調用棧,重要的調度信息,例如channel等。
P的數量由環境變量中的GOMAXPROCS決定,一般來講它是和核心數對應,例如在4Core的服務器上回啓動4個線程。G會有不少個,每一個P會將Goroutine從一個就緒的隊列中作Pop操做,爲了減少鎖的競爭,一般狀況下每一個P會負責一個隊列。
三者關係以下圖所示: 多線程
以上這個圖講的是兩個線程(內核線程)的狀況。一個M會對應一個內核線程,一個M也會鏈接一個上下文P,一個上下文P至關於一個「處理器」,一個上下文鏈接一個或者多個Goroutine。爲了運行goroutine,線程必須保存上下文。
上下文P(Processor)的數量在啓動時設置爲GOMAXPROCS環境變量的值或經過運行時函數GOMAXPROCS()。一般狀況下,在程序執行期間不會更改。上下文數量固定意味着只有固定數量的線程在任什麼時候候運行Go代碼。可使用它來調整Go進程到我的計算機的調用,例如4核PC在4個線程上運行Go代碼。
圖中P正在執行的Goroutine爲藍色的;處於待執行狀態的Goroutine爲灰色的,灰色的Goroutine造成了一個隊列runqueues。
Go語言裏,啓動一個goroutine很容易:go function就行,因此每有一個go語句被執行,runqueue隊列就在其末尾加入一個goroutine,一旦上下文運行goroutine直到調度點,它會從其runqueue中彈出goroutine,設置堆棧和指令指針並開始運行goroutine。併發
可否拋棄P(Processor),讓Goroutine的runqueues掛到M上呢?答案是不行,須要上下文的目的是:當遇到內核線程阻塞的時候能夠直接放開其餘線程。編程語言
一個很簡單的例子就是系統調用sysall,一個線程確定不能同時執行代碼和系統調用被阻塞,這個時候,此線程M須要放棄當前的上下文環境P,以即可以讓其餘的Goroutine被調度執行。函數
如上圖左圖所示,M0中的G0執行了syscall,而後就建立了一個M1(也有可能來自線程緩存),(轉向右圖)而後M0丟棄了P,等待syscall的返回值,M1接受了P,將繼續執行Goroutine隊列中的其餘Goroutine。
當系統調用syscall結束後,M0會「偷」一個上下文,若是不成功,M0就把它的Gouroutine G0放到一個全局的runqueue中,將本身置於線程緩存中並進入休眠狀態。全局runqueue是各個P在運行完本身的本地的Goroutine runqueue後用來拉取新goroutine的地方。P也會週期性的檢查這個全局runqueue上的goroutine,不然,全局runqueue上的goroutines可能得不到執行而餓死。
均衡的分配工做:按照以上的說法,上下文P會按期的檢查全局的goroutine隊列中的goroutine,以便本身在消費掉自身Goroutine隊列的時候有事可作。假如全局goroutine隊列中的goroutine也沒了呢?就從其餘運行的中的P的runqueue裏偷。
每一個P中的Goroutine不一樣致使他們運行的效率和時間也不一樣,在一個有不少P和M的環境中,不能讓一個P跑完自身的Goroutine就沒事可作了,由於或許其餘的P有很長的goroutine隊列要跑,得須要均衡。 該如何解決呢?Go的作法倒也直接,從其餘P中偷一半!
爲了充分利用多cpu的優點,在golang程序中能夠設置運行cpu數目。 go1.8後,默認讓程序運行在多個核上,能夠不用設置。go1.8以前,須要設置一下,能夠更高效的利用cpu。
package main import ( "fmt" "runtime" ) func main() { //獲取當前系統cpu的數量 num := runtime.NumCPU() //設置運行go程序的cpu數量 runtime.GOMAXPROCS(num) fmt.Println("cpu number = ", num) }
計算1-200的各個數的階乘,而且把各個數的階乘放入到map中。最後顯示出來。要求使用goroutine完成。
package main import ( "fmt" "time" ) var ( myMap = make(map[int]int, 10) ) func fac(n int) { res := 1 for i := 1; i <= n; i++ { res *= i } //將階乘的計算結果放到map中 myMap[n] = res } func main() { for i := 1; i <= 200; i++ { go fac(i) } time.Sleep(time.Second * 10) for i, v := range myMap { fmt.Printf("map[%d]=%d\n", i, v) } }
上述代碼由於沒有對全局變量myMap加鎖,所以會出現資源爭奪問題,代碼會出現錯誤,提示 concurrent map writes
不一樣goroutine之間如何通信:(1)、全局變量加入互斥鎖;(2)、使用管道channel來解決。
爲了解決上述代碼中存在的資源競爭問題,全局變量myMap加入互斥鎖。
package main import ( "fmt" "sync" "time" ) var ( myMap = make(map[int]int, 10) //聲明一個全局的互斥鎖, lock sync.Mutex ) func fac(n int) { res := 1 for i := 1; i <= n; i++ { res *= i } //將階乘的計算結果放到map中 //加鎖 lock.Lock() myMap[n] = res //釋放鎖 lock.Unlock() } func main() { for i := 1; i <= 20; i++ { go fac(i) } time.Sleep(time.Second * 10) lock.Lock() for i, v := range myMap { fmt.Printf("map[%d]=%d\n", i, v) } lock.Unlock() }
channle本質就是一個數據結構-隊列。
數據是先進先出【FIFO : first in first out】。
線程安全,多 goroutine 訪問時,不須要加鎖,就是說channel自己就是線程安全的。
channel有類型的,一個string的channel只能存放string類型數據。
var 變量名 chan 數據類型
var intChan chan int (intChan 用於存放 int 數據)
var mapChan chan map[int]string (mapChan 用於存放 map[int]string 類型)
var perChan chan Person
var perChanPtr chan *Person
channel是引用類型
channel必須初始化才能寫入數據, 即make後才能使用
管道是有類型的,intChan只能寫入整數int
package main import "fmt" func main() { //建立一個能夠存放3個int類型的管道 var intChan chan int intChan = make(chan int, 3) fmt.Printf("intChan的值=%v intChan自己的地址=%p\n", intChan, &intChan) //向管道寫入數據 intChan <- 10 num := 211 intChan <- num intChan <- 50 //向管道寫入數據時不能超過其容量 //intChan <- 80 //查看管道的長度和容量 fmt.Printf("channel len=%v cap=%v\n", len(intChan), cap(intChan)) //從管道中讀取數據 var n int n = <-intChan fmt.Println("n=", n) fmt.Printf("channel len=%v cap=%v\n", len(intChan), cap(intChan)) //在沒有使用協程的狀況下,若是管道的數據已經所有取出,再取就會報告deadlock num1 := <-intChan num2 := <-intChan //num3 := <-intChan fmt.Println("num1=", num1, "num2=", num2) }
channel中只能存放指定的數據類型
channle的數據放滿後,就不能再放入了
若是從channel取出數據後,能夠繼續放入
在沒有使用協程的狀況下,若是channel數據取完了,再取就會報dead lock
(1)、建立一個intChan,最多能夠存放3個int,存3個數據到intChan中,而後再取出這三個int。
package main import "fmt" func main() { var intChan chan int intChan = make(chan int, 10) //intChan容量是3,再存放會報告deadlock intChan <- 10 intChan <- 20 intChan <- 30 num1 := <-intChan num2 := <-intChan num3 := <-intChan //intChany已經沒有數據了,再取數據會報告deadlock fmt.Printf("num1=%v num2=%v num3=%v", num1, num2, num3) }
(2)、建立一個mapChan,最多能夠存放10個map[string]string的key-value,對這個chan進行寫入和讀取。
package main import "fmt" func main() { var mapChan chan map[string]string mapChan = make(chan map[string]string, 10) m1 := make(map[string]string, 20) m1["city1"] = "北京" m1["city2"] = "天津" m2 := make(map[string]string, 20) m2["hero1"] = "宋江" m2["hero2"] = "武松" mapChan <- m1 mapChan <- m2 mo1 := <-mapChan mo2 := <-mapChan fmt.Printf("mo1=%v\nmo2=%v", mo1, mo2) }
(3)、建立一個catChan,最多能夠存放10個Cat結構體變量,對這個chan進行寫入和讀取。
package main import "fmt" type Cat struct { Name string Age int } func main() { var catChan chan Cat catChan = make(chan Cat, 10) cat1 := Cat{Name: "tom", Age: 10,} cat2 := Cat{Name: "nancy", Age: 78,} catChan <- cat1 catChan <- cat2 c1 := <-catChan c2 := <-catChan fmt.Printf("c1=%v\nc2=%v", c1, c2) }
(4)、建立一個catChanPtr,最多能夠存放10個*Cat變量,對這個chan進行寫入和讀取。
package main import "fmt" type Cat struct { Name string Age int } func main() { var catChan chan *Cat catChan = make(chan *Cat, 10) cat1 := Cat{Name: "tom", Age: 10,} cat2 := Cat{Name: "nancy", Age: 78,} catChan <- &cat1 catChan <- &cat2 c1 := <-catChan c2 := <-catChan fmt.Printf("c1=%p\nc2=%p", c1, c2) }
(5)、建立一個allChan,最多能夠存放10個任意數據類型變量,對這個chan寫入和讀取。
package main import "fmt" type Cat struct { Name string Age int } func main() { var allChan chan interface{} allChan = make(chan interface{}, 10) cat1 := Cat{Name: "tom", Age: 10,} cat2 := Cat{Name: "nancy", Age: 78,} allChan <- &cat1 allChan <- &cat2 allChan <- 10 allChan <- "jack" c1 := <-allChan c2 := <-allChan v1 := <-allChan v2 := <-allChan fmt.Println(v1, v2, c1, c2) }
使用內置函數close能夠關閉channel, 當channel關閉後,就不能再向channel寫數據了,可是仍然能夠從該channel讀取數據。
package main import "fmt" func main() { intChan := make(chan int, 3) intChan <- 100 intChan <- 200 //關閉後不能再寫數據 close(intChan) //管道關閉以後,讀取數據時能夠的 n1 := <-intChan fmt.Println("n1=", n1) }
channel的遍歷
channel支持for--range的方式進行遍歷,請注意兩個細節
(1)、在遍歷時,若是channel沒有關閉,則回出現deadlock的錯誤
(2)、在遍歷時,若是channel已經關閉,則會正常遍歷數據,遍歷完後,就會退出遍歷。
package main import "fmt" func main() { intChan := make(chan int, 100) for i := 0; i < 100; i++ { intChan <- i * 2 } close(intChan) for v := range intChan { fmt.Println("v=", v) } }
使用goroutine和channel協調完成以下需求:
開啓一個writeData協程,向管道intChan中寫入50個整數;開啓一個readData協程,從管道intChan中讀取writeData寫入的數據。writeData和readData操做的是同一個管道,主線程須要等待writeData和readData協程都完成才能退出。
package main import "fmt" func writeData(intChan chan int) { for i := 1; i <= 50; i++ { intChan <- i fmt.Println("writeData ", i) } close(intChan) } func readData(intChan chan int, exitChan chan bool) { for { v, ok := <-intChan if !ok { break } fmt.Printf("readData 讀到數據=%v\n", v) } exitChan <- true close(exitChan) } func main() { intChan := make(chan int, 50) exitChan := make(chan bool, 1) go writeData(intChan) go readData(intChan, exitChan) for { _, ok := <-exitChan if !ok { break } } }
統計1-8000的數字中,哪些是素數?
package main import ( "fmt" "time" ) func putNum(intChan chan int) { for i := 1; i <= 8000; i++ { intChan <- i } close(intChan) } func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) { var flag bool for { time.Sleep(time.Millisecond * 10) num, ok := <-intChan if !ok { break } flag = true for i := 2; i < num; i++ { if num%i == 0 { flag = false break } } if flag { primeChan <- num } } fmt.Println("有一個primeNum協程由於取不到數據退出") exitChan <- true } func main() { intChan := make(chan int, 1000) primeChan := make(chan int, 2000) exitChan := make(chan bool, 4) go putNum(intChan) for i := 0; i < 4; i++ { go primeNum(intChan, primeChan, exitChan) } go func() { for i := 0; i < 4; i++ { <-exitChan } close(primeChan) }() for { res, ok := <-primeChan if !ok { break } fmt.Printf("素數=%d\n", res) } fmt.Println("main線程退出") }
(1)、默認狀況下,管道是雙向的,可讀寫的。channel能夠聲明爲只讀,或者只寫性質。
package main import "fmt" func main() { //聲明爲只寫 var intChan chan<- int intChan = make(chan int, 3) intChan <- 20 fmt.Println("intChan=", intChan) //聲明爲只讀 var stringChan <-chan string str := <-stringChan fmt.Println("str=", str) }
(2)、channel只讀和只寫的最佳實踐
package main import "fmt" func send(ch chan<- int, exitChan chan struct{}) { for i := 0; i < 10; i++ { ch <- i } close(ch) var a struct{} exitChan <- a } func recv(ch <-chan int, exitChan chan struct{}) { for { v, ok := <-ch if !ok { break } fmt.Println(v) } var a struct{} exitChan <- a } func main() { var ch chan int ch = make(chan int, 10) exitChan := make(chan struct{}, 2) go send(ch, exitChan) go recv(ch, exitChan) var total = 0 for _ = range exitChan { total++ if total == 2 { break } } fmt.Println("結束") }
(3)、使用select解決從管道中取數據的阻塞問題
package main import ( "fmt" "time" ) func main() { intChan := make(chan int, 10) for i := 0; i < 10; i++ { intChan <- i } stringChan := make(chan string, 5) for i := 0; i < 5; i++ { stringChan <- "hello" + fmt.Sprintf("%d", i) } //傳統的方法在遍歷管道時,若是不關閉管道會阻塞而致使deadlock //在實際開發中,很差肯定何時關閉管道。可使用select方式解決 for { select { //若是intChan一直沒有關閉,不會一直阻塞而deadlock,會自動到下一個case匹配 case v := <-intChan: fmt.Printf("從intChan讀取的數據%d\n", v) time.Sleep(time.Second) case v := <-stringChan: fmt.Printf("從stringChan讀取的數據%s\n", v) time.Sleep(time.Second) default: fmt.Printf("都取不到數據") time.Sleep(time.Second) return } } }
(4)、goroutine中使用recover,解決協程中出現panic致使程序崩潰問題
若是開啓一個協程,可是這個協程出現了panic,若是沒有捕獲這個panic,就會形成整個程序崩潰,這時能夠在goroutine中使用recover來捕獲panic進行處理。這樣即便這個協程發生問題,主線程仍然不受影響,能夠繼續執行。
package main import ( "fmt" "time" ) func sayHello() { for i := 0; i < 10; i++ { time.Sleep(time.Second) fmt.Println("hello,world") } } func test() { defer func() { if err := recover(); err != nil { fmt.Println("test() 發生錯誤", err) } }() var myMap map[int]string //error,沒有爲map申請內存 myMap[0] = "golang" } func main() { go sayHello() go test() for i := 0; i < 10; i++ { fmt.Println("main() ok=", i) time.Sleep(time.Second) } }