前言:能夠說GO真正吸引到個人就是併發這塊了,深刻理解這個機制後讓我收益匪淺,接下來就用本身薄弱的認知來談談GO的併發機制。編程
在這以前,先看下asm_arm64.s中的彙編代碼關於啓動這塊的邏輯bootstrap
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·hashinit(SB)
CALL runtime·schedinit(SB)
// create a new goroutine to start program
PUSHQ $runtime·main·f(SB) // entry
PUSHQ $0 // arg size
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB)複製代碼
接下來就進入分析環節數組
1,經過osinit函數還獲取cpu個數和page的大小,這塊挺簡單的
2,接下來看看schedinit函數(跟本節相關的重要代碼)bash
func schedinit() {
//獲取當前的G
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit()
}
//設置M的最大數量
sched.maxmcount = 10000
//初始化棧空間
stackinit()
//內存空間初始化操做
mallocinit()
//初始化當前的M
mcommoninit(_g_.m)
//將P的數量調整爲CPU數量
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procs > _MaxGomaxprocs {
procs = _MaxGomaxprocs
}
//初始化P
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
}複製代碼
3,上面咱們能夠看到調用了procresize函數來初始化P,那麼咱們來看下procresize函數。這塊代碼過長,分幾個部分解析(只貼重要的代碼)
(1) 初始化新的P多線程
for i := int32(0); i < nprocs; i++ {
pp := allp[i]
if pp == nil {
//新建一個P對象
pp = new(p)
pp.id = i
pp.status = _Pgcstop
//保存到allp數組(負責存儲P的數組)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
//若是P尚未cache,那麼進行分配
if pp.mcache == nil {
if old == 0 && i == 0 {
if getg().m.mcache == nil {
throw("missing mcache?")
}
pp.mcache = getg().m.mcache // bootstrap
} else {
pp.mcache = allocmcache()//分配cache
}
}
}複製代碼
(2) 釋放沒被使用的P併發
for i := nprocs; i < old; i++ {
p := allp[i]
// 將本地任務添加到全局隊列中
for p.runqhead != p.runqtail {
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
// 插入全局隊列的頭部
globrunqputhead(gp)
}
//釋放P所綁定的cache
freemcache(p.mcache)
p.mcache = nil
//將當前的P的G複用連接到全局
gfpurge(p)
p.status = _Pdead
// can't free P itself because it can be referenced by an M in syscall }複製代碼
通過這兩個步驟後,那麼咱們就建立了一批的P,閒置的P會被放進調度器Sched的空閒鏈表中函數
從上面的彙編代碼能夠看出接下來會去調用newproc函數來建立主G,而後用這個主函數去執行runtime.main,而後建立一個線程(這個線程在運行期間專門負責系統監控),接下來就進入GO程序中的main函數去運行了。
先看下newproc代碼學習
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)//獲取參數的地址
pc := getcallerpc(unsafe.Pointer(&siz))//獲取調用方的PC支
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, 0, pc)//真正建立G的地方
})
}複製代碼
接下來看下newpro1的主要代碼ui
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
//從當前P複用鏈表來獲取G
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
//若是獲取失敗,則新建一個
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
//將獲得的G放入P的運行隊列中
runqput(_p_, newg, true)
//下面三個條件分別爲:是否有空閒的P;M是否處於自旋狀態;當前是否建立runteime.main
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && runtimeInitTime != 0 {
wakep()
}
}複製代碼
這個wakep()函數的代碼也是值得一看的,這個思想能夠用到平時的代碼編程中去this
func wakep() {
//線程被喚醒後須要綁定一個P,這裏使用cas操做,能夠避免喚醒過多線程,這裏也對應了上面的三個判斷條件之一
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}複製代碼
startm的代碼就留給讀者本身去看了,否則感受整個博文都是代碼,主要的思想是:獲取一個空閒的P(若是傳入的P爲空),而後先嚐試獲取空閒M(空閒的M被調度器schedt管理,這個結構體也能夠去看下),獲取不到再去建立一個M等。
這塊就稍微比較簡單了,代碼也很少,可是看下來收穫仍是不少的
先看下結構體定義(有刪減)
type hchan struct {
qcount uint // 隊列中數據個數
dataqsiz uint // 緩衝槽大小
buf unsafe.Pointer // 指向緩衝槽的指針
elemsize uint16 // 數據大小
closed uint32 // 表示 channel 是否關閉
elemtype *_type // 數據類型
sendx uint // 發送位置索引
recvx uint // 接收位置索引
recvq waitq // 接收等待列表
sendq waitq // 發送等待列表
lock mutex // 鎖
}
type sudog struct {
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}複製代碼
上面的recvq實際上是讀操做阻塞在channel的G列表,sendq實際上是寫操做阻塞在channel的G列表,那麼G能夠同時阻塞在不一樣的channel上,那麼如何解決呢?這時候就引入了sudog,它實際上是對G的一個包裝,表明在等待隊列上的一個G。
接下來看看建立過程
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 大小不超過64K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
var c *hchan
// 整個建立過程仍是簡單明瞭的
if elem.kind&kindNoPointers != 0 || size == 0 {
//一次性分配內存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
//設置數據大小,類型和緩衝槽大小
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}複製代碼
send函數的代碼有點長,接下來就拆分進行說明
(1) 若是recvq有G在阻塞,那麼就從該隊列取出該G,將數據給該G
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}複製代碼
(2) 若是hchan.buf還有可用的空間,那麼就將數據放入
//經過比較qcount和datasiz來判斷是否還有可用空間
if c.qcount < c.dataqsiz {
// 將數據放入buf中
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}複製代碼
(3) hchan.buf滿了,那麼就會阻塞住了
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//初始化一些參數
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 將當前 goroutine加入等待隊列
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)複製代碼
這裏咱們就能夠看到了,若是滿了,那麼sudog就會出現了,經過初始化後表明當前G進入等待隊列
同理,接收也分爲三種狀況
(1) 當前有發送goroutine阻塞在channel上,buf滿了
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}複製代碼
(2) buf中有數據
if c.qcount > 0 {
// 直接從隊列中接收
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}複製代碼
(3) buf中無數據了,那麼則會阻塞住
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 一樣的,由sudog表明G去排隊
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)複製代碼
總結:雖然這塊代碼邏輯不復雜,可是設計的東西不少,仍是用了不少時間,如今對M執行G的邏輯是懂了,可是還不清楚細節,後面會繼續研究。總的讀下來,首先第一是對併發的機制能夠說是很瞭解了,對之後在編寫相關代碼確定頗有幫助。第二,學習到了一些編程思想,例如cas操做,如何更好的進行封裝和抽象等。