Golang協程併發的流水線模型

背景

最近因爲性能問題,後端服務一直在作python到golang的遷移和重構。go語言精簡優雅,既有編譯型語言的嚴謹和高性能,又有解釋型語言的開發效率,出色的併發性能也是go區別於其餘語言的一大特點。go的併發編程代碼雖然簡單,但重在其併發模型和流程的設計。因此這裏總結下golang協程併發經常使用的流水線模型。python

簡單的流水線思惟

流水線模式並非什麼新奇的概念,可是它能極大地提升生產效率。好比實際生活中的汽車生產流水線,流水線上的每個流程負責不一樣的工做,好比第一個流程是拼裝車身,第二個流程是安裝發動機,第三個流程是裝輪胎...,這些步驟咱們能夠類比成go併發流程中的協程,每個協程就是一個任務。流水線上面傳遞的車身、發動機、輪胎,這些咱們能夠類比成協程間須要傳遞的數據,而在這些流程(協程)間傳遞這些配件(數據),天然就要經過傳送帶(channel)。在流水線上,咱們裝四個輪胎確定不是一個一個來裝的,確定是有四個機械臂同時來裝。所以裝輪胎這個步驟咱們有4個協程在併發工做來提升效率。這麼一來,流水線模型的基本要素就構成了。
Golang的併發模型靈感其實都來自咱們生活,對程序而言,高的生產效率就是高的性能。在Golang中,流水線由多個流程節點組成,流程之間經過channel鏈接,每一個流程節點能夠由多個同時運行的goroutine組成。
image.pnggolang

如何構造流水線

有了流水線模式的思惟,接下來就是如何構造流水線了。簡單來講,其實就是經過channel將任務流程鏈接起來,兩個相鄰的流程互爲生產者和消費者,經過channel進行通訊。耗時的流程能夠將任務分散到多個協程來執行。
咱們先來看一個最簡單的流水線,以下圖,A是生產者流程,B是它的消費流程,同時又是C的生產者流程。A,B,C三個協程直接,經過讀寫channel進行通訊。
image.png編程

那若是此時B流程能夠將a channel中的任務併發執行呢,很簡單,咱們只須要起多個B協程就能夠了。以下圖。
image.png後端

總之,咱們構造流水線併發的思路是關注數據的流動,數據流動的過程交給channel,channel兩端數據處理的每一個環節都交給goroutine,這個流程連起來,就構成了流水線模型。安全

關於channel

爲何咱們能夠選擇channel來進行協程間的通訊呢,協程之間又是怎麼保持同步順序呢,固然這都要歸功於channel。channel是go提供的進程內協程間的通訊方式,它是協程/線程安全的,channe的讀寫阻塞會致使協程的切換。
channel的操做和狀態組合能夠有如下幾種狀況:
image.png數據結構

**有1個特殊場景**:當`nil`的通道在`select`的某個`case`中時,這個case會阻塞,但不會形成死鎖。

channel不只能夠保證協程安全的數據流動,還能夠保證協程的同步。當有併發問題時,channel也是咱們首先應該想到的數據結構。不過顯而易見,當使用有緩衝區的channel時,才能達到協程併發的效果,而且生產者和消費者的協程間是相對同步的。使用無緩衝區的channel時,是沒有併發效果的,協程間是絕對同步的,生產者和消費者必須同時寫和讀協程才能運行。
channel關注的是數據的流動,這種場景下均可以考慮使用channel。好比:消息傳遞、信號廣播、任務分發、結果彙總、同步與異步、併發控制... 更多的不在這裏贅述了,總之,Share memory by communicating, don't communicate by sharing memory.併發

流水線模型實例

舉個簡單栗子,計算80000之內的質數並輸出。
這個例子若是咱們採用非併發的方式,就是for循環80000,挨個判斷是否是素數再輸出。不過若是咱們採用流水線的併發模型會更高效。異步

從數據流動的角度來分析,須要遍歷生成1-80000的數字到一個channel中,數字判斷是否爲素數,輸出結果到一個channel中。所以咱們須要兩個channel,channel的兩端就設計成協程便可。
一、遍歷生成原始80000個數據(生產者)
二、計算這80000個數據中的素數(生產者+消費者)
三、取結果輸出(消費者)函數

代碼以下:性能

package gen_channel
import "fmt"
import "time"
func generate_source(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("寫入協程結束")
   close(data_source_chan)
}
func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {
   for num:= range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("該協程結束")
   gen_chan <- true
}
func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){
   // 開啓8個協程
 for i := 0; i < gen_num; i++ {
      go generate_sushu(data_source_chan, data_result_chan, gen_chan)
   }
}
func Channel_main() {
   // 任務數據
   data_source_chan := make(chan int, 2000)
   // 結果數據
   data_result_chan := make(chan int, 2000)
   // 全部任務協程是否結束
   gen_chan := make(chan bool, 8)
   time1 := time.Now().Unix()
   go generate_source(data_source_chan)
   // 協程池,任務分發
   workpool(data_source_chan, data_result_chan, gen_chan, 8)
   // 全部協程結束後關閉結果數據channel
   go func() {
      for i := 0; i < 8; i++ {
         <-gen_chan
      }
      close(data_result_chan)
      fmt.Println("spend timeis ", time.Now().Unix()-time1)
   }()
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

上面這段代碼中。data_source_chandata_result_chan這兩個channel分別用來放原始數據和結果數據,buffer分別爲2000。

generate_source協程: 生產數據,它會把數據寫入data_source_chan通道,所有寫入完成後關閉通道。
generate_sushu協程: 負責計算並判斷data_source_chan中的數據是否爲質數,是的話就寫入data_result_chan通道。
主協程for date_result := range data_result_chan: 最後負責讀取data_result_chan中的結果,直到data_result_chan關閉後結束程序。

能夠看到咱們經過workpool方法起了8個generate_sushu協程來併發處理data_source_chan的任務。那麼就有一個問題,如何知道全部數據都已處理完畢呢,等到生產者generate_source協程結束data_source_chan關閉嗎? 恐怕不是,由於可能data_source_chan關閉後8個任務協程仍然在繼續計算。那麼只能等8個協程所有處理完畢後,才能說明全部數據已處理完,從而才能關閉data_result_chan,而後主協程讀取data_result_chan結束。

所以咱們這裏引入了另外一個channel:gen_chan,來記錄計算結束的任務。每一個generate_sushu協程處理完,就寫入一個記錄到channel中。所以咱們有一個匿名協程,當能夠從gen_chan中取8個結果出來的話,就說明全部協程已計算完成,那麼能夠關上阻塞程序的最後閥門data_result_chan

固然這種設計方式並不惟一,咱們也能夠不用統一的data_result_chan來接收結果,而是每一個協程分配一個channel來存放結果,最後再merge到一塊兒。

可能你們以爲這種方式很複雜,確實比較高效但寫起來並不友好,那有沒有更友好的方式呢?

sync包

在處理併發任務時咱們首先想到的應該是channel,但有時候channel不是萬能或者最方便的,因此go也爲咱們提供了sync包。

sync包提供了各類異步及鎖類型及其內置方法。用起來也很方便,好比Mutex就是給協程加鎖,某個時段內不能有多個協程訪問同一段代碼。WaitGroup就是等待一些工做完成後,再進行下一步工做。Once能夠用來確保協程中某個函數只執行1次...當咱們面對一個併發問題的時候,應該去分析採用哪一種協程同步方式,是channel仍是Mutex呢。這須要看咱們關注的是數據的流動仍是數據的安全性。篇幅緣由這裏再也不展開講了。

  1. Mutex:互斥鎖
  2. RWMutex:讀寫鎖
  3. WaitGroup:等待組
  4. Once:單次執行
  5. Cond:信號量
  6. Pool:臨時對象池
  7. Map:自帶鎖的map

咱們接着上面質數的問題,使用sync中的WaitGroup,會讓咱們的代碼更加友好,由於咱們不須要引入一個channel來記錄是否4個車輪都換完了,讓WaitGroup來作就行了。

package gen_channel
import (
   "fmt"
 "time")
import "sync"
func generate_source3(data_source_chan chan int) {
   for i := 1; i <= 80000; i++ {
      data_source_chan <- i
   }
   fmt.Println("寫入協程結束")
   close(data_source_chan)
}
func generate_sushu3(data_source_chan, data_result_chan chan int, wg *sync.WaitGroup) {
   defer wg.Done()
   for num := range data_source_chan {
      falg := true
 for i := 2; i < num; i++ {
         if num%i == 0 {
            falg = false
 break }
      }
      if falg == true {
         data_result_chan <- num
      }
   }
   fmt.Println("該協程結束")
}
func workpool3(data_source_chan chan int, data_result_chan chan int, wg *sync.WaitGroup, gen_num int) {
   // 開啓8個協程
 for i := 0; i < gen_num; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, wg)
   }
}
func Channel_main3() {
   data_source_chan := make(chan int, 500)
   data_result_chan := make(chan int, 2000)
   time1 := time.Now().Unix()
   var wg sync.WaitGroup
 go generate_source3(data_source_chan)
   // 開啓8個協程
 for i := 0; i < 8; i++ {
      wg.Add(1)
      go generate_sushu3(data_source_chan, data_result_chan, &wg)
   }
   wg.Wait()
   close(data_result_chan)
   fmt.Println("spend timeis ", time.Now().Unix()-time1)
   for date_result := range data_result_chan {
      fmt.Println(date_result)
   }
}

總結

流水線模式的設計要關注數據的流動,而後在數據流動的路徑中將數據放到channel中,將channel的兩端設計成協程。
併發設計中channel和sync能夠從開發效率和性能的角度自由組合,channel不必定是最優解
寫入channel的協程來控制該協程的關閉,消費者協程不關閉讀協程,防止報錯。養成在協程入口限制channel讀寫類型的習慣。

以上是咱們在go併發的流水線模型中的一些總結。能夠看出go的協程併發更考驗咱們的設計能力,由於協程間的同步和數據傳遞都交給了開發者來設計。同時也留給咱們一些引伸思考,協程在IO密集和CPU密集的狀況下是否都能大幅提升性能呢?是否和channel的緩衝區或者併發設計有關呢?協程異常該怎麼處理呢?go的協程和python的協程又有什麼區別呢?...咱們後面慢慢探討~

相關文章
相關標籤/搜索