斷斷續續理了一下關於channel的一些概念,如今能夠把下面的程序理清楚了。git
1. source codegithub
這個程序來自於《Go語言程序設計》 7.2.2 併發的Grep, 程序以下。併發
package main import ( "bufio" "bytes" "fmt" "io" "log" "os" "path/filepath" "regexp" ) type Job struct { filename string results chan<- Result } type Result struct { filename string fino int line string } var numberOfWorkers = 4 func main() { //retrieve input arguments if len(os.Args) < 3 || os.Args[1] == "-1" || os.Args[1] == "--help" { fmt.Printf("usage: %s <regrex> <files>\n", filepath.Base(os.Args[0])) os.Exit(1) } if lineRx, err := regexp.Compile(os.Args[1]); err != nil { log.Fatalf("invalid regexp: %s\n", err) } else { //do the real work grep(lineRx, os.Args[2:]) } } func grep(lineRx *regexp.Regexp, filenames []string) { jobs := make(chan Job, numberOfWorkers) done := make(chan struct{}, numberOfWorkers) results := make(chan Result, minimun(1000, len(filenames))) go addJobs(jobs, filenames, results) //produce Jobs for i := 0; i < numberOfWorkers; i++ { go doJobs(done, lineRx, jobs) //consume Jobs concurrent } go awaitCompletion(done, results) //wait until all the Jobs done processResults(results) } func addJobs(jobs chan<- Job, filenames []string, result chan<- Result) { for _, filename := range filenames { jobs <- Job{filename, result} } close(jobs) } func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) { for job := range jobs { job.Do(lineRx) } done <- struct{}{} } func awaitCompletion(done <-chan struct{}, results chan Result) { for i := 0; i < numberOfWorkers; i++ { <-done } //???what will happen to data in results??? close(results) } func processResults(results <-chan Result) { for result := range results { fmt.Printf("%s:%d:%s\n", result.filename, result.fino, result.line) } } func minimun(a int, b int) int { if a <= b { return a } else { return b } } func (job Job) Do(lineRx *regexp.Regexp) { file, err := os.Open(job.filename) if err != nil { log.Printf("error: %s\n", err) return } defer file.Close() reader := bufio.NewReader(file) for lino := 1; ; lino++ { line, err := reader.ReadBytes('\n') if err != nil { if err != io.EOF { log.Printf("error:%d: %s\n", lino, err) } break } line = bytes.TrimRight(line, "\n\r") if lineRx.Match(line) { job.results <- Result{job.filename, lino, string(line)} } } }
編譯程序:app
go build programui
program a12345b a.txt b.txt c.txt...spa
2.編寫併發go routinue 的一些模式說明線程
同步通訊時須要避開兩個陷阱:設計
陷阱一:主線程提早退出code
當其餘線程工做沒有完成,而主線程提早退出。主線程退出會致使其餘線程強制退出,而得不到想要的結果。regexp
常見的解決方式是 讓主 gorountine在done通道上等待,根據接收到的消息判斷工做是否完成。另外一種是使用sync.WaitGroup。
陷阱二:死鎖
注意讀寫線程之間的關係,例如:不關閉 寫chanel 會致使 使用range 讀數據的 rountine堵塞。
3.程序同步關係圖
這裏我畫出程序執行流程:
0-1 : prepare channels
2: addJobs start
3: in addJobs, close(jobs) to notice the reader
4: in doJobs, finish read jobs(not blocked here), the consumer won't be controlled by the producer
5: in do Jobs, close done to inform that the reader is free
6, 7, 8, 9: ....
10: whole program finished
從上述流程上能夠看出,都是生產者 經過必定的形式 通知消費者。告知消費者,產品已經生產完成,所以消費者不須要等待了,消費者只須要把剩下的任務完成就能夠,消費者不須要受控於生產者了。
而這種通知 經過兩種形式完成。
第一種: close channel. 例如: close(jobs), close(results)
第二種:讀寫done 通道。例如: done <- struct{}{}, <-done。 因爲done佔用資源比較小,程序中並無把它關閉。
continue to the github project gocrawl.....