如何優雅地實現併發編排任務

業務場景

在作任務開發的時候,大家必定會碰到如下場景:前端

場景1:調用第三方接口的時候, 一個需求你須要調用不一樣的接口,作數據組裝。
場景2:一個應用首頁可能依託於不少服務。那就涉及到在加載頁面時須要同時請求多個服務的接口。這一步每每是由後端統一調用組裝數據再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。git

針對以上兩種場景,假設在沒有強依賴關係下,選擇串行調用,那麼總耗時即:github

time=s1+s2+....sn

按照當代秒入百萬的有爲青年,這麼長時間早就把你祖宗十八代問候了一遍。後端

爲了偉大的KPI,咱們每每會選擇併發地調用這些依賴接口。那麼總耗時就是:閉包

time=max(s1,s2,s3.....,sn)

固然開始堆業務的時候能夠先串行化,等到上面的人着急的時候,亮出絕招。併發

這樣,年末 PPT 就能夠加上濃重的一筆流水帳:爲業務某個接口提升百分之XXX性能,間接產生XXX價值。app

固然這一切的前提是,作老闆不懂技術,作技術」懂」你。函數

言歸正傳,若是修改爲併發調用,你可能會這麼寫,工具

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)

    var userInfo *User
    var productList []Product
    
    go func() {
        defer wg.Done()
        userInfo, _ = getUser()
    }()

    go func() {
        defer wg.Done()
        productList, _ = getProductList()
    }()
    wg.Wait()
    fmt.Printf("用戶信息:%+v\n", userInfo)
    fmt.Printf("商品信息:%+v\n", productList)
}


/********用戶服務**********/

type User struct {
    Name string
    Age  uint8
}

func getUser() (*User, error) {
    time.Sleep(500 * time.Millisecond)
    var u User
    u.Name = "wuqinqiang"
    u.Age = 18
    return &u, nil
}

/********商品服務**********/

type Product struct {
    Title string
    Price uint32
}

func getProductList() ([]Product, error) {
    time.Sleep(400 * time.Millisecond)
    var list []Product
    list = append(list, Product{
        Title: "SHib",
        Price: 10,
    })
    return list, nil
}

先無論其餘問題。從實現上來講,須要多少服務,你會開多少個 G,利用 sync.WaitGroup 的特性,
實現併發編排任務的效果。性能

好像,問題不大。

可是隨着代號 996 業務場景的增長,你會發現,好多模塊都有類似的功能,只是對應的業務場景不一樣而已。

那麼咱們能不能抽像出一套針對此業務場景的工具,而把具體業務實現交給業務方。

安排。

使用

本着不重複造輪子的原則,去搜了下開源項目,最終看上了 go-zero 裏面的一個工具 mapreduce
從文件名咱們能看出來是什麼了,能夠自行 Google 這個名詞。

使用很簡單。咱們經過它改造一下上面的代碼:

package main

import (
    "fmt"
    "github.com/tal-tech/go-zero/core/mr"
    "time"
)

func main() {
    var userInfo *User
    var productList []Product
    _ = mr.Finish(func() (err error) {
        userInfo, err = getUser()
        return err
    }, func() (err error) {
        productList, err = getProductList()
        return err
    })
    fmt.Printf("用戶信息:%+v\n", userInfo)
    fmt.Printf("商品信息:%+v\n", productList)
}
用戶信息:&{Name:wuqinqiang Age:18}
商品信息:[{Title:SHib Price:10}]

是否是舒服多了。

可是這裏還須要注意一點,假設你調用的其中一個服務錯誤,而且你 return err 對應的錯誤,那麼其餘調用的服務會被取消。
好比咱們修改 getProductList 直接響應錯誤。

func getProductList() ([]Product, error) {
    return nil, errors.New("test error")
}
//打印
用戶信息:<nil>
商品信息:[]

那麼最終打印的時候連用戶信息都會爲空,由於出現一個服務錯誤,用戶服務請求被取消了。

通常狀況下,在請求服務錯誤的時候咱們會有保底操做,一個服務錯誤不能影響其餘請求的結果。
因此在使用的時候具體處理取決於業務場景。

源碼

既然用了,那麼就追下源碼吧。

func Finish(fns ...func() error) error {
    if len(fns) == 0 {
        return nil
    }

    return MapReduceVoid(func(source chan<- interface{}) {
        for _, fn := range fns {
            source <- fn
        }
    }, func(item interface{}, writer Writer, cancel func(error)) {
        fn := item.(func() error)
        if err := fn(); err != nil {
            cancel(err)
        }
    }, func(pipe <-chan interface{}, cancel func(error)) {
        drain(pipe)
    }, WithWorkers(len(fns)))
}
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
    _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
        reducer(input, cancel)
        drain(input)
        // We need to write a placeholder to let MapReduce to continue on reducer done,
        // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
        writer.Write(lang.Placeholder)
    }, opts...)
    return err
}

對於 MapReduceVoid函數,主要查看三個閉包參數。

  • 第一個 GenerateFunc 用於生產數據。
  • MapperFunc 讀取生產出的數據,進行處理。
  • VoidReducerFunc 這裏表示不對 mapper 後的數據作聚合返回。因此這個閉包在此操做幾乎0做用。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
    source := buildSource(generate) 
    return MapReduceWithSource(source, mapper, reducer, opts...)
}

func buildSource(generate GenerateFunc) chan interface{} {
    source := make(chan interface{})// 建立無緩衝通道
    threading.GoSafe(func() {
        defer close(source)
        generate(source) //開始生產數據
    })

    return source //返回無緩衝通道
}

buildSource函數中,返回一個無緩衝的通道。並開啓一個 G 運行 generate(source),往無緩衝通道塞數據。 這個generate(source) 不就是一開始 Finish 傳遞的第一個閉包參數。

return MapReduceVoid(func(source chan<- interface{}) {
    // 就這個
        for _, fn := range fns {
            source <- fn
        }
    })

而後查看 MapReduceWithSource 函數,

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
    opts ...Option) (interface{}, error) {
    options := buildOptions(opts...)
    //任務執行結束通知信號
    output := make(chan interface{})
    //將mapper處理完的數據寫入collector
    collector := make(chan interface{}, options.workers)
    // 取消操做信號
    done := syncx.NewDoneChan()
    writer := newGuardedWriter(output, done.Done())
    var closeOnce sync.Once
    var retErr errorx.AtomicError
    finish := func() {
        closeOnce.Do(func() {
            done.Close()
            close(output)
        })
    }
    cancel := once(func(err error) {
        if err != nil {
            retErr.Set(err)
        } else {
            retErr.Set(ErrCancelWithNil)
        }

        drain(source)
        finish()
    })

    go func() {
        defer func() {
            if r := recover(); r != nil {
                cancel(fmt.Errorf("%v", r))
            } else {
                finish()
            }
        }()
        reducer(collector, writer, cancel)
        drain(collector)
    }()
    // 真正從生成器通道取數據執行Mapper
    go executeMappers(func(item interface{}, w Writer) {
        mapper(item, w, cancel)
    }, source, collector, done.Done(), options.workers)

    value, ok := <-output
    if err := retErr.Load(); err != nil {
        return nil, err
    } else if ok {
        return value, nil
    } else {
        return nil, ErrReduceNoOutput
    }
}

這段代碼挺長的,咱們說下核心的點。咱們看到使用一個G 調用 executeMappers 方法。

go executeMappers(func(item interface{}, w Writer) {
        mapper(item, w, cancel)
    }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
    done <-chan lang.PlaceholderType, workers int) {
    var wg sync.WaitGroup
    defer func() {
        // 等待全部任務所有執行完畢
        wg.Wait()
        // 關閉通道
        close(collector)
    }()
   //根據指定數量建立 worker池
    pool := make(chan lang.PlaceholderType, workers) 
    writer := newGuardedWriter(collector, done)
    for {
        select {
        case <-done:
            return
        case pool <- lang.Placeholder:
            // 從buildSource() 返回的無緩衝通道取數據
            item, ok := <-input 
            // 當通道關閉,結束
            if !ok {
                <-pool
                return
            }

            wg.Add(1)
            // better to safely run caller defined method
            threading.GoSafe(func() {
                defer func() {
                    wg.Done()
                    <-pool
                }()
                //真正運行閉包函數的地方
               // func(item interface{}, w Writer) {
               //    mapper(item, w, cancel)
               //    }
                mapper(item, writer)
            })
        }
    }
}

具體的邏輯已備註,代碼很容易懂。

一旦 executeMappers 函數返回,關閉 collector 通道,那麼執行 reducer 再也不阻塞。

go func() {
        defer func() {
            if r := recover(); r != nil {
                cancel(fmt.Errorf("%v", r))
            } else {
                finish()
            }
        }()
        reducer(collector, writer, cancel)
        //這裏
        drain(collector)
    }()

這裏的 reducer(collector, writer, cancel) 其實就是從 MapReduceVoid 傳遞的第三個閉包函數。

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
    _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
        reducer(input, cancel)
        //這裏
        drain(input)
        // We need to write a placeholder to let MapReduce to continue on reducer done,
        // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
        writer.Write(lang.Placeholder)
    }, opts...)
    return err
}

而後這個閉包函數又執行了 reducer(input, cancel),這裏的 reducer 就是咱們一開始解釋過的 VoidReducerFunc,從 Finish() 而來

image

等等,看到上面三個地方的 drain(input)了嗎?

// drain drains the channel.
func drain(channel <-chan interface{}) {
    // drain the channel
    for range channel {
    }
}

其實就是一個排空 channel 的操做,可是三個地方都對同一個 channel,也是讓我費解。

還有更重要的一點。

go func() {
        defer func() {
            if r := recover(); r != nil {
                cancel(fmt.Errorf("%v", r))
            } else {
                finish()
            }
        }()
        reducer(collector, writer, cancel)
        drain(collector)
    }()

上面的代碼,假如執行 reducerwriter 寫入引起 panic,那麼drain(collector) 會直接卡住。

不過做者已經修復了這個問題,直接把 drain(collector) 放入到 defer
image

具體 issues[1]。

到這裏,關於 Finish 的源碼也就結束了。感興趣的能夠看看其餘源碼。

很喜歡 go-zero 裏的一些工具,可是每每用的一些工具並不獨立,
依賴於其餘文件包,致使明明只想使用其中一個工具卻須要安裝整個包。
因此最終的結果就是扒源碼,建立無依賴庫工具集,遵循 MIT 便可。

附錄
[1]
https://github.com/tal-tech/g...

相關文章
相關標籤/搜索