goroutine 併發之搜索文件內容

golang併發編程 - 例子解析

February 26, 2013

最近在看《Programming in Go》, 其中關於併發編程寫得很不錯, 受益非淺, 其中有一些例子是須要多思考才能想明白的, 因此我打算記錄下來, 強化一下思路html

《Programming in Go》在 Chapter 7. Concurrent Programming 裏面一共用3個例子來說述併發編程的3個模式, 第一個是 filter , 篩選出後綴名和文件大小文件列表, 還算簡單就不說, 而後第二個是升級版, 正則版 filter , 不一樣的是他是根據正則搜索出文件的文本而且列出來. 這個例子我起初看是有點蒙的, 這樣寫是沒錯, 可是爲何要這樣寫, 他的設計思路是什麼, 和其餘方法相比他有什麼優點, 這些都不清楚, 因而決定好好分析一下. 實際上這個例子實現的功能並不複雜, 因此個人文章其實是在討論怎麼產生出和做者類似的思路.golang

若是不考慮用 goroutine 的話, 思路其實很簡單:編程

1. 列出文件列表, 編譯正則.
2. 遍歷文件, 打開並遍歷每行, 若是正則能匹配, 記錄下來.
3. 列出來.

若是用 goroutine , 就會有如下思路:數組

1. 在獲得文件路徑數組以後, 分發任務給N個核.
2. 每一個核負責打開文件,  將符合條件的那行文本寫入到 `channel`
3. 主線程等待並接收`channel`的結果. 顯示出來, 完畢

** 而後下文才是重點 **緩存

1. channel關閉的時機

在go中, channel 是不會自動關閉的, 因此須要在咱們使用完以後手動去關閉, 並且若是使用for語法來遍歷channel每次獲得的數據, 若是channel沒有關閉的話會陷入死循環. 在 goroutine 中會形成 deadlock安全

for job := range jobs { fmt.Println(job) } 

若是沒close, 會觸發dead lock. 由於for...range...會自動阻塞直到讀取到數據或者channel關閉, 沒close的話就會致使整個channel處於睡眠狀態. channel關閉後, 就不容許寫入(緩衝的數據還在, 還能夠讀取), 因此, channel 關閉的時機很重要.數據結構

2. 分發任務

我所知道任務分發方法有兩種:併發

第一種是固定分配, 若是說我想計算1+2+3+...+100, 而後分紅4份, 也就是 1+2+..+25......86+87+...+100, 而後再將結果累加起來.測試

還有一種是搶佔式的, 這裏須要使用一個隊列, 將全部任務寫入隊列, 而後開N個goroutine, 每一個goroutine從隊列讀取任務(要確保線程安全), 處理, 完成後再繼續讀取任務. 再也不是固定分配, 本身那份作完了就休息了, 因此看來第二種要好一點.spa

採用第二種方式的話, 對應go的作法, 那就是使用一個channel, 命名爲 jobs, 將全部的任務寫入進去, 寫入完畢以後關閉這個 channel, 固然, 由於是N核, 系統能同時處理的任務咱們設置爲N個(也就是咱們使用了N個goroutine), 那麼聲明 jobs 是緩衝區長度爲N的 channel.

Buffered channel 和普通的 channel 的差異是他能夠同時容納多個單位數據, 當緩存的數據單位數量等於 channel 容量的時候, 再執行寫入將會阻塞, 不然都是及時處理的.

3. 結果集

當咱們將數據處理後, 就須要將結果收集起來. 須要注意的是, 這些操做不是在主 goroutine 執行, 因此咱們須要經過 channel 傳遞給主 goroutine . 因此只須要在外部聲明一個名爲 results 的 channel . 而後在主 goroutine 經過 for 來顯示, 這時候就會發現一個問題, 這個 results 關閉的時機問題. 正確的關閉時機是寫入全部的 Result 以後. 可是別忘了咱們同時開了多個 goroutine , 因此 results 應該在 執行任務的 goroutine 完成信號累計到N個 這個時機關閉. 因此咱們再引入一個名叫 done 的 channel 來解決. 每一個 goroutine 發送完 result 後會寫入一次done, 而後咱們就能夠遍歷 done , 遍歷以後說明所有完成了, 再執行顯示.

Result 的數據結構

type Result struct {
    filename string
    lino int
    line string
}

 

書中的 cgrep1 就是這樣的

func awaitCompletion(done <-chan struct{}, results chan Result) {
    for i := 0; i < workers; i++ {
        <-done
    }
    close(results)
}

 

可是這樣有可能形成死鎖, 由於書中 results 緩衝區長度限定爲最大1000個, 也就是超過1000個 result 的時候再打算寫入 result 會等待取出 result 後才執行, done 也不會寫入, 而 awaitCompletion 是等到全部 goroutine 都完成了纔會取出 results, 並且當 result 很是大的時候由於內存的緣故也是不可能一次性取出的. 因此就須要在讀取 results 的同時讀取 done, 當讀取 done 次數大於 N 後關閉 results, 因此, 由於要在多個 channel 中同時讀取, 因此須要使用 select.

下面是書中的 cgrep3 , 改進版:

func waitAndProcessResults(timeout int64, done <-chan struct{}, results <-chan Result) {
    finish := time.After(time.Duration(timeout))
    for working := workers; working > 0; {
        select { // Blocking
            case result := <-results:
                fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
            case <-finish:
                fmt.Println("timed out")
                return // Time's up so finish with what results there were
            case <-done: 
                working--
        } 
    }
    for {
        select { // Nonblocking
            case result := <-results:
                fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
            case <-finish:
                fmt.Println("timed out")
                return // Time's up so finish with what results there were
            default: 
                return
        } 
    }
}

 

看到這裏, 我就有個疑問, 爲何在所有完成以後(done都接收到N個了), 還要再遍歷出 results, 直到讀取不到纔算讀取完成呢(我反應一貫比較慢^_^)? 因而我作了個實驗, 去掉了後面再次循環的部分, 發現有時會遺漏掉數據(我用4個測試文件...), 證實這段代碼是有用的!!!

個人想法是, 他是在處理完 result, 而後寫入 results, 寫完了才發送 done, 也就是在收到全部的 done 以後, 全部的數據應該是已經處理完成的. 爲了驗證這個想法, 我寫了一下代碼:

for working := workers; working > 0; {
    select { // Blocking
        case result := <-results:
            // received result
        case <-done: 
            working--
            if working <= 0 {
                println(len(results))
            }
    } 
}

 

而後看到輸出的數是大於0的, 也就是說在接收到所有 done 以後, results 還有數據在緩衝區中, 而後在看看發送result 的代碼, 忽然就明白了

func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
    for job := range jobs {
        job.Do(lineRx)
    }
    done <- struct{}{}
}

 

我把寫入和讀取想固然認爲一塊兒發生了, 由於有緩衝區的緣故, doJobs在發送進 results 的緩衝區以後就馬上發送 done 了, 可是寫入的數據有沒有被處理, 是不知道的, 因此在接收到全部 done 以後, results 緩衝區還有數據, 須要再循環一遍.


附個人代碼一份:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "regexp"
    "runtime"
)

type Job struct {
    filename string
    results  chan<- Result
}

type Result struct {
    filename string
    line     string
    lino     int
}

var worker = runtime.NumCPU()

func main() {
    // config cpu number
    runtime.GOMAXPROCS(worker)
    files := os.Args[2:]
    regex, err := regexp.Compile(os.Args[1])
    if err != nil {
        log.Fatal(err)
        return
    }

    // 任務列表, 併發數目爲CPU個數
    jobs := make(chan Job, worker)
    // 結果
    results := make(chan Result, minimum(1000, len(files)))
    defer close(results)
    // 標記完成
    dones := make(chan int, worker)
    defer close(dones)

    go addJob(files, jobs, results)
    for i := 0; i < worker; i++ {
        go doJob(jobs, regex, dones)
    }
    awaitForCloseResult(dones, results)
}

func addJob(files []string, jobs chan<- Job, results chan<- Result) {
    for _, filename := range files {
        jobs <- Job{filename, results}
    }
    close(jobs)
}

func doJob(jobs <-chan Job, regex *regexp.Regexp, dones chan int) {
    for job := range jobs {
        job.Do(regex)
    }

    dones <- 1
}

func awaitForCloseResult(dones <-chan int, results chan Result) {
    working := 0
MyForLable:
    for {
        select {
        case result := <-results:
            println(result)
        case <-dones:
            working++
            if working >= worker {
                if rlen := len(results); rlen > 0 {
                    println("----------------------------------")
                    println("left:", rlen)
                    println("----------------------------------")
                    for i := 1; i <= rlen; i++ {
                        println(<-results)
                    }
                }
                break MyForLable
            }
        }
    }
}

func (j *Job) Do(re *regexp.Regexp) {
    f, err := os.Open(j.filename)
    if err != nil {
        println(err)
        return
    }
    defer f.Close()

    b := bufio.NewReader(f)
    lino := 0
    for {
        line, _, err := b.ReadLine()
        if re.Match(line) {
            j.results <- Result{j.filename, string(line), lino}
        }

        if err != nil {
            break
        }
        lino += 1
    }
}

func minimum(a, b int) int {
    if a > b {
        return b
    }
    return a
}

func println(o ...interface{}) {
    fmt.Println(o...)
}

轉自:http://chenye.org/goroutine-note.html

相關文章
相關標籤/搜索