goroutine pool,WaitGroup,chan 示例

服務端高併發編程常常須要寫不少goroutine來服務每個鏈接,如何正確使用goroutine池是又拍雲的工程師們須要考慮的問題,今天這篇文章,分享給一樣須要使用go語言的小夥伴們。

文/陶克路
本文轉載自:http://legendtkl.com/
引言

在上文中,我說到golang的原生http server處理client的connection的時候,每一個connection起一個goroutine,這是一個至關粗暴的方法。爲了感覺更深一點,咱們來看一下go的源碼。先定義一個最簡單的http server以下。

func myHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello there!\n")
}

func main(){
    http.HandleFunc("/", myHandler)     //  設置訪問路由
    log.Fatal(http.ListenAndServe(":8080", nil))
}
(如不能看所有代碼請往左滑,下同)

從入口http.ListenAndServe函數跟進去。
// file: net/http/server.go
func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}

func (srv *Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})        
}

func (srv *Server) Serve(l net.Listener) error {
    defer l.Close()
    ...
    for {
        rw, e := l.Accept()
        if e != nil {
            // error handle
            return e
        }
        tempDelay = 0
        c, err := srv.newConn(rw)
        if err != nil {
            continue
        }
        c.setState(c.rwc, StateNew) // before Serve can return
        go c.serve()
    }
}
首先net.Listen負責監聽網絡端口,rw, e := l.Accept()則從網絡端口中取出TCP鏈接,而後go c.server()則對每個TCP鏈接起一個goroutine來處理。我還說到fasthttp這個網絡框架性能要比原生的net/http性能要好,其中一個緣由就是由於使用了goroutine pool。那麼問題來了,若是要咱們本身去實現一個goroutine pool,該怎麼去實現呢?咱們先來實現一個最簡單的。

弱雞版

golang中的goroutine經過go來啓動,goroutine資源和臨時對象池不同,不能放回去再取出來。因此goroutine應該是一直運行着的。須要的時候就運行,不須要的時候就阻塞,這樣對其餘的goroutine的調度影響也不是很大。而goroutine的任務能夠經過channel來傳遞就ok了。很簡單的弱雞版本就出來了,以下。
func Gopool() {
    start := time.Now()
    wg := new(sync.WaitGroup)
    data := make(chan int, 100)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for _ = range data {
                fmt.Println("goroutine:", n, i)
            }
        }(i)
    }

    for i := 0; i < 10000; i++ {
        data <- i
    }
    close(data)
    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

上面的代碼中還作了程序運行時間統計。做爲對比,下面是一個沒有使用pool的版本。golang

func Nopool() {
    start := time.Now()
    wg := new(sync.WaitGroup)

    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            //fmt.Println("goroutine", n)
        }(i)
    }
    wg.Wait()

    end := time.Now()
    fmt.Println(end.Sub(start))
}
最後運行時間對比,使用了goroutine pool的代碼運行時間約爲沒有使用pool的代碼的2/3。固然這麼測試仍是略顯粗糙了。咱們下面使用reflect那篇文章裏面介紹的go benchmark testing的方式測試,測試代碼以下(去掉了不少無關代碼)。
package pool

import (
    "sync"
    "testing"
)

func Gopool() {
    wg := new(sync.WaitGroup)
    data := make(chan int, 100)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for _ = range data {
            }
        }(i)
    }

    for i := 0; i < 10000; i++ {
        data <- i
    }
    close(data)
    wg.Wait()
}

func Nopool() {
    wg := new(sync.WaitGroup)

    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
        }(i)
    }
    wg.Wait()
}

func BenchmarkGopool(b *testing.B) {
    for i := 0; i < b.N; i++ {
        Gopool()
    }
}

func BenchmarkNopool(b *testing.B) {
    for i := 0; i < b.N; i++ {
        Nopool()
    }
}
最終的測試結果以下,使用了goroutine pool的代碼執行時間確實更短。

$ go test -bench='.' gopool_test.go
BenchmarkGopool-8            500       2596750 ns/op
BenchmarkNopool-8            500       3604035 ns/op
PASS
升級版

對於一個好的線程池,咱們每每有更多的需求,一個最迫切的需求是能自定義goroutine運行的函數。函數無非就是函數地址和函數參數。若是要傳入的函數形式不同(形參或者返回值不同)怎麼辦?一個比較簡單的方法是引入反射。
type worker struct {
    Func interface{}
    Args []reflect.Value
}

func main() {
    var wg sync.WaitGroup

    channels := make(chan worker, 10)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ch := range channels {
                reflect.ValueOf(ch.Func).Call(ch.Args)
            }
        }()
    }

    for i := 0; i < 100; i++ {
        wk := worker{
            Func: func(x, y int) {
                fmt.Println(x + y)
            },
            Args: []reflect.Value{reflect.ValueOf(i), reflect.ValueOf(i)},
        }
        channels <- wk
    }
    close(channels)
    wg.Wait()
}
可是引入反射又會引入性能問題。原本goroutine pool就是爲了解決性能問題,然而如今又引入了新的性能問題。那麼怎麼辦呢?閉包。
type worker struct {
    Func func()
}

func main() {
    var wg sync.WaitGroup

    channels := make(chan worker, 10)

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ch := range channels {
                //reflect.ValueOf(ch.Func).Call(ch.Args)
                ch.Func()
            }
        }()
    }

    for i := 0; i < 100; i++ {
        j := i
        wk := worker{
            Func: func() {
                fmt.Println(j + j)
            },
        }
        channels <- wk
    }
    close(channels)
    wg.Wait()
}
這裏值得注意的一點是golang的閉包用很差容易把本身代入坑,而理解閉包一個很關鍵的點就是對對象的引用而不是複製。這裏只是goroutine pool 實現的一個精簡版,真正實現的時候還有不少細節須要考慮,好比設置一個stop channel用來中止pool,可是goroutine pool的核心就在於這個地方。

goroutine池和CPU核的關係

那麼goroutine池裏面goroutine數目和核數有沒有關係呢?這個其實要分狀況討論。

1.goroutine池跑不滿

這也就意味着channel data裏面一有數據就會被goroutine拿走,這樣的話固然只能你CPU能調度的過來就行,也就是池子裏的goroutine數目和CPU核數是最優的。經測試,確實是這樣。

2.channel data有數據阻塞

這意思是說goroutine是不夠用的,若是goroutine的運行任務不是CPU密集型的(大部分狀況都不是),而只是IO阻塞,這個時候通常goroutine數目在必定範圍內是越多越好,固然範圍在什麼地方就要具體狀況具體分析了。
相關文章
相關標籤/搜索