go 讀寫鎖實現原理解讀

引言

鎖是編程開發中用於併發控制的一種同步機制,提供多線程(或協程)之間併發讀寫一個共享數據的方法。在go語言中使用鎖也很簡單:編程

var loc sync.Mutex
var rwLoc sync.RWMutex
var idx int
var writeRatio = 3

func Inc(){
   loc.Lock()
   defer loc.Unlock()
   timer := time.NewTimer(100 * time.Millisecond)
   select{
   case <- timer.C:
      idx ++
   }
}

func Dec(){
   loc.Lock()
   defer loc.Unlock()
   timer := time.NewTimer(100 * time.Millisecond)
   select{
   case <- timer.C:
      idx --
   }
}

func main(){
   wg := sync.WaitGroup{}
   wg.Add(6)
   for i := 0; i < 3; i++ {
      go func() {
         defer wg.Done()
         Inc()
      }()
      go func(){
         defer wg.Done()
         Dec()
      }()
   }
   wg.Wait()
   fmt.Printf("i: %vn", idx)
}

使用標準包sync.Mutex便可標記一段臨界區,保證對數據的併發讀寫符合預期。
注意到每次讀寫變量idx的時候都須要加鎖,也就是任一時候只有一個goroutine容許讀寫該變量,而事實上若是併發執行的goroutine都是讀的操做,是沒有必要加鎖的(由於變量的內容並無改變),加鎖是爲了處理寫操做的goroutine能正確同步變量的值。那麼有沒有不負如來不負卿的雙全法呢,既能正確同步寫操做,又能避免讀操做的無謂加鎖?事實上這就是讀寫鎖的目的。數據結構

讀寫鎖

簡單地說,讀寫鎖就是一種能保證:多線程

  • 併發讀操做之間不互斥;
  • 併發寫操做之間互斥;
  • 併發讀操做和寫操做互斥;

的鎖。
go語言的sync包也包含了這一數據結構,即RWMutex,使用方法與普通的鎖基本相同,惟一的區別在於讀操做的加鎖、釋放鎖用的是RLock方法和RUnlock方法:併發

var rwLoc sync.RWMutex
var idx int

func ReadRW() {
   rwLoc.RLock()
   defer rwLoc.RUnlock()
   _, _ = fmt.Fprint(ioutil.Discard, idx)
}

func WriteRW() {
   rwLoc.Lock()
   defer rwLoc.Unlock()
   idx = 3
}

那麼go是怎麼實現讀寫鎖的呢,讓咱們經過源碼分析一下它的實現原理。源碼分析

源碼分析

在看源碼以前咱們不妨先思考一下,若是本身實現,須要怎麼設計這個數據結構來知足上面那三個要求,而後再參看源碼會有更多理解。
首先,爲了知足第二點和第三點要求,確定須要一個互斥鎖:性能

type RWMutex struct{
    w Mutex // held if there are pending writers
    ...
}

這個互斥鎖是在寫操做時使用的:ui

func (rw *RWMutex) Lock(){
    ...
    rw.w.Lock()
    ...
}

func (rw *RWMutex) Unlock(){
    ...
    rw.w.Unlock()
    ...
}

而讀操做之間是不互斥的,所以讀操做的RLock()過程並不獲取這個互斥鎖。但讀寫之間是互斥的,那麼RLock()若是不獲取互斥鎖又怎麼能阻塞住寫操做呢?go語言的實現是這樣的:
經過一個int32變量記錄當前正在讀的goroutine數:atom

type RWMutex struct{
    w           Mutex // held if there are pending writers
    readerCount int32 // number of pending readers
    ...
}

每次調用Rlock方法時將readerCount加1,對應地,每次調用RUnlock方法時將readerCount減1:線程

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 若是readerCount小於0則經過同步原語阻塞住,不然將readerCount加1後即返回
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
    // 若是readerCount減1後小於0,則調用rUnlockSlow方法,將這個方法剝離出來是爲了RUnlock能夠內聯,這樣能進一步提高讀操做時的取鎖性能
        rw.rUnlockSlow(r)
    }
}

既然每次RLock時都會將readerCount增長,那判斷它是否小於0有什麼意義呢?這就須要和寫操做的取鎖過程Lock()參看:設計

// 總結一下Lock的流程:1. 阻塞新來的寫操做;2. 阻塞新來的讀操做;3. 等待以前的讀操做完成;
func (rw *RWMutex) Lock() {
    // 經過rw.w.Lock阻塞其它寫操做
    rw.w.Lock()
    // 將readerCount減去一個最大數(2的30次方,RWMutex能支持的最大同時讀操做數),這樣readerCount將變成一個小於0的很小的數,
    // 後續再調RLock方法時將會由於readerCount<0而阻塞住,這樣也就阻塞住了新來的讀請求
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 等待以前的讀操做完成
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

寫操做獲取鎖時經過將readerCount改爲一個很小的數保證新來的讀操做會由於readerCount<0而阻塞住;那以前未完成的讀操做怎麼處理呢?很簡單,只要跟蹤寫操做Lock以前未完成的reader數就好了,這裏經過一個int32變量readerWait來作這件事情:

type RWMutex struct{
    w           Mutex // held if there are pending writers
    readerCount int32 // number of pending readers
    readerWait  int32 // number of departing readers
    ...
}

每次寫操做Lock時會將當前readerCount數量記在readerWait裏。
回想一下,當寫操做Lock後readerCount會小於0,這時reader unlock時會執行rUnlockSlow方法,如今能夠來看它的實現過程了:

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        throw("sync: RUnlock of unlocked RWMutex")
    }
    // 每一個reader完成讀操做後將readerWait減少1
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // 當readerWait爲0時表明writer等待的全部reader都已經完成了,能夠喚醒writer了
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

最後再看寫操做的釋放鎖過程:

func (rw *RWMutex) Unlock() {
    // 將readerCount置回原來的值,這樣reader又能夠進入了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")
    }
    // 喚醒那些等待的reader
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 釋放互斥鎖,這樣新的writer能夠得到鎖
    rw.w.Unlock()
}

將上面這些過程梳理一下:

  1. 若是沒有writer請求進來,則每一個reader開始後只是將readerCount增1,完成後將readerCount減1,整個過程不阻塞,這樣就作到「併發讀操做之間不互斥」;
  2. 當有writer請求進來時首先經過互斥鎖阻塞住新來的writer,作到「併發寫操做之間互斥」;
  3. 而後將readerCount改爲一個很小的值,從而阻塞住新來的reader;
  4. 記錄writer進來以前未完成的reader數量,等待它們都完成後再喚醒writer;這樣就作到了「併發讀操做和寫操做互斥」;
  5. writer結束後將readerCount置回原來的值,保證新的reader不會被阻塞,而後喚醒以前等待的reader,再將互斥鎖釋放,使後續writer不會被阻塞。

這就是go語言中讀寫鎖的核心源碼(簡潔起見,這裏將競態部分的代碼省略,TODO:競態分析原理分析),相信看到這你已經對讀寫鎖的實現原理了然於胸了,若是你感興趣,不妨一塊兒繼續思考這幾個問題。

思考

1. writer lock時在判斷是否有未完成的reader時爲何使用r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0

回想一下Lock方法:

func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

爲了判斷是否還有未完成的reader,直接判斷 r!= 0不就好了嗎,爲何還須要判斷atomic.AddInt32(&rw.readerWait, r)!=0
這是由於上面第三行和第四行的代碼並非原子的,這就意味着中間頗有可能插進其它goroutine執行,假如某個時刻執行完第三行代碼,r=1,也就是此時還有一個reader,但執行第四行以前先執行了該reader的goroutine,而且reader完成RUnlock操做,此時若是隻判斷r!=0就會錯誤地阻塞住,由於這時候已經沒有未完成的reader了。而reader在執行RUnlock的時候會將readerWait減1,因此readerWait+r就表明未完成的reader數。
那麼只判斷atomic.AddInt32(&rw.readerWait, r)!=0不就能夠嗎?理論上應該是能夠的,先判斷r!=0應該是一種短路操做:若是r==0那就不用執行atomic.AddInt32了(注意r==0時readerWait也等於0)。

Benchmark

最後讓咱們經過Benchmark看看讀寫鎖的性能提高有多少:

func Read() {
   loc.Lock()
   defer loc.Unlock()
   _, _ = fmt.Fprint(ioutil.Discard, idx)
   time.Sleep(1000 * time.Nanosecond)
}

func ReadRW() {
   rwLoc.RLock()
   defer rwLoc.RUnlock()
   _, _ = fmt.Fprint(ioutil.Discard, idx)
   time.Sleep(1000 * time.Nanosecond)
}

func Write() {
   loc.Lock()
   defer loc.Unlock()
   idx = 3
   time.Sleep(1000 * time.Nanosecond)
}

func WriteRW() {
   rwLoc.Lock()
   defer rwLoc.Unlock()
   idx = 3
   time.Sleep(1000 * time.Nanosecond)
}

func BenchmarkLock(b *testing.B) {
   b.RunParallel(func(pb *testing.PB) {
      foo := 0
      for pb.Next() {
         foo++
         if foo % writeRatio == 0 {
            Write()
         } else {
            Read()
         }
      }
   })
}

func BenchmarkRWLock(b *testing.B) {
   b.RunParallel(func(pb *testing.PB) {
      foo := 0
      for pb.Next() {
         foo++
         if foo % writeRatio == 0 {
            WriteRW()
         } else {
            ReadRW()
         }
      }
   })
}

這裏使用了go語言內置的Benchmark功能,執行go test -bench='Benchmark.*Lock' -run=none mutex_test.go便可觸發benchmark運行,-run=none是爲了跳過單測。
結果以下:

cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkLock
BenchmarkLock-12            235062          5025 ns/op
BenchmarkRWLock
BenchmarkRWLock-12          320209          3834 ns/op

能夠看出使用讀寫鎖後耗時下降了24%左右。
上面writeRatio用於控制讀、寫的頻率比例,即讀:寫=3,隨着這個比例越高耗時下降的比例也越大,這裏做個簡單比較:

writeRatio 3 10 20 50 100 1000
耗時下降 24% 71.3% 83.7% 90.9% 93.5% 95.7%

能夠看出當讀的比例越高時,使用讀寫鎖得到的性能提高比例越高。

相關文章
相關標籤/搜索