在作任務開發的時候,大家必定會碰到如下場景:前端
場景1:調用第三方接口的時候, 一個需求你須要調用不一樣的接口,作數據組裝。
場景2:一個應用首頁可能依託於不少服務。那就涉及到在加載頁面時須要同時請求多個服務的接口。這一步每每是由後端統一調用組裝數據再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。git
針對以上兩種場景,假設在沒有強依賴關係下,選擇串行調用,那麼總耗時即:github
time=s1+s2+....sn
按照當代秒入百萬的有爲青年,這麼長時間早就把你祖宗十八代問候了一遍。後端
爲了偉大的KPI,咱們每每會選擇併發地調用這些依賴接口。那麼總耗時就是:閉包
time=max(s1,s2,s3.....,sn)
固然開始堆業務的時候能夠先串行化,等到上面的人着急的時候,亮出絕招。併發
這樣,年末 PPT 就能夠加上濃重的一筆流水帳:爲業務某個接口提升百分之XXX性能,間接產生XXX價值。app
固然這一切的前提是,作老闆不懂技術,作技術」懂」你。函數
言歸正傳,若是修改爲併發調用,你可能會這麼寫,工具
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup wg.Add(2) var userInfo *User var productList []Product go func() { defer wg.Done() userInfo, _ = getUser() }() go func() { defer wg.Done() productList, _ = getProductList() }() wg.Wait() fmt.Printf("用戶信息:%+v\n", userInfo) fmt.Printf("商品信息:%+v\n", productList) } /********用戶服務**********/ type User struct { Name string Age uint8 } func getUser() (*User, error) { time.Sleep(500 * time.Millisecond) var u User u.Name = "wuqinqiang" u.Age = 18 return &u, nil } /********商品服務**********/ type Product struct { Title string Price uint32 } func getProductList() ([]Product, error) { time.Sleep(400 * time.Millisecond) var list []Product list = append(list, Product{ Title: "SHib", Price: 10, }) return list, nil }
先無論其餘問題。從實現上來講,須要多少服務,你會開多少個 G
,利用 sync.WaitGroup
的特性,
實現併發編排任務的效果。性能
好像,問題不大。
可是隨着代號 996
業務場景的增長,你會發現,好多模塊都有類似的功能,只是對應的業務場景不一樣而已。
那麼咱們能不能抽像出一套針對此業務場景的工具,而把具體業務實現交給業務方。
安排。
本着不重複造輪子的原則,去搜了下開源項目,最終看上了 go-zero
裏面的一個工具 mapreduce
。
從文件名咱們能看出來是什麼了,能夠自行 Google
這個名詞。
使用很簡單。咱們經過它改造一下上面的代碼:
package main import ( "fmt" "github.com/tal-tech/go-zero/core/mr" "time" ) func main() { var userInfo *User var productList []Product _ = mr.Finish(func() (err error) { userInfo, err = getUser() return err }, func() (err error) { productList, err = getProductList() return err }) fmt.Printf("用戶信息:%+v\n", userInfo) fmt.Printf("商品信息:%+v\n", productList) }
用戶信息:&{Name:wuqinqiang Age:18} 商品信息:[{Title:SHib Price:10}]
是否是舒服多了。
可是這裏還須要注意一點,假設你調用的其中一個服務錯誤,而且你 return err
對應的錯誤,那麼其餘調用的服務會被取消。
好比咱們修改 getProductList 直接響應錯誤。
func getProductList() ([]Product, error) { return nil, errors.New("test error") } //打印 用戶信息:<nil> 商品信息:[]
那麼最終打印的時候連用戶信息都會爲空,由於出現一個服務錯誤,用戶服務請求被取消了。
通常狀況下,在請求服務錯誤的時候咱們會有保底操做,一個服務錯誤不能影響其餘請求的結果。
因此在使用的時候具體處理取決於業務場景。
既然用了,那麼就追下源碼吧。
func Finish(fns ...func() error) error { if len(fns) == 0 { return nil } return MapReduceVoid(func(source chan<- interface{}) { for _, fn := range fns { source <- fn } }, func(item interface{}, writer Writer, cancel func(error)) { fn := item.(func() error) if err := fn(); err != nil { cancel(err) } }, func(pipe <-chan interface{}, cancel func(error)) { drain(pipe) }, WithWorkers(len(fns))) }
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder) }, opts...) return err }
對於 MapReduceVoid
函數,主要查看三個閉包參數。
GenerateFunc
用於生產數據。MapperFunc
讀取生產出的數據,進行處理。VoidReducerFunc
這裏表示不對 mapper
後的數據作聚合返回。因此這個閉包在此操做幾乎0做用。func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { source := buildSource(generate) return MapReduceWithSource(source, mapper, reducer, opts...) } func buildSource(generate GenerateFunc) chan interface{} { source := make(chan interface{})// 建立無緩衝通道 threading.GoSafe(func() { defer close(source) generate(source) //開始生產數據 }) return source //返回無緩衝通道 }
buildSource
函數中,返回一個無緩衝的通道。並開啓一個 G
運行 generate(source)
,往無緩衝通道塞數據。 這個generate(source)
不就是一開始 Finish
傳遞的第一個閉包參數。
return MapReduceVoid(func(source chan<- interface{}) { // 就這個 for _, fn := range fns { source <- fn } })
而後查看 MapReduceWithSource
函數,
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { options := buildOptions(opts...) //任務執行結束通知信號 output := make(chan interface{}) //將mapper處理完的數據寫入collector collector := make(chan interface{}, options.workers) // 取消操做信號 done := syncx.NewDoneChan() writer := newGuardedWriter(output, done.Done()) var closeOnce sync.Once var retErr errorx.AtomicError finish := func() { closeOnce.Do(func() { done.Close() close(output) }) } cancel := once(func(err error) { if err != nil { retErr.Set(err) } else { retErr.Set(ErrCancelWithNil) } drain(source) finish() }) go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) drain(collector) }() // 真正從生成器通道取數據執行Mapper go executeMappers(func(item interface{}, w Writer) { mapper(item, w, cancel) }, source, collector, done.Done(), options.workers) value, ok := <-output if err := retErr.Load(); err != nil { return nil, err } else if ok { return value, nil } else { return nil, ErrReduceNoOutput } }
這段代碼挺長的,咱們說下核心的點。咱們看到使用一個G
調用 executeMappers
方法。
go executeMappers(func(item interface{}, w Writer) { mapper(item, w, cancel) }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) { var wg sync.WaitGroup defer func() { // 等待全部任務所有執行完畢 wg.Wait() // 關閉通道 close(collector) }() //根據指定數量建立 worker池 pool := make(chan lang.PlaceholderType, workers) writer := newGuardedWriter(collector, done) for { select { case <-done: return case pool <- lang.Placeholder: // 從buildSource() 返回的無緩衝通道取數據 item, ok := <-input // 當通道關閉,結束 if !ok { <-pool return } wg.Add(1) // better to safely run caller defined method threading.GoSafe(func() { defer func() { wg.Done() <-pool }() //真正運行閉包函數的地方 // func(item interface{}, w Writer) { // mapper(item, w, cancel) // } mapper(item, writer) }) } } }
具體的邏輯已備註,代碼很容易懂。
一旦 executeMappers
函數返回,關閉 collector
通道,那麼執行 reducer
再也不阻塞。
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) //這裏 drain(collector) }()
這裏的 reducer(collector, writer, cancel)
其實就是從 MapReduceVoid
傳遞的第三個閉包函數。
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) //這裏 drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder) }, opts...) return err }
而後這個閉包函數又執行了 reducer(input, cancel)
,這裏的 reducer
就是咱們一開始解釋過的 VoidReducerFunc
,從 Finish() 而來
。
等等,看到上面三個地方的 drain(input)
了嗎?
// drain drains the channel. func drain(channel <-chan interface{}) { // drain the channel for range channel { } }
其實就是一個排空 channel
的操做,可是三個地方都對同一個 channel
,也是讓我費解。
還有更重要的一點。
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) drain(collector) }()
上面的代碼,假如執行 reducer
,writer
寫入引起 panic
,那麼drain(collector)
會直接卡住。
不過做者已經修復了這個問題,直接把 drain(collector)
放入到 defer
。
具體 issues[1]。
到這裏,關於 Finish
的源碼也就結束了。感興趣的能夠看看其餘源碼。
很喜歡 go-zero
裏的一些工具,可是每每用的一些工具並不獨立,
依賴於其餘文件包,致使明明只想使用其中一個工具卻須要安裝整個包。
因此最終的結果就是扒源碼,建立無依賴庫工具集,遵循 MIT
便可。