golang Mutex 實現上的幾個巧妙的點

    golang 的metux 的實現有幾個點作法是很是有意思的,一個是底層數據結構上,用了平時不多用的位運算,第二個,用到了自旋,並作了自旋策略控制,最後是用了信號量控制協程。golang

    首先是golang mutex 中用了不少位運算。位運算不作細介紹,對內存利用比較高的算法都有涉及,好比redies 的壓縮列表,好比golang 的Protobuffer。算法

    有幾個關鍵點,iota 在定義的時候,用的多,作自增運算:    數據結構

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

// 這裏 第一個變量爲1 ,第二個變量爲10, 第三個爲10

    而後,位運算的求或和求與用的不少,一個是與1 求或將某位置1,一個是與0 求與將某位置0,這些都是用於改變某些標誌位的方式,不要看懵逼了:函數

new := old | mutexLocked // 將old 的最後一位置1,表示new 鎖必定是被持有狀態
        if old&mutexLocked != 0 { // 將最後一位保留後,其餘位所有置0, 判斷最後一位的狀態是否是0,判斷是否是被持有
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift
        }

    第二個是golang 的加鎖會自旋,4次自旋沒拿到鎖後再將協程休眠,這樣能夠減小切換成本。這裏關判斷條件是自旋次數,cpu核數,p 的數量:ui

func sync_runtime_canSpin(i int) bool {
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

    最後是利用信號量掛起和喚醒協程,核心函數是atom

runtime_SemacquireMutex(&m.sema)
runtime_Semrelease(&m.sema)

    獲取信號時,當s > 0 ,將s--,若是s 爲負數,會將當前g 放入阻塞隊列,掛起直到s>0。code

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return // cas 獲取到鎖,直接返回
	}

	awoke := false  //循環標記
	iter := 0       //循環計數器
	for {
		old := m.state            //保存當前鎖狀態
		new := old | mutexLocked  //將狀態位最後一位指定1
		if old&mutexLocked != 0 { //鎖被佔用
			if runtime_canSpin(iter) { //檢查是否能夠進入自旋鎖,4次
				if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
					atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { 
                                        //awoke標記爲true
					awoke = true
				}
                                
				runtime_doSpin()//進入自旋
				iter++
				continue
			}
                       
			new = old + 1<<mutexWaiterShift //鎖被佔用,且自旋次數超過4次,掛起協程數+1,下面步驟將g 掛起並等待
		}
		if awoke {
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken //清除標誌
		}
               
		if atomic.CompareAndSwapInt32(&m.state, old, new) { //更新協程計數
			if old&mutexLocked == 0 {
				break
			}
                         
                        // 鎖請求失敗,進入休眠狀態,等待信號喚醒後從新開始循環,一直阻塞在這裏
			runtime_SemacquireMutex(&m.sema)
			awoke = true
			iter = 0
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

    解鎖的過程就和加鎖反過來便可:orm

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	new := atomic.AddInt32(&m.state, -mutexLocked)// 移除加鎖位
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}

	old := new
	for {
		//當休眠隊列內的等待計數爲0或者自旋狀態計數器爲0,退出
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
			return
		}
		// 等待協程數-1,更改清除標記位
		new = (old - 1<<mutexWaiterShift) | mutexWoken
		if atomic.CompareAndSwapInt32(&m.state, old, new) {         
			runtime_Semrelease(&m.sema)// 釋放鎖,發送釋放信號,對應以前的acquire
			return
		}
		old = m.state
	}
}
相關文章
相關標籤/搜索