Golang 源碼導讀 —— chann

因爲掘金不支持 mermaid 流程圖,因此想看完整的版本,能夠到個人我的博客中查看git

01.chan 的數據結構:

golang 中 chan 的源碼在 src/runtime/chan.go 文件中,hchan 則爲 chan 的結構體github

hchan:golang

type hchan struct {
    qcount   uint // 當前緩存數據的總量 
    dataqsiz uint // 緩存數據的容量 
    buf      unsafe.Pointer // 緩存數據,爲一個循環數組,容量大小爲 dataqsiz,當前大小爲 qcount
    elemsize uint16 // 數據類型的大小,好比 int 爲 4
    closed   uint32 // 標記是否關閉
    elemtype *_type // 數據的類型
    sendx    uint  // 發送隊列 sendq 的長度
    recvx    uint  // 接收隊列 recvq 的長度
    recvq    waitq // 阻塞的接收 goroutine 的隊列
    sendq    waitq // 阻塞的發送 goroutine 的隊列
    lock mutex     // 鎖,用於併發控制隊列操做
}
複製代碼

waitq:面試

type waitq struct {
    first *sudog
    last  *sudog
}
複製代碼

waitq 爲雙向鏈表,sudog 表明一個封裝的 goroutine,其參數 g 爲 goroutine 實例結,構以下圖:數組

當 goroutine 遇到阻塞或等待的場景,會被打包成 sudog。一個 goroutine 可能被打包爲多個 sudog,分別掛在不一樣的等待隊列上.緩存


02. 新建 chan:

在 go 中,經過以下代碼建立 chan安全

c := make(chan int, 4)
複製代碼

以上代碼,對應的是源碼:bash

func makechan(t *chantype, size int) *hchan 複製代碼

邏輯流程以下:數據結構

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	// 安全檢查,數據項大小不超過 16K
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	// 獲取要分配的內存
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	var c *hchan
	switch {
	case mem == 0:
		// size 爲 0 的狀況,分配 hchan 結構體大小的內存,64位系統爲 96 Byte.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.kind&kindNoPointers != 0:
		// 數據項不爲指針類型,調用 mallocgc 一次性分配內存大小,hchan 結構體大小 + 數據總量大小
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 數據項爲指針類型,hchan 和 buf 分開分配內存,GC 中指針類型判斷 reachable and unreadchable.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	// chan 賦值屬性, 數據項大小、數據項類型、緩存數據的容量
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	return c
}
複製代碼

03.讀寫chan

在 go 中,寫入 chan 的代碼以下:併發

v := 1
c := make(chan int)
c <- v
複製代碼

讀取 chan 的代碼以下:

var v int
c := make(chan int)
c -> v
複製代碼

c <- v 操做對應的源碼爲 runtime 中的

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 複製代碼

c -> v 操做對應源碼爲 runtime 中的

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 複製代碼

其中 c 爲 chansend 的 c, v 的地址爲 chansend 的 ep.

邏輯流程以下:

因爲發送和接收的邏輯都是差很少的,因此這裏就直接放上發送的邏輯代碼來分析就行了

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 校驗
    if c == nil {
        if !block {
            return false
        }
        // 參數異常,block == true, 進行阻塞 goroutine.
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }
    
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    
    // 加鎖,併發讀寫控制
    lock(&c.lock)

    // 查看 chan 是否關閉
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    
    // 從等待接收列隊 recvq 中試圖獲取獲取封裝的 goroutine sudog.
    if sg := c.recvq.dequeue(); sg != nil {
        // 找到等待接收 chan 的 goroutine sudog,直接發送 value 給接收者,並經過 goready() 喚醒接受者 goroutine
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 查看查看緩存空間是否 buf 是否還有剩餘
    if c.qcount < c.dataqsiz {
        // 將數據移動到 qp 中並放入 chan 緩存,sendx++
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    
    // chan 若是爲非阻塞,unlock 後直接返回
    if !block {
        unlock(&c.lock)
        return false
    }
    
    // 將當前 goroutine 封裝 sudog,並放入到等待發送隊列 sendq 中
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // 阻塞當前 goroutine,等待被接受者 chanrecv() 的喚醒
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    
    // KeepAlive 方法,因爲 GC 的緣故,而調用
    KeepAlive(ep)

    // goroutine 被喚醒,重置 gorotuine 狀態 和 sudog
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}
複製代碼

代碼的部分詳解:

gopark:M(工做線程) 會保存 goroutine 的上下文,而調度器會讓當前工做線程線程 M 綁定執行其餘的 goroutine.

KeepAlive(ep): 因爲 GC 的機制,當 ep 再也不被上下文引用的時候,GC 會主動回收 eq,致使 buf 被回收,因此調用 KeepAlive,告訴 GC 不須要對 eq 變量進行內存回收,具體能夠查看 runtime.SetFinalizer 方法部分有詳細介紹.

喚醒:goroutine 會在 chanrecv 這個 chan 接收接收函數中,從 hchan.sendq 被取出,執行 goready(), 通知調度器去喚醒,而後放入 P(邏輯處理器) 的執行等待隊列中,等待被下一次調用.

send()/recv(): 經過 memmove() 的方式從發送方拷貝 buf 到接收方.


下面咱們經過一個使用channel作生產/消費的模型來試圖分解一下 chan 的步驟:

func main(){
    //初始化任務隊列 channel
    ch := make(chan Data, 4)
    //生產者往channel丟數據
    for _, task := range  {
        ch <- task
    }
    //初始化消費者
    for i := 0; i< ConsumerNum; i++ {
        go consumer(ch)
    }
    ...
}

// 消費者
func consumer(ch chan Data){
    for {
        //收取任務並處理
        data := <- ch
        process(data)
    }
}
複製代碼

從 main 函數開始,golang 就會開啓一個 goroutine 來執行代碼,咱們能夠將其記做生產者 G'p, 代碼中 go consumer 標記 consumer 函數也開啓一個 goroutine 來進行,咱們記其爲消費者 G'c.

  • 初始化任務隊列channel 此時會在堆區域分配一塊內存,用於存儲 hchan 結構體和 buf 的緩存數據。hchan.buf指向一個大小爲4的數組,而且hchan.sendx、hchan.recvx置0,hchan.dataqsiz置4。

  • 生產者往channel丟數據 G'p 往 ch 發送數據的時候,會執行 lock(&hchan.lock) 對 buf 加鎖,把要發送的數據拷貝到 buf 裏,hchan.sendx++,以後 unlock(hchan.lock) 釋放鎖。

  • 消費者執行消費行爲 G'c 從 ch 中獲取數據的時候,會執行 lock(&hchan.lock) 對 buf 加鎖,將 buf 裏面的一條數據拷貝到接收變量 data 對應的空間中,hchan.recvx++,以後釋放鎖。


0.4 關閉 chan

在 go 中,關閉 chan 的代碼以下:

ch := make(chan int ,10)
close(ch)
複製代碼

close(ch) 對應的runtime的函數:

func closechan(c *hchan)
複製代碼

邏輯流程以下:

func closechan(c *hchan) {
    // 檢查,chan 是否爲空
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 加鎖,防止資源競爭
    lock(&c.lock)
    
    // chan 若是已關閉,則 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }
    
    // 置 hchan.close = 1, 標記已關閉
    c.closed = 1
    
    var glist gList
    
    // 釋放recvq的全部等待接收者
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    
    // 釋放sendq的全部等待發送者
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    
    // 解鎖,unlock
    unlock(&c.lock)
    
    // 喚醒recvq和sendq的全部goroutine
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
複製代碼

close 的主要做用是用於喚醒全部監測 chann 的 goroutine,可是要注意的是:

  • 若是 sendq 的緩衝區還有發送者,這些發送者都會 panic
  • 若是兩次 close chan,會致使 panic

0.5 關於 chan 的面試問題

  • chan 如何處理併發讀寫問題 hchan 結構體中經過鎖 lock mutex 參數進行對公共緩存資源 buf 的控制達到併發讀寫的 race 問題.

  • 若是往 chan 發送數據,size 滿了,或者往 chan 獲取數據,buf 空。這會致使阻塞,此時runtime的行爲是怎麼樣的呢? 因爲二者邏輯同樣,咱們就直接講往 chan 發送數據,size 滿了的狀況. 若是往 chan 發送數據,size 滿了,此時 goroutine 和 buf 會被打包成 sudog,經過 gopark 將 goroutine 狀態置爲等待, 同時把 sudog 放入 hchan.sendq 等待發送隊列中,等待接收者接收並調用 goready() 從新調度 goroutine. 此時 goroutine 被阻塞後,M(工做線程) 會與 goroutine 解綁,經過 P(邏輯處理器) 從新進行調度,M 與新的 goroutine 從新綁定執行.

感悟

仍是有一部分以目前的知識仍是沒法看懂,之後慢慢積累後再回來補坑,或大佬們能夠幫我指出一下,謝謝.

相關文章
相關標籤/搜索