原理上的內容比較多,好比goroutine啓動的時候要執行哪些相關的操做,一點一點的補充一下。web
channel是go語言中的特殊的機制,既能夠同步兩個被併發執行的函數,又可讓這兩個函數經過相互傳遞特定類型的值來進行通訊。事實上這也是channel的兩個主要功能。算法
按照通道在初始化時是否有緩衝值,又能夠分爲緩衝通道與非緩衝通道。通道初始化的時候也仍是須要使用make進行,好比make(chan int,10)聲明一個緩衝空間爲10個int的通道,直接make(chan int)就是聲明一個非緩衝通道編程
直接採用內建函數close(strChan)就能夠關閉通道。應該保證在安全的狀況下進行關閉通道的操做。基本的原則:內建函數 len(strChan)能夠查看通道中當前有的元素的數量 cap(strChan)能夠查看通道的總的容量,總容量一旦初始化以後就不會再發生改變了。json
關於select語句的使用,在go語言中,執行select語句的時候,會自動地自上而下地判斷每一個case中的發送或者接受的操做能否被當即執行,便是說當前的Goroutine不會所以操做而被阻塞。select語句在執行的時候,會先對各個case中的表達式進行判斷求值,並且直到全部的求值操做都完成以後纔會考慮選其中的某個case去執行。這要依據當時通道的特性來判斷,當發現第一個知足選擇條件的case的時候,這個case中的語句就會被執行,其餘的語句就會被忽略,當有多個case都知足狀況的話,系統會根據一個僞隨機算法決定哪一個case會被執行。default是一個特殊的case,若是沒有合適的case的話,default中的語句就會被執行,若是select語句中沒有加上default語句,那麼若是此時沒有case符合條件的話,當前goroutine就會一直阻塞在當前的這一條select語句上。所以default:對於select而言是必要的。安全
一般select還會和for語句結合在一塊兒來使用,由於單獨的select操做只會被選擇一次,要想持續不斷地使用select從通道中讀出信息,仍是要和for結合在一塊兒使用。因而跳出多層循環的時候,特別是添加了超時控制的案例,能夠參考使用場景(2)中介紹的兩種方法.websocket
package main import ( "fmt" "time" ) func main() { ch := make(chan int, 1) sign := make(chan byte, 2) go func() { for i := 0; i < 5; i++ { ch <- i time.Sleep(1 * time.Second) } close(ch) fmt.Println("The channel is closed.") sign <- 0 }() go func() { //這個循環會一直嘗試從ch中讀取信息出來 即便ch已經被髮送端關閉 //但仍是能夠讀信息出來 最後當ok 爲false的時候 說明已經沒有數據從ch中讀出 //跳出循環 注意這種判斷方式 for { fmt.Printf("before extract channel len: %v ,", len(ch)) e, ok := <-ch fmt.Printf("channel value: %d if extract ok :(%v) after extraction channel len : %v channel cap : %v \n", e, ok, len(ch), cap(ch)) if !ok { break } time.Sleep(2 * time.Second) } fmt.Println("Done.") sign <- 1 }() //要是不添加兩次取值的操做的話 主進程就會立刻結束 這裏至關因而實現了一個 //同步的操做 等待兩個go func都結束以後 再結束主進程 注意這種技巧 <-sign <-sign } /*output: before extract channel len: 1 ,channel value: 0 if extract ok :(true) after extraction channel len : 0 channel cap : 1 before extract channel len: 1 ,channel value: 1 if extract ok :(true) after extraction channel len : 0 channel cap : 1 before extract channel len: 1 ,channel value: 2 if extract ok :(true) after extraction channel len : 0 channel cap : 1 before extract channel len: 1 ,channel value: 3 if extract ok :(true) after extraction channel len : 0 channel cap : 1 The channel is closed. before extract channel len: 1 ,channel value: 4 if extract ok :(true) after extraction channel len : 0 channel cap : 1 before extract channel len: 0 ,channel value: 0 if extract ok :(false) after extraction channel len : 0 channel cap : 1 Done. */
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person { dests := make(chan Person, 100) go func() { for p :=range origs{ handler.Handle(&p) log.Printf("old value : %v\n", p) //time.Sleep(time.Second) dests <- p } fmt.Println("alll the info has been handled") close(dests) }() return dests }
注意 app.go文件夾中的 346 行左右開始地方的一個坑 注意time.After的返回值 因爲放在了for循環中 所以 每次會新new 一個 channel出來 還有注意跳出多層循環的方式
主要參考的是《Go併發編程實戰的相關內容》併發
代碼以下:app
package main import ( "fmt" "runtime" ) func main() { names := []string{"E", "H", "R", "J", "M"} for _, name := range names { go func() { fmt.Printf("Hello , %s \n", name) }() } //要是不添加runtime的話 就不會有內容輸出 //由於for循環執行速度太快了 直接循環結束跳出了最後的循環 //以後 for循環中生成的5個go func 會被分別進行調度 runtime.Gosched() } /* output Hello , M Hello , M Hello , M Hello , M Hello , M */
根據代碼能夠看出,具體循環的時候for循環中的go func 的調度並非按照想象的那樣,一次循環一個go func ,不要對go func的執行時機作任何假設。異步
一種思路是把runtime.Gosched()函數放在每次for循環結束的時候,這樣每次for循環以後,都會被從新調度一次,可能會出現正確的結果,並非每次都準確,好比go func的程序須要運行一段時間,在這段運行的時間以內,可能就已經循環了幾個元素過去了socket
package main import ( "fmt" "runtime" "time" ) func main() { names := []string{"E", "H", "R", "J", "M", "N", "O", "P"} for _, name := range names { go func() { time.Sleep(1000 * time.Nanosecond) fmt.Printf("Hello , %s \n", name) }() runtime.Gosched() } } /* output: Hello , E Hello , J Hello , J Hello , P Hello , P Hello , P */
還有一種思路是採用傳遞參數的方式,就是給goroutine帶上了參數,雖然goroutine已經脫離了main函數的控制,可是它已經帶上了main函數給的烙印,至關因而某種解耦的感受,for循環屢次就不會影響打印的結果了,好比下面代碼:
package main import ( "fmt" "runtime" "time" ) func main() { names := []string{"E", "H", "R", "J", "M", "N", "O", "P"} for _, name := range names { go func(who string) { time.Sleep(1000 * time.Nanosecond) fmt.Printf("Hello , %s \n", who) }(name) } runtime.Gosched() } /* output: Hello , E Hello , H Hello , R Hello , J Hello , M */
可是這個方法仍然頗有問題,只能保證在函數執行時間很短的時候結果正常,並且不輸出重複的內容,若是程序執行時間比較長的話,頗有可能main函數會被提早結束,按順序生成的多個goroutine在cpu那裏會不會仍然按照順序被調度執行?這個仍然不肯定?有幾個goroutine會不能被正常調度到而且執行,好比像上面的代碼的輸出樣子,並且每次輸出的結果也都是不肯定的。
編碼的時候遇到這樣一個場景,服務建立成功以後,須要等待ip被分配,ip被分配好以後,服務才正式部署成功,最後將全部的信息返回給前臺,因而打算這樣實現,在服務建立成功以後就不斷循環,查詢ip若是分配成功了就返回,若是超過了時間也返回失敗,最後這部分的代碼像下面這樣,
第一個例子中退出的方式採用的是標記的思路形式,每次循環結束的時候會檢查一下標記看看是否退出,第二個採用的是特殊的語法,直接跳出最外層的循環,注意這種時間控制的實現,仍是弄成一個defalt一個case比較好,因爲case的調度可能有隨機性,所以正常執行的內容放在default的部分,時間控制的那個channel放在某一個case當中。
package main import ( "fmt" "time" ) func main() { sign := make(chan int) chtemp := make(chan int, 5) go func() { for i := 0; i < 5; i++ { time.Sleep(time.Millisecond * 300) chtemp <- i } close(chtemp) }() var e int ok := true //new 一個新的channel返回 注意這裏要提早聲明好 t := time.After(time.Second) go func() { for { select { case <-t: fmt.Println("time out") ok = false break //注意這裏是使用 = 而不是 := default: e, ok = <-chtemp fmt.Printf("value : %v \n", e) if !ok { break } } if !ok { sign <- 1 break } } }() <-sign }
//一個時間控制的channel //注意這個要在循環以外單獨聲明 不然每次都會分配一個新的 time.After的channel返回過來 t := time.After(time.Second * 10) //注意這種跳出多層循環的操做方式 要是單層使用break的話 僅僅跳出的是 select 那一層的循環 A: for { select { //若是時間到了 就返回錯誤信息 case <-t: log.Println("time out to allocate ip") //delete the se which deploy failed a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json") http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+"deploy error : time out"+`"}`, 406) break A //若是時間沒到 就是 t 尚未發回信息 select語句就默認跳轉到default塊中 //執行查找ip是否分配的操做 default: //log.Println("logout:", <-timeout) sename := service.ObjectMeta.Labels["name"] podslist, err := a.Podip(sename) if err != nil { log.Println(err.Error()) a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json") http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+err.Error()+`"}`, 406) break A } if len(podslist) == 0 { continue } else { log.Println("allocation ok ......") a.Data["json"] = detail a.ServeJson() break A } } }
經常有這樣一種場景,把某些信息從舊的資源池中取出來,通過一些加工處理,再放入新的資源池中,這個過程若是按傳統的方式就是採用徹底串行的方式效率會很低,粒度太粗了,具體的粒度能夠細化以每次所取的單位資源爲粒度。
好比以書上p339爲例,有一個資源池存儲這person的信息,將每一個person從中取出來,以後進行一些處理,再存到新的資源池中,這裏用oldarray以及newarray來模擬舊的和新的資源池:
具體的代碼以下:
package main //參考go 併發編程實戰 p337 import ( "log" "strconv" "time" ) type Person struct { name string age int addr string } var oldpersonarray = [5]Person{} var newpersonarray = [5]Person{} type PersonHandler interface { Batch(origs <-chan Person) <-chan Person Handle(orig *Person) } //struct 實現了personhandler 接口 type PersonHandlerImpl struct{} //從origs接收信息 處理以後再返回給新的channel func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person { dests := make(chan Person, 100) go func() { for { p, ok := <-origs if !ok { close(dests) break } handler.Handle(&p) log.Printf("old value : %v\n", p) //time.Sleep(time.Second) dests <- p } }() return dests } //這裏要使用引用傳遞 func (handler PersonHandlerImpl) Handle(orig *Person) { orig.addr = "new address" } func getPersonHandler() PersonHandler { return &PersonHandlerImpl{} } //print the oldpersonarray into the chan<-Person func fetchPerson(origs chan<- Person) { for _, v := range oldpersonarray { //fmt.Printf("get the value : %v %v \n", k, v) time.Sleep(time.Second) origs <- v } close(origs) } //fetch the value from the channel and store it into the newpersonarray func savePerson(dest <-chan Person) <-chan int { intChann := make(chan int) go func() { index := 0 for { p, ok := <-dest if !ok { break } //time.Sleep(time.Second) log.Printf("new value transfer %v \n", p) newpersonarray[index] = p index++ } intChann <- 1 }() return intChann } func init() { //使用range的話是值傳遞 這裏要給oldpersonarray賦值進來 tmplen := len(oldpersonarray) for i := 0; i < tmplen; i++ { oldpersonarray[i].addr = "old address" oldpersonarray[i].age = i oldpersonarray[i].name = strconv.Itoa(i) } log.Printf("first print init value : %v\n", oldpersonarray) } func main() { handeler := getPersonHandler() origs := make(chan Person, 100) dests := handeler.Batch(origs) //go func() { fetchPerson(origs) }() // 不加go func的話 要等這句執行完 才能執行下一句 // 則orgis信息都輸出 徹底關閉掉 這個時候 從dest接收信息的語句纔開始執行 // 因此不會動態輸出 這句加上go func的話 就會沒隔 1s 動態輸出 // 若是將fetchPerson 再往前面放一句 則old value也不會動態輸出 fetchPerson(origs) sign := savePerson(dests) <-sign log.Printf("last print new value : %v \n", newpersonarray) }
總體的結構圖以下:
代碼基本分析: