Golang的channel使用以及併發同步技巧

在學習《The Go Programming Language》第八章併發單元的時候仍是遭遇了很多問題,和值得總結思考和記錄的地方。git

作一個相似於unix du命令的工具。可是閹割了一些功能,這裏應該只實現-c(統計total大小) 和-h(以human比較容易辨識的顯示出來)的功能。github

 

首先咱們須要構造一個 可以返回FileInfo信息數組的函數,咱們把它取名爲dirEntries:數組

func dirEntries(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

傳入一個路徑字符串,而後使用ioutil.ReadDir解析這個路徑下面的全部文件以及文件夾生成一個FileInfo的profile。併發

Fileinfo interface下面包含了:函數

type FileInfo interface {
    Name() string       // base name of the file
    Size() int64        // length in bytes for regular files; system-dependent for others
    Mode() FileMode     // file mode bits
    ModTime() time.Time // modification time
    IsDir() bool        // abbreviation for Mode().IsDir()
    Sys() interface{}   // underlying data source (can return nil)
}

多種方法,能夠直接調用,其做用就是後面註釋寫的同樣。工具

 

有了可以獲取文件夾下面文件和文件夾的函數以後,咱們須要一個調用方用來walk指定的目錄:oop

// 入參是一個文件目錄,一個INT64的只接收的單向channel
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

這裏咱們定義一個目錄,而後需求傳入一個單向接收channel用於在多goroutine中計算總共的文件大小。學習

使用range方法來遍歷咱們上面寫的dirEntries的返回文件或文件夾,若是是文件夾則繼續迭代。優化

若是不是則將文件大小存入放入fileSizes channel中。spa

 

搞定上面兩個函數,咱們來寫主函數部分:

func main() {
    root := ""
    flag.StringVar(&root, "-p", ".", "input dir.")
    flag.Parse()

    fileSizes := make(chan int64)
    // 起一個goroutine去walk目錄
    go func() {
        walkDir(root, fileSizes)
        // Walk完畢以後要關閉該channel下面使用range讀取數據的時候纔會有盡頭
        close(fileSizes)
    }()

    var nfiles, nbytes int64
    for size := range fileSizes {
        nfiles++
        nbytes += size
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

這裏注意一點,由於起goroutine的walk函數,和下面同時在range遍歷是在同步進行,若是下面range速度太快讀到管道里面沒有值了會阻塞住等待有數據繼續進來以後讀取,而不是會跳出。只有當close(fileSizes)這句執行到,顯示關閉掉channel以後,纔會跳出range循環而且這時已經讀取完了全部的數據。這裏有點像,close channel的時候給range發送了一箇中止信號同樣,感受這個利用起來會比較有用? 後續可能會再研究一下。

 

讓咱們繼續來優化咱們的程序,添加一個-v參數,打印出掃描文件的進度,當咱們要掃描整個盤的時候,可能會花費大量的時間,咱們須要知道進度如何了。

其實這個需求只須要很小的改動,讓咱們來從新改寫一下main函數,用select多路複用來完成這個事情。

func main() {
    root := ""
    verbose := false
    tick := make(<-chan time.Time)
    var nfiles, nbytes int64

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    fileSizes := make(chan int64)
    // 起一個goroutine去walk目錄
    go func() {
        walkDir(root, fileSizes)
        // Walk完畢以後要關閉該channel下面使用range讀取數據的時候纔會有盡頭
        close(fileSizes)
    }()

loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

上面其實都差很少,這裏我直接從loop那裏開始說吧,遇到這個loop的時候我其實還蠻疑惑的,由於我在go語言保留關鍵字裏面並無看到他的身影,可是這裏他的確是個關鍵字,和裏面的break連用 裏面break後面跟上的loop 能夠直接跳出到最外層loop包裹的循環,而不是break默認的只跳出一層循環。明白了這個道理以後,這個就不難理解了,當咱們還在遍歷文件的時候,select 會持續讀取文件大小賦值給size,而且返回true給ok。若是咱們開啓了verbose,每隔500毫秒tick會收到來自time.Tick的消息。咱們都知道select會在都準備好的狀況下隨機pick一個執行,因此這裏也或快或慢的被打印進度(前提是同時收到信號,可是實際上這個發生速度可能在nm級別,憑感覺很難感受到誰先)。當最後都執行完畢後filesSizes channel會被上面的攜程函數close(),當close以後,在讀取完剩餘數據後,fileSizes會返回給ok nil。就能夠跳出循環。

 

看到這裏可能會以爲有點繞,因此要儘量的多理解一下,固然咱們可讓這個du程序更快。能夠注意到咱們並無在walkdir裏面開啓goroutines進行併發處理。下面我將嘗試開啓goroutine處理它們,而且用channel給他們加個鎖控制一下goroutine的數量,在此以前咱們先來看看如今完成了的代碼:

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "flag"
    "time"
)

// 入參是一個文件目錄,一個INT64的只接收的單向channel
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

func dirEntries(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

func main() {
    t1 := time.Now()
    root := ""
    verbose := false
    tick := make(<-chan time.Time)
    var nfiles, nbytes int64

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    fileSizes := make(chan int64)
    // 起一個goroutine去walk目錄
    go func() {
        walkDir(root, fileSizes)
        // Walk完畢以後要關閉該channel下面使用range讀取數據的時候纔會有盡頭
        close(fileSizes)
    }()

loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
    fmt.Println(time.Since(t1))
}

觀察上面代碼能夠看出咱們並不能直接在這個代碼的基礎上直接給walkDir加上goroutine,這樣會致使channel直接被關閉,而後啥也沒跑就結束了。

咱們須要讓主goroutine等待其餘goroutine都完成以後再結束,因此主goroutine須要在這裏阻塞住,等到獲得能夠結束的信號以後再結束。

 

咱們可使用sync.WaitGroup 來對仍舊活躍的walkDir調用進行計數。等到數量爲0的時候就算咱們能夠結束了。

sync.WaitGroup提供了三個方法:

  Add:添加或減小goroutine的數量。

  Done:至關於Add(-1)。

  Wait:阻塞住等待WaitGroup數量變成0.

 

明白這個道理以後咱們改寫了一下代碼,讓它使用sync.WaitGroup來支持同步,最後當全部goroutine都結束以後,關閉channel完成任務。

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "flag"
    "time"
    "sync"
)

// 入參是一個文件目錄,一個INT64的只接收的單向channel
func walkDir(dir string, fileSizes chan<- int64, n *sync.WaitGroup) {
    defer n.Done()
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, fileSizes, n)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

func dirEntries(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

func main() {
    t1 := time.Now()
    root := ""
    verbose := false
    tick := make(<-chan time.Time)
    fileSizes := make(chan int64)

    var n sync.WaitGroup
    var nfiles, nbytes int64

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    n.Add(1)
    go walkDir(root, fileSizes, &n)

    go func() {
        n.Wait()
        close(fileSizes)
    }()


loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
    fmt.Println(time.Since(t1))
}

隨便跑跑。。感受快得飛起,然而跑不了幾秒就會報錯,這個程序最大的問題就是咱們徹底沒有辦法以後它會本身打開多少個goroutine,感受會爆炸。因此咱們要限制這種誇張的寫法,使用channel來作一個併發協程池,把同時開啓的goroutine的數量控制一下。

 

最後上一下完整代碼,注意defer關鍵字,只接收函數,因此我會在釋放鎖的時候使用匿名函數:

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "flag"
    "time"
    "sync"
)

var token = make(chan int, 100)


// 入參是一個文件目錄,一個INT64的只接收的單向channel
func walkDir(dir string, fileSizes chan<- int64, n *sync.WaitGroup) {
    defer n.Done()
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, fileSizes, n)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

func dirEntries(dir string) []os.FileInfo {
    token <- 1
    defer func() {<-token}()
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

func main() {
    var nfiles, nbytes int64
    var n sync.WaitGroup

    root := ""
    verbose := false
    t1 := time.Now()
    fileSizes := make(chan int64)
    tick := make(<-chan time.Time)

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    n.Add(1)
    go walkDir(root, fileSizes, &n)

    go func() {
        n.Wait()
        close(fileSizes)
    }()


loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
    fmt.Println(time.Since(t1))
}

 

 

Reference:

https://github.com/gopl-zh/gopl-zh.github.com  The Go Programming Language

相關文章
相關標籤/搜索