一文讀懂Channel設計

在Go中,要理解channel,首先須要認識goroutine。git

1、爲何會有goroutine

現代操做系統中爲咱們提供了三種基本的構造併發程序的方法:多進程、I/O多路複用和多線程。其中最簡單的構造方式當屬多進程,可是多進程的併發程序,因爲對進程控制和進程間通訊開銷巨大,這樣的併發方式每每會很慢。github

所以,操做系統提供了更小粒度的運行單元:線程(確切叫法是內核線程)。它是一種運行在進程上下文中的邏輯流,線程之間經過操做系統來調度,其調度模型以下圖所示。golang

image

多線程的併發方式,相較於多進程而言要快得多。可是因爲線程上下文切換老是不可避免的陷入內核態,它的開銷依然較大。那麼有沒有沒必要陷入內核態的運行載體呢?有,用戶級線程。 用戶級線程的切換由用戶程序本身控制,不須要內核干涉,所以少了進出內核態的消耗。express

image

這裏的用戶級線程就是協程(coroutine),它們的切換由運行時系統來統一調度管理,內核並不知道它的存在。協程是抽象於內核線程之上的對象,一個內核線程能夠對應多個協程。但最終的系統調用仍然須要內核線程來完成。注意,線程的調度是操做系統來管理,是一種搶佔式調度。而協程不一樣,協程之間須要合做,會主動交出執行權,是一種協做式調度,這也是爲什麼被稱爲協程的緣由。數組

Go天生在語言層面支持了協程,即咱們常說的goroutine。Go的runtime系統實現的是一種M:N調度模型,經過GMP對象來描述,其中G表明的就是協程,M是線程,P是調度上下文。在Go程序中,一個goroutine就表明着一個最小用戶代碼執行流,它們也是併發流的最小單元。緩存

2、channel的存在定位

從內存的角度而言,併發模型只分兩種:基於共享內存和基於消息通訊(內存拷貝)。在Go中,兩種併發模型的同步原語均有提供:sync.\和atomic.\表明的就是基於共享內存;channel表明的就是基於消息通訊。而Go提倡後者,它包括三大元素:goroutine(執行體),channel(通訊),select(協調)。安全

Do not communicate by sharing memory; instead, share memory by communicating.多線程

在Go中經過goroutine+channel的方式,能夠簡單、高效地解決併發問題,channel就是goroutine之間的數據橋樑。併發

Concurrency is the key to designing high performance network services. Go's concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution. ide

如下是一個簡單的channel使用示例代碼。

func goroutineA(ch <-chan int)  {
    fmt.Println("[goroutineA] want a data")
    val := <- ch
    fmt.Println("[goroutineA] received the data", val)
}

func goroutineB(ch chan<- int)  {
    time.Sleep(time.Second*1)
    ch <- 1
    fmt.Println("[goroutineB] send the data 1")
}

func main() {
    ch := make(chan int, 1)
    go goroutineA(ch)
    go goroutineB(ch)
    time.Sleep(2*time.Second)
}

上述過程趣解圖以下

image

image

image

image

3、channel源碼解析

channel源碼位於src/go/runtime/chan.go。本章內容分爲兩部分:channel內部結構和channel操做。

3.1 channel內部結構

ch := make(chan int,2)

對於以上channel的申明語句,咱們能夠在程序中加入斷點,獲得ch的信息以下。

image

很好,看起來很是的清晰。可是,這些信息表明的是什麼含義呢?接下來,咱們先看幾個重要的結構體。

  • hchan

當咱們經過make(chan Type, size)生成channel時,在runtime系統中,生成的是一個hchan結構體對象。源碼位於src/runtime/chan.go

type hchan struct {
    qcount   uint           // 循環隊列中數據數
    dataqsiz uint           // 循環隊列的大小
    buf      unsafe.Pointer // 指向大小爲dataqsize的包含數據元素的數組指針
    elemsize uint16         // 數據元素的大小
    closed   uint32         // 表明channel是否關閉   
    elemtype *_type         // _type表明Go的類型系統,elemtype表明channel中的元素類型
    sendx    uint           // 發送索引號,初始值爲0
    recvx    uint           // 接收索引號,初始值爲0
  recvq    waitq          // 接收等待隊列,存儲試圖從channel接收數據(<-ch)的阻塞goroutines
    sendq    waitq          // 發送等待隊列,存儲試圖發送數據(ch<-)到channel的阻塞goroutines

    lock mutex              // 加鎖能保護hchan的全部字段,包括waitq中sudoq對象
}
  • waitq

waitq用於表達處於阻塞狀態的goroutines鏈表信息,first指向鏈頭goroutine,last指向鏈尾goroutine

type waitq struct {
    first *sudog           
    last  *sudog
}
  • sudug

sudog表明的就是一個處於等待列表中的goroutine對象,源碼位於src/runtime/runtime2.go

type sudog struct {
    g *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)
    c        *hchan // channel
  ...
}

爲了更好理解hchan結構體,咱們將經過如下代碼來理解hchan中的字段含義。

package main

import "time"

func goroutineA(ch chan int) {
    ch <- 100
}

func goroutineB(ch chan int) {
    ch <- 200
}

func goroutineC(ch chan int) {
    ch <- 300
}

func goroutineD(ch chan int) {
    ch <- 300
}

func main() {
    ch := make(chan int, 4)
    for i := 0; i < 4; i++ {
        ch <- i * 10
    }
    go goroutineA(ch)
    go goroutineB(ch)
    go goroutineC(ch)
    go goroutineD(ch)
    // 第一個sleep是爲了給上足夠的時間讓全部goroutine都已啓動
    time.Sleep(time.Millisecond * 500)
    time.Sleep(time.Second)
}

打開代碼調試功能,將程序運行至斷點time.Sleep(time.Second)處,此時獲得的chan信息以下。

image

在該channel中,經過make(chan int, 4)定義的channel大小爲4,即dataqsiz的值爲4。同時因爲循環隊列中已經添加了4個元素,因此qcount值也爲4。此時,有4個goroutine(A-D)想發送數據給channel,可是因爲存放數據的循環隊列已滿,因此只能進入發送等待列表,即sendq。同時要注意到,此時的發送和接收索引值均爲0,即下一次接收數據的goroutine會從循環隊列的第一個元素拿,發送數據的goroutine會發送到循環隊列的第一個位置。

上述hchan結構可視化圖解以下

image

3.2 channel操做

將channel操做分爲四部分:建立、發送、接收和關閉。

建立

本文的參考Go版本爲1.15.2。其channel的建立實現代碼位於src/go/runtime/chan.go的makechan方法。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

  // 發送元素大小限制
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
  // 對齊檢查
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

  // 判斷是否會內存溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

  // 爲構造的hchan對象分配內存
    var c *hchan
    switch {
  // 無緩衝的channel或者元素大小爲0的狀況
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
  // 元素不包含指針的狀況  
    case elem.ptrdata == 0:
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
  // 元素包含指針  
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

  // 初始化相關參數
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

能夠看到,makechan方法主要就是檢查傳送元素的合法性,併爲hchan分配內存,初始化相關參數,包括對鎖的初始化。

發送

channel的發送實現代碼位於src/go/runtime/chan.go的chansend方法。發送過程,存在如下幾種狀況。

  1. 當發送的channel爲nil
if c == nil {
    if !block {
        return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
}

往一個nil的channel中發送數據時,調用gopark函數將當前執行的goroutine從running態轉入waiting態。

  1. 往已關閉的channel中發送數據
if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

若是向已關閉的channel中發送數據,會引起panic。

  1. 若是已經有阻塞的接收goroutines(即recvq中指向非空),那麼數據將被直接發送給接收goroutine。
if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

該邏輯的實現代碼在send方法和sendDirect中。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ... // 省略了競態代碼
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

其中,memmove咱們已經在源碼系列中遇到屢次了,它的目的是將內存中src的內容拷貝至dst中去。另外,注意到goready(gp, skip+1)這句代碼,它會使得以前在接收等待隊列中的第一個goroutine的狀態變爲runnable,這樣go的調度器就能夠從新讓該goroutine獲得執行。

  1. 對於有緩衝的channel來講,若是當前緩衝區hchan.buf有可用空間,那麼會將數據拷貝至緩衝區
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
  // 發送索引號+1
    c.sendx++
  // 由於存儲數據元素的結構是循環隊列,因此噹噹前索引號已經到隊末時,將索引號調整到隊頭
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
  // 當前循環隊列中存儲元素數+1
    c.qcount++
    unlock(&c.lock)
    return true
}

其中,chanbuf(c, c.sendx)是獲取指向對應內存區域的指針。typememmove會調用memmove方法,完成數據的拷貝工做。另外注意到,當對hchan進行實際操做時,是須要調用lock(&c.lock)加鎖,所以,在完成數據拷貝後,經過unlock(&c.lock)將鎖釋放。

  1. 有緩衝的channel,當hchan.buf已滿;或者無緩衝的channel,當前沒有接收的goroutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

經過getg獲取當前執行的goroutine。acquireSudog是先得到當前執行goroutine的線程M,再獲取M對應的P,最後將P的sudugo緩存隊列中的隊頭sudog取出(詳見源碼src/runtime/proc.go)。經過c.sendq.enqueue將sudug加入到channel的發送等待列表中,並調用gopark將當前goroutine轉爲waiting態。

  • 發送操做會對hchan加鎖。
  • 當recvq中存在等待接收的goroutine時,數據元素將會被直接拷貝給接收goroutine。
  • 當recvq等待隊列爲空時,會判斷hchan.buf是否可用。若是可用,則會將發送的數據拷貝至hchan.buf中。
  • 若是hchan.buf已滿,那麼將當前發送goroutine置於sendq中排隊,並在運行時中掛起。
  • 向已經關閉的channel發送數據,會引起panic。

對於無緩衝的channel來講,它自然就是hchan.buf已滿的狀況,由於它的hchan.buf的容量爲0。

package main

import "time"

func main() {
    ch := make(chan int)
    go func(ch chan int) {
        ch <- 100
    }(ch)
    time.Sleep(time.Millisecond * 500)
    time.Sleep(time.Second)
}

在上述示例中,發送goroutine向無緩衝的channel發送數據,可是沒有接收goroutine。將斷點置於time.Sleep(time.Second),獲得此時ch結構以下。

image

能夠看到,在無緩衝的channel中,其hchan的buf長度爲0,當沒有接收groutine時,發送的goroutine將被置於sendq的發送隊列中。

接收

channel的接收實現分兩種,v :=<-ch對應於chanrecv1,v, ok := <- ch對應於chanrecv2,但它們都依賴於位於src/go/runtime/chan.go的chanrecv方法。

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

chanrecv的詳細代碼此處就再也不展現,和chansend邏輯對應,具體處理準則以下。

  • 接收操做會對hchan加鎖。
  • 當sendq中存在等待發送的goroutine時,意味着此時的hchan.buf已滿(無緩存的自然已滿),分兩種狀況(見代碼src/go/runtime/chan.go的recv方法):1. 若是是有緩存的hchan,那麼先將緩衝區的數據拷貝給接收goroutine,再將sendq的隊頭sudog出隊,將出隊的sudog上的元素拷貝至hchan的緩存區。 2. 若是是無緩存的hchan,那麼直接將出隊的sudog上的元素拷貝給接收goroutine。兩種狀況的最後都會喚醒出隊的sudog上的發送goroutine。
  • 當sendq發送隊列爲空時,會判斷hchan.buf是否可用。若是可用,則會將hchan.buf的數據拷貝給接收goroutine。
  • 若是hchan.buf不可用,那麼將當前接收goroutine置於recvq中排隊,並在運行時中掛起。
  • 與發送不一樣的是,當channel關閉時,goroutine還能從channel中獲取數據。若是recvq等待列表中有goroutines,那麼它們都會被喚醒接收數據。若是hchan.buf中還有未接收的數據,那麼goroutine會接收緩衝區中的數據,不然goroutine會獲取到元素的零值。

如下是channel關閉以後,接收goroutine的讀取示例代碼。

func main() {
    ch := make(chan int, 1)
    ch <- 10
    close(ch)
    a, ok := <-ch
    fmt.Println(a, ok)
    b, ok := <-ch
    fmt.Println(b, ok)
    c := <-ch
    fmt.Println(c)
}

//輸出以下
10 true
0 false
0

注意:在channel中進行的全部元素轉移都伴隨着內存的拷貝。

func main() {
    type Instance struct {
        ID   int
        name string
    }

    var ins = Instance{ID: 1, name: "Golang"}

    ch := make(chan Instance, 3)
    ch <- ins

    fmt.Println("ins的原始值:", ins)

    ins.name = "Python"
    go func(ch chan Instance) {
        fmt.Println("channel接收值:", <-ch)
    }(ch)

    time.Sleep(time.Second)
    fmt.Println("ins的最終值:", ins)
}

// 輸出結果
ins的原始值: {1 Golang}
channel接收值: {1 Golang}
ins的最終值: {1 Python}

前半段圖解以下

image

後半段圖解以下

image

注意,若是把channel傳遞類型替換爲Instance指針時,那麼儘管channel存入到buf中的元素已是拷貝對象了,從channel中取出又被拷貝了一次。可是因爲它們的類型是Instance指針,拷貝對象與原始對象均會指向同一個內存地址,修改原有元素對象的數據時,會影響到取出數據。

func main() {
    type Instance struct {
        ID   int
        name string
    }

    var ins = &Instance{ID: 1, name: "Golang"}

    ch := make(chan *Instance, 3)
    ch <- ins

    fmt.Println("ins的原始值:", ins)

    ins.name = "Python"
    go func(ch chan *Instance) {
        fmt.Println("channel接收值:", <-ch)
    }(ch)

    time.Sleep(time.Second)
    fmt.Println("ins的最終值:", ins)
}

// 輸出結果
ins的原始值: &{1 Golang}
channel接收值: &{1 Python}
ins的最終值: &{1 Python}

所以,在使用channel時,儘可能避免傳遞指針,若是傳遞指針,則需謹慎。

關閉

channel的關閉實現代碼位於src/go/runtime/chan.go的chansend方法,詳細執行邏輯已經過註釋寫明。

func closechan(c *hchan) {
  // 若是hchan對象爲nil,則會引起painc
    if c == nil {
        panic(plainError("close of nil channel"))
    }

  // 對hchan加鎖
    lock(&c.lock)
  // 不一樣屢次調用close(c chan<- Type)方法,不然會引起painc
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

  // close標誌
    c.closed = 1

  // gList表明Go的GMP調度的G集合
    var glist gList

    // 該for循環是爲了釋放recvq上的全部等待接收sudog
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 該for循環會釋放sendq上的全部等待發送sudog
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
  // 釋放sendq和recvq以後,hchan釋放鎖
    unlock(&c.lock)

  // 將上文中glist中的加入的goroutine取出,讓它們均變爲runnable(可執行)狀態,等待調度器執行
    // 注意:咱們上文中分析過,試圖向一個已關閉的channel發送數據,會引起painc。
  // 因此,若是是釋放sendq中的goroutine,它們一旦獲得執行將會引起panic。
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

關於關閉操做,有幾個點須要注意一下。

  • 若是關閉已關閉的channel會引起painc。
  • 對channel關閉後,若是有阻塞的讀取或發送goroutines將會被喚醒。讀取goroutines會獲取到hchan的已接收元素,若是沒有,則獲取到元素零值;發送goroutine的執行則會引起painc。

對於第二點,咱們能夠很好利用這一特性來實現對程序執行流的控制(相似於sync.WaitGroup的做用),如下是示例程序代碼。

func main() {
    ch := make(chan struct{})
    //
    go func() {
        // do something work...
        // when work has done, call close()
        close(ch)
    }()
    // waiting work done
    <- ch
    // other work continue...
}

4、總結

channel是Go中很是強大有用的機制,爲了更有效地使用它,咱們必須瞭解它的實現原理,這也是寫做本文的目的。

  • hchan結構體有鎖的保證,對於併發goroutine而言是安全的
  • channel接收、發送數據遵循FIFO(First In First Out)原語
  • channel的數據傳遞依賴於內存拷貝
  • channel能阻塞(gopark)、喚醒(goready)goroutine
  • 所謂無緩存的channel,它的工做方式就是直接發送goroutine拷貝數據給接收goroutine,而不經過hchan.buf

另外,能夠看到Go在channel的設計上權衡了簡單與性能。爲了簡單性,hchan是有鎖的結構,由於有鎖的隊列會更易理解和實現,可是這樣會損失一些性能。考慮到整個 channel 操做帶鎖的成本較高,其實官方也曾考慮過使用無鎖 channel 的設計,可是因爲目前已有提案中(https://github.com/golang/go/...),無鎖實現的channel可維護性差、且實際性能測試不具備說服力,並且也不符合Go的簡單哲學,所以官方目前爲止並無採納無鎖設計。

在性能上,有一點,咱們須要認識到:所謂channel中阻塞goroutine,只是在runtime系統中被blocked,它是用戶層的阻塞。而實際的底層內核線程不受影響,它仍然是unblocked的。

參考連接

https://speakerdeck.com/kavya...

https://codeburst.io/diving-d...

https://github.com/talkgo/nig...

相關文章
相關標籤/搜索