併發編程一直是Golang區別與其餘語言的很大優點,也是實際工做場景中常常遇到的。近日筆者在組內分享了咱們常見的併發場景,及代碼示例,以指望你們能在遇到相同場景下,能快速的想到解決方案,或者是拿這些方案與本身實現的比較,取長補短。現整理出來與你們共享。git
不少時候,咱們只想併發的作一件事情,好比測試某個接口的是否支持併發。那麼咱們就能夠這麼作:github
func RunScenario1() { count := 10 var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(index int) { defer wg.Done() doSomething(index) }(i) } wg.Wait() }
使用goroutine來實現異步,使用WaitGroup來等待全部goroutine結束。這裏要注意的是要正確釋放WaitGroup的counter(在goroutine裏調用Done()方法)。golang
但此種方式有個弊端,就是當goroutine的量過多時,很容易消耗完客戶端的資源,致使程序表現不佳。編程
咱們仍然以測試某個後端API接口爲例,若是咱們想知道這個接口在持續高併發狀況下是否有句柄泄露,這種狀況該如何測試呢?後端
這種時候,咱們須要能控制時間的高併發模型:併發
func RunScenario2() { timeout := time.Now().Add(time.Second * time.Duration(10)) n := runtime.NumCPU() waitForAll := make(chan struct{}) done := make(chan struct{}) concurrentCount := make(chan struct{}, n) for i := 0; i < n; i++ { concurrentCount <- struct{}{} } go func() { for time.Now().Before(timeout) { <-done concurrentCount <- struct{}{} } waitForAll <- struct{}{} }() go func() { for { <-concurrentCount go func() { doSomething(rand.Intn(n)) done <- struct{}{} }() } }() <-waitForAll }
上面的代碼裏,咱們經過一個buffered channel來控制併發的數量(concurrentCount),而後另起一個channel來週期性的發起新的任務,而控制的條件就是 time.Now().Before(timeout),這樣當超過規定的時間,waitForAll 就會獲得信號,而使整個程序退出。異步
這是一種實現方式,那麼還有其餘的方式沒?咱們接着往下看。高併發
前面說的基於時間的併發模型,那若是隻知道數據量很大,可是具體結束時間不肯定,該怎麼辦呢?性能
好比,客戶給了個幾TB的文件列表,要求把這些文件從存儲裏刪除。再好比,實現個爬蟲去爬某些網站的全部內容。學習
而解決此類問題,最多見的就是使用工做池模式了(Worker Pool)。以刪文件爲例,咱們能夠簡單這樣來處理:
雖然這只是個簡單Worker Pool模型,但已經能知足咱們的需求:
func RunScenario3() { numOfConcurrency := runtime.NumCPU() taskTool := 10 jobs := make(chan int, taskTool) results := make(chan int, taskTool) var wg sync.WaitGroup // workExample workExampleFunc := func(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { res := job * 2 fmt.Printf("Worker %d do things, produce result %d \n", id, res) time.Sleep(time.Millisecond * time.Duration(100)) results <- res } } for i := 0; i < numOfConcurrency; i++ { wg.Add(1) go workExampleFunc(i, jobs, results, &wg) } totalTasks := 100 // 本例就要從文件列表裏讀取 wg.Add(1) go func() { defer wg.Done() for i := 0; i < totalTasks; i++ { n := <-results fmt.Printf("Got results %d \n", n) } close(results) }() for i := 0; i < totalTasks; i++ { jobs <- i } close(jobs) wg.Wait() }
在Go裏,分發任務,收集結果,咱們能夠都交給Channel來實現。從實現上更加的簡潔。
仔細看會發現,本模型也是適用於按時間來控制併發。只要把totalTask的遍歷換成時間控制就行了。
goroutine和channel的組合在實際編程時常常會用到,而加上Select更是無往而不利。
func RunScenario4() { sth := make(chan string) result := make(chan string) go func() { id := rand.Intn(100) for { sth <- doSomething(id) } }() go func() { for { result <- takeSomthing(<-sth) } }() select { case c := <-result: fmt.Printf("Got result %s ", c) case <-time.After(time.Duration(30 * time.Second)): fmt.Errorf("指定時間內都沒有獲得結果") } }
在select的case狀況,加上time.After()模型可讓咱們在必定時間範圍內等待異步任務結果,防止程序卡死。
上面咱們說到持續的壓測某後端API,但並未實時收集結果。而不少時候對於性能測試場景,實時的統計吞吐率,成功率是很是有必要的。
func RunScenario5() { concurrencyCount := runtime.NumCPU() for i := 0; i < concurrencyCount; i++ { go func(index int) { for { doUploadMock() } }(i) } t := time.NewTicker(time.Second) for { select { case <-t.C: // 計算並打印實時數據 } } }
這種場景就須要使用到Ticker,且上面的Example模型還能控制併發數量,也是很是實用的方式。
上面咱們共提到了五種併發模式:
概括下來其核心就是使用了Go的幾個知識點:Goroutine, Channel, Select, Time, Timer/Ticker, WaitGroup. 如果對這些不清楚,能夠自行Google之。
另完整的Example 代碼能夠參考這裏:https://github.com/jichangjun/golearn/blob/master/src/carlji.com/experiments/concurrency/main.go
使用方式: go run main.go <場景>
好比 :
這篇是Google官方推薦學習Go併發的資料,從初學者到進階,內容很是豐富,且權威。
Email: jinsdu@outlook.com
Blog: http://www.cnblogs.com/jinsdu/
Github: https://github.com/CarlJi
童鞋,若是以爲本文還算用心,還算有用,何不點個贊呢(⊙o⊙)?