最近在看源碼,發現好多地方用到了這個semaphore
。git
本文是在go version go1.13.15 darwin/amd64
上進行的github
下面是官方的描述golang
// Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case // of other synchronization primitives. // Thus it targets the same goal as Linux's futex, // but it has much simpler semantics. // // That is, don't think of these as semaphores. // Think of them as a way to implement sleep and wakeup // such that every sleep is paired with a single wakeup, // even if, due to races, the wakeup happens before the sleep. // 具體的用法是提供 sleep 和 wakeup 原語 // 以使其可以在其它同步原語中的競爭狀況下使用 // 所以這裏的 semaphore 和 Linux 中的 futex 目標是一致的 // 只不過語義上更簡單一些 // // 也就是說,不要認爲這些是信號量 // 把這裏的東西看做 sleep 和 wakeup 實現的一種方式 // 每個 sleep 都會和一個 wakeup 配對 // 即便在發生 race 時,wakeup 在 sleep 以前時也是如此
上面提到了和futex
做用同樣,關於futex
編程
futex(快速用戶區互斥的簡稱)是一個在Linux上實現鎖定和構建高級抽象鎖如信號量和POSIX互斥的基本工具緩存
Futex 由一塊可以被多個進程共享的內存空間(一個對齊後的整型變量)組成;這個整型變量的值可以經過彙編語言調用CPU提供的原子操做指令來增長或減小,而且一個進程能夠等待直到那個值變成正數。Futex 的操做幾乎所有在用戶空間完成;只有當操做結果不一致從而須要仲裁時,才須要進入操做系統內核空間執行。這種機制容許使用 futex 的鎖定原語有很是高的執行效率:因爲絕大多數的操做並不須要在多個進程之間進行仲裁,因此絕大多數操做均可以在應用程序空間執行,而不須要使用(相對高代價的)內核系統調用。併發
go中的semaphore
做用和futex
目標同樣,提供sleep
和wakeup
原語,使其可以在其它同步原語中的競爭狀況下使用。當一個goroutine
須要休眠時,將其進行集中存放,當須要wakeup
時,再將其取出,從新放入調度器中。app
例如在讀寫鎖的實現中,讀鎖和寫鎖以前的相互阻塞喚醒,就是經過sleep
和wakeup
實現,當有讀鎖存在的時候,新加入的寫鎖經過semaphore
阻塞本身,當前面的讀鎖完成,在經過semaphore
喚醒被阻塞的寫鎖。異步
寫鎖ide
// 獲取互斥鎖 // 阻塞等待全部讀操做結束(若是有的話) func (rw *RWMutex) Lock() { ... // 原子的修改readerCount的值,直接將readerCount減去rwmutexMaxReaders // 說明,有寫鎖進來了,這在上面的讀鎖中也有體現 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 當r不爲0說明,當前寫鎖以前有讀鎖的存在 // 修改下readerWait,也就是當前寫鎖須要等待的讀鎖的個數 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // 阻塞當前寫鎖 runtime_SemacquireMutex(&rw.writerSem, false, 0) } ... }
經過runtime_SemacquireMutex
對當前寫鎖進行sleep
函數
讀鎖釋放
// 減小讀操做計數,即readerCount-- // 喚醒等待寫操做的協程(若是有的話) func (rw *RWMutex) RUnlock() { ... // 首先經過atomic的原子性使readerCount-1 // 1.若readerCount大於0, 證實當前還有讀鎖, 直接結束本次操做 // 2.若readerCount小於0, 證實已經沒有讀鎖, 可是還有由於讀鎖被阻塞的寫鎖存在 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // 嘗試喚醒被阻塞的寫鎖 rw.rUnlockSlow(r) } ... } func (rw *RWMutex) rUnlockSlow(r int32) { ... // readerWait--操做,若是readerWait--操做以後的值爲0,說明,寫鎖以前,已經沒有讀鎖了 // 經過writerSem信號量,喚醒隊列中第一個阻塞的寫鎖 if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 喚醒一個寫鎖 runtime_Semrelease(&rw.writerSem, false, 1) } }
寫鎖處理完以後,調用runtime_Semrelease
來喚醒sleep
的寫鎖
在go/src/sync/runtime.go
中,定義了這幾個方法
// Semacquire等待*s > 0,而後原子遞減它。 // 它是一個簡單的睡眠原語,用於同步 // library and不該該直接使用。 func runtime_Semacquire(s *uint32) // SemacquireMutex相似於Semacquire,用來阻塞互斥的對象 // 若是lifo爲true,waiter將會被插入到隊列的頭部 // skipframes是跟蹤過程當中要省略的幀數,從這裏開始計算 // runtime_SemacquireMutex's caller. func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) // Semrelease會自動增長*s並通知一個被Semacquire阻塞的等待的goroutine // 它是一個簡單的喚醒原語,用於同步 // library and不該該直接使用。 // 若是handoff爲true, 傳遞信號到隊列頭部的waiter // skipframes是跟蹤過程當中要省略的幀數,從這裏開始計算 // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
具體的實現是在go/src/runtime/sema.go
中
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire func sync_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) }
semaphore
的實現使用到了sudog
,咱們先來看下
sudog 是運行時用來存放處於阻塞狀態的goroutine
的一個上層抽象,是用來實現用戶態信號量的主要機制之一。 例如當一個goroutine
由於等待channel
的數據須要進行阻塞時,sudog
會將goroutine
及其用於等待數據的位置進行記錄, 並進而串聯成一個等待隊列,或二叉平衡樹。
// sudogs are allocated from a special pool. Use acquireSudog and // releaseSudog to allocate and free them. type sudog struct { // 如下字段受hchan保護 g *g // isSelect 表示 g 正在參與一個 select, so // 所以 g.selectDone 必須以 CAS 的方式來獲取wake-up race. isSelect bool next *sudog prev *sudog elem unsafe.Pointer // 數據元素(可能指向棧) // 如下字段不會併發訪問。 // 對於通道,waitlink只被g訪問。 // 對於信號量,全部字段(包括上面的字段) // 只有當持有一個semroot鎖時才被訪問。 acquiretime int64 releasetime int64 ticket uint32 parent *sudog //semaRoot 二叉樹 waitlink *sudog // g.waiting 列表或 semaRoot waittail *sudog // semaRoot c *hchan // channel }
sudog
的獲取和歸還,遵循如下策略:
一、獲取,首先從per-P
緩存獲取,對於per-P
緩存,若是per-P
緩存爲空,則從全局池抓取一半,而後取出per-P
緩存中的最後一個;
二、歸還,歸還到per-P
緩存,若是per-P
緩存滿了,就把per-P
緩存的一半歸還到全局緩存中,而後歸還sudog
到per-P
緩存中。
一、若是per-P
緩存的內容沒達到長度的通常,則會從全局額緩存中抓取一半;
二、而後返回把per-P
緩存中最後一個sudog
返回,而且置空;
// go/src/runtime/proc.go //go:nosplit func acquireSudog() *sudog { // Delicate dance: 信號量的實現調用acquireSudog,而後acquireSudog調用new(sudog) // new調用malloc, malloc調用垃圾收集器,垃圾收集器在stopTheWorld調用信號量 // 經過在new(sudog)周圍執行acquirem/releasem來打破循環 // acquirem/releasem在new(sudog)期間增長m.locks,防止垃圾收集器被調用。 // 獲取當前 g 所在的 m mp := acquirem() // 獲取p的指針 pp := mp.p.ptr() if len(pp.sudogcache) == 0 { lock(&sched.sudoglock) // 首先,嘗試從中央緩存獲取一批數據。 for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } unlock(&sched.sudoglock) // 若是中央緩存中沒有,新分配 if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } // 取緩存中最後一個 n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil // 將剛取出的在緩存中移除 pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } releasem(mp) return s }
一、若是per-P
緩存滿了,就歸還per-P
緩存通常的內容到全局緩存;
二、而後將回收的sudog
放到per-P
緩存中。
// go/src/runtime/proc.go //go:nosplit func releaseSudog(s *sudog) { if s.elem != nil { throw("runtime: sudog with non-nil elem") } if s.isSelect { throw("runtime: sudog with non-false isSelect") } if s.next != nil { throw("runtime: sudog with non-nil next") } if s.prev != nil { throw("runtime: sudog with non-nil prev") } if s.waitlink != nil { throw("runtime: sudog with non-nil waitlink") } if s.c != nil { throw("runtime: sudog with non-nil c") } gp := getg() if gp.param != nil { throw("runtime: releaseSudog with non-nil gp.param") } // 避免從新安排到另外一個P mp := acquirem() // avoid rescheduling to another P pp := mp.p.ptr() // 若是緩存滿了 if len(pp.sudogcache) == cap(pp.sudogcache) { // 將本地高速緩存的一半傳輸到中央高速緩存 var first, last *sudog for len(pp.sudogcache) > cap(pp.sudogcache)/2 { n := len(pp.sudogcache) p := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if first == nil { first = p } else { last.next = p } last = p } lock(&sched.sudoglock) last.next = sched.sudogcache sched.sudogcache = first unlock(&sched.sudoglock) } // 歸還sudog到`per-P`緩存中 pp.sudogcache = append(pp.sudogcache, s) releasem(mp) }
// go/src/runtime/sema.go // 用於sync.Mutex的異步信號量。 // semaRoot擁有一個具備不一樣地址(s.elem)的sudog平衡樹。 // 每一個sudog均可以依次(經過s.waitlink)指向一個列表,在相同地址上等待的其餘sudog。 // 對具備相同地址的sudog內部列表進行的操做所有爲O(1)。頂層semaRoot列表的掃描爲O(log n), // 其中,n是阻止goroutines的不一樣地址的數量,經過他們散列到給定的semaRoot。 type semaRoot struct { lock mutex // waiters的平衡樹的根節點 treap *sudog // waiters的數量,讀取的時候無所 nwait uint32 } // Prime to not correlate with any user patterns. const semTabSize = 251 var semtable [semTabSize]struct { root semaRoot pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte }
// go/src/runtime/sema.go //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire func poll_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { // 判斷這個goroutine,是不是m上正在運行的那個 gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } // *addr -= 1 if cansemacquire(addr) { return } // 增長等待計數 // 再試一次 cansemacquire 若是成功則直接返回 // 將本身做爲等待者入隊 // 休眠 // (等待器描述符由出隊信號產生出隊行爲) // 獲取一個sudog s := acquireSudog() root := semroot(addr) t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 if profile&semaBlockProfile != 0 && blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if profile&semaMutexProfile != 0 && mutexprofilerate > 0 { if t0 == 0 { t0 = cputicks() } s.acquiretime = t0 } for { lock(&root.lock) // 添加咱們本身到nwait來禁用semrelease中的"easy case" atomic.Xadd(&root.nwait, 1) // 檢查cansemacquire避免錯過喚醒 if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // 任何在 cansemacquire 以後的 semrelease 都知道咱們在等待(由於設置了 nwait),所以休眠 // 隊列將s添加到semaRoot中被阻止的goroutine中 root.queue(addr, s, lifo) // 將當前goroutine置於等待狀態並解鎖鎖。 // 經過調用goready(gp),可使goroutine再次可運行。 goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3+skipframes) } // 歸還sudog releaseSudog(s) } func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
// go/src/runtime/sema.go //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) } func semrelease1(addr *uint32, handoff bool, skipframes int) { root := semroot(addr) atomic.Xadd(addr, 1) // Easy case:沒有等待者 // 這個檢查必須發生在xadd以後,以免錯過喚醒 if atomic.Load(&root.nwait) == 0 { return } // Harder case: 找到等待者,而且喚醒 lock(&root.lock) if atomic.Load(&root.nwait) == 0 { // 該計數已被另外一個goroutine佔用, // 所以無需喚醒其餘goroutine。 unlock(&root.lock) return } // 搜索一個等待着而後將其喚醒 s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) if s != nil { // 可能會很慢,所以先解鎖 acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3+skipframes) } if s.ticket != 0 { throw("corrupted semaphore ticket") } if handoff && cansemacquire(addr) { s.ticket = 1 } // goready(s.g, 5) // 標記 runnable,等待被從新調度 readyWithTime(s, 5+skipframes) } }
摘自"同步原語"的一段總結
這一對 semacquire 和 semrelease 理解上可能不太直觀。 首先,咱們必須意識到這兩個函數必定是在兩個不一樣的 M(線程)上獲得執行,不然不會出現併發,咱們不妨設爲 M1 和 M2。 當 M1 上的 G1 執行到 semacquire1 時,若是快速路徑成功,則說明 G1 搶到鎖,可以繼續執行。但一旦失敗且在慢速路徑下 依然搶不到鎖,則會進入 goparkunlock,將當前的 G1 放到等待隊列中,進而讓 M1 切換並執行其餘 G。 當 M2 上的 G2 開始調用 semrelease1 時,只是單純的將等待隊列的 G1 從新放到調度隊列中,而當 G1 從新被調度時(假設運氣好又在 M1 上被調度),代碼仍然會從 goparkunlock 以後開始執行,並再次嘗試競爭信號量,若是成功,則會歸還 sudog。
【同步原語】https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync/
【Go併發編程實戰--信號量的使用方法和其實現原理】https://juejin.cn/post/6906677772479889422
【Semaphore】https://github.com/cch123/golang-notes/blob/master/semaphore.md
【進程同步之信號量機制(pv操做)及三個經典同步問題】https://blog.csdn.net/SpeedMe/article/details/17597373
本文做者:liz
本文連接:https://boilingfrog.github.io/2021/04/02/semaphore/
版權聲明:本文爲博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連接和本聲明。