Go Chanel 使用與原理 三

上篇已經記錄到發送數據到 chanel 的三種狀況的代碼邏輯,接下來是從 chanel 接收數據的邏輯。golang

chanrecv 方法

和 chansend 方法十分相似ui

若是 hchan 爲空

if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

若是 chenel 已經關閉

lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

接收數據的三種狀況

  • 若是 hchan 的 sendq 隊列中有阻塞的 goroutine,buf 已滿
  • 若是 hchan.buf 還有數據未取出
  • 若是 hchan.buf 爲空

下面分別截取三種狀況的代碼段。this

若是 hchan 的 sendq 隊列中有阻塞的 goroutine,buf 已滿

if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

若是 hchan.buf 還有數據未取出

if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

若是 hchan.buf 爲空

··· // 上面條件都不知足,則只剩一種狀況:hchan.buf 爲空
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

阻塞與喚醒

假設有一個 goroutine sender,和一個 goroutine reciever,若是 reciever 執行 chanrecv 方法的時候,buf 已經爲空了,從上面的代碼最後一行知道,goparkunlock 方法使 reciever 阻塞,那麼 sender 寫數據進 chanel,reciever 又如何被 sender 喚醒呢?code

reciever 休眠後, sender 來了,sender 執行到如下代碼處。sender 從 recvq 隊列 彈出 reciever,而後執行 send 方法。對象

if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

send 方法最後一行執行 goready 方法將 reciever 喚醒。隊列

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            qp := chanbuf(c, c.recvx)
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

reciever 喚醒後,繼續執行 chanrecv 方法剩下的語句:ip

// someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed

closechan 方法

close(ch) 對應的執行方法即爲 closechan 方法。主要有如下步驟:ci

  • 關閉 nil chanel ,返回 panic:"close of nil channel"
  • 關閉 closed chanel,返回 panic:"close of closed channel"
  • 將 hchan.closed 的值設爲 1
  • 釋放全部 reader sudog 對象,釋放的同時將 sudog 中的 g 插入 glist 鏈表
  • 釋放全部 writer sudog 對象,釋放的同時將 sudog 中的 g 插入 glist 鏈表
  • 遍歷 glist 鏈表,喚醒 glist 中的全部 goroutine
func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

    c.closed = 1

    var glist *g

    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        gp.schedlink.set(glist)  // <------ a 語句
        glist = gp // <------ b 語句
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        gp.schedlink.set(glist) // <------ c 語句
        glist = gp // <------ d 語句
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr() // <------ e 語句
        gp.schedlink = 0
        goready(gp, 3)
    }
}


//go:nosplit
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) } // 得到 gp 的地址 

//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) } // 設置 gp 的地址

上面代碼有個注意的點: schedlink 是 G 的一個屬性,用於指向下一個 G 。rem

參考文章
http://legendtkl.com/2017/08/...
相關文章
相關標籤/搜索