golang server epoll client 使用鏈接池 15萬+ qps

epoll 加鏈接池

前幾天看了epoll 使用,今天寫了一個測試腳本,測試一下epoll加鏈接池的性能mysql

50萬個請求,鏈接池使用2000鏈接,發送 "test" 服務端接受後 轉成大寫返回,處理完全部的請求耗時3.731506996s,性能很強大(注意:須要在linux環境下測試)linux

爲何要使用鏈接池?

拿數據庫舉例,頻繁的創建、關閉鏈接,會極大的下降mysql的性能,由於創建鏈接,釋放鏈接引發的大量性能開銷。git

鏈接池技術帶來的優點:github

一、資源重用sql

因爲tcp獲得重用,避免了頻繁建立、釋放鏈接引發的大量性能開銷。在減小系統消耗的基礎上,另外一方面也增進了系統運行環境的平穩性(減小內存碎片以及數據庫臨時進程/線程的數量)。數據庫

二、更快的系統響應速度編程

鏈接池在初始化後運行中。對於業務請求處理而言,大部分請求能夠直接利用現有可用鏈接,避免了數據庫鏈接初始化和釋放過程的時間開銷,從而縮減了系統總體響應時間。緩存

三、鏈接數量的控制服務器

過多的鏈接數量會拖垮整個服務,鏈接池能夠設定activeConne鏈接數量,從客戶端阻塞過多的鏈接,保證系統服務的平穩。網絡

四、統一的鏈接管理,避免數據庫鏈接泄漏

根據預先的鏈接佔用超時設定,強制收回被佔用鏈接。從而避免了常規數據庫鏈接操做中可能出現的資源泄漏。

爲何使用epoll

首先對於一個tcp鏈接,操做系統會爲每個鏈接分配必定的內存空間外(主要是內部網絡數據結構sk_buff的大小、鏈接的讀寫緩存,sof),雖然這些能夠進行調優,可是若是想使用正常的操做系統的TCP/IP棧的話,這些是硬性的需求。刨去這些,不一樣的編程語言不一樣的框架的設計,甚至是不一樣的需求場景,都會極大的影響TCP服務器內存的佔用和處理。

通常Go語言的TCP(和HTTP)的處理都是每個鏈接啓動一個goroutine去處理,由於咱們被教導goroutine的不像thread, 它是很便宜的,能夠在服務器上啓動成千上萬的goroutine。可是對於一百萬的鏈接,這種goroutine-per-connection的模式就至少要啓動一百萬個goroutine,這對資源的消耗也是極大的。針對不一樣的操做系統和不一樣的Go版本,一個goroutine鎖使用的最小的棧大小是2KB ~ 8 KB (go stack),若是在每一個goroutine中在分配byte buffer用以從鏈接中讀寫數據,幾十G的內存輕輕鬆鬆就分配出去了。

在linux測試過epoll性能,5萬個tcp鏈接

一、client端

func main()  {
    connections:=50000
    addr:="127.0.0.1:8972"
    var conns []net.Conn
    for i := 0; i < connections; i++ {
        c, err := net.DialTimeout("tcp", addr, 10*time.Second)
        if err != nil {
            fmt.Println("failed to connect", i, err)
            i--
            continue
        }
        conns = append(conns, c)
        time.Sleep(time.Millisecond)
    }
    defer func() {
        for _, c := range conns {
            c.Close()
        }
    }()
    log.Printf("完成初始化 %d 鏈接", len(conns))
    tts := time.Millisecond * 5

    for {
        for i := 0; i < len(conns); i++ {
            time.Sleep(tts)
            conn := conns[i]
            conn.Write([]byte("hello world\r\n"))
        }
    }
}

二、普通的tcp鏈接

server.go

func main() {
    ln, err := net.Listen("tcp", "127.0.0.1:8972")
    if err != nil {
        panic(err)
    }
    var connections []net.Conn
    defer func() {
        for _, conn := range connections {
            conn.Close()
        }
    }()
    for {
        conn, e := ln.Accept()
        if e != nil {
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                log.Printf("accept temp err: %v", ne)
                continue
            }
            log.Printf("accept err: %v", e)
            return
        }
        go handleConn(conn)
        connections = append(connections, conn)
        if len(connections)%100 == 0 {
            log.Printf("total number of connections: %v", len(connections))
        }
    }
}
func handleConn(conn net.Conn) {
    io.Copy(ioutil.Discard, conn)
}

5萬個tcp消耗的內存狀況
clipboard.png

三、epoll 使用的是【百萬 Go TCP 鏈接的思考: epoll方式減小資源佔用】該博客的epoll的代碼

5萬個tcp消耗的內存狀況
clipboard.png

下面使用鏈接池加epoll測試 qps 15萬+

使用鏈接池,server使用epoll,使用2000個鏈接,處理完50萬個請求,發送test ,返回TEST大寫,耗時3.7s,處理完全部的請求,qps 15萬+
github上有詳細代碼,地址:https://github.com/shanlongpa...

一、testPool.go是client端代碼

  • testPoll.go 是鏈接池的使用

    pool := &pools.Pool{
           MaxIdle:     100,
           MaxActive:   2000,
           IdleTimeout: 20 * time.Second,
           MaxConnLifetime: 100 * time.Second,
           Dial: func() (net.Conn, error) {
               c, err := net.Dial("tcp", "127.0.0.1:8972")
               if err != nil {
                   return nil, err
               }
               return c, err
           },
       }
       defer pool.Close()
    
       t := time.Now()
    
       worklist := make(chan int)
       var wg sync.WaitGroup
       for i := 0; i < 2000; i++ {
           go func() {
               for range worklist {
                   wg.Done()
                   cli,err:=pool.Get()
                   if err!=nil{
                       log.Println(err)
                       return
                   }
    
                   str:="test"
    
                   err=pools.Write(cli.C,[]byte(str))
    
                   if err!=nil{
                       log.Println(err)
                       pool.Put(cli,true)
                       return
                   }
                   _,err=pools.Read(cli.C)
                   if err!=nil{
                       log.Println(err)
                   }else{
                       //if i%500==0{
                       //    fmt.Println(string(receByte))
                       //}
                   }
                   pool.Put(cli,false)
               }
           }()
       }
    
       for i := 0; i < 500000; i++ {
           wg.Add(1)
           worklist <- i
       }
    
       fmt.Println("pool創建,鏈接數:",pool.Active)
    
       close(worklist)
       wg.Wait()
       // 調用服務
       fmt.Println(time.Since(t))

-鏈接池結構

type Pool struct {
    // 創建tcp鏈接
    Dial func() (net.Conn, error)

    // 健康檢測,判斷鏈接是否斷開
    TestOnBorrow func(c net.Conn, t time.Time) error

    // 鏈接池中最大空閒鏈接數
    MaxIdle int

    // 打開最大的鏈接數
    MaxActive int

    // Idle多久斷開鏈接,小於服務器超時時間
    IdleTimeout time.Duration

    // 配置最大鏈接數的時候,而且wait是true的時候,超過最大的鏈接,get的時候會阻塞,知道有鏈接放回到鏈接池
    Wait bool

    // 超過多久時間 連接關閉
    MaxConnLifetime time.Duration

    chInitialized uint32 // set to 1 when field ch is initialized 原子鎖ch初始化一次

    mu     sync.Mutex    // 鎖
    closed bool          // set to true when the pool is closed.
    Active int           // 鏈接池中打開的鏈接數
    ch     chan struct{} // limits open connections when p.Wait is true
    Idle   idleList      // idle 鏈接
}

// 空閒連,記錄poolConn的頭和尾
type idleList struct {
    count       int
    front, back *poolConn
}

// 鏈接的雙向鏈表
type poolConn struct {
    C          net.Conn
    t          time.Time // idle 時間,即放會pool的時間
    created    time.Time //建立時間
    next, prev *poolConn
}

主要有兩個方法Get(),獲取一個可用的鏈接。 Put() 把鏈接放回到鏈接池

func (p *Pool) Get() (*poolConn, error) {

    // p.Wait == true. 的時候限制最大鏈接數
    if p.Wait && p.MaxActive > 0 {
        p.lazyInit()
        <-p.ch
    }

    p.mu.Lock()

    // 刪除idle超時的鏈接,刪除掉
    if p.IdleTimeout > 0 {
        n := p.Idle.count
        for i := 0; i < n && p.Idle.back != nil && p.Idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
            pc := p.Idle.back
            p.Idle.popBack()
            p.mu.Unlock()
            pc.C.Close()
            p.mu.Lock()
            p.Active--
        }
    }

    //從Idle list 獲取一個可用的空閒連接.
    for p.Idle.front != nil {
        pc := p.Idle.front
        p.Idle.popFront()
        p.mu.Unlock()
        if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.C, pc.t) == nil) &&
            (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
            return pc, nil
        }
        pc.C.Close()
        p.mu.Lock()
        p.Active--
    }

    //pool關閉後直接return error
    if p.closed {
        p.mu.Unlock()
        return nil, errors.New("get on closed pool")
    }

    // Handle limit for p.Wait == false.
    if !p.Wait && p.MaxActive > 0 && p.Active >= p.MaxActive {
        p.mu.Unlock()
        return nil, errors.New("pool 耗盡了")
    }

    p.Active++
    p.mu.Unlock()
    c, err := p.Dial()
    if err != nil {
        c = nil
        p.mu.Lock()
        p.Active--
        if p.ch != nil && !p.closed {
            p.ch <- struct{}{}
        }
        p.mu.Unlock()
    }
    return &poolConn{C: c, created: nowFunc()}, err
}

func (p *Pool) Put(pc *poolConn, forceClose bool) error {
    p.mu.Lock()
    if !p.closed && !forceClose {
        pc.t = nowFunc()
        p.Idle.pushFront(pc)
        if p.Idle.count > p.MaxIdle {
            pc = p.Idle.back
            p.Idle.popBack()
        } else {
            pc = nil
        }
    }

    if pc != nil {
        p.mu.Unlock()
        pc.C.Close()
        p.mu.Lock()
        p.Active--
    }

    if p.ch != nil && !p.closed {
        p.ch <- struct{}{}
    }
    p.mu.Unlock()
    return nil
}

二、epollServer.go 是服務端代碼

epoll 使用主要分爲三部,第一步建立epoll,第二部,添加事件 EPOLL_CTL_ADD,第三步,等待EpollEvent.

func main() {
    setLimit()

    ln, err := net.Listen("tcp", "127.0.0.1:8972")
    if err != nil {
        panic(err)
    }
    epoller, err = MkEpoll()
    if err != nil {
        panic(err)
    }

    go start()

    for {
        conn, e := ln.Accept()
        if e != nil {
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                log.Printf("accept temp err: %v", ne)
                continue
            }

            log.Printf("accept err: %v", e)
            return
        }

        if err := epoller.Add(conn); err != nil {
            log.Printf("failed to add connection %v", err)
            conn.Close()
        }
    }
}
    
    
    //返回接受的信息,小寫轉成大寫字母
 func replyConn(c net.Conn) error {
    data,err:= pools.Read(c)
    if err!=nil{
        return err
    }
    err= pools.Write(c,[]byte(strings.ToUpper(string(data))))
    return err
 }
相關文章
相關標籤/搜索