1資源同步算法
1.1 解決方案編程
2 信號量緩存
2.1 共享變量數據結構
2.2 信號量多線程
3 鎖併發
3.1 死鎖app
3.2 活鎖編程語言
3.3 飢餓ide
4 Golang sync 包函數
4.4.1 數據結構
4.4.2 NewCond函數
4.4.3 Wait方法
4.4.4 Singal方法
4.4.5 Broadcast方法
4.3.1 數據結構
4.3.2 Add和Done方法
4.3.3 Wait方法
4.2.1 常量和結構
4.2.1 RLock和RUnlock方法
4.2.2 Lock和Unlock方法
4.1.1 接口和結構
4.1.2 Lock 方法
4.1.3 Unlock方法
4.1 sync.mutex.go
4.2 sync.rwmutex.go
4.3 sync.waitgroup.go
4.4 sync.cond.go
併發已經成爲現代程序設計中的重要考慮內容,可是併發涉及到一個很重要的內容就是資源同步。當兩個或者多個線程訪問一樣的資源的時候,運行的結果取決於線程運行時精確的時序。這樣致使結果與指望的結果截然不同,所以咱們須要對資源的訪問順序進行控制,已達到資源同步的目的。
將共享資源(也就是共享內存)的程序片斷成爲臨界區域
,經過適當安排,使得兩個者線程同時位於臨界區域。對於臨界區域
訪問解決方案,須要知足以下4個條件
任何兩個線程不能同時位於臨界區
不對CPU執行速度和時間作任何假設
臨界區外運行的線程不阻塞其餘線程
不能使線程無限期等待進入臨界區
常見的互斥解決方案:
屏蔽中斷線程的切換是由CPU中斷機制提供的,若是一個線程進入臨界區域後,CPU關閉中斷響應;在離開臨界區域後,再打開中斷機制。那麼在臨界區域將不會有其餘線程來競爭資源。 當時將屏蔽中斷權利交給用戶空間執行是不明智的,並且對於多核CPU而言沒有效果。
鎖變量幾乎每個編程語言都提供了資源同步方式:鎖機制。該機制經過對資源進行Lock
和Unlock
,以達到對關鍵資源有序訪問。
嚴格輪換法線程不停的執行CPU時間,連續測試某一個值是否出現。可是若是認爲等待的時間很是短,可使用該方式浪費CPU時間,用於等待的鎖也成爲自旋鎖
。
在理解信號量以前,先了解採用共享變量使用多線程會出現什麼問題。下面是一個C代碼片斷
1for (i=0; i<niters; i++){
2 cnt ++;
3}
cnt
爲全局變量,一個線程執行該代碼片斷的時候的彙編代碼以下:
1 movq (%rdi), %rcx
2 testq %rcx, %rcx
3 jle .L2
4 movl $0, %eax
5.L3:
6 movq cnt(%rip), %rdx
7 addq %eax
8 movq %eax, cnt(%rip)
9 addq $1, %rax
10 cmpq %rcx, %rax
11 jne .L3
12.L2
2.2 信號量其中6-8
行分別對應對應着加載cnt
,更新cnt
和存儲cnt
。將cnt
變量從內存位置讀出,加載到CPU寄存器中,在CPU運算器中加1,而後存儲到cnt
的內存位置。雖然代碼中cnt++
只有一行,可是轉換爲彙編代碼的時候不僅有一個操做,也就是說該語句不是原子操做。若是多個線程同時執行代碼,按照以前的條件,不對CPU的執行順序作任何假設,若是其中線程a
在執行7
行彙編代碼,而線程b
執行6
行彙編代碼,那麼b
將"看不到"線程a
對全局變量cnt
加1的操做,那麼每次執行的結果cnt
也不徹底一致。
計算機領域先驅Dijkstra
提出經典的解決上述問題的方法:信號量(semaphore)。它是一個非負整數的全局變量。並且該變量只能有兩個特殊操做來處理: P
和V
。
P(s): 若是s
非零,那麼P
將s
減1
,而且當即返回。若是s
爲零,那麼就掛起這個線程,知道s
爲非零。
V(s): V
操做將s
加1
。若是有任何線程阻塞在P
操做等待s
非零,那麼V
將重啓其中線程中的一個。
Posix
標準定義須要操做信號量的函數
1#include <semaphore.h>
2int sem_init(sem_t *sem, 0, unsigned int value);
3int sem_wait(sem_t *s); /*P(s)*/
4int sem_post(sem_t *s); /*P(s)*/
那麼如何使用信號量是的2.1小節出現同步問題解決呢?首先定義全局信號量
1volatile long cnt = 0; /* global variable */
2sem_t mutex; /*global semaphore*/
初始化信號量,在這裏初始值爲1
1sem_init(&mutex, 0, 1);
最後使用信號量操做函數將臨界區域代碼包含起來
1for (i =0; i<niters; i++){3 鎖
2 sem_wait(&mutex);
3 cnt++;
4 sem_post(&mutex);
5}
首先看一下死鎖的規範定義:
若是一個線程(進程)集合中的每個線程(進程)都在等待只能由該線程(進程)集合中的其餘線程(進程)才能引起的事件,那麼該線程(進程)集合是死鎖的。
舉一個例子,若是線程 a
和線程 b
同是執行,線程a
獲取了資源r1
,等待獲取資源r2
;而線程b
獲取了資源r2
,等待獲取資源r1
。那麼線程a
和線程b
組成的集合是死鎖的。
預防死鎖
破壞佔有等待條件
對於須要獲取多個資源的線程,一次性獲取所有資源,而不是依次獲取各個資源。
破壞環路等待條件
死鎖集合的線程按照等佔有線程和等待線程能夠組成有向環圖。那麼若是對全部資源進行排序,全部線程按照資源順序獲取資源。
在某些狀況下,當線程意識它不能獲下一個資源的時候,它會「禮貌性」地釋放已經得到的資源,而後等待1ms
,在嘗試一次。若是另外一個線程也在相同的時候作了相同的操做, 那麼同步的步調將致使兩個線程都沒法前進。
在信號量小節中,當執行V
操做後,將恢復掛起線程中的一個,那麼問題出現了:若是有多個線程被掛起,那麼選擇哪一個線程恢復呢?若是隨機選擇一個線程恢復,若是源源不斷的線程到達臨界區域而且掛起,那麼頗有可能出現某一個線程一直等待資源,而致使"飢餓"。固然也有好的FILO
調度策略來解決調用問題。當時問題在於剛剛到達的線程有很好的局部性,也就是CPU的寄存器、緩存等包含了該線程的局部變量,若是程得到資源鎖,很好的避免了線程上下文切換,對性能提升頗有幫助。
在go
語言的互斥鎖中採用結合上述兩種策略,接下來小節中,將會仔細分析源碼。
包含了Locker
接口和Mutex
結構:
1type Locker interface {
2 Lock()
3 Unlock()
4}
5type Mutex struct {
6 state int32
7 sema uint32
8}
Mutex
實現了Locker
接口,該結構包含了state
的字段,用來表示該鎖當前狀態;sema
則爲一個信號量。state
是一個32位的整數,不一樣比特位包含了不一樣的意義,其中源碼中的有很詳細的註釋,該註釋很好解釋mutex
如何工做:
互斥鎖有兩種狀態:正常狀態和飢餓狀態。在正常狀態下,全部掛起等待的goroutine按照
FIFO
順序等待。喚醒的goroutine將會和剛剛到達的goroutine競爭互斥鎖的擁有權,由於剛剛到達的goroutine具備優點:它剛剛正在CPU上執行,因此剛剛喚醒的goroutine有很大可能在鎖競爭中失敗。若是一個等待的goroutine超過1ms沒有獲取互斥鎖,那麼它將會把互斥鎖轉變爲飢餓模式。在飢餓模式下,互斥鎖的全部權將移交給等待隊列中的第一個。新來的goroutine將不會嘗試去得到互斥鎖,也不會去嘗試自旋操做,而是放在隊列的最後一個。若是一個等待的goroutine獲取的互斥鎖,如何它知足一下其中的任何一個條件:(1)它是隊列中的最後一個;(2)它等待的時候小於1ms。它會將互斥鎖的轉檯轉換爲正常狀態。正常狀態有很好的性能表現,飢餓模式也是很是重要的的,由於它能阻止尾部延遲的現象。
1const (
2 mutexLocked = 1 << iota // mutex is locked
3 mutexWoken
4 mutexStarving
5 mutexWaiterShift = iota
6 starvationThresholdNs = 1e6
7)
mutexLocked
該值爲1
, 第一位比特位1
,表明了該是否該互斥鎖已經被鎖住。mutex.state
與它進行&
操做,若是爲1
表示已經鎖住,0
則表示未被鎖住。
mutexWoken
該值爲2
,第二位比特位1
,表明了該互斥鎖是否被喚醒,mutex.state
與它進行&
操做,若是爲1
表示已經被喚醒,0
表明未被喚醒
mutexStarving
該值爲4
,第三位比特爲1
,表明了該互斥鎖是否處於飢餓狀態,mutex.state
與它進行&
操做,若是爲1
表示處於飢餓轉態,0
表示處於正常狀態。
mutexWaiterShift
該值爲3
,表示mutex.state
右移3位後爲等待的goroutine
的數量。
starvationThresholdNs
goroutine
將互斥鎖轉換狀態的時間等待的臨界值:一百萬納秒,也就是1ms。
1if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
2 if race.Enabled {
3 race.Acquire(unsafe.Pointer(m))
4 }
5 return
6}
CompareAndSwapInt32
是一個原子操做,它判斷是一個參數的值是否等於第二個參數,若是相等,則將第一個參數設置爲第三個參數,並返回true
;不然對一個參數不作任何操做而且返回false
。這一段是代碼是處理第一次goroutine
進行嘗試Lock
操做,若是一切都是初始狀態,則m.state
爲.....0000001
而且返回,進入臨界區域代碼,不然代碼繼續往下走。
1var waitStartTime int64
2starving := false
3awoke := false
4iter := 0
5old := m.state
首先定義了一下變量:goroutine
等待時間,是否飢餓轉檯,是否喚醒和自旋迭代次數和保存當前互斥鎖狀態。接下來是一個for
循環,只有退出循環才能進入臨界區域代碼,縱觀代碼只有兩處使用break
來退出循環。
1for {
2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
3 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
4 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
5 awoke = true
6 }
7 runtime_doSpin()
8 iter++
9 old = m.state
10 continue
11 }
12}
首先判斷鎖狀態是被鎖並且不是處於飢餓模式,加上還能自旋額次數,進入下一層判斷。若是當前goroutine沒有被喚醒
,
其餘goroutine也沒有被喚醒,
等待的goroutine超過1和
能夠將m.state設置爲喚醒轉態四個條件同時知足,將
awoke設置
true`。而後進行自旋操做,進行一輪循環。
1new := old
2if old&mutexStarving == 0 {
3 new |= mutexLocked
4}
5if old&(mutexLocked|mutexStarving) != 0 {
6 new += 1 << mutexWaiterShift
7}
8if starving && old&mutexLocked != 0 {
9 new |= mutexStarving
10}
這三個判斷條件作了以下工做:若是當前的mutex.state
處於正常模式,則將new
的鎖位設置爲1,若是當前鎖鎖狀態爲鎖定狀態或者處於飢餓模式,則將等待的線程數量+1。若是starving
變量爲true
而且處於鎖定狀態,則new
的飢餓狀態位打開。
1if awoke {
2 if new&mutexWoken == 0 {
3 throw("sync: inconsistent mutex state")
4 }
5 new &^= mutexWoken
6}
若是 goroutine
已經被喚醒,則清空new
的喚醒位。
1if atomic.CompareAndSawpInt32(&m.state, old, new){
2 //...
3}else{
4 //...
5}
若是更新m.state
成功
1if old&(mutexLocked|mutexStarving) == 0 {
2 break
3}
若是未被鎖定而且並非出於飢餓狀態,到達第一個break
,進入代碼臨界區域。
1queueLifo := waitStartTime != 0
2if waitStartTime == 0 {
3 waitStartTime = runtime_nanotime()
4}
5runtime_SemacquireMutex(&m.sema, queueLifo)
runtime_SemacquireMutex(s *uint32, lifo bool)
函數相似P
操做,若是lifo
爲true
則將等待goroutine
插入到隊列的前面。在這裏,對於每個到達的goroutine
,若是CompareAndSawpInt32
成功,而且到達時候若是鎖出於鎖定狀態,那麼將該goroutine
插入到等待隊列的最後,不然插入到最前面。此時goroutine
將會被掛起,等待Unlock
的V
操做,將喚醒goroutines
1starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
2old = m.state
3if old&mutexStarving != 0 {
4 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
5 throw("sync: inconsistent mutex state")
6 }
7 delta := int32(mutexLocked - 1<<mutexWaiterShift)
8 if !starving || old>>mutexWaiterShift == 1 {
9 delta -= mutexStarving
10 }
11 atomic.AddInt32(&m.state, delta)
12 break
13}
14
判斷被喚醒的線程是否爲達到飢餓狀態,也就是等待時間超過1ms
,若是以前的m.state
不是飢餓狀態,繼續循環,給新到來goroutine
讓出互斥鎖。若是已經飢餓狀態,則修改等待goroutine
數量和飢餓狀態位,並返回進入臨界代碼區域。
1new := atomic.AddInt32(&m.state, -mutexLocked)
首先建立變量new
,該變量的鎖位爲0
。接下來是飢餓狀態判斷
1if new&mutexStarving == 0 {
2 old := new
3 for {
4 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
5 return
6 }
7 new = (old - 1<<mutexWaiterShift) | mutexWoken
8 if atomic.CompareAndSwapInt32(&m.state, old, new) {
9 runtime_Semrelease(&m.sema, false)
10 return
11 }
12 old = m.state
13 }
14 } else {
15 runtime_Semrelease(&m.sema, true)
16 }
若是是正常狀態,則判斷若是等待的goroutine
爲零,或者已經被鎖定、喚醒、或者已經變成飢餓狀態,返回,不須要喚醒任何其餘被掛起的goroutine
,由於互斥鎖已經被其餘goroutine
搶佔。不然更新new
值(修改等待的goroutine數量)並設置喚醒爲,若是CompareAndSwapInt32
成功,則經過runtime_Semrelease(&m.sema, false)
恢復掛起的goroutine.r若是爲 true
代表將喚醒第一個阻塞的goroutine
,這第一點在else
飢餓的分支中體現。
讀寫鎖也是一種常見的鎖機制,它容許多個線程讀共享資源,只有一個線程寫共享資源,接下來看看go中如何實現讀寫鎖。
1type RWMutex struct {
2 w Mutex
3 writerSem uint32
4 readerSem uint32
5 readerCount int32
6 readerWait int32
7}
8const rwmutexMaxReaders = 1 << 30
RWMutex
結構包含了以下的字段
goroutine
數量。
1func (rw *RWMutex) RLock() {
2 // [...]
3 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
4 runtime_Semacquire(&rw.readerSem)
5 }
6//[...]
7}
首先是readerCount
值+1, 若是小於零,則掛起goroutine
等待readerSem
。是否是很奇怪,爲何會小於零判斷呢?在這裏先賣一個關子,接下來會看到爲何是這樣的設計邏輯。
1func (rw *RWMutex) RUnlock() {
2 //[...]
3 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
4 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: RUnlock of unlocked RWMutex")
7 }
8 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
9 runtime_Semrelease(&rw.writerSem, false)
10 }
11 }
12 //[...]
13}
首先將readerCount
減去1,若是小於零,再講readWait
減去1,若是是離開讀的goroutine
數量爲零,則對writerSem
信號量進行V
操做。
1func (rw *RWMutex) Lock() {
2 //[...]
3 rw.w.Lock()
4 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
5 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
6 runtime_Semacquire(&rw.writerSem)
7 }
8 //[...]
9}
首先rw.w.Lock
操做,來防止其餘goroutine
對共享資源的寫訪問。而後將readerCount
減去rwmutexMaxReaders
,代表還剩多少goroutine
能夠進行讀訪問,這也解釋在RLock
中小於零的判斷,若是還能夠還能夠進行讀訪問,則必須得到readerSem
信號量。在Lock
中接下來是對readWait
判斷,若是該數量不爲零,則須要對writerSem
進行P
操做,而V
操做只在RUnlock
方法中,若是最後一個讀goroutine
離開,則進行V
操做。
1func (rw *RWMutex) Unlock() {
2 //[...]
3 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
4 if r >= rwmutexMaxReaders {
5 race.Enable()
6 throw("sync: Unlock of unlocked RWMutex")
7 }
8 for i := 0; i < int(r); i++ {
9 runtime_Semrelease(&rw.readerSem, false)
10 }
11 //[...]
12}
首先恢復readCounter
爲正數,而後對readerSem
信號量進行r
次V
操做,喚醒在RLock
中被掛起的goroutine
。
WaitGroup
一般用在等待一組goroutine
所有完成。調用Add
方法指明要等待的goroutine
的數量,調用Done
方法說明該goroutine
已經完成,而Wait
方法是阻塞等待的goroutine
。
1type WaitGroup struct {
2 noCopy noCopy
3 state1 [12]byte
4 sema uint32
5}
noCopy
字段說明WaitGroup
不容許拷貝,而state1
字段是一個很是tricky
的方法,用其中的8
個字節(64bit)來保存一些狀態。高位的32bit用來表示須要等待的goroutine
的數量,地位的32
bit用來表示被掛起的goroutine
的數量。至於爲何不直接使用64bit
的數據主要是爲了考慮32爲編譯器沒法保證64位對齊。sema
則是一個信號量。
1func (wg *WaitGroup) state() *uint64 {
2 if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
3 return (*uint64)(unsafe.Pointer(&wg.state1))
4 } else {
5 return (*uint64)(unsafe.Pointer(&wg.state1[4]))
6 }
7}
該方法是一個輔助方法,用來獲取state
,一個64爲的無符號整數。
1func (wg *WaitGroup) Done() {
2 wg.Add(-1)
3}
Done方法其實調用了Add(-1)
方法,因此咱們着重討論Add
方法。
1func (wg *WaitGroup) Add(delta int) {
2 statep := wg.state()
3 //[...]
4 state := atomic.AddUint64(statep, uint64(delta)<<32)
5 v := int32(state >> 32)
6 w := uint32(state)
7 if race.Enabled && delta > 0 && v == int32(delta) {
8 race.Read(unsafe.Pointer(&wg.sema))
9 }
10 if v < 0 {
11 panic("sync: negative WaitGroup counter")
12 }
13 if w != 0 && delta > 0 && v == int32(delta) {
14 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
15 }
16 if v > 0 || w == 0 {
17 return
18 }
19 if *statep != state {
20 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
21 }
22 // Reset waiters count to 0.
23 *statep = 0
24 for ; w != 0; w-- {
25 runtime_Semrelease(&wg.sema, false)
26 }
27}
首先是獲取state
,而後將delta
右移32位,加上等待的goroutine
數量。v
和w
分別表明了須要等待的goroutine
和被阻塞的goroutine
的數量。接下來v==int32(delta)
判斷條件代表若是是第一次Add
操做,則必須與等待的goroutine
同步,在Wait
方法中能夠看到一樣的操做。接下來是一些拋異常操做,若是等待的數量爲負數,如何第一次Add
操做沒有同步。if >0 || w==0
條件代表如何v
沒有降到零,或者被阻塞的goroutine
數量爲零,直接返回。如何v
爲零,則按照w
的數量,依次對信號量ws.sema
進行V
操做。
1func (wg *WaitGroup) Wait() {
2 //[...]
3 for {
4 state := atomic.LoadUint64(statep)
5 v := int32(state >> 32)
6 w := uint32(state)
7 //[...]
8 // Increment waiters count.
9 if atomic.CompareAndSwapUint64(statep, state, state+1) {
10 if race.Enabled && w == 0 {
11 race.Write(unsafe.Pointer(&wg.sema))
12 }
13 runtime_Semacquire(&wg.sema)
14 if *statep != 0 {
15 panic("sync: WaitGroup is reused before previous Wait has returned")
16 }
17 //[...]
18 return
19 }
20 }
21}
Wait
方法一樣也是CAS算法,首先獲取須要等待的goroutine
的數量v
和阻塞的goroutine
數量w
, 而後將阻塞的goroutine
數量+1,若是以前的w
爲零,表示是第一次等待,則與Add
操做進行同步,最後後對信號量wg.sema
進行P
操做。
在編程中使用Cond
也叫管程(monitor)
,它能夠用來使不一樣線程完成互斥條件,也可使某個線程等待某個條件的發生。常見的使用模式以下:
1var locker = new(sync.Mutex)
2var cond = sync.NewCond(locker)
3var condition = true
4// goroutine A
5cond.L.Lock()
6for condition {
7 cond.Wait()
8}
9// ...
10cond.L.Unlock()
11
12//goroutine B
13condiiton = false
14cond.Signal()
爲何使用for
循環做爲判斷進入Wait
的條件而不是if
呢?主要是防止爲被喚醒的goroutine
在返回Wait
調用的時候,剛好有別的goroutine
修改了conditon
的值,因此須要使用for
循環做爲條件判斷。
1type Cond struct {
2 noCopy noCopy
3 L Locker
4 notify notifyList
5 checker copyChecker
6}
Cond
結構不容許拷貝,包含了Locker
的接口字段,和一個notifyList
的集合字段。
1func NewCond(l Locker) *Cond {
2 return &Cond{L: l}
3}
實現Locker
接口的類型均可以,通常爲Mutex
和RWMutex
1func (c *Cond) Wait() {
2 c.checker.check()
3 t := runtime_notifyListAdd(&c.notify)
4 c.L.Unlock()
5 runtime_notifyListWait(&c.notify, t)
6 c.L.Lock()
7}
在使用Wait
方法以前,要調用c.L.Lock
來進入臨界區域,將當前等待的goroutine
加入到通知隊列中,而後調用c.L.Unlock()
來退出臨界區域,以便讓其餘goroutine
能夠進入等待區域。緊接着掛起goroutine
,等待消息。
1func (c *Cond) Signal() {
2 c.checker.check()
3 runtime_notifyListNotifyOne(&c.notify)}
runtime_notifyListNotifyOne
喚起其中的等待的goroutine
。
1func (c *Cond) Broadcast() {
2 c.checker.check()
3 runtime_notifyListNotifyAll(&c.notify)
4}
runtime_notifyListNotifyAll
喚起所有等待的goroutine
。