Go發起HTTP2.0請求流程分析(前篇)

來自公衆號:新世界雜貨鋪

前言

Go中的HTTP請求之——HTTP1.1請求流程分析以後,中間斷斷續續,歷時近一月,終於纔敢開始碼字寫下本文。web

閱讀建議

HTTP2.0在創建TCP鏈接和安全的TLS傳輸通道與HTTP1.1的流程基本一致。因此筆者建議沒有看過Go中的HTTP請求之——HTTP1.1請求流程分析這篇文章的先去補一下課,本文會基於前一篇文章僅介紹和HTTP2.0相關的邏輯。segmentfault

(*Transport).roundTrip

(*Transport).roundTrip方法會調用t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)初始化TLSClientConfig以及h2transport,而這二者都和HTTP2.0有着緊密的聯繫。安全

TLSClientConfig: 初始化client支持的http協議, 並在tls握手時告知server。併發

h2transport: 若是本次請求是http2,那麼h2transport會接管鏈接,請求和響應的處理邏輯。app

下面看看源碼:tcp

func (t *Transport) onceSetNextProtoDefaults() {
    // ...此處省略代碼...
    t2, err := http2configureTransport(t)
    if err != nil {
        log.Printf("Error enabling Transport HTTP/2 support: %v", err)
        return
    }
    t.h2transport = t2

    // ...此處省略代碼...
}
func http2configureTransport(t1 *Transport) (*http2Transport, error) {
    connPool := new(http2clientConnPool)
    t2 := &http2Transport{
        ConnPool: http2noDialClientConnPool{connPool},
        t1:       t1,
    }
    connPool.t = t2
    if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil {
        return nil, err
    }
    if t1.TLSClientConfig == nil {
        t1.TLSClientConfig = new(tls.Config)
    }
    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
        t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
    }
    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
        t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
    }
    upgradeFn := func(authority string, c *tls.Conn) RoundTripper {
        addr := http2authorityAddr("https", authority)
        if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
            go c.Close()
            return http2erringRoundTripper{err}
        } else if !used {
            // Turns out we don't need this c.
            // For example, two goroutines made requests to the same host
            // at the same time, both kicking off TCP dials. (since protocol
            // was unknown)
            go c.Close()
        }
        return t2
    }
    if m := t1.TLSNextProto; len(m) == 0 {
        t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{
            "h2": upgradeFn,
        }
    } else {
        m["h2"] = upgradeFn
    }
    return t2, nil
}

筆者將上述的源碼簡單拆解爲如下幾個步驟:函數

  1. 新建一個http2clientConnPool並複製給t2,之後http2的請求會優先從該鏈接池中獲取鏈接。
  2. 初始化TLSClientConfig,並將支持的h2http1.1協議添加到TLSClientConfig.NextProtos中。
  3. 定義一個h2upgradeFn存儲到t1.TLSNextProto裏。

鑑於前一篇文章對新建鏈接前的步驟有了較爲詳細的介紹,因此這裏直接看和server創建鏈接的部分源碼,即(*Transport).dialConn方法:oop

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
    // ...此處省略代碼...
    if cm.scheme() == "https" && t.hasCustomTLSDialer() {
        // ...此處省略代碼...
    } else {
        conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {
            return nil, wrapErr(err)
        }
        pconn.conn = conn
        if cm.scheme() == "https" {
            var firstTLSHost string
            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
                return nil, wrapErr(err)
            }
            if err = pconn.addTLS(firstTLSHost, trace); err != nil {
                return nil, wrapErr(err)
            }
        }
    }

    // Proxy setup.
    // ...此處省略代碼...

    if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
        if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
            return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
        }
    }

    // ...此處省略代碼...
}

筆者對上述的源碼描述以下:大數據

  1. 調用t.dial(ctx, "tcp", cm.addr())建立TCP鏈接。
  2. 若是是https的請求, 則對請求創建安全的tls傳輸通道。
  3. 檢查tls的握手狀態,若是和server協商的NegotiatedProtocol協議不爲空,且client的t.TLSNextProto有該協議,則返回alt不爲空的持久鏈接(HTTP1.1不會進入if條件裏)。

筆者對上述的第三點進行展開。經筆者在本地debug驗證,當client和server都支持http2時,s.NegotiatedProtocol的值爲h2s.NegotiatedProtocolIsMutual的值爲trueui

在上面分析http2configureTransport函數時,咱們知道TLSNextProto註冊了一個key爲h2的函數,因此調用next實際就是調用前面的upgradeFn函數。

upgradeFn會調用connPool.addConnIfNeeded向http2的鏈接池添加一個tls傳輸通道,並最終返回前面已經建立好的t2http2Transport

func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {
    p.mu.Lock()
    // ...此處省略代碼...
    // 主要用於判斷是否有必要像鏈接池添加新的鏈接
    // 判斷鏈接池中是否已有同host鏈接,若是有且該連接可以處理新的請求則直接返回
    call, dup := p.addConnCalls[key]
    if !dup {
        // ...此處省略代碼...
        call = &http2addConnCall{
            p:    p,
            done: make(chan struct{}),
        }
        p.addConnCalls[key] = call
        go call.run(t, key, c)
    }
    p.mu.Unlock()

    <-call.done
    if call.err != nil {
        return false, call.err
    }
    return !dup, nil
}
func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {
    cc, err := t.NewClientConn(tc)

    p := c.p
    p.mu.Lock()
    if err != nil {
        c.err = err
    } else {
        p.addConnLocked(key, cc)
    }
    delete(p.addConnCalls, key)
    p.mu.Unlock()
    close(c.done)
}

分析上述的源碼咱們可以獲得兩點結論:

  1. 執行完upgradeFn以後,(*Transport).dialConn返回的持久化鏈接中alt字段已經不是nil了。
  2. t.NewClientConn(tc)新建出來的鏈接會保存在http2的鏈接池即http2clientConnPool中,下一小結將對NewClientConn展開分析。

最後咱們回到(*Transport).roundTrip方法並分析其中的關鍵源碼:

func (t *Transport) roundTrip(req *Request) (*Response, error) {
    t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
    // ...此處省略代碼...
    for {
        select {
        case <-ctx.Done():
            req.closeBody()
            return nil, ctx.Err()
        default:
        }

        // ...此處省略代碼...
        pconn, err := t.getConn(treq, cm)
        if err != nil {
            t.setReqCanceler(req, nil)
            req.closeBody()
            return nil, err
        }

        var resp *Response
        if pconn.alt != nil {
            // HTTP/2 path.
            t.setReqCanceler(req, nil) // not cancelable with CancelRequest
            resp, err = pconn.alt.RoundTrip(req)
        } else {
            resp, err = pconn.roundTrip(treq)
        }
        if err == nil {
            return resp, nil
        }

        // ...此處省略代碼...
    }
}

結合前面的分析,pconn.alt在server和client都支持http2協議的狀況下是不爲nil的。因此,http2的請求會走pconn.alt.RoundTrip(req)分支,也就是說http2的請求流程就被http2Transport接管啦。

(*http2Transport).NewClientConn

(*http2Transport).NewClientConn內部會調用t.newClientConn(c, t.disableKeepAlives())

由於本節內容較多,因此筆者再也不一次性貼出源碼,而是按關鍵步驟分析並分塊兒貼出源碼。

一、初始化一個http2ClientConn

cc := &http2ClientConn{
    t:                     t,
    tconn:                 c,
    readerDone:            make(chan struct{}),
    nextStreamID:          1,
    maxFrameSize:          16 << 10,           // spec default
    initialWindowSize:     65535,              // spec default
    maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.
    peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
    streams:               make(map[uint32]*http2clientStream),
    singleUse:             singleUse,
    wantSettingsAck:       true,
    pings:                 make(map[[8]byte]chan struct{}),
}

上面的源碼新建了一個默認的http2ClientConn。

initialWindowSize:初始化窗口大小爲65535,這個值以後會初始化每個數據流可發送的數據窗口大小。

maxConcurrentStreams:表示每一個鏈接上容許最多有多少個數據流同時傳輸數據。

streams:當前鏈接上的數據流。

singleUse: 控制http2的鏈接是否容許多個數據流共享,其值由t.disableKeepAlives()控制。

二、建立一個條件鎖而且新建Writer&Reader。

cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
cc.br = bufio.NewReader(c)

新建Writer&Reader沒什麼好說的,須要注意的是cc.flow.add(int32(http2initialWindowSize))

cc.flow.add將當前鏈接的可寫流控制窗口大小設置爲http2initialWindowSize,即65535。

三、新建一個讀寫數據幀的Framer。

cc.fr = http2NewFramer(cc.bw, cc.br)
cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

四、向server發送開場白,併發送一些初始化數據幀。

initialSettings := []http2Setting{
    {ID: http2SettingEnablePush, Val: 0},
    {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},
}
if max := t.maxHeaderListSize(); max != 0 {
    initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})
}

cc.bw.Write(http2clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)
cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)
cc.bw.Flush()

client向server發送的開場白內容以下:

const (
    // client首先想server發送以PRI開頭的一串字符串。
    http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
)
var (
    http2clientPreface = []byte(http2ClientPreface)
)

發送完開場白後,client向server發送SETTINGS數據幀。

http2SettingEnablePush: 告知server客戶端是否開啓push功能。

http2SettingInitialWindowSize:告知server客戶端可接受的最大數據窗口是http2transportDefaultStreamFlow(4M)。

發送完SETTINGS數據幀後,發送WINDOW_UPDATE數據幀, 由於第一個參數爲0即streamID爲0,則是告知server此鏈接可接受的最大數據窗口爲http2transportDefaultConnFlow(1G)。

發送完WINDOW_UPDATE數據幀後,將client的可讀流控制窗口大小設置爲http2transportDefaultConnFlow + http2initialWindowSize

五、開啓讀循環並返回

go cc.readLoop()

(*http2Transport).RoundTrip

(*http2Transport).RoundTrip只是一個入口函數,它會調用(*http2Transport). RoundTripOpt方法。

(*http2Transport). RoundTripOpt有兩個步驟比較關鍵:

t.connPool().GetClientConn(req, addr): 在http2的鏈接池裏面獲取一個可用鏈接,其中鏈接池的類型爲http2noDialClientConnPool,參考http2configureTransport函數。

cc.roundTrip(req): 經過獲取到的可用鏈接發送請求並返回響應。

(http2noDialClientConnPool).GetClientConn

根據實際的debug結果(http2noDialClientConnPool).GetClientConn最終會調用(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)

經過(http2noDialClientConnPool).GetClientConn獲取鏈接時傳遞給(*http2clientConnPool).getClientConn方法的第三個參數始終爲false,該參數爲false時表明着即便沒法正常獲取可用鏈接,也不在這個環節從新發起撥號流程。

在(*http2clientConnPool).getClientConn中會遍歷同地址的鏈接,並判斷鏈接的狀態從而獲取一個能夠處理請求的鏈接。

for _, cc := range p.conns[addr] {
    if st := cc.idleState(); st.canTakeNewRequest {
        if p.shouldTraceGetConn(st) {
            http2traceGetConn(req, addr)
        }
        p.mu.Unlock()
        return cc, nil
    }
}

cc.idleState()判斷當前鏈接池中的鏈接可否處理新的請求:

一、當前鏈接是否能被多個請求共享,若是僅單個請求使用且已經有一個數據流,則當前鏈接不能處理新的請求。

if cc.singleUse && cc.nextStreamID > 1 {
    return
}

二、如下幾點均爲true時,才表明當前鏈接可以處理新的請求:

  • 鏈接狀態正常,即未關閉而且不處於正在關閉的狀態。
  • 當前鏈接正在處理的數據流小於maxConcurrentStreams
  • 下一個要處理的數據流 + 當前鏈接處於等待狀態的請求*2 < math.MaxInt32。
  • 當前鏈接沒有長時間處於空閒狀態(主要經過cc.tooIdleLocked()判斷)。
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
        int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
        !cc.tooIdleLocked()

當從連接池成功獲取到一個能夠處理請求的鏈接,就能夠和server進行數據交互,即(*http2ClientConn).roundTrip流程。

(*http2ClientConn).roundTrip

一、在真正開始處理請求前,還要進行header檢查,http2對http1.1的某些header是不支持的,筆者就不對這個邏輯進行分析了,直接上源碼:

func http2checkConnHeaders(req *Request) error {
    if v := req.Header.Get("Upgrade"); v != "" {
        return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
    }
    if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
        return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
    }
    if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
        return fmt.Errorf("http2: invalid Connection request header: %q", vv)
    }
    return nil
}
func http2commaSeparatedTrailers(req *Request) (string, error) {
    keys := make([]string, 0, len(req.Trailer))
    for k := range req.Trailer {
        k = CanonicalHeaderKey(k)
        switch k {
        case "Transfer-Encoding", "Trailer", "Content-Length":
            return "", &http2badStringError{"invalid Trailer key", k}
        }
        keys = append(keys, k)
    }
    if len(keys) > 0 {
        sort.Strings(keys)
        return strings.Join(keys, ","), nil
    }
    return "", nil
}

二、調用(*http2ClientConn).awaitOpenSlotForRequest,一直等到當前鏈接處理的數據流小於maxConcurrentStreams, 若是此函數返回錯誤,則本次請求失敗。

2.一、double check當前鏈接可用。

if cc.closed || !cc.canTakeNewRequestLocked() {
    if waitingForConn != nil {
        close(waitingForConn)
    }
    return http2errClientConnUnusable
}

2.二、若是當前鏈接處理的數據流小於maxConcurrentStreams則直接返回nil。筆者相信大部分邏輯走到這兒就返回了。

if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
    if waitingForConn != nil {
        close(waitingForConn)
    }
    return nil
}

2.三、若是當前鏈接處理的數據流確實已經達到上限,則開始進入等待流程。

if waitingForConn == nil {
    waitingForConn = make(chan struct{})
    go func() {
        if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
            cc.mu.Lock()
            waitingForConnErr = err
            cc.cond.Broadcast()
            cc.mu.Unlock()
        }
    }()
}
cc.pendingRequests++
cc.cond.Wait()
cc.pendingRequests--

經過上面的邏輯知道,當前鏈接處理的數據流達到上限後有兩種狀況,一是等待請求被取消,二是等待其餘請求結束。若是有其餘數據流結束並喚醒當前等待的請求,則重複2.一、2.2和2.3的步驟。

三、調用cc.newStream()在鏈接上建立一個數據流(建立數據流是線程安全的,由於源碼中在調用awaitOpenSlotForRequest以前先加鎖,直到寫入請求的header以後才釋放鎖)。

func (cc *http2ClientConn) newStream() *http2clientStream {
    cs := &http2clientStream{
        cc:        cc,
        ID:        cc.nextStreamID,
        resc:      make(chan http2resAndError, 1),
        peerReset: make(chan struct{}),
        done:      make(chan struct{}),
    }
    cs.flow.add(int32(cc.initialWindowSize))
    cs.flow.setConnFlow(&cc.flow)
    cs.inflow.add(http2transportDefaultStreamFlow)
    cs.inflow.setConnFlow(&cc.inflow)
    cc.nextStreamID += 2
    cc.streams[cs.ID] = cs
    return cs
}

筆者對上述代碼簡單描述以下:

  • 新建一個http2clientStream,數據流ID爲cc.nextStreamID,新建數據流後,cc.nextStreamID +=2
  • 數據流經過http2resAndError管道接收請求的響應。
  • 初始化當前數據流的可寫流控制窗口大小爲cc.initialWindowSize,並保存鏈接的可寫流控制指針。
  • 初始化當前數據流的可讀流控制窗口大小爲http2transportDefaultStreamFlow,並保存鏈接的可讀流控制指針。
  • 最後將新建的數據流註冊到當前鏈接中。

四、調用cc.t.getBodyWriterState(cs, body)會返回一個http2bodyWriterState結構體。經過該結構體能夠知道請求body是否發送成功。

func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {
    s.cs = cs
    if body == nil {
        return
    }
    resc := make(chan error, 1)
    s.resc = resc
    s.fn = func() {
        cs.cc.mu.Lock()
        cs.startedWrite = true
        cs.cc.mu.Unlock()
        resc <- cs.writeRequestBody(body, cs.req.Body)
    }
    s.delay = t.expectContinueTimeout()
    if s.delay == 0 ||
        !httpguts.HeaderValuesContainsToken(
            cs.req.Header["Expect"],
            "100-continue") {
        return
    }
    // 此處省略代碼,由於絕大部分請求都不會設置100-continue的標頭
    return
}

s.fn: 標記當前數據流開始寫入數據,而且將請求body的發送結果寫入s.resc管道(本文暫不對writeRequestBody展開分析,下篇文章會對其進行分析)。

五、由於是多個請求共享一個鏈接,那麼向鏈接寫入數據幀時須要加鎖,好比加鎖寫入請求頭。

cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
cc.wmu.Unlock()

六、若是有請求body,則開始寫入請求body,沒有請求body則設置響應header的超時時間(有請求body時,響應header的超時時間須要在請求body寫完以後設置)。

if hasBody {
    bodyWriter.scheduleBodyWrite()
} else {
    http2traceWroteRequest(cs.trace, nil)
    if d := cc.responseHeaderTimeout(); d != 0 {
        timer := time.NewTimer(d)
        defer timer.Stop()
        respHeaderTimer = timer.C
    }
}

scheduleBodyWrite的內容以下:

func (s http2bodyWriterState) scheduleBodyWrite() {
    if s.timer == nil {
        // We're not doing a delayed write (see
        // getBodyWriterState), so just start the writing
        // goroutine immediately.
        go s.fn()
        return
    }
    http2traceWait100Continue(s.cs.trace)
    if s.timer.Stop() {
        s.timer.Reset(s.delay)
    }
}

由於筆者的請求header中沒有攜帶100-continue標頭,因此在前面的getBodyWriterState函數中初始化的s.timer爲nil即調用scheduleBodyWrite會當即開始發送請求body。

七、輪詢管道獲取響應結果。

在看輪詢源碼以前,先看一個簡單的函數:

handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
    res := re.res
    if re.err != nil || res.StatusCode > 299 {
        bodyWriter.cancel()
        cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
    }
    if re.err != nil {
        cc.forgetStreamID(cs.ID)
        return nil, cs.getStartedWrite(), re.err
    }
    res.Request = req
    res.TLS = cc.tlsState
    return res, false, nil
}

該函數主要就是判斷讀到的響應是否正常,並根據響應的結果構造(*http2ClientConn).roundTrip的返回值。

瞭解了handleReadLoopResponse以後,下面就看看輪詢的邏輯:

for {
    select {
    case re := <-readLoopResCh:
        return handleReadLoopResponse(re)
    // 此處省略代碼(包含請求取消,請求超時等管道的輪詢)
    case err := <-bodyWriter.resc:
        // Prefer the read loop's response, if available. Issue 16102.
        select {
        case re := <-readLoopResCh:
            return handleReadLoopResponse(re)
        default:
        }
        if err != nil {
            cc.forgetStreamID(cs.ID)
            return nil, cs.getStartedWrite(), err
        }
        bodyWritten = true
        if d := cc.responseHeaderTimeout(); d != 0 {
            timer := time.NewTimer(d)
            defer timer.Stop()
            respHeaderTimer = timer.C
        }
    }
}

筆者僅對上面的第二種狀況即請求body發送完成進行描述:

  • 可否讀到響應,若是可以讀取響應則直接返回。
  • 判斷請求body是否發送成功,若是發送失敗,直接返回。
  • 若是請求body發送成功,則設置響應header的超時時間。

總結

本文主要描述了兩個方面的內容:

  1. 確認client和server都支持http2協議,並構建一個http2的鏈接,同時開啓該鏈接的讀循環。
  2. 經過http2鏈接池獲取一個http2鏈接,併發送請求和讀取響應。

預告

鑑於HTTTP2.0的內容較多,且文章篇幅過長時不易閱讀,筆者將後續要分析的內容拆爲兩個部分:

  1. 描述數據幀和流控制以及讀循環讀到響應併發送給readLoopResCh管道。
  2. http2.0標頭壓縮邏輯。

最後,衷心但願本文可以對各位讀者有必定的幫助。

:

  1. 寫本文時, 筆者所用go版本爲: go1.14.2。
  2. 本文對h2c的狀況不予以考慮。
  3. 由於筆者分析的是請求流程,因此沒有在本地搭建server,而是使用了一個支持http2鏈接的圖片一步步的debug。eg: https://dss0.bdstatic.com/5aV...

參考

https://developers.google.com...

相關文章
相關標籤/搜索