Go語言channel與select原理

本文會嘗試解釋 go runtime 中 channel 和 select 的具體實現,部份內容來自 gophercon2017。Go版本爲1.8.3數組

channel

第一部分講述一下 channel 的用法。channel 能夠看作一個隊列,用於多個goroutine之間的通訊,例以下面的例子,一個goroutine發送msg,另外一個msg接受消息。channel 分爲帶緩衝和不帶緩衝,差異不是很大,具體請自行google。看一個簡單的例子,瞭解一下channel的使用。app

package main

import "fmt"

func main() {
    // Create a new channel with `make(chan val-type)`.
    // Channels are typed by the values they convey.
    messages := make(chan string)
    // Send a value into a channel using the `channel <-`
    // syntax. Here we send `"ping"`  to the `messages`
    // channel we made above, from a new goroutine.
    go func() { messages <- "ping" }()
    // The `<-channel` syntax receives a value from the
    // channel. Here we'll receive the `"ping"` message
    // we sent above and print it out.
    msg := <-messages
    fmt.Println(msg)
}

channel的功能點:oop

  1. 隊列
  2. 阻塞
  3. 當一端阻塞,能夠被另外一個端喚醒

咱們圍繞這3點功能展開,講講具體的實現。ui

channel結構

註釋標註了幾個重要的變量,從功能上大體能夠分爲兩個功能單元,一個是 ring buffer,用於存數據; 一個是存放 goroutine 的隊列。this

type hchan struct {
    qcount   uint           // 當前隊列中的元素個數
    dataqsiz uint           // 緩衝隊列的固定大小
    buf      unsafe.Pointer // 緩衝數組
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // 下一次發送的 index
    recvx    uint   // 下一次接收的 index
    recvq    waitq  // 接受者隊列
    sendq    waitq  // 發送者隊列

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

Ring Buffer

主要是如下變量組成的功能, 一個 buf 存儲實際數據,兩個指針分別表明發送,接收的索引位置,配合 size, count 在數組大小範圍內來回滑動。google

qcount   uint           // 當前隊列中的元素個數
dataqsiz uint           // 緩衝隊列的固定大小
buf      unsafe.Pointer // 緩衝數組
sendx    uint   // 下一次發送的 index
recvx    uint   // 下一次接收的 index

舉個例子,假設咱們初始化了一個帶緩衝的channel, ch := make(chan int, 3), 那麼它初始狀態的值爲:指針

qcount   = 0
dataqsiz = 3
buf      = [3]int{0, 0, 0} // 表示長度爲3的數組
sendx    = 0
recvx    = 0

第一步,向 channel 裏 send 一個值, ch <- 1, 由於如今緩衝還沒滿,因此操做後狀態以下:code

qcount   = 1
dataqsiz = 3
buf      = [3]int{1, 0, 0} // 表示長度爲3的數組
sendx    = 1
recvx    = 0

快進兩部,連續向 channel 裏 send 兩個值 (2, 3),狀態以下:索引

qcount   = 3
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示長度爲3的數組
sendx    = 0 // 下一個發送的 index 回到了0
recvx    = 0

從 channel 中 receive 一個值, <- ch, 狀態以下:隊列

qcount   = 2
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示長度爲3的數組
sendx    = 0 // 下一個發送的 index 回到了0
recvx    = 1 // 下一個接收的 index

阻塞

咱們看下,若是 receive channel 時,channel 的 buffer中沒有數據是怎麼處理的。邏輯在 chanrecv 這個方法中,它的大體流程以下,僅保留了阻塞操做的代碼。

func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 檢查 channdel 是否爲 nil
    
    // 當不阻塞時,檢查buffer大小,當前大小,檢查chennel是否關閉,看看是否能直接返回

    // 檢查發送端是否有等待的goroutine,下部分會提到

    // 當前buffer中有數據,則嘗試取出。
    
    // 若是非阻塞,直接返回

    // 沒有sender等待,buffer中沒有數據,則阻塞等待。
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    //關鍵操做:設置 goroutine 狀態爲 waiting, 把 G 和 M 分離
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // someone woke us up
    // 被喚醒,清理 sudog
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

這裏的操做就是 建立一個 當前 goroutine 的 sudog, 而後把這個 sudog 放入 channel 的接受者等待隊列;設置當前 G 的狀態,和 M分離,到這裏當前G就阻塞了,代碼不會執行下去。
當被喚醒後,執行sudog的清理操做。這裏接受buffer中的值的指針是 ep 這個變量,被喚醒後好像沒有向 ep 中賦值的操做。這個咱們下部分會講。

sudog

還剩最後一個疑問,當一個goroutine由於channel阻塞,另外一個goroutine是如何喚醒它的。

channel 中有兩個 waitq 類型的變量, 看下結構發現,就是sudog的鏈表,關鍵是 sudog。sudog中包含了goroutine的引用,注意一下 elem這個變量,註釋說可能會指向stack。

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this.
    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // waitlink is only accessed by g.

    acquiretime int64
    releasetime int64
    ticket      uint32
    waitlink    *sudog // g.waiting list
    c           *hchan // channel
}

講阻塞部分的時候,咱們看到goroutine被調度以前,有一個 enqueue操做,這時,當前G的sudog已經被存入recvq中,咱們看下發送者這時的操做。

這裏的操做是,sender發送的值 直接被拷貝到 sudog.elem 了。而後喚醒 sudog.g ,這樣對面的receiver goroutine 就被喚醒了。具體請下面的註釋。

func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 檢查工做

    // 若是能從 chennel 的 recvq 彈出 sudog, 那麼直接send
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) })
        return true
    }

    // buffer有空餘空間,返回; 阻塞操做
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
    // 處理 index

    // 關鍵
    if sg.elem != nil {
        // 這裏是根據 elemtype.size 複製內存
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }

    // 一些處理

    // 從新設置 goroutine 的狀態,喚醒它
    goready(gp, 4)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on our stack, dst is a slot on another stack.

    // Once we read sg.elem out of sg, it will no longer
    // be updated if the destination's stack gets copied (shrunk).
    // So make sure that no preemption points can happen between read & use.
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

// memmove copies n bytes from "from" to "to".
// in memmove_*.s
//go:noescape
func memmove(to, from unsafe.Pointer, n uintptr)

select

在看 chanrecv()方法 時,發現了一個 block 參數,表明操做是否阻塞。通常狀況下,channel 都是阻塞的(不考慮buffer),那何時非阻塞呢?

第一個想到的就是 select, 在寫了default case的時候,其餘的channel是非阻塞的。

還有一個可能不經常使用,就是 channel 的反射 value, 能夠是非阻塞的,這個方法是public的,咱們先看下簡單的。

func (v Value) TryRecv() (x Value, ok bool)
func (v Value) TrySend(x Value) bool

select 就複雜一點點,首先在源碼中發現一段註釋:

// compiler implements
//
//    select {
//    case c <- v:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbsend(c, v) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}

// compiler implements
//
//    select {
//    case v = <-c:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbrecv(&v, c) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
    selected, _ = chanrecv(t, c, elem, false)
    return
}

若是是一個 case + default 的模式,那麼編譯器就調用以上方法來實現。

若是是多個 case + default 的模式呢?select 在runtime究竟是如何執行的?寫個簡單的select編譯一下。

package main

func main() {
    var ch chan int
    select {
    case <-ch:
    case ch <- 1:
    default:
    }
}

go tool compile -S -l -N test.go > test.s 結果中找一下關鍵字,例如:

0x008c 00140 (test.go:5)    CALL    runtime.newselect(SB)
0x00ad 00173 (test.go:6)    CALL    runtime.selectrecv(SB)
0x00ec 00236 (test.go:7)    CALL    runtime.selectsend(SB)
0x0107 00263 (test.go:8)    CALL    runtime.selectdefault(SB)
0x0122 00290 (test.go:5)    CALL    runtime.selectgo(SB)

這裏 selectgo 是實際運行的方法,找一下,注意註釋。先檢查channel是否能操做,若是不能操做,就走 default 邏輯。

loop:
    // pass 1 - look for something already waiting
    var dfl *scase
    var cas *scase
    for i := 0; i < int(sel.ncase); i++ {
        cas = &scases[pollorder[i]]
        c = cas.c

        switch cas.kind {
        // 接受數據
        case caseRecv:
            sg = c.sendq.dequeue()
            // 若是有 sender 在等待
            if sg != nil {
                goto recv
            }
            // 當前buffer中有數據
            if c.qcount > 0 {
                goto bufrecv
            }
            // 關閉的channel
            if c.closed != 0 {
                goto rclose
            }
        case caseSend:
            if raceenabled {
                racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
            }
            // 關閉
            if c.closed != 0 {
                goto sclose
            }
            // 有 receiver 正在等待
            sg = c.recvq.dequeue()
            if sg != nil {
                goto send
            }
            // 有空間接受
            if c.qcount < c.dataqsiz {
                goto bufsend
            }
        // 走default
        case caseDefault:
            dfl = cas
        }
    }

    if dfl != nil {
        selunlock(scases, lockorder)
        cas = dfl
        goto retc
    }
相關文章
相關標籤/搜索