約束能夠減輕開發者的認知負擔以便寫出有更小臨界區的併發代碼。確保某一信息再併發過程當中僅能被其中之一的進程進行訪問。程序中一般存在兩種可能的約束:特定約束和詞法約束。程序員
經過公約實現約束,不管是由語言社區、你所在的團隊,仍是你的代碼庫設置。在 Go 語言官方默認安裝 gofmt 去格式化你的代碼,爭取讓你們都寫同樣的代碼golang
設計使用詞法做用域僅公開用於多個併發進程的正確數據和併發原語,這使得作錯事是不可能的,例如:Go 中 goroutine 和 channel ,而不是使用 Thread 包(不管是官方,第三方)。在 Go 的世界裏操做系統線程不用程序員管理,須要併發 go 就能夠了。緩存
在 Go 語言中你常常看到 for-select 循環。它的結構相似這樣的安全
for{ // 無限循環或者用 range 語句循環 select { // 使用 channel 的任務 } }
for _,v := range []string{"jisdf","jisdf","ier"}{ select { case <- done: return case stringChan <- v: // 作些什麼 } }
// 第一種保持 select 語句儘量短: // 若是完成的 channel 未關閉,咱們將退出 select 語句並繼續執行 for 循環 for { select { case <- done: return default: } // 非搶佔業務 } // 第二種將工做嵌入 select 的 default 中 // 若是完成的 channel 還沒有關閉,則執行 default 內容的任務 for { select { case <- done: return default: // 非搶佔業務 } }
線程安全,是每個程序員常常討論的話題。 在 Go 中對應的是 goroutine 協程,雖然 goroutine 開銷很是小,很是廉價,可是過多的 goroutine 未獲得釋放或終止,也是會消耗資源的。goroutine 有如下幾種方式被終止:多線程
前兩種方式很是簡單明瞭,而且隱含在你的程序中。那麼咱們如何來取消工做?Go 程序在運行時默認會有一個主 goroutine (main goroutine),他會將一些沒有工做的 goroutine 設置爲自旋,這會致使內存利用率的降低。思考下,既然 main goroutine 可以將其餘 goroutine 設置自旋,那麼它能不能通知其餘 goroutine 中止或退出呢?Of sure ,首先咱們須要一個 channel 輔助 main goroutine,它能夠包含多種指令,例如超時、異常、特定條件等 。它一般被命名爲 done,而且只讀。舉個例子:併發
doWork := func(done <- chan int ,s <-chan string) <-chan s{ terminated := make(chan int) go func () { // 當前函數 return 後打印一條信息用於驗證,for {} 死循環是否被終止 defer fmt.Println("doWork exited") defer close(termainted) for { select { case l := <- s: fmt.Println(l) case <- done: // 因爲 select 會相對均勻的挑選 case ,當 done 被讀取,則 return 跳出整個併發 return } } }() return terminated } // 建立控制併發的 channel done done := make(chan int) terminated := doWork(done, "a") // 啓動一個 goroutine 在 1s 後關閉 done channel go func() { time.Sleep(1 * time.Second) fmt.Println("取消工做的 goroutine") close(done) }() // main goroutine 中讀出 termainated 中的數據,驗證咱們是否成功通知工做的 goroutine 終止工做 <- terminated fmt.Println("Done")
當一個 goroutine 阻塞了向channel 進行寫入的請求,咱們能夠這樣作:框架
newRandstream := func(done <-chan interface{}) <- chan int{ randStream := make(chan int) go func(){ defer fmt.Println("newRanstream 關閉了") defer close(randStream) for{ select { case randStream <- rand.int(): case <-done: return } } }() return } done := make(chan interface{}) randStream := newRandStream(done) fmt.Println("遍歷三次") for i := 1; i<=3;i++{ fmt.Println("%d: %d\n",i,<-randStream) } close(done) // 模擬正在進行的工做,暫停 1s time.Sleap(1 * time.Second)
以上部分咱們瞭解到單一條件下如何取消 goroutine 防止泄露。若是咱們有多種條件觸發取消 goroutine ,咱們要怎麼辦呢?讓我來了解下 or-channel,建立一個複合 done channel 來處理這種複雜狀況。函數
咱們以使用更多的 goroutine 爲代價,實現了簡潔性。f(x)=x/2 ,其中 x 是 goroutine 的數量,但你要記住 Go 語言種的一個優勢就是可以快速建立,調度和運行 goroutine ,而且該語言積極鼓勵使用 goroutine 來正確建模問題。沒必要擔憂在這裏建立的 goroutine 的數量多是一個不成熟的優化。此外,若是在編譯時你不知道你正在使用多少個 done channel ,則將會沒有其餘方式能夠合併 done channel。post
說到錯誤處理,也許不少程序程序員以爲 Go 語言錯誤處理簡直太糟糕了。漫天的 if err != nil{}
,try catch 捕捉並打印錯誤多麼好。我要說首先咱們須要注意 Go 的併發模式,與其餘語言有着很大的區別。Go 項目開發者但願咱們將錯誤視爲一等公民,合併入咱們定義的消息體內,channel 中的數據被讀出的時候咱們進行判斷,程序併發過程當中是否出現錯誤。這避免了多進程多線程模型下,try catch 丟失一些報錯,在故障回顧的時候很是麻煩。性能
// 建議的消息體 type MyMessage struct{ Data string Err error }
讓錯誤成爲一等公民合併進你的結構體中,代碼也許會更易懂
type MyMessage struct{ N int Err error } func myfuncation(n string) MyMessage{ var mm MyMessage mm.N,mm.Err = anotherFunc(n) return mm } func anotherFunc(n string) (int,error){ i,err := strconv.Atoi(n) if err !=nil{ return i,err } return i,nil } func main(){ mymsg := myfuncation("Concurrency In GO") if mymsg.Err != nil{ // 這裏能夠換成其餘的 log 框架,部分 log 框架會自動識別 error 來源。例如:func (m *MyMessage) myfuncation() 這樣的函數就會被抓到錯誤來自於哪裏。 fmt.Println(mymsg.Err) } }
我曾經在祖傳代碼中見到一個約 2000 行的函數。我但願看見這篇文章的你,不要這麼作。咱們已經瞭解了數據如何在兩個或多個 goroutine 之間經過 channel 傳遞,那我咱們把這樣的程序用多個 channel組合在一塊兒,其中的每一次讀出,或寫入channel 都是這一環上的一個 stage(步),這就是 pipeline。Go 語言的併發模式,讓咱們很方便,快捷,安全的在一個進程中實現了流式處理。咱們來看一個官方 pipeline 的例子:
package main import ( "fmt" "sync" "time" ) func gen(nums ...int) <-chan int { genOut := make(chan int) go func() { for _, n := range nums { genOut <- n } fmt.Println("Input gen Channel number =>", len(genOut)) close(genOut) }() return genOut } func sq(done <-chan struct{}, in <-chan int) <-chan int { sqOut := make(chan int) go func() { // 這個 close(sqOut) 必定要先寫,執行的時候優先壓入棧,待函數執行完成關閉 sqOut channel defer close(sqOut) for n := range in { // 利用 select {} 均衡調度 channel select { case sqOut <- n * n: fmt.Printf("=> %v <= write into sqOut channel \n", n*n) case <-done: return } } //fmt.Printf("Wait close the chan => %v\n", len(sqOut)) }() return sqOut } // merge Fan-In 函數合併多個結果 func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup mergeOut := make(chan int, 1) output := func(c <-chan int) { defer wg.Done() for n := range c { select { case mergeOut <- n: case <-done: return } } } wg.Add(len(cs)) for _, c := range cs { go output(c) } go func() { wg.Wait() close(mergeOut) }() return mergeOut } // pfnumber 計算算數平方數 func pfnumber() { // 定義 don channel 用於終止 pipeline don := make(chan struct{}, 3) don <- struct{}{} don <- struct{}{} close(don) // 傳入 don 通知發送方中止發送 for n := range sq(don, sq(don, gen(3, 4, 2))) { fmt.Println("Last result ", n) } fmt.Println("============================================") } func fanInOut() { don := make(chan struct{}, 3) in := gen(2, 3) c1 := sq(don, in) c2 := sq(don, in) for n := range merge(don, c1, c2) { fmt.Println(n) } don <- struct{}{} don <- struct{}{} don <- struct{}{} fmt.Println("Finish channel len => ", len(don)) <-don close(don) } func f1(i chan int) { fmt.Println(<-i) } func runf1() { out := make(chan int) go f1(out) time.Sleep(2 * time.Second) out <- 2 time.Sleep(2 * time.Second) } func main() { //runf1() pfnumber() // FanIn and FanOut //fanInOut() }
簡單總結一下如何正確構建一個 pipeline:
扇出模式優先的場景:
扇入模式優先:
扇入意味着多個數據流複用或者合併成一個流。例如:上文 pipeline 中的 merge 函數,能夠經過打開 fanInOut() 函數執行一下試試。
在防止 goroutine 泄露,pipeline 中咱們都在函數執行過程當中嵌入了 done channel 以便終止須要中止的 goroutine。咱們能夠看出他們有個統一的特色,傳入 done ,jobChannel ,返回 resultChannel 。那麼咱們能夠把它封裝起來,像這樣:
orDone := func(done ,c <-chan interface{}) <- chan interface{}{ valStream := make(chan interface{}) go func(){ defer close(valStream) for { select{ case <- done: case v,ok := <- c: if ok == false{ return } select{ case valStream <- v: case <-done: } } } }() return valStream }
可能須要將同一個結果發送給兩個接收者,這個時候就須要用到 tee-channel 的方式。
應用場景:
tee := func(done <- chan interface{},in <-chan interface{} )(_,_ <- chan interface{}) { <-chan interface{}) { out1 := make(chan interface{}) out2 := make(chan interface{}) go func(){ defer close(out1) defer close(out2) for val := range orDone(done, in){ var out1,out2 = out1,out2 for i:=0;i<2; i++{ select{ case <- done: case out1 <- val: out1 = nil case out2 <- val: out2 = nil } } } }() return out1,out2 }
在 channel 中傳遞 channel 。筆者學術才淺,紙上談兵多,動手實踐少,着實想不到合適的場景,但願讀者能爲我補充一下。
隊列多是咱們第一次看見 channel 的感覺,這玩意一個隊列,很是具有隊列的特性。
隊列在什麼樣的狀況下能夠提高總體性能
在前文中常常會定義 done channel 的作法,防止 goroutine 泄露,或者主動中斷須要中止的 pipeline 。難道咱們每次構建 pipeline 的時候都要建立 done channel 嗎?答案是否認的,Go 團隊爲咱們準備了 context 包,專用於幹相似的工做。
type Context interface { // 當該 context 工做的 work 被取消時,返回超時時間 Deadline() (deadline time.Time, ok bool) // done 返回中止 pipeline 的 channel Done() <chan struct{} // error 一等公民。 // 若是 context 被取消,超時,返回取消,超時的緣由,以 error 形式返回。 Err() error // 返回與此 context 關聯的 key Value(key interface{}) interface{} }
context 包有兩個主要目的:
在 防止 goroutine 泄露中學到,函數中的取消有三個方面,context 包能夠幫你管理它:
Context.Value(key interface{}) ,因爲使用 interface{} 做爲函數參數,這裏咱們須要強調一下使用注意事項,及建議:
第四章能夠稱之爲全書核心章節,它將前面的部分總結概括,並造成不少的 Go 語言併發技巧講解,能夠幫助咱們寫出可維護的併發代碼。熟悉了這些併發模式,咱們能夠將多種模式組合,以幫助咱們編寫大型系統。
筆者能力優先,才疏學淺,但願讀者可以翻閱原書,深刻理解並充分運用在工做中。