使用context實現多個goroutine的依賴管理

解決的問題

在不少實際狀況,好比處理網絡請求時,咱們須要啓動多個goroutine來處理不一樣的邏輯,好比一個主要的goroutine用來響應請求,生成網頁,同時它還啓動一個子線程用來獲取數據庫信息,還有一個則寫日誌等等。正常狀況都沒有問題,可是一旦出現異常,如何優雅的退出這些子線程,同時釋放掉可能佔用的資源呢?html

context

在golang中,人們發明了context接口處理這種狀況。早在14年,這個庫就出現了,而且提出了基於context的併發編程範式(英文好的同窗能夠直接擼這篇文章)。
今年8月go1.7發佈後,它正式成爲了標準庫的一員。golang

如何使用

在golang的context庫中,首先定義了context的接口,而後給出了context接口的4種實現:web

  • WithCancel(parent Context) (Context, CancelFunc)
    初始化一個能夠被cancel的context,同時把新context對象做爲child放入parent的children數組中。當parent終止時,child也會接受到信號。這個過程叫propagateCancel數據庫

  • WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
    一樣初始化一個context,除了實現跟WithCancel一樣的功能外,還增長了一個時間變量,一旦當前時間超過這個deadline,那麼這個context以及它的全部子孫都被被cancel。編程

  • WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
    WithDeadline相似,若是說WithDeadline是一個絕對時間上的限制,那麼WithTimeout就是一個相對時間的限制json

  • WithValue(parent Context, key, val interface{}) Context
    單純給parent增長value,不須要propagateCancel。value能夠用來跨進程、跨api的傳遞數據,最好是和某個請求相關的參數,不要傳遞太多大量數據。api

因此關鍵就在於propagateCancel,實際工程中,全部context共同組成了一個依賴樹,他們都繼承自一個祖先。一旦parent被cancel,就會經過propagateCancel遞歸的傳播給下面的全部子孫。能夠看出,context就比如信使,或者說通信協議,經過遵循context接口構建的這個框架,可以保證子線程及時得到與他相關的父線程的狀態,從而由子線程根據狀況做出反應。至於怎麼反應,就取決於各位碼農的能力和搬磚當時的心情了。。。數組

另外,golang有一套靜態分析工具能夠分析context的傳播過程,因此爲了方便這個工具的使用,實際使用中有幾個規定:服務器

  • 不要把context做爲struct內部變量使用,而是把它和其餘變量一塊做爲參數傳入下一個函數。網絡

  • context變量須要做爲函數的第一個參數傳入,命名通常爲ctx

具體例子

這個例子來源於基於context的併發編程範式,可是爲了符合國情我作了些修改:
包括3部分:

  • server.go
    主線程,會建立一個server服務器,能夠經過localhost:9090/search訪問。接到請求後,它會建立父context,同時生成一個新goroutine,去fakesrv(原本應該去google上的)上請求數據。

  • google.go
    替換原來的google網址,改爲由fakesrv提供的一個網址。主要就是演示一下context的運行過程,請求fakesrv的工做在一個新goroutine中進行,同時它還有一個訪問數據庫的操做。若是父context由於timeout超時了,那麼對fakesrv和數據庫的訪問也會終止。在代碼中,演示瞭如何監聽context信息的過程。

  • query.go
    解析url中的query參數

  • fakesrv.go
    提供http://localhost:9000/context...供google.go訪問。

mycontext/serve.go

// The server program issues Google search requests and demonstrates the use of
// the go.net Context API. It serves on port 8080.
//
// The /search endpoint accepts these query params:
//   q=the Google search query
//   timeout=a timeout for the request, in time.Duration format
//
// For example, http://localhost:8080/search?q=golang&timeout=1s serves the
// first few Google search results for "golang" or a "deadline exceeded" error
// if the timeout expires.
package main

import (
    "html/template"
    "log"
    "net/http"
    "time"

    "context"
    "mycontext/google"
    "mycontext/query"
)

func main() {
    http.HandleFunc("/search", handleSearch)
    log.Fatal(http.ListenAndServe(":9090", nil))
}

// handleSearch handles URLs like /search?q=golang&timeout=1s by forwarding the
// query to google.Search. If the query param includes timeout, the search is
// canceled after that duration elapses.
func handleSearch(w http.ResponseWriter, req *http.Request) {
    // ctx is the Context for this handler. Calling cancel closes the
    // ctx.Done channel, which is the cancellation signal for requests
    // started by this handler.
    var (
        ctx    context.Context
        qctx   *query.QueryCtx
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    if err == nil {
        // The request has a timeout, so create a context that is
        // canceled automatically when the timeout expires.
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    defer cancel() // Cancel ctx as soon as handleSearch returns.
    qctx, err = query.NewQueryCtx(ctx, req)
    if err != nil {
        http.Error(w, "no query", http.StatusBadRequest)
        return
    }

    // Run the Google search and print the results.
    start := time.Now()
    results, err := google.Search(qctx)
    elapsed := time.Since(start)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    if err := resultsTemplate.Execute(w, struct {
        Results          google.Results
        Timeout, Elapsed time.Duration
    }{
        Results: results,
        Timeout: timeout,
        Elapsed: elapsed,
    }); err != nil {
        log.Print(err)
        return
    }
}

var resultsTemplate = template.Must(template.New("results").Parse(`
<html>
<head/>
<body>
  <ol>
  {{range .Results}}
    <li>{{.Title}} - <span>{{.SubTitle}}</span></li>
  {{end}}
  </ol>
  <p>{{len .Results}} results in {{.Elapsed}}; timeout {{.Timeout}}</p>
</body>
</html>
`))

mycontext/google/google.go

// Package google provides a function to do Google searches using the Google Web
// Search API. See https://developers.google.com/web-search/docs/
//
// This package is an example to accompany https://blog.golang.org/context.
// It is not intended for use by others.
//
// Google has since disabled its search API,
// and so this package is no longer useful.
package google

import (
    "context"
    "encoding/json"
    "log"
    "mycontext/query"
    "net/http"
    "time"
)

// Results is an ordered list of search results.
type Results []Result

// A Result contains the title and URL of a search result.
type Result struct {
    Title, SubTitle string
}

// Search sends query to Google search and returns the results.
func Search(ctx *query.QueryCtx) (Results, error) {
    // Prepare the Google Search API request.
    req, err := http.NewRequest("GET", "http://localhost:9000/context_demo", nil)
    if err != nil {
        return nil, err
    }

    ctx.SetReq(req)
    // Issue the HTTP request and handle the response. The httpDo function
    // cancels the request if ctx.Done is closed.
    var results Results
    err = httpDo(ctx, req, func(resp *http.Response, err error) error {
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        // Parse the JSON search result.
        // https://developers.google.com/web-search/docs/#fonje
        var data struct {
            ResponseData struct {
                Results []struct {
                    Title, SubTitle string
                }
            }
        }
        if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
            return err
        }
        for _, res := range data.ResponseData.Results {
            results = append(results, Result{Title: res.Title, SubTitle: res.SubTitle})
        }
        return nil
    })
    // httpDo waits for the closure we provided to return, so it's safe to
    // read results here.
    return results, err
}

// httpDo issues the HTTP request and calls f with the response. If ctx.Done is
// closed while the request or f is running, httpDo cancels the request, waits
// for f to exit, and returns ctx.Err. Otherwise, httpDo returns f's error.
func httpDo(ctx *query.QueryCtx, req *http.Request, f func(*http.Response, error) error) error {
    // Run the HTTP request in a goroutine and pass the response to f.
    tr := &http.Transport{}
    client := &http.Client{Transport: tr}
    // WithCancel會在ctx的children中增長cancelDb,這樣當
    // ctx 結束的時候,cancelDb也會受到消息
    cancelDb, cancel := context.WithCancel(ctx.Context)
    defer cancel()
    c := make(chan error, 1)
    go func() { c <- f(client.Do(req)) }()
    go func(ctx context.Context) {
        t := time.NewTimer(2 * time.Second)

        select {
        case <-t.C:
            log.Println("db access finished!")
        case <-ctx.Done():
            log.Println("canceld by parent, release resource")
        }
    }(cancelDb)
    select {
    case <-ctx.Done():
        tr.CancelRequest(req)
        <-c // Wait for f to return.
        return ctx.Err()
    case err := <-c:
        return err
    }
}

mycontext/query/query.go

package query

import (
    "context"
    "fmt"
    "net/http"
)

func NewQueryCtx(ctx context.Context, req *http.Request) (*QueryCtx, error) {
    q := req.FormValue("q")
    if q == "" {
        return nil, fmt.Errorf("no query supplied!")
    }
    return &QueryCtx{ctx, q}, nil
}

type QueryCtx struct {
    context.Context
    val string
}

func (ctx *QueryCtx) SetReq(req *http.Request) {
    q := req.URL.Query()
    q.Set("q", ctx.val)

    req.URL.RawQuery = q.Encode()
}

mycontext/fakesrv/main.go

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "strconv"
    "strings"
    "time"
)

func init() {
    log.SetFlags(log.Lshortfile)
}

type Results struct {
    ResponseData struct {
        Results []Content
    }
}

// A Result contains the title and URL of a search result.
type Content struct {
    Title, SubTitle string
}

func main() {

    http.HandleFunc("/context_demo", handleContext)
    http.ListenAndServe(":9000", nil)
}

func handleContext(resp http.ResponseWriter, req *http.Request) {
    defer func() {
        if e := recover(); e != nil {
            if msg, ok := e.(string); ok {
                resp.Write([]byte(msg))
            } else {
                panic(e)
            }
        }
    }()
    check_error := func(err error, msg string) {
        if err != nil {
            if msg != "" {
                panic(err.Error() + ":" + msg)
            } else {
                panic(err.Error())
            }
        }
    }
    if req.Method == "GET" {
        q := req.FormValue("q")
        seg := strings.Split(q, ":")
        if len(seg) < 2 {
            log.Println("query format wrong")
            resp.Write([]byte("query format wrong"))
            return
        }
        title := seg[0]
        num, err := strconv.Atoi(seg[1])
        check_error(err, "")
        rs := Results{}
        for i := 0; i < num; i++ {
            rs.ResponseData.Results = append(rs.ResponseData.Results,
                Content{fmt.Sprintf("%s %d", title, i), RandomString(20)})
        }
        buff := bytes.NewBuffer(nil)
        err = json.NewEncoder(buff).Encode(rs)
        check_error(err, "")
        time.Sleep(time.Second * 2)
        resp.Write(buff.Bytes())
    } else {
        resp.Write([]byte("請使用get方法!"))
    }
}
func RandomString(strlen int) string {
    rand.Seed(time.Now().UTC().UnixNano())
    const chars = "abcdefghijklmnopqrstuvwxyz0123456789"
    result := make([]byte, strlen)
    for i := 0; i < strlen; i++ {
        result[i] = chars[rand.Intn(len(chars))]
    }
    return string(result)
}

Makefile

run:
    go build 
    ./mycontext &
    cd fakesrv && go build && ./fakesrv &

test:
    @echo "======= test without timeout ======="
    curl localhost:9090/search?q=title:6
    @echo "======= test with timeout 1s ======="
    curl localhost:9090/search?q=title:6\&timeout=1s
    @echo "======= test with timeout 4s ======="
    curl localhost:9090/search?q=title:6\&timeout=4s

測試

在命令行運行以下命令,便可看到具體結果make runmake test

相關文章
相關標籤/搜索