go channel 案例分析

斷斷續續理了一下關於channel的一些概念,如今能夠把下面的程序理清楚了。git

1. source codegithub

這個程序來自於《Go語言程序設計》 7.2.2 併發的Grep, 程序以下。併發

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "log"
    "os"
    "path/filepath"
    "regexp"
)

type Job struct {
    filename string
    results  chan<- Result
}

type Result struct {
    filename string
    fino     int
    line     string
}

var numberOfWorkers = 4

func main() {
    //retrieve input arguments
    if len(os.Args) < 3 || os.Args[1] == "-1" || os.Args[1] == "--help" {
        fmt.Printf("usage:  %s <regrex> <files>\n", filepath.Base(os.Args[0]))
        os.Exit(1)
    }

    if lineRx, err := regexp.Compile(os.Args[1]); err != nil {
        log.Fatalf("invalid regexp: %s\n", err)
    } else {
        //do the real work
        grep(lineRx, os.Args[2:])
    }
}

func grep(lineRx *regexp.Regexp, filenames []string) {
    jobs := make(chan Job, numberOfWorkers)
    done := make(chan struct{}, numberOfWorkers)
    results := make(chan Result, minimun(1000, len(filenames)))

    go addJobs(jobs, filenames, results) //produce Jobs
    for i := 0; i < numberOfWorkers; i++ {
        go doJobs(done, lineRx, jobs) //consume Jobs concurrent
    }
    go awaitCompletion(done, results) //wait until all the Jobs done
    processResults(results)
}

func addJobs(jobs chan<- Job, filenames []string, result chan<- Result) {
    for _, filename := range filenames {
        jobs <- Job{filename, result}
    }
    close(jobs)
}

func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
    for job := range jobs {
        job.Do(lineRx)
    }
    done <- struct{}{}
}

func awaitCompletion(done <-chan struct{}, results chan Result) {
    for i := 0; i < numberOfWorkers; i++ {
        <-done
    }
    //???what will happen to data in results???
    close(results)
}

func processResults(results <-chan Result) {
    for result := range results {
        fmt.Printf("%s:%d:%s\n", result.filename, result.fino, result.line)
    }
}

func minimun(a int, b int) int {
    if a <= b {
        return a
    } else {
        return b
    }
}

func (job Job) Do(lineRx *regexp.Regexp) {
    file, err := os.Open(job.filename)
    if err != nil {
        log.Printf("error: %s\n", err)
        return
    }
    defer file.Close()

    reader := bufio.NewReader(file)
    for lino := 1; ; lino++ {
        line, err := reader.ReadBytes('\n')
        if err != nil {
            if err != io.EOF {
                log.Printf("error:%d: %s\n", lino, err)
            }
            break
        }

        line = bytes.TrimRight(line, "\n\r")
        if lineRx.Match(line) {
            job.results <- Result{job.filename, lino, string(line)}
        }
    }
}

 

編譯程序:app

go build programui

program  a12345b   a.txt  b.txt c.txt...spa

2.編寫併發go routinue 的一些模式說明線程

同步通訊時須要避開兩個陷阱:設計

陷阱一:主線程提早退出code

當其餘線程工做沒有完成,而主線程提早退出。主線程退出會致使其餘線程強制退出,而得不到想要的結果。regexp

常見的解決方式是 讓主 gorountine在done通道上等待,根據接收到的消息判斷工做是否完成。另外一種是使用sync.WaitGroup。

陷阱二:死鎖

注意讀寫線程之間的關係,例如:不關閉 寫chanel 會致使 使用range 讀數據的 rountine堵塞。

 

3.程序同步關係圖

這裏我畫出程序執行流程:

0-1 : prepare channels

2:     addJobs start

3:     in addJobs,   close(jobs)  to notice the reader

4:     in doJobs,     finish read jobs(not blocked here),  the consumer won't be controlled by the producer

5:     in do Jobs,   close done to inform that the reader is free

6, 7, 8, 9:   ....

10:  whole program finished

 

從上述流程上能夠看出,都是生產者 經過必定的形式 通知消費者。告知消費者,產品已經生產完成,所以消費者不須要等待了,消費者只須要把剩下的任務完成就能夠,消費者不須要受控於生產者了。

而這種通知 經過兩種形式完成。

第一種: close channel.  例如:  close(jobs),  close(results)

第二種:讀寫done 通道。例如:  done <- struct{}{},   <-done。  因爲done佔用資源比較小,程序中並無把它關閉。

 

continue to the github project  gocrawl.....

相關文章
相關標籤/搜索