Go 標準庫剖析 1(transport http 請求的承載者)

使用golang net/http庫發送http請求,最後都是調用 transport的 RoundTrip方法golang

type RoundTripper interface {
    RoundTrip(*Request) (*Response, error)
}

RoundTrip executes a single HTTP transaction, returning the Response for the request req. (RoundTrip 表明一個http事務,給一個請求返回一個響應)
說白了,就是你給它一個request,它給你一個response併發

下面咱們來看一下他的實現,對應源文件net/http/transport.go,我感受這裏是http package裏面的精髓所在,go裏面一個struct就跟一個類同樣,transport這個類長這樣的app

type Transport struct {
    idleMu     sync.Mutex
    wantIdle   bool // user has requested to close all idle conns
    idleConn   map[connectMethodKey][]*persistConn
    idleConnCh map[connectMethodKey]chan *persistConn

    reqMu       sync.Mutex
    reqCanceler map[*Request]func()

    altMu    sync.RWMutex
    altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper
    //Dial獲取一個tcp 鏈接,也就是net.Conn結構,你就記住能夠往裏面寫request
    //而後從裏面搞到response就好了
    Dial func(network, addr string) (net.Conn, error)
}

篇幅所限, https和代理相關的我就忽略了, 兩個 mapidleConnidleConnChidleConn 是保存從 connectMethodKey (表明着不一樣的協議 不一樣的host,也就是不一樣的請求)到 persistConn 的映射, idleConnCh 用來在併發http請求的時候在多個 goroutine 裏面相互發送持久鏈接,也就是說, 這些持久鏈接是能夠重複利用的, 你的http請求用某個persistConn用完了,經過這個channel發送給其餘http請求使用這個persistConn,而後咱們找到transportRoundTrip方法tcp

func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
    ...
    pconn, err := t.getConn(req, cm)
    if err != nil {
        t.setReqCanceler(req, nil)
        req.closeBody()
        return nil, err
    }

    return pconn.roundTrip(treq)
}

前面對輸入的錯誤處理部分咱們忽略, 其實就2步,先獲取一個TCP長鏈接,所謂TCP長鏈接就是三次握手創建鏈接後不close而是一直保持重複使用(節約環保) 而後調用這個持久鏈接persistConn 這個struct的roundTrip方法ide

咱們跟蹤第一步高併發

func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error) {
    if pc := t.getIdleConn(cm); pc != nil {
        // set request canceler to some non-nil function so we
        // can detect whether it was cleared between now and when
        // we enter roundTrip
        t.setReqCanceler(req, func() {})
        return pc, nil
    }
 
    type dialRes struct {
        pc  *persistConn
        err error
    }
    dialc := make(chan dialRes)
    //定義了一個發送 persistConn的channel

    prePendingDial := prePendingDial
    postPendingDial := postPendingDial

    handlePendingDial := func() {
        if prePendingDial != nil {
            prePendingDial()
        }
        go func() {
            if v := <-dialc; v.err == nil {
                t.putIdleConn(v.pc)
            }
            if postPendingDial != nil {
                postPendingDial()
            }
        }()
    }

    cancelc := make(chan struct{})
    t.setReqCanceler(req, func() { close(cancelc) })
 
    // 啓動了一個goroutine, 這個goroutine 獲取裏面調用dialConn搞到
    // persistConn, 而後發送到上面創建的channel  dialc裏面,    
    go func() {
        pc, err := t.dialConn(cm)
        dialc <- dialRes{pc, err}
    }()

    idleConnCh := t.getIdleConnCh(cm)
    select {
    case v := <-dialc:
        // dialc 咱們的 dial 方法先搞到經過 dialc通道發過來了
        return v.pc, v.err
    case pc := <-idleConnCh:
        // 這裏表明其餘的http請求用完了歸還的persistConn經過idleConnCh這個    
        // channel發送來的
        handlePendingDial()
        return pc, nil
    case <-req.Cancel:
        handlePendingDial()
        return nil, errors.New("net/http: request canceled while waiting for connection")
    case <-cancelc:
        handlePendingDial()
        return nil, errors.New("net/http: request canceled while waiting for connection")
    }
}

這裏面的代碼寫的頗有講究 , 上面代碼裏面我也註釋了, 定義了一個發送 persistConn的channel dialc, 啓動了一個goroutine, 這個goroutine 獲取裏面調用dialConn搞到persistConn, 而後發送到dialc裏面,主協程goroutineselect裏面監聽多個channel,看看哪一個通道里面先發過來 persistConn,就用哪一個,而後returnoop

這裏要注意的是 idleConnCh 這個通道里面發送來的是其餘的http請求用完了歸還的persistConn, 若是從這個通道里面搞到了,dialc這個通道也等着發呢,不能浪費,就經過handlePendingDial這個方法把dialc通道里面的persistConn也發到idleConnCh,等待後續給其餘http請求使用。 post

還有就是,讀者能夠翻一下代碼,每一個新建的persistConn的時候都把tcp鏈接裏地輸入流,和輸出流用br(br *bufio.Reader),和bw(bw *bufio.Writer)包裝了一下,往bw寫就寫到tcp輸入流裏面了,讀輸出流也是經過br讀,並啓動了讀循環和寫循環學習

pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
pconn.bw = bufio.NewWriter(pconn.conn)
go pconn.readLoop()
go pconn.writeLoop()

咱們跟蹤第二步pconn.roundTrip 調用這個持久鏈接persistConn 這個struct的roundTrip方法。
先瞄一下 persistConn 這個structthis

type persistConn struct {
    t        *Transport
    cacheKey connectMethodKey
    conn     net.Conn
    tlsState *tls.ConnectionState
    br       *bufio.Reader       // 從tcp輸出流裏面讀
    sawEOF   bool                // whether we've seen EOF from conn; owned by readLoop
    bw       *bufio.Writer       // 寫到tcp輸入流
     reqch    chan requestAndChan // 主goroutine 往channnel裏面寫,讀循環從     
                                 // channnel裏面接受
    writech  chan writeRequest   // 主goroutine 往channnel裏面寫                                      
                                 // 寫循環從channel裏面接受
    closech  chan struct{}       // 通知關閉tcp鏈接的channel 
    
    writeErrCh chan error

    lk                   sync.Mutex // guards following fields
    numExpectedResponses int
    closed               bool // whether conn has been closed
    broken               bool // an error has happened on this connection; marked broken so it's not reused.
    canceled             bool // whether this conn was broken due a CancelRequest
    // mutateHeaderFunc is an optional func to modify extra
    // headers on each outbound request before it's written. (the
    // original Request given to RoundTrip is not modified)
    mutateHeaderFunc func(Header)
}

裏面是各類channel, 用的是出神入化, 各位要好好理解一下, 我這裏畫一下

這裏有三個goroutine,分別用三個圓圈表示, channel用箭頭表示

仔細看

有兩個channel writeRequestrequestAndChan

type writeRequest struct {
    req *transportRequest
    ch  chan<- error
}

主goroutine 往writeRequest裏面寫,寫循環從writeRequest裏面接受

type responseAndError struct {
    res *Response
    err error
}

type requestAndChan struct {
    req *Request
    ch  chan responseAndError
    addedGzip bool
}

主goroutine 往requestAndChan裏面寫,讀循環從requestAndChan裏面接受。

注意這裏的channel都是雙向channel,也就是channel 的struct裏面有一個chan類型的字段, 好比 reqch chan requestAndChan 這裏的 requestAndChan 裏面的 ch chan responseAndError

這個是很牛叉,主 goroutine 經過 reqch 發送requestAndChan 給讀循環,而後讀循環搞到response後經過 requestAndChan 裏面的通道responseAndError把response返給主goroutine,因此我畫了一個雙向箭頭。

咱們研究一下代碼,我理解下來其實就是三個goroutine經過channel互相協做的過程。

主循環:

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
    ... 忽略
    // Write the request concurrently with waiting for a response,
    // in case the server decides to reply before reading our full
    // request body.
    writeErrCh := make(chan error, 1)
    pc.writech <- writeRequest{req, writeErrCh}
    //把request發送給寫循環
    resc := make(chan responseAndError, 1)
    pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
    //發送給讀循環
    var re responseAndError
    var respHeaderTimer <-chan time.Time
    cancelChan := req.Request.Cancel
WaitResponse:
    for {
        select {
        case err := <-writeErrCh:
            if isNetWriteError(err) {
                //寫循環經過這個channel報告錯誤
                select {
                case re = <-resc:
                    pc.close()
                    break WaitResponse
                case <-time.After(50 * time.Millisecond):
                    // Fall through.
                }
            }
            if err != nil {
                re = responseAndError{nil, err}
                pc.close()
                break WaitResponse
            }
            if d := pc.t.ResponseHeaderTimeout; d > 0 {
                timer := time.NewTimer(d)
                defer timer.Stop() // prevent leaks
                respHeaderTimer = timer.C
            }
        case <-pc.closech:
            // 若是長鏈接掛了, 這裏的channel有數據, 進入這個case, 進行處理
            
            select {
            case re = <-resc:
                if fn := testHookPersistConnClosedGotRes; fn != nil {
                    fn()
                }
            default:
                re = responseAndError{err: errClosed}
                if pc.isCanceled() {
                    re = responseAndError{err: errRequestCanceled}
                }
            }
            break WaitResponse
        case <-respHeaderTimer:
            pc.close()
            re = responseAndError{err: errTimeout}
            break WaitResponse
            // 若是timeout,這裏的channel有數據, break掉for循環
        case re = <-resc:
            break WaitResponse
           // 獲取到讀循環的response, break掉 for循環
        case <-cancelChan:
            pc.t.CancelRequest(req.Request)
            cancelChan = nil
        }
    }

    if re.err != nil {
        pc.t.setReqCanceler(req.Request, nil)
    }
    return re.res, re.err
}

這段代碼主要就幹了三件事

  • 主goroutine ->requestAndChan -> 讀循環goroutine

  • 主goroutine ->writeRequest-> 寫循環goroutine

  • 主goroutine 經過select 監聽各個channel上的數據, 好比請求取消, timeout,長鏈接掛了,寫流出錯,讀流出錯, 都是其餘goroutine 發送過來的, 跟中斷同樣,而後相應處理,上面也提到了,有些channel是主goroutine經過channel發送給其餘goroutine的struct裏面包含的channel, 好比 case err := <-writeErrCh: case re = <-resc:

讀循環代碼:

func (pc *persistConn) readLoop() {
    
    ... 忽略
    alive := true
    for alive {
        
        ... 忽略
        rc := <-pc.reqch

        var resp *Response
        if err == nil {
            resp, err = ReadResponse(pc.br, rc.req)
            if err == nil && resp.StatusCode == 100 {
                //100  Continue  初始的請求已經接受,客戶應當繼續發送請求的其 
                // 餘部分
                resp, err = ReadResponse(pc.br, rc.req)
                // 讀pc.br(tcp輸出流)中的數據,這裏的代碼在response裏面
                //解析statusCode,頭字段, 轉成標準的內存中的response 類型
                //  http在tcp數據流裏面,head和body以 /r/n/r/n分開, 各個頭
                // 字段 以/r/n分開
            }
        }

        if resp != nil {
            resp.TLS = pc.tlsState
        }

        ...忽略
        //上面處理一些http協議的一些邏輯行爲,
        rc.ch <- responseAndError{resp, err} //把讀到的response返回給    
                                             //主goroutine

        .. 忽略
        //忽略部分, 處理cancel req中斷, 發送idleConnCh歸還pc(持久鏈接)到持久鏈接池中(map)    
    pc.close()
}

無關代碼忽略,這段代碼主要乾了一件事情

讀循環goroutine 經過channel requestAndChan 接受主goroutine發送的request(rc := <-pc.reqch), 並從tcp輸出流中讀取response, 而後反序列化到結構體中, 最後經過channel 返給主goroutine (rc.ch <- responseAndError{resp, err} )

func (pc *persistConn) writeLoop() {
    for {
        select {
        case wr := <-pc.writech:   //接受主goroutine的 request
            if pc.isBroken() {
                wr.ch <- errors.New("http: can't write HTTP request on broken connection")
                continue
            }
            err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)   //寫入tcp輸入流
            if err == nil {
                err = pc.bw.Flush()
            }
            if err != nil {
                pc.markBroken()
                wr.req.Request.closeBody()
            }
            pc.writeErrCh <- err 
            wr.ch <- err         //  出錯的時候返給主goroutineto 
        case <-pc.closech:
            return
        }
    }
}

寫循環就更簡單了,select channel中主gouroutine的request,而後寫入tcp輸入流,若是出錯了,channel 通知調用者。

總體看下來,過程都很簡單,可是代碼中有不少值得咱們學習的地方,好比高併發請求如何複用tcp鏈接,這裏是鏈接池的作法,若是使用多個 goroutine相互協做完成一個http請求,出現錯誤的時候如何通知調用者中斷錯誤,代碼風格也有不少能夠借鑑的地方。

我打算寫一個系列,全面剖析go標準庫裏面的精彩之處,分享給你們。

相關文章
相關標籤/搜索