最近在看《Programming in Go》, 其中關於併發編程寫得很不錯, 受益非淺, 其中有一些例子是須要多思考才能想明白的, 因此我打算記錄下來, 強化一下思路html
《Programming in Go》在 Chapter 7. Concurrent Programming
裏面一共用3個例子來說述併發編程的3個模式, 第一個是 filter
, 篩選出後綴名和文件大小文件列表, 還算簡單就不說, 而後第二個是升級版, 正則版 filter
, 不一樣的是他是根據正則搜索出文件的文本而且列出來. 這個例子我起初看是有點蒙的, 這樣寫是沒錯, 可是爲何要這樣寫, 他的設計思路是什麼, 和其餘方法相比他有什麼優點, 這些都不清楚, 因而決定好好分析一下. 實際上這個例子實現的功能並不複雜, 因此個人文章其實是在討論怎麼產生出和做者類似的思路.golang
若是不考慮用 goroutine 的話, 思路其實很簡單:編程
1. 列出文件列表, 編譯正則. 2. 遍歷文件, 打開並遍歷每行, 若是正則能匹配, 記錄下來. 3. 列出來.
若是用 goroutine , 就會有如下思路:數組
1. 在獲得文件路徑數組以後, 分發任務給N個核. 2. 每一個核負責打開文件, 將符合條件的那行文本寫入到 `channel` 3. 主線程等待並接收`channel`的結果. 顯示出來, 完畢
** 而後下文才是重點 **緩存
在go中, channel 是不會自動關閉的, 因此須要在咱們使用完以後手動去關閉, 並且若是使用for語法來遍歷channel每次獲得的數據, 若是channel沒有關閉的話會陷入死循環. 在 goroutine 中會形成 deadlock安全
for job := range jobs { fmt.Println(job) }
若是沒close, 會觸發dead lock. 由於for...range...會自動阻塞直到讀取到數據或者channel關閉, 沒close的話就會致使整個channel處於睡眠狀態. channel關閉後, 就不容許寫入(緩衝的數據還在, 還能夠讀取), 因此, channel 關閉的時機很重要.數據結構
我所知道任務分發方法有兩種:併發
第一種是固定分配, 若是說我想計算1+2+3+...+100, 而後分紅4份, 也就是
1+2+..+25
,...
,...
,86+87+...+100
, 而後再將結果累加起來.測試還有一種是搶佔式的, 這裏須要使用一個隊列, 將全部任務寫入隊列, 而後開N個goroutine, 每一個goroutine從隊列讀取任務(要確保線程安全), 處理, 完成後再繼續讀取任務. 再也不是固定分配, 本身那份作完了就休息了, 因此看來第二種要好一點.spa
採用第二種方式的話, 對應go的作法, 那就是使用一個channel, 命名爲 jobs
, 將全部的任務寫入進去, 寫入完畢以後關閉這個 channel, 固然, 由於是N核, 系統能同時處理的任務咱們設置爲N個(也就是咱們使用了N個goroutine), 那麼聲明 jobs
是緩衝區長度爲N的 channel.
Buffered channel
和普通的 channel 的差異是他能夠同時容納多個單位數據, 當緩存的數據單位數量等於 channel 容量的時候, 再執行寫入將會阻塞, 不然都是及時處理的.
當咱們將數據處理後, 就須要將結果收集起來. 須要注意的是, 這些操做不是在主 goroutine 執行, 因此咱們須要經過 channel 傳遞給主 goroutine . 因此只須要在外部聲明一個名爲 results
的 channel . 而後在主 goroutine 經過 for
來顯示, 這時候就會發現一個問題, 這個 results
關閉的時機問題. 正確的關閉時機是寫入全部的 Result
以後. 可是別忘了咱們同時開了多個 goroutine , 因此 results
應該在 執行任務的 goroutine 完成信號累計到N個
這個時機關閉. 因此咱們再引入一個名叫 done
的 channel 來解決. 每一個 goroutine 發送完 result 後會寫入一次done, 而後咱們就能夠遍歷 done , 遍歷以後說明所有完成了, 再執行顯示.
Result 的數據結構
type Result struct { filename string lino int line string }
書中的 cgrep1
就是這樣的
func awaitCompletion(done <-chan struct{}, results chan Result) { for i := 0; i < workers; i++ { <-done } close(results) }
可是這樣有可能形成死鎖, 由於書中 results
緩衝區長度限定爲最大1000個, 也就是超過1000個 result 的時候再打算寫入 result 會等待取出 result 後才執行, done 也不會寫入, 而 awaitCompletion
是等到全部 goroutine 都完成了纔會取出 results
, 並且當 result
很是大的時候由於內存的緣故也是不可能一次性取出的. 因此就須要在讀取 results
的同時讀取 done
, 當讀取 done
次數大於 N 後關閉 results
, 因此, 由於要在多個 channel 中同時讀取, 因此須要使用 select
.
下面是書中的 cgrep3
, 改進版:
func waitAndProcessResults(timeout int64, done <-chan struct{}, results <-chan Result) { finish := time.After(time.Duration(timeout)) for working := workers; working > 0; { select { // Blocking case result := <-results: fmt.Printf("%s:%d:%s\n", result.filename, result.lino, result.line) case <-finish: fmt.Println("timed out") return // Time's up so finish with what results there were case <-done: working-- } } for { select { // Nonblocking case result := <-results: fmt.Printf("%s:%d:%s\n", result.filename, result.lino, result.line) case <-finish: fmt.Println("timed out") return // Time's up so finish with what results there were default: return } } }
看到這裏, 我就有個疑問, 爲何在所有完成以後(done都接收到N個了), 還要再遍歷出 results
, 直到讀取不到纔算讀取完成呢(我反應一貫比較慢^_^)? 因而我作了個實驗, 去掉了後面再次循環的部分, 發現有時會遺漏掉數據(我用4個測試文件...), 證實這段代碼是有用的!!!
個人想法是, 他是在處理完 result, 而後寫入 results
, 寫完了才發送 done
, 也就是在收到全部的 done 以後, 全部的數據應該是已經處理完成的. 爲了驗證這個想法, 我寫了一下代碼:
for working := workers; working > 0; { select { // Blocking case result := <-results: // received result case <-done: working-- if working <= 0 { println(len(results)) } } }
而後看到輸出的數是大於0的, 也就是說在接收到所有 done 以後, results
還有數據在緩衝區中, 而後在看看發送result
的代碼, 忽然就明白了
func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) { for job := range jobs { job.Do(lineRx) } done <- struct{}{} }
我把寫入和讀取想固然認爲一塊兒發生了, 由於有緩衝區的緣故, doJobs在發送進 results
的緩衝區以後就馬上發送 done
了, 可是寫入的數據有沒有被處理, 是不知道的, 因此在接收到全部 done
以後, results
緩衝區還有數據, 須要再循環一遍.
附個人代碼一份:
package main import ( "bufio" "fmt" "log" "os" "regexp" "runtime" ) type Job struct { filename string results chan<- Result } type Result struct { filename string line string lino int } var worker = runtime.NumCPU() func main() { // config cpu number runtime.GOMAXPROCS(worker) files := os.Args[2:] regex, err := regexp.Compile(os.Args[1]) if err != nil { log.Fatal(err) return } // 任務列表, 併發數目爲CPU個數 jobs := make(chan Job, worker) // 結果 results := make(chan Result, minimum(1000, len(files))) defer close(results) // 標記完成 dones := make(chan int, worker) defer close(dones) go addJob(files, jobs, results) for i := 0; i < worker; i++ { go doJob(jobs, regex, dones) } awaitForCloseResult(dones, results) } func addJob(files []string, jobs chan<- Job, results chan<- Result) { for _, filename := range files { jobs <- Job{filename, results} } close(jobs) } func doJob(jobs <-chan Job, regex *regexp.Regexp, dones chan int) { for job := range jobs { job.Do(regex) } dones <- 1 } func awaitForCloseResult(dones <-chan int, results chan Result) { working := 0 MyForLable: for { select { case result := <-results: println(result) case <-dones: working++ if working >= worker { if rlen := len(results); rlen > 0 { println("----------------------------------") println("left:", rlen) println("----------------------------------") for i := 1; i <= rlen; i++ { println(<-results) } } break MyForLable } } } } func (j *Job) Do(re *regexp.Regexp) { f, err := os.Open(j.filename) if err != nil { println(err) return } defer f.Close() b := bufio.NewReader(f) lino := 0 for { line, _, err := b.ReadLine() if re.Match(line) { j.results <- Result{j.filename, string(line), lino} } if err != nil { break } lino += 1 } } func minimum(a, b int) int { if a > b { return b } return a } func println(o ...interface{}) { fmt.Println(o...) }
轉自:http://chenye.org/goroutine-note.html