Golang被證實很是適合併發編程,goroutine比異步編程更易讀、優雅、高效。本文提出一個適合由Golang實現的Pipeline執行模型,適合批量處理大量數據(ETL)的情景。程序員
想象這樣的應用情景:
(1)從數據庫A(Cassandra)加載用戶評論(量巨大,例如10億條);
(2)根據每條評論的用戶ID、從數據庫B(MySQL)關聯用戶資料;
(3)調用NLP服務(天然語言處理),處理每條評論;
(4)將處理結果寫入數據庫C(ElasticSearch)。數據庫
因爲應用中遇到的各類問題,概括出這些需求:
需求一:應分批處理數據,例如規定每批100條。出現問題時(例如任意一個數據庫故障)則中斷,下次程序啓動時使用checkpoint從中斷處恢復。
需求二:每一個流程設置合理的併發數、讓數據庫和NLP服務有合理的負載(不影響其它業務的基礎上,儘量佔用更多資源以提升ETL性能)。例如,步驟(1)-(4)分別設置併發數一、八、3二、2。編程
這就是一個典型的Pipeline(流水線)執行模型。把每一批數據(例如100條)看做流水線上的產品,4個步驟對應流水線上4個處理工序,每一個工序處理完畢後就把半成品交給下一個工序。每一個工序能夠同時處理的產品數各不相同。數組
你可能首先想到啓用1+8+32+2個goroutine,使用channel來傳遞半成品。我也曾經這麼幹,結論就是這麼幹會讓程序員瘋掉:流程併發控制代碼很是複雜,特別是你得處理異常、執行時間超出預期、可控中斷等問題,你不得不加入一堆channel,直到你本身都不記得有什麼用。併發
爲了更高效完成ETL工做,我將Pipeline抽象成模塊。我先把代碼粘貼出來,再解析含義。模塊能夠直接使用,主要使用的接口是:NewPipeline、Async、Wait。異步
package main import "sync" func HasClosed(c <-chan struct{}) bool { select { case <-c: return true default: return false } } type SyncFlag interface{ Wait() Chan() <-chan struct{} Done() bool } func NewSyncFlag() (done func(), flag SyncFlag) { f := &syncFlag{ c : make(chan struct{}), } return f.done, f } type syncFlag struct { once sync.Once c chan struct{} } func (f *syncFlag) done() { f.once.Do(func(){ close(f.c) }) } func (f *syncFlag) Wait() { <-f.c } func (f *syncFlag) Chan() <-chan struct{} { return f.c } func (f *syncFlag) Done() bool { return HasClosed(f.c) } type pipelineThread struct { sigs []chan struct{} chanExit chan struct{} interrupt SyncFlag setInterrupt func() err error } func newPipelineThread(l int) *pipelineThread { p := &pipelineThread{ sigs : make([]chan struct{}, l), chanExit : make(chan struct{}), } p.setInterrupt, p.interrupt = NewSyncFlag() for i := range p.sigs { p.sigs[i] = make(chan struct{}) } return p } type Pipeline struct { mtx sync.Mutex workerChans []chan struct{} prevThd *pipelineThread } //建立流水線,參數個數是每一個任務的子過程數,每一個參數對應子過程的併發度。 func NewPipeline(workers ...int) *Pipeline { if len(workers) < 1 { panic("NewPipeline need aleast one argument") } workersChan := make([]chan struct{}, len(workers)) for i := range workersChan { workersChan[i] = make(chan struct{}, workers[i]) } prevThd := newPipelineThread(len(workers)) for _,sig := range prevThd.sigs { close(sig) } close(prevThd.chanExit) return &Pipeline{ workerChans : workersChan, prevThd : prevThd, } } //往流水線推入一個任務。若是第一個步驟的併發數達到設定上限,這個函數會堵塞等待。 //若是流水線中有其它任務失敗(返回非nil),任務不被執行,函數返回false。 func (p *Pipeline) Async(works ...func()error) bool { if len(works) != len(p.workerChans) { panic("Async: arguments number not matched to NewPipeline(...)") } p.mtx.Lock() if p.prevThd.interrupt.Done() { p.mtx.Unlock() return false } prevThd := p.prevThd thisThd := newPipelineThread(len(p.workerChans)) p.prevThd = thisThd p.mtx.Unlock() lock := func(idx int) bool { select { case <-prevThd.interrupt.Chan(): return false case <-prevThd.sigs[idx]: //wait for signal } select { case <-prevThd.interrupt.Chan(): return false case p.workerChans[idx]<-struct{}{}: //get lock } return true } if !lock(0) { thisThd.setInterrupt() <-prevThd.chanExit thisThd.err = prevThd.err close(thisThd.chanExit) return false } go func() { //watch interrupt of previous thread select { case <-prevThd.interrupt.Chan(): thisThd.setInterrupt() case <-thisThd.chanExit: } }() go func() { var err error for i,work := range works { close(thisThd.sigs[i]) //signal next thread if work != nil { err = work() } if err != nil || (i+1 < len(works) && !lock(i+1)) { thisThd.setInterrupt() break } <-p.workerChans[i] //release lock } <-prevThd.chanExit if prevThd.interrupt.Done() { thisThd.setInterrupt() } if prevThd.err != nil { thisThd.err = prevThd.err } else { thisThd.err = err } close(thisThd.chanExit) }() return true } //等待流水線中全部任務執行完畢或失敗,返回第一個錯誤,若是無錯誤則返回nil。 func (p *Pipeline) Wait() error { p.mtx.Lock() lastThd := p.prevThd p.mtx.Unlock() <-lastThd.chanExit return lastThd.err }
使用這個Pipeline組件,咱們的ETL程序將會簡單、高效、可靠,讓程序員從繁瑣的併發流程控制中解放出來:異步編程
package main import "log" func main() { checkpoint := loadCheckpoint() //工序(1)在pipeline外執行,最後一個工序是保存checkpoint pipeline := NewPipeline(8, 32, 2, 1) for { //(1) //加載100條數據,並修改變量checkpoint //data是數組,每一個元素是一條評論,以後的聯表、NLP都直接修改data裏的每條記錄。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //處理完畢 } err := pipeline.Wait() if err != nil { log.Print(err) } }
Pipeline執行模型的特性:函數
一、Pipeline分別控制每個工序的併發數,若是(4)的併發數已滿,某個線程的(3)即便完成都會堵塞等待,直到(4)有一個線程完成。
二、在上面的情景中,Pipeline最多同時處理1+8+32+2+1=44個線程共4400條記錄,內存開銷可控。
三、每一個線程的每一個工序的調度,不早於上一個線程同一個工序的調度。性能
例如:有兩個線程正在執行,<1>先執行、<2>後執行。若是<2>(4)早於<1>(4)完成,那<2>必須堵塞等待,直到<1>(4)完成、<1>(5)開始執行,那<2>(5)纔會開始。又由於(5)的最大併發數是1,因此實際上<2>(5)必須等待<1>(5)完成纔會開始。這個機制保證checkpoint的執行順序必定是按照Async的順序,避免中斷、繼續時漏處理數據。this
四、若是某個線程的某個工序處理失敗(例如數據庫故障),那以後的線程都會停止執行,下一次調用Async返回false,pipeline.Wait()返回第一個錯誤,整個流水線做業可控中斷。
例如:有三個線程正在執行:<1>、<2>、<3>。若是<2>(4)失敗(loadDataToC返回error非nil),那<3>不管正在執行到哪個工序,都不會進入下一個工序而中斷。<1>不會受到影響,會一直執行完畢。Wait()等待<1><2><3>所有完成或停止,返回loadDataToC的錯誤。
五、沒法避免中斷過程當中有checkpoint後的數據寫入。下次重啓程序將從新寫入、覆蓋這些數據。
例如:<2>(4)失敗、<3>(4)執行成功(已寫入數據),那<2>(5)和<3>(5)都不會被執行,checkpoint的最新狀態是<1>寫入的,下次重啓程序將從新執行<2>和<3>,其中<3>的數據會再次寫入,因此寫入應該按照記錄ID做覆蓋寫入。
六、你能夠隨時Ctrl+C、重啓程序,全部事情都會繼續有序執行。死機?毫無壓力。
總結:Pipeline執行模型除了限制併發數,也能限制內存開銷,對失敗恢復有充足的考慮,讓程序員從繁瑣的併發編程中解放出來。
吐槽:Python程序員沒辦法用幾百行代碼就漂亮地完成這個任務。