最近線上的一個項目遇到了內存泄露的問題。查看 heap profile 之後,發現 http 包的 dialConn 函數竟然佔了內存使用的大頭,這讓我有點一頭霧水。後來在網上查資料時,無意間看到一句話:「golang 10 次內存泄露,有 9 次是 goroutine 泄露。」
結果發現,正是我認爲不可能發生的 goroutine 泄露導致了這次的內存泄露,而 goroutine 泄露的原因就是沒有調用 response.Body.Close()。
既然發現是沒有調用 response.Body.Close() 惹的禍,那就做個實驗證明一下:
// main runs the "leaky" experiment: it issues requests in a tight loop and
// never closes the response bodies, so goroutines and memory keep growing.
func main() {
	for {
		requestWithNoClose()
		time.Sleep(time.Microsecond * 100)
	}
}

// requestWithNoClose performs a GET and deliberately discards the response
// without reading or closing its Body. This is the bug being demonstrated:
// the connection's readLoop goroutine then stays blocked waiting for the body
// to be read or closed, and its writeLoop goroutine stays blocked with it.
// Do NOT copy this pattern into real code.
func requestWithNoClose() {
	_, err := http.Get("https://www.baidu.com")
	if err != nil {
		fmt.Printf("error occurred while fetching page, error: %s\n", err.Error())
		// Without this return the function would also print "ok" on failure.
		return
	}
	fmt.Println("ok")
}
// main runs the "fixed" experiment. It matches the leaky experiment except
// that every response Body is closed.
func main() {
	for {
		requestWithClose()
		// Same interval as the leaky experiment so the two runs are comparable.
		time.Sleep(time.Microsecond * 100)
	}
}

// requestWithClose performs a GET and always closes the response Body.
// Closing the Body unblocks the connection's readLoop goroutine. readLoop then
// exits and closes the connection, which in turn lets writeLoop exit, so the
// goroutine count stays bounded.
func requestWithClose() {
	resp, err := http.Get("https://www.baidu.com")
	if err != nil {
		fmt.Printf("error occurred while fetching page, error: %s\n", err.Error())
		return
	}
	// Closing a body that was not read to EOF still releases the goroutines,
	// but the connection is then torn down instead of returned to the idle
	// pool. Drain the body to EOF first if connection reuse matters.
	defer resp.Body.Close()
	fmt.Println("ok")
}
兩段代碼的核心區別只在於 resp.Body.Close() 是否被調用。運行一段時間後,我們發現兩者的內存佔用差距竟如此之大。
接下來,我們就帶着問題深入 http 包的底層實現,找出具體原因。
下面只分析我們可能會用到的部分(結構體只列出相關字段)。
// Transport (excerpt from net/http): only the fields relevant to
// connection reuse are shown.
type Transport struct {
	idleMu   sync.Mutex
	wantIdle bool // user has requested to close all idle conns

	// Cache of idle connections, keyed by connection method.
	idleConn map[connectMethodKey][]*persistConn // most recently used at end

	// connectMethodKey => channel of idle connections.
	// When a connection becomes idle, tryPutIdleConn first tries to hand it
	// directly to a goroutine waiting on this channel for a connection with
	// the same key. If nobody is waiting, the connection is stored in
	// idleConn above so later requests can reuse it.
	idleConnCh map[connectMethodKey]chan *persistConn

	// DisableKeepAlives, if true, disables HTTP keep-alives and
	// will only use the connection to the server for a single
	// HTTP request.
	//
	// This is unrelated to the similarly named TCP keep-alives.
	// (When true, connections are never reused.)
	DisableKeepAlives bool

	// MaxIdleConns controls the maximum number of idle (keep-alive)
	// connections across all hosts. Zero means no limit.
	// Note: this caps IDLE connections only, not the total connection count.
	MaxIdleConns int

	// Maximum number of idle connections kept per host.
	MaxIdleConnsPerHost int

	// Maximum number of connections per host.
	MaxConnsPerHost int
}
// persistConn (excerpt from net/http) wraps one client connection together
// with the channels that coordinate roundTrip, readLoop and writeLoop.
type persistConn struct {
	// alt optionally specifies the TLS NextProto RoundTripper.
	// This is used for HTTP/2 today and future protocols later.
	// If it's non-nil, the rest of the fields are unused.
	alt RoundTripper

	t        *Transport
	cacheKey connectMethodKey
	conn     net.Conn
	tlsState *tls.ConnectionState
	br       *bufio.Reader // from conn
	bw       *bufio.Writer // to conn
	nwrite   int64         // bytes written

	// roundTrip sends a requestAndChan on this channel. readLoop receives
	// it and answers on the response channel it carries.
	reqch chan requestAndChan // written by roundTrip; read by readLoop
	// roundTrip sends a writeRequest (request plus error channel) here.
	// writeLoop receives it, writes the request to the connection and
	// reports the write error back on that error channel.
	writech chan writeRequest // written by roundTrip; read by writeLoop
	closech chan struct{}     // closed when conn closed

	sawEOF bool // whether we've seen EOF from conn; owned by readLoop

	// writeErrCh passes the request write error (usually nil)
	// from the writeLoop goroutine to the readLoop which passes
	// it off to the res.Body reader, which then uses it to decide
	// whether or not a connection can be reused. Issue 7569.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when write loop ends
}
// writeRequest is what roundTrip hands to writeLoop over pc.writech.
// NOTE: roundTrip builds it with a positional composite literal
// (writeRequest{req, writeErrCh, continueCh}), so the field order matters.
type writeRequest struct {
	req *transportRequest
	// writeLoop sends the write result (nil on success) here.
	ch chan<- error

	// Optional blocking chan for Expect: 100-continue (for receive).
	// If not nil, writeLoop blocks sending request body until
	// it receives from this chan.
	continueCh <-chan struct{}
}
// requestAndChan is what roundTrip hands to readLoop over pc.reqch: the
// request plus the channel on which readLoop must deliver the response.
type requestAndChan struct {
	req *Request
	ch  chan responseAndError // unbuffered; always send in select on callerGone

	// whether the Transport (as opposed to the user client code)
	// added the Accept-Encoding gzip header. If the Transport
	// set it, only then do we transparently decode the gzip.
	addedGzip bool

	// Optional blocking chan for Expect: 100-continue (for send).
	// If the request has an "Expect: 100-continue" header and
	// the server responds 100 Continue, readLoop send a value
	// to writeLoop via this chan.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip caller has returned
}
這裏的函數沒有太多邏輯,貼出來主要是爲了追蹤調用過程。
這裏用一個簡單的例子來說明調用過程:
// main issues one GET through the package-level http.Get helper and closes
// the response body. It is the entry point used to trace the call chain.
func main() {
	resp, err := http.Get("https://www.baidu.com")
	if err == nil {
		resp.Body.Close()
		return
	}
	panic(err)
}
var DefaultClient = &Client{} .... // 使用默認的 client, 調用 client.Get 來處理 func Get(url string) (resp *Response, err error) { return DefaultClient.Get(url) }
func (c *Client) Get(url string) (resp *Response, err error) { // request這裏的建立忽略 req, err := NewRequest("GET", url, nil) if err != nil { return nil, err } // 調用 client.Do 函數來處理, 而後 client.Do 調用 client.do 來處理,不懂爲啥非要多一層嵌套 return c.Do(req) }
func (c *Client) do(req *Request) (retres *Response, reterr error) { // URL 及 hook檢測,忽略... var ( deadline = c.deadline() reqs []*Request resp *Response copyHeaders = c.makeHeadersCopier(req) reqBodyClosed = false // have we closed the current req.Body? // Redirect behavior: redirectMethod string includeBody bool ) // 錯誤自定義處理,忽略.... for { // 省略無關的代碼.... reqs = append(reqs, req) var err error var didTimeout func() bool // 調用 client.send 方法來獲取response,主要邏輯 if resp, didTimeout, err = c.send(req, deadline); err != nil { // c.send() always closes req.Body reqBodyClosed = true if !deadline.IsZero() && didTimeout() { err = &httpError{ // TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancelation/ err: err.Error() + " (Client.Timeout exceeded while awaiting headers)", timeout: true, } } return nil, uerr(err) } // 判斷是否須要跳轉,進而進一步請求 var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) if !shouldRedirect { return resp, nil } req.closeBody() }
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { if c.Jar != nil { for _, cookie := range c.Jar.Cookies(req.URL) { req.AddCookie(cookie) } } // 調用 send 方法來獲取 response resp, didTimeout, err = send(req, c.transport(), deadline) if err != nil { return nil, didTimeout, err } if c.Jar != nil { if rc := resp.Cookies(); len(rc) > 0 { c.Jar.SetCookies(req.URL, rc) } } return resp, nil, nil }
// send (excerpt from net/http) performs a single round trip through rt.
// It first calls setRequestCancel(req, rt, deadline), which yields a
// stopTimer func and a didTimeout func.
//
// On error:
//   - stopTimer is called.
//   - A response returned alongside the error is logged and ignored.
//   - A TLS record-header error whose header reads "HTTP/" is turned into a
//     clearer message.
//   - The result is (nil, didTimeout, err).
//
// On success with a non-zero deadline, resp.Body is wrapped in a
// cancelTimerBody that carries stopTimer and didTimeout. Presumably this lets
// the timer be stopped once the body is done; confirm against cancelTimerBody.
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	req := ireq // req is either the original request, or a modified fork

	// URL/Header checks and request forking elided...

	stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

	// Hand the request to the RoundTripper (Transport.RoundTrip).
	resp, err = rt.RoundTrip(req)
	if err != nil {
		stopTimer()
		if resp != nil {
			log.Printf("RoundTripper returned a response & error; ignoring response")
		}
		if tlsErr, ok := err.(tls.RecordHeaderError); ok {
			// If we get a bad TLS record header, check to see if the
			// response looks like HTTP and give a more helpful error.
			// See golang.org/issue/11111.
			if string(tlsErr.RecordHeader[:]) == "HTTP/" {
				err = errors.New("http: server gave HTTP response to HTTPS client")
			}
		}
		return nil, didTimeout, err
	}
	if !deadline.IsZero() {
		resp.Body = &cancelTimerBody{
			stop:          stopTimer,
			rc:            resp.Body,
			reqDidTimeout: didTimeout,
		}
	}
	return resp, nil, nil
}
從這裏開始接近重點區域了。
這個函數主要就是獲取連接,然後從連接中獲取 response 並返回。
// roundTrip (excerpt from net/http) obtains a connection for req via getConn
// and gets the response from it. For HTTP/1.x this goes through
// persistConn.roundTrip. The response/error checks and retry logic at the end
// of the loop are elided.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	// URL, header and scheme checks elided (not on the main path)...

	for {
		// Bail out early if the context is already done (canceled, timed out, ...).
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, ctx.Err()
		default:
		}

		// treq gets modified by roundTrip, so we need to recreate for each retry.
		treq := &transportRequest{Request: req, trace: trace}
		// The connectMethod carries the scheme and address, which key
		// connection reuse. Details are not important here.
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Get the cached or newly-created connection to either the
		// host (for http or https), the http proxy, or the http proxy
		// pre-CONNECTed to https server. In any case, we'll be ready
		// to send it requests.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(req, nil)
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// HTTP/2 path (not covered here).
			t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
			t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			// HTTP/1.x path: get the response over the persistent connection.
			resp, err = pconn.roundTrip(treq)
		}
		// Response/error checks and retry handling elided...
	}
}
接下來進入重點分析:getConn、persistConn.roundTrip、Transport.dialConn,以及內存泄露的罪魁禍首 persistConn.readLoop 和 persistConn.writeLoop。
getConn 這個方法根據 connectMethod(也就是 scheme 和 addr,這裏忽略 proxy 代理)複用已有連接,或者創建一個新的連接。創建新連接時(在 dialConn 中)還會開啓兩個 goroutine,分別負責讀取 response 和寫入 request。
// getConn (excerpt from net/http) returns a connection for cm.
// It first tries to reuse an idle connection from the pool. Otherwise it
// takes whichever arrives first of:
//   - the connection being dialed by a goroutine started here, or
//   - an idle connection handed over on idleConnCh by another request that
//     just finished with it,
// unless the request is canceled first. If the caller stops waiting before
// the dial finishes, handlePendingDial disposes of the dial result later.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
	// Trace-related parts are not discussed...
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}
	// Try an idle connection for this connectMethod; return it directly if found.
	if pc, idleSince := t.getIdleConn(cm); pc != nil {
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(pc.gotIdleConnTrace(idleSince))
		}
		// set request canceler to some non-nil function so we
		// can detect whether it was cleared between now and when
		// we enter roundTrip
		t.setReqCanceler(req, func(error) {})
		return pc, nil
	}

	// No idle connection. dialRes carries the result of the dial goroutine
	// started below.
	type dialRes struct {
		pc  *persistConn
		err error
	}
	dialc := make(chan dialRes)
	cmKey := cm.key()

	// Copy these hooks so we don't race on the postPendingDial in
	// the goroutine we launch. Issue 11136.
	testHookPrePendingDial := testHookPrePendingDial
	testHookPostPendingDial := testHookPostPendingDial

	// handlePendingDial disposes of our dial result once we stop waiting for
	// it, for example because another connection arrived first.
	//   - A successful connection is offered to the idle pool, or closed.
	//   - A failed dial releases its host connection count.
	// dialc is unbuffered, so without this receiver the dial goroutine would
	// block forever on its send.
	handlePendingDial := func() {
		testHookPrePendingDial()
		go func() {
			if v := <-dialc; v.err == nil {
				t.putOrCloseIdleConn(v.pc)
			} else {
				t.decHostConnCount(cmKey)
			}
			testHookPostPendingDial()
		}()
	}

	cancelc := make(chan error, 1)
	t.setReqCanceler(req, func(err error) { cancelc <- err })

	// Some checks elided...

	// Dial a new connection in its own goroutine.
	go func() {
		pc, err := t.dialConn(ctx, cm)
		dialc <- dialRes{pc, err}
	}()

	// Channel on which other requests hand over connections with this
	// connectMethod that they no longer need.
	idleConnCh := t.getIdleConnCh(cm)
	// Whatever arrives first wins: our dial, a handed-over connection, or a
	// cancellation signal.
	select {
	case v := <-dialc:
		// Our dial finished.
		if v.pc != nil {
			if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
				trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
			}
			return v.pc, nil
		}
		// Our dial failed. See why to return a nicer error
		// value.
		t.decHostConnCount(cmKey)
		select {
		case <-req.Cancel:
			// It was an error due to cancelation, so prioritize that
			// error value. (Issue 16049)
			return nil, errRequestCanceledConn
		case <-req.Context().Done():
			return nil, req.Context().Err()
		case err := <-cancelc:
			if err == errRequestCanceled {
				err = errRequestCanceledConn
			}
			return nil, err
		default:
			// It wasn't an error due to cancelation, so
			// return the original error message:
			return nil, v.err
		}
	case pc := <-idleConnCh:
		// Another request finished first and its net.Conn
		// became available before our dial. Or somebody
		// else's dial that they didn't use.
		// But our dial is still going, so give it away
		// when it finishes:
		// (Another request's tryPutIdleConn handed us its connection over
		// idleConnCh. The connection our dial goroutine is still creating is
		// therefore unneeded here, and handlePendingDial takes care of it.)
		handlePendingDial()
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
		}
		return pc, nil
	case <-req.Cancel:
		handlePendingDial()
		return nil, errRequestCanceledConn
	case <-req.Context().Done():
		handlePendingDial()
		return nil, req.Context().Err()
	case err := <-cancelc:
		handlePendingDial()
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
上面的 handlePendingDial 方法中調用了 putOrCloseIdleConn。這個方法到底做了什麼?它跟 idleConnCh 和 idleConn 又有什麼關係?
// putOrCloseIdleConn offers pconn for reuse via tryPutIdleConn. If the
// connection is rejected, it is closed with the rejection error.
func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
	err := t.tryPutIdleConn(pconn)
	if err == nil {
		return
	}
	pconn.close(err)
}
// tryPutIdleConn (excerpt from net/http) tries to make pconn available for
// reuse. It returns nil if the connection was either
//   - handed to a goroutine waiting on idleConnCh, or
//   - added to the idleConn pool.
// Otherwise it returns an error explaining why not: keep-alives disabled,
// broken connection, HTTP/2, idle connections unwanted, or too many idle
// connections for the host. putOrCloseIdleConn closes the connection when
// this returns an error.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	// HTTP/2 connections are not cached here.
	if pconn.alt != nil {
		return errNotCachingH2Conn
	}
	// Mark the connection as reused.
	pconn.markReused()
	// cacheKey is derived from the connectMethod (dialConn sets it to cm.key()).
	key := pconn.cacheKey

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// Channel for this key on which a goroutine in getConn may be waiting.
	// Presumably registered via getIdleConnCh; confirm.
	waitingDialer := t.idleConnCh[key]
	select {
	case waitingDialer <- pconn:
		// Handed directly to the waiting goroutine.
		// We're done with this pconn and somebody else is
		// currently waiting for a conn of this type (they're
		// actively dialing, but this conn is ready
		// first). Chrome calls this socket late binding. See
		// https://insouciant.org/tech/connection-management-in-chromium/
		return nil
	default:
		// Nobody is receiving right now. A send on a nil channel never
		// proceeds, so this branch also runs when no channel is registered.
		if waitingDialer != nil {
			// They had populated this, but their dial won
			// first, so we can clean up this map entry.
			delete(t.idleConnCh, key)
		}
	}
	if t.wantIdle {
		return errWantIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	// Enforce the per-host idle limit: do not pool beyond it.
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	// The rest, not on the main path here:
	//   - add the connection to the pool;
	//   - evict the oldest idle connection if MaxIdleConns is exceeded;
	//   - (re)arm the idle timeout timer.
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}
	if t.IdleConnTimeout > 0 {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
跑題了一會兒,現在接着 getConn 分析 dialConn 這個函數。
這個函數主要就是創建一個連接,然後創建兩個 goroutine,分別往這個連接寫入請求(writeLoop 函數)和從中讀取響應(readLoop 函數)。
而這兩個函數又會通過 chan 與 persistConn.roundTrip 關聯起來。這裏先對函數進行分析,分析完成後再畫出對應的關聯圖示。
// dialConn (excerpt from net/http) establishes a new connection for cm and
// starts the two goroutines that service it:
//   - readLoop reads responses;
//   - writeLoop writes requests.
// Both goroutines are tied to the connection, so a connection that is never
// released keeps both of them alive.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
	pconn := &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	trace := httptrace.ContextClientTrace(ctx)
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			// Return a typed error, per Issue 16997
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	// https with a user-supplied DialTLS: build the TLS connection through it.
	if cm.scheme() == "https" && t.DialTLS != nil {
		var err error
		pconn.conn, err = t.DialTLS("tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if pconn.conn == nil {
			return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// Handshake here, in case DialTLS didn't. TLSNextProto below
			// depends on it for knowing the connection state.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.Handshake(); err != nil {
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		// Otherwise dial a plain TCP connection, then add TLS on top for https.
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup, protocol switching, etc. elided...

	if t.MaxConnsPerHost > 0 {
		pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey}
	}
	pconn.br = bufio.NewReader(pconn)
	pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
	// One goroutine reads responses and the other writes requests. Each keeps
	// looping until the connection is closed or an error occurs.
	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
readLoop 從連接中讀取 response,然後通過 chan 發送給 persistConn.roundTrip,最後阻塞等待 response body 被讀完或被關閉。
// readLoop (excerpt from net/http) is the per-connection goroutine that reads
// responses. For every request roundTrip hands over on pc.reqch it:
//   - reads the response;
//   - wraps resp.Body in a bodyEOFSignal;
//   - sends the response back on rc.ch;
//   - then blocks until one of the following happens:
//       * the body is fully read or closed early (both reported through
//         waitForBodyRead);
//       * the request is canceled or its context is done;
//       * pc.closech is closed.
//
// If the caller neither reads the body to the end nor closes it, and nothing
// cancels the request, that final select never returns and this goroutine
// stays alive. writeLoop stays alive too, because it waits for pc.closech,
// which is closed by the deferred pc.close below. This is the leak analyzed
// in this article.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default value, if not changed below
	defer func() {
		// Close the connection. pc.close closes pc.closech, which makes
		// writeLoop's `<-pc.closech` case fire so writeLoop exits as well.
		// That is why writeLoop ends once readLoop returns.
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// Wrapper around Transport.tryPutIdleConn. It records the rejection
	// reason in closeErr and reports the outcome to the trace hook.
	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is used to block caller goroutines reading from Response.Body
	// at EOF until this goroutine has (potentially) added the connection
	// back to the idle pool.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Some code elided...

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		// m1: receive the next request (and its response channel) from
		// persistConn.roundTrip.
		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			// Read the response from the connection.
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		// On failure, wrap the error and hand it back to persistConn.roundTrip.
		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit for response bodies

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
			// Don't do keep-alive on error if either party requested a close
			// or we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		// Handling of responses without a body elided...

		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			// earlyCloseFn is what resp.Body.Close() ends up calling when the
			// body was not read to EOF. Sending false makes the select at the
			// bottom of this loop set alive = false. readLoop then exits, and
			// writeLoop follows.
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // will be closed by deferred call at the end of the function
				return nil
			},
			// fn reports how the body ended: true on io.EOF (the connection
			// may then be reused), false otherwise.
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		// Assemble the response returned to the caller.
		resp.Body = body
		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		// Send the response back to persistConn.roundTrip.
		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before looping back to the top of this function and peeking on
		// the bufio.Reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancelation or death)
		// readLoop blocks here until one of these happens:
		//   - the body is read to EOF or closed;
		//   - the request is canceled or its context is done;
		//   - pc.closech is closed.
		select {
		case bodyEOF := <-waitForBodyRead:
			pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(trace)
			if bodyEOF {
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.req, rc.req.Context().Err())
		case <-pc.closech:
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
}
persistConn.writeLoop 相對於 persistConn.readLoop 就簡單很多,其主要功能就是往連接裏寫入 request。
// writeLoop (excerpt from net/http) is the per-connection goroutine that
// writes requests. It loops waiting for either
//   - a request from roundTrip on pc.writech, or
//   - pc.closech to be closed.
// It returns only on a write error or once pc.closech is closed. The latter
// happens in readLoop's deferred pc.close when readLoop exits.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		// Receive the writeRequest (essentially the request) that
		// persistConn.roundTrip sent on pc.writech.
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// Errors reading from the user's
				// Request.Body are high priority.
				// Set it here before sending on the
				// channels below or calling
				// pc.close() which tears down
				// connections and causes other
				// errors.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				wr.req.Request.closeBody()
				if pc.nwrite == startBytesWritten {
					err = nothingWrittenError{err}
				}
			}
			// Report the write result (nil on success) both to the body
			// reader and to persistConn.roundTrip, which decides what to do.
			pc.writeErrCh <- err // to the body reader, which might recycle us
			wr.ch <- err         // to the roundTrip function
			if err != nil {
				// On a write error, close the connection. Closing pc.closech
				// lets readLoop's post-response select stop blocking, set
				// alive = false and end its loop, so readLoop terminates too.
				pc.close(err)
				return
			}
		case <-pc.closech:
			// Fires once readLoop's deferred pc.close has closed
			// pc.closech; writeLoop then ends.
			return
		}
	}
}
不管是 persistConn.readLoop 還是 persistConn.writeLoop,都離不開和 persistConn.roundTrip 這個函數的交互,它的重要性也就不言而喻了。
不過這個函數的主要邏輯並不複雜,分爲以下幾步:
1. 把 request 連同用於接收寫入錯誤的 writeErrCh 封裝成 writeRequest,通過 pc.writech(也就是 writeLoop 監聽的 chan)傳給 persistConn.writeLoop;
2. 創建一個 responseAndError chan,封裝進 requestAndChan 後通過 pc.reqch(也就是 readLoop 使用的 chan)交給 persistConn.readLoop;
3. 從這個 chan 中拿到 readLoop 讀取到的 response,最後把 response 返回給上層函數。
// roundTrip (excerpt from net/http) sends one request over this connection.
// It hands the request to writeLoop via pc.writech and registers a response
// channel with readLoop via pc.reqch. It then waits for whichever comes first:
//   - the write result from writeLoop;
//   - the connection being closed;
//   - the response-header timeout;
//   - the response (or a read error) from readLoop;
//   - cancellation of the request or its context.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	// Request header setup and cancel-function registration elided...
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	// gone is closed when roundTrip returns. readLoop selects on it (as
	// callerGone) so it does not block sending to a caller that has left.
	gone := make(chan struct{})
	defer close(gone)

	defer func() {
		if err != nil {
			pc.t.setReqCanceler(req.Request, nil)
		}
	}()

	const debugRoundTrip = false

	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	// Hand the request to writeLoop, which receives it from pc.writech and
	// writes it to the connection.
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	// Register the response channel with readLoop. This unblocks readLoop's
	// receive from pc.reqch so it can start reading the response.
	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	var respHeaderTimer <-chan time.Time
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
	// Block until one of the events listed in the function comment occurs.
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			// writeLoop finished writing; err may or may not be nil.
			if debugRoundTrip {
				req.logf("writeErrCh resv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %v", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// Request written successfully; start the response-header
			// timeout, if one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
			// No return here: loop again and keep waiting for the response.
		case <-pc.closech:
			// The connection was closed.
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			// Timed out waiting for the response headers.
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			// readLoop delivered the response (or an error).
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			// Return the response; this ends the loop.
			return re.res, nil
		case <-cancelChan:
			// A nil channel never becomes ready, so setting it to nil
			// disables this case on later iterations.
			pc.t.CancelRequest(req.Request)
			cancelChan = nil
		case <-ctxDoneChan:
			pc.t.cancelRequest(req.Request, req.Context().Err())
			cancelChan = nil
			ctxDoneChan = nil
		}
	}
}
上面 persistConn.roundTrip、persistConn.readLoop 和 persistConn.writeLoop 之間的數據交互,僅靠文字描述可能比較蒼白,這裏畫一下圖示:
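綜上分析可以發現,readLoop 和 writeLoop 兩個 goroutine 在寫入請求、返回 response 之後,並沒有跳出 for 循環:
- readLoop 阻塞在等待 response body 被讀完或被關閉的 select 語句上;
- writeLoop 則阻塞在下一次 for 循環的 select 語句上。

所以,只要 response.Body 既沒有被讀完也沒有被關閉,這兩個 goroutine 就不會結束。這就導致了最開始的現象:goroutine 持續增加,內存也隨之持續增加。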
調用 resp.Body.Close() 時(body 尚未讀到 EOF 的情況下),主要邏輯如下:
1. 調用 readLoop 中定義的 earlyCloseFn 方法,向 waitForBodyRead 這個 chan 寫入 false;
2. readLoop 因此退出阻塞,結束循環,readLoop 的 goroutine 終止;
3. readLoop 退出時會關閉 closech 這個 chan,進而讓 writeLoop 退出阻塞,writeLoop 的 goroutine 也隨之終止。
func (es *bodyEOFSignal) Close() error { es.mu.Lock() defer es.mu.Unlock() if es.closed { return nil } es.closed = true if es.earlyCloseFn != nil && es.rerr != io.EOF { // 調用 readLoop定義的函數處理 return es.earlyCloseFn() } err := es.body.Close() return es.condfn(err) }
readLoop 中定義的 earlyCloseFn 方法:
// Excerpt from readLoop: the Body handed to the caller. Both hooks report
// back to readLoop through waitForBodyRead.
body := &bodyEOFSignal{
	body: resp.Body,
	// Called by Close when the body was not read to EOF.
	earlyCloseFn: func() error {
		// Tell readLoop the body was NOT fully read. readLoop then sets
		// alive = false, stops reusing this connection and exits its loop.
		waitForBodyRead <- false
		<-eofc // will be closed by deferred call at the end of the function
		return nil
	},
	// Reports how the body ended: true on io.EOF, false otherwise.
	fn: func(err error) error {
		isEOF := err == io.EOF
		waitForBodyRead <- isEOF
		if isEOF {
			<-eofc // see comment above eofc declaration
		} else if err != nil {
			if cerr := pc.canceled(); cerr != nil {
				return cerr
			}
		}
		return err
	},
}
回過頭來看 readLoop 阻塞的代碼:
// Excerpt from readLoop: where it blocks after sending the response.
select {
case bodyEOF := <-waitForBodyRead:
	// earlyCloseFn sends false here (body closed before EOF), so bodyEOF
	// is false.
	pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
	// With bodyEOF == false the whole expression is false, and the
	// short-circuit skips tryPutIdleConn. alive becomes false, the for loop
	// ends and the readLoop goroutine exits.
	alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace)
	if bodyEOF {
		eofc <- struct{}{}
	}
case <-rc.req.Cancel:
	alive = false
	pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
	alive = false
	pc.t.cancelRequest(rc.req, rc.req.Context().Err())
case <-pc.closech:
	alive = false
}
當 readLoop 退出的時候,會執行函數最開始定義的 defer 函數:
// Excerpt from readLoop: runs when readLoop returns.
defer func() {
	// pc.close closes the pc.closech channel (worth tracing in the source).
	// That unblocks writeLoop's `<-pc.closech` case, so writeLoop exits too.
	pc.close(closeErr)
	pc.t.removeIdleConn(pc)
}()
繼續看一下 writeLoop 阻塞的代碼:
// Excerpt from writeLoop: the select it waits on for each request.
select {
case wr := <-pc.writech:
	startBytesWritten := pc.nwrite
	err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
	if bre, ok := err.(requestBodyReadError); ok {
		err = bre.error
		// Errors reading from the user's
		// Request.Body are high priority.
		// Set it here before sending on the
		// channels below or calling
		// pc.close() which tears down
		// connections and causes other
		// errors.
		wr.req.setError(err)
	}
	if err == nil {
		err = pc.bw.Flush()
	}
	if err != nil {
		wr.req.Request.closeBody()
		if pc.nwrite == startBytesWritten {
			err = nothingWrittenError{err}
		}
	}
	pc.writeErrCh <- err // to the body reader, which might recycle us
	wr.ch <- err         // to the roundTrip function
	if err != nil {
		pc.close(err)
		return
	}
case <-pc.closech:
	// Once readLoop has closed pc.closech (in its deferred pc.close), this
	// case fires: writeLoop returns, ending its loop and its goroutine.
	return
}