來自公衆號:新世界雜貨鋪
這是HTTP2.0系列的第二篇,因此筆者推薦閱讀順序以下:git
本篇主要分爲三個部分:數據幀,流控制器以及經過分析源碼逐步瞭解流控制。github
本有意將這三個部分拆成三篇文章,但它們之間又有聯繫,因此最後依舊決定放在一篇文章裏面。因爲內容較多,筆者認爲分三次分別閱讀三個部分較佳。segmentfault
HTTP2通訊的最小單位是數據幀,每個幀都包含兩部分:幀頭和Payload。不一樣數據流的幀能夠交錯發送(同一個數據流的幀必須順序發送),而後再根據每一個幀頭的數據流標識符從新組裝。app
因爲Payload中爲有效數據,故僅對幀頭進行分析描述。ide
幀頭總長度爲9個字節,幷包含四個部分,分別是:函數
用圖表示以下:oop
數據幀的格式和各部分的含義已經清楚了, 那麼咱們看看代碼中怎麼讀取一個幀頭:ui
func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) { _, err := io.ReadFull(r, buf[:http2frameHeaderLen]) if err != nil { return http2FrameHeader{}, err } return http2FrameHeader{ Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])), Type: http2FrameType(buf[3]), Flags: http2Flags(buf[4]), StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1), valid: true, }, nil }
在上面的代碼中http2frameHeaderLen
是一個常量,其值爲9。this
從io.Reader中讀取9個字節後,將前三個字節和後四個字節均轉爲uint32
的類型,從而獲得Payload長度和數據流ID。另外須要理解的是幀頭的前三個字節和後四個字節存儲格式爲大端(大小端筆者就不在這裏解釋了,請尚不瞭解的讀者自行百度)。編碼
根據http://http2.github.io/http2-...,數據幀類型總共有10個。在go源碼中均有體現:
const ( http2FrameData http2FrameType = 0x0 http2FrameHeaders http2FrameType = 0x1 http2FramePriority http2FrameType = 0x2 http2FrameRSTStream http2FrameType = 0x3 http2FrameSettings http2FrameType = 0x4 http2FramePushPromise http2FrameType = 0x5 http2FramePing http2FrameType = 0x6 http2FrameGoAway http2FrameType = 0x7 http2FrameWindowUpdate http2FrameType = 0x8 http2FrameContinuation http2FrameType = 0x9 )
http2FrameData
:主要用於發送請求body和接收響應的數據幀。
http2FrameHeaders
:主要用於發送請求header和接收響應header的數據幀。
http2FrameSettings
:主要用於client和server交流設置相關的數據幀。
http2FrameWindowUpdate
:主要用於流控制的數據幀。
其餘數據幀類型由於本文不涉及,故不作描述。
因爲數據幀標識符種類較多,筆者在這裏僅介紹其中部分標識符,先看源碼:
const ( // Data Frame http2FlagDataEndStream http2Flags = 0x1 // Headers Frame http2FlagHeadersEndStream http2Flags = 0x1 // Settings Frame http2FlagSettingsAck http2Flags = 0x1 // 此處省略定義其餘數據幀標識符的代碼 )
http2FlagDataEndStream
:在前篇中提到,調用(*http2ClientConn).newStream
方法會建立一個數據流,那這個數據流何時結束呢,這就是http2FlagDataEndStream
的做用。
當client收到有響應body的響應時(HEAD請求無響應body,301,302等響應也無響應body),一直讀到http2FrameData
數據幀的標識符爲http2FlagDataEndStream
則意味着本次請求結束能夠關閉當前數據流。
http2FlagHeadersEndStream
:若是讀到的http2FrameHeaders
數據幀有此標識符也意味着本次請求結束。
http2FlagSettingsAck
:該標示符意味着對方確認收到http2FrameSettings
數據幀。
流控制是一種阻止發送方向接收方發送大量數據的機制,以避免超出後者的需求或處理能力。Go中HTTP2經過http2flow
結構體進行流控制:
type http2flow struct { // n is the number of DATA bytes we're allowed to send. // A flow is kept both on a conn and a per-stream. n int32 // conn points to the shared connection-level flow that is // shared by all streams on that conn. It is nil for the flow // that's on the conn directly. conn *http2flow }
字段含義英文註釋已經描述的很清楚了,因此筆者再也不翻譯。下面看一下和流控制有關的方法。
此方法返回當前流控制可發送的最大字節數:
func (f *http2flow) available() int32 { n := f.n if f.conn != nil && f.conn.n < n { n = f.conn.n } return n }
f.conn
爲nil則意味着此控制器的控制級別爲鏈接,那麼可發送的最大字節數就是f.n
。f.conn
不爲nil則意味着此控制器的控制級別爲數據流,且當前數據流可發送的最大字節數不能超過當前鏈接可發送的最大字節數。此方法用於消耗當前流控制器的可發送字節數:
func (f *http2flow) take(n int32) { if n > f.available() { panic("internal error: took too much") } f.n -= n if f.conn != nil { f.conn.n -= n } }
經過實際須要傳遞一個參數,告知當前流控制器想要發送的數據大小。若是發送的大小超過流控制器容許的大小,則panic
,若是未超過流控制器容許的大小,則將當前數據流和當前鏈接的可發送字節數-n
。
有消耗就有新增,此方法用於增長流控制器可發送的最大字節數:
func (f *http2flow) add(n int32) bool { sum := f.n + n if (sum > n) == (f.n > 0) { f.n = sum return true } return false }
上面的代碼惟一須要注意的地方是,當sum超過int32正數最大值(2^31-1)時會返回false。
回顧:在前篇中提到的(*http2Transport).NewClientConn
方法和(*http2ClientConn).newStream
方法均經過(*http2flow).add
初始化可發送數據窗口大小。
有了幀和流控制器的基本概念,下面咱們結合源碼來分析總結流控制的具體實現。
前篇分析(*http2Transport).newClientConn
時止步於讀循環,那麼今天咱們就從(*http2ClientConn).readLoop
開始。
func (cc *http2ClientConn) readLoop() { rl := &http2clientConnReadLoop{cc: cc} defer rl.cleanup() cc.readerErr = rl.run() if ce, ok := cc.readerErr.(http2ConnectionError); ok { cc.wmu.Lock() cc.fr.WriteGoAway(0, http2ErrCode(ce), nil) cc.wmu.Unlock() } }
由上可知,readLoop的邏輯比較簡單,其核心邏輯在(*http2clientConnReadLoop).run
方法裏。
func (rl *http2clientConnReadLoop) run() error { cc := rl.cc rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse gotReply := false // ever saw a HEADERS reply gotSettings := false for { f, err := cc.fr.ReadFrame() // 此處省略代碼 maybeIdle := false // whether frame might transition us to idle switch f := f.(type) { case *http2MetaHeadersFrame: err = rl.processHeaders(f) maybeIdle = true gotReply = true case *http2DataFrame: err = rl.processData(f) maybeIdle = true case *http2GoAwayFrame: err = rl.processGoAway(f) maybeIdle = true case *http2RSTStreamFrame: err = rl.processResetStream(f) maybeIdle = true case *http2SettingsFrame: err = rl.processSettings(f) case *http2PushPromiseFrame: err = rl.processPushPromise(f) case *http2WindowUpdateFrame: err = rl.processWindowUpdate(f) case *http2PingFrame: err = rl.processPing(f) default: cc.logf("Transport: unhandled response frame type %T", f) } if err != nil { if http2VerboseLogs { cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err) } return err } if rl.closeWhenIdle && gotReply && maybeIdle { cc.closeIfIdle() } } }
由上可知,(*http2clientConnReadLoop).run
的核心邏輯是讀取數據幀而後對不一樣的數據幀進行不一樣的處理。
cc.fr.ReadFrame()
會根據前面介紹的數據幀格式讀出數據幀。
前篇中提到使用了一個支持h2協議的圖片進行分析,本篇繼續複用該圖片對(*http2clientConnReadLoop).run
方法進行debug。
讀循環會最早讀到http2FrameSettings
數據幀。讀到該數據幀後會調用(*http2clientConnReadLoop).processSettings
方法。(*http2clientConnReadLoop).processSettings
主要包含3個邏輯。
一、判斷是不是http2FrameSettings
的ack信息,若是是直接返回,不然繼續後面的步驟。
if f.IsAck() { if cc.wantSettingsAck { cc.wantSettingsAck = false return nil } return http2ConnectionError(http2ErrCodeProtocol) }
二、處理不一樣http2FrameSettings
的數據幀,並根據server傳遞的信息,修改maxConcurrentStreams
等的值。
err := f.ForeachSetting(func(s http2Setting) error { switch s.ID { case http2SettingMaxFrameSize: cc.maxFrameSize = s.Val case http2SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Val case http2SettingMaxHeaderListSize: cc.peerMaxHeaderListSize = uint64(s.Val) case http2SettingInitialWindowSize: if s.Val > math.MaxInt32 { return http2ConnectionError(http2ErrCodeFlowControl) } delta := int32(s.Val) - int32(cc.initialWindowSize) for _, cs := range cc.streams { cs.flow.add(delta) } cc.cond.Broadcast() cc.initialWindowSize = s.Val default: // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. cc.vlogf("Unhandled Setting: %v", s) } return nil })
當收到ID爲http2SettingInitialWindowSize
的幀時,會調整當前鏈接中全部數據流的可發送數據窗口大小,並修改當前鏈接的initialWindowSize
(每一個新建立的數據流均會使用該值初始化可發送數據窗口大小)爲s.Val
。
三、發送http2FrameSettings
的ack信息給server。
cc.wmu.Lock() defer cc.wmu.Unlock() cc.fr.WriteSettingsAck() cc.bw.Flush() return cc.werr
在筆者debug的過程當中,處理完http2FrameSettings
數據幀後,緊接着就收到了http2WindowUpdateFrame
數據幀。收到該數據幀後會調用(*http2clientConnReadLoop).processWindowUpdate
方法:
func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error { cc := rl.cc cs := cc.streamByID(f.StreamID, false) if f.StreamID != 0 && cs == nil { return nil } cc.mu.Lock() defer cc.mu.Unlock() fl := &cc.flow if cs != nil { fl = &cs.flow } if !fl.add(int32(f.Increment)) { return http2ConnectionError(http2ErrCodeFlowControl) } cc.cond.Broadcast() return nil }
上面的邏輯主要用於更新當前鏈接和數據流的可發送數據窗口大小。若是http2WindowUpdateFrame幀中的StreamID爲0,則更新當前鏈接的可發送數據窗口大小,不然更新對應數據流可發送數據窗口大小。
注意:在debug的過程,收到http2WindowUpdateFrame
數據幀後,又收到一次http2FrameSettings
,且該數據幀標識符爲http2FlagSettingsAck
。
筆者在這裏特地提醒,這是由於前篇中提到的(*http2Transport).NewClientConn方法,也向server發送了http2FrameSettings數據幀和http2WindowUpdateFrame數據幀。
另外,在處理http2FrameSettings
和http2WindowUpdateFrame
過程當中,均出現了cc.cond.Broadcast()
調用,該調用主要用於喚醒由於如下兩種狀況而Wait
的請求:
maxConcurrentStreams
的上限(詳見前篇中(*http2ClientConn).awaitOpenSlotForRequest
方法分析)。收到此數據幀意味着某一個請求已經開始接收響應數據。此數據幀對應的處理函數爲(*http2clientConnReadLoop).processHeaders
:
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error { cc := rl.cc cs := cc.streamByID(f.StreamID, false) // 此處省略代碼 res, err := rl.handleResponse(cs, f) if err != nil { // 此處省略代碼 cs.resc <- http2resAndError{err: err} return nil // return nil from process* funcs to keep conn alive } if res == nil { // (nil, nil) special case. See handleResponse docs. return nil } cs.resTrailer = &res.Trailer cs.resc <- http2resAndError{res: res} return nil }
首先咱們先看cs.resc <- http2resAndError{res: res}
這一行代碼,向數據流寫入http2resAndError
即本次請求的響應。在(*http2ClientConn).roundTrip
方法中有這樣一行代碼readLoopResCh := cs.resc
。
回顧:前篇(*http2ClientConn).roundTrip
方法的第7點和本部分關聯起來就能夠造成一個完整的請求鏈。
接下來咱們對rl.handleResponse
方法展開分析。
(*http2clientConnReadLoop).handleResponse
的主要做用是構建一個Response
變量,下面對該函數的關鍵步驟進行描述。
一、構建一個Response
變量。
header := make(Header) res := &Response{ Proto: "HTTP/2.0", ProtoMajor: 2, Header: header, StatusCode: statusCode, Status: status + " " + StatusText(statusCode), }
二、構建header(本篇不對header進行展開分析)。
for _, hf := range f.RegularFields() { key := CanonicalHeaderKey(hf.Name) if key == "Trailer" { t := res.Trailer if t == nil { t = make(Header) res.Trailer = t } http2foreachHeaderElement(hf.Value, func(v string) { t[CanonicalHeaderKey(v)] = nil }) } else { header[key] = append(header[key], hf.Value) } }
三、處理響應body的ContentLength。
streamEnded := f.StreamEnded() isHead := cs.req.Method == "HEAD" if !streamEnded || isHead { res.ContentLength = -1 if clens := res.Header["Content-Length"]; len(clens) == 1 { if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { res.ContentLength = clen64 } else { // TODO: care? unlike http/1, it won't mess up our framing, so it's // more safe smuggling-wise to ignore. } } else if len(clens) > 1 { // TODO: care? unlike http/1, it won't mess up our framing, so it's // more safe smuggling-wise to ignore. } }
由上可知,當前數據流沒有結束或者是HEAD請求才讀取ContentLength。若是header中的ContentLength不合法則res.ContentLength的值爲 -1。
四、構建res.Body
。
cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}} cs.bytesRemain = res.ContentLength res.Body = http2transportResponseBody{cs} go cs.awaitRequestCancel(cs.req) if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding") res.Header.Del("Content-Length") res.ContentLength = -1 res.Body = &http2gzipReader{body: res.Body} res.Uncompressed = true }
根據Content-Encoding
的編碼方式,會構建兩種不一樣的Body:
http2transportResponseBody
。http2gzipReader
。收到此數據幀意味着咱們開始接收真實的響應,即日常開發中須要處理的業務數據。此數據幀對應的處理函數爲(*http2clientConnReadLoop).processData
。
由於server沒法及時知道數據流在client端的狀態,因此server可能會向client中一個已經不存在的數據流發送數據:
cc := rl.cc cs := cc.streamByID(f.StreamID, f.StreamEnded()) data := f.Data() if cs == nil { cc.mu.Lock() neverSent := cc.nextStreamID cc.mu.Unlock() // 此處省略代碼 if f.Length > 0 { cc.mu.Lock() cc.inflow.add(int32(f.Length)) cc.mu.Unlock() cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(f.Length)) cc.bw.Flush() cc.wmu.Unlock() } return nil }
接收到的數據幀在client沒有對應的數據流處理時,經過流控制器爲當前鏈接可讀窗口大小增長f.Length
,而且經過http2FrameWindowUpdate
數據幀告知server將當前鏈接的可寫窗口大小增長f.Length
。
若是client有對應的數據流且f.Length
大於0:
一、若是是head請求結束當前數據流並返回。
if cs.req.Method == "HEAD" && len(data) > 0 { cc.logf("protocol error: received DATA on a HEAD request") rl.endStreamError(cs, http2StreamError{ StreamID: f.StreamID, Code: http2ErrCodeProtocol, }) return nil }
二、檢查當前數據流可否處理f.Length
長度的數據。
cc.mu.Lock() if cs.inflow.available() >= int32(f.Length) { cs.inflow.take(int32(f.Length)) } else { cc.mu.Unlock() return http2ConnectionError(http2ErrCodeFlowControl) }
由上可知當前數據流若是可以處理該數據,經過流控制器調用cs.inflow.take
減少當前數據流可接受窗口大小。
三、當前數據流被重置或者被關閉即cs.didReset
爲true時又或者數據幀有填充數據時須要調整流控制窗口。
var refund int if pad := int(f.Length) - len(data); pad > 0 { refund += pad } // Return len(data) now if the stream is already closed, // since data will never be read. didReset := cs.didReset if didReset { refund += len(data) } if refund > 0 { cc.inflow.add(int32(refund)) cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(refund)) if !didReset { cs.inflow.add(int32(refund)) cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) } cc.bw.Flush() cc.wmu.Unlock() } cc.mu.Unlock()
最後,根據計算的refund增長當前鏈接或者當前數據流的可接受窗口大小,而且同時告知server增長當前鏈接或者當前數據流的可寫窗口大小。
四、數據長度大於0且數據流正常則將數據寫入數據流緩衝區。
if len(data) > 0 && !didReset { if _, err := cs.bufPipe.Write(data); err != nil { rl.endStreamError(cs, err) return err } }
回顧:前面的(*http2clientConnReadLoop).handleResponse
方法中有這樣一行代碼res.Body = http2transportResponseBody{cs}
,因此在業務開發時可以經過Response讀取到數據流中的緩衝數據。
在前面的內容裏,若是數據流狀態正常且數據幀沒有填充數據則數據流和鏈接的可接收窗口會一直變小,而這部份內容就是增長數據流的可接受窗口大小。
由於篇幅和主旨的問題筆者僅分析描述該方法內和流控制有關的部分。
一、讀取響應數據後計算當前鏈接須要增長的可接受窗口大小。
cc.mu.Lock() defer cc.mu.Unlock() var connAdd, streamAdd int32 // Check the conn-level first, before the stream-level. if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 { connAdd = http2transportDefaultConnFlow - v cc.inflow.add(connAdd) }
若是當前鏈接可接受窗口的大小已經小於http2transportDefaultConnFlow
(1G)的一半,則當前鏈接可接收窗口大小須要增長http2transportDefaultConnFlow - cc.inflow.available()
。
回顧:http2transportDefaultConnFlow
在前篇(*http2Transport).NewClientConn
方法部分有提到,且鏈接剛創建時會經過http2WindowUpdateFrame
數據幀告知server當前鏈接可發送窗口大小增長http2transportDefaultConnFlow
。
二、讀取響應數據後計算當前數據流須要增長的可接受窗口大小。
if err == nil { // No need to refresh if the stream is over or failed. // Consider any buffered body data (read from the conn but not // consumed by the client) when computing flow control for this // stream. v := int(cs.inflow.available()) + cs.bufPipe.Len() if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh { streamAdd = int32(http2transportDefaultStreamFlow - v) cs.inflow.add(streamAdd) } }
若是當前數據流可接受窗口大小加上當前數據流緩衝區剩餘未讀數據的長度小於http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh
(4M-4KB),則當前數據流可接受窗口大小須要增長http2transportDefaultStreamFlow - v
。
回顧:http2transportDefaultStreamFlow
在前篇(*http2Transport).NewClientConn
方法和(*http2ClientConn).newStream
方法中均有提到。
鏈接剛創建時,發送http2FrameSettings
數據幀,告知server每一個數據流的可發送窗口大小爲http2transportDefaultStreamFlow
。
在newStream
時,數據流默認的可接收窗口大小爲http2transportDefaultStreamFlow
。
三、將鏈接和數據流分別須要增長的窗口大小經過http2WindowUpdateFrame
數據幀告知server。
if connAdd != 0 || streamAdd != 0 { cc.wmu.Lock() defer cc.wmu.Unlock() if connAdd != 0 { cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd)) } if streamAdd != 0 { cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd)) } cc.bw.Flush() }
以上就是server向client發送數據的流控制邏輯。
前篇中(*http2ClientConn).roundTrip
未對(*http2clientStream).writeRequestBody
進行分析,下面咱們看看該方法的源碼:
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { cc := cs.cc sentEnd := false // whether we sent the final DATA frame w/ END_STREAM // 此處省略代碼 req := cs.req hasTrailers := req.Trailer != nil remainLen := http2actualContentLength(req) hasContentLen := remainLen != -1 var sawEOF bool for !sawEOF { n, err := body.Read(buf[:len(buf)-1]) // 此處省略代碼 remain := buf[:n] for len(remain) > 0 && err == nil { var allowed int32 allowed, err = cs.awaitFlowControl(len(remain)) switch { case err == http2errStopReqBodyWrite: return err case err == http2errStopReqBodyWriteAndCancel: cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) return err case err != nil: return err } cc.wmu.Lock() data := remain[:allowed] remain = remain[allowed:] sentEnd = sawEOF && len(remain) == 0 && !hasTrailers err = cc.fr.WriteData(cs.ID, sentEnd, data) if err == nil { err = cc.bw.Flush() } cc.wmu.Unlock() } if err != nil { return err } } // 此處省略代碼 return err }
上面的邏輯可簡單總結爲:不停的讀取請求body而後將讀取的內容經過 cc.fr.WriteData
轉爲http2FrameData
數據幀發送給server,直到請求body讀完爲止。其中和流控制有關的方法是awaitFlowControl
,下面咱們對該方法進行分析。
此方法的主要做用是等待當前數據流可寫窗口有容量可以寫入數據。
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { cc := cs.cc cc.mu.Lock() defer cc.mu.Unlock() for { if cc.closed { return 0, http2errClientConnClosed } if cs.stopReqBody != nil { return 0, cs.stopReqBody } if err := cs.checkResetOrDone(); err != nil { return 0, err } if a := cs.flow.available(); a > 0 { take := a if int(take) > maxBytes { take = int32(maxBytes) // can't truncate int; take is int32 } if take > int32(cc.maxFrameSize) { take = int32(cc.maxFrameSize) } cs.flow.take(take) return take, nil } cc.cond.Wait() } }
根據源碼能夠知道,數據流被關閉或者中止發送請求body,則當前數據流沒法寫入數據。當數據流狀態正常時,又分爲兩種狀況:
take
。上面的第二種狀況在收到http2WindowUpdateFrame數據幀這一節中提到過。
server讀取當前數據流的數據後會向client對應數據流發送http2WindowUpdateFrame
數據幀,client收到該數據幀後會增大對應數據流可寫窗口,並執行cc.cond.Broadcast()
喚醒因發送數據已達流控制上限而等待的數據流繼續發送數據。
以上就是client向server發送數據的流控制邏輯。
流控制可分爲兩個步驟:
http2FrameSettings
數據幀和http2WindowUpdateFrame
數據幀告知對方當前鏈接讀寫窗口大小以及鏈接中數據流讀寫窗口大小。http2WindowUpdateFrame
數據幀控制另外一端的寫窗口大小。前篇和中篇已經完成,下一期將對http2.0標頭壓縮進行分析。
最後,衷心但願本文可以對各位讀者有必定的幫助。
注:寫本文時, 筆者所用go版本爲: go1.14.2