Golang處理大數據時使用高效的Pipeline(流水線)執行模型

Golang被證實很是適合併發編程,goroutine比異步編程更易讀、優雅、高效。本文提出一個適合由Golang實現的Pipeline執行模型,適合批量處理大量數據(ETL)的情景。程序員

想象這樣的應用情景:
(1)從數據庫A(Cassandra)加載用戶評論(量巨大,例如10億條);
(2)根據每條評論的用戶ID、從數據庫B(MySQL)關聯用戶資料;
(3)調用NLP服務(天然語言處理),處理每條評論;
(4)將處理結果寫入數據庫C(ElasticSearch)。數據庫

因爲應用中遇到的各類問題,概括出這些需求:
需求一:應分批處理數據,例如規定每批100條。出現問題時(例如任意一個數據庫故障)則中斷,下次程序啓動時使用checkpoint從中斷處恢復。
需求二:每一個流程設置合理的併發數、讓數據庫和NLP服務有合理的負載(不影響其它業務的基礎上,儘量佔用更多資源以提升ETL性能)。例如,步驟(1)-(4)分別設置併發數一、八、3二、2。編程

這就是一個典型的Pipeline(流水線)執行模型。把每一批數據(例如100條)看做流水線上的產品,4個步驟對應流水線上4個處理工序,每一個工序處理完畢後就把半成品交給下一個工序。每一個工序能夠同時處理的產品數各不相同。數組

你可能首先想到啓用1+8+32+2個goroutine,使用channel來傳遞半成品。我也曾經這麼幹,結論就是這麼幹會讓程序員瘋掉:流程併發控制代碼很是複雜,特別是你得處理異常、執行時間超出預期、可控中斷等問題,你不得不加入一堆channel,直到你本身都不記得有什麼用。併發

爲了更高效完成ETL工做,我將Pipeline抽象成模塊。我先把代碼粘貼出來,再解析含義。模塊能夠直接使用,主要使用的接口是:NewPipeline、Async、Wait。異步

package main

import "sync"

func HasClosed(c <-chan struct{}) bool {
    select {
    case <-c: return true
    default: return false
    }
}

type SyncFlag interface{
    Wait()
    Chan() <-chan struct{}
    Done() bool
}

func NewSyncFlag() (done func(), flag SyncFlag) {
    f := &syncFlag{
        c : make(chan struct{}),
    }
    return f.done, f
}

type syncFlag struct {
    once sync.Once
    c chan struct{}
}

func (f *syncFlag) done() {
    f.once.Do(func(){
        close(f.c)
    })
}

func (f *syncFlag) Wait() {
    <-f.c
}

func (f *syncFlag) Chan() <-chan struct{} {
    return f.c
}

func (f *syncFlag) Done() bool {
    return HasClosed(f.c)
}

type pipelineThread struct {
    sigs []chan struct{}
    chanExit chan struct{}
    interrupt SyncFlag
    setInterrupt func()
    err error
}

func newPipelineThread(l int) *pipelineThread {
    p := &pipelineThread{
        sigs : make([]chan struct{}, l),
        chanExit : make(chan struct{}),
    }
    p.setInterrupt, p.interrupt = NewSyncFlag()

    for i := range p.sigs {
        p.sigs[i] = make(chan struct{})
    }
    return p
}

type Pipeline struct {
    mtx sync.Mutex
    workerChans []chan struct{}
    prevThd *pipelineThread
}

//建立流水線,參數個數是每一個任務的子過程數,每一個參數對應子過程的併發度。
func NewPipeline(workers ...int) *Pipeline {
    if len(workers) < 1 { panic("NewPipeline need aleast one argument") }

    workersChan := make([]chan struct{}, len(workers))
    for i := range workersChan {
        workersChan[i] = make(chan struct{}, workers[i])
    }

    prevThd := newPipelineThread(len(workers))
    for _,sig := range prevThd.sigs {
        close(sig)
    }
    close(prevThd.chanExit)

    return &Pipeline{
        workerChans : workersChan,
        prevThd : prevThd,
    }
}

//往流水線推入一個任務。若是第一個步驟的併發數達到設定上限,這個函數會堵塞等待。
//若是流水線中有其它任務失敗(返回非nil),任務不被執行,函數返回false。
func (p *Pipeline) Async(works ...func()error) bool {
    if len(works) != len(p.workerChans) {
        panic("Async: arguments number not matched to NewPipeline(...)")
    }

    p.mtx.Lock()
    if p.prevThd.interrupt.Done() {
        p.mtx.Unlock()
        return false
    }
    prevThd := p.prevThd
    thisThd := newPipelineThread(len(p.workerChans))
    p.prevThd = thisThd
    p.mtx.Unlock()

    lock := func(idx int) bool {
        select {
        case <-prevThd.interrupt.Chan(): return false
        case <-prevThd.sigs[idx]: //wait for signal
        }
        select {
        case <-prevThd.interrupt.Chan(): return false
        case p.workerChans[idx]<-struct{}{}: //get lock
        }
        return true
    }
    if !lock(0) {
        thisThd.setInterrupt()
        <-prevThd.chanExit
        thisThd.err = prevThd.err
        close(thisThd.chanExit)
        return false
    }
    go func() { //watch interrupt of previous thread
        select {
        case <-prevThd.interrupt.Chan():
            thisThd.setInterrupt()
        case <-thisThd.chanExit:
        }
    }()
    go func() {
        var err error
        for i,work := range works {
            close(thisThd.sigs[i]) //signal next thread
            if work != nil {
                err = work()
            }
            if err != nil || (i+1 < len(works) && !lock(i+1)) {
                thisThd.setInterrupt()
                break
            }
            <-p.workerChans[i] //release lock
        }

        <-prevThd.chanExit
        if prevThd.interrupt.Done() {
            thisThd.setInterrupt()
        }
        if prevThd.err != nil {
            thisThd.err = prevThd.err
        } else {
            thisThd.err = err
        }
        close(thisThd.chanExit)
    }()
    return true
}

//等待流水線中全部任務執行完畢或失敗,返回第一個錯誤,若是無錯誤則返回nil。
func (p *Pipeline) Wait() error {
    p.mtx.Lock()
    lastThd := p.prevThd
    p.mtx.Unlock()
    <-lastThd.chanExit
    return lastThd.err
}

使用這個Pipeline組件,咱們的ETL程序將會簡單、高效、可靠,讓程序員從繁瑣的併發流程控制中解放出來:異步編程

package main

import "log"

func main() {
    checkpoint := loadCheckpoint()
    
    //工序(1)在pipeline外執行,最後一個工序是保存checkpoint
    pipeline := NewPipeline(8, 32, 2, 1) 
    for {
        //(1)
        //加載100條數據,並修改變量checkpoint
        //data是數組,每一個元素是一條評論,以後的聯表、NLP都直接修改data裏的每條記錄。
        data, err := extractReviewsFromA(&checkpoint, 100) 
        if err != nil {
            log.Print(err)
            break
        }
        curCheckpoint := checkpoint
        
        ok := pipeline.Async(func() error {
            //(2)
            return joinUserFromB(data)
        }, func() error {
            //(3)
            return nlp(data)
        }, func() error {
            //(4)
            return loadDataToC(data)
        }, func() error {
            //(5)保存checkpoint
            log.Print("done:", curCheckpoint)
            return saveCheckpoint(curCheckpoint)
        })
        if !ok { break }
        
        if len(data) < 100 { break } //處理完畢
    }
    err := pipeline.Wait()
    if err != nil { log.Print(err) }
}

Pipeline執行模型的特性:函數

一、Pipeline分別控制每個工序的併發數,若是(4)的併發數已滿,某個線程的(3)即便完成都會堵塞等待,直到(4)有一個線程完成。
二、在上面的情景中,Pipeline最多同時處理1+8+32+2+1=44個線程共4400條記錄,內存開銷可控。
三、每一個線程的每一個工序的調度,不早於上一個線程同一個工序的調度。性能

例如:有兩個線程正在執行,<1>先執行、<2>後執行。若是<2>(4)早於<1>(4)完成,那<2>必須堵塞等待,直到<1>(4)完成、<1>(5)開始執行,那<2>(5)纔會開始。又由於(5)的最大併發數是1,因此實際上<2>(5)必須等待<1>(5)完成纔會開始。這個機制保證checkpoint的執行順序必定是按照Async的順序,避免中斷、繼續時漏處理數據。this

四、若是某個線程的某個工序處理失敗(例如數據庫故障),那以後的線程都會停止執行,下一次調用Async返回false,pipeline.Wait()返回第一個錯誤,整個流水線做業可控中斷。

例如:有三個線程正在執行:<1>、<2>、<3>。若是<2>(4)失敗(loadDataToC返回error非nil),那<3>不管正在執行到哪個工序,都不會進入下一個工序而中斷。<1>不會受到影響,會一直執行完畢。Wait()等待<1><2><3>所有完成或停止,返回loadDataToC的錯誤。

五、沒法避免中斷過程當中有checkpoint後的數據寫入。下次重啓程序將從新寫入、覆蓋這些數據。

例如:<2>(4)失敗、<3>(4)執行成功(已寫入數據),那<2>(5)和<3>(5)都不會被執行,checkpoint的最新狀態是<1>寫入的,下次重啓程序將從新執行<2>和<3>,其中<3>的數據會再次寫入,因此寫入應該按照記錄ID做覆蓋寫入。

六、你能夠隨時Ctrl+C、重啓程序,全部事情都會繼續有序執行。死機?毫無壓力。

總結:Pipeline執行模型除了限制併發數,也能限制內存開銷,對失敗恢復有充足的考慮,讓程序員從繁瑣的併發編程中解放出來。

吐槽:Python程序員沒辦法用幾百行代碼就漂亮地完成這個任務。

相關文章
相關標籤/搜索