因爲掘金不支持 mermaid 流程圖,因此想看完整的版本,能夠到個人我的博客中查看git
golang 中 chan 的源碼在 src/runtime/chan.go 文件中,hchan
則爲 chan 的結構體github
hchan:golang
type hchan struct {
qcount uint // 當前緩存數據的總量
dataqsiz uint // 緩存數據的容量
buf unsafe.Pointer // 緩存數據,爲一個循環數組,容量大小爲 dataqsiz,當前大小爲 qcount
elemsize uint16 // 數據類型的大小,好比 int 爲 4
closed uint32 // 標記是否關閉
elemtype *_type // 數據的類型
sendx uint // 發送隊列 sendq 的長度
recvx uint // 接收隊列 recvq 的長度
recvq waitq // 阻塞的接收 goroutine 的隊列
sendq waitq // 阻塞的發送 goroutine 的隊列
lock mutex // 鎖,用於併發控制隊列操做
}
複製代碼
waitq:面試
type waitq struct {
first *sudog
last *sudog
}
複製代碼
waitq 爲雙向鏈表,sudog 表明一個封裝的 goroutine,其參數 g 爲 goroutine 實例結,構以下圖:數組
當 goroutine 遇到阻塞或等待的場景,會被打包成 sudog。一個 goroutine 可能被打包爲多個 sudog,分別掛在不一樣的等待隊列上.緩存
在 go 中,經過以下代碼建立 chan安全
c := make(chan int, 4)
複製代碼
以上代碼,對應的是源碼:bash
func makechan(t *chantype, size int) *hchan 複製代碼
邏輯流程以下:數據結構
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 安全檢查,數據項大小不超過 16K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 獲取要分配的內存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// size 爲 0 的狀況,分配 hchan 結構體大小的內存,64位系統爲 96 Byte.
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 數據項不爲指針類型,調用 mallocgc 一次性分配內存大小,hchan 結構體大小 + 數據總量大小
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 數據項爲指針類型,hchan 和 buf 分開分配內存,GC 中指針類型判斷 reachable and unreadchable.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// chan 賦值屬性, 數據項大小、數據項類型、緩存數據的容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
複製代碼
在 go 中,寫入 chan 的代碼以下:併發
v := 1
c := make(chan int)
c <- v
複製代碼
讀取 chan 的代碼以下:
var v int
c := make(chan int)
c -> v
複製代碼
c <- v
操做對應的源碼爲 runtime 中的
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 複製代碼
而 c -> v
操做對應源碼爲 runtime 中的
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 複製代碼
其中 c 爲 chansend 的 c, v 的地址爲 chansend 的 ep.
邏輯流程以下:
因爲發送和接收的邏輯都是差很少的,因此這裏就直接放上發送的邏輯代碼來分析就行了
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 校驗
if c == nil {
if !block {
return false
}
// 參數異常,block == true, 進行阻塞 goroutine.
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖,併發讀寫控制
lock(&c.lock)
// 查看 chan 是否關閉
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 從等待接收列隊 recvq 中試圖獲取獲取封裝的 goroutine sudog.
if sg := c.recvq.dequeue(); sg != nil {
// 找到等待接收 chan 的 goroutine sudog,直接發送 value 給接收者,並經過 goready() 喚醒接受者 goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 查看查看緩存空間是否 buf 是否還有剩餘
if c.qcount < c.dataqsiz {
// 將數據移動到 qp 中並放入 chan 緩存,sendx++
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// chan 若是爲非阻塞,unlock 後直接返回
if !block {
unlock(&c.lock)
return false
}
// 將當前 goroutine 封裝 sudog,並放入到等待發送隊列 sendq 中
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 阻塞當前 goroutine,等待被接受者 chanrecv() 的喚醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// KeepAlive 方法,因爲 GC 的緣故,而調用
KeepAlive(ep)
// goroutine 被喚醒,重置 gorotuine 狀態 和 sudog
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
複製代碼
代碼的部分詳解:
gopark
:M(工做線程) 會保存 goroutine 的上下文,而調度器會讓當前工做線程線程 M 綁定執行其餘的 goroutine.
KeepAlive(ep)
: 因爲 GC 的機制,當 ep 再也不被上下文引用的時候,GC 會主動回收 eq,致使 buf 被回收,因此調用 KeepAlive,告訴 GC 不須要對 eq 變量進行內存回收,具體能夠查看 runtime.SetFinalizer 方法部分有詳細介紹.
喚醒
:goroutine 會在 chanrecv 這個 chan 接收接收函數中,從 hchan.sendq 被取出,執行 goready(), 通知調度器去喚醒,而後放入 P(邏輯處理器) 的執行等待隊列中,等待被下一次調用.
send()/recv()
: 經過 memmove()
的方式從發送方拷貝
buf 到接收方.
下面咱們經過一個使用channel作生產/消費的模型來試圖分解一下 chan 的步驟:
func main(){
//初始化任務隊列 channel
ch := make(chan Data, 4)
//生產者往channel丟數據
for _, task := range {
ch <- task
}
//初始化消費者
for i := 0; i< ConsumerNum; i++ {
go consumer(ch)
}
...
}
// 消費者
func consumer(ch chan Data){
for {
//收取任務並處理
data := <- ch
process(data)
}
}
複製代碼
從 main 函數開始,golang 就會開啓一個 goroutine 來執行代碼,咱們能夠將其記做生產者 G'p, 代碼中 go consumer
標記 consumer 函數也開啓一個 goroutine 來進行,咱們記其爲消費者 G'c.
初始化任務隊列channel 此時會在堆區域分配一塊內存,用於存儲 hchan 結構體和 buf 的緩存數據。hchan.buf指向一個大小爲4的數組,而且hchan.sendx、hchan.recvx置0,hchan.dataqsiz置4。
生產者往channel丟數據 G'p 往 ch 發送數據的時候,會執行 lock(&hchan.lock) 對 buf 加鎖,把要發送的數據拷貝到 buf 裏,hchan.sendx++,以後 unlock(hchan.lock) 釋放鎖。
消費者執行消費行爲 G'c 從 ch 中獲取數據的時候,會執行 lock(&hchan.lock) 對 buf 加鎖,將 buf 裏面的一條數據拷貝到接收變量 data 對應的空間中,hchan.recvx++,以後釋放鎖。
在 go 中,關閉 chan 的代碼以下:
ch := make(chan int ,10)
close(ch)
複製代碼
close(ch)
對應的runtime的函數:
func closechan(c *hchan)
複製代碼
邏輯流程以下:
func closechan(c *hchan) {
// 檢查,chan 是否爲空
if c == nil {
panic(plainError("close of nil channel"))
}
// 加鎖,防止資源競爭
lock(&c.lock)
// chan 若是已關閉,則 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 置 hchan.close = 1, 標記已關閉
c.closed = 1
var glist gList
// 釋放recvq的全部等待接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放sendq的全部等待發送者
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 解鎖,unlock
unlock(&c.lock)
// 喚醒recvq和sendq的全部goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
複製代碼
close 的主要做用是用於喚醒全部監測 chann 的 goroutine,可是要注意的是:
chan 如何處理併發讀寫問題 hchan 結構體中經過鎖 lock mutex
參數進行對公共緩存資源 buf 的控制達到併發讀寫的 race 問題.
若是往 chan 發送數據,size 滿了,或者往 chan 獲取數據,buf 空。這會致使阻塞,此時runtime的行爲是怎麼樣的呢? 因爲二者邏輯同樣,咱們就直接講往 chan 發送數據,size 滿了的狀況. 若是往 chan 發送數據,size 滿了,此時 goroutine 和 buf 會被打包成 sudog,經過 gopark 將 goroutine 狀態置爲等待, 同時把 sudog 放入 hchan.sendq 等待發送隊列中,等待接收者接收並調用 goready() 從新調度 goroutine. 此時 goroutine 被阻塞後,M(工做線程) 會與 goroutine 解綁,經過 P(邏輯處理器) 從新進行調度,M 與新的 goroutine 從新綁定執行.
仍是有一部分以目前的知識仍是沒法看懂,之後慢慢積累後再回來補坑,或大佬們能夠幫我指出一下,謝謝.