深刻理解Golang之channel

前言

Golang在併發編程上有兩大利器,分別是channelgoroutine,這篇文章咱們先聊聊channel。熟悉Golang的人都知道一句名言:「使用通訊來共享內存,而不是經過共享內存來通訊」。這句話有兩層意思,Go語言確實在sync包中提供了傳統的鎖機制,但更推薦使用channel來解決併發問題。這篇文章會先從channel的用法、channel的原理兩部分對channel作一個較爲深刻的探究。html

channel用法

什麼是channel

從字面上看,channel的意思大概就是管道的意思。channel是一種go協程用以接收或發送消息的安全的消息隊列,channel就像兩個go協程之間的導管,來實現各類資源的同步。能夠用下圖示意:
git

channel的用法很簡單:github

func main() {
    ch := make(chan int1// 建立一個類型爲int,緩衝區大小爲1的channel
    ch <- 2 // 將2發送到ch
    n, ok := <- ch // n接收從ch發出的值
    if ok {
        fmt.Println(n) // 2
    }
    close(ch) // 關閉channel
}
複製代碼

使用channel時有幾個注意點:golang

  • 向一個nil channel發送消息,會一直阻塞;
  • 向一個已經關閉的channel發送消息,會引起運行時恐慌(panic)
  • channel關閉後不能夠繼續向channel發送消息,但能夠繼續從channel接收消息;
  • channel關閉而且緩衝區爲空時,繼續從從channel接收消息會獲得一個對應類型的零值。

Unbuffered channels與Buffered channels

Unbuffered channels是指緩衝區大小爲0的channel,這種channel的接收者會阻塞直至接收到消息,發送者會阻塞直至接收者接收到消息,這種機制能夠用於兩個goroutine進行狀態同步;Buffered channels擁有緩衝區,發送者在將消息發送到緩衝區以前是阻塞的,當緩衝區已滿時,發送者會阻塞;當緩衝區爲空時,接收者會阻塞。web

引用The Nature Of Channels In Go中的兩張圖來講明Unbuffered channelsBuffered channels, 很是形象,讀者可自行體會一下:shell

Unbuffered channels
編程

Unbuffered channels
Unbuffered channels

Buffered channels
數組

Buffered channels
Buffered channels

channel的遍歷

for range

channel支持 for range 的方式進行遍歷:安全

package main  

import "fmt"  

func main() {  
    ci := make(chan int5)  
    for i := 1; i <= 5; i++ {
        ci <- i
    }    
    close(ci)  

    for i := range ci {  
        fmt.Println(i)  
    }  
}  
複製代碼

值得注意的是,在遍歷時,若是channel 沒有關閉,那麼會一直等待下去,出現 deadlock 的錯誤;若是在遍歷時channel已經關閉,那麼在遍歷完數據後自動退出遍歷。也就是說,for range 的遍歷方式時阻塞型的遍歷方式。數據結構

for select

select能夠處理非阻塞式消息發送、接收及多路選擇。

package main  

import "fmt"  

func main() {  
    ci := make(chan int2)
    for i := 1; i <= 2; i++ {
        ci <- i
    }
    close(ci)

    cs := make(chan string2)
    cs <- "hi"
    cs <- "golang"
    close(cs)

    ciClosed, csClosed := falsefalse
    for {
        if ciClosed && csClosed {
            return
        }
        select {
        case i, ok := <-ci:
            if ok {
                fmt.Println(i)
            } else {
                ciClosed = true
                fmt.Println("ci closed")
            }
        case s, ok := <-cs:
            if ok {
                fmt.Println(s)
            } else {
                csClosed = true
                fmt.Println("cs closed")
            }
        default:
            fmt.Println("waiting...")
        }
    }
}  
複製代碼

select中有case代碼塊,用於channel發送或接收消息,任意一個case代碼塊準備好時,執行其對應內容;多個case代碼塊準備好時,隨機選擇一個case代碼塊並執行;全部case代碼塊都沒有準備好,則等待;還能夠有一個default代碼塊,全部case代碼塊都沒有準備好時執行default代碼塊。

channel原理

先貼一下channel源碼地址,讀者能夠對照來看。

數據結構

先看channel的結構體:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    // channel中元素大小
    elemsize uint16 
    // 是否已關閉
    closed   uint32
    // channel中元素類型
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}
複製代碼

channel的緩衝區實際上是一個環形隊列,qcount表示隊列中元素的數量,dataqsiz表示環形隊列的總大小,buf表示一個指向循環數組的指針;sendxrecvx分別用來標識當前發送和接收的元素在循環隊列中的位置;recvqsendq都是一個列表,分別用於存儲當前處於等待接收和等待發送的Goroutine

再看一下waitq的數據結構:

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    // 當前goroutine
    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}
複製代碼

其中sudog表示處於等待列表中的Goroutine封裝,包含了一些上下文信息,firstlast分別指向等待列表的首位的Goroutine

編譯分析

在分析channel的原理以前,咱們先使用go tool分析如下代碼,看看channel的各類操做在底層調用了什麼運行時方法:

ch := make(chan int2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
    fmt.Println(n)
}
close(ch)
複製代碼

編譯

go build test.go
go tool objdump -s "main\.main" test | grep CALL
複製代碼

CALL過濾出來:

  test.go:118           0x1092f55               e81612f7ff              CALL runtime.makechan(SB)
  test.go:119           0x1092f74               e82714f7ff              CALL runtime.chansend1(SB)
  test.go:120           0x1092f8e               e80d14f7ff              CALL runtime.chansend1(SB)
  test.go:121           0x1092fa5               e8361ff7ff              CALL runtime.chanrecv1(SB)
  test.go:122           0x1092fbd               e85e1ff7ff              CALL runtime.chanrecv2(SB)
  test.go:126           0x1092fd7               e8841cf7ff              CALL runtime.closechan(SB)
  test.go:124           0x1092fea               e8b156f7ff              CALL runtime.convT64(SB)
  print.go:275          0x1093041               e88a98ffff              CALL fmt.Fprintln(SB)
  test.go:47            0x1093055               e896c1fbff              CALL runtime.morestack_noctxt(SB)
複製代碼

建立

從上面的編譯分析能夠看出在建立channel時調用了運行時方法makechan:

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    // 計算緩衝區須要的總大小(緩衝區大小*元素大小),並判斷是否超出最大可分配範圍
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // 緩衝區大小爲0,或者channel中元素大小爲0(struct{}{})時,只需分配channel必需的空間便可
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, niltrue))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        // 經過位運算知道channel中元素類型不是指針,分配一片連續內存空間,所需空間等於 緩衝區數組空間 + hchan必需的空間。
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, niltrue))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 元素中包含指針,爲hchan和緩衝區分別分配空間
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}
複製代碼

makechan的代碼邏輯仍是比較簡單的,首先校驗元素類型和緩衝區空間大小,而後建立hchan,分配所需空間。這裏有三種狀況:當緩衝區大小爲0,或者channel中元素大小爲0時,只需分配channel必需的空間便可;當channel元素類型不是指針時,則只須要爲hchan和緩衝區分配一片連續內存空間,空間大小爲緩衝區數組空間加上hchan必需的空間;默認狀況,緩衝區包含指針,則須要爲hchan和緩衝區分別分配內存。最後更新hchan的其餘字段,包括elemsizeelemtypedataqsiz

發送

channel的發送操做調用了運行時方法chansend1, 在
chansend1內部又調用了chansend,直接來看chansend的實現:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // channel爲nil
    if c == nil {
        // 若是是非阻塞,直接返回發送不成功
        if !block {
            return false
        }
        // 不然,當前Goroutine阻塞掛起
        gopark(nilnil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.

    // 對於非阻塞且channel未關閉,若是無緩衝區且沒有等待接收的Goroutine,或者有緩衝區且緩衝區已滿,那麼都直接返回發送不成功
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // 若是channel已關閉
    if c.closed != 0 {
        // 解鎖,直接panic
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 除了以上狀況,當channel未關閉時,就有如下幾種狀況:

    // 一、當存在等待接收的Goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).

        // 那麼直接把正在發送的值發送給等待接收的Goroutine
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 二、當緩衝區未滿時
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 獲取指向緩衝區數組中位於sendx位置的元素的指針
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 將當前發送的值拷貝到緩衝區
        typedmemmove(c.elemtype, qp, ep)
        // sendx索引加一
        c.sendx++
        // 由於是循環隊列,sendx等於隊列長度時置爲0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 隊列中元素總數加一,並解鎖,返回發送成功
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // 三、當既沒有等待接收的Goroutine,緩衝區也沒有剩餘空間,若是是非阻塞的發送,那麼直接解鎖,返回發送失敗
    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    // 四、若是是阻塞發送,那麼就將當前的Goroutine打包成一個sudog結構體,並加入到channel的發送隊列sendq裏
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

    // 調用goparkunlock將當前Goroutine設置爲等待狀態並解鎖,進入休眠等待被喚醒
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    // 被喚醒以後執行清理工做並釋放sudog結構體
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}
複製代碼

chansend的執行邏輯,上面的註釋已經寫得很清楚了,咱們再來梳理一下。對於非阻塞發送或者channel已經關閉條件下的幾種發送失敗的狀況,處理邏輯比較簡單,讀者能夠對照註釋來看;這裏咱們重點關注channel未關閉時幾種常規狀況:

存在等待接收的Goroutine

若是等待接收的隊列recvq中存在Goroutine,那麼直接把正在發送的值發送給等待接收的Goroutine。示意圖以下:


具體看一下 send方法:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()skip int) {
    ...

    if sg.elem != nil {
        // 將發送的值直接拷貝到接收值(好比v = <-ch 中的v)的內存地址
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // 獲取等待接收數據的Goroutine
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 喚醒以前等待接收數據的Goroutine
    goready(gp, skip+1)
}
複製代碼

這裏有必要說明一下Goroutine在調度過程當中的幾種狀態:

_Gidle = iota // goroutine剛剛分配,尚未初始化

_Grunnable // goroutine處於運行隊列中, 尚未運行,沒有本身的棧

_Grunning // goroutine在運行中,擁有本身的棧,被分配了M(線程)和P(調度上下文)

_Gsyscall // goroutine在執行系統調用

_Gwaiting // goroutine被阻塞

_Gdead // goroutine沒有被使用,多是剛剛退出,或者正在初始化中

_Gcopystack // 表示g當前的棧正在被移除並分配新棧
複製代碼

當調用goready時,將Goroutine的狀態從 _Gwaiting置爲_Grunnable,等待下一次調度再次執行。

當緩衝區未滿時

當緩衝區未滿時,找到sendx所指向的緩衝區數組的位置,將正在發送的值拷貝到該位置,並增長sendx索引以及釋放鎖,示意圖以下:

阻塞發送

若是是阻塞發送,那麼就將當前的Goroutine打包成一個sudog結構體,並加入到channel的發送隊列sendq裏。示意圖以下:

以後則調用goparkunlock將當前Goroutine設置爲_Gwaiting狀態並解鎖,進入阻塞狀態等待被喚醒(調用goready);若是被調度器喚醒,執行清理工做並最終釋放對應的sudog結構體。

接收

channel的接收有兩種形式:

<-ch
n, ok := <-ch
複製代碼

這兩種方式分別調用運行時方法chanrecv1chanrecv2:

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}
複製代碼

這兩個方法最終都會調用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    // channel爲nil
    if c == nil {
        // 非阻塞直接返回(false, false)
        if !block {
            return
        }
        // 阻塞接收,則當前Goroutine阻塞掛起
        gopark(nilnil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.

    // 非阻塞模式,對於如下兩種狀況:
    // 一、無緩衝區且等待發送隊列也爲空
    // 二、有緩衝區但緩衝區數組爲空且channel未關閉
    // 這兩種狀況都是接收失敗, 直接返回(false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)
    // 若是channel已關閉,而且緩衝區無元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        // 有等待接收的變量(即 v = <-ch中的v)
        if ep != nil {
            //根據channel元素的類型清理ep對應地址的內存,即ep接收了channel元素類型的零值
            typedmemclr(c.elemtype, ep)
        }
        // 返回(true, false),即接收到值,但不是從channel中接收的有效值
        return truefalse
    }

    // 除了以上很是規狀況,還有有如下幾種常見狀況:

    // 一、等待發送的隊列sendq裏存在Goroutine,那麼有兩種狀況:當前channel無緩衝區,或者當前channel已滿
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        // 若是無緩衝區,那麼直接從sender接收數據;不然,從buf隊列的頭部接收數據,並把sender的數據加到buf隊列的尾部
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        // 接收成功
        return truetrue
    }

    // 二、緩衝區buf中有元素
    if c.qcount > 0 {
        // Receive directly from queue
        // 從recvx指向的位置獲取元素
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            // 將從buf中取出的元素拷貝到當前協程
            typedmemmove(c.elemtype, ep, qp)
        }
        // 同時將取出的數據所在的內存清空
        typedmemclr(c.elemtype, qp)
        // 接收索引 +1
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf元素總數 -1
        c.qcount--
        // 解鎖,返回接收成功
        unlock(&c.lock)
        return truetrue
    }

    // 三、非阻塞模式,且沒有數據能夠接受
    if !block {
        // 解鎖,直接返回接收失敗
        unlock(&c.lock)
        return falsefalse
    }

    // no sender available: block on this channel.
    // 四、阻塞模式,獲取當前Goroutine,打包一個sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 加入到channel的等待接收隊列recvq中
    c.recvq.enqueue(mysg)
    // 掛起當前Goroutine,設置爲_Gwaiting狀態並解鎖,進入休眠等待被喚醒
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // someone woke us up
    // 被喚醒以後執行清理工做並釋放sudog結構體
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
複製代碼

chanrecv方法的處理邏輯與chansend很是相似,咱們這裏仍然只分析幾種常見狀況,其餘狀況上述註釋也解釋得比較清楚了,讀者可對照相應代碼和註釋查看。

存在等待發送的Goroutine

若是等待發送的隊列sendq裏存在掛起的Goroutine,那麼有兩種狀況:當前channel無緩衝區,或者當前channel已滿。從sendq中取出最早阻塞的Goroutine,而後調用recv方法:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()skip int) {
    if c.dataqsiz == 0 {
        // 無緩衝區
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 緩衝區已滿
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 將等待發送數據的Goroutine的狀態從_Gwaiting置爲 _Grunnable,等待下一次調度。
    goready(gp, skip+1)
}
複製代碼

一、若是無緩衝區,那麼直接從sender接收數據;
二、若是緩衝區已滿,從buf隊列的頭部接收數據,並把sender的數據加到buf隊列的尾部;
三、最後調用goready函數將等待發送數據的Goroutine的狀態從_Gwaiting置爲_Grunnable,等待下一次調度。

下圖示意了當緩衝區已滿時的處理過程:

緩衝區buf中還有數據

若是緩衝區buf中還有元素,那麼就走正常的接收,將從buf中取出的元素拷貝到當前協程的接收數據目標內存地址中。值得注意的是,即便此時channel已經關閉,仍然能夠正常地從緩衝區buf中接收數據。這種狀況比較簡單,示意圖就不畫了。

阻塞接收

若是是阻塞模式,且當前沒有數據能夠接收,那麼就須要將當前Goroutine打包成一個sudog加入到channel的等待接收隊列recvq中,將當前Goroutine的狀態置爲_Gwaiting,等待喚醒。示意圖以下:

若是以後當前Goroutine被調度器喚醒,則執行清理工做並最終釋放對應的sudog結構體。

關閉

說完收發數據,最後就是關閉channel了:

func closechan(c *hchan) {
    // nil channel檢查
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    // 已關閉的channel不能再次關閉
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }
    // 設置關閉狀態爲1
    c.closed = 1

    var glist glist

    // release all readers
    // 遍歷recvq,清除sudog的數據,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    // 遍歷sendq,清除sudog的數據,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    將glist中全部Goroutine的狀態置爲_Grunnable,等待調度器進行調度
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
複製代碼

一、關閉channel時,會遍歷recvqsendq(實際只有recvq或者sendq),取出sudog中掛起的Goroutine加入到glist列表中,並清除sudog上的一些信息和狀態。

二、而後遍歷glist列表,爲每一個Goroutine調用goready函數,將全部Goroutine置爲_Grunnable狀態,等待調度。

三、當Goroutine被喚醒以後,會繼續執行chansendchanrecv函數中當前Goroutine被喚醒後的剩餘邏輯。

總結

總結一下,本文先經過channel的基本用法對channel的定義、用法細節進行了介紹,而後對channel的基本操做包括髮送、接收和關閉進行了較爲詳細和深刻的探究。細心的讀者應該也會發現channel的操做跟協程的調度關係密切,不過這篇文章關於goroutine的調度只是一筆帶過,後續時機成熟會對這部份內容再做探究。

參考資料

一、The Nature Of Channels In Go
二、Concurrency in Golang

相關文章
相關標籤/搜索