原文地址: https://neojos.com/blog/2019/...
文章介紹最近工做中遇到的一個問題,其中50%以上的內容都是Go的源代碼。剩下部分是本身的理解,若是有理解錯誤或探討的地方,但願你們指正。git
問題:針對同一個接口請求,絕大多數均可以正常處理,但卻有零星的幾請求總是處理失敗,錯誤信息返回 context canceled
。重放失敗的請求,錯誤必現。github
根據返回的錯誤信息,再結合咱們工程中使用的golang.org/x/net/context/ctxhttp
包。猜想多是在請求處理過程當中,異常調用了context
包的CancelFunc
方法。同時,咱們對比了失敗請求和成功請求的區別,發現失敗請求的Response.Body
數據量很是大。golang
以後在Google上找到了問題的緣由,還真是很容易被忽略,這裏是文章的連接:Context cancellation flake。爲了解決將來點進去404的悲劇,本文截取了其中的代碼...api
代碼核心邏輯:向某個地址發送Get
請求,並打印響應內容。其中函數fetch
用於發送請求,readBody
用於讀取響應。例子中處理請求的邏輯結構,跟咱們項目中的徹底一致。app
fetch
方法中使用了默認的http.DefaultClient
做爲http Client
,而它自身是一個「零值」,並無指定請求的超時時間。因此,例子中又經過context.WithTimeout
對超時時間進行了設置。異步
代碼中使用context.WithTimeout
來取消請求,存在兩種可能狀況。第一種,處理的時間超過了指定的超時時間,程序返回deadlineExceededError
錯誤,錯誤描述context deadline exceeded
。另外一種是手動調用CancelFunc
方法取消執行,返回Canceled
錯誤,描述信息context canceled
。ide
在fetch
代碼的處理邏輯中,當程序返回http.Response
時,會執行cancel()
方法,用於標記請求被取消。若是在readBody
沒讀取完返回的數據以前,context
被cancel掉了,就會返回context canceled
錯誤。側面也反映了,關閉Context.Done()
與讀取http.Response
是一個時間賽跑的過程…..函數
package main import ( "context" "io/ioutil" "log" "net/http" "time" "golang.org/x/net/context/ctxhttp" ) func main() { req, err := http.NewRequest("GET", "https://swapi.co/api/people/1", nil) if err != nil { log.Fatal(err) } resp, err := fetch(req) if err != nil { log.Fatal(err) } log.Print(readBody(resp)) } func fetch(req *http.Request) (*http.Response, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return ctxhttp.Do(ctx, http.DefaultClient, req) } func readBody(resp *http.Response) (string, error) { b, err := ioutil.ReadAll(resp.Body) if err != nil { return "", err } return string(b), err }
問題的解決辦法以下,做者也附帶了Test Case。 請求包括髮送請求和讀取響應兩部分,CancelFunc
應該在請求被處理完成後調用。否則,就會發生上面遇到的問題。oop
In case it's still unclear, you need to wrap both the "do request" + "read body" inside the same cancellation context. The "defer cancel" should encompass both of them, sort of atomically, so the idea is to take it out of your fetch, one level up.
咱們準備經過控制請求返回的內容,來驗證咱們的結論。咱們在本地啓動一個新服務,並對外提供一個接口,來替代上述代碼中的請求地址。fetch
代碼以下,其中info
接口實現了下載resource文件的功能。咱們經過控制resource文件的大小,來控制返回response大小的目的。
package main import ( "github.com/gin-gonic/gin" "io/ioutil" "log" ) func main() { router := gin.Default() router.GET("/info", func(c *gin.Context) { data, err := ioutil.ReadFile("./resource") if err != nil { log.Println("read file err:", err.Error()) return } log.Println("send file resource") c.Writer.Write(data) }) router.Run(":8000") }
首先,咱們向resource文件中寫入大量的內容,從新執行上述代碼。錯誤日誌輸出:2019/06/13 21:12:37 context canceled
。確實重現了!
而後,將resource文件內容刪除到只剩一行數據,請求又能夠正常處理了。
req, err := http.NewRequest("GET", "http://127.0.0.1:8000/info", nil)
總結:上述錯誤代碼的執行結果,依賴請求返回的數據量大小。
Bug
根據上述分析,咱們對代碼進行調整:將defer cancel()
調整到程序讀取完http.Response.Body
以後執行。具體修改以下:
fetch
函數中,將cancel
函數做爲返回值,返回給調用方。func fetch(req *http.Request) (context.CancelFunc, *http.Response, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) resp, err := ctxhttp.Do(ctx, http.DefaultClient, req) return cancel, resp, err }
readBody
讀取完數據以後,再調用cancel
方法。cancel, resp, err := fetch(req) if err != nil { log.Fatal(err) } defer cancel() log.Print(readBody(resp))
跟預期的同樣,再接口返回的數據量很大的狀況下,請求也能夠被正常處理。
context deadline exceeded
咱們將代碼中context.WithTimeout
的超時時間由5*time.Second
調整爲1*time.Millisecond
。執行代碼,輸出錯誤日誌:2019/06/13 21:29:11 context deadline exceeded
。
context canceled
參考上述代碼。
net/http: request canceled
工做中常見的錯誤之一:net/http: request canceled (Client.Timeout exceeded while awaiting headers)
,這是由http Client
設置的超時時間決定的。接下來咱們重現一下這個error。
fetch
方法中,咱們聲明一個自定義的client
,並指定Timeout
屬性爲time.Millisecond
,來替換代碼中默認的client
。
func fetch(req *http.Request) (context.CancelFunc, *http.Response, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) customClient := &http.Client{ Timeout: time.Millisecond, } resp, err := ctxhttp.Do(ctx, customClient, req) return cancel, resp, err }
程序執行輸出:
2019/06/18 09:20:53 Get http://127.0.0.1:8000/info: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
以下是http.Client
結構體中對Timeout
的註釋,它包括建立鏈接、請求跳轉、讀取響應的所有時間。
// Timeout specifies a time limit for requests made by this // Client. The timeout includes connection time, any // redirects, and reading the response body. The timer remains // running after Get, Head, Post, or Do return and will // interrupt reading of the Response.Body. // // A Timeout of zero means no timeout. // // The Client cancels requests to the underlying Transport // as if the Request's Context ended. // // For compatibility, the Client will also use the deprecated // CancelRequest method on Transport if found. New // RoundTripper implementations should use the Request's Context // for cancelation instead of implementing CancelRequest.
context
原理下面是context
的接口類型,由於Done()
的註解很好的解釋了context
最本質的用法,因此,特地只將這部分貼出來。在for
循環體內,執行每次循環時,使用select
方法來監聽Done()
是否被關閉了。若是關閉了,就退出循環。在ctxhttp
包內,也是經過這種用法來實現對請求的控制的。
type Context interface { Deadline() (deadline time.Time, ok bool) // Done returns a channel that's closed when work done on behalf of this // context should be canceled. Done may return nil if this context can // never be canceled. Successive calls to Done return the same value. // // WithCancel arranges for Done to be closed when cancel is called; // WithDeadline arranges for Done to be closed when the deadline // expires; WithTimeout arranges for Done to be closed when the timeout // elapses. // // Done is provided for use in select statements: // // // Stream generates values with DoSomething and sends them to out // // until DoSomething returns an error or ctx.Done is closed. // func Stream(ctx context.Context, out chan<- Value) error { // for { // v, err := DoSomething(ctx) // if err != nil { // return err // } // select { // case <-ctx.Done(): // return ctx.Err() // case out <- v: // } // } // } // // See https://blog.golang.org/pipelines for more examples of how to use // a Done channel for cancelation. Done() <-chan struct{} Err() error Value(key interface{}) interface{} }
由於有業務邏輯在監聽context.Done()
,因此,必然須要有邏輯來Close
調這個Channel
。而整個context
包也圍繞者兩個方面提供了一些方法,包括啓動定時器來關閉context.Done()
。參考註釋中提到的WithCancel
、WithDeadline
以及WithTimeout
。
下面是用來獲取cancelCtx
的方法,咱們能夠了解到context
內部被封裝的三種類型,分別是cancelCtx
、timerCtx
以及valueCtx
。
// parentCancelCtx follows a chain of parent references until it finds a // *cancelCtx. This function understands how each of the concrete types in this // package represents its parent. func parentCancelCtx(parent Context) (*cancelCtx, bool) { for { switch c := parent.(type) { case *cancelCtx: return c, true case *timerCtx: return &c.cancelCtx, true case *valueCtx: parent = c.Context default: return nil, false } } }
查看這三種類型的聲明,內部都封裝了一個Context
值,用來存儲父Context
。偏偏也是經過這個字段,將整個Context
串了起來。其中timerCtx
是基於cancelCtx
作的擴展,在其基礎上添加了計時的功能。另外,cancelCtx
節點中的children
用於保存它全部的子節點。
// A cancelCtx can be canceled. When canceled, it also cancels any children // that implement canceler. type cancelCtx struct { Context mu sync.Mutex // protects following fields done chan struct{} // created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error // set to non-nil by the first cancel call } // A timerCtx carries a timer and a deadline. It embeds a cancelCtx to // implement Done and Err. It implements cancel by stopping its timer then // delegating to cancelCtx.cancel. type timerCtx struct { cancelCtx timer *time.Timer // Under cancelCtx.mu. deadline time.Time } // A valueCtx carries a key-value pair. It implements Value for that key and // delegates all other calls to the embedded Context. type valueCtx struct { Context key, val interface{} }
接下來,咱們瞭解一下,將一個新的child context
節點掛到parent context
的過程。
首先,程序判斷parent
的數據類型,若是是上述三種類型之一,且沒有錯誤信息,直接將child
存儲到parnet.children
的map
結構中。
若是parnet
不是上述類型之一,程序會啓動一個Goroutine
異步監聽parent.Done()
和child.Done()
是否被關閉。個人理解是,由於此時parent
實際上是background
和todo
中的一種(我稱它爲頂級parnet
),而它們內部都沒有字段用於存儲和child
的關係。因此,在程序select
中綁定了它們的對應關係。另外,一個頂級parent
也只能有一個child
,而這個child
應該是上述三種類型中的一種。只有這種一對一的狀況,當child.Done()
被關閉的時候,整個select
退出纔是合理的。
// propagateCancel arranges for child to be canceled when parent is. func propagateCancel(parent Context, child canceler) { if parent.Done() == nil { return // parent is never canceled } if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil { // parent has already been canceled child.cancel(false, p.err) } else { if p.children == nil { p.children = make(map[canceler]struct{}) } p.children[child] = struct{}{} } p.mu.Unlock() } else { go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() } }
咱們接着看一下WithCancel
和WithDeadline
這兩個方法。前者經過調用CancelFunc
來取消。後者在此基礎上,加了一個timer
的定時觸發取消機制。若是WithDeadline
參數d
自己就是一個過去的時間點,那麼WithDeadline
和WithCancel
效果相同。
// WithCancel returns a copy of parent with a new Done channel. The returned // context's Done channel is closed when the returned cancel function is called // or when the parent context's Done channel is closed, whichever happens first. // // Canceling this context releases resources associated with it, so code should // call cancel as soon as the operations running in this Context complete. func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { c := newCancelCtx(parent) propagateCancel(parent, &c) return &c, func() { c.cancel(true, Canceled) } } // WithDeadline returns a copy of the parent context with the deadline adjusted // to be no later than d. If the parent's deadline is already earlier than d, // WithDeadline(parent, d) is semantically equivalent to parent. The returned // context's Done channel is closed when the deadline expires, when the returned // cancel function is called, or when the parent context's Done channel is // closed, whichever happens first. // // Canceling this context releases resources associated with it, so code should // call cancel as soon as the operations running in this Context complete. func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) { if cur, ok := parent.Deadline(); ok && cur.Before(d) { // The current deadline is already sooner than the new one. return WithCancel(parent) } c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } propagateCancel(parent, c) dur := time.Until(d) if dur <= 0 { c.cancel(true, DeadlineExceeded) // deadline has already passed return c, func() { c.cancel(false, Canceled) } } c.mu.Lock() defer c.mu.Unlock() if c.err == nil { c.timer = time.AfterFunc(dur, func() { c.cancel(true, DeadlineExceeded) }) } return c, func() { c.cancel(true, Canceled) } }
最後,咱們以timerCtx
類型爲例,來看看cancel
函數的具體實現。方法的調用過程是遞歸執行的,內部調用的是cancelCtx
的cancel
方法。參數removeFromParent
用來判斷是否要從父節點中移除該節點。同時,若是計時器存在的話,要關閉計時器。
func (c *timerCtx) cancel(removeFromParent bool, err error) { c.cancelCtx.cancel(false, err) if removeFromParent { // Remove this timerCtx from its parent cancelCtx's children. removeChild(c.cancelCtx.Context, c) } c.mu.Lock() if c.timer != nil { c.timer.Stop() c.timer = nil } c.mu.Unlock() }
具體到cancelCtx
中的cancel
方法,函數依次cancel
掉children
中存儲的子節點。但咱們發現,在for
循環移除子節點的時候,removeFromParent
參數值爲false
。個人理解是,子節點依賴的父節點都已經被移除了,子節點是否移除就不重要了。
// cancel closes c.done, cancels each of c's children, and, if // removeFromParent is true, removes c from its parent's children. func (c *cancelCtx) cancel(removeFromParent bool, err error) { if err == nil { panic("context: internal error: missing cancel error") } c.mu.Lock() if c.err != nil { c.mu.Unlock() return // already canceled } c.err = err if c.done == nil { c.done = closedchan } else { close(c.done) } for child := range c.children { // NOTE: acquiring the child's lock while holding parent's lock. child.cancel(false, err) } c.children = nil c.mu.Unlock() if removeFromParent { removeChild(c.Context, c) } }
ctxhttp
中的應用request
上面的例子中,咱們建立了一個頂級context.Background
。在調用WithTimeout
時,parent
會建立一個異步的Goroutine
用來進行監聽Done
是否已經被關閉。同時還會爲新建立的context
設置一個計時器timer
,來計算到期時間。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
下面是發送請求的代碼,能夠看到這是一個for循環的過程,因此很是適合context
的處理模型。另外,該方法中有咱們上面描述的錯誤狀況:net/http: request canceled
。對於這種超時錯誤,咱們能夠經過判斷error
類型,以及timeout
是否爲true
來判斷。
一直到這裏,咱們尚未看到context
的核心邏輯…...
func (c *Client) do(req *Request) (retres *Response, reterr error) { // 刪除簡化代碼...... for { reqs = append(reqs, req) var err error var didTimeout func() bool if resp, didTimeout, err = c.send(req, deadline); err != nil { // c.send() always closes req.Body reqBodyClosed = true if !deadline.IsZero() && didTimeout() { err = &httpError{ // TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancelation/ err: err.Error() + " (Client.Timeout exceeded while awaiting headers)", timeout: true, } } return nil, uerr(err) } var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) if !shouldRedirect { return resp, nil } } }
全部對context
的處理,都是在Transport.roundTrip
中實現的
// roundTrip implements a RoundTripper over HTTP. func (t *Transport) roundTrip(req *Request) (*Response, error) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) ctx := req.Context() trace := httptrace.ContextClientTrace(ctx) for { select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // treq gets modified by roundTrip, so we need to recreate for each retry. treq := &transportRequest{Request: req, trace: trace} cm, err := t.connectMethodForRequest(treq) } }
response
在從conn
讀取數據的時候,依舊對req
的context
作了判斷。同時也能夠看出,讀取Response.Body
的過程,就是不斷從resc
中讀取數據的過程。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. startBytesWritten := pc.nwrite writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() for { testHookWaitResLoop() select { case <-ctxDoneChan: pc.t.cancelRequest(req.Request, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }
參考文章: