看着身邊優秀的小夥伴們早就開始寫博客,本身深感落後,還好遲作總比不作好,勉勵本身見賢思齊。趁着年前最後一個週末,陽光正好,寫下第一篇博客,爲2019年開個頭,以期完成今年爲本身立下的flags。git
從PHPer轉Gopher,很大一個緣由就是業務對性能和併發的持續需求,另外一個主要緣由就是Go語言原生的併發特性,能夠在提供同等高可用的能力下,使用更少的機器資源,節約可觀的成本。所以本文就結合本身在學習Go併發的實戰demo中,把遇到的一些坑點寫下來,共享進步。github
a) Channel
- 分爲無緩衝、有緩衝通道;docker
b) WaitGroup
- sync包提供的goroutine間的同步機制;bash
c) Context
- 在調用鏈不一樣goroutine間傳遞和共享數據;markdown
本文demo中主要用到了前兩種,基本使用請查看官方文檔。併發
需求:實現一個EDM的高效郵件發送:須要支持多個國家(能夠當作是多個任務),須要記錄每條任務發送的狀態(當前成功、失敗條數),須要支持可暫停(stop)、從新發送(run)操做。函數
分析:從需求能夠看出,在郵件發送中能夠經過併發實現多個國家(多個任務)併發、單個任務分批次併發實現快速、高效EDM需求。oop
3.1 main.go性能
package main import ( "bufio" "fmt" "io" "log" "os" "strconv" "sync" "time" ) var ( batchLength = 20 wg sync.WaitGroup finish = make(chan bool) ) func main() { startTime := time.Now().UnixNano() for i := 1; i <= 3; i++ { filename := "./task/edm" + strconv.Itoa(i) + ".txt" start := 60 go RunTask(filename, start, batchLength) } // main 阻塞等待goroutine執行完成 fmt.Println(<-finish) fmt.Println("finished all tasks.") endTime := time.Now().UnixNano() fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) } // 單任務 func RunTask(filename string, start, length int) (retErr error) { for { readLine, err := ReadLines(filename, start, length) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { fmt.Println(err) retErr = err break } fmt.Println("current line:", readLine) start += length // 等待一批完成才進入下一批 //wg.Wait() } wg.Wait() finish <- true return retErr } 複製代碼
注意上面wg.Wait()
的位置(下面有討論),在finish channel
以前,目的是爲了等待子goroutine
運行完,再經過一個無緩衝通道finish
通知main goroutine
,而後main
運行結束。學習
func ReadLines()讀取指定行數據:
// 讀取指定行數據 func ReadLines(filename string, start, length int) (line int, retErr error) { fmt.Println("current file:", filename) fileObj, err := os.Open(filename) if err != nil { panic(err) } defer fileObj.Close() // 跳過開始行以前的行-ReadString方式 startLine := 1 endLine := start + length reader := bufio.NewReader(fileObj) for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { log.Fatal(err) retErr = err break } if startLine > start && startLine <= endLine { wg.Add(1) // go併發執行 go SendEmail(line) if startLine == endLine { break } } startLine++ } return startLine, retErr } // 模擬郵件發送 func SendEmail(email string) error { defer wg.Done() time.Sleep(time.Second * 1) fmt.Println(email) return nil } 複製代碼
運行上面main.go
,3個任務在1s內併發完成全部郵件(./task/edm1.txt
中一行表示一個郵箱)發送。
true finished all tasks. Total cost(ms): 1001 複製代碼
那麼問題來了:沒有實現分批每次併發batchLength = 20
,由於若是不分批發送,只要其中某個任務或某一封郵件出錯了,那下次從新run的時候,會不知道哪些用戶已經發送過了,出現重複發送。而分批發送即便中途出錯了,下一次從新run可從上次出錯的end行開始,最可能是[start - end]
一個batchLength
發送失敗,能夠接受。
因而,將倒數第5行wg.Wait()
註釋掉,倒數第8行註釋打開,以下:
// 單任務 func RunTask(filename string, start, length int) (retErr error) { for { readLine, err := ReadLines(filename, start, length) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { fmt.Println(err) retErr = err break } fmt.Println("current line:", readLine) start += length // 等待一批完成才進入下一批 wg.Wait() } //wg.Wait() finish <- true return retErr } 複製代碼
運行就報錯:
panic: sync: WaitGroup is reused before previous Wait has returned
複製代碼
提示WaitGroup
在goroutine
之間重用了,雖然是全局變量,看起來是使用不當。怎麼調整呢?
3.2 main.go
package main import ( "bufio" "fmt" "io" "log" "os" "strconv" "sync" "time" ) var ( batchLength = 10 outerWg sync.WaitGroup ) func main() { startTime := time.Now().UnixNano() for i := 1; i <= 3; i++ { filename := "./task/edm" + strconv.Itoa(i) + ".txt" start := 60 outerWg.Add(1) go RunTask(filename, start, batchLength) } // main 阻塞等待goroutine執行完成 outerWg.Wait() fmt.Println("finished all tasks.") endTime := time.Now().UnixNano() fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) } // 單任務 func RunTask(filename string, start, length int) (retErr error) { for { isFinish := make(chan bool) readLine, err := ReadLines(filename, start, length, isFinish) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { fmt.Println(err) retErr = err break } // 等待一批完成才進入下一批 fmt.Println("current line:", readLine) start += length <-isFinish // 關閉channel,釋放資源 close(isFinish) } outerWg.Done() return retErr } 複製代碼
從上面能夠看出:調整的思路是外層用WaitGroup
控制,裏層用channel
控制,執行又報錯 : (
fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_Semacquire(0x55fe7c) /usr/local/go/src/runtime/sema.go:56 +0x39 sync.(*WaitGroup).Wait(0x55fe70) /usr/local/go/src/sync/waitgroup.go:131 +0x72 main.main() /home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1ab goroutine 5 [chan send]: main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0) 複製代碼
仔細檢查,發現上面代碼中定義的isFinish
是一個無緩衝channel
,在發郵件SendMail()
子協程沒有完成時,讀取一個無數據的無緩衝通道將阻塞當前goroutine
,其餘goroutine
也是同樣的都被阻塞,這樣就出現了all goroutines are asleep - deadlock!
因而將上面代碼改成有緩衝繼續嘗試:
isFinish := make(chan bool, 1) // 讀取指定行數據 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) { fmt.Println("current file:", filename) // 控制每一批發完再下一批 var wg sync.WaitGroup fileObj, err := os.Open(filename) if err != nil { panic(err) } defer fileObj.Close() // 跳過開始行以前的行-ReadString方式 startLine := 1 endLine := start + length reader := bufio.NewReader(fileObj) for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { log.Fatal(err) retErr = err break } if startLine > start && startLine <= endLine { wg.Add(1) // go併發執行 go SendEmail(line, wg) if startLine == endLine { isFinish <- true break } } startLine++ } wg.Wait() return startLine, retErr } // 模擬郵件發送 func SendEmail(email string, wg sync.WaitGroup) error { defer wg.Done() time.Sleep(time.Second * 1) fmt.Println(email) return nil } 複製代碼
運行,又報錯了 : (
fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_Semacquire(0x55fe7c) /usr/local/go/src/runtime/sema.go:56 +0x39 sync.(*WaitGroup).Wait(0x55fe70) 複製代碼
此次提示有點不同,看起來是裏層的WaitGroup
致使了死鎖,繼續檢查發現裏層wg
是值傳遞,應該使用指針傳引用。
// go併發執行 go SendEmail(line, wg) 複製代碼
最後修改代碼以下:
// 讀取指定行數據 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) { fmt.Println("current file:", filename) // 控制每一批發完再下一批 var wg sync.WaitGroup fileObj, err := os.Open(filename) if err != nil { panic(err) } defer fileObj.Close() // 跳過開始行以前的行-ReadString方式 startLine := 1 endLine := start + length reader := bufio.NewReader(fileObj) for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { log.Fatal(err) retErr = err break } if startLine > start && startLine <= endLine { wg.Add(1) // go併發執行 go SendEmail(line, &wg) if startLine == endLine { isFinish <- true break } } startLine++ } wg.Wait() return startLine, retErr } // 模擬郵件發送 func SendEmail(email string, wg *sync.WaitGroup) error { defer wg.Done() time.Sleep(time.Second * 1) fmt.Println(email) return nil } 複製代碼
趕忙運行一下,此次終於成功啦 : )
current line: 100
current file: ./task/edm2.txt
Read EOF: ./task/edm2.txt
Read EOF: ./task/edm2.txt
finished all tasks.
Total cost(ms): 4003
複製代碼
每一個任務模擬的是100行,從第60行開始運行,四個任務併發執行,每一個任務分批內再次併發,而且控制了每一批次完成後再進行下一批,因此總運行時間約4s,符合指望值。完整源碼請閱讀原文或移步GitHub:github.com/astraw99/ed…
本文經過兩層嵌套Go 併發,模擬實現了高性能併發EDM,具體的一些出錯行控制、任務中斷與再次執行將在下次繼續討論,主要邏輯已跑通,幾個坑點小結以下:
a) WaitGroup 通常用於main 主協程等待所有子協程退出後,再優雅退出主協程;嵌套使用時注意wg.Wait()放的位置;
b) 合理使用channel,無緩衝chan將阻塞當前goroutine,有緩衝chan在cap未滿的狀況下不會阻塞當前goroutine,使用完記得釋放chan資源;
c) 注意函數間傳值或傳引用(本質上仍是傳值,傳的指針的指針內存值)的合理使用;