Golang處理大數據時使用高效的Pipeline(流水線)執行模型

併發是件技術活

Golang被證實很是適合併發編程,goroutine比異步編程更易讀、優雅、高效。本文提出一個適合由Golang實現的Pipeline執行模型,適合批量處理大量數據(ETL)的情景。程序員

想象這樣的應用情景:數據庫

  1. 從數據庫A(Cassandra)加載用戶評論(量巨大,例如10億條);
  2. 根據每條評論的用戶ID、從數據庫B(MySQL)關聯用戶資料;
  3. 調用NLP服務(天然語言處理),處理每條評論;
  4. 將處理結果寫入數據庫C(ElasticSearch)。

因爲應用中遇到的各類問題,概括出這些需求:
需求一:應分批處理數據,例如規定每批100條。出現問題時(例如任意一個數據庫故障)則中斷,下次程序啓動時使用checkpoint從中斷處恢復。
需求二:每一個流程設置合理的併發數、讓數據庫和NLP服務有合理的負載(不影響其它業務的基礎上,儘量佔用更多資源以提升ETL性能)。例如,步驟(1)-(4)分別設置併發數一、四、八、2。編程

這就是一個典型的Pipeline(流水線)執行模型。把每一批數據(例如100條)看做流水線上的產品,4個步驟對應流水線上4個處理工序,每一個工序處理完畢後就把半成品交給下一個工序。每一個工序能夠同時處理的產品數各不相同。數組

你可能首先想到啓用1+4+8+2個goroutine,使用channel來傳遞數據。我也曾經這麼幹,結論就是這麼幹會讓程序員瘋掉:流程併發控制代碼很是複雜,特別是你得處理異常、執行時間超出預期、可控中斷等問題,你不得不加入一堆channel,直到你本身都不記得有什麼用。併發

可重用的Pipeline模塊

爲了更高效完成ETL工做,我將Pipeline抽象成模塊。我先把代碼粘貼出來,再解析含義。模塊能夠直接使用,主要使用的接口是:NewPipeline、Async、Wait。異步

package main

import "sync"

func HasClosed(c <-chan struct{}) bool {
    select {
    case <-c: return true
    default: return false
    }
}

type SyncFlag interface{
    Wait()
    Chan() <-chan struct{}
    Done() bool
}

func NewSyncFlag() (done func(), flag SyncFlag) {
    f := &syncFlag{
        c : make(chan struct{}),
    }
    return f.done, f
}

type syncFlag struct {
    once sync.Once
    c chan struct{}
}

func (f *syncFlag) done() {
    f.once.Do(func(){
        close(f.c)
    })
}

func (f *syncFlag) Wait() {
    <-f.c
}

func (f *syncFlag) Chan() <-chan struct{} {
    return f.c
}

func (f *syncFlag) Done() bool {
    return HasClosed(f.c)
}

type pipelineThread struct {
    sigs []chan struct{}
    chanExit chan struct{}
    interrupt SyncFlag
    setInterrupt func()
    err error
}

func newPipelineThread(l int) *pipelineThread {
    p := &pipelineThread{
        sigs : make([]chan struct{}, l),
        chanExit : make(chan struct{}),
    }
    p.setInterrupt, p.interrupt = NewSyncFlag()

    for i := range p.sigs {
        p.sigs[i] = make(chan struct{})
    }
    return p
}

type Pipeline struct {
    mtx sync.Mutex
    workerChans []chan struct{}
    prevThd *pipelineThread
}

//建立流水線,參數個數是每一個任務的子過程數,每一個參數對應子過程的併發度。
func NewPipeline(workers ...int) *Pipeline {
    if len(workers) < 1 { panic("NewPipeline need aleast one argument") }

    workersChan := make([]chan struct{}, len(workers))
    for i := range workersChan {
        workersChan[i] = make(chan struct{}, workers[i])
    }

    prevThd := newPipelineThread(len(workers))
    for _,sig := range prevThd.sigs {
        close(sig)
    }
    close(prevThd.chanExit)

    return &Pipeline{
        workerChans : workersChan,
        prevThd : prevThd,
    }
}

//往流水線推入一個任務。若是第一個步驟的併發數達到設定上限,這個函數會堵塞等待。
//若是流水線中有其它任務失敗(返回非nil),任務不被執行,函數返回false。
func (p *Pipeline) Async(works ...func()error) bool {
    if len(works) != len(p.workerChans) {
        panic("Async: arguments number not matched to NewPipeline(...)")
    }

    p.mtx.Lock()
    if p.prevThd.interrupt.Done() {
        p.mtx.Unlock()
        return false
    }
    prevThd := p.prevThd
    thisThd := newPipelineThread(len(p.workerChans))
    p.prevThd = thisThd
    p.mtx.Unlock()

    lock := func(idx int) bool {
        select {
        case <-prevThd.interrupt.Chan(): return false
        case <-prevThd.sigs[idx]: //wait for signal
        }
        select {
        case <-prevThd.interrupt.Chan(): return false
        case p.workerChans[idx]<-struct{}{}: //get lock
        }
        return true
    }
    if !lock(0) {
        thisThd.setInterrupt()
        <-prevThd.chanExit
        thisThd.err = prevThd.err
        close(thisThd.chanExit)
        return false
    }
    go func() { //watch interrupt of previous thread
        select {
        case <-prevThd.interrupt.Chan():
            thisThd.setInterrupt()
        case <-thisThd.chanExit:
        }
    }()
    go func() {
        var err error
        for i,work := range works {
            close(thisThd.sigs[i]) //signal next thread
            if work != nil {
                err = work()
            }
            if err != nil || (i+1 < len(works) && !lock(i+1)) {
                thisThd.setInterrupt()
                break
            }
            <-p.workerChans[i] //release lock
        }

        <-prevThd.chanExit
        if prevThd.interrupt.Done() {
            thisThd.setInterrupt()
        }
        if prevThd.err != nil {
            thisThd.err = prevThd.err
        } else {
            thisThd.err = err
        }
        close(thisThd.chanExit)
    }()
    return true
}

//等待流水線中全部任務執行完畢或失敗,返回第一個錯誤,若是無錯誤則返回nil。
func (p *Pipeline) Wait() error {
    p.mtx.Lock()
    lastThd := p.prevThd
    p.mtx.Unlock()
    <-lastThd.chanExit
    return lastThd.err
}

使用這個Pipeline組件,咱們的ETL程序將會簡單、高效、可靠,讓程序員從繁瑣的併發流程控制中解放出來:異步編程

package main

import "log"

func main() {
    //恢復上次執行的checkpoint,若是是第一次執行就獲取一個初始值。
    checkpoint := loadCheckpoint()
    
    //工序(1)在pipeline外執行,最後一個工序是保存checkpoint
    pipeline := NewPipeline(4, 8, 2, 1) 
    for {
        //(1)
        //加載100條數據,並修改變量checkpoint
        //data是數組,每一個元素是一條評論,以後的聯表、NLP都直接修改data裏的每條記錄。
        data, err := extractReviewsFromA(&checkpoint, 100) 
        if err != nil {
            log.Print(err)
            break
        }
        
        //這裏有個Golang著名的坑。
        //「checkpoint」是循環體外的變量,它在內存中只有一個實例並在循環中不斷被修改,因此不能在異步中使用它。
        //這裏建立一個副本curCheckpoint,儲存本次循環的checkpoint。
        curCheckpoint := checkpoint
        
        ok := pipeline.Async(func() error {
            //(2)
            return joinUserFromB(data)
        }, func() error {
            //(3)
            return nlp(data)
        }, func() error {
            //(4)
            return loadDataToC(data)
        }, func() error {
            //(5)保存checkpoint
            log.Print("done:", curCheckpoint)
            return saveCheckpoint(curCheckpoint)
        })
        if !ok { break }
        
        if len(data) < 100 { break } //處理完畢
    }
    err := pipeline.Wait()
    if err != nil { log.Print(err) }
}

示意圖:函數

圖片描述

每一個方格表示一批數據,黃色表示正在執行所屬工序,白色表示已經完成工序但堵塞等待中。性能

Pipeline的工做方式:學習

  1. Pipeline分別控制每個工序的併發數。

    • 如圖:(4)的併發數已滿,<14>(3)已經完成並堵塞等待(繼續佔有(3)的併發數),直到<12>(4)完成。
  2. 若是第一個工序的併發數已滿,Async會堵塞,直到有線程第一個工序完成。

    • 如圖:循環體內的<25>正在等待<21>(2)進入下一個工序。
  3. 每一個線程的每一個工序的調度,不早於上一個線程同一個工序的調度。

    • 如圖:<22>(2)早於<21>(2)完成,<22>須堵塞等待,直到<21>(2)完成。
  4. 若是某個線程的某個工序處理失敗(例如數據庫故障),那以後的線程都會停止執行,下一次調用Async返回false,pipeline.Wait()返回第一個錯誤,整個流水線做業可控中斷。

    • 例如:<12>(4)失敗,那<13>、<14>……不管正在執行到哪個工序,都不會進入下一個工序而中斷。<11>不會受到影響,會一直執行完畢。Wait()等待所有完成或停止,返回<12>(4)的錯誤。
  5. 沒法避免中斷過程當中有checkpoint後的數據寫入。下次重啓程序將從新寫入、覆蓋這些數據。

    • 例如:<12>(4)失敗、<13>(4)執行成功(已寫入數據),那<12>(5)和<13>(5)都不會被執行,checkpoint的最新狀態是<11>寫入的,下次重啓程序將從<12>開始,<13>的數據會再次寫入,因此寫入應該按照記錄ID做覆蓋寫入。

Pipeline解決了這些問題:

  1. 控制每一個工序的併發數;
  2. 控制總體併發數,不會由於in fly數據太多無限佔用內存。
  3. 任何工序出現故障(數據庫操做失敗),整個流水線可控中斷,不會漏處理任何一批記錄,也不會致使太多的從新執行。你也能夠隨時Ctrl+C、微調代碼、重啓程序,全部事情都會繼續有序執行。
  4. 任何工序發生堵塞(例如數據庫緩慢),整個流水線都會慢下來等待,不會強行加塞。
  5. 你能夠隨意修改每一個工序的併發數,直到找到最佳值。

用channel在上下游間傳遞數據是件笨拙的事

若是你剛開始學習Golang,你必定以爲channel這東西好棒。但當你理所固然地用一堆channel來串聯一條流水線,就是把本身逼瘋的開始。實際上Golang有更棒的東西,我不知道那叫什麼,反正你能夠在func開啓一個goroutine的時候,裏面調用外面的變量。

package main

import (
    "fmt"
    "time"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0 ; i < 10 ; i++ {
        my_var := i * 10
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(time.Second)
            fmt.Println(my_var)
        }()
    }
    wg.Wait()
}

程序會在啓動1秒後不按順序輸出0、十、20、…… 90。Runtime建立了10個my_var,每一個goroutine各有一個,因此每一個goroutine輸出不同的值。

看起來很簡單的東西,其實是Golang的獨有特性,涉及到Go runtime的機制,其餘語言不得不定義一個對象來解決相似的問題。當我從C++轉Go開發時就驚訝:還有這種操做?

上面的Pipeline模塊利用了這個特性,它根本不須要任何channel來傳遞數據,使用者在一個在循環體內定義一個變量來儲存一整批的數據,在異步的goroutine中讀取、修改這些數據。在goroutine間用channel傳遞數據的思路轉變爲:每一批數據由一個goroutine處理,多個gouroutine競爭各個工序的併發數。

相關文章
相關標籤/搜索