Using context cancellation in Go

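Original article: https://neojos.com/blog/2019/...

This article describes a problem I ran into at work recently. More than 50% of the content is Go source code; the rest is my own understanding. If anything is wrong or worth discussing, corrections are welcome.

Problem: for requests to the same endpoint, the vast majority were handled normally, but a handful always failed with the error context canceled. Replaying a failed request reproduced the error every time.

Given the error message and the fact that our project uses the golang.org/x/net/context/ctxhttp package, we guessed that the context package's CancelFunc was being called unexpectedly while the request was being processed. We also compared failed and successful requests and found that the failed ones had a very large Response.Body.

A Google search then turned up the cause, which is genuinely easy to overlook. The explanation is in the article Context cancellation flake. In case that link ends up as a 404, the relevant code is excerpted here...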

Code

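Core logic: send a GET request to some address and print the response. The fetch function sends the request, and readBody reads the response. The request-handling structure in this example is exactly the same as in our project.

fetch uses http.DefaultClient as the HTTP client. It is a zero-value Client, so no request timeout is set. The example therefore sets the timeout with context.WithTimeout instead.

Using context.WithTimeout to cancel a request has two possible outcomes. In the first, processing takes longer than the timeout and the program returns a deadlineExceededError, described as context deadline exceeded. In the other, CancelFunc is called manually to cancel execution, which returns the Canceled error, described as context canceled.

In fetch, cancel() runs as soon as the function returns the http.Response, marking the request as canceled. If the context is canceled before readBody has finished reading the returned data, the read fails with context canceled. In other words, closing Context.Done() and reading the http.Response are in a race against each other...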

package main

import (
    "context"
    "io/ioutil"
    "log"
    "net/http"
    "time"

    "golang.org/x/net/context/ctxhttp"
)

func main() {
    req, err := http.NewRequest("GET", "https://swapi.co/api/people/1", nil)
    if err != nil {
        log.Fatal(err)
    }
    resp, err := fetch(req)
    if err != nil {
        log.Fatal(err)
    }
    log.Print(readBody(resp))
}

func fetch(req *http.Request) (*http.Response, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    return ctxhttp.Do(ctx, http.DefaultClient, req)
}

func readBody(resp *http.Response) (string, error) {
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }
    return string(b), err
}

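The fix is as follows; the author of that article also included a test case. A request consists of two parts, sending the request and reading the response, and CancelFunc should be called only after both are done. Otherwise the problem above occurs.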

In case it's still unclear, you need to wrap both the "do request" + "read body" inside the same cancellation context. The "defer cancel" should encompass both of them, sort of atomically, so the idea is to take it out of your fetch, one level up.

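Reproducing the bug

To verify this conclusion, we control the content returned by the request. We start a new local service that exposes one endpoint and use it in place of the request address in the code above.

The code is below. The info endpoint serves the contents of the resource file, so changing the size of that file changes the size of the response.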

package main

import (
    "github.com/gin-gonic/gin"
    "io/ioutil"
    "log"
)

func main() {
    router := gin.Default()
    router.GET("/info", func(c *gin.Context) {
        data, err := ioutil.ReadFile("./resource")
        if err != nil {
            log.Println("read file err:", err.Error())
            return
        }

        log.Println("send file resource")
        c.Writer.Write(data)
    })
  
    router.Run(":8000")
}

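First, we write a large amount of content into the resource file and rerun the client code above. The error log shows: 2019/06/13 21:12:37 context canceled. The bug is reproduced!

Then we trim the resource file down to a single line, and the request is handled normally again.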

req, err := http.NewRequest("GET", "http://127.0.0.1:8000/info", nil)

Summary: whether the buggy code above fails depends on the size of the returned data.

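Fixing the bug

Based on the analysis above, we adjust the code so that cancel() runs only after the program has finished reading http.Response.Body. The changes are:

  1. In fetch, return the cancel function to the caller.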
func fetch(req *http.Request) (context.CancelFunc, *http.Response, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    resp, err := ctxhttp.Do(ctx, http.DefaultClient, req)
    return cancel, resp, err
}
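  2. Call cancel only after readBody has finished reading the data.
cancel, resp, err := fetch(req)
if err != nil {
    log.Fatal(err)
}
defer cancel()
log.Print(readBody(resp))

As expected, the request is now handled correctly even when the endpoint returns a large amount of data.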
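Returning cancel to the caller is one way to do this. Another option, shown here only as a sketch and not as the approach taken in the article, is to tie cancel to the response body so that it runs when the caller closes the body. The cancelOnClose type and newSlowServer helper are hypothetical names introduced for the illustration, and http.NewRequestWithContext again stands in for ctxhttp.Do.

```go
package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "time"
)

// cancelOnClose (hypothetical type) releases the context when the body is
// closed instead of when fetch returns.
type cancelOnClose struct {
    io.ReadCloser
    cancel context.CancelFunc
}

func (b *cancelOnClose) Close() error {
    err := b.ReadCloser.Close()
    b.cancel()
    return err
}

// newSlowServer (hypothetical helper) delivers the body in two parts.
func newSlowServer() *httptest.Server {
    return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprint(w, "first part,")
        if f, ok := w.(http.Flusher); ok {
            f.Flush()
        }
        time.Sleep(100 * time.Millisecond)
        fmt.Fprint(w, "second part")
    }))
}

// fetch cancels immediately on failure; on success the body owns cancel.
func fetch(url string) (*http.Response, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        cancel()
        return nil, err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        cancel()
        return nil, err
    }
    resp.Body = &cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
    return resp, nil
}

// readBody reads everything, then closes the body, which also cancels.
func readBody(resp *http.Response) (string, error) {
    defer resp.Body.Close()
    b, err := io.ReadAll(resp.Body)
    return string(b), err
}

func main() {
    srv := newSlowServer()
    defer srv.Close()
    resp, err := fetch(srv.URL)
    if err != nil {
        fmt.Println("fetch failed:", err)
        return
    }
    fmt.Println(readBody(resp))
}
```

The trade-off is that the caller must always close the body, which HTTP clients are expected to do anyway; forgetting to do so would now also keep the timer alive until the deadline.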

Three types of errors

context deadline exceeded

We change the context.WithTimeout timeout in the code from 5*time.Second to 1*time.Millisecond. Running the code prints the error log: 2019/06/13 21:29:11 context deadline exceeded

context canceled

See the code above.

net/http: request canceled

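One of the errors most often seen at work is net/http: request canceled (Client.Timeout exceeded while awaiting headers). It is determined by the timeout configured on the http.Client. Let's reproduce it.

In fetch, we declare a custom client with its Timeout field set to time.Millisecond and use it in place of the default client: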

func fetch(req *http.Request) (context.CancelFunc, *http.Response, error) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    customClient := &http.Client{
        Timeout: time.Millisecond,
    }
    resp, err := ctxhttp.Do(ctx, customClient, req)
    return cancel, resp, err
}

Program output:

2019/06/18 09:20:53 Get http://127.0.0.1:8000/info: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)

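Below is the comment on Timeout in the http.Client struct. The timeout covers the whole exchange: establishing the connection, following redirects, and reading the response body.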

// Timeout specifies a time limit for requests made by this
// Client. The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
//
// A Timeout of zero means no timeout.
//
// The Client cancels requests to the underlying Transport
// as if the Request's Context ended.
//
// For compatibility, the Client will also use the deprecated
// CancelRequest method on Transport if found. New
// RoundTripper implementations should use the Request's Context
// for cancelation instead of implementing CancelRequest.

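How context works

Below is the Context interface. The comment on Done() explains the most essential use of context very well, so only that part is reproduced in full. Inside a for loop, each iteration uses select to check whether Done() has been closed, and exits the loop if it has. The ctxhttp package uses the same pattern to control requests.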

type Context interface {
    Deadline() (deadline time.Time, ok bool)

    // Done returns a channel that's closed when work done on behalf of this
    // context should be canceled. Done may return nil if this context can
    // never be canceled. Successive calls to Done return the same value.
    //
    // WithCancel arranges for Done to be closed when cancel is called;
    // WithDeadline arranges for Done to be closed when the deadline
    // expires; WithTimeout arranges for Done to be closed when the timeout
    // elapses.
    //
    // Done is provided for use in select statements:
    //
    //  // Stream generates values with DoSomething and sends them to out
    //  // until DoSomething returns an error or ctx.Done is closed.
    //  func Stream(ctx context.Context, out chan<- Value) error {
    //      for {
    //          v, err := DoSomething(ctx)
    //          if err != nil {
    //              return err
    //          }
    //          select {
    //          case <-ctx.Done():
    //              return ctx.Err()
    //          case out <- v:
    //          }
    //      }
    //  }
    //
    // See https://blog.golang.org/pipelines for more examples of how to use
    // a Done channel for cancelation.
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

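Because business logic listens on context.Done(), something has to close that channel. The whole context package is built around these two sides, including starting a timer that closes context.Done(). See WithCancel, WithDeadline, and WithTimeout mentioned in the comment.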

Source code

Below is the function used to find the cancelCtx. From it we can see the three concrete types wrapped inside the context package: cancelCtx, timerCtx, and valueCtx.

// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    for {
        switch c := parent.(type) {
        case *cancelCtx:
            return c, true
        case *timerCtx:
            return &c.cancelCtx, true
        case *valueCtx:
            parent = c.Context
        default:
            return nil, false
        }
    }
}

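Looking at the declarations of these three types, each holds a Context value that stores its parent Context (timerCtx does so through its embedded cancelCtx). This field is what links the whole Context chain together. timerCtx extends cancelCtx by adding a timer. In addition, the children field of a cancelCtx node stores its cancelable child nodes.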

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
    Context
    key, val interface{}
}

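Next, let's look at how a new child context node is attached to its parent context.

First, the program looks for a cancelCtx behind parent: parent itself, the one embedded in a timerCtx, or the nearest one reached by walking up through valueCtx nodes. If one is found and it has not been canceled yet, child is stored directly in its children map. If it has already been canceled, child is canceled right away.

If no cancelCtx is found, the program starts a goroutine that waits for either parent.Done() or child.Done() to be closed. Note that this is not the path taken for background or todo: their Done() returns nil, so propagateCancel returns at the first check and there is nothing to track. The goroutine branch is for Context implementations defined outside this package, which have a Done channel but no children field in which to record the relationship, so the select statement binds parent and child together instead. Each child attached this way gets its own goroutine, and that goroutine exits as soon as either side is done.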

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
    if parent.Done() == nil {
        return // parent is never canceled
    }
    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        if p.err != nil {
            // parent has already been canceled
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

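Next, let's look at WithCancel and WithDeadline. The former is canceled by calling its CancelFunc. The latter adds a timer on top of that to trigger cancellation at the deadline. If the parent's deadline is already earlier than d, WithDeadline has the same effect as WithCancel. If d itself is already in the past, the returned context is canceled immediately with DeadlineExceeded.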

// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}

// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    dur := time.Until(d)
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

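Finally, taking timerCtx as an example, let's look at how cancel is implemented. The call is recursive and delegates to the cancel method of cancelCtx. The removeFromParent parameter decides whether this node should be removed from its parent. If a timer exists, it is stopped as well.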

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
        // Remove this timerCtx from its parent cancelCtx's children.
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()
}

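In cancelCtx's cancel method, the function cancels each child stored in children in turn. Notice that inside the for loop the children are canceled with removeFromParent set to false. My understanding is that the parent is itself being canceled and sets c.children to nil right after the loop, so removing each child individually is unnecessary.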

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return // already canceled
    }
    c.err = err
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)
    }
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
        removeChild(c.Context, c)
    }
}

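Usage in ctxhttp

Sending the request

In the example above, the parent is the top-level context.Background. Its Done() returns nil, so when WithTimeout is called, propagateCancel returns immediately and no goroutine is started to watch the parent. The new context does get a timer that fires when the deadline is reached.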

ctx, cancel := context.WithTimeout(context.Background(), time.Second)

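Below is the code that sends the request. It runs inside a for loop, which suits the context model well. This method also produces the error described above: net/http: request canceled. A timeout of this kind can be recognized by checking the error type and whether its timeout flag is true.

Up to this point we still haven't seen the core context logic...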

func (c *Client) do(req *Request) (retres *Response, reterr error) {
    // code omitted for brevity ...
    for {
        reqs = append(reqs, req)
        var err error
        var didTimeout func() bool
        if resp, didTimeout, err = c.send(req, deadline); err != nil {
            // c.send() always closes req.Body
            reqBodyClosed = true
            if !deadline.IsZero() && didTimeout() {
                err = &httpError{
                    // TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancelation/
                    err:     err.Error() + " (Client.Timeout exceeded while awaiting headers)",
                    timeout: true,
                }
            }
            return nil, uerr(err)
        }

        var shouldRedirect bool
        redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
        if !shouldRedirect {
            return resp, nil
        }
    }
}

Handling of the context starts in Transport.roundTrip:

// roundTrip implements a RoundTripper over HTTP.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
    t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
    ctx := req.Context()
    trace := httptrace.ContextClientTrace(ctx)

    for {
        select {
        case <-ctx.Done():
            req.closeBody()
            return nil, ctx.Err()
        default:
        }

        // treq gets modified by roundTrip, so we need to recreate for each retry.
        treq := &transportRequest{Request: req, trace: trace}
        cm, err := t.connectMethodForRequest(treq)
    }
}

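Reading the response

When reading data from the conn, the code again checks req's context. The loop keeps selecting until the response arrives on resc (that case is omitted from the excerpt below) or the context is done, in which case the request is canceled.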

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
    // Write the request concurrently with waiting for a response,
    // in case the server decides to reply before reading our full
    // request body.
    startBytesWritten := pc.nwrite
    writeErrCh := make(chan error, 1)
    pc.writech <- writeRequest{req, writeErrCh, continueCh}

    resc := make(chan responseAndError)
    pc.reqch <- requestAndChan{
        req:        req.Request,
        ch:         resc,
        addedGzip:  requestedGzip,
        continueCh: continueCh,
        callerGone: gone,
    }

    var respHeaderTimer <-chan time.Time
    cancelChan := req.Request.Cancel
    ctxDoneChan := req.Context().Done()
    for {
        testHookWaitResLoop()
        select {
        case <-ctxDoneChan:
            pc.t.cancelRequest(req.Request, req.Context().Err())
            cancelChan = nil
            ctxDoneChan = nil
        }
    }
}

References:

  1. Using context cancellation in Go