「本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!」前端
從一個nil的chan接收數據會deadlockweb
func main() {
var a chan int
fmt.Println(<-a)
}
fatal error: all goroutines are asleep - deadlock!
複製代碼
向一個nil的chan發送數據會deadlock後端
func main() {
var a chan int
a <- 1
}
fatal error: all goroutines are asleep - deadlock!
複製代碼
從一個已經關閉的chan獲取數據,獲得對應的零值數組
func main() {
var a chan int
a = make(chan int, 1)
close(a)
v, ok := <-a
fmt.Println(v, ok) //0,false
}
複製代碼
向一個已經關閉的chan發送數據會panic安全
func main() {
var a chan int
a = make(chan int, 1)
close(a)
a <- 1
}
panic: send on closed channel
複製代碼
把一個已經關閉的chan再次關閉會panicmarkdown
func main() {
var a chan int
a = make(chan int, 1)
close(a)
close(a)
}
panic: close of closed channel
複製代碼
沒有buffer的chan,要提早作好接收的準備,不然會deadlocksvg
func main() {
var a chan int
a = make(chan int)
a <- 1
}
fatal error: all goroutines are asleep - deadlock!
複製代碼
有buffer的chan,在buffer滿了以後,再發送會deadlock源碼分析
func main() {
var a chan int
a = make(chan int, 1)
a <- 1 //不會報錯
a <- 2 //報錯
}
fatal error: all goroutines are asleep - deadlock!
複製代碼
nil的chan,在select和default組合下,不會報錯post
func main() {
var a chan int
select {
case <-a: //nil的chan並不會報錯
default: //會走到default
}
}
複製代碼
range一個chan,記得close。不然會deadlockui
func main() {
var a chan int
a = make(chan int)
go func() {
a <- 1
close(a) //若是沒close,那麼就會deadlock
}()
for v := range a {
fmt.Println(v)
}
time.Sleep(time.Second)
}
複製代碼
從make chan開始
func makechan(t *chantype, size int) *hchan {
elem := t.elem
省略...
switch {
case mem == 0: //無緩衝的chan
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: //元素不含指針
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: //默認
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
複製代碼
hchan的結構
type hchan struct {
qcount uint // 環形隊列的長度
dataqsiz uint // 環形隊列的長度、緩衝區的大小
buf unsafe.Pointer // 環形隊列指針
elemsize uint16 // 每一個元素的大小
closed uint32 // 通道是否關閉,1關閉 0打開
elemtype *_type // 元素類型
sendx uint // 已發送元素在循環數組中的索引
recvx uint // 已接收元素在循環數組中的索引
recvq waitq // 等待接收的goroutine隊列
sendq waitq // 等待發送的goroutine隊列
lock mutex //互斥鎖,數據的出入都是要鎖保護的
}
複製代碼
發一個數據
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc()) // block=true
}
複製代碼
像c<-x這樣的語句會被編譯成chansend1
,chansend1
調用chansend
。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block { //block=false的狀況 通常在select的時候
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) //向nil的chan發送會發生阻塞
throw("unreachable")
}
if !block && c.closed == 0 && full(c) { //select的時候,chan沒關閉,且(buffer也滿了或接收方未準備好接收)
return false
}
...
lock(&c.lock) //上鎖安全保護
if c.closed != 0 { //已經關閉的chan
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil { //正好有等待取的goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz { //緩衝區沒滿
qp := chanbuf(c, c.sendx) //獲取當前sendx的索引
...
typedmemmove(c.elemtype, qp, ep) //新元素的copy進去
c.sendx++ // 索引+1
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++ //數量加+1
unlock(&c.lock)
return true
}
// 接下來都是緩衝區滿的狀況
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog() //打包suog
c.sendq.enqueue(mysg) //推入發送隊列
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //讓發送的goroutine進入睡眠,等待被喚醒
//如下是恢復時乾的事
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) //釋放sudog
return true
}
複製代碼
sudog
結構接收一個數據
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
複製代碼
像<-c這樣的語句會被編譯成chanrecv1
,chanrecv1
調用chanrecv
。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil {
if !block { //不阻塞的話,直接返回
return
}
//從一個nil的chan接收數據會阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
lock(&c.lock) //上鎖
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil { //正好有發送者
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) //(帶緩衝和不帶緩衝的接收)
return true, true
}
if c.qcount > 0 { // buffer有數據
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //直接取數據 copy方式
}
typedmemclr(c.elemtype, qp) //清理取掉的位置
c.recvx++ //索引加1
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//如下緩衝區無數據
if !block { //不須要阻塞的話,直接返回
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog() //打包成sudog
...
c.recvq.enqueue(mysg) //推入recvq隊列
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) //讓出cpu,等待下次調度
//被喚醒後
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg) //解包sudog
return true, !closed
}
複製代碼
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { //無緩衝區
...
if ep != nil {
// 從發送者那裏copy數據
recvDirect(c.elemtype, sg, ep)
}
} else {//緩衝區必定滿了
...
// copy data
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
...
goready(gp, skip+1) //喚醒準備發送的goroutine
}
複製代碼
關閉一個通道
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel")) // close 一個nil的chan panic
}
lock(&c.lock) //上鎖
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel")) //已經關閉的chan 再次close會panic
}
...
c.closed = 1 //關閉chan
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
複製代碼