鎖是編程開發中用於併發控制的一種同步機制,提供多線程(或協程)之間併發讀寫一個共享數據的方法。在go語言中使用鎖也很簡單:編程
var loc sync.Mutex var rwLoc sync.RWMutex var idx int var writeRatio = 3 func Inc(){ loc.Lock() defer loc.Unlock() timer := time.NewTimer(100 * time.Millisecond) select{ case <- timer.C: idx ++ } } func Dec(){ loc.Lock() defer loc.Unlock() timer := time.NewTimer(100 * time.Millisecond) select{ case <- timer.C: idx -- } } func main(){ wg := sync.WaitGroup{} wg.Add(6) for i := 0; i < 3; i++ { go func() { defer wg.Done() Inc() }() go func(){ defer wg.Done() Dec() }() } wg.Wait() fmt.Printf("i: %vn", idx) }
使用標準包sync.Mutex便可標記一段臨界區,保證對數據的併發讀寫符合預期。
注意到每次讀寫變量idx
的時候都須要加鎖,也就是任一時候只有一個goroutine容許讀寫該變量,而事實上若是併發執行的goroutine都是讀的操做,是沒有必要加鎖的(由於變量的內容並無改變),加鎖是爲了處理寫操做的goroutine能正確同步變量的值。那麼有沒有不負如來不負卿的雙全法呢,既能正確同步寫操做,又能避免讀操做的無謂加鎖?事實上這就是讀寫鎖的目的。數據結構
簡單地說,讀寫鎖就是一種能保證:多線程
的鎖。
go語言的sync包也包含了這一數據結構,即RWMutex
,使用方法與普通的鎖基本相同,惟一的區別在於讀操做的加鎖、釋放鎖用的是RLock
方法和RUnlock
方法:併發
var rwLoc sync.RWMutex var idx int func ReadRW() { rwLoc.RLock() defer rwLoc.RUnlock() _, _ = fmt.Fprint(ioutil.Discard, idx) } func WriteRW() { rwLoc.Lock() defer rwLoc.Unlock() idx = 3 }
那麼go是怎麼實現讀寫鎖的呢,讓咱們經過源碼分析一下它的實現原理。源碼分析
在看源碼以前咱們不妨先思考一下,若是本身實現,須要怎麼設計這個數據結構來知足上面那三個要求,而後再參看源碼會有更多理解。
首先,爲了知足第二點和第三點要求,確定須要一個互斥鎖:性能
type RWMutex struct{ w Mutex // held if there are pending writers ... }
這個互斥鎖是在寫操做時使用的:ui
func (rw *RWMutex) Lock(){ ... rw.w.Lock() ... } func (rw *RWMutex) Unlock(){ ... rw.w.Unlock() ... }
而讀操做之間是不互斥的,所以讀操做的RLock()過程並不獲取這個互斥鎖。但讀寫之間是互斥的,那麼RLock()若是不獲取互斥鎖又怎麼能阻塞住寫操做呢?go語言的實現是這樣的:
經過一個int32變量記錄當前正在讀的goroutine數:atom
type RWMutex struct{ w Mutex // held if there are pending writers readerCount int32 // number of pending readers ... }
每次調用Rlock方法時將readerCount加1,對應地,每次調用RUnlock方法時將readerCount減1:線程
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // 若是readerCount小於0則經過同步原語阻塞住,不然將readerCount加1後即返回 runtime_SemacquireMutex(&rw.readerSem, false, 0) } } func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // 若是readerCount減1後小於0,則調用rUnlockSlow方法,將這個方法剝離出來是爲了RUnlock能夠內聯,這樣能進一步提高讀操做時的取鎖性能 rw.rUnlockSlow(r) } }
既然每次RLock時都會將readerCount增長,那判斷它是否小於0有什麼意義呢?這就須要和寫操做的取鎖過程Lock()參看:設計
// 總結一下Lock的流程:1. 阻塞新來的寫操做;2. 阻塞新來的讀操做;3. 等待以前的讀操做完成; func (rw *RWMutex) Lock() { // 經過rw.w.Lock阻塞其它寫操做 rw.w.Lock() // 將readerCount減去一個最大數(2的30次方,RWMutex能支持的最大同時讀操做數),這樣readerCount將變成一個小於0的很小的數, // 後續再調RLock方法時將會由於readerCount<0而阻塞住,這樣也就阻塞住了新來的讀請求 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 等待以前的讀操做完成 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
寫操做獲取鎖時經過將readerCount改爲一個很小的數保證新來的讀操做會由於readerCount<0而阻塞住;那以前未完成的讀操做怎麼處理呢?很簡單,只要跟蹤寫操做Lock以前未完成的reader數就好了,這裏經過一個int32變量readerWait來作這件事情:
type RWMutex struct{ w Mutex // held if there are pending writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers ... }
每次寫操做Lock時會將當前readerCount數量記在readerWait裏。
回想一下,當寫操做Lock後readerCount會小於0,這時reader unlock時會執行rUnlockSlow方法,如今能夠來看它的實現過程了:
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { throw("sync: RUnlock of unlocked RWMutex") } // 每一個reader完成讀操做後將readerWait減少1 if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 當readerWait爲0時表明writer等待的全部reader都已經完成了,能夠喚醒writer了 runtime_Semrelease(&rw.writerSem, false, 1) } }
最後再看寫操做的釋放鎖過程:
func (rw *RWMutex) Unlock() { // 將readerCount置回原來的值,這樣reader又能夠進入了 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { throw("sync: Unlock of unlocked RWMutex") } // 喚醒那些等待的reader for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // 釋放互斥鎖,這樣新的writer能夠得到鎖 rw.w.Unlock() }
將上面這些過程梳理一下:
這就是go語言中讀寫鎖的核心源碼(簡潔起見,這裏將競態部分的代碼省略,TODO:競態分析原理分析),相信看到這你已經對讀寫鎖的實現原理了然於胸了,若是你感興趣,不妨一塊兒繼續思考這幾個問題。
r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0
回想一下Lock方法:
func (rw *RWMutex) Lock() { rw.w.Lock() r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
爲了判斷是否還有未完成的reader,直接判斷 r!= 0
不就好了嗎,爲何還須要判斷atomic.AddInt32(&rw.readerWait, r)!=0
?
這是由於上面第三行和第四行的代碼並非原子的,這就意味着中間頗有可能插進其它goroutine執行,假如某個時刻執行完第三行代碼,r=1,也就是此時還有一個reader,但執行第四行以前先執行了該reader的goroutine,而且reader完成RUnlock操做,此時若是隻判斷r!=0
就會錯誤地阻塞住,由於這時候已經沒有未完成的reader了。而reader在執行RUnlock的時候會將readerWait減1,因此readerWait+r
就表明未完成的reader數。
那麼只判斷atomic.AddInt32(&rw.readerWait, r)!=0
不就能夠嗎?理論上應該是能夠的,先判斷r!=0
應該是一種短路操做:若是r==0
那就不用執行atomic.AddInt32
了(注意r==0時readerWait也等於0)。
最後讓咱們經過Benchmark看看讀寫鎖的性能提高有多少:
func Read() { loc.Lock() defer loc.Unlock() _, _ = fmt.Fprint(ioutil.Discard, idx) time.Sleep(1000 * time.Nanosecond) } func ReadRW() { rwLoc.RLock() defer rwLoc.RUnlock() _, _ = fmt.Fprint(ioutil.Discard, idx) time.Sleep(1000 * time.Nanosecond) } func Write() { loc.Lock() defer loc.Unlock() idx = 3 time.Sleep(1000 * time.Nanosecond) } func WriteRW() { rwLoc.Lock() defer rwLoc.Unlock() idx = 3 time.Sleep(1000 * time.Nanosecond) } func BenchmarkLock(b *testing.B) { b.RunParallel(func(pb *testing.PB) { foo := 0 for pb.Next() { foo++ if foo % writeRatio == 0 { Write() } else { Read() } } }) } func BenchmarkRWLock(b *testing.B) { b.RunParallel(func(pb *testing.PB) { foo := 0 for pb.Next() { foo++ if foo % writeRatio == 0 { WriteRW() } else { ReadRW() } } }) }
這裏使用了go語言內置的Benchmark功能,執行go test -bench='Benchmark.*Lock' -run=none mutex_test.go
便可觸發benchmark運行,-run=none
是爲了跳過單測。
結果以下:
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz BenchmarkLock BenchmarkLock-12 235062 5025 ns/op BenchmarkRWLock BenchmarkRWLock-12 320209 3834 ns/op
能夠看出使用讀寫鎖後耗時下降了24%左右。
上面writeRatio用於控制讀、寫的頻率比例,即讀:寫=3,隨着這個比例越高耗時下降的比例也越大,這裏做個簡單比較:
writeRatio | 3 | 10 | 20 | 50 | 100 | 1000 |
---|---|---|---|---|---|---|
耗時下降 | 24% | 71.3% | 83.7% | 90.9% | 93.5% | 95.7% |
能夠看出當讀的比例越高時,使用讀寫鎖得到的性能提高比例越高。