深刻 Go 併發模型:Context

介紹

在Go服務器中,每一個傳入請求都在其本身的goroutine中處理。 請求處理程序一般會啓動其餘goroutine來訪問後端,例如數據庫和RPC服務。 處理請求的goroutine集合一般須要訪問特定於請求的值,例如最終用戶的身份,受權令牌和請求的截止日期。 當請求被取消或超時時,處理該請求的全部goroutine都應該快速退出,以便系統能夠回收他們正在使用的任何資源。html

Context是專門用來簡化對於處理單個請求,多個goroutine之間數據共享、取消信號、超時處理等相關操做。翻譯自 Go Concurrency Patterns: Contextgolang

特性

  • Context是gorountine併發安全的。
  • 支持樹狀的上級控制一個或者多個下級,不支持反向控制和平級控制。
  • gorountine傳遞cancel信號,結束子gorountine 生命。
  • gorountine初始化啓動子gorountine服務時,傳入截止時刻或者超時時間來控制子gorountine。
  • gorountine結束,對應的全部子gorountine生命週期結束。

使用場景

  • 併發多服務調用狀況下,好比一個請求進來,啓動3個goroutine進行 RpcA 、RpcB 、RpcC三個服務的調用。這時候只要有其中一個服務錯誤,就返回錯誤,同時取消另外兩個Rpc服務。能夠經過 WithCancel 方法來實現。
  • 超時請求,好比對Http、Rpc進行超時限制,能夠經過 WithDeadline 和 WithTimeout 來實現。
  • 攜帶數據,好比一個請求的用戶信息,通常業務場景,咱們會有一個專門的中間件來校驗用戶信息,而後把用戶信息注入到context中,或者共享給派生出來的多個goroutine使用,能夠經過 WithValue 方法實現。

官方示例

package main
import (
	"context"
	"fmt"
)

func main() {
	gen := func(ctx context.Context) <-chan int {
		dst := make(chan int)
		n := 1
		go func() {
			for {
				select {
				case <-ctx.Done():
					return // returning not to leak the goroutine
				case dst <- n:
					n++
				}
			}
		}()
		return dst
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancel when we are finished consuming integers

	for n := range gen(ctx) {
		fmt.Println(n)
		if n == 5 {
			break
		}
	}
}
複製代碼

Context

context 是一個接口,定義以下:源碼數據庫

type Context interface {
	Deadline() (deadline time.Time, ok bool)

	Done() <-chan struct{}

	Err() error

	Value(key interface{}) interface{}
}
複製代碼

包含了以下3個功能:後端

  • 生存時間
  • 取消信號
  • request 派生的gorouting之間共享value

emptyCtx是對Context實現,分別實現了Deadline、Done、Err、Value、String方法,安全

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return 
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}
複製代碼

cancelCtx

從示例中,咱們看到使用 context 第一個須要作的初始化操做bash

ctx, cancel := context.WithCancel(context.Background())
複製代碼

這裏,context.Background() 返回的就是 emptyCtx 類型的指針。服務器

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}
複製代碼

咱們再來看看,WithCancel 函數接收了 background 做爲參數,建立了一個cancelCtx實例。同時將Context 做爲它的一個匿名字段,這樣,它就能夠被當作一個 Context。markdown

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) } //什麼意思,看下文您就明白了
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}
複製代碼

看下圖,WithCancel 的主要職責是建立一個 cancelCtx,把本身掛載到父節點,而後返回cancelCtx和cancel()函數,用來觸發取消事件。 併發

cancelCtx 實現了本身的Done()、Err()、String()接口。值得關注的是,done 字段採用懶建立的方式, 在Done()第一次被調用的時候被建立,並且返回的是一個只讀的Channel。

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

func (c *cancelCtx) String() string {
	return fmt.Sprintf("%v.WithCancel", c.Context)
}
複製代碼

不只如此,cancelCtx最重要的是實現了 cancel() 方法。主工做流程以下:函數

  1. 設置取消的錯誤提示信息
  2. 關閉 channel:c.done
  3. 遞歸 關閉全部子節點
  4. 從父節點中刪除本身
  5. 最終經過關閉channel把取消信號,傳遞給全部子節點
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	// 已經被取消
	if c.err != nil {
		c.mu.Unlock()
		return 
	}
	// 設置 cancelCtx 錯誤信息
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	//  遞歸地取消全部子節點
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	// 清空子節點
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		// 從父節點中移除本身 
		removeChild(c.Context, c)
	}
}
複製代碼

還有一個重點函數 propagateCancel須要重點關注

func propagateCancel(parent Context, child canceler) {
	// 父節點是一個空節點,能夠理解爲本節點爲根節點,不須要掛載
	if parent.Done() == nil {
		return // parent is never canceled
	}
	// 父節點是可取消類型的
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			// 父節點被取消了,本節點也須要取消
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			// 掛載到父節點
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		// 爲了兼容,Context 內嵌到一個類型裏的狀況發生
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}
複製代碼

這裏說明下上述代碼中 else 的狀況,爲何須要開一個goroutine來監控取消信號,先看下第一個case:

case <-parent.Done():
複製代碼

此處主要了爲了不cancelCtx被內嵌的一個類型中,作爲匿名字段的狀況,好比:

type CancelContext struct {
    Context
}
複製代碼

這時候 parentCancelCtx 函數 是沒法正常識別出CancelContext是可取消類型的ctx。
再看第二個 case:

case <-child.Done():
複製代碼

主要做用是在子節點取消的時候,可讓select語句正常退出,避免goroutine泄露。

經過以下parentCancelCtx源碼,咱們肯定一旦入參的parent是通過包裝的類型,parentCancelCtx就沒法正確的識別出parent的類型。

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	for {
		switch c := parent.(type) {
		case *cancelCtx:
			return c, true
		case *timerCtx:
			return &c.cancelCtx, true
		case *valueCtx:
			parent = c.Context
		default:
			return nil, false
		}
	}
}
複製代碼

timerCtx

從定義中咱們能夠看出timerCtx基於cancelCtx實現,多出了timer和deadline兩個字段。並且timerCtx實現了本身的Deadline方法。

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}
複製代碼

所以咱們直接看核心的函數WithDeadline

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	// 判斷父節點是否超時,(非timeCtx類型的Deadline()直接return的)
	// 若是父節點的超時時間比當前節點早,直接返回cancalCtx便可
	// 由於父節點超時會自動調用cancel,子節點隨之取消,因此不須要單獨處理子節點的超時問題
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
	// 直接取消
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	// 核心代碼在這,若是當前節點沒被取消,則經過time.AfterFunc在dur時間後調用cancel函數,自動取消。
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}
// 基於WithDeadline封裝實現
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}
複製代碼

valueCtx

源碼比較簡單,將Context 做爲匿名字段來實現類型鏈表的接口,一層層傳遞,獲取值主要查看Value方法,它會一層層判斷是否有值,直到找到頂層的Context。 所以這裏也有個要注意的地方,就是子節點的key值是會覆蓋父節點的值,所以在命名key值得時候須要特別注意。

func WithValue(parent Context, key, val interface{}) Context {
	if key == nil {
		panic("nil key")
	}
	if !reflect.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val interface{}
}

func (c *valueCtx) String() string {
	return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}
複製代碼

解析

Done 方法返回 <-chan struct{} ,用來goroutine間進行消息通訊。

結束

歡迎關注個人Github

參考

Go Concurrency Patterns: Context
Go context
深度解密Go語言之context

相關文章
相關標籤/搜索