信號量,鎖和 golang 相關源碼分析

  • 1資源同步算法

    • 1.1 解決方案編程

  • 2 信號量緩存

    • 2.1 共享變量數據結構

    • 2.2 信號量多線程

  • 3 鎖併發

    • 3.1 死鎖app

    • 3.2 活鎖編程語言

    • 3.3 飢餓ide

  • 4 Golang sync 包函數

    • 4.4.1 數據結構

    • 4.4.2 NewCond函數

    • 4.4.3 Wait方法

    • 4.4.4 Singal方法

    • 4.4.5 Broadcast方法

    • 4.3.1 數據結構

    • 4.3.2 Add和Done方法

    • 4.3.3 Wait方法

    • 4.2.1 常量和結構

    • 4.2.1 RLock和RUnlock方法

    • 4.2.2 Lock和Unlock方法

    • 4.1.1 接口和結構

    • 4.1.2 Lock 方法

    • 4.1.3 Unlock方法

    • 4.1 sync.mutex.go

    • 4.2 sync.rwmutex.go

    • 4.3 sync.waitgroup.go

    • 4.4 sync.cond.go

1資源同步

併發已經成爲現代程序設計中的重要考慮內容,可是併發涉及到一個很重要的內容就是資源同步。當兩個或者多個線程訪問一樣的資源的時候,運行的結果取決於線程運行時精確的時序。這樣致使結果與指望的結果截然不同,所以咱們須要對資源的訪問順序進行控制,已達到資源同步的目的。

1.1 解決方案

將共享資源(也就是共享內存)的程序片斷成爲臨界區域,經過適當安排,使得兩個者線程同時位於臨界區域。對於臨界區域訪問解決方案,須要知足以下4個條件

  • 任何兩個線程不能同時位於臨界區

  • 不對CPU執行速度和時間作任何假設

  • 臨界區外運行的線程不阻塞其餘線程

  • 不能使線程無限期等待進入臨界區

常見的互斥解決方案:

屏蔽中斷線程的切換是由CPU中斷機制提供的,若是一個線程進入臨界區域後,CPU關閉中斷響應;在離開臨界區域後,再打開中斷機制。那麼在臨界區域將不會有其餘線程來競爭資源。 當時將屏蔽中斷權利交給用戶空間執行是不明智的,並且對於多核CPU而言沒有效果。

鎖變量幾乎每個編程語言都提供了資源同步方式:鎖機制。該機制經過對資源進行LockUnlock,以達到對關鍵資源有序訪問。

嚴格輪換法線程不停的執行CPU時間,連續測試某一個值是否出現。可是若是認爲等待的時間很是短,可使用該方式浪費CPU時間,用於等待的鎖也成爲自旋鎖

2 信號量

2.1 共享變量

在理解信號量以前,先了解採用共享變量使用多線程會出現什麼問題。下面是一個C代碼片斷

1for (i=0; i<niters; i++){
2    cnt ++;
3}

cnt爲全局變量,一個線程執行該代碼片斷的時候的彙編代碼以下:

 1   movq (%rdi), %rcx
2    testq %rcx, %rcx
3    jle .L2
4    movl $0, %eax
5.L3:
6    movq cnt(%rip), %rdx
7    addq %eax
8    movq %eax, cnt(%rip)
9    addq $1, %rax
10    cmpq %rcx, %rax
11    jne .L3
12.L2


2.2 信號量其中6-8行分別對應對應着加載cnt,更新cnt和存儲cnt。將cnt變量從內存位置讀出,加載到CPU寄存器中,在CPU運算器中加1,而後存儲到cnt的內存位置。雖然代碼中cnt++只有一行,可是轉換爲彙編代碼的時候不僅有一個操做,也就是說該語句不是原子操做。若是多個線程同時執行代碼,按照以前的條件,不對CPU的執行順序作任何假設,若是其中線程a在執行7行彙編代碼,而線程b執行6行彙編代碼,那麼b將"看不到"線程a對全局變量cnt加1的操做,那麼每次執行的結果cnt也不徹底一致。


計算機領域先驅Dijkstra提出經典的解決上述問題的方法:信號量(semaphore)。它是一個非負整數的全局變量。並且該變量只能有兩個特殊操做來處理: PV

  • P(s): 若是s非零,那麼Ps1,而且當即返回。若是s爲零,那麼就掛起這個線程,知道s爲非零。

  • V(s): V操做將s1。若是有任何線程阻塞在P操做等待s非零,那麼V將重啓其中線程中的一個。

Posix標準定義須要操做信號量的函數

1#include <semaphore.h>
2int sem_init(sem_t *sem, 0unsigned int value);
3int sem_wait(sem_t *s)/*P(s)*/
4int sem_post(sem_t *s)/*P(s)*/

那麼如何使用信號量是的2.1小節出現同步問題解決呢?首先定義全局信號量

1volatile long cnt = 0/* global variable */
2sem_t mutex; /*global semaphore*/

初始化信號量,在這裏初始值爲1

1sem_init(&mutex, 0, 1);

最後使用信號量操做函數將臨界區域代碼包含起來

1for (i =0; i<niters; i++){
2    sem_wait(&mutex);
3    cnt++;
4    sem_post(&mutex);
5}
3 鎖

3.1 死鎖

首先看一下死鎖的規範定義:

若是一個線程(進程)集合中的每個線程(進程)都在等待只能由該線程(進程)集合中的其餘線程(進程)才能引起的事件,那麼該線程(進程)集合是死鎖的。

舉一個例子,若是線程 a 和線程 b 同是執行,線程a獲取了資源r1,等待獲取資源r2;而線程b獲取了資源r2,等待獲取資源r1。那麼線程a和線程b組成的集合是死鎖的。

預防死鎖

  • 破壞佔有等待條件

    對於須要獲取多個資源的線程,一次性獲取所有資源,而不是依次獲取各個資源。

  • 破壞環路等待條件

    死鎖集合的線程按照等佔有線程和等待線程能夠組成有向環圖。那麼若是對全部資源進行排序,全部線程按照資源順序獲取資源。

3.2 活鎖

在某些狀況下,當線程意識它不能獲下一個資源的時候,它會「禮貌性」地釋放已經得到的資源,而後等待1ms,在嘗試一次。若是另外一個線程也在相同的時候作了相同的操做, 那麼同步的步調將致使兩個線程都沒法前進。

3.3 飢餓

在信號量小節中,當執行V操做後,將恢復掛起線程中的一個,那麼問題出現了:若是有多個線程被掛起,那麼選擇哪一個線程恢復呢?若是隨機選擇一個線程恢復,若是源源不斷的線程到達臨界區域而且掛起,那麼頗有可能出現某一個線程一直等待資源,而致使"飢餓"。固然也有好的FILO調度策略來解決調用問題。當時問題在於剛剛到達的線程有很好的局部性,也就是CPU的寄存器、緩存等包含了該線程的局部變量,若是程得到資源鎖,很好的避免了線程上下文切換,對性能提升頗有幫助。

go語言的互斥鎖中採用結合上述兩種策略,接下來小節中,將會仔細分析源碼。

4 Golang sync 包

4.1 sync.mutex.go

4.1.1 接口和結構

包含了Locker接口和Mutex結構:

1type Locker interface {
2    Lock()
3    Unlock()
4}
5type Mutex struct {
6    state int32
7    sema  uint32
8}

Mutex實現了Locker接口,該結構包含了state的字段,用來表示該鎖當前狀態;sema則爲一個信號量。state是一個32位的整數,不一樣比特位包含了不一樣的意義,其中源碼中的有很詳細的註釋,該註釋很好解釋mutex如何工做:

互斥鎖有兩種狀態:正常狀態和飢餓狀態。在正常狀態下,全部掛起等待的goroutine按照FIFO順序等待。喚醒的goroutine將會和剛剛到達的goroutine競爭互斥鎖的擁有權,由於剛剛到達的goroutine具備優點:它剛剛正在CPU上執行,因此剛剛喚醒的goroutine有很大可能在鎖競爭中失敗。若是一個等待的goroutine超過1ms沒有獲取互斥鎖,那麼它將會把互斥鎖轉變爲飢餓模式。在飢餓模式下,互斥鎖的全部權將移交給等待隊列中的第一個。新來的goroutine將不會嘗試去得到互斥鎖,也不會去嘗試自旋操做,而是放在隊列的最後一個。若是一個等待的goroutine獲取的互斥鎖,如何它知足一下其中的任何一個條件:(1)它是隊列中的最後一個;(2)它等待的時候小於1ms。它會將互斥鎖的轉檯轉換爲正常狀態。正常狀態有很好的性能表現,飢餓模式也是很是重要的的,由於它能阻止尾部延遲的現象。

1const (
2    mutexLocked = 1 << iota // mutex is locked
3    mutexWoken
4    mutexStarving
5    mutexWaiterShift = iota
6    starvationThresholdNs = 1e6
7)
  • mutexLocked

該值爲1, 第一位比特位1,表明了該是否該互斥鎖已經被鎖住。mutex.state與它進行&操做,若是爲1表示已經鎖住,0則表示未被鎖住。

  • mutexWoken

該值爲2,第二位比特位1,表明了該互斥鎖是否被喚醒,mutex.state與它進行&操做,若是爲1表示已經被喚醒,0表明未被喚醒

  • mutexStarving

該值爲4,第三位比特爲1,表明了該互斥鎖是否處於飢餓狀態,mutex.state與它進行&操做,若是爲1表示處於飢餓轉態,0表示處於正常狀態。

  • mutexWaiterShift

該值爲3,表示mutex.state右移3位後爲等待的goroutine的數量。

  • starvationThresholdNs

goroutine將互斥鎖轉換狀態的時間等待的臨界值:一百萬納秒,也就是1ms。

4.1.2 Lock 方法

1if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
2        if race.Enabled {
3            race.Acquire(unsafe.Pointer(m))
4        }
5        return
6}

CompareAndSwapInt32是一個原子操做,它判斷是一個參數的值是否等於第二個參數,若是相等,則將第一個參數設置爲第三個參數,並返回true;不然對一個參數不作任何操做而且返回false。這一段是代碼是處理第一次goroutine進行嘗試Lock操做,若是一切都是初始狀態,則m.state.....0000001而且返回,進入臨界區域代碼,不然代碼繼續往下走。

1var waitStartTime int64
2starving := false
3awoke := false
4iter := 0
5old := m.state

首先定義了一下變量:goroutine等待時間,是否飢餓轉檯,是否喚醒和自旋迭代次數和保存當前互斥鎖狀態。接下來是一個for循環,只有退出循環才能進入臨界區域代碼,縱觀代碼只有兩處使用break來退出循環。

 1for {
2    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
3            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
4                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
5                awoke = true
6            }
7            runtime_doSpin()
8            iter++
9            old = m.state
10            continue
11        }
12}

首先判斷鎖狀態是被鎖並且不是處於飢餓模式,加上還能自旋額次數,進入下一層判斷。若是當前goroutine沒有被喚醒其餘goroutine也沒有被喚醒等待的goroutine超過1能夠將m.state設置爲喚醒轉態四個條件同時知足,將awoke設置true`。而後進行自旋操做,進行一輪循環。

 1new := old
2if old&mutexStarving == 0 {
3            new |= mutexLocked
4}
5if old&(mutexLocked|mutexStarving) != 0 {
6        new += 1 << mutexWaiterShift
7}
8if starving && old&mutexLocked != 0 {
9            new |= mutexStarving
10}

這三個判斷條件作了以下工做:若是當前的mutex.state處於正常模式,則將new的鎖位設置爲1,若是當前鎖鎖狀態爲鎖定狀態或者處於飢餓模式,則將等待的線程數量+1。若是starving變量爲true而且處於鎖定狀態,則new的飢餓狀態位打開。

1if awoke {
2    if new&mutexWoken == 0 {
3                throw("sync: inconsistent mutex state")
4    }
5    new &^= mutexWoken
6}

若是 goroutine已經被喚醒,則清空new的喚醒位。

1if atomic.CompareAndSawpInt32(&m.state, old, new){
2    //...
3}else{
4    //...
5}

若是更新m.state成功

1if old&(mutexLocked|mutexStarving) == 0 {
2                break 
3}

若是未被鎖定而且並非出於飢餓狀態,到達第一個break,進入代碼臨界區域。

1queueLifo := waitStartTime != 0
2if waitStartTime == 0 {
3    waitStartTime = runtime_nanotime()
4}
5runtime_SemacquireMutex(&m.sema, queueLifo)

runtime_SemacquireMutex(s *uint32, lifo bool)函數相似P操做,若是lifotrue則將等待goroutine插入到隊列的前面。在這裏,對於每個到達的goroutine,若是CompareAndSawpInt32成功,而且到達時候若是鎖出於鎖定狀態,那麼將該goroutine插入到等待隊列的最後,不然插入到最前面。此時goroutine將會被掛起,等待UnlockV操做,將喚醒goroutines

 1starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
2old = m.state
3if old&mutexStarving != 0 {
4                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
5                    throw("sync: inconsistent mutex state")
6                }
7                delta := int32(mutexLocked - 1<<mutexWaiterShift)
8                if !starving |
| old>>mutexWaiterShift == 1 {
9                    delta -= mutexStarving
10                }
11                atomic.AddInt32(&m.state, delta)
12                break
13}
14

判斷被喚醒的線程是否爲達到飢餓狀態,也就是等待時間超過1ms,若是以前的m.state不是飢餓狀態,繼續循環,給新到來goroutine讓出互斥鎖。若是已經飢餓狀態,則修改等待goroutine數量和飢餓狀態位,並返回進入臨界代碼區域。

4.1.3 Unlock方法

1new := atomic.AddInt32(&m.state, -mutexLocked)

首先建立變量new,該變量的鎖位爲0。接下來是飢餓狀態判斷

 1if new&mutexStarving == 0 {
2        old := new
3        for {
4                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
5                return
6            }
7            new = (old - 1<<mutexWaiterShift) | mutexWoken
8            if atomic.CompareAndSwapInt32(&m.state, old, new) {
9                runtime_Semrelease(&m.sema, false)
10                return
11            }
12            old = m.state
13        }
14    } else {
15        runtime_Semrelease(&m.sema, true)
16    }

若是是正常狀態,則判斷若是等待的goroutine爲零,或者已經被鎖定、喚醒、或者已經變成飢餓狀態,返回,不須要喚醒任何其餘被掛起的goroutine,由於互斥鎖已經被其餘goroutine搶佔。不然更新new值(修改等待的goroutine數量)並設置喚醒爲,若是CompareAndSwapInt32成功,則經過runtime_Semrelease(&m.sema, false)恢復掛起的goroutine.r若是爲 true代表將喚醒第一個阻塞的goroutine,這第一點在else飢餓的分支中體現。

4.2 sync.rwmutex.go

讀寫鎖也是一種常見的鎖機制,它容許多個線程讀共享資源,只有一個線程寫共享資源,接下來看看go中如何實現讀寫鎖。

4.2.1 常量和結構

1type RWMutex struct {
2    w           Mutex  
3    writerSem   uint32 
4    readerSem   uint32 
5    readerCount int32  
6    readerWait  int32 
7}
8const rwmutexMaxReaders = 1 << 30

RWMutex結構包含了以下的字段

  • goroutine數量。

4.2.1 RLock和RUnlock方法

1func (rw *RWMutex) RLock() {
2    // [...]
3    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
4        runtime_Semacquire(&rw.readerSem)
5    }
6//[...]
7}

首先是readerCount值+1, 若是小於零,則掛起goroutine等待readerSem。是否是很奇怪,爲何會小於零判斷呢?在這裏先賣一個關子,接下來會看到爲何是這樣的設計邏輯。

 1func (rw *RWMutex) RUnlock() {
2    //[...]
3    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
4        if r+1 == 0 || r+1 == -rwmutexMaxReaders {
5            race.Enable()
6            throw("sync: RUnlock of unlocked RWMutex")
7        }
8        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
9            runtime_Semrelease(&rw.writerSem, false)
10        }
11    }
12    //[...]
13}

首先將readerCount減去1,若是小於零,再講readWait減去1,若是是離開讀的goroutine數量爲零,則對writerSem信號量進行V操做。

4.2.2 Lock和Unlock方法

1func (rw *RWMutex) Lock() {
2    //[...]
3    rw.w.Lock()
4    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
5    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
6        runtime_Semacquire(&rw.writerSem)
7    }
8    //[...]
9}

首先rw.w.Lock操做,來防止其餘goroutine對共享資源的寫訪問。而後將readerCount減去rwmutexMaxReaders,代表還剩多少goroutine能夠進行讀訪問,這也解釋在RLock中小於零的判斷,若是還能夠還能夠進行讀訪問,則必須得到readerSem信號量。在Lock中接下來是對readWait判斷,若是該數量不爲零,則須要對writerSem進行P操做,而V操做只在RUnlock方法中,若是最後一個讀goroutine離開,則進行V操做。

 1func (rw *RWMutex) Unlock() {
2    //[...]
3    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
4    if r >= rwmutexMaxReaders {
5        race.Enable()
6        throw("sync: Unlock of unlocked RWMutex")
7    }
8    for i := 0; i < int(r); i++ {
9        runtime_Semrelease(&rw.readerSem, false)
10    }
11    //[...]
12}

首先恢復readCounter爲正數,而後對readerSem信號量進行rV操做,喚醒在RLock中被掛起的goroutine

4.3 sync.waitgroup.go

WaitGroup一般用在等待一組goroutine所有完成。調用Add方法指明要等待的goroutine的數量,調用Done方法說明該goroutine已經完成,而Wait方法是阻塞等待的goroutine

4.3.1 數據結構

1type WaitGroup struct {
2    noCopy noCopy
3    state1 [12]byte
4    sema   uint32
5}

noCopy字段說明WaitGroup不容許拷貝,而state1字段是一個很是tricky的方法,用其中的8個字節(64bit)來保存一些狀態。高位的32bit用來表示須要等待的goroutine的數量,地位的32bit用來表示被掛起的goroutine的數量。至於爲何不直接使用64bit的數據主要是爲了考慮32爲編譯器沒法保證64位對齊。sema則是一個信號量。

1func (wg *WaitGroup) state() *uint64 {
2    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
3        return (*uint64)(unsafe.Pointer(&wg.state1))
4    } else {
5        return (*uint64)(unsafe.Pointer(&wg.state1[4]))
6    }
7}

該方法是一個輔助方法,用來獲取state,一個64爲的無符號整數。

4.3.2 Add和Done方法

1func (wg *WaitGroup) Done() {
2    wg.Add(-1)
3}

Done方法其實調用了Add(-1)方法,因此咱們着重討論Add方法。

 1func (wg *WaitGroup) Add(delta int) {
2    statep := wg.state()
3    //[...]
4    state := atomic.AddUint64(statep, uint64(delta)<<32)
5    v := int32(state >> 32)
6    w := uint32(state)
7    if race.Enabled && delta > 0 && v == int32(delta) {
8            race.Read(unsafe.Pointer(&wg.sema))
9    }
10    if v < 0 {
11        panic("sync: negative WaitGroup counter")
12    }
13    if w != 0 && delta > 0 && v == int32(delta) {
14        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
15    }
16    if v > 0 || w == 0 {
17        return
18    }
19    if *statep != state {
20        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
21    }
22    // Reset waiters count to 0.
23    *statep = 0
24    for ; w != 0; w-- {
25        runtime_Semrelease(&wg.sema, false)
26    }
27}

首先是獲取state,而後將delta右移32位,加上等待的goroutine數量。vw分別表明了須要等待的goroutine和被阻塞的goroutine的數量。接下來v==int32(delta)判斷條件代表若是是第一次Add操做,則必須與等待的goroutine同步,在Wait方法中能夠看到一樣的操做。接下來是一些拋異常操做,若是等待的數量爲負數,如何第一次Add操做沒有同步。if >0 || w==0條件代表如何v沒有降到零,或者被阻塞的goroutine數量爲零,直接返回。如何v爲零,則按照w的數量,依次對信號量ws.sema進行V操做。

4.3.3 Wait方法

 1func (wg *WaitGroup) Wait() {
2    //[...]
3    for {
4        state := atomic.LoadUint64(statep)
5        v := int32(state >> 32)
6        w := uint32(state)
7        //[...]
8        // Increment waiters count.
9        if atomic.CompareAndSwapUint64(statep, state, state+1) {
10            if race.Enabled && w == 0 {
11                race.Write(unsafe.Pointer(&wg.sema))
12            }
13            runtime_Semacquire(&wg.sema)
14            if *statep != 0 {
15                panic("sync: WaitGroup is reused before previous Wait has returned")
16            }
17            //[...]
18            return
19        }
20    }
21}

Wait方法一樣也是CAS算法,首先獲取須要等待的goroutine的數量v和阻塞的goroutine數量w, 而後將阻塞的goroutine數量+1,若是以前的w爲零,表示是第一次等待,則與Add操做進行同步,最後後對信號量wg.sema進行P操做。

4.4 sync.cond.go

在編程中使用Cond也叫管程(monitor),它能夠用來使不一樣線程完成互斥條件,也可使某個線程等待某個條件的發生。常見的使用模式以下:

 1var locker = new(sync.Mutex)
2var cond = sync.NewCond(locker)
3var condition = true
4// goroutine A
5cond.L.Lock()
6for condition {
7    cond.Wait()
8}
9// ...
10cond.L.Unlock()
11
12//goroutine B
13condiiton = false
14cond.Signal() 

爲何使用for循環做爲判斷進入Wait的條件而不是if呢?主要是防止爲被喚醒的goroutine在返回Wait調用的時候,剛好有別的goroutine修改了conditon的值,因此須要使用for循環做爲條件判斷。

4.4.1 數據結構

1type Cond struct {
2    noCopy noCopy
3    L Locker
4    notify  notifyList
5    checker copyChecker
6}

Cond結構不容許拷貝,包含了Locker的接口字段,和一個notifyList的集合字段。

4.4.2 NewCond函數

1func NewCond(l Locker) *Cond {
2    return &Cond{L: l}
3}

實現Locker接口的類型均可以,通常爲MutexRWMutex

4.4.3 Wait方法

1func (c *Cond) Wait() {
2    c.checker.check()
3    t := runtime_notifyListAdd(&c.notify)
4    c.L.Unlock()
5    runtime_notifyListWait(&c.notify, t)
6    c.L.Lock()
7}

在使用Wait方法以前,要調用c.L.Lock來進入臨界區域,將當前等待的goroutine加入到通知隊列中,而後調用c.L.Unlock()來退出臨界區域,以便讓其餘goroutine能夠進入等待區域。緊接着掛起goroutine,等待消息。

4.4.4 Singal方法

1func (c *Cond) Signal() {
2    c.checker.check()
3    runtime_notifyListNotifyOne(&c.notify)}

runtime_notifyListNotifyOne喚起其中的等待的goroutine

4.4.5 Broadcast方法

1func (c *Cond) Broadcast() {
2    c.checker.check()
3    runtime_notifyListNotifyAll(&c.notify)
4}

runtime_notifyListNotifyAll喚起所有等待的goroutine

相關文章
相關標籤/搜索