使用golang net/http庫發送http請求,最後都是調用 transport的 RoundTrip方法golang
type RoundTripper interface { RoundTrip(*Request) (*Response, error) }
RoundTrip executes a single HTTP transaction, returning the Response for the request req.
(RoundTrip 表明一個http事務,給一個請求返回一個響應)
說白了,就是你給它一個request,它給你一個response併發
下面咱們來看一下他的實現,對應源文件net/http/transport.go
,我感受這裏是http package裏面的精髓所在,go裏面一個struct就跟一個類同樣,transport這個類長這樣的app
type Transport struct { idleMu sync.Mutex wantIdle bool // user has requested to close all idle conns idleConn map[connectMethodKey][]*persistConn idleConnCh map[connectMethodKey]chan *persistConn reqMu sync.Mutex reqCanceler map[*Request]func() altMu sync.RWMutex altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper //Dial獲取一個tcp 鏈接,也就是net.Conn結構,你就記住能夠往裏面寫request //而後從裏面搞到response就好了 Dial func(network, addr string) (net.Conn, error) }
篇幅所限, https和代理相關的我就忽略了, 兩個 map
爲 idleConn
、idleConnCh
,idleConn
是保存從 connectMethodKey (表明着不一樣的協議 不一樣的host,也就是不一樣的請求)到 persistConn 的映射, idleConnCh
用來在併發http請求的時候在多個 goroutine 裏面相互發送持久鏈接,也就是說, 這些持久鏈接是能夠重複利用的, 你的http請求用某個persistConn
用完了,經過這個channel
發送給其餘http請求使用這個persistConn
,而後咱們找到transport
的RoundTrip
方法tcp
func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { ... pconn, err := t.getConn(req, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } return pconn.roundTrip(treq) }
前面對輸入的錯誤處理部分咱們忽略, 其實就2步,先獲取一個TCP長鏈接,所謂TCP長鏈接就是三次握手創建鏈接後不close
而是一直保持重複使用(節約環保) 而後調用這個持久鏈接persistConn 這個struct的roundTrip方法ide
咱們跟蹤第一步高併發
func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error) { if pc := t.getIdleConn(cm); pc != nil { // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(req, func() {}) return pc, nil } type dialRes struct { pc *persistConn err error } dialc := make(chan dialRes) //定義了一個發送 persistConn的channel prePendingDial := prePendingDial postPendingDial := postPendingDial handlePendingDial := func() { if prePendingDial != nil { prePendingDial() } go func() { if v := <-dialc; v.err == nil { t.putIdleConn(v.pc) } if postPendingDial != nil { postPendingDial() } }() } cancelc := make(chan struct{}) t.setReqCanceler(req, func() { close(cancelc) }) // 啓動了一個goroutine, 這個goroutine 獲取裏面調用dialConn搞到 // persistConn, 而後發送到上面創建的channel dialc裏面, go func() { pc, err := t.dialConn(cm) dialc <- dialRes{pc, err} }() idleConnCh := t.getIdleConnCh(cm) select { case v := <-dialc: // dialc 咱們的 dial 方法先搞到經過 dialc通道發過來了 return v.pc, v.err case pc := <-idleConnCh: // 這裏表明其餘的http請求用完了歸還的persistConn經過idleConnCh這個 // channel發送來的 handlePendingDial() return pc, nil case <-req.Cancel: handlePendingDial() return nil, errors.New("net/http: request canceled while waiting for connection") case <-cancelc: handlePendingDial() return nil, errors.New("net/http: request canceled while waiting for connection") } }
這裏面的代碼寫的頗有講究 , 上面代碼裏面我也註釋了, 定義了一個發送 persistConn
的channel dialc
, 啓動了一個goroutine
, 這個goroutine
獲取裏面調用dialConn
搞到persistConn
, 而後發送到dialc
裏面,主協程goroutine
在 select
裏面監聽多個channel
,看看哪一個通道里面先發過來 persistConn
,就用哪一個,而後return
。oop
這裏要注意的是 idleConnCh
這個通道里面發送來的是其餘的http請求用完了歸還的persistConn
, 若是從這個通道里面搞到了,dialc
這個通道也等着發呢,不能浪費,就經過handlePendingDial
這個方法把dialc
通道里面的persistConn
也發到idleConnCh
,等待後續給其餘http請求使用。 post
還有就是,讀者能夠翻一下代碼,每一個新建的persistConn的時候都把tcp鏈接裏地輸入流,和輸出流用br(br *bufio.Reader
),和bw(bw *bufio.Writer
)包裝了一下,往bw寫就寫到tcp輸入流裏面了,讀輸出流也是經過br讀,並啓動了讀循環和寫循環學習
pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF}) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() go pconn.writeLoop()
咱們跟蹤第二步pconn.roundTrip
調用這個持久鏈接persistConn 這個struct的roundTrip
方法。
先瞄一下 persistConn
這個structthis
type persistConn struct { t *Transport cacheKey connectMethodKey conn net.Conn tlsState *tls.ConnectionState br *bufio.Reader // 從tcp輸出流裏面讀 sawEOF bool // whether we've seen EOF from conn; owned by readLoop bw *bufio.Writer // 寫到tcp輸入流 reqch chan requestAndChan // 主goroutine 往channnel裏面寫,讀循環從 // channnel裏面接受 writech chan writeRequest // 主goroutine 往channnel裏面寫 // 寫循環從channel裏面接受 closech chan struct{} // 通知關閉tcp鏈接的channel writeErrCh chan error lk sync.Mutex // guards following fields numExpectedResponses int closed bool // whether conn has been closed broken bool // an error has happened on this connection; marked broken so it's not reused. canceled bool // whether this conn was broken due a CancelRequest // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) mutateHeaderFunc func(Header) }
裏面是各類channel, 用的是出神入化, 各位要好好理解一下, 我這裏畫一下
這裏有三個goroutine,分別用三個圓圈表示, channel用箭頭表示
有兩個channel writeRequest
和 requestAndChan
type writeRequest struct { req *transportRequest ch chan<- error }
主goroutine 往writeRequest裏面寫,寫循環從writeRequest裏面接受
type responseAndError struct { res *Response err error } type requestAndChan struct { req *Request ch chan responseAndError addedGzip bool }
主goroutine 往requestAndChan裏面寫,讀循環從requestAndChan裏面接受。
注意這裏的channel都是雙向channel,也就是channel 的struct裏面有一個chan類型的字段, 好比 reqch chan requestAndChan
這裏的 requestAndChan 裏面的 ch chan responseAndError
。
這個是很牛叉,主 goroutine 經過 reqch 發送requestAndChan 給讀循環,而後讀循環搞到response後經過 requestAndChan 裏面的通道responseAndError把response返給主goroutine,因此我畫了一個雙向箭頭。
咱們研究一下代碼,我理解下來其實就是三個goroutine經過channel互相協做的過程。
主循環:
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { ... 忽略 // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh} //把request發送給寫循環 resc := make(chan responseAndError, 1) pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} //發送給讀循環 var re responseAndError var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel WaitResponse: for { select { case err := <-writeErrCh: if isNetWriteError(err) { //寫循環經過這個channel報告錯誤 select { case re = <-resc: pc.close() break WaitResponse case <-time.After(50 * time.Millisecond): // Fall through. } } if err != nil { re = responseAndError{nil, err} pc.close() break WaitResponse } if d := pc.t.ResponseHeaderTimeout; d > 0 { timer := time.NewTimer(d) defer timer.Stop() // prevent leaks respHeaderTimer = timer.C } case <-pc.closech: // 若是長鏈接掛了, 這裏的channel有數據, 進入這個case, 進行處理 select { case re = <-resc: if fn := testHookPersistConnClosedGotRes; fn != nil { fn() } default: re = responseAndError{err: errClosed} if pc.isCanceled() { re = responseAndError{err: errRequestCanceled} } } break WaitResponse case <-respHeaderTimer: pc.close() re = responseAndError{err: errTimeout} break WaitResponse // 若是timeout,這裏的channel有數據, break掉for循環 case re = <-resc: break WaitResponse // 獲取到讀循環的response, break掉 for循環 case <-cancelChan: pc.t.CancelRequest(req.Request) cancelChan = nil } } if re.err != nil { pc.t.setReqCanceler(req.Request, nil) } return re.res, re.err }
這段代碼主要就幹了三件事
主goroutine ->requestAndChan -> 讀循環goroutine
主goroutine ->writeRequest-> 寫循環goroutine
主goroutine 經過select 監聽各個channel上的數據, 好比請求取消, timeout,長鏈接掛了,寫流出錯,讀流出錯, 都是其餘goroutine 發送過來的, 跟中斷同樣,而後相應處理,上面也提到了,有些channel是主goroutine經過channel發送給其餘goroutine的struct裏面包含的channel, 好比 case err := <-writeErrCh:
case re = <-resc:
讀循環代碼:
func (pc *persistConn) readLoop() { ... 忽略 alive := true for alive { ... 忽略 rc := <-pc.reqch var resp *Response if err == nil { resp, err = ReadResponse(pc.br, rc.req) if err == nil && resp.StatusCode == 100 { //100 Continue 初始的請求已經接受,客戶應當繼續發送請求的其 // 餘部分 resp, err = ReadResponse(pc.br, rc.req) // 讀pc.br(tcp輸出流)中的數據,這裏的代碼在response裏面 //解析statusCode,頭字段, 轉成標準的內存中的response 類型 // http在tcp數據流裏面,head和body以 /r/n/r/n分開, 各個頭 // 字段 以/r/n分開 } } if resp != nil { resp.TLS = pc.tlsState } ...忽略 //上面處理一些http協議的一些邏輯行爲, rc.ch <- responseAndError{resp, err} //把讀到的response返回給 //主goroutine .. 忽略 //忽略部分, 處理cancel req中斷, 發送idleConnCh歸還pc(持久鏈接)到持久鏈接池中(map) pc.close() }
無關代碼忽略,這段代碼主要乾了一件事情
讀循環goroutine 經過channel requestAndChan 接受主goroutine發送的request(
rc := <-pc.reqch
), 並從tcp輸出流中讀取response, 而後反序列化到結構體中, 最後經過channel 返給主goroutine (rc.ch <- responseAndError{resp, err}
)
func (pc *persistConn) writeLoop() { for { select { case wr := <-pc.writech: //接受主goroutine的 request if pc.isBroken() { wr.ch <- errors.New("http: can't write HTTP request on broken connection") continue } err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) //寫入tcp輸入流 if err == nil { err = pc.bw.Flush() } if err != nil { pc.markBroken() wr.req.Request.closeBody() } pc.writeErrCh <- err wr.ch <- err // 出錯的時候返給主goroutineto case <-pc.closech: return } } }
寫循環就更簡單了,select channel中主gouroutine的request,而後寫入tcp輸入流,若是出錯了,channel 通知調用者。
總體看下來,過程都很簡單,可是代碼中有不少值得咱們學習的地方,好比高併發請求如何複用tcp鏈接,這裏是鏈接池的作法,若是使用多個 goroutine相互協做完成一個http請求,出現錯誤的時候如何通知調用者中斷錯誤,代碼風格也有不少能夠借鑑的地方。
我打算寫一個系列,全面剖析go標準庫裏面的精彩之處,分享給你們。