Golang在併發編程上有兩大利器,分別是channel
和goroutine
,這篇文章咱們先聊聊channel
。熟悉Golang的人都知道一句名言:「使用通訊來共享內存,而不是經過共享內存來通訊」。這句話有兩層意思,Go語言確實在sync
包中提供了傳統的鎖機制,但更推薦使用channel
來解決併發問題。這篇文章會先從channel
的用法、channel
的原理兩部分對channel
作一個較爲深刻的探究。html
從字面上看,channel
的意思大概就是管道的意思。channel
是一種go協程用以接收或發送消息的安全的消息隊列,channel
就像兩個go協程之間的導管,來實現各類資源的同步。能夠用下圖示意:
git
channel
的用法很簡單:github
func main() {
ch := make(chan int, 1) // 建立一個類型爲int,緩衝區大小爲1的channel
ch <- 2 // 將2發送到ch
n, ok := <- ch // n接收從ch發出的值
if ok {
fmt.Println(n) // 2
}
close(ch) // 關閉channel
}
複製代碼
使用channel
時有幾個注意點:golang
nil
channel
發送消息,會一直阻塞;channel
發送消息,會引起運行時恐慌(panic)
;channel
關閉後不能夠繼續向channel
發送消息,但能夠繼續從channel
接收消息;channel
關閉而且緩衝區爲空時,繼續從從channel
接收消息會獲得一個對應類型的零值。Unbuffered channels
是指緩衝區大小爲0的channel
,這種channel
的接收者會阻塞直至接收到消息,發送者會阻塞直至接收者接收到消息,這種機制能夠用於兩個goroutine
進行狀態同步;Buffered channels
擁有緩衝區,發送者在將消息發送到緩衝區以前是阻塞的,當緩衝區已滿時,發送者會阻塞;當緩衝區爲空時,接收者會阻塞。web
引用The Nature Of Channels In Go中的兩張圖來講明Unbuffered channels
與Buffered channels
, 很是形象,讀者可自行體會一下:shell
Unbuffered channels
:
編程
Buffered channels
:
數組
channel
支持 for range
的方式進行遍歷:安全
package main
import "fmt"
func main() {
ci := make(chan int, 5)
for i := 1; i <= 5; i++ {
ci <- i
}
close(ci)
for i := range ci {
fmt.Println(i)
}
}
複製代碼
值得注意的是,在遍歷時,若是channel
沒有關閉,那麼會一直等待下去,出現 deadlock
的錯誤;若是在遍歷時channel
已經關閉,那麼在遍歷完數據後自動退出遍歷。也就是說,for range
的遍歷方式時阻塞型的遍歷方式。數據結構
select
能夠處理非阻塞式消息發送、接收及多路選擇。
package main
import "fmt"
func main() {
ci := make(chan int, 2)
for i := 1; i <= 2; i++ {
ci <- i
}
close(ci)
cs := make(chan string, 2)
cs <- "hi"
cs <- "golang"
close(cs)
ciClosed, csClosed := false, false
for {
if ciClosed && csClosed {
return
}
select {
case i, ok := <-ci:
if ok {
fmt.Println(i)
} else {
ciClosed = true
fmt.Println("ci closed")
}
case s, ok := <-cs:
if ok {
fmt.Println(s)
} else {
csClosed = true
fmt.Println("cs closed")
}
default:
fmt.Println("waiting...")
}
}
}
複製代碼
select
中有case
代碼塊,用於channel
發送或接收消息,任意一個case
代碼塊準備好時,執行其對應內容;多個case
代碼塊準備好時,隨機選擇一個case
代碼塊並執行;全部case
代碼塊都沒有準備好,則等待;還能夠有一個default
代碼塊,全部case
代碼塊都沒有準備好時執行default
代碼塊。
先貼一下channel
的源碼地址,讀者能夠對照來看。
先看channel
的結構體:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
// channel中元素大小
elemsize uint16
// 是否已關閉
closed uint32
// channel中元素類型
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
複製代碼
channel
的緩衝區實際上是一個環形隊列,qcount
表示隊列中元素的數量,dataqsiz
表示環形隊列的總大小,buf
表示一個指向循環數組的指針;sendx
和recvx
分別用來標識當前發送和接收的元素在循環隊列中的位置;recvq
和sendq
都是一個列表,分別用於存儲當前處於等待接收和等待發送的Goroutine
。
再看一下waitq
的數據結構:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// 當前goroutine
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
複製代碼
其中sudog
表示處於等待列表中的Goroutine
封裝,包含了一些上下文信息,first
和last
分別指向等待列表的首位的Goroutine
。
在分析channel
的原理以前,咱們先使用go tool
分析如下代碼,看看channel
的各類操做在底層調用了什麼運行時方法:
ch := make(chan int, 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
fmt.Println(n)
}
close(ch)
複製代碼
編譯
go build test.go
go tool objdump -s "main\.main" test | grep CALL
複製代碼
把CALL
過濾出來:
test.go:118 0x1092f55 e81612f7ff CALL runtime.makechan(SB)
test.go:119 0x1092f74 e82714f7ff CALL runtime.chansend1(SB)
test.go:120 0x1092f8e e80d14f7ff CALL runtime.chansend1(SB)
test.go:121 0x1092fa5 e8361ff7ff CALL runtime.chanrecv1(SB)
test.go:122 0x1092fbd e85e1ff7ff CALL runtime.chanrecv2(SB)
test.go:126 0x1092fd7 e8841cf7ff CALL runtime.closechan(SB)
test.go:124 0x1092fea e8b156f7ff CALL runtime.convT64(SB)
print.go:275 0x1093041 e88a98ffff CALL fmt.Fprintln(SB)
test.go:47 0x1093055 e896c1fbff CALL runtime.morestack_noctxt(SB)
複製代碼
從上面的編譯分析能夠看出在建立channel
時調用了運行時方法makechan
:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 計算緩衝區須要的總大小(緩衝區大小*元素大小),並判斷是否超出最大可分配範圍
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 緩衝區大小爲0,或者channel中元素大小爲0(struct{}{})時,只需分配channel必需的空間便可
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 經過位運算知道channel中元素類型不是指針,分配一片連續內存空間,所需空間等於 緩衝區數組空間 + hchan必需的空間。
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指針,爲hchan和緩衝區分別分配空間
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
複製代碼
makechan
的代碼邏輯仍是比較簡單的,首先校驗元素類型和緩衝區空間大小,而後建立hchan
,分配所需空間。這裏有三種狀況:當緩衝區大小爲0,或者channel
中元素大小爲0時,只需分配channel
必需的空間便可;當channel
元素類型不是指針時,則只須要爲hchan
和緩衝區分配一片連續內存空間,空間大小爲緩衝區數組空間加上hchan
必需的空間;默認狀況,緩衝區包含指針,則須要爲hchan
和緩衝區分別分配內存。最後更新hchan
的其餘字段,包括elemsize
,elemtype
,dataqsiz
。
channel
的發送操做調用了運行時方法chansend1
, 在chansend1
內部又調用了chansend
,直接來看chansend
的實現:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// channel爲nil
if c == nil {
// 若是是非阻塞,直接返回發送不成功
if !block {
return false
}
// 不然,當前Goroutine阻塞掛起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 對於非阻塞且channel未關閉,若是無緩衝區且沒有等待接收的Goroutine,或者有緩衝區且緩衝區已滿,那麼都直接返回發送不成功
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// 若是channel已關閉
if c.closed != 0 {
// 解鎖,直接panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 除了以上狀況,當channel未關閉時,就有如下幾種狀況:
// 一、當存在等待接收的Goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 那麼直接把正在發送的值發送給等待接收的Goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 二、當緩衝區未滿時
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 獲取指向緩衝區數組中位於sendx位置的元素的指針
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 將當前發送的值拷貝到緩衝區
typedmemmove(c.elemtype, qp, ep)
// sendx索引加一
c.sendx++
// 由於是循環隊列,sendx等於隊列長度時置爲0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 隊列中元素總數加一,並解鎖,返回發送成功
c.qcount++
unlock(&c.lock)
return true
}
// 三、當既沒有等待接收的Goroutine,緩衝區也沒有剩餘空間,若是是非阻塞的發送,那麼直接解鎖,返回發送失敗
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
// 四、若是是阻塞發送,那麼就將當前的Goroutine打包成一個sudog結構體,並加入到channel的發送隊列sendq裏
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 調用goparkunlock將當前Goroutine設置爲等待狀態並解鎖,進入休眠等待被喚醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 被喚醒以後執行清理工做並釋放sudog結構體
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
複製代碼
chansend
的執行邏輯,上面的註釋已經寫得很清楚了,咱們再來梳理一下。對於非阻塞發送或者channel
已經關閉條件下的幾種發送失敗的狀況,處理邏輯比較簡單,讀者能夠對照註釋來看;這裏咱們重點關注channel
未關閉時幾種常規狀況:
若是等待接收的隊列recvq
中存在Goroutine
,那麼直接把正在發送的值發送給等待接收的Goroutine
。示意圖以下:
send
方法:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
// 將發送的值直接拷貝到接收值(好比v = <-ch 中的v)的內存地址
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 獲取等待接收數據的Goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒以前等待接收數據的Goroutine
goready(gp, skip+1)
}
複製代碼
這裏有必要說明一下Goroutine
在調度過程當中的幾種狀態:
_Gidle = iota // goroutine剛剛分配,尚未初始化
_Grunnable // goroutine處於運行隊列中, 尚未運行,沒有本身的棧
_Grunning // goroutine在運行中,擁有本身的棧,被分配了M(線程)和P(調度上下文)
_Gsyscall // goroutine在執行系統調用
_Gwaiting // goroutine被阻塞
_Gdead // goroutine沒有被使用,多是剛剛退出,或者正在初始化中
_Gcopystack // 表示g當前的棧正在被移除並分配新棧
複製代碼
當調用goready
時,將Goroutine
的狀態從 _Gwaiting
置爲_Grunnable
,等待下一次調度再次執行。
當緩衝區未滿時,找到sendx
所指向的緩衝區數組的位置,將正在發送的值拷貝到該位置,並增長sendx
索引以及釋放鎖,示意圖以下:
若是是阻塞發送,那麼就將當前的Goroutine
打包成一個sudog
結構體,並加入到channel
的發送隊列sendq
裏。示意圖以下:
以後則調用goparkunlock
將當前Goroutine
設置爲_Gwaiting
狀態並解鎖,進入阻塞狀態等待被喚醒(調用goready
);若是被調度器喚醒,執行清理工做並最終釋放對應的sudog
結構體。
channel
的接收有兩種形式:
<-ch
n, ok := <-ch
複製代碼
這兩種方式分別調用運行時方法chanrecv1
和chanrecv2
:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
複製代碼
這兩個方法最終都會調用chanrecv
方法:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// channel爲nil
if c == nil {
// 非阻塞直接返回(false, false)
if !block {
return
}
// 阻塞接收,則當前Goroutine阻塞掛起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 非阻塞模式,對於如下兩種狀況:
// 一、無緩衝區且等待發送隊列也爲空
// 二、有緩衝區但緩衝區數組爲空且channel未關閉
// 這兩種狀況都是接收失敗, 直接返回(false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// 若是channel已關閉,而且緩衝區無元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// 有等待接收的變量(即 v = <-ch中的v)
if ep != nil {
//根據channel元素的類型清理ep對應地址的內存,即ep接收了channel元素類型的零值
typedmemclr(c.elemtype, ep)
}
// 返回(true, false),即接收到值,但不是從channel中接收的有效值
return true, false
}
// 除了以上很是規狀況,還有有如下幾種常見狀況:
// 一、等待發送的隊列sendq裏存在Goroutine,那麼有兩種狀況:當前channel無緩衝區,或者當前channel已滿
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 若是無緩衝區,那麼直接從sender接收數據;不然,從buf隊列的頭部接收數據,並把sender的數據加到buf隊列的尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// 接收成功
return true, true
}
// 二、緩衝區buf中有元素
if c.qcount > 0 {
// Receive directly from queue
// 從recvx指向的位置獲取元素
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 將從buf中取出的元素拷貝到當前協程
typedmemmove(c.elemtype, ep, qp)
}
// 同時將取出的數據所在的內存清空
typedmemclr(c.elemtype, qp)
// 接收索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf元素總數 -1
c.qcount--
// 解鎖,返回接收成功
unlock(&c.lock)
return true, true
}
// 三、非阻塞模式,且沒有數據能夠接受
if !block {
// 解鎖,直接返回接收失敗
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
// 四、阻塞模式,獲取當前Goroutine,打包一個sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 加入到channel的等待接收隊列recvq中
c.recvq.enqueue(mysg)
// 掛起當前Goroutine,設置爲_Gwaiting狀態並解鎖,進入休眠等待被喚醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
// 被喚醒以後執行清理工做並釋放sudog結構體
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
複製代碼
chanrecv
方法的處理邏輯與chansend
很是相似,咱們這裏仍然只分析幾種常見狀況,其餘狀況上述註釋也解釋得比較清楚了,讀者可對照相應代碼和註釋查看。
若是等待發送的隊列sendq
裏存在掛起的Goroutine
,那麼有兩種狀況:當前channel
無緩衝區,或者當前channel
已滿。從sendq
中取出最早阻塞的Goroutine
,而後調用recv
方法:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 無緩衝區
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 緩衝區已滿
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 將等待發送數據的Goroutine的狀態從_Gwaiting置爲 _Grunnable,等待下一次調度。
goready(gp, skip+1)
}
複製代碼
一、若是無緩衝區,那麼直接從sender
接收數據;
二、若是緩衝區已滿,從buf
隊列的頭部接收數據,並把sender
的數據加到buf隊列的尾部;
三、最後調用goready
函數將等待發送數據的Goroutine
的狀態從_Gwaiting
置爲_Grunnable
,等待下一次調度。
下圖示意了當緩衝區已滿時的處理過程:
若是緩衝區buf
中還有元素,那麼就走正常的接收,將從buf
中取出的元素拷貝到當前協程的接收數據目標內存地址中。值得注意的是,即便此時channel
已經關閉,仍然能夠正常地從緩衝區buf
中接收數據。這種狀況比較簡單,示意圖就不畫了。
若是是阻塞模式,且當前沒有數據能夠接收,那麼就須要將當前Goroutine
打包成一個sudog
加入到channel
的等待接收隊列recvq
中,將當前Goroutine
的狀態置爲_Gwaiting
,等待喚醒。示意圖以下:
若是以後當前Goroutine
被調度器喚醒,則執行清理工做並最終釋放對應的sudog
結構體。
說完收發數據,最後就是關閉channel
了:
func closechan(c *hchan) {
// nil channel檢查
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 已關閉的channel不能再次關閉
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 設置關閉狀態爲1
c.closed = 1
var glist glist
// release all readers
// 遍歷recvq,清除sudog的數據,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 遍歷sendq,清除sudog的數據,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
將glist中全部Goroutine的狀態置爲_Grunnable,等待調度器進行調度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
複製代碼
一、關閉channel
時,會遍歷recvq
和sendq
(實際只有recvq
或者sendq
),取出sudog
中掛起的Goroutine
加入到glist
列表中,並清除sudog
上的一些信息和狀態。
二、而後遍歷glist
列表,爲每一個Goroutine
調用goready
函數,將全部Goroutine
置爲_Grunnable
狀態,等待調度。
三、當Goroutine
被喚醒以後,會繼續執行chansend
和chanrecv
函數中當前Goroutine
被喚醒後的剩餘邏輯。
總結一下,本文先經過channel
的基本用法對channel
的定義、用法細節進行了介紹,而後對channel
的基本操做包括髮送、接收和關閉進行了較爲詳細和深刻的探究。細心的讀者應該也會發現channel
的操做跟協程的調度關係密切,不過這篇文章關於goroutine
的調度只是一筆帶過,後續時機成熟會對這部份內容再做探究。