在不少實際狀況,好比處理網絡請求時,咱們須要啓動多個goroutine來處理不一樣的邏輯,好比一個主要的goroutine用來響應請求,生成網頁,同時它還啓動一個子線程用來獲取數據庫信息,還有一個則寫日誌等等。正常狀況都沒有問題,可是一旦出現異常,如何優雅的退出這些子線程,同時釋放掉可能佔用的資源呢?html
在golang中,人們發明了context接口處理這種狀況。早在14年,這個庫就出現了,而且提出了基於context的併發編程範式(英文好的同窗能夠直接擼這篇文章)。
今年8月go1.7發佈後,它正式成爲了標準庫的一員。golang
在golang的context庫中,首先定義了context的接口,而後給出了context接口的4種實現:web
WithCancel(parent Context) (Context, CancelFunc)
初始化一個能夠被cancel的context,同時把新context對象做爲child放入parent的children數組中。當parent終止時,child也會接受到信號。這個過程叫propagateCancel
數據庫
WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
一樣初始化一個context,除了實現跟WithCancel
一樣的功能外,還增長了一個時間變量,一旦當前時間超過這個deadline,那麼這個context以及它的全部子孫都被被cancel。編程
WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
跟WithDeadline
相似,若是說WithDeadline
是一個絕對時間上的限制,那麼WithTimeout
就是一個相對時間的限制json
WithValue(parent Context, key, val interface{}) Context
單純給parent增長value,不須要propagateCancel
。value能夠用來跨進程、跨api的傳遞數據,最好是和某個請求相關的參數,不要傳遞太多大量數據。api
因此關鍵就在於propagateCancel
,實際工程中,全部context共同組成了一個依賴樹,他們都繼承自一個祖先。一旦parent被cancel,就會經過propagateCancel
遞歸的傳播給下面的全部子孫。能夠看出,context就比如信使,或者說通信協議,經過遵循context接口構建的這個框架,可以保證子線程及時得到與他相關的父線程的狀態,從而由子線程根據狀況做出反應。至於怎麼反應,就取決於各位碼農的能力和搬磚當時的心情了。。。數組
另外,golang有一套靜態分析工具能夠分析context的傳播過程,因此爲了方便這個工具的使用,實際使用中有幾個規定:服務器
不要把context做爲struct內部變量使用,而是把它和其餘變量一塊做爲參數傳入下一個函數。網絡
context變量須要做爲函數的第一個參數傳入,命名通常爲ctx
這個例子來源於基於context的併發編程範式,可是爲了符合國情我作了些修改:
包括3部分:
server.go
主線程,會建立一個server服務器,能夠經過localhost:9090/search
訪問。接到請求後,它會建立父context,同時生成一個新goroutine,去fakesrv(原本應該去google上的)上請求數據。
google.go
替換原來的google網址,改爲由fakesrv提供的一個網址。主要就是演示一下context的運行過程,請求fakesrv的工做在一個新goroutine中進行,同時它還有一個訪問數據庫的操做。若是父context由於timeout超時了,那麼對fakesrv和數據庫的訪問也會終止。在代碼中,演示瞭如何監聽context信息的過程。
query.go
解析url中的query參數
fakesrv.go
提供http://localhost:9000/context...供google.go訪問。
// The server program issues Google search requests and demonstrates the use of // the go.net Context API. It serves on port 8080. // // The /search endpoint accepts these query params: // q=the Google search query // timeout=a timeout for the request, in time.Duration format // // For example, http://localhost:8080/search?q=golang&timeout=1s serves the // first few Google search results for "golang" or a "deadline exceeded" error // if the timeout expires. package main import ( "html/template" "log" "net/http" "time" "context" "mycontext/google" "mycontext/query" ) func main() { http.HandleFunc("/search", handleSearch) log.Fatal(http.ListenAndServe(":9090", nil)) } // handleSearch handles URLs like /search?q=golang&timeout=1s by forwarding the // query to google.Search. If the query param includes timeout, the search is // canceled after that duration elapses. func handleSearch(w http.ResponseWriter, req *http.Request) { // ctx is the Context for this handler. Calling cancel closes the // ctx.Done channel, which is the cancellation signal for requests // started by this handler. var ( ctx context.Context qctx *query.QueryCtx cancel context.CancelFunc ) timeout, err := time.ParseDuration(req.FormValue("timeout")) if err == nil { // The request has a timeout, so create a context that is // canceled automatically when the timeout expires. ctx, cancel = context.WithTimeout(context.Background(), timeout) } else { ctx, cancel = context.WithCancel(context.Background()) } defer cancel() // Cancel ctx as soon as handleSearch returns. qctx, err = query.NewQueryCtx(ctx, req) if err != nil { http.Error(w, "no query", http.StatusBadRequest) return } // Run the Google search and print the results. start := time.Now() results, err := google.Search(qctx) elapsed := time.Since(start) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } if err := resultsTemplate.Execute(w, struct { Results google.Results Timeout, Elapsed time.Duration }{ Results: results, Timeout: timeout, Elapsed: elapsed, }); err != nil { log.Print(err) return } } var resultsTemplate = template.Must(template.New("results").Parse(` <html> <head/> <body> <ol> {{range .Results}} <li>{{.Title}} - <span>{{.SubTitle}}</span></li> {{end}} </ol> <p>{{len .Results}} results in {{.Elapsed}}; timeout {{.Timeout}}</p> </body> </html> `))
// Package google provides a function to do Google searches using the Google Web // Search API. See https://developers.google.com/web-search/docs/ // // This package is an example to accompany https://blog.golang.org/context. // It is not intended for use by others. // // Google has since disabled its search API, // and so this package is no longer useful. package google import ( "context" "encoding/json" "log" "mycontext/query" "net/http" "time" ) // Results is an ordered list of search results. type Results []Result // A Result contains the title and URL of a search result. type Result struct { Title, SubTitle string } // Search sends query to Google search and returns the results. func Search(ctx *query.QueryCtx) (Results, error) { // Prepare the Google Search API request. req, err := http.NewRequest("GET", "http://localhost:9000/context_demo", nil) if err != nil { return nil, err } ctx.SetReq(req) // Issue the HTTP request and handle the response. The httpDo function // cancels the request if ctx.Done is closed. var results Results err = httpDo(ctx, req, func(resp *http.Response, err error) error { if err != nil { return err } defer resp.Body.Close() // Parse the JSON search result. // https://developers.google.com/web-search/docs/#fonje var data struct { ResponseData struct { Results []struct { Title, SubTitle string } } } if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { return err } for _, res := range data.ResponseData.Results { results = append(results, Result{Title: res.Title, SubTitle: res.SubTitle}) } return nil }) // httpDo waits for the closure we provided to return, so it's safe to // read results here. return results, err } // httpDo issues the HTTP request and calls f with the response. If ctx.Done is // closed while the request or f is running, httpDo cancels the request, waits // for f to exit, and returns ctx.Err. Otherwise, httpDo returns f's error. func httpDo(ctx *query.QueryCtx, req *http.Request, f func(*http.Response, error) error) error { // Run the HTTP request in a goroutine and pass the response to f. tr := &http.Transport{} client := &http.Client{Transport: tr} // WithCancel會在ctx的children中增長cancelDb,這樣當 // ctx 結束的時候,cancelDb也會受到消息 cancelDb, cancel := context.WithCancel(ctx.Context) defer cancel() c := make(chan error, 1) go func() { c <- f(client.Do(req)) }() go func(ctx context.Context) { t := time.NewTimer(2 * time.Second) select { case <-t.C: log.Println("db access finished!") case <-ctx.Done(): log.Println("canceld by parent, release resource") } }(cancelDb) select { case <-ctx.Done(): tr.CancelRequest(req) <-c // Wait for f to return. return ctx.Err() case err := <-c: return err } }
package query import ( "context" "fmt" "net/http" ) func NewQueryCtx(ctx context.Context, req *http.Request) (*QueryCtx, error) { q := req.FormValue("q") if q == "" { return nil, fmt.Errorf("no query supplied!") } return &QueryCtx{ctx, q}, nil } type QueryCtx struct { context.Context val string } func (ctx *QueryCtx) SetReq(req *http.Request) { q := req.URL.Query() q.Set("q", ctx.val) req.URL.RawQuery = q.Encode() }
package main import ( "bytes" "encoding/json" "fmt" "log" "math/rand" "net/http" "strconv" "strings" "time" ) func init() { log.SetFlags(log.Lshortfile) } type Results struct { ResponseData struct { Results []Content } } // A Result contains the title and URL of a search result. type Content struct { Title, SubTitle string } func main() { http.HandleFunc("/context_demo", handleContext) http.ListenAndServe(":9000", nil) } func handleContext(resp http.ResponseWriter, req *http.Request) { defer func() { if e := recover(); e != nil { if msg, ok := e.(string); ok { resp.Write([]byte(msg)) } else { panic(e) } } }() check_error := func(err error, msg string) { if err != nil { if msg != "" { panic(err.Error() + ":" + msg) } else { panic(err.Error()) } } } if req.Method == "GET" { q := req.FormValue("q") seg := strings.Split(q, ":") if len(seg) < 2 { log.Println("query format wrong") resp.Write([]byte("query format wrong")) return } title := seg[0] num, err := strconv.Atoi(seg[1]) check_error(err, "") rs := Results{} for i := 0; i < num; i++ { rs.ResponseData.Results = append(rs.ResponseData.Results, Content{fmt.Sprintf("%s %d", title, i), RandomString(20)}) } buff := bytes.NewBuffer(nil) err = json.NewEncoder(buff).Encode(rs) check_error(err, "") time.Sleep(time.Second * 2) resp.Write(buff.Bytes()) } else { resp.Write([]byte("請使用get方法!")) } } func RandomString(strlen int) string { rand.Seed(time.Now().UTC().UnixNano()) const chars = "abcdefghijklmnopqrstuvwxyz0123456789" result := make([]byte, strlen) for i := 0; i < strlen; i++ { result[i] = chars[rand.Intn(len(chars))] } return string(result) }
run: go build ./mycontext & cd fakesrv && go build && ./fakesrv & test: @echo "======= test without timeout =======" curl localhost:9090/search?q=title:6 @echo "======= test with timeout 1s =======" curl localhost:9090/search?q=title:6\&timeout=1s @echo "======= test with timeout 4s =======" curl localhost:9090/search?q=title:6\&timeout=4s
在命令行運行以下命令,便可看到具體結果make runmake test