HTTP Requests in Go: An Analysis of the HTTP/1.1 Request Flow

From the WeChat official account: 新世界雜貨鋪 (New World Grocery Store)

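Preface

HTTP is one of the most widely used protocols today, and one that programmers deal with more than almost any other. In this article I analyze the HTTP/1.1 request flow in depth from a Gopher's point of view. I hope that after reading it you will:

  1. have a general understanding of the HTTP/1.1 request flow;
  2. be able to make better reuse of the underlying TCP connections in everyday development;
  3. have a clearer picture of head-of-line blocking in HTTP/1.1.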

The HTTP/1.1 flow

There is a lot to cover today, so let's skip the preamble and get straight to it.

[Figure: flowchart of the HTTP/1.1 request flow]

Following the flowchart, I will now walk through and analyze each function in turn, except NewRequest.

(*Client).do

The core of (*Client).do is a for loop with no loop condition; it only exits through its return statements.

for {
    // For all but the first request, create the next
    // request hop and replace req.
    if len(reqs) > 0 {
        loc := resp.Header.Get("Location")
        // ...code omitted...
        err = c.checkRedirect(req, reqs)
        // ...a lot of code omitted...
    }

    reqs = append(reqs, req)
    var err error
    var didTimeout func() bool
    if resp, didTimeout, err = c.send(req, deadline); err != nil {
        // c.send() always closes req.Body
        reqBodyClosed = true
        // ...code omitted...
        return nil, uerr(err)
    }

    var shouldRedirect bool
    redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
    if !shouldRedirect {
        return resp, nil
    }

    req.closeBody()
}

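In the code above, the first pass through the loop calls c.send. Once the response arrives, the loop checks whether the request needs to be redirected: if it does, the loop runs again; otherwise the response is returned.

Once we are in the redirect path, here is a quick look at the checkRedirect function: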

func defaultCheckRedirect(req *Request, via []*Request) error {
    if len(via) >= 10 {
        return errors.New("stopped after 10 redirects")
    }
    return nil
}
// ...
func (c *Client) checkRedirect(req *Request, via []*Request) error {
    fn := c.CheckRedirect
    if fn == nil {
        fn = defaultCheckRedirect
    }
    return fn(req, via)
}

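As shown above, users can define their own redirect-checking rule through Client.CheckRedirect. If no rule is set, defaultCheckRedirect applies: it aborts with "stopped after 10 redirects" as soon as 10 requests have already been made (len(via) >= 10).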

(*Client).send

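The (*Client).send method is fairly simple; what matters is whether the Jar field of http.Client has been set to an implementation of the CookieJar interface. The main steps are:

  1. If a Jar is set, add the cookies it has stored for the request URL to the Request.
  2. Call the send function.
  3. If a Jar is set, save the cookies from the Response into it.
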
// didTimeout is non-nil only if err != nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    if c.Jar != nil {
        for _, cookie := range c.Jar.Cookies(req.URL) {
            req.AddCookie(cookie)
        }
    }
    resp, didTimeout, err = send(req, c.transport(), deadline)
    if err != nil {
        return nil, didTimeout, err
    }
    if c.Jar != nil {
        if rc := resp.Cookies(); len(rc) > 0 {
            c.Jar.SetCookies(req.URL, rc)
        }
    }
    return resp, nil, nil
}
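To see the Jar branches of (*Client).send in action, here is a small sketch. It assumes net/http/cookiejar as the CookieJar implementation and an httptest server with two made-up endpoints: /set sets a session cookie and /get echoes it back. fetchSessionCookie is an illustrative helper, not part of net/http.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/cookiejar"
	"net/http/httptest"
)

// fetchSessionCookie calls /set and then /get, with or without a cookie jar,
// and returns what /get saw.
func fetchSessionCookie(withJar bool) (string, error) {
	mux := http.NewServeMux()
	mux.HandleFunc("/set", func(w http.ResponseWriter, r *http.Request) {
		http.SetCookie(w, &http.Cookie{Name: "session", Value: "abc123"})
	})
	mux.HandleFunc("/get", func(w http.ResponseWriter, r *http.Request) {
		if c, err := r.Cookie("session"); err == nil {
			fmt.Fprint(w, c.Value)
			return
		}
		fmt.Fprint(w, "no cookie")
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	client := &http.Client{}
	if withJar {
		jar, err := cookiejar.New(nil) // nil options: no public suffix list
		if err != nil {
			return "", err
		}
		client.Jar = jar
	}

	// Client.send stores this response's Set-Cookie in the Jar...
	resp, err := client.Get(srv.URL + "/set")
	if err != nil {
		return "", err
	}
	resp.Body.Close()

	// ...and adds the stored cookie to this request via req.AddCookie.
	resp, err = client.Get(srv.URL + "/get")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	withJar, _ := fetchSessionCookie(true)
	withoutJar, _ := fetchSessionCookie(false)
	fmt.Println(withJar, "|", withoutJar) // abc123 | no cookie
}
```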

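We also need to pay attention to the call to c.transport(). If the user has not set a Transport on http.Client, Go's default DefaultTransport is used.

That Transport implements the RoundTripper interface, which Go describes as "the ability to execute a single HTTP transaction, obtaining the Response for a given Request".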

func (c *Client) transport() RoundTripper {
    if c.Transport != nil {
        return c.Transport
    }
    return DefaultTransport
}
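Because c.transport() returns c.Transport whenever it is set, any RoundTripper can be plugged in. The sketch below uses a hypothetical countingTransport that counts calls and delegates to http.DefaultTransport. Since the (*Client).do loop calls c.send once per hop, a GET that is redirected once should reach the RoundTripper twice; the /start and /end paths are made up for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// countingTransport is a hypothetical RoundTripper that counts calls and then
// delegates to next (http.DefaultTransport when next is nil).
type countingTransport struct {
	next  http.RoundTripper
	calls int64
}

func (t *countingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	atomic.AddInt64(&t.calls, 1)
	next := t.next
	if next == nil {
		next = http.DefaultTransport
	}
	return next.RoundTrip(req) // req is passed through unmodified
}

// roundTripsFor issues one GET for /start, which redirects once to /end, and
// returns how many times the client invoked its RoundTripper.
func roundTripsFor() (int64, error) {
	mux := http.NewServeMux()
	mux.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) {
		http.Redirect(w, r, "/end", http.StatusFound)
	})
	mux.HandleFunc("/end", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "done")
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	ct := &countingTransport{}
	client := &http.Client{Transport: ct} // c.transport() now returns ct
	resp, err := client.Get(srv.URL + "/start")
	if err != nil {
		return 0, err
	}
	resp.Body.Close()
	return atomic.LoadInt64(&ct.calls), nil
}

func main() {
	n, err := roundTripsFor()
	fmt.Println(n, err) // 2 <nil>
}
```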

send

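The send function checks the request's URL, the rt argument and the header (a nil Header is replaced by an empty one). If the URL or rt is nil, it returns an error immediately. If the URL carries user information (user:password@host) and no Authorization header has been set, it also sets a Basic authentication header. Finally it calls rt.RoundTrip to get the response.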

func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    req := ireq // req is either the original request, or a modified fork
    // ...code omitted...
    if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
        username := u.Username()
        password, _ := u.Password()
        forkReq()
        req.Header = cloneOrMakeHeader(ireq.Header)
        req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
    }

    if !deadline.IsZero() {
        forkReq()
    }
    stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

    resp, err = rt.RoundTrip(req)
    if err != nil {
        // ...code omitted...
        return nil, didTimeout, err
    }
    // ...code omitted...
    return resp, nil, nil
}
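The Basic-auth branch of send can be checked with a URL that carries userinfo. This sketch assumes an httptest server that decodes the header with Request.BasicAuth; the credentials and the helper name echoBasicAuth are illustrative.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// echoBasicAuth GETs a URL whose userinfo is user:pass and returns what the
// server decoded from the Authorization header.
func echoBasicAuth(user, pass string) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		u, p, ok := r.BasicAuth()
		fmt.Fprintf(w, "%v %s %s", ok, u, p)
	}))
	defer srv.Close()

	target, err := url.Parse(srv.URL)
	if err != nil {
		return "", err
	}
	target.User = url.UserPassword(user, pass) // http://user:pass@127.0.0.1:port

	// No Authorization header is set, so send() adds "Basic " + base64(user:pass).
	resp, err := http.Get(target.String())
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	got, err := echoBasicAuth("gopher", "s3cret")
	fmt.Println(got, err) // true gopher s3cret <nil>
}
```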

(*Transport).RoundTrip

(*Transport).RoundTrip is trivial: it just calls (*Transport).roundTrip, so this section is really an analysis of (*Transport).roundTrip.

func (t *Transport) RoundTrip(req *Request) (*Response, error) {
    return t.roundTrip(req)
}
func (t *Transport) roundTrip(req *Request) (*Response, error) {
    // ...header name/value validation and other code omitted...

    for {
        select {
        case <-ctx.Done():
            req.closeBody()
            return nil, ctx.Err()
        default:
        }

        // treq gets modified by roundTrip, so we need to recreate for each retry.
        treq := &transportRequest{Request: req, trace: trace}
        cm, err := t.connectMethodForRequest(treq)
        // ...code omitted...
        pconn, err := t.getConn(treq, cm)
        if err != nil {
            t.setReqCanceler(req, nil)
            req.closeBody()
            return nil, err
        }

        var resp *Response
        if pconn.alt != nil {
            // HTTP/2 path.
            t.setReqCanceler(req, nil) // not cancelable with CancelRequest
            resp, err = pconn.alt.RoundTrip(req)
        } else {
            resp, err = pconn.roundTrip(treq)
        }
        if err == nil {
            return resp, nil
        }

        // ...logic deciding whether to retry the request omitted...
    }
}

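As shown above, each iteration of the for loop first checks whether the request context has been canceled; if it has not, the flow continues:

  1. It calls t.getConn to obtain a persistConn.
  2. Since this article is about HTTP/1.1, we go straight to the HTTP/1.1 branch. According to the comments in the source and to actual debugging, once a connection has been obtained, pconn.roundTrip is called next.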

(*Transport).getConn

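I consider this one of the most central steps of an HTTP request, because no further communication is possible until a connection to the server has been established.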

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
    req := treq.Request
    trace := treq.trace
    ctx := req.Context()
    // ...code omitted...
    w := &wantConn{
        cm:         cm,
        key:        cm.key(),
        ctx:        ctx,
        ready:      make(chan struct{}, 1),
        beforeDial: testHookPrePendingDial,
        afterDial:  testHookPostPendingDial,
    }
    // ...code omitted...
    // Queue for idle connection.
    if delivered := t.queueForIdleConn(w); delivered {
        pc := w.pc
        // ...code omitted...
        return pc, nil
    }

    cancelc := make(chan error, 1)
    t.setReqCanceler(req, func(err error) { cancelc <- err })

    // Queue for permission to dial.
    t.queueForDial(w)

    // Wait for completion or cancellation.
    select {
    case <-w.ready:
        // Trace success but only for HTTP/1.
        // HTTP/2 calls trace.GotConn itself.
        if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
            trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
        }
        // ...code omitted...
        return w.pc, w.err
    case <-req.Cancel:
        return nil, errRequestCanceledConn
    case <-req.Context().Done():
        return nil, req.Context().Err()
    case err := <-cancelc:
        if err == errRequestCanceled {
            err = errRequestCanceledConn
        }
        return nil, err
    }
}

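From the code above we can clearly see that obtaining a connection takes the following steps:

  1. Call t.queueForIdleConn to look for an idle, reusable connection; if one is found, return it immediately.
  2. If no idle connection is available, call t.queueForDial to start dialing a new one.
  3. Wait for w.ready to be closed (or for the request to be canceled), then return the connection.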
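Whether getConn took the queueForIdleConn path or had to dial can be observed from the outside with net/http/httptrace: GotConnInfo.Reused reports a reused connection. A sketch, assuming a local httptest server and a dedicated Transport; reuseSequence is a hypothetical helper. Each body is read to EOF before the next request is sent, so the connection is back in the idle pool in time.

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
)

// reuseSequence sends n sequential GETs through one Transport and records,
// for each request, whether getConn handed it a reused (idle) connection.
func reuseSequence(n int) ([]bool, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "hello")
	}))
	defer srv.Close()

	tr := &http.Transport{}
	defer tr.CloseIdleConnections()
	client := &http.Client{Transport: tr}

	var reused []bool
	for i := 0; i < n; i++ {
		var got bool
		trace := &httptrace.ClientTrace{
			GotConn: func(info httptrace.GotConnInfo) { got = info.Reused },
		}
		req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
		if err != nil {
			return nil, err
		}
		req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		io.Copy(ioutil.Discard, resp.Body) // read to EOF so the conn returns to idleConn
		resp.Body.Close()
		reused = append(reused, got)
	}
	return reused, nil
}

func main() {
	r, err := reuseSequence(3)
	fmt.Println(r, err) // [false true true] <nil>
}
```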

(*Transport).queueForIdleConn

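(*Transport).queueForIdleConn uses the request's connectMethodKey to fetch a []*persistConn slice from t.idleConn and picks a usable idle connection from it, starting with the most recently used one (the end of the slice) and skipping connections that have been idle too long. If none is found, it puts the wantConn value (the w mentioned earlier) into the t.idleConnWait[w.key] wait queue.

The definition of connectMethodKey and the key parts of queueForIdleConn are shown below: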

type connectMethodKey struct {
    proxy, scheme, addr string
    onlyH1              bool
}

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
    // ...code omitted...
    // Look for most recently-used idle connection.
    if list, ok := t.idleConn[w.key]; ok {
        stop := false
        delivered := false
        for len(list) > 0 && !stop {
            pconn := list[len(list)-1]

            // See whether this connection has been idle too long, considering
            // only the wall time (the Round(0)), in case this is a laptop or VM
            // coming out of suspend with previously cached idle connections.
            tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
            // ...code omitted...
            delivered = w.tryDeliver(pconn, nil)
            if delivered {
                // ...code omitted...
            }
            stop = true
        }
        if len(list) > 0 {
            t.idleConn[w.key] = list
        } else {
            delete(t.idleConn, w.key)
        }
        if stop {
            return delivered
        }
    }

    // Register to receive next connection that becomes idle.
    if t.idleConnWait == nil {
        t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
    }
    q := t.idleConnWait[w.key]
    q.cleanFront()
    q.pushBack(w)
    t.idleConnWait[w.key] = q
    return false
}

Here, w.tryDeliver assigns the connection to w.pc in a goroutine-safe way and closes the w.ready channel. This matches the return value that (*Transport).getConn expects when queueForIdleConn succeeds.

(*Transport).queueForDial

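(*Transport).queueForDial works in three steps:

  1. If t.MaxConnsPerHost is less than or equal to 0, run go t.dialConnFor(w) and return. MaxConnsPerHost is the maximum number of connections per host; a value of 0 or less means no limit.
  2. If the number of connections to the current host is below t.MaxConnsPerHost, increment it by 1, then run go t.dialConnFor(w) and return.
  3. If the number of connections to the current host has reached t.MaxConnsPerHost, put the wantConn value (the w from earlier) into the t.connsPerHostWait[w.key] wait queue. Before queuing it, q.cleanFront() removes entries at the front of the queue that are no longer waiting.
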
func (t *Transport) queueForDial(w *wantConn) {
    w.beforeDial()
    if t.MaxConnsPerHost <= 0 {
        go t.dialConnFor(w)
        return
    }

    t.connsPerHostMu.Lock()
    defer t.connsPerHostMu.Unlock()

    if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
        if t.connsPerHost == nil {
            t.connsPerHost = make(map[connectMethodKey]int)
        }
        t.connsPerHost[w.key] = n + 1
        go t.dialConnFor(w)
        return
    }

    if t.connsPerHostWait == nil {
        t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
    }
    q := t.connsPerHostWait[w.key]
    q.cleanFront()
    q.pushBack(w)
    t.connsPerHostWait[w.key] = q
}
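The effect of MaxConnsPerHost can be checked by counting the TCP connections a server accepts. This sketch assumes an httptest server whose ConnState hook counts http.StateNew events; dialsUnderLoad is a hypothetical helper. With MaxConnsPerHost set to 1, the extra concurrent requests wait in connsPerHostWait (and idleConnWait) and are then served one after another over the single connection, so the server should see exactly one connection.

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"net/http/httptest"
	"sync"
	"sync/atomic"
)

// dialsUnderLoad fires n concurrent GETs through a Transport with the given
// MaxConnsPerHost and returns how many TCP connections the server accepted.
func dialsUnderLoad(maxConnsPerHost, n int) (int64, error) {
	var newConns int64
	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	srv.Config.ConnState = func(c net.Conn, s http.ConnState) {
		if s == http.StateNew {
			atomic.AddInt64(&newConns, 1) // one per accepted TCP connection
		}
	}
	srv.Start()
	defer srv.Close()

	tr := &http.Transport{MaxConnsPerHost: maxConnsPerHost}
	defer tr.CloseIdleConnections()
	client := &http.Client{Transport: tr}

	var wg sync.WaitGroup
	errs := make(chan error, n)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			resp, err := client.Get(srv.URL)
			if err != nil {
				errs <- err
				return
			}
			io.Copy(ioutil.Discard, resp.Body) // read to EOF so the conn is handed on
			resp.Body.Close()
		}()
	}
	wg.Wait()
	close(errs)
	if err := <-errs; err != nil { // nil when no goroutine reported an error
		return 0, err
	}
	return atomic.LoadInt64(&newConns), nil
}

func main() {
	n, err := dialsUnderLoad(1, 5)
	fmt.Println(n, err)
}
```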

(*Transport).dialConnFor

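(*Transport).dialConnFor calls t.dialConn to obtain a real *persistConn and tries to hand it to w. If w has already received a connection (for example an idle one that became available first), the handoff fails and t.putOrCloseIdleConn is called to put the connection into the idle pool, or close it. HTTP/2 connections are also added to the idle pool, since they can be shared.

If dialing fails, t.decConnsPerHost is called to decrement the connection count for the current host.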

func (t *Transport) dialConnFor(w *wantConn) {
    defer w.afterDial()

    pc, err := t.dialConn(w.ctx, w.cm)
    delivered := w.tryDeliver(pc, err)
    if err == nil && (!delivered || pc.alt != nil) {
        // pconn was not passed to w,
        // or it is HTTP/2 and can be shared.
        // Add to the idle connection pool.
        t.putOrCloseIdleConn(pc)
    }
    if err != nil {
        t.decConnsPerHost(w.key)
    }
}
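The "handoff fails" case hinges on tryDeliver accepting only the first result. Below is a hypothetical, stripped-down model of that handoff (wantConnModel and conn are stand-ins, not net/http types): it stores the result under a mutex and closes ready exactly once, so a connection that arrives later is refused and, in the real code, goes to putOrCloseIdleConn. As with the real method, callers are assumed to pass a non-nil connection or error.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in for *persistConn.
type conn struct{ id int }

// wantConnModel is a hypothetical, stripped-down wantConn: whoever calls
// tryDeliver first wins, and every later delivery is refused.
type wantConnModel struct {
	mu    sync.Mutex
	ready chan struct{} // closed once a result has been delivered
	pc    *conn
	err   error
}

func newWantConn() *wantConnModel {
	return &wantConnModel{ready: make(chan struct{}, 1)}
}

// tryDeliver records pc/err under the mutex and closes ready, but only if
// nothing was delivered before. Callers must pass a non-nil pc or err.
func (w *wantConnModel) tryDeliver(pc *conn, err error) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.pc != nil || w.err != nil {
		return false // already satisfied, e.g. by an idle connection
	}
	w.pc, w.err = pc, err
	close(w.ready)
	return true
}

func main() {
	w := newWantConn()
	fmt.Println(w.tryDeliver(&conn{id: 1}, nil)) // true: say an idle conn arrived first
	fmt.Println(w.tryDeliver(&conn{id: 2}, nil)) // false: the dialed conn is not needed
	<-w.ready                                     // already closed, returns immediately
	fmt.Println("waiter got conn", w.pc.id)       // waiter got conn 1
}
```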

  • The (*Transport).putOrCloseIdleConn method
func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
    if err := t.tryPutIdleConn(pconn); err != nil {
        pconn.close(err)
    }
}
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
    if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
        return errKeepAlivesDisabled
    }
    // ...code omitted...
    t.idleMu.Lock()
    defer t.idleMu.Unlock()
    // ...code omitted...
    
    // Deliver pconn to goroutine waiting for idle connection, if any.
    // (They may be actively dialing, but this conn is ready first.
    // Chrome calls this socket late binding.
    // See https://insouciant.org/tech/connection-management-in-chromium/.)
    key := pconn.cacheKey
    if q, ok := t.idleConnWait[key]; ok {
        done := false
        if pconn.alt == nil {
            // HTTP/1.
            // Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
            for q.len() > 0 {
                w := q.popFront()
                if w.tryDeliver(pconn, nil) {
                    done = true
                    break
                }
            }
        } else {
            // HTTP/2.
            // Can hand the same pconn to everyone in the waiting list,
            // and we still won't be done: we want to put it in the idle
            // list unconditionally, for any future clients too.
            for q.len() > 0 {
                w := q.popFront()
                w.tryDeliver(pconn, nil)
            }
        }
        if q.len() == 0 {
            delete(t.idleConnWait, key)
        } else {
            t.idleConnWait[key] = q
        }
        if done {
            return nil
        }
    }

    if t.closeIdle {
        return errCloseIdle
    }
    if t.idleConn == nil {
        t.idleConn = make(map[connectMethodKey][]*persistConn)
    }
    idles := t.idleConn[key]
    if len(idles) >= t.maxIdleConnsPerHost() {
        return errTooManyIdleHost
    }
    // ...code omitted...
    t.idleConn[key] = append(idles, pconn)
    t.idleLRU.add(pconn)
    // ...code omitted...
    // Set idle timer, but only for HTTP/1 (pconn.alt == nil).
    // The HTTP/2 implementation manages the idle timer itself
    // (see idleConnTimeout in h2_bundle.go).
    if t.IdleConnTimeout > 0 && pconn.alt == nil {
        if pconn.idleTimer != nil {
            pconn.idleTimer.Reset(t.IdleConnTimeout)
        } else {
            pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
        }
    }
    pconn.idleAt = time.Now()
    return nil
}
func (t *Transport) maxIdleConnsPerHost() int {
    if v := t.MaxIdleConnsPerHost; v != 0 {
        return v
    }
    return DefaultMaxIdleConnsPerHost // 2
}

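As shown above, before putting a connection into t.idleConn, tryPutIdleConn first checks t.idleConnWait. If a request is waiting for an idle connection, the connection is handed to it directly; only when nobody is waiting is the connection put into t.idleConn. After that, the idle timer is reset (for HTTP/1 connections, when IdleConnTimeout > 0) and idleAt is recorded.

Two more points about t.putOrCloseIdleConn are worth noting:

  1. If the user configures a custom Transport with DisableKeepAlives set to true, or with MaxIdleConnsPerHost set to a negative value, the connection is never put into t.idleConn, so it cannot be reused.
  2. When counting the existing idle connections, the limit is the user's MaxIdleConnsPerHost if it is non-zero, and the default of 2 otherwise; see (*Transport).maxIdleConnsPerHost above. If the limit has already been reached, tryPutIdleConn returns errTooManyIdleHost and the connection is closed.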

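In short, for workloads that must limit the number of connections, we can give http.Client a custom Transport and set its MaxConnsPerHost, MaxIdleConnsPerHost, IdleConnTimeout and DisableKeepAlives fields, capping the number of connections while still keeping a reasonable degree of concurrency.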
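A sketch of such a client follows. The helper newLimitedClient and all numeric values are placeholders, not recommendations from the article; connsForSequentialGets is a hypothetical helper that counts accepted TCP connections on an httptest server. With keep-alives on, three sequential requests whose bodies are fully read should share one connection; with DisableKeepAlives set, every request should dial its own.

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// newLimitedClient is a hypothetical helper; the numbers are placeholders.
func newLimitedClient(disableKeepAlives bool) *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			MaxConnsPerHost:     10,               // dialing + active + idle conns per host
			MaxIdleConnsPerHost: 5,                // idle pool per host (2 when left at 0)
			IdleConnTimeout:     90 * time.Second, // idle conns are closed after this long
			DisableKeepAlives:   disableKeepAlives,
		},
	}
}

// connsForSequentialGets sends n GETs one after another, draining each body,
// and returns how many TCP connections the test server accepted.
func connsForSequentialGets(client *http.Client, n int) (int64, error) {
	var newConns int64
	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	srv.Config.ConnState = func(c net.Conn, s http.ConnState) {
		if s == http.StateNew {
			atomic.AddInt64(&newConns, 1)
		}
	}
	srv.Start()
	defer srv.Close()

	for i := 0; i < n; i++ {
		resp, err := client.Get(srv.URL)
		if err != nil {
			return 0, err
		}
		io.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}
	return atomic.LoadInt64(&newConns), nil
}

func main() {
	kept, err1 := connsForSequentialGets(newLimitedClient(false), 3)
	fresh, err2 := connsForSequentialGets(newLimitedClient(true), 3)
	fmt.Println(kept, fresh, err1, err2)
}
```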

  • The (*Transport).decConnsPerHost method
func (t *Transport) decConnsPerHost(key connectMethodKey) {
    // ...code omitted...
    t.connsPerHostMu.Lock()
    defer t.connsPerHostMu.Unlock()
    n := t.connsPerHost[key]
    // ...code omitted...

    // Can we hand this count to a goroutine still waiting to dial?
    // (Some goroutines on the wait list may have timed out or
    // gotten a connection another way. If they're all gone,
    // we don't want to kick off any spurious dial operations.)
    if q := t.connsPerHostWait[key]; q.len() > 0 {
        done := false
        for q.len() > 0 {
            w := q.popFront()
            if w.waiting() {
                go t.dialConnFor(w)
                done = true
                break
            }
        }
        if q.len() == 0 {
            delete(t.connsPerHostWait, key)
        } else {
            // q is a value (like a slice), so we have to store
            // the updated q back into the map.
            t.connsPerHostWait[key] = q
        }
        if done {
            return
        }
    }

    // Otherwise, decrement the recorded count.
    if n--; n == 0 {
        delete(t.connsPerHost, key)
    } else {
        t.connsPerHost[key] = n
    }
}

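As shown above, decConnsPerHost does two things:

  1. It checks whether any request is still waiting to dial; if so, it runs go t.dialConnFor(w), handing the freed slot to that waiter.
  2. If no request is waiting to dial, it decrements the connection count for the current host.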
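Taken together, queueForDial and decConnsPerHost behave like a per-host counting semaphore with a wait queue. Here is a single-goroutine model under the assumption that locking and real dialing can be ignored; hostLimiter and waiter are hypothetical names. A released slot goes to the first waiter that is still waiting, and only when nobody is left does the count go down. In net/http the release side runs when a dial fails and when a connection is closed.

```go
package main

import "fmt"

// waiter stands in for a *wantConn sitting in connsPerHostWait.
type waiter struct {
	name    string
	waiting bool // becomes false once it got a conn elsewhere or gave up
}

// hostLimiter is a hypothetical single-goroutine model of connsPerHost and
// connsPerHostWait for one key; there is no locking and no real dialing.
type hostLimiter struct {
	max   int       // MaxConnsPerHost; <= 0 means unlimited
	conns int       // connsPerHost[key]
	queue []*waiter // connsPerHostWait[key], FIFO
	dials []string  // who was allowed to dial, in order
}

// acquire mirrors queueForDial: dial at once if unlimited or under the limit,
// otherwise join the wait queue.
func (h *hostLimiter) acquire(w *waiter) {
	if h.max <= 0 {
		h.dials = append(h.dials, w.name)
		return
	}
	if h.conns < h.max {
		h.conns++
		h.dials = append(h.dials, w.name)
		return
	}
	h.queue = append(h.queue, w)
}

// release mirrors decConnsPerHost: hand the slot to the first waiter that is
// still waiting (the count stays the same); if none is left, decrement.
func (h *hostLimiter) release() {
	if h.max <= 0 {
		return // unlimited hosts are not counted
	}
	for len(h.queue) > 0 {
		w := h.queue[0]
		h.queue = h.queue[1:]
		if w.waiting {
			h.dials = append(h.dials, w.name)
			return
		}
	}
	h.conns--
}

func main() {
	h := &hostLimiter{max: 1}
	b := &waiter{name: "b", waiting: true}
	h.acquire(&waiter{name: "a", waiting: true}) // dials
	h.acquire(b)                                 // queued
	h.acquire(&waiter{name: "c", waiting: true}) // queued
	b.waiting = false                            // b got a connection some other way
	h.release()                                  // a's conn closed: b is skipped, c dials
	h.release()                                  // c's conn closed: nobody waits
	fmt.Println(h.dials, h.conns)                // [a c] 0
}
```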

(*Transport).dialConn

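Based on the default http.Client configuration and on actual debugging, the main logic of (*Transport).dialConn is:

  1. Call t.dial(ctx, "tcp", cm.addr()) to create the TCP connection.
  2. For HTTPS requests, establish a secure TLS channel over that connection.
  3. Create the read and write buffers for the persistConn. If the user has not customized their sizes, writeBufferSize and readBufferSize show that both default to 4096 bytes (4 << 10).
  4. Run go pconn.readLoop() and go pconn.writeLoop() to start the read and write loops, then return the connection.
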
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
    pconn = &persistConn{
        t:             t,
        cacheKey:      cm.key(),
        reqch:         make(chan requestAndChan, 1),
        writech:       make(chan writeRequest, 1),
        closech:       make(chan struct{}),
        writeErrCh:    make(chan error, 1),
        writeLoopDone: make(chan struct{}),
    }
    // ...code omitted...
    if cm.scheme() == "https" && t.hasCustomTLSDialer() {
        // ...code omitted...
    } else {
        conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {
            return nil, wrapErr(err)
        }
        pconn.conn = conn
        if cm.scheme() == "https" {
            var firstTLSHost string
            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
                return nil, wrapErr(err)
            }
            if err = pconn.addTLS(firstTLSHost, trace); err != nil {
                return nil, wrapErr(err)
            }
        }
    }

    // Proxy setup.
    switch {
    // ...code omitted...
    }

    if cm.proxyURL != nil && cm.targetScheme == "https" {
        // ...code omitted...
    }

    if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
        // ...code omitted...
    }

    pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
    pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

    go pconn.readLoop()
    go pconn.writeLoop()
    return pconn, nil
}
func (t *Transport) writeBufferSize() int {
    if t.WriteBufferSize > 0 {
        return t.WriteBufferSize
    }
    return 4 << 10
}

func (t *Transport) readBufferSize() int {
    if t.ReadBufferSize > 0 {
        return t.ReadBufferSize
    }
    return 4 << 10
}

(*persistConn).roundTrip

(*persistConn).roundTrip is one of the core pieces of an HTTP/1.1 request: this is where the actual Response is obtained and handed back to the upper layer.

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
    // ...code omitted...

    gone := make(chan struct{})
    defer close(gone)
    // ...code omitted...
    const debugRoundTrip = false

    // Write the request concurrently with waiting for a response,
    // in case the server decides to reply before reading our full
    // request body.
    startBytesWritten := pc.nwrite
    writeErrCh := make(chan error, 1)
    pc.writech <- writeRequest{req, writeErrCh, continueCh}

    resc := make(chan responseAndError)
    pc.reqch <- requestAndChan{
        req:        req.Request,
        ch:         resc,
        addedGzip:  requestedGzip,
        continueCh: continueCh,
        callerGone: gone,
    }

    var respHeaderTimer <-chan time.Time
    cancelChan := req.Request.Cancel
    ctxDoneChan := req.Context().Done()
    for {
        testHookWaitResLoop()
        select {
        case err := <-writeErrCh:
            // ...code omitted...
            if err != nil {
                pc.close(fmt.Errorf("write error: %v", err))
                return nil, pc.mapRoundTripError(req, startBytesWritten, err)
            }
            // ...code omitted...
        case <-pc.closech:
            // ...code omitted...
            return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
        case <-respHeaderTimer:
            // ...code omitted...
            return nil, errTimeout
        case re := <-resc:
            if (re.res == nil) == (re.err == nil) {
                panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
            }
            if debugRoundTrip {
                req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
            }
            if re.err != nil {
                return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
            }
            return re.res, nil
        case <-cancelChan:
            pc.t.CancelRequest(req.Request)
            cancelChan = nil
        case <-ctxDoneChan:
            pc.t.cancelRequest(req.Request, req.Context().Err())
            cancelChan = nil
            ctxDoneChan = nil
        }
    }
}

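As shown above, (*persistConn).roundTrip can be split into three steps:

  1. Send a writeRequest on the connection's writech: pc.writech <- writeRequest{req, writeErrCh, continueCh}. From (*Transport).dialConn we know pc.writech is a channel with a buffer size of 1, so the send normally succeeds immediately.
  2. Send a requestAndChan on the connection's reqch: pc.reqch <- requestAndChan{...}. Like pc.writech, pc.reqch has a buffer size of 1. requestAndChan.ch is an unbuffered responseAndError channel, through which (*persistConn).roundTrip receives the actual response.
  3. Enter a for/select loop and wait for the response, a timeout, or other events.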
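The handoff between roundTrip, writeLoop and readLoop can be illustrated with a toy model. Everything here is hypothetical: modelConn replaces persistConn, and a buffered wire channel stands in for the TCP socket plus a server that answers "response to <request>". As in the real code, writech and reqch have a buffer of 1 and each caller waits on its own unbuffered channel; errors, timeouts and cancellation are left out.

```go
package main

import "fmt"

// writeReq and reqAndChan are hypothetical stand-ins for writeRequest and
// requestAndChan.
type writeReq struct{ line string }

type reqAndChan struct {
	line string
	ch   chan string // unbuffered, like resc in roundTrip
}

// modelConn is a toy persistConn; wire replaces the socket and the server.
type modelConn struct {
	writech chan writeReq   // buffer 1, like pc.writech
	reqch   chan reqAndChan // buffer 1, like pc.reqch
	wire    chan string
}

func newModelConn() *modelConn {
	c := &modelConn{
		writech: make(chan writeReq, 1),
		reqch:   make(chan reqAndChan, 1),
		wire:    make(chan string, 1),
	}
	go c.writeLoop()
	go c.readLoop()
	return c
}

// writeLoop takes requests from writech and "writes" them to the wire.
func (c *modelConn) writeLoop() {
	for wr := range c.writech {
		c.wire <- wr.line
	}
}

// readLoop takes the next caller from reqch, "reads" the matching response
// off the wire and hands it back on that caller's private channel.
func (c *modelConn) readLoop() {
	for rc := range c.reqch {
		sent := <-c.wire
		rc.ch <- "response to " + sent
	}
}

// roundTrip mirrors the three steps: send on writech, send on reqch, wait.
func (c *modelConn) roundTrip(line string) string {
	c.writech <- writeReq{line}
	ch := make(chan string)
	c.reqch <- reqAndChan{line: line, ch: ch}
	return <-ch
}

func main() {
	c := newModelConn()
	fmt.Println(c.roundTrip("GET /a")) // response to GET /a
	fmt.Println(c.roundTrip("GET /b")) // response to GET /b
}
```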

  • (*persistConn).writeLoop: the write loop

The main logic of (*persistConn).writeLoop is fairly simple: it writes the user's request into the connection's write buffer and then flushes it.

func (pc *persistConn) writeLoop() {
    defer close(pc.writeLoopDone)
    for {
        select {
        case wr := <-pc.writech:
            startBytesWritten := pc.nwrite
            err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
            if bre, ok := err.(requestBodyReadError); ok {
                err = bre.error
                wr.req.setError(err)
            }
            if err == nil {
                err = pc.bw.Flush()
            }
            if err != nil {
                wr.req.Request.closeBody()
                if pc.nwrite == startBytesWritten {
                    err = nothingWrittenError{err}
                }
            }
            pc.writeErrCh <- err // to the body reader, which might recycle us
            wr.ch <- err         // to the roundTrip function
            if err != nil {
                pc.close(err)
                return
            }
        case <-pc.closech:
            return
        }
    }
}

  • (*persistConn).readLoop: the read loop

(*persistConn).readLoop involves many details, so let's look at the code first and then go through it step by step.

func (pc *persistConn) readLoop() {
    closeErr := errReadLoopExiting // default value, if not changed below
    defer func() {
        pc.close(closeErr)
        pc.t.removeIdleConn(pc)
    }()

    tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
        if err := pc.t.tryPutIdleConn(pc); err != nil {
            // ...code omitted...
        }
        // ...code omitted...
        return true
    }
    // ...code omitted...
    alive := true
    for alive {
        // ...code omitted...
        rc := <-pc.reqch
        trace := httptrace.ContextClientTrace(rc.req.Context())

        var resp *Response
        if err == nil {
            resp, err = pc.readResponse(rc, trace)
        } else {
            err = transportReadFromServerError{err}
            closeErr = err
        }

        // ...code omitted...
        bodyWritable := resp.bodyIsWritable()
        hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

        if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
            // Don't do keep-alive on error if either party requested a close
            // or we get an unexpected informational (1xx) response.
            // StatusCode 100 is already handled above.
            alive = false
        }

        if !hasBody || bodyWritable {
            // ...code omitted...
            continue
        }

        waitForBodyRead := make(chan bool, 2)
        body := &bodyEOFSignal{
            body: resp.Body,
            earlyCloseFn: func() error {
                waitForBodyRead <- false
                <-eofc // will be closed by deferred call at the end of the function
                return nil

            },
            fn: func(err error) error {
                isEOF := err == io.EOF
                waitForBodyRead <- isEOF
                if isEOF {
                    <-eofc // see comment above eofc declaration
                } else if err != nil {
                    if cerr := pc.canceled(); cerr != nil {
                        return cerr
                    }
                }
                return err
            },
        }

        resp.Body = body
        // ...code omitted...

        select {
        case rc.ch <- responseAndError{res: resp}:
        case <-rc.callerGone:
            return
        }

        // Before looping back to the top of this function and peeking on
        // the bufio.Reader, wait for the caller goroutine to finish
        // reading the response body. (or for cancellation or death)
        select {
        case bodyEOF := <-waitForBodyRead:
            pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
            alive = alive &&
                bodyEOF &&
                !pc.sawEOF &&
                pc.wroteRequest() &&
                tryPutIdleConn(trace)
            if bodyEOF {
                eofc <- struct{}{}
            }
        case <-rc.req.Cancel:
            alive = false
            pc.t.CancelRequest(rc.req)
        case <-rc.req.Context().Done():
            alive = false
            pc.t.cancelRequest(rc.req, rc.req.Context().Err())
        case <-pc.closech:
            alive = false
        }

        testHookReadLoopBeforeNextRead()
    }
}

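As shown above, the read loop keeps running as long as the connection is alive; it only ends when the connection is no longer alive or some other error occurs.

In the code above, pc.readResponse(rc, trace) reads the Response for one request from the connection's read buffer.

After reading the response, readLoop checks whether the request was a HEAD request or the response has no body. If so, it sends the response on rc.ch and puts the connection into idleConn. (That source is omitted here for space; the normal path also includes these two steps, delivering the response and putting the connection into idleConn.)

If it is not a HEAD request and the response has a body, i.e. !hasBody || bodyWritable is false: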

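  1. Create waitForBodyRead, a channel with a buffer size of 2 that signals when the response body has been consumed: waitForBodyRead := make(chan bool, 2).
  2. Replace the response Body with a bodyEOFSignal. As the source above shows, the new resp.Body carries two functions, earlyCloseFn and fn. earlyCloseFn sends false on waitForBodyRead; fn checks whether the body was read to io.EOF and sends true if so, false otherwise.
  3. Send the modified response on rc.ch. rc comes from rc := <-pc.reqch, and pc.reqch carries the requestAndChan written earlier by (*persistConn).roundTrip; requestAndChan.ch is the unbuffered responseAndError channel through which (*persistConn).roundTrip receives the actual response.
  4. Use select to receive the value sent on waitForBodyRead. If it is true (and the connection is otherwise still healthy), tryPutIdleConn is called, which in turn calls the (*Transport).tryPutIdleConn method discussed earlier, to put the connection into idleConn so it can be reused.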

We now know why true is sent on waitForBodyRead, but not yet when that happens.

func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
    // ...code omitted...
    n, err = es.body.Read(p)
    if err != nil {
        es.mu.Lock()
        defer es.mu.Unlock()
        if es.rerr == nil {
            es.rerr = err
        }
        err = es.condfn(err)
    }
    return
}

func (es *bodyEOFSignal) Close() error {
    es.mu.Lock()
    defer es.mu.Unlock()
    if es.closed {
        return nil
    }
    es.closed = true
    if es.earlyCloseFn != nil && es.rerr != io.EOF {
        return es.earlyCloseFn()
    }
    err := es.body.Close()
    return es.condfn(err)
}

// caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
    if es.fn == nil {
        return err
    }
    err = es.fn(err)
    es.fn = nil
    return err
}

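From this source we can see that a connection can be reused only after the caller has read the response body to the end: a Read that hits io.EOF makes fn send true, whereas a Close before EOF goes through earlyCloseFn, which sends false. So in HTTP/1.1, requests on a given connection are served one at a time: the next request can only go out after the previous one has been fully handled. If an earlier request is slow, the later ones must wait; this is head-of-line blocking in HTTP/1.1.

Following this logic, whenever we Gophers issue a request whose response we don't care about, we should still read the response body to the end so that the connection can be reused. Here is a demo: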

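// Drain at most 2 KiB (2 << 10 = 2048 bytes) of the body, then close it.
// The connection goes back to the idle pool only if this reaches io.EOF,
// i.e. only if the body is no longer than the cap. net/http's Client uses
// the same bounded drain (maxBodySlurpSize) before following a redirect.
io.CopyN(ioutil.Discard, resp.Body, 2 << 10)
resp.Body.Close()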
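A hypothetical drainAndClose helper makes this advice reusable. The sketch below checks it with httptrace, assuming a local httptest server that returns a 64 KiB body (larger than the 2 KiB cap of the demo above); drainAndClose and secondRequestReused are illustrative names. Draining the first body should let the second request reuse the connection. With the go1.14 logic shown above, closing the body unread goes through earlyCloseFn, so the second request is expected to dial anew, though details may differ between Go versions.

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
	"strings"
)

// drainAndClose reads whatever is left of the body and then closes it, so the
// Transport sees io.EOF and can return the connection to its idle pool.
func drainAndClose(resp *http.Response) {
	io.Copy(ioutil.Discard, resp.Body)
	resp.Body.Close()
}

// secondRequestReused makes two GETs on one Transport and reports whether the
// second got a reused connection. If drain is false, the first body is closed
// without being read.
func secondRequestReused(drain bool) (bool, error) {
	payload := strings.Repeat("x", 64<<10) // 64 KiB
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, payload)
	}))
	defer srv.Close()

	tr := &http.Transport{}
	defer tr.CloseIdleConnections()
	client := &http.Client{Transport: tr}

	resp, err := client.Get(srv.URL)
	if err != nil {
		return false, err
	}
	if drain {
		drainAndClose(resp)
	} else {
		resp.Body.Close() // early close: earlyCloseFn path
	}

	var reused bool
	trace := &httptrace.ClientTrace{GotConn: func(i httptrace.GotConnInfo) { reused = i.Reused }}
	req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
	if err != nil {
		return false, err
	}
	req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
	resp, err = client.Do(req)
	if err != nil {
		return false, err
	}
	drainAndClose(resp)
	return reused, nil
}

func main() {
	drained, err1 := secondRequestReused(true)
	undrained, err2 := secondRequestReused(false)
	fmt.Println("drained:", drained, "undrained:", undrained, err1, err2)
}
```

Draining without a cap reads the whole body, which can cost more than a new connection when responses are huge or endless; the bounded CopyN in the demo above trades that off against reuse for small bodies.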

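That is my walkthrough of the HTTP/1.1 request flow.

Note

In the interest of rigor, a reminder:

Many details of the flow above are not covered, or are only mentioned in passing, so please read this article with that in mind.

Summary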

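  1. When issuing HTTP/1.1 requests in Go, even if you don't care about the response, be sure to read the response body to the end so that the connection can be reused.
  2. If your workload has to limit the number of connections, give http.Client a custom Transport and set its MaxConnsPerHost, MaxIdleConnsPerHost, IdleConnTimeout and DisableKeepAlives fields to control the connection count.
  3. If your business logic needs special redirect handling, set a custom CheckRedirect on http.Client.
  4. In HTTP/1.1, requests on a given connection are served one at a time: the next request can only proceed after the previous one has been handled. If an earlier request is slow, the later ones must wait; this is head-of-line blocking in HTTP/1.1.

Note: the Go version I used while writing this article is go1.14.2.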

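Life goes on, and so does exploration; I will keep posting about Go.

Original content takes real effort; if you found this useful, please follow and bookmark.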
