Golang被證實很是適合併發編程,goroutine比異步編程更易讀、優雅、高效。本文提出一個適合由Golang實現的Pipeline執行模型,適合批量處理大量數據(ETL)的情景。程序員
想象這樣的應用情景:數據庫
因爲應用中遇到的各類問題,概括出這些需求:
需求一:應分批處理數據,例如規定每批100條。出現問題時(例如任意一個數據庫故障)則中斷,下次程序啓動時使用checkpoint從中斷處恢復。
需求二:每一個流程設置合理的併發數、讓數據庫和NLP服務有合理的負載(不影響其它業務的基礎上,儘量佔用更多資源以提升ETL性能)。例如,步驟(1)-(4)分別設置併發數一、四、八、2。編程
這就是一個典型的Pipeline(流水線)執行模型。把每一批數據(例如100條)看做流水線上的產品,4個步驟對應流水線上4個處理工序,每一個工序處理完畢後就把半成品交給下一個工序。每一個工序能夠同時處理的產品數各不相同。數組
你可能首先想到啓用1+4+8+2個goroutine,使用channel來傳遞數據。我也曾經這麼幹,結論就是這麼幹會讓程序員瘋掉:流程併發控制代碼很是複雜,特別是你得處理異常、執行時間超出預期、可控中斷等問題,你不得不加入一堆channel,直到你本身都不記得有什麼用。併發
爲了更高效完成ETL工做,我將Pipeline抽象成模塊。我先把代碼粘貼出來,再解析含義。模塊能夠直接使用,主要使用的接口是:NewPipeline、Async、Wait。異步
package main import "sync" func HasClosed(c <-chan struct{}) bool { select { case <-c: return true default: return false } } type SyncFlag interface{ Wait() Chan() <-chan struct{} Done() bool } func NewSyncFlag() (done func(), flag SyncFlag) { f := &syncFlag{ c : make(chan struct{}), } return f.done, f } type syncFlag struct { once sync.Once c chan struct{} } func (f *syncFlag) done() { f.once.Do(func(){ close(f.c) }) } func (f *syncFlag) Wait() { <-f.c } func (f *syncFlag) Chan() <-chan struct{} { return f.c } func (f *syncFlag) Done() bool { return HasClosed(f.c) } type pipelineThread struct { sigs []chan struct{} chanExit chan struct{} interrupt SyncFlag setInterrupt func() err error } func newPipelineThread(l int) *pipelineThread { p := &pipelineThread{ sigs : make([]chan struct{}, l), chanExit : make(chan struct{}), } p.setInterrupt, p.interrupt = NewSyncFlag() for i := range p.sigs { p.sigs[i] = make(chan struct{}) } return p } type Pipeline struct { mtx sync.Mutex workerChans []chan struct{} prevThd *pipelineThread } //建立流水線,參數個數是每一個任務的子過程數,每一個參數對應子過程的併發度。 func NewPipeline(workers ...int) *Pipeline { if len(workers) < 1 { panic("NewPipeline need aleast one argument") } workersChan := make([]chan struct{}, len(workers)) for i := range workersChan { workersChan[i] = make(chan struct{}, workers[i]) } prevThd := newPipelineThread(len(workers)) for _,sig := range prevThd.sigs { close(sig) } close(prevThd.chanExit) return &Pipeline{ workerChans : workersChan, prevThd : prevThd, } } //往流水線推入一個任務。若是第一個步驟的併發數達到設定上限,這個函數會堵塞等待。 //若是流水線中有其它任務失敗(返回非nil),任務不被執行,函數返回false。 func (p *Pipeline) Async(works ...func()error) bool { if len(works) != len(p.workerChans) { panic("Async: arguments number not matched to NewPipeline(...)") } p.mtx.Lock() if p.prevThd.interrupt.Done() { p.mtx.Unlock() return false } prevThd := p.prevThd thisThd := newPipelineThread(len(p.workerChans)) p.prevThd = thisThd p.mtx.Unlock() lock := func(idx int) bool { select { case <-prevThd.interrupt.Chan(): return false case <-prevThd.sigs[idx]: //wait for signal } select { case <-prevThd.interrupt.Chan(): return false case p.workerChans[idx]<-struct{}{}: //get lock } return true } if !lock(0) { thisThd.setInterrupt() <-prevThd.chanExit thisThd.err = prevThd.err close(thisThd.chanExit) return false } go func() { //watch interrupt of previous thread select { case <-prevThd.interrupt.Chan(): thisThd.setInterrupt() case <-thisThd.chanExit: } }() go func() { var err error for i,work := range works { close(thisThd.sigs[i]) //signal next thread if work != nil { err = work() } if err != nil || (i+1 < len(works) && !lock(i+1)) { thisThd.setInterrupt() break } <-p.workerChans[i] //release lock } <-prevThd.chanExit if prevThd.interrupt.Done() { thisThd.setInterrupt() } if prevThd.err != nil { thisThd.err = prevThd.err } else { thisThd.err = err } close(thisThd.chanExit) }() return true } //等待流水線中全部任務執行完畢或失敗,返回第一個錯誤,若是無錯誤則返回nil。 func (p *Pipeline) Wait() error { p.mtx.Lock() lastThd := p.prevThd p.mtx.Unlock() <-lastThd.chanExit return lastThd.err }
使用這個Pipeline組件,咱們的ETL程序將會簡單、高效、可靠,讓程序員從繁瑣的併發流程控制中解放出來:異步編程
package main import "log" func main() { //恢復上次執行的checkpoint,若是是第一次執行就獲取一個初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外執行,最後一個工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加載100條數據,並修改變量checkpoint //data是數組,每一個元素是一條評論,以後的聯表、NLP都直接修改data裏的每條記錄。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //這裏有個Golang著名的坑。 //「checkpoint」是循環體外的變量,它在內存中只有一個實例並在循環中不斷被修改,因此不能在異步中使用它。 //這裏建立一個副本curCheckpoint,儲存本次循環的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //處理完畢 } err := pipeline.Wait() if err != nil { log.Print(err) } }
示意圖:函數
每一個方格表示一批數據,黃色表示正在執行所屬工序,白色表示已經完成工序但堵塞等待中。性能
Pipeline的工做方式:學習
Pipeline分別控制每個工序的併發數。
若是第一個工序的併發數已滿,Async會堵塞,直到有線程第一個工序完成。
每一個線程的每一個工序的調度,不早於上一個線程同一個工序的調度。
若是某個線程的某個工序處理失敗(例如數據庫故障),那以後的線程都會停止執行,下一次調用Async返回false,pipeline.Wait()返回第一個錯誤,整個流水線做業可控中斷。
沒法避免中斷過程當中有checkpoint後的數據寫入。下次重啓程序將從新寫入、覆蓋這些數據。
Pipeline解決了這些問題:
若是你剛開始學習Golang,你必定以爲channel這東西好棒。但當你理所固然地用一堆channel來串聯一條流水線,就是把本身逼瘋的開始。實際上Golang有更棒的東西,我不知道那叫什麼,反正你能夠在func開啓一個goroutine的時候,裏面調用外面的變量。
package main import ( "fmt" "time" "sync" ) func main() { var wg sync.WaitGroup for i := 0 ; i < 10 ; i++ { my_var := i * 10 wg.Add(1) go func() { defer wg.Done() time.Sleep(time.Second) fmt.Println(my_var) }() } wg.Wait() }
程序會在啓動1秒後不按順序輸出0、十、20、…… 90。Runtime建立了10個my_var,每一個goroutine各有一個,因此每一個goroutine輸出不同的值。
看起來很簡單的東西,其實是Golang的獨有特性,涉及到Go runtime的機制,其餘語言不得不定義一個對象來解決相似的問題。當我從C++轉Go開發時就驚訝:還有這種操做?
上面的Pipeline模塊利用了這個特性,它根本不須要任何channel來傳遞數據,使用者在一個在循環體內定義一個變量來儲存一整批的數據,在異步的goroutine中讀取、修改這些數據。在goroutine間用channel傳遞數據的思路轉變爲:每一批數據由一個goroutine處理,多個gouroutine競爭各個工序的併發數。