前一篇文章《Golang併發模型:輕鬆入門流水線模型》,介紹了流水線模型的概念,這篇文章是流水線模型進階,介紹FAN-IN和FAN-OUT,FAN模式可讓咱們的流水線模型更好的利用Golang併發,提升軟件性能。但FAN模式不必定是萬能,不見得能提升程序的性能,甚至還不如普通的流水線。咱們先介紹下FAN模式,再看看它怎麼提高性能的,它是否是萬能的。git
這篇文章內容略多,原本打算分幾回寫的,但不如一次讀完爽,因此乾脆仍是放一篇文章了,要是時間不充足,利用好碎片時間,能夠每次看1個標題的內容。github
Golang的併發模式靈感來自現實世界,這些模式是通用的,毫無例外,FAN模式也是對當前世界的模仿。以汽車組裝爲例,汽車生產線上有個階段是給小汽車裝4個輪子,能夠把這個階段任務交給4我的同時去作,這4我的把輪子都裝完後,再把汽車移動到生產線下一個階段。這個過程當中,就有任務的分發,和任務結果的收集。其中任務分發是FAN-OUT,任務收集是FAN-IN。golang
咱們此次試用FAN-OUT和FAN-IN,解決《Golang併發模型:輕鬆入門流水線模型》中提到的問題:計算一個整數切片中元素的平方值並把它打印出來。segmentfault
producer()
保持不變,負責生產數據。squre()
也不變,負責計算平方值。main()
,啓動3個square,這3個squre從producer生成的通道讀數據,這是FAN-OUT。merge()
,入參是3個square各自寫數據的通道,給這3個通道分別啓動1個協程,把數據寫入到本身建立的通道,並返回該通道,這是FAN-IN。FAN模式流水線示例:緩存
package main import ( "fmt" "sync" ) func producer(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- i } }() return out } func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n } }() return out } func merge(cs ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup collect := func(in <-chan int) { defer wg.Done() for n := range in { out <- n } } wg.Add(len(cs)) // FAN-IN for _, c := range cs { go collect(c) } // 錯誤方式:直接等待是bug,死鎖,由於merge寫了out,main卻沒有讀 // wg.Wait() // close(out) // 正確方式 go func() { wg.Wait() close(out) }() return out } func main() { in := producer(1, 2, 3, 4) // FAN-OUT c1 := square(in) c2 := square(in) c3 := square(in) // consumer for ret := range merge(c1, c2, c3) { fmt.Printf("%3d ", ret) } fmt.Println() }
3個squre協程併發運行,結果順序是沒法肯定的,因此你獲得的結果,不必定與下面的相同。bash
➜ awesome git:(master) ✗ go run hi.go 1 4 16 9
相信你內心已經有了答案,能夠的。咱們仍是使用老問題,對比一下簡單的流水線和FAN模式的流水線,修改下代碼,增長程序的執行時間:併發
produer()
使用參數生成指定數量的數據。square()
增長阻塞操做,睡眠1s,模擬階段的運行時間。main()
關閉對結果數據的打印,下降結果處理時的IO對FAN模式的對比。普通流水線:less
// hi_simple.go package main import ( "fmt" ) func producer(n int) <-chan int { out := make(chan int) go func() { defer close(out) for i := 0; i < n; i++ { out <- i } }() return out } func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n // simulate time.Sleep(time.Second) } }() return out } func main() { in := producer(10) ch := square(in) // consumer for _ = range ch { } }
使用FAN模式的流水線:函數
// hi_fan.go package main import ( "sync" "time" ) func producer(n int) <-chan int { out := make(chan int) go func() { defer close(out) for i := 0; i < n; i++ { out <- i } }() return out } func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n // simulate time.Sleep(time.Second) } }() return out } func merge(cs ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup collect := func(in <-chan int) { defer wg.Done() for n := range in { out <- n } } wg.Add(len(cs)) // FAN-IN for _, c := range cs { go collect(c) } // 錯誤方式:直接等待是bug,死鎖,由於merge寫了out,main卻沒有讀 // wg.Wait() // close(out) // 正確方式 go func() { wg.Wait() close(out) }() return out } func main() { in := producer(10) // FAN-OUT c1 := square(in) c2 := square(in) c3 := square(in) // consumer for _ = range merge(c1, c2, c3) { } }
屢次測試,每次結果近似,結果以下:高併發
➜ awesome git:(master) ✗ time go run hi_simple.go go run hi_simple.go 0.17s user 0.18s system 3% cpu 10.389 total ➜ awesome git:(master) ✗ ➜ awesome git:(master) ✗ time go run hi_fan.go go run hi_fan.go 0.17s user 0.16s system 7% cpu 4.288 total
也可使用Benchmark進行測試,看2個類型的執行時間,結論相同。爲了節約篇幅,這裏再也不介紹,方法和結果貼在Gist了,想看的朋友瞄一眼,或本身動手搞搞。
FAN模式能夠提升併發的性能,那咱們是否是能夠都使用FAN模式?
不行的,由於FAN模式不必定能提高性能。
依然使用以前的問題,再次修改下代碼,其餘不變:
squre()
去掉耗時。main()
增長producer()的入參,讓producer生產10,000,000個數據。// hi_simple.go func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n } }() return out } func main() { in := producer(10000000) ch := square(in) // consumer for _ = range ch { } }
// hi_fan.go package main import ( "sync" ) func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n } }() return out } func main() { in := producer(10000000) // FAN-OUT c1 := square(in) c2 := square(in) c3 := square(in) // consumer for _ = range merge(c1, c2, c3) { } }
結果,能夠跑屢次,結果近似:
➜ awesome git:(master) ✗ time go run hi_simple.go go run hi_simple.go 9.96s user 5.93s system 168% cpu 9.424 total ➜ awesome git:(master) ✗ time go run hi_fan.go go run hi_fan.go 23.35s user 11.51s system 297% cpu 11.737 total
從這個結果,咱們能看到2點。
既然FAN模式不必定能提升性能,如何優化?
不一樣的場景優化不一樣,要依具體的狀況,解決程序的瓶頸。
咱們當前程序的瓶頸在FAN-IN,squre函數很快就完成,merge函數它把3個數據寫入到1個通道的時候出現了瓶頸,適當使用帶緩衝通道能夠提升程序性能,再修改下代碼
merge()
中的out
修改成:
out := make(chan int, 100)
結果:
➜ awesome git:(master) ✗ time go run hi_fan_buffered.go go run hi_fan_buffered.go 19.85s user 8.19s system 323% cpu 8.658 total
使用帶緩存通道後,程序的性能有了較大提高,CPU利用率提升到323%,提高了8%,運行時間從11.7下降到8.6,下降了26%。
FAN模式的特色很簡單,相信你已經掌握了,若是記不清了看這裏,本文全部代碼在該Github倉庫。
FAN模式頗有意思,而且能提升Golang併發的性能,若是想之後運用自如,用到本身的項目中去,仍是要寫寫本身的Demo,快去實踐一把。
下一篇,寫流水線中協程的「優雅退出」,歡迎關注。
本文全部代碼都在倉庫,可查看完整示例代碼:https://github.com/Shitaibin/...
- 若是這篇文章對你有幫助,請點個贊/喜歡,鼓勵我持續分享,感謝。
- 個人文章列表,點此可查看
- 若是喜歡本文,隨意轉載,但請保留此原文連接。