goroutine的退出git
有時候咱們須要通知goroutine中止它正在乾的事情,好比一個正在執行計算的web服務,然而它的客戶端已經斷開了和服務端的鏈接。web
Go語言並無提供在一個goroutine中終止另外一個goroutine的方法,因爲這樣會致使goroutine之間的共享變量落在未定義的狀態上。併發
若是咱們想要退出兩個或者任意多個goroutine怎麼辦呢?app
假設有一個abort channel,全部的goroutine訂閱這個channel,能夠向這個channel發送發送和goroutine數目同樣多的事件來退出它們。若是這些goroutine中已經有一些本身退出了,那麼會致使咱們的channel裏的事件數比goroutine還多,這樣致使咱們的發送直接被阻塞。另外一方面,若是這些goroutine又生成了其它的goroutine,咱們的channel裏的數目又太少了,因此有些goroutine可能會沒法接收到退出消息。通常狀況下咱們是很難知道在某一個時刻具體有多少個goroutine在運行着的。另外,當一個goroutine從abort channel中接收到一個值的時候,他會消費掉這個值,這樣其它的goroutine就無法看到這條信息。爲了可以達到咱們退出goroutine的目的,咱們須要更靠譜的策略,來經過一個channel把消息廣播出去,這樣goroutine們可以看到這條事件消息,而且在事件完成以後,能夠知道這件事已經發生過了。oop
回憶一下咱們關閉了一個channel而且被消費掉了全部已發送的值,操做channel以後的代碼能夠當即被執行,而且會產生零值。咱們能夠將這個機制擴展一下,來做爲咱們的廣播機制:不要向channel發送值,而是用關閉一個channel來進行廣播。ui
簡單的來講廣播機制的原理就是經過關閉channel,這樣對該channel的讀取操做都不會阻塞,而是會獲得一個零值。經過關閉channel來廣播消息事件。code
簡單的用代碼表示以下,token
package main import ( "fmt" "os" "time" ) var done = make(chan struct{}) func cancelled() bool { select { case <-done: return true default: return false } } func main() { // Cancel traversal when input is detected. go func() { os.Stdin.Read(make([]byte, 1)) // read a single byte close(done) }() for { if cancelled() { fmt.Println("cancell") return } else { fmt.Println("press enter to cancell") } time.Sleep(1000 * time.Millisecond) } }
運行結果,事件
➜ close git:(master) ✗ go run close.go press enter to cancell press enter to cancell press enter to cancell press enter to cancell cancell
那麼咱們能夠利用這個廣播機制來關閉全部的goroutine。input
首先來看一段代碼,
package main import ( "fmt" "io/ioutil" "os" "path/filepath" "time" ) var done = make(chan struct{}) func cancelled() bool { select { case <-done: return true default: return false } } // walkDir recursively walks the file tree rooted at dir // and sends the size of each found file on fileSizes. func walkDir(dir string, fileSizes chan<- int64) { for _, entry := range dirents(dir) { if entry.IsDir() { subdir := filepath.Join(dir, entry.Name()) walkDir(subdir, fileSizes) } else { fileSizes <- entry.Size() } } } // dirents returns the entries of directory dir. func dirents(dir string) []os.FileInfo { entries, err := ioutil.ReadDir(dir) if err != nil { fmt.Fprintf(os.Stderr, "du1: %v\n", err) return nil } return entries } func main() { // Cancel traversal when input is detected. go func() { os.Stdin.Read(make([]byte, 1)) // read a single byte close(done) }() var roots = []string{"/Users/xinxingegeya"} // Traverse the file tree. fileSizes := make(chan int64) go func() { for _, root := range roots { walkDir(root, fileSizes) } close(fileSizes) }() // Print the results. tick := time.Tick(1 * time.Second) var nfiles, nbytes int64 loop: for { select { case size, ok := <-fileSizes: if !ok { break loop // fileSizes was closed } nfiles++ nbytes += size case <-tick: printDiskUsage(nfiles, nbytes) } } printDiskUsage(nfiles, nbytes) // final totals } func printDiskUsage(nfiles, nbytes int64) { fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9) }
這段代碼,會遍歷指定目錄,計算出該目錄下文件的數目和所用空間的大小。這段代碼還不是併發的執行,咱們下面改爲併發的執行,而且經過上面所說的廣播機制來中斷全部運行中的goroutine退出計算任務。
package main import ( "fmt" "io/ioutil" "os" "path/filepath" "sync" "time" ) var done = make(chan struct{}) // sema is a counting semaphore for limiting concurrency in dirents. var sema = make(chan struct{}, 20) func cancelled() bool { select { case <-done: return true default: return false } } func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) { defer n.Done() if cancelled() { return } for _, entry := range dirents(dir) { if entry.IsDir() { n.Add(1) subdir := filepath.Join(dir, entry.Name()) go walkDir(subdir, n, fileSizes) } else { fileSizes <- entry.Size() } } } // dirents returns the entries of directory dir. func dirents(dir string) []os.FileInfo { select { case sema <- struct{}{}: // acquire token case <-done: return nil // cancelled } defer func() { <-sema }() // release token entries, err := ioutil.ReadDir(dir) if err != nil { fmt.Fprintf(os.Stderr, "du1: %v\n", err) return nil } return entries } func main() { // Cancel traversal when input is detected. go func() { os.Stdin.Read(make([]byte, 1)) // read a single byte close(done) }() var roots = []string{"/Users/xinxingegeya"} // Traverse the file tree. fileSizes := make(chan int64) var n sync.WaitGroup for _, root := range roots { n.Add(1) go walkDir(root, &n, fileSizes) } go func() { n.Wait() close(fileSizes) }() // Print the results. tick := time.Tick(1 * time.Second) var nfiles, nbytes int64 loop: for { select { case <-done: // Drain fileSizes to allow existing goroutines to finish. for range fileSizes { // Do nothing. } return case size, ok := <-fileSizes: if !ok { break loop // fileSizes was closed } nfiles++ nbytes += size case <-tick: printDiskUsage(nfiles, nbytes) } } printDiskUsage(nfiles, nbytes) // final totals } func printDiskUsage(nfiles, nbytes int64) { fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9) }
運行結果,
➜ interrupt git:(master) ✗ go run interrupt.go du1: open /Users/xinxingegeya/Library/Saved Application State/com.bitrock.appinstaller.savedState: permission denied 11553 files 32.6 GB 27593 files 44.1 GB 27929 files 44.2 GB 56182 files 46.3 GB 70592 files 48.2 GB 85680 files 49.9 GB 97835 files 49.9 GB 110396 files 49.9 GB 119635 files 49.9 GB
當按下enter鍵後,程序會退出。
===========END===========