go sema 源碼分析

sema.go semacquire1和 semrelease1 函數是 sync.mutex 用來阻塞 g 和釋放 g 的實現,這兩個方法也實現了相似信號量的功能,而且是針對 goroutine 的信號量,因爲還沒看 go 調度相關的代碼,sema 裏跟調度相關的邏輯也不作仔細說明和代碼註解app

semacquire1 函數

大體流程:獲取 sudog 和 semaRoot ,其中 sudog 是 g 放在等待隊列裏的包裝對象,sudog 裏會有 g 的信息和一些其餘的參數, semaRoot 則是隊列結構體,內部是堆樹,把和當前 g 關聯的 sudog 放到 semaRoot 裏,而後把 g 的狀態改成等待,調用調度器執行別的 g,此時當前 g 就中止執行了。一直到被調度器從新調度執行,會首先釋放 sudog 而後再去執行別的代碼邏輯函數

semaRootoop

// 一個 semaRoot 持有不一樣地址的 sudog 的平衡樹
// 每個 sudog 可能反過來指向等待在同一個地址上的 sudog 的列表
// 對同一個地址上的 sudog 的內聯列表的操做的時間複雜度都是O(1),掃描 semaRoot 的頂部列表是 O(log n)
// n 是 hash 到而且阻塞在同一個 semaRoot 上的不一樣地址的 goroutines 的總數
type semaRoot struct {
	lock  mutex
	treap *sudog // root of balanced tree of unique waiters. 不一樣 waiter 的平衡樹
	nwait uint32 // Number of waiters. Read w/o the lock. waiter 的數量
}

複製代碼
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Easy case.
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	// increment waiter count
	// try cansemacquire one more time, return if succeeded
	// enqueue itself as a waiter
	// sleep
	// (waiter descriptor is dequeued by signaler)
	// 獲取一個 sudog
	s := acquireSudog()
	// 獲取一個 semaRoot
	root := semroot(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	// 一些性能採集的參數 應該是
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		// 鎖定在 semaRoot 上
		lock(&root.lock)
		// Add ourselves to nwait to disable "easy case" in semrelease.
		// nwait 加一
		atomic.Xadd(&root.nwait, 1)
		// Check cansemacquire to avoid missed wakeup.
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		// 加到 semaRoot treap 上
		root.queue(addr, s, lifo)
		// 解鎖 semaRoot ,而且把當前 g 的狀態改成等待,而後讓當前的 m 調用其餘的 g 執行,當前 g 至關於等待
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}
	// 釋放 sudog
	releaseSudog(s)
}		
複製代碼

關鍵的 goparkunlock 函數,調用的是 gopark函數性能

// 把當前的 goroutine 改成等待狀態,而且調用 unlockf 函數,若是函數返回 flase,則當前 g 被恢復
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	if reason != waitReasonSleep {
		checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy 兩個 goroutine 使調度器忙時,有可能會超時
	}
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	mp.waitlock = lock
	// 記住: unlockf 永遠返回 true
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	// can't do anything that might move the G between Ms here.
	mcall(park_m)
}
複製代碼

macll 會先切換成 g0,並把當前 g 做爲參數調用 park_mui

// 在 g0 上繼續 park
func park_m(gp *g) {
	// 當前 g 是g0
	_g_ := getg()

	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}

	// 設置參數 g 的狀態
	casgstatus(gp, _Grunning, _Gwaiting)
	// 刪除參數 g 和 m 的關係
	dropg()

	if fn := _g_.m.waitunlockf; fn != nil {
		// 執行解鎖操做, 假如是從 sema 過來的,fn 永遠返回 true
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
	// 調度其餘的 g 執行
	schedule()
}
複製代碼

park_m 執行以後,調度器就調度並執行其餘的 g, 以前的 gp 也就等待了this

semrelease1 函數

大體流程: 設置 addr 信號,從隊列裏取 sudog s,把 s 對應的 g 變爲可執行狀態,而且放在 p 的本地隊列下一個執行的位置。若是參數 handoff 爲 true,而且當前 m.locks == 0,就把當前的 g 放到 p 本地隊列的隊尾,調用調度器,由於s.g 被放到 p 本地隊列的下一個執行位置,因此調度器此刻執行的就是 s.gatom

func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semroot(addr)
	atomic.Xadd(addr, 1)

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	// 沒有等待的 sudog
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
	lock(&root.lock)
	if atomic.Load(&root.nwait) == 0 {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}
	// 從隊列裏取出來 sudog ,此時 ticket == 0
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // May be slow or even yield, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		// 把 sudog 對應的 g 改成待執行狀態,而且放到 p 本地隊列的下一個執行
		readyWithTime(s, 5+skipframes)
		if s.ticket == 1 && getg().m.locks == 0 {
			// Direct G handoff
			// readyWithTime has added the waiter G as runnext in the
			// current P; we now call the scheduler so that we start running
			// the waiter G immediately.
			// Note that waiter inherits our time slice: this is desirable
			// to avoid having a highly contended semaphore hog the P
			// indefinitely. goyield is like Gosched, but it does not emit a
			// GoSched trace event and, more importantly, puts the current G
			// on the local runq instead of the global one.
			// We only do this in the starving regime (handoff=true), as in
			// the non-starving case it is possible for a different waiter
			// to acquire the semaphore while we are yielding/scheduling,
			// and this would be wasteful. We wait instead to enter starving
			// regime, and then we start to do direct handoffs of ticket and
			// P.
			// See issue 33747 for discussion.
			// 調用調度器當即執行 G
			// 等待的 g 繼承時間片,避免無限制的爭奪信號量
			// 把當前 g 放到 p 本地隊列的隊尾,啓動調度器,由於 s.g 在本地隊列的下一個,因此調度器立馬執行 s.g
			goyield()
		}
	}
}
複製代碼

readyWithTime 把 sudog 對應的 g 喚醒,而且放到 p 本地隊列的下一個執行位置spa

readWithTime 會調用 systemstack , systemstack 會切換到當前 os 線程的堆棧執行 read線程

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
	if trace.enabled {
		traceGoUnpark(gp, traceskip)
	}

	status := readgstatus(gp)

	// Mark runnable.
	// 此刻的— _g_ 不是 gp
	_g_ := getg()
	mp := acquirem() // disable preemption because it can be holding p in a local var
	if status&^_Gscan != _Gwaiting {
		dumpgstatus(gp)
		throw("bad g->status in ready")
	}

	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
	casgstatus(gp, _Gwaiting, _Grunnable)
	// 把 g 放到 p 本地隊列,next 爲 true, 就放在下一個執行, next 爲 false,放在隊尾
	runqput(_g_.m.p.ptr(), gp, next)
	// TODO 這個看了調度代碼再解釋
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
		wakep()
	}
	releasem(mp)
}
複製代碼

goyield 調用 mcall 執行 goyield_m, goyield_m 會把當前的 g 放到 p 本地對象的隊尾, 而後執行調度器code

func goyield_m(gp *g) {
	pp := gp.m.p.ptr()
	casgstatus(gp, _Grunning, _Grunnable)
	dropg()
	runqput(pp, gp, false)
	schedule()
}
複製代碼
相關文章
相關標籤/搜索