How Go's mutex (sync.Mutex) is implemented

Go's mutex is built on top of CAS atomic operations. The Mutex struct looks like this:
type Mutex struct {
    state int32                
    sema  uint32
}
 
//state holds the lock's current state; every bit has a meaning, and the zero value means unlocked
//sema is used as a semaphore: through P/V operations it blocks and wakes goroutines on a wait queue. A goroutine waiting for the lock is hung on the wait queue and sleeps, unscheduled, until Unlock wakes it. The details are in the Lock implementation in sync/mutex.go.
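 
Before digging into the internals, a minimal usage sketch (my own example, not from the mutex source): a counter protected by a Mutex, where Lock blocks until the current holder calls Unlock.
 
package main
 
import (
    "fmt"
    "sync"
)
 
func main() {
    var mu sync.Mutex
    counter := 0
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()   // blocks (eventually sleeping on m.sema) until the lock is free
            counter++   // critical section
            mu.Unlock() // may wake a goroutine waiting on m.sema
        }()
    }
    wg.Wait()
    fmt.Println(counter) // always 10
}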
 
A quick detour into sema
Inside Mutex it is just an integer field, but it is a crucial piece: this is the semaphore that manages putting goroutines to sleep and waking them up.
I haven't read the sema implementation in full detail yet, so what follows is a rough functional overview; note it may not be accurate!!
First, sema provides a mechanism for "scheduling" goroutines: it can block a goroutine and wake it up again.
Acquiring the semaphore happens in semacquire1 in runtime/sema.go,
and releasing it happens in semrelease1.
sema has a semaRoot struct and a global semtable variable. One semaRoot handles the P/V operations of one semaphore (my guess is this ties in with the GMP scheduling model: a Processor carries multiple goroutines, and the goroutines sharing one semaphore need something to manage them, plus a lightweight lock to protect their state transitions, which is the lock field below; that lock is not the same thing as Mutex, it is sema's own implementation). Allocating and looking up the many semaRoots is managed through the global semtable variable:
type semaRoot struct {
    lock  mutex
    treap *sudog // root of balanced tree of unique waiters.
    nwait uint32 // Number of waiters. Read w/o the lock.
}
var semtable [semTabSize]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
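For reference, the mapping from a semaphore address to its semaRoot is just a hash of the address into the global semtable (roughly as in the runtime source of the Go version this post is based on; the exact form is version-dependent):
 
func semroot(addr *uint32) *semaRoot {
    return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}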
1. Putting the current goroutine to sleep is done through goparkunlock; semacquire1 calls it like this:
          1) root := semroot(addr)
                semroot finds the semaRoot struct from the semaphore's address
          2) Skipping a chunk..... jumping straight to where the current goroutine is put to sleep:
                first, lock(&root.lock) takes the semaRoot's own lock
                then root.queue() puts the current goroutine on the wait queue (note that one semaphore manages many goroutines; before a goroutine sleeps, its details are saved and queued, i.e. hung on the semaRoot's treap. Judging by the comment, the queue is implemented as a balanced tree?)
          3) call goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4) 
                which eventually reaches gopark. gopark triggers another scheduling pass: before rescheduling, it marks the current goroutine (the G object) as sleeping so it is not scheduled again until woken, and then releases the lock. This call gives the runtime a chance to take over execution and schedule other goroutines.
 
2. Since goroutines block, there has to be a wakeup mechanism.
   Wakeup goes through the semtable structure.
   sema.go is not written specifically for the Mutex lock. In the Mutex case, when another goroutine releases the Mutex it calls semrelease1, which wakes a goroutine from the queue to run. I haven't looked at the details.
   From the analysis so far, though: Mutex is a mutual-exclusion lock, so its semaphore should be a binary semaphore that only takes the values 0 and 1. When Lock gets as far as semacquire1 and the semaphore is found to be 0, the current goroutine is put to sleep:
func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}
      If goroutines keep arriving and trying to acquire the Mutex, they will all see the semaphore at 0 and keep going to sleep. Only Unlock increments the semaphore, and since Unlock must not be called twice in a row, the semaphore should only ever be 0 or 1.
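 
To make the sleep/wake idea concrete without touching the runtime, here is a purely illustrative sketch of my own (this is NOT how gopark or semrelease are implemented) that uses a buffered channel as a binary semaphore: acquiring blocks the goroutine, releasing wakes exactly one waiter.
 
package main
 
import (
    "fmt"
    "time"
)
 
// sem is a binary semaphore built on a buffered channel: a token in the
// channel means "available". Receiving is P (acquire), sending is V (release).
type sem chan struct{}
 
func newSem() sem {
    s := make(sem, 1)
    s <- struct{}{} // start out available
    return s
}
 
func (s sem) acquire() { <-s }             // blocks the calling goroutine, like semacquire
func (s sem) release() { s <- struct{}{} } // wakes one blocked goroutine, like semrelease
 
func main() {
    s := newSem()
    s.acquire() // take the only token
    go func() {
        time.Sleep(100 * time.Millisecond)
        fmt.Println("releasing")
        s.release() // the waiter in main is woken
    }()
    s.acquire() // parks here until the goroutine above releases
    fmt.Println("acquired after being woken")
}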
 
That was a rough look at sema; back to Mutex.
The sema field was covered above; the state field is the more central one in Mutex, describing the lock's current state.
state     |31|30| ... |3|  2  |  1  |  0  |
              bit 0:      the locked flag: 0 = unlocked, 1 = locked
              bit 1:      the woken flag: 1 = a goroutine has been woken (or a spinner has claimed the wakeup), 0 = none
              bit 2:      the mode flag: 0 = Normal, 1 = Starving
              bits 3..31: the number of goroutines waiting to Lock this mutex
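 
For reference, the corresponding constants in sync/mutex.go (roughly as in the Go version this walkthrough is based on) are:
 
const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
 
    starvationThresholdNs = 1e6 // 1ms, the starvation threshold discussed below
)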
 
First, the normal and starving modes of Mutex. The comment in the source explains them;
the two modes exist to keep the lock fair. The original post quoted a Chinese translation from http://blog.51cto.com/qiangmzsx/2134786; the gist is:
Mutex has two modes of operation: normal and starvation.
In normal mode, waiters queue up in FIFO order, but a woken waiter does not own the mutex: it has to compete with newly arriving goroutines for it.
New arrivals have an advantage because they are already running on a CPU, so a woken waiter has a good chance of losing. When that happens, the woken waiter is queued at the front of the wait queue.
If a waiter fails to acquire the mutex for more than 1ms, the mutex switches to starvation mode.
In starvation mode, ownership of the mutex is handed directly from the unlocking goroutine to the waiter at the front of the queue. Newly arriving goroutines do not try to grab the mutex (even if it looks unlocked, they do not spin); they queue up at the tail of the wait queue instead.
When a goroutine acquires the mutex in starvation mode and either of the following holds, it switches the mutex back to normal mode:
1. it is the last waiter in the queue, or
2. it waited for less than 1ms.
Normal mode has considerably better performance, because a goroutine can acquire the mutex several times in a row;
starvation mode prevents the pathological case where goroutines at the tail of the queue never manage to acquire the mutex.
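 
To see the normal-mode behaviour from user code, here is a small experiment of my own (not from the mutex source): two goroutines fight over one mutex and we record who held it each time. In normal mode the goroutine that is already on the CPU tends to reacquire the lock in long streaks, which is exactly the throughput-versus-fairness trade-off described above.
 
package main
 
import (
    "fmt"
    "sync"
)
 
func main() {
    var mu sync.Mutex
    var holders []int
    var wg sync.WaitGroup
    for id := 0; id < 2; id++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for i := 0; i < 50000; i++ {
                mu.Lock()
                holders = append(holders, id) // protected by mu
                mu.Unlock()
            }
        }(id)
    }
    wg.Wait()
 
    // longest run of consecutive acquisitions by the same goroutine
    longest, run := 1, 1
    for i := 1; i < len(holders); i++ {
        if holders[i] == holders[i-1] {
            run++
            if run > longest {
                longest = run
            }
        } else {
            run = 1
        }
    }
    fmt.Println("longest consecutive streak by one goroutine:", longest)
}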
 
The concrete implementation:
In the Lock function:
    // Fast path: grab unlocked mutex.
    // 1. use an atomic CAS to flip the state straight to locked
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }   
        return
    }   
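 
To see what this single CAS buys, a standalone sketch of my own using the public sync/atomic package: several goroutines race to flip a flag from 0 to 1, and exactly one CAS succeeds. That is the whole fast path.
 
package main
 
import (
    "fmt"
    "sync"
    "sync/atomic"
)
 
func main() {
    var state int32 // 0 = unlocked, 1 = locked, mirroring the mutexLocked bit
    var winners int32
 
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // only one goroutine can move state from 0 to 1
            if atomic.CompareAndSwapInt32(&state, 0, 1) {
                atomic.AddInt32(&winners, 1)
            }
        }()
    }
    wg.Wait()
    fmt.Println("goroutines that won the CAS:", winners) // always 1
}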
Many goroutines may try to grab the Mutex at any moment, and m.state keeps changing underneath them, so there are a lot of possible interleavings; I'll walk through the typical ones.
1) Suppose the mutex starts out in its initial state, m.state=0. The CAS above succeeds and the current goroutine gets the lock right here; m.state becomes locked,
so m.state = 00...001: locked, Not Woken, normal mode.
      Lucky: it grabbed the lock on arrival. As described above, it was already on the CPU and the lock happened to be free. Born under a lucky star, heh.
      Lock returns here.
 
      If that goroutine never releases the lock, the next goroutine to arrive cannot lock it, which brings us to step two.
 
2) Next comes a for loop. The gist: try to grab the lock; if you can't get it, go to sleep for a while and wait to be woken; when woken, check whether you have waited so long that you are starving; if so, the mutex enters starving mode and you get served first; if not, keep waiting and hope for better luck.
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state    // m.state was just set to 001 by the first goroutine, so old reads 001 as well
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // old=001: the lock is held
        // runtime_canSpin decides whether we may spin, based on iter, which increments every pass through the loop
        // spin conditions: multicore, GOMAXPROCS>1, at least one other running P, and the local run queue is empty. Presumably spinning on a single core would just stall the program. Spinning is also capped, so once iter reaches 4 this if is no longer entered.
        // here we assume the multicore case, so we enter the if
        // old is re-read on every pass; the point of spinning is to wait for the lock to be released so that the goroutine already on the CPU can grab it. Go always tries to let the goroutine that is currently running take the lock.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            // awoke is still false, the woken bit is not set, and there are goroutines waiting in the queue: try to set mutexWoken so that Unlock will not bother waking a blocked goroutine, since this spinner intends to take the lock itself.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }   
            runtime_doSpin()     // spin: execute ~30 PAUSE instructions that do nothing useful
            iter++
            old = m.state           // re-read state; if some other goroutine released the lock, the next iteration will not enter this if
            continue                // done spinning, go around the loop once more
        }   
        // after the if there are two cases to consider
        2.1) Another goroutine called Unlock, so the "still locked" test above fails and we fall out of the spin. Now m.state=000, old=000, awoke=false, and no goroutine is waiting: the simplest case.
        new := old                  // new=000, old=000, m.state=000, awoke=false; new is initialized here and will later be CASed into m.state as the lock's new state
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {      // new=000; the lock is not in starving mode, so the running goroutine gets to take it. If it were starving, the current goroutine would have to queue up and leave the lock to the starving goroutines already waiting.
            new |= mutexLocked              // new=001, set the locked bit
        }   
        if old&(mutexLocked|mutexStarving) != 0 {       // old=000, so this is skipped; the running goroutine will take the lock rather than join the queue, new stays 001
            new += 1 << mutexWaiterShift
        }   
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {             // starving=false here. starving only becomes true after this goroutine has been woken by an Unlock and finds it has waited too long; since the other queued goroutines have also been waiting a while, the lock then goes to the queue first when it frees up. That logic comes later; for now we skip this if and new stays 001.
            new |= mutexStarving
        }   
        if awoke {                       // awoke=false, skip; new stays 001
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }   
            new &^= mutexWoken
        } 
           At this point new is fully computed, new=001. Now try to install it into the Mutex and actually take exclusive ownership.
          // just in case some other goroutine changed the state while new was being computed, install new=001 with a CAS; this is the real locking step
          if atomic.CompareAndSwapInt32(&m.state, old, new) {        
            if old&(mutexLocked|mutexStarving) == 0 {                           // old=000: break right away. The CAS above already wrote the locked state into m.state, so none of the code below runs.
                break // locked the mutex with CAS                              // back to 2.1 for a moment: what if we instead dropped out of the spin because we ran out of spin attempts? That is case 2.2.
            }   
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }   
            runtime_SemacquireMutex(&m.sema, queueLifo)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }   
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
 
       2.2) new := old, and this time new=001, old=001, m.state=001, awoke=false (I won't go through the cases where awoke was set to true inside the spin if; there are too many of them....)
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked                    //new=001
        }
        if old&(mutexLocked|mutexStarving) != 0 {    // old=001, so the running goroutine will have to queue; bits 3..31 of new count the queued goroutines, so add 1 there
            new += 1 << mutexWaiterShift                  //new=1001
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {        // starving=false, no need to switch to starving mode
            new |= mutexStarving
        }
        if awoke {                                                      //awoke=false
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
              new ends up as 1001, old=001
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {                 // old=001, so no break here: this goroutine cannot get the lock and has to block
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0                                      // is this the first time this goroutine reaches this point in the loop? If so, waitStartTime is still 0.
            if waitStartTime == 0 {                                              // queueLifo decides whether the goroutine joins the back of the wait queue or is pushed to its front
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo)          // the goroutine joins the wait queue; see footnote 1 for details. It blocks here and, once the lock is released, is only woken if it is at the head of the queue.
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs    // on wakeup, check whether it has waited too long; more than 1ms sets starving to true, see footnote 2
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
 
Footnote 1: runtime_SemacquireMutex is a thin wrapper around semacquire1 in sema.go described earlier. It ends up calling gopark, which hands execution back to the runtime scheduler and marks the current goroutine as sleeping so it is no longer scheduled (from the program's point of view, it simply blocks there).
 
Footnote 2:  1) Two sub-cases again. If the wait did not exceed 1ms, starving stays false:
                     old = m.state              // the mutex must have been unlocked for this goroutine to be woken, so old is at least 000; assume 000
                     if old&mutexStarving != 0    // old is not in starving mode, so this if is skipped
       
                   awoke = true    // reset awoke and iter and go around the loop again
                    iter = 0
                     ///////////////////////////
                     On the next iteration new ends up as 001: the woken goroutine takes the lock, locked bit set, not starving.
                     It finally breaks out of Lock at the lines below:
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
 
 
            2) If the wait exceeded 1ms, starving = true:
                 old = m.state              // again, the mutex must have been unlocked for this goroutine to be woken, so assume old=000
                   if old&mutexStarving != 0    // old is not in starving mode, so this if is skipped
 
                   awoke = true    // reset awoke and iter and go around the loop again
                   iter = 0
                   ///////////////////////////
                 On the next iteration new=101: the lock switches to starving mode and the woken goroutine holds it locked.
 
    Part two: what effect does starving mode have? It mainly shows up in the Unlock function.
    // Fast path: drop lock bit.
    // first clear the locked bit; in the simplest case all other bits are 0, so m.state=000 and new=000
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
 
    // here is where starving mode matters: if the mutex is in starving mode we go straight to the else branch and wake the goroutine at the head of the queue
    if new&mutexStarving == 0 {
        old := new                   //old = 000
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            // if nobody is waiting, or a goroutine has already been woken or has grabbed the lock (for example, as in Lock above, a goroutine that was spinning on the CPU snapped up the lock right after this Unlock), there is no need to wake anyone from the queue
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            
            // if there are waiters and no on-CPU goroutine is spinning for the lock, grab the right to wake one goroutine from the wait queue
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
 
        // starving mode: wake the goroutine at the head of the queue directly. The Lock walkthrough above did not cover what happens after a goroutine blocked in runtime_SemacquireMutex(&m.sema, queueLifo) is woken and finds the mutex in starving mode; footnote 3 covers that.
        runtime_Semrelease(&m.sema, true)
    }
 
Footnote 3: even though Unlock clears the locked bit right at the top of the function, a goroutine running on the CPU still cannot acquire the lock: it sees the starving bit set in m.state, and in starving mode only queued goroutines may take the lock, so the on-CPU goroutine joins the wait queue instead. Then runtime_Semrelease(&m.sema, true) in Unlock wakes one sleeping goroutine from the queue.
Back in Lock, m.state should now be 100:
 
            runtime_SemacquireMutex(&m.sema, queueLifo)    //在這被喚醒
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state            //old = 100
            if old&mutexStarving != 0 {          // the lock is in starving mode
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)                // a single delta that sets the locked bit and removes this goroutine from the waiter count
                if !starving || old>>mutexWaiterShift == 1 {                              // if this goroutine is not starving, or it was the last waiter in the queue, also clear the starving bit
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)      // apply the delta (the lock is now ours) and break out
                break
            }
Summary: this only covered the mutex briefly; there is also the read-write lock, which I won't go into here. The mutex is built on top of atomic operations, and I'll write about atomics in more detail later.
To close, a few interesting questions. The answers are not necessarily correct; corrections are welcome.
1. A global int variable on a multicore machine, with one goroutine reading it and one writing it and nothing else: do we need atomic operations?
   Probably not strictly: Intel P6-class processors guarantee that aligned 32-bit loads and stores are atomic at the hardware level, and the compiler does not split a plain read or write of such a variable into multiple instructions. Note, though, that the Go memory model still treats this as a data race (the race detector will flag it), so using sync/atomic remains the safe choice.
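   A minimal sketch of the atomic variant (my own example), which also keeps the race detector quiet:
 
package main
 
import (
    "fmt"
    "sync/atomic"
    "time"
)
 
func main() {
    var flag int32
 
    go func() {
        time.Sleep(10 * time.Millisecond)
        atomic.StoreInt32(&flag, 1) // the writer
    }()
 
    for atomic.LoadInt32(&flag) == 0 { // the reader polls; no data race is reported
        time.Sleep(time.Millisecond)
    }
    fmt.Println("saw the write")
}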
 
2. A global int variable i, on a multicore machine, with two goroutines both executing i++ concurrently: do we need atomics?
    Yes. i++ is a classic read-modify-write, and operations like that need an atomic operation (an atomic add, or a CAS loop) to stay correct.
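    A minimal sketch of the fix (my own example):
 
package main
 
import (
    "fmt"
    "sync"
    "sync/atomic"
)
 
func main() {
    var i int64
    var wg sync.WaitGroup
    for g := 0; g < 2; g++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for n := 0; n < 100000; n++ {
                atomic.AddInt64(&i, 1) // atomic read-modify-write; a plain i++ here would lose updates
            }
        }()
    }
    wg.Wait()
    fmt.Println(i) // always 200000
}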
 
3. For a map, if writes are synchronized, do reads need to be synchronized too?
    If the accesses are only reads or only writes and the value type is an integer, arguably no atomics are needed, in the sense that an integer value will never be observed half-written or half-read. But situations like this are unavoidable: goroutine1 writes 1 into the map, goroutine2 reads 1, and while goroutine2 is still processing it, goroutine1 overwrites the value again. Also note that a plain Go map is not safe for concurrent reads and writes at all; the runtime can abort with "concurrent map read and map write", so once readers and writers can overlap, the map itself needs a lock.
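    In practice, whenever map readers and writers can overlap, the usual pattern is a sync.RWMutex (or sync.Map). A minimal sketch (my own example):
 
package main
 
import (
    "fmt"
    "sync"
)
 
type safeMap struct {
    mu sync.RWMutex
    m  map[string]int
}
 
func (s *safeMap) set(k string, v int) {
    s.mu.Lock() // exclusive lock for writers
    s.m[k] = v
    s.mu.Unlock()
}
 
func (s *safeMap) get(k string) (int, bool) {
    s.mu.RLock() // shared lock for readers
    defer s.mu.RUnlock()
    v, ok := s.m[k]
    return v, ok
}
 
func main() {
    s := &safeMap{m: make(map[string]int)}
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); s.set("a", 1) }()
    go func() { defer wg.Done(); s.get("a") }()
    wg.Wait()
    fmt.Println(s.get("a"))
}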