讀寫鎖區別與互斥鎖的主要區別就是讀鎖之間是共享的,多個goroutine能夠同時加讀鎖,可是寫鎖與寫鎖、寫鎖與讀鎖之間則是互斥的git
由於讀鎖是共享的,因此若是當前已經有讀鎖,那後續goroutine繼續加讀鎖正常狀況下是能夠加鎖成功,可是若是一直有讀鎖進行加鎖,那嘗試加寫鎖的goroutine則可能會長期獲取不到鎖,這就是由於讀鎖而致使的寫鎖飢餓問題github
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // 用於writer等待讀完成排隊的信號量
readerSem uint32 // 用於reader等待寫完成排隊的信號量
readerCount int32 // 讀鎖的計數器
readerWait int32 // 等待讀鎖釋放的數量
}
複製代碼
讀寫鎖中容許加讀鎖的最大數量是4294967296,在go裏面對寫鎖的計數採用了負值進行,經過遞減最大容許加讀鎖的數量從而進行寫鎖對讀鎖的搶佔golang
const rwmutexMaxReaders = 1 << 30
複製代碼
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 累加reader計數器,若是小於0則代表有writer正在等待
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 當前有writer正在等待讀鎖,讀鎖就加入排隊
runtime_SemacquireMutex(&rw.readerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
複製代碼
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 若是小於0,則代表當前有writer正在等待
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 將等待reader的計數減1,證實當前是已經有一個讀的,若是值==0,則進行喚醒等待的
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
race.Enable()
}
}
複製代碼
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 首先獲取mutex鎖,同時多個goroutine只有一個能夠進入到下面的邏輯
rw.w.Lock()
// 對readerCounter進行進行搶佔,經過遞減rwmutexMaxReaders容許最大讀的數量
// 來實現寫鎖對讀鎖的搶佔
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 記錄須要等待多少個reader完成,若是發現不爲0,則代表當前有reader正在讀取,當前goroutine
// 須要進行排隊等待
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
複製代碼
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 將reader計數器復位,上面減去了一個rwmutexMaxReaders如今再從新加回去便可復位
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 喚醒全部的讀鎖
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// 釋放mutex
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
複製代碼
加寫鎖的搶佔緩存
// 在加寫鎖的時候經過將readerCount遞減最大容許加讀鎖的數量,來實現對加讀鎖的搶佔
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
複製代碼
加讀鎖的搶佔檢測併發
// 若是沒有寫鎖的狀況下讀鎖的readerCount進行Add後必定是一個>0的數字,這裏經過檢測值爲負數
//就實現了讀鎖對寫鎖搶佔的檢測
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false)
}
複製代碼
寫鎖搶佔讀鎖後後續的讀鎖就會加鎖失敗,可是若是想加寫鎖成功還要繼續對已經加讀鎖成功的進行等待ide
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 寫鎖發現須要等待的讀鎖釋放的數量不爲0,就本身本身去休眠了
runtime_SemacquireMutex(&rw.writerSem, false)
}
複製代碼
寫鎖既然休眠了,則一定要有一種喚醒機制其實就是每次釋放鎖的時候,當檢查到有加寫鎖的狀況下,就遞減readerWait,並由最後一個釋放reader lock的goroutine來實現喚醒寫鎖性能
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
複製代碼
在加寫鎖的時候必須先進行mutex的加鎖,而mutex自己在普通模式下是非公平的,只有在飢餓模式下才是公平的ui
rw.w.Lock()
複製代碼
在加讀鎖和寫鎖的工程中都使用atomic.AddInt32來進行遞增,而該指令在底層是會經過LOCK來進行CPU總線加鎖的,所以多個CPU同時執行readerCount其實只會有一個成功,從這上面看實際上是寫鎖與讀鎖之間是相對公平的,誰先達到誰先被CPU調度執行,進行LOCK鎖cache line成功,誰就加成功鎖atom
在併發場景中特別是JAVA中一般會提到併發裏面的兩個問題:可見性與內存屏障、原子性, 其中可見性一般是指在cpu多級緩存下如何保證緩存的一致性,即在一個CPU上修改了了某個數據在其餘的CPU上不會繼續讀取舊的數據,內存屏障一般是爲了CPU爲了提升流水線性能,而對指令進行重排序而來,而原子性則是指的執行某個操做的過程的不可分割spa
go裏面並無volatile這種關鍵字,那如何能保證上面的AddInt32這個操做能夠知足上面的兩個問題呢, 其實關鍵就在於底層的2條指令,經過LOCK指令配合CPU的MESI協議,實現可見性和內存屏障,同時經過XADDL則用來保證原子性,從而解決上面提到的可見性與原子性問題
// atomic/asm_amd64.s TEXT runtime∕internal∕atomic·Xadd(SB)
LOCK
XADDL AX, 0(BX)
複製代碼
更多文章能夠訪問www.sreguide.com
本篇文章由一文多發平臺ArtiPub自動發佈