在微服務中開發中,api網關扮演對外提供restful api的角色,而api的數據每每會依賴其餘服務,複雜的api更是會依賴多個甚至數十個服務。雖然單個被依賴服務的耗時通常都比較低,但若是多個服務串行依賴的話那麼整個api的耗時將會大大增長。git
那麼經過什麼手段來優化呢?咱們首先想到的是經過併發來的方式來處理依賴,這樣就能下降整個依賴的耗時,Go基礎庫中爲咱們提供了 WaitGroup 工具用來進行併發控制,但實際業務場景中多個依賴若是有一個出錯咱們指望能當即返回而不是等全部依賴都執行完再返回結果,並且WaitGroup中對變量的賦值每每須要加鎖,每一個依賴函數都須要添加Add和Done對於新手來講比較容易出錯github
基於以上的背景,go-zero框架中爲咱們提供了併發處理工具MapReduce,該工具開箱即用,不須要作什麼初始化,咱們經過下圖看下使用MapReduce和沒使用的耗時對比:golang
相同的依賴,串行處理的話須要200ms,使用MapReduce後的耗時等於全部依賴中最大的耗時爲100ms,可見MapReduce能夠大大下降服務耗時,並且隨着依賴的增長效果就會越明顯,減小處理耗時的同時並不會增長服務器壓力api
MapReduce是Google提出的一個軟件架構,用於大規模數據集的並行運算,go-zero中的MapReduce工具正是借鑑了這種架構思想服務器
go-zero框架中的MapReduce工具主要用來對批量數據進行併發的處理,以此來提高服務的性能restful
咱們經過幾個示例來演示MapReduce的用法架構
MapReduce主要有三個參數,第一個參數爲generate用以生產數據,第二個參數爲mapper用以對數據進行處理,第三個參數爲reducer用以對mapper後的數據作聚合返回,還能夠經過opts選項設置併發處理的線程數量併發
場景一: 某些功能的結果每每須要依賴多個服務,好比商品詳情的結果每每會依賴用戶服務、庫存服務、訂單服務等等,通常被依賴的服務都是以rpc的形式對外提供,爲了下降依賴的耗時咱們每每須要對依賴作並行處理app
func productDetail(uid, pid int64) (*ProductDetail, error) { var pd ProductDetail err := mr.Finish(func() (err error) { pd.User, err = userRpc.User(uid) return }, func() (err error) { pd.Store, err = storeRpc.Store(pid) return }, func() (err error) { pd.Order, err = orderRpc.Order(pid) return }) if err != nil { log.Printf("product detail error: %v", err) return nil, err } return &pd, nil }
該示例中返回商品詳情依賴了多個服務獲取數據,所以作併發的依賴處理,對接口的性能有很大的提高框架
場景二: 不少時候咱們須要對一批數據進行處理,好比對一批用戶id,效驗每一個用戶的合法性而且效驗過程當中有一個出錯就認爲效驗失敗,返回的結果爲效驗合法的用戶id
func checkLegal(uids []int64) ([]int64, error) { r, err := mr.MapReduce(func(source chan<- interface{}) { for _, uid := range uids { source <- uid } }, func(item interface{}, writer mr.Writer, cancel func(error)) { uid := item.(int64) ok, err := check(uid) if err != nil { cancel(err) } if ok { writer.Write(uid) } }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) { var uids []int64 for p := range pipe { uids = append(uids, p.(int64)) } writer.Write(uids) }) if err != nil { log.Printf("check error: %v", err) return nil, err } return r.([]int64), nil } func check(uid int64) (bool, error) { // do something check user legal return true, nil }
該示例中,若是check過程出現錯誤則經過cancel方法結束效驗過程,並返回error整個效驗過程結束,若是某個uid效驗結果爲false則最終結果不返回該uid
MapReduce使用注意事項
實現原理分析:
MapReduce中首先經過buildSource方法經過執行generate(參數爲無緩衝channel)產生數據,並返回無緩衝的channel,mapper會從該channel中讀取數據
func buildSource(generate GenerateFunc) chan interface{} { source := make(chan interface{}) go func() { defer close(source) generate(source) }() return source }
在MapReduceWithSource方法中定義了cancel方法,mapper和reducer中均可以調用該方法,調用後主線程收到close信號會立馬返回
cancel := once(func(err error) { if err != nil { retErr.Set(err) } else { // 默認的error retErr.Set(ErrCancelWithNil) } drain(source) // 調用close(ouput)主線程收到Done信號,立馬返回 finish() })
在mapperDispatcher方法中調用了executeMappers,executeMappers消費buildSource產生的數據,每個item都會起一個goroutine單獨處理,默認最大併發數爲16,能夠經過WithWorkers進行設置
var wg sync.WaitGroup defer func() { wg.Wait() // 保證全部的item都處理完成 close(collector) }() pool := make(chan lang.PlaceholderType, workers) writer := newGuardedWriter(collector, done) // 將mapper處理完的數據寫入collector for { select { case <-done: // 當調用了cancel會觸發當即返回 return case pool <- lang.Placeholder: // 控制最大併發數 item, ok := <-input if !ok { <-pool return } wg.Add(1) go func() { defer func() { wg.Done() <-pool }() mapper(item, writer) // 對item進行處理,處理完調用writer.Write把結果寫入collector對應的channel中 }() } }
reducer單goroutine對數mapper寫入collector的數據進行處理,若是reducer中沒有手動調用writer.Write則最終會執行finish方法對output進行close避免死鎖
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) }()
在該工具包中還提供了許多針對不一樣業務場景的方法,實現原理與MapReduce大同小異,感興趣的同窗能夠查看源碼學習
本文主要介紹了go-zero框架中的MapReduce工具,在實際的項目中很是實用。用好工具對於提高服務性能和開發效率都有很大的幫助,但願本篇文章能給你們帶來一些收穫。
https://github.com/tal-tech/go-zero
好將來技術