服務端高併發編程常常須要寫不少goroutine來服務每個鏈接,如何正確使用goroutine池是又拍雲的工程師們須要考慮的問題,今天這篇文章,分享給一樣須要使用go語言的小夥伴們。 文/陶克路 本文轉載自:http://legendtkl.com/ 引言 在上文中,我說到golang的原生http server處理client的connection的時候,每一個connection起一個goroutine,這是一個至關粗暴的方法。爲了感覺更深一點,咱們來看一下go的源碼。先定義一個最簡單的http server以下。 func myHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello there!\n") } func main(){ http.HandleFunc("/", myHandler) // 設置訪問路由 log.Fatal(http.ListenAndServe(":8080", nil)) } (如不能看所有代碼請往左滑,下同) 從入口http.ListenAndServe函數跟進去。
// file: net/http/server.go func ListenAndServe(addr string, handler Handler) error { server := &Server{Addr: addr, Handler: handler} return server.ListenAndServe() } func (srv *Server) ListenAndServe() error { addr := srv.Addr if addr == "" { addr = ":http" } ln, err := net.Listen("tcp", addr) if err != nil { return err } return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}) } func (srv *Server) Serve(l net.Listener) error { defer l.Close() ... for { rw, e := l.Accept() if e != nil { // error handle return e } tempDelay = 0 c, err := srv.newConn(rw) if err != nil { continue } c.setState(c.rwc, StateNew) // before Serve can return go c.serve() } }
首先net.Listen負責監聽網絡端口,rw, e := l.Accept()則從網絡端口中取出TCP鏈接,而後go c.server()則對每個TCP鏈接起一個goroutine來處理。我還說到fasthttp這個網絡框架性能要比原生的net/http性能要好,其中一個緣由就是由於使用了goroutine pool。那麼問題來了,若是要咱們本身去實現一個goroutine pool,該怎麼去實現呢?咱們先來實現一個最簡單的。
弱雞版
golang中的goroutine經過go來啓動,goroutine資源和臨時對象池不同,不能放回去再取出來。因此goroutine應該是一直運行着的。須要的時候就運行,不須要的時候就阻塞,這樣對其餘的goroutine的調度影響也不是很大。而goroutine的任務能夠經過channel來傳遞就ok了。很簡單的弱雞版本就出來了,以下。
func Gopool() { start := time.Now() wg := new(sync.WaitGroup) data := make(chan int, 100) for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() for _ = range data { fmt.Println("goroutine:", n, i) } }(i) } for i := 0; i < 10000; i++ { data <- i } close(data) wg.Wait() end := time.Now() fmt.Println(end.Sub(start)) }
上面的代碼中還作了程序運行時間統計。做爲對比,下面是一個沒有使用pool的版本。golang
func Nopool() { start := time.Now() wg := new(sync.WaitGroup) for i := 0; i < 10000; i++ { wg.Add(1) go func(n int) { defer wg.Done() //fmt.Println("goroutine", n) }(i) } wg.Wait() end := time.Now() fmt.Println(end.Sub(start)) }
最後運行時間對比,使用了goroutine pool的代碼運行時間約爲沒有使用pool的代碼的2/3。固然這麼測試仍是略顯粗糙了。咱們下面使用reflect那篇文章裏面介紹的go benchmark testing的方式測試,測試代碼以下(去掉了不少無關代碼)。
package pool import ( "sync" "testing" ) func Gopool() { wg := new(sync.WaitGroup) data := make(chan int, 100) for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() for _ = range data { } }(i) } for i := 0; i < 10000; i++ { data <- i } close(data) wg.Wait() } func Nopool() { wg := new(sync.WaitGroup) for i := 0; i < 10000; i++ { wg.Add(1) go func(n int) { defer wg.Done() }(i) } wg.Wait() } func BenchmarkGopool(b *testing.B) { for i := 0; i < b.N; i++ { Gopool() } } func BenchmarkNopool(b *testing.B) { for i := 0; i < b.N; i++ { Nopool() } }
最終的測試結果以下,使用了goroutine pool的代碼執行時間確實更短。 $ go test -bench='.' gopool_test.go BenchmarkGopool-8 500 2596750 ns/op BenchmarkNopool-8 500 3604035 ns/op PASS
升級版
對於一個好的線程池,咱們每每有更多的需求,一個最迫切的需求是能自定義goroutine運行的函數。函數無非就是函數地址和函數參數。若是要傳入的函數形式不同(形參或者返回值不同)怎麼辦?一個比較簡單的方法是引入反射。
type worker struct { Func interface{} Args []reflect.Value } func main() { var wg sync.WaitGroup channels := make(chan worker, 10) for i := 0; i < 5; i++ { wg.Add(1) go func() { defer wg.Done() for ch := range channels { reflect.ValueOf(ch.Func).Call(ch.Args) } }() } for i := 0; i < 100; i++ { wk := worker{ Func: func(x, y int) { fmt.Println(x + y) }, Args: []reflect.Value{reflect.ValueOf(i), reflect.ValueOf(i)}, } channels <- wk } close(channels) wg.Wait() }
可是引入反射又會引入性能問題。原本goroutine pool就是爲了解決性能問題,然而如今又引入了新的性能問題。那麼怎麼辦呢?閉包。
type worker struct { Func func() } func main() { var wg sync.WaitGroup channels := make(chan worker, 10) for i := 0; i < 5; i++ { wg.Add(1) go func() { defer wg.Done() for ch := range channels { //reflect.ValueOf(ch.Func).Call(ch.Args) ch.Func() } }() } for i := 0; i < 100; i++ { j := i wk := worker{ Func: func() { fmt.Println(j + j) }, } channels <- wk } close(channels) wg.Wait() }
這裏值得注意的一點是golang的閉包用很差容易把本身代入坑,而理解閉包一個很關鍵的點就是對對象的引用而不是複製。這裏只是goroutine pool 實現的一個精簡版,真正實現的時候還有不少細節須要考慮,好比設置一個stop channel用來中止pool,可是goroutine pool的核心就在於這個地方。 goroutine池和CPU核的關係 那麼goroutine池裏面goroutine數目和核數有沒有關係呢?這個其實要分狀況討論。 1.goroutine池跑不滿 這也就意味着channel data裏面一有數據就會被goroutine拿走,這樣的話固然只能你CPU能調度的過來就行,也就是池子裏的goroutine數目和CPU核數是最優的。經測試,確實是這樣。 2.channel data有數據阻塞 這意思是說goroutine是不夠用的,若是goroutine的運行任務不是CPU密集型的(大部分狀況都不是),而只是IO阻塞,這個時候通常goroutine數目在必定範圍內是越多越好,固然範圍在什麼地方就要具體狀況具體分析了。