開發go程序的時候,時常須要使用goroutine併發處理任務,有時候這些goroutine是相互獨立的,而有的時候,多個goroutine之間經常是須要同步與通訊的。另外一種狀況,主goroutine須要控制它所屬的子goroutine,總結起來,實現多個goroutine間的同步與通訊大體有:
本文章經過goroutine同步與通訊的一個典型場景-通知子goroutine退出運行,來深刻講解下golang的控制併發。
goroutine做爲go語言的併發利器,不只性能強勁並且使用方便:只須要一個關鍵字go便可將普通函數併發執行,且goroutine佔用內存極小(一個goroutine只佔2KB的內存),因此開發go程序的時候不少開發者經常會使用這個併發工具,獨立的併發任務比較簡單,只須要用go關鍵字修飾函數就能夠啓用一個goroutine直接運行;可是,實際的併發場景經常是須要進行協程間的同步與通訊,以及精確控制子goroutine開始和結束,其中一個典型場景就是主進程通知名下全部子goroutine優雅退出運行。html
因爲goroutine的退出機制設計是,goroutine退出只能由自己控制,不容許從外部強制結束該goroutine。只有兩種狀況例外,那就是main函數結束或者程序崩潰結束運行;因此,要實現主進程控制子goroutine的開始和結束,必須藉助其它工具來實現。git
實現控制併發的方式,大體可分紅如下三類:github
這是最簡單的實現控制併發的方式,實現步驟是:golang
示例以下:web
package main import ( "fmt" "time" ) func main() { running := true f := func() { for running { fmt.Println("sub proc running...") time.Sleep(1 * time.Second) } fmt.Println("sub proc exit") } go f() go f() go f() time.Sleep(2 * time.Second) running = false time.Sleep(3 * time.Second) fmt.Println("main proc exit") }
全局變量的優點是簡單方便,不須要過多繁雜的操做,經過一個變量就能夠控制全部子goroutine的開始和結束;缺點是功能有限,因爲架構所致,該全局變量只能是多讀一寫,不然會出現數據同步問題,固然也能夠經過給全局變量加鎖來解決這個問題,但那就增長了複雜度,另外這種方式不適合用於子goroutine間的通訊,由於全局變量能夠傳遞的信息很小;還有就是主進程沒法等待全部子goroutine退出,由於這種方式只能是單向通知,因此這種方法只適用於很是簡單的邏輯且併發量不太大的場景,一旦邏輯稍微複雜一點,這種方法就有點捉襟見肘。數據庫
另外一種更爲通用且靈活的實現控制併發的方式是使用channel進行通訊。
首先,咱們先來了解下什麼是golang中的channel:Channel是Go中的一個核心類型,你能夠把它當作一個管道,經過它併發核心單元就能夠發送或者接收數據進行通信(communication)。
要想理解 channel 要先知道 CSP 模型:編程
CSP 是 Communicating Sequential Process 的簡稱,中文能夠叫作通訊順序進程,是一種併發編程模型,由 Tony Hoare 於 1977 年提出。簡單來講,CSP 模型由併發執行的實體(線程或者進程)所組成,實體之間經過發送消息進行通訊,這裏發送消息時使用的就是通道,或者叫 channel。CSP 模型的關鍵是關注 channel,而不關注發送消息的實體。Go 語言實現了 CSP 部分理論,goroutine 對應 CSP 中併發執行的實體,channel 也就對應着 CSP 中的 channel。
也就是說,CSP 描述這樣一種併發模型:多個Process 使用一個 Channel 進行通訊, 這個 Channel 連結的 Process 一般是匿名的,消息傳遞一般是同步的(有別於 Actor Model)。
先來看示例代碼:數組
package main import ( "fmt" "os" "os/signal" "sync" "syscall" "time" ) func consumer(stop <-chan bool) { for { select { case <-stop: fmt.Println("exit sub goroutine") return default: fmt.Println("running...") time.Sleep(500 * time.Millisecond) } } } func main() { stop := make(chan bool) var wg sync.WaitGroup // Spawn example consumers for i := 0; i < 3; i++ { wg.Add(1) go func(stop <-chan bool) { defer wg.Done() consumer(stop) }(stop) } waitForSignal() close(stop) fmt.Println("stopping all jobs!") wg.Wait() } func waitForSignal() { sigs := make(chan os.Signal) signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, syscall.SIGTERM) <-sigs }
這裏能夠實現優雅等待全部子goroutine徹底結束以後主進程才結束退出,藉助了標準庫sync裏的Waitgroup,這是一種控制併發的方式,能夠實現對多goroutine的等待,官方文檔是這樣描述的:安全
A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for.
Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.
簡單來說,它的源碼裏實現了一個相似計數器的結構,記錄每個在它那裏註冊過的協程,而後每個協程完成任務以後須要到它那裏註銷,而後在主進程那裏能夠等待直至全部協程完成任務退出。
使用步驟:網絡
該示例程序是一種golang的select+channel的典型用法,咱們來稍微深刻一點分析一下這種典型用法:
首先了解下channel,能夠理解爲管道,它的主要功能點是:
channel 實現集中在文件 runtime/chan.go 中,channel底層數據結構是這樣的:
type hchan struct { qcount uint // 隊列中數據個數 dataqsiz uint // channel 大小 buf unsafe.Pointer // 存放數據的環形數組 elemsize uint16 // channel 中數據類型的大小 closed uint32 // 表示 channel 是否關閉 elemtype *_type // 元素數據類型 sendx uint // send 的數組索引 recvx uint // recv 的數組索引 recvq waitq // 由 recv 行爲(也就是 <-ch)阻塞在 channel 上的 goroutine 隊列 sendq waitq // 由 send 行爲 (也就是 ch<-) 阻塞在 channel 上的 goroutine 隊列 // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
從源碼能夠看出它其實就是一個隊列加一個鎖(輕量),代碼自己不復雜,但涉及到上下文不少細節,故而不易通讀,有興趣的同窗能夠去看一下,個人建議是,從上面總結的兩個功能點出發,一個是 ring buffer,用於存數據; 一個是存放操做(讀寫)該channel的goroutine 的隊列。
因爲涉及源碼較多,這裏就再也不深刻。
而後是select機制,golang 的 select 機制能夠理解爲是在語言層面實現了和 select, poll, epoll 類似的功能:監聽多個描述符的讀/寫等事件,一旦某個描述符就緒(通常是讀或者寫事件發生了),就可以將發生的事件通知給關心的應用程序去處理該事件。 golang 的 select 機制是,監聽多個channel,每個 case 是一個事件,能夠是讀事件也能夠是寫事件,隨機選擇一個執行,能夠設置default,它的做用是:當監聽的多個事件都阻塞住會執行default的邏輯。
select的源碼在runtime/select.go ,看的時候建議是重點關注 pollorder 和 lockorder
由於我對這部分源碼研究得也不是很深,故而點到爲止便可,有興趣的能夠去看看源碼啦!
具體到demo代碼:consumer爲協程的具體代碼,裏面是隻有一個不斷輪詢channel變量stop的循環,因此主進程是經過stop來通知子協程什麼時候該結束運行的,在main方法中,close掉stop以後,讀取已關閉的channel會馬上返回該channel數據類型的零值,所以子goroutine裏的<-stop操做會立刻返回,而後退出運行。
事實上,經過channel控制子goroutine的方法能夠總結爲:循環監聽一個channel,通常來講是for循環裏放一個select監聽channel以達到通知子goroutine的效果。再借助Waitgroup,主進程能夠等待全部協程優雅退出後再結束本身的運行,這就經過channel實現了優雅控制goroutine併發的開始和結束。
channel通訊控制基於CSP模型,相比於傳統的線程與鎖併發模型,避免了大量的加鎖解鎖的性能消耗,而又比Actor模型更加靈活,使用Actor模型時,負責通信的媒介與執行單元是緊耦合的–每一個Actor都有一個信箱。而使用CSP模型,channel是第一對象,能夠被獨立地建立,寫入和讀出數據,更容易進行擴展。
Context一般被譯做上下文,它是一個比較抽象的概念。在討論鏈式調用技術時也常常會提到上下文。通常理解爲程序單元的一個運行狀態、現場、快照,而翻譯中上下又很好地詮釋了其本質,上下則是存在上下層的傳遞,上會把內容傳遞給下。在Go語言中,程序單元也就指的是Goroutine。每一個Goroutine在執行以前,都要先知道程序當前的執行狀態,一般將這些執行狀態封裝在一個Context變量中,傳遞給要執行的Goroutine中。上下文則幾乎已經成爲傳遞與請求同生存週期變量的標準方法。在網絡編程下,當接收到一個網絡請求Request,在處理這個Request的goroutine中,可能須要在當前gorutine繼續開啓多個新的Goroutine來獲取數據與邏輯處理(例如訪問數據庫、RPC服務等),即一個請求Request,會須要多個Goroutine中處理。而這些Goroutine可能須要共享Request的一些信息;同時當Request被取消或者超時的時候,全部從這個Request建立的全部Goroutine也應該被結束。
context在go1.7以後被引入到標準庫中,1.7以前的go版本使用context須要安裝golang.org/x/net/context包,關於golang context的更詳細說明,可參考官方文檔:context
Context的建立和調用關係是層層遞進的,也就是咱們一般所說的鏈式調用,相似數據結構裏的樹,從根節點開始,每一次調用就衍生一個葉子節點。首先,生成根節點,使用context.Background方法生成,然後能夠進行鏈式調用使用context包裏的各種方法,context包裏的全部方法:
這裏僅以WithCancel和WithValue方法爲例來實現控制併發和通訊:
話很少說,上碼:
package main import ( "context" "crypto/md5" "fmt" "io/ioutil" "net/http" "sync" "time" ) type favContextKey string func main() { wg := &sync.WaitGroup{} values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"} ctx, cancel := context.WithCancel(context.Background()) for _, url := range values { wg.Add(1) subCtx := context.WithValue(ctx, favContextKey("url"), url) go reqURL(subCtx, wg) } go func() { time.Sleep(time.Second * 3) cancel() }() wg.Wait() fmt.Println("exit main goroutine") } func reqURL(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() url, _ := ctx.Value(favContextKey("url")).(string) for { select { case <-ctx.Done(): fmt.Printf("stop getting url:%s\n", url) return default: r, err := http.Get(url) if r.StatusCode == http.StatusOK && err == nil { body, _ := ioutil.ReadAll(r.Body) subCtx := context.WithValue(ctx, favContextKey("resp"), fmt.Sprintf("%s%x", url, md5.Sum(body))) wg.Add(1) go showResp(subCtx, wg) } r.Body.Close() //啓動子goroutine是爲了避免阻塞當前goroutine,這裏在實際場景中能夠去執行其餘邏輯,這裏爲了方便直接sleep一秒 // doSometing() time.Sleep(time.Second * 1) } } } func showResp(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() for { select { case <-ctx.Done(): fmt.Println("stop showing resp") return default: //子goroutine裏通常會處理一些IO任務,如讀寫數據庫或者rpc調用,這裏爲了方便直接把數據打印 fmt.Println("printing ", ctx.Value(favContextKey("resp"))) time.Sleep(time.Second * 1) } } }
前面咱們說過Context就是設計用來解決那種多個goroutine處理一個Request且這多個goroutine須要共享Request的一些信息的場景,以上是一個簡單模擬上述過程的demo。
首先調用context.Background()生成根節點,而後調用withCancel方法,傳入根節點,獲得新的子Context以及根節點的cancel方法(通知全部子節點結束運行),這裏要注意:該方法也返回了一個Context,這是一個新的子節點,與初始傳入的根節點不是同一個實例了,可是每個子節點裏會保存從最初的根節點到本節點的鏈路信息 ,才能實現鏈式。
程序的reqURL方法接收一個url,而後經過http請求該url得到response,而後在當前goroutine裏再啓動一個子groutine把response打印出來,而後從ReqURL開始Context樹往下衍生葉子節點(每個鏈式調用新產生的ctx),中間每一個ctx均可以經過WithValue方式傳值(實現通訊),而每個子goroutine都能經過Value方法從父goroutine取值,實現協程間的通訊,每一個子ctx能夠調用Done方法檢測是否有父節點調用cancel方法通知子節點退出運行,根節點的cancel調用會沿着鏈路通知到每個子節點,所以實現了強併發控制,流程如圖:
該demo結合前面說的WaitGroup實現了優雅併發控制和通訊,關於WaitGroup的原理和使用前文已作解析,這裏便再也不贅述,固然,實際的應用場景不會這麼簡單,處理Request的goroutine啓動多個子goroutine大可能是處理IO密集的任務如讀寫數據庫或rpc調用,而後在主goroutine中繼續執行其餘邏輯,這裏爲了方便講解作了最簡單的處理。
Context做爲golang中併發控制和通訊的大殺器,被普遍應用,一些使用go開發http服務的同窗若是閱讀過這些不少 web framework的源碼就知道,Context在web framework隨處可見,由於http請求處理就是一個典型的鏈式過程以及併發場景,因此不少web framework都會藉助Context實現鏈式調用的邏輯。有興趣能夠讀一下context包的源碼,會發現Context的實現實際上是結合了Mutex鎖和channel而實現的,其實併發、同步的不少高級組件萬變不離其宗,都是經過最底層的數據結構組裝起來的,只要知曉了最基礎的概念,上游的架構也能夠一目瞭然。
最後,Context雖然是神器,但開發者使用也要遵循基本法,如下是一些Context使用的規範: