來自公衆號:新世界雜貨鋪
繼Go中的HTTP請求之——HTTP1.1請求流程分析以後,中間斷斷續續,歷時近一月,終於纔敢開始碼字寫下本文。web
HTTP2.0在創建TCP鏈接和安全的TLS傳輸通道與HTTP1.1的流程基本一致。因此筆者建議沒有看過Go中的HTTP請求之——HTTP1.1請求流程分析這篇文章的先去補一下課,本文會基於前一篇文章僅介紹和HTTP2.0相關的邏輯。segmentfault
(*Transport).roundTrip
方法會調用t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
初始化TLSClientConfig
以及h2transport
,而這二者都和HTTP2.0有着緊密的聯繫。安全
TLSClientConfig: 初始化client支持的http協議, 並在tls握手時告知server。併發
h2transport: 若是本次請求是http2,那麼h2transport會接管鏈接,請求和響應的處理邏輯。app
下面看看源碼:tcp
func (t *Transport) onceSetNextProtoDefaults() { // ...此處省略代碼... t2, err := http2configureTransport(t) if err != nil { log.Printf("Error enabling Transport HTTP/2 support: %v", err) return } t.h2transport = t2 // ...此處省略代碼... } func http2configureTransport(t1 *Transport) (*http2Transport, error) { connPool := new(http2clientConnPool) t2 := &http2Transport{ ConnPool: http2noDialClientConnPool{connPool}, t1: t1, } connPool.t = t2 if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil { return nil, err } if t1.TLSClientConfig == nil { t1.TLSClientConfig = new(tls.Config) } if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") { t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...) } if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") { t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1") } upgradeFn := func(authority string, c *tls.Conn) RoundTripper { addr := http2authorityAddr("https", authority) if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil { go c.Close() return http2erringRoundTripper{err} } else if !used { // Turns out we don't need this c. // For example, two goroutines made requests to the same host // at the same time, both kicking off TCP dials. (since protocol // was unknown) go c.Close() } return t2 } if m := t1.TLSNextProto; len(m) == 0 { t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{ "h2": upgradeFn, } } else { m["h2"] = upgradeFn } return t2, nil }
筆者將上述的源碼簡單拆解爲如下幾個步驟:函數
http2clientConnPool
並複製給t2,之後http2的請求會優先從該鏈接池中獲取鏈接。TLSClientConfig
,並將支持的h2
和http1.1
協議添加到TLSClientConfig.NextProtos
中。h2
的upgradeFn
存儲到t1.TLSNextProto
裏。鑑於前一篇文章對新建鏈接前的步驟有了較爲詳細的介紹,因此這裏直接看和server創建鏈接的部分源碼,即(*Transport).dialConn
方法:oop
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { // ...此處省略代碼... if cm.scheme() == "https" && t.hasCustomTLSDialer() { // ...此處省略代碼... } else { conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) } if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // Proxy setup. // ...此處省略代碼... if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil } } // ...此處省略代碼... }
筆者對上述的源碼描述以下:大數據
t.dial(ctx, "tcp", cm.addr())
建立TCP鏈接。NegotiatedProtocol
協議不爲空,且client的t.TLSNextProto
有該協議,則返回alt不爲空的持久鏈接(HTTP1.1不會進入if條件裏)。筆者對上述的第三點進行展開。經筆者在本地debug驗證,當client和server都支持http2時,s.NegotiatedProtocol
的值爲h2
且s.NegotiatedProtocolIsMutual
的值爲true
。ui
在上面分析http2configureTransport
函數時,咱們知道TLSNextProto
註冊了一個key爲h2
的函數,因此調用next
實際就是調用前面的upgradeFn
函數。
upgradeFn
會調用connPool.addConnIfNeeded
向http2的鏈接池添加一個tls傳輸通道,並最終返回前面已經建立好的t2
即http2Transport
。
func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) { p.mu.Lock() // ...此處省略代碼... // 主要用於判斷是否有必要像鏈接池添加新的鏈接 // 判斷鏈接池中是否已有同host鏈接,若是有且該連接可以處理新的請求則直接返回 call, dup := p.addConnCalls[key] if !dup { // ...此處省略代碼... call = &http2addConnCall{ p: p, done: make(chan struct{}), } p.addConnCalls[key] = call go call.run(t, key, c) } p.mu.Unlock() <-call.done if call.err != nil { return false, call.err } return !dup, nil } func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) { cc, err := t.NewClientConn(tc) p := c.p p.mu.Lock() if err != nil { c.err = err } else { p.addConnLocked(key, cc) } delete(p.addConnCalls, key) p.mu.Unlock() close(c.done) }
分析上述的源碼咱們可以獲得兩點結論:
upgradeFn
以後,(*Transport).dialConn返回的持久化鏈接中alt字段已經不是nil了。t.NewClientConn(tc)
新建出來的鏈接會保存在http2的鏈接池即http2clientConnPool
中,下一小結將對NewClientConn展開分析。最後咱們回到(*Transport).roundTrip方法並分析其中的關鍵源碼:
func (t *Transport) roundTrip(req *Request) (*Response, error) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) // ...此處省略代碼... for { select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // ...此處省略代碼... pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } var resp *Response if pconn.alt != nil { // HTTP/2 path. t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } if err == nil { return resp, nil } // ...此處省略代碼... } }
結合前面的分析,pconn.alt
在server和client都支持http2協議的狀況下是不爲nil的。因此,http2的請求會走pconn.alt.RoundTrip(req)
分支,也就是說http2的請求流程就被http2Transport
接管啦。
(*http2Transport).NewClientConn內部會調用t.newClientConn(c, t.disableKeepAlives())
。
由於本節內容較多,因此筆者再也不一次性貼出源碼,而是按關鍵步驟分析並分塊兒貼出源碼。
一、初始化一個http2ClientConn
:
cc := &http2ClientConn{ t: t, tconn: c, readerDone: make(chan struct{}), nextStreamID: 1, maxFrameSize: 16 << 10, // spec default initialWindowSize: 65535, // spec default maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. streams: make(map[uint32]*http2clientStream), singleUse: singleUse, wantSettingsAck: true, pings: make(map[[8]byte]chan struct{}), }
上面的源碼新建了一個默認的http2ClientConn。
initialWindowSize:初始化窗口大小爲65535,這個值以後會初始化每個數據流可發送的數據窗口大小。
maxConcurrentStreams:表示每一個鏈接上容許最多有多少個數據流同時傳輸數據。
streams:當前鏈接上的數據流。
singleUse: 控制http2的鏈接是否容許多個數據流共享,其值由t.disableKeepAlives()
控制。
二、建立一個條件鎖而且新建Writer&Reader。
cc.cond = sync.NewCond(&cc.mu) cc.flow.add(int32(http2initialWindowSize)) cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr}) cc.br = bufio.NewReader(c)
新建Writer&Reader沒什麼好說的,須要注意的是cc.flow.add(int32(http2initialWindowSize))
。
cc.flow.add
將當前鏈接的可寫流控制窗口大小設置爲http2initialWindowSize
,即65535。
三、新建一個讀寫數據幀的Framer。
cc.fr = http2NewFramer(cc.bw, cc.br) cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil) cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
四、向server發送開場白,併發送一些初始化數據幀。
initialSettings := []http2Setting{ {ID: http2SettingEnablePush, Val: 0}, {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow}, } if max := t.maxHeaderListSize(); max != 0 { initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max}) } cc.bw.Write(http2clientPreface) cc.fr.WriteSettings(initialSettings...) cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow) cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize) cc.bw.Flush()
client向server發送的開場白內容以下:
const ( // client首先想server發送以PRI開頭的一串字符串。 http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" ) var ( http2clientPreface = []byte(http2ClientPreface) )
發送完開場白後,client向server發送SETTINGS
數據幀。
http2SettingEnablePush: 告知server客戶端是否開啓push功能。
http2SettingInitialWindowSize:告知server客戶端可接受的最大數據窗口是http2transportDefaultStreamFlow
(4M)。
發送完SETTINGS數據幀後,發送WINDOW_UPDATE數據幀, 由於第一個參數爲0即streamID爲0,則是告知server此鏈接可接受的最大數據窗口爲http2transportDefaultConnFlow
(1G)。
發送完WINDOW_UPDATE數據幀後,將client的可讀流控制窗口大小設置爲http2transportDefaultConnFlow + http2initialWindowSize
。
五、開啓讀循環並返回
go cc.readLoop()
(*http2Transport).RoundTrip只是一個入口函數,它會調用(*http2Transport). RoundTripOpt方法。
(*http2Transport). RoundTripOpt有兩個步驟比較關鍵:
t.connPool().GetClientConn(req, addr)
: 在http2的鏈接池裏面獲取一個可用鏈接,其中鏈接池的類型爲http2noDialClientConnPool
,參考http2configureTransport
函數。
cc.roundTrip(req)
: 經過獲取到的可用鏈接發送請求並返回響應。
根據實際的debug結果(http2noDialClientConnPool).GetClientConn最終會調用(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)
。
經過(http2noDialClientConnPool).GetClientConn獲取鏈接時傳遞給(*http2clientConnPool).getClientConn方法的第三個參數始終爲false
,該參數爲false時表明着即便沒法正常獲取可用鏈接,也不在這個環節從新發起撥號流程。
在(*http2clientConnPool).getClientConn中會遍歷同地址的鏈接,並判斷鏈接的狀態從而獲取一個能夠處理請求的鏈接。
for _, cc := range p.conns[addr] { if st := cc.idleState(); st.canTakeNewRequest { if p.shouldTraceGetConn(st) { http2traceGetConn(req, addr) } p.mu.Unlock() return cc, nil } }
cc.idleState()
判斷當前鏈接池中的鏈接可否處理新的請求:
一、當前鏈接是否能被多個請求共享,若是僅單個請求使用且已經有一個數據流,則當前鏈接不能處理新的請求。
if cc.singleUse && cc.nextStreamID > 1 { return }
二、如下幾點均爲true時,才表明當前鏈接可以處理新的請求:
maxConcurrentStreams
。cc.tooIdleLocked()
判斷)。st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 && !cc.tooIdleLocked()
當從連接池成功獲取到一個能夠處理請求的鏈接,就能夠和server進行數據交互,即(*http2ClientConn).roundTrip
流程。
一、在真正開始處理請求前,還要進行header檢查,http2對http1.1的某些header是不支持的,筆者就不對這個邏輯進行分析了,直接上源碼:
func http2checkConnHeaders(req *Request) error { if v := req.Header.Get("Upgrade"); v != "" { return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"]) } if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") { return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv) } if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) { return fmt.Errorf("http2: invalid Connection request header: %q", vv) } return nil } func http2commaSeparatedTrailers(req *Request) (string, error) { keys := make([]string, 0, len(req.Trailer)) for k := range req.Trailer { k = CanonicalHeaderKey(k) switch k { case "Transfer-Encoding", "Trailer", "Content-Length": return "", &http2badStringError{"invalid Trailer key", k} } keys = append(keys, k) } if len(keys) > 0 { sort.Strings(keys) return strings.Join(keys, ","), nil } return "", nil }
二、調用(*http2ClientConn).awaitOpenSlotForRequest
,一直等到當前鏈接處理的數據流小於maxConcurrentStreams
, 若是此函數返回錯誤,則本次請求失敗。
2.一、double check當前鏈接可用。
if cc.closed || !cc.canTakeNewRequestLocked() { if waitingForConn != nil { close(waitingForConn) } return http2errClientConnUnusable }
2.二、若是當前鏈接處理的數據流小於maxConcurrentStreams
則直接返回nil。筆者相信大部分邏輯走到這兒就返回了。
if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { if waitingForConn != nil { close(waitingForConn) } return nil }
2.三、若是當前鏈接處理的數據流確實已經達到上限,則開始進入等待流程。
if waitingForConn == nil { waitingForConn = make(chan struct{}) go func() { if err := http2awaitRequestCancel(req, waitingForConn); err != nil { cc.mu.Lock() waitingForConnErr = err cc.cond.Broadcast() cc.mu.Unlock() } }() } cc.pendingRequests++ cc.cond.Wait() cc.pendingRequests--
經過上面的邏輯知道,當前鏈接處理的數據流達到上限後有兩種狀況,一是等待請求被取消,二是等待其餘請求結束。若是有其餘數據流結束並喚醒當前等待的請求,則重複2.一、2.2和2.3的步驟。
三、調用cc.newStream()
在鏈接上建立一個數據流(建立數據流是線程安全的,由於源碼中在調用awaitOpenSlotForRequest
以前先加鎖,直到寫入請求的header以後才釋放鎖)。
func (cc *http2ClientConn) newStream() *http2clientStream { cs := &http2clientStream{ cc: cc, ID: cc.nextStreamID, resc: make(chan http2resAndError, 1), peerReset: make(chan struct{}), done: make(chan struct{}), } cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) cs.inflow.add(http2transportDefaultStreamFlow) cs.inflow.setConnFlow(&cc.inflow) cc.nextStreamID += 2 cc.streams[cs.ID] = cs return cs }
筆者對上述代碼簡單描述以下:
http2clientStream
,數據流ID爲cc.nextStreamID
,新建數據流後,cc.nextStreamID +=2
。http2resAndError
管道接收請求的響應。cc.initialWindowSize
,並保存鏈接的可寫流控制指針。http2transportDefaultStreamFlow
,並保存鏈接的可讀流控制指針。四、調用cc.t.getBodyWriterState(cs, body)
會返回一個http2bodyWriterState
結構體。經過該結構體能夠知道請求body是否發送成功。
func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) { s.cs = cs if body == nil { return } resc := make(chan error, 1) s.resc = resc s.fn = func() { cs.cc.mu.Lock() cs.startedWrite = true cs.cc.mu.Unlock() resc <- cs.writeRequestBody(body, cs.req.Body) } s.delay = t.expectContinueTimeout() if s.delay == 0 || !httpguts.HeaderValuesContainsToken( cs.req.Header["Expect"], "100-continue") { return } // 此處省略代碼,由於絕大部分請求都不會設置100-continue的標頭 return }
s.fn
: 標記當前數據流開始寫入數據,而且將請求body的發送結果寫入s.resc
管道(本文暫不對writeRequestBody
展開分析,下篇文章會對其進行分析)。
五、由於是多個請求共享一個鏈接,那麼向鏈接寫入數據幀時須要加鎖,好比加鎖寫入請求頭。
cc.wmu.Lock() endStream := !hasBody && !hasTrailers werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) cc.wmu.Unlock()
六、若是有請求body,則開始寫入請求body,沒有請求body則設置響應header的超時時間(有請求body時,響應header的超時時間須要在請求body寫完以後設置)。
if hasBody { bodyWriter.scheduleBodyWrite() } else { http2traceWroteRequest(cs.trace, nil) if d := cc.responseHeaderTimeout(); d != 0 { timer := time.NewTimer(d) defer timer.Stop() respHeaderTimer = timer.C } }
scheduleBodyWrite
的內容以下:
func (s http2bodyWriterState) scheduleBodyWrite() { if s.timer == nil { // We're not doing a delayed write (see // getBodyWriterState), so just start the writing // goroutine immediately. go s.fn() return } http2traceWait100Continue(s.cs.trace) if s.timer.Stop() { s.timer.Reset(s.delay) } }
由於筆者的請求header中沒有攜帶100-continue
標頭,因此在前面的getBodyWriterState
函數中初始化的s.timer爲nil即調用scheduleBodyWrite
會當即開始發送請求body。
七、輪詢管道獲取響應結果。
在看輪詢源碼以前,先看一個簡單的函數:
handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) { res := re.res if re.err != nil || res.StatusCode > 299 { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWrite) } if re.err != nil { cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), re.err } res.Request = req res.TLS = cc.tlsState return res, false, nil }
該函數主要就是判斷讀到的響應是否正常,並根據響應的結果構造(*http2ClientConn).roundTrip
的返回值。
瞭解了handleReadLoopResponse
以後,下面就看看輪詢的邏輯:
for { select { case re := <-readLoopResCh: return handleReadLoopResponse(re) // 此處省略代碼(包含請求取消,請求超時等管道的輪詢) case err := <-bodyWriter.resc: // Prefer the read loop's response, if available. Issue 16102. select { case re := <-readLoopResCh: return handleReadLoopResponse(re) default: } if err != nil { cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), err } bodyWritten = true if d := cc.responseHeaderTimeout(); d != 0 { timer := time.NewTimer(d) defer timer.Stop() respHeaderTimer = timer.C } } }
筆者僅對上面的第二種狀況即請求body發送完成進行描述:
本文主要描述了兩個方面的內容:
鑑於HTTTP2.0的內容較多,且文章篇幅過長時不易閱讀,筆者將後續要分析的內容拆爲兩個部分:
readLoopResCh
管道。最後,衷心但願本文可以對各位讀者有必定的幫助。
注:
- 寫本文時, 筆者所用go版本爲: go1.14.2。
- 本文對h2c的狀況不予以考慮。
- 由於筆者分析的是請求流程,因此沒有在本地搭建server,而是使用了一個支持http2鏈接的圖片一步步的debug。eg: https://dss0.bdstatic.com/5aV...