Go語言學習 - Understanding Lock

Introduction

咱們從零開始想象mutex是怎麼上鎖的, 假設咱們規定一種遊戲規則: "你必須把這個數字從0變成1". 改爲的人算贏, 沒改爲的人就等着. 等剛剛贏的人再把數字改回0, 這樣你就有機會再搶一把了. 這就是mutex上鎖的基本原理. 再進一步的, 有以下兩個細節:html

  • 如今有兩個線程並行, 他們出手的時候都看到這個是0, 過會兒他們都把這個數字改爲1, 這個鎖被上了兩回, 並且他們都認爲本身是對的: "我看到的時候它的確是0呀?我錯在哪兒了?"
  • 第一我的用完鎖了, 其餘人如何得知這個鎖如今已經能夠繼續搶了?

並行哄搶的問題

回到問題1, 問題出現的關鍵就是這個操做並非原子的, 也就是說是兩我的同時搶同時改, 若是咱們能讓這兩我的排一下隊, 他搶完了你再搶問題是否是就不會有這種問題了? 說的沒錯, 但感受都是廢話, 這種並行的東西如何保證讓他排隊按順序來呢?java

說到底核心就是兩個彙編指令: CMPXCHGLOCK, 設想咱們在單核心的狀況下, 也就是說一次只有一個線程在運行, 這種狀況下沒有真正的並行, 只有多核心的狀況下才會出現真正的並行去搶. 以上兩個命令箇中, 第一個搶到而且決定上鎖的人執行LOCK, 先冷凍住其餘核心, 而後CMPXHG負責作比對而後把修改過的數據存回那個內存裏golang

以上, 就是實現鎖最核心的一步, 如何讓他們排隊去修改一個內存, 對應Go裏面處處都是的atomic.CompareAndSwap(*addr,old,new), 若是你想要修改*addr裏的東西, 從old修改爲new, 你必須排隊. 這些東西看起來像不像C++裏的volatile關鍵字? 由於兩者都使用了相似的冷凍+修改的把戲shell

沉睡喚醒的問題

咱們雖然解決了排隊搶鎖的問題, 可是這個鎖離真正的實用還有必定距離, 好比咱們就沒解決如何喚醒的問題, 咱們都知道有一種東西叫semaphore信號標, 這種東西就能作到沉睡/喚醒G, 比較神奇的是在Go語言裏面semaphore的本質上只是一個uint32函數

type semaRoot struct {
  treap  *sudog
  nwait  int32
}
func getSemaRoot(addr *uint32) *semaRoot {
  return &semtable[uintptr(unsafe.Pointer(addr))%semaTableSize]
}
複製代碼

一個數字固然不可能存的下又是沉睡線程又是喚醒這麼多功能的, 所以這玩意兒只是一個"index", 只是一個索引, 咱們用這個索引召集一個semaphore結構體: 咱們有一個全局變量用於存全部的semaphore, 而後用這個數字的地址做爲下標, 取出對應真正的結構體, 完成seamphore全部功能的正是這個結構體.源碼分析

func semacquire1(addr) {
    s := acquireSudog()
    root := semroot(addr)
    atomic.Xadd(&root.nwait, 1)
    for {
    		if cansemacquire(addr) {
			      atomic.Xadd(&root.nwait, -1)
			      break
		    }
        root.queue(addr, s)
        goparkunlock(&root.lock)
    }
    releaseSudog(s)
}
複製代碼

這就是semaphore上鎖的過程, 首先咱們將當前的G打包成sudog, 而後利用這個uint32獲取對應的semaphore, 並將信號標的等待計數+1, 而後進入for沉睡/喚醒循環:post

  • 若是能夠獲取信號標, 退出循環, 並將信號標等待計數減1
  • 若是不能得到信號標, 將當前g加入信號標等待隊列中
  • 經過gopark陷入沉睡, 等待喚醒
func semarelease1(addr) {
    root := semroot(addr)
  	if atomic.Load(&root.nwait) == 0 {
		    return
	  }
  	s, t0 := root.dequeue(addr)
	  if s != nil {
		    atomic.Xadd(&root.nwait, -1)
		    readyWithTime(s)
		    goyield()
	  }
}
複製代碼

到了解鎖的時候, 咱們一樣拿着這個uint32先獲取semaphore, 若是等待着隊列長度爲0, 咱們不用喚醒任何人, 直接退出, 不然取出一個沉睡的G, 將等待隊列長度減1, 經過goready喚醒這個G並放到P的run_next, 最後經過yield()完成一次調度, 直接切換到run_next去執行ui

sync.Mutex

如今咱們已經搞定一個鎖的兩個核心組件了, 一個是排隊, 一個是沉睡+喚醒功能. 有了semaphore做爲做爲喚醒的基礎, 咱們惟一須要理解的就只有三種狀態是怎麼轉換的this

type Mutex struct {
  state int32    // 狀態
  sema  uint32   // 用於計算休眠G數量的信號量
}   
複製代碼

能夠看到鎖的狀態有S/W/L三項, 若是有兩個G在爭搶使用, 假設這兩個G分別是X與Y, 他們的狀態遷移是這樣的:atom

  • 若是X已經鎖定, 則狀態爲--L(已上鎖,無人爭奪), 這個時候Y再嘗試鎖定, 進入沉睡,狀態爲S-L(已上鎖,有人沉睡)
  • X此時解除鎖定, 狀態從S-L變成S--(未上鎖,有人沉睡), 進一步的, 發現有人沉睡, 則開始喚醒步驟, 狀態變動成-W-(未上鎖,有人沉睡且開始喚醒)
    • Y被喚醒且成功搶到鎖, 狀態變成--L(已上鎖無人爭奪)
    • Y還沒被喚醒, X就又再次搶鎖, X搶成了, 狀態變成-WL(喚醒階段,且被上鎖), Y被喚醒後發現鎖不可用繼續沉睡, 狀態變成S-L

sync.RWMutex

type RWMutex struct {
	w           Mutex  
	writerSem   uint32 
	readerSem   uint32 
	readerCount int32  
	readerWait  int32 
}
複製代碼

RWMutex是讀/寫互斥鎖。鎖能夠由任意數量的讀者或單個寫者持有

  • mutex的目的是防止兩個寫G同時寫入
  • readerSem的場景: 一個寫G任務還沒結束, 讀G已經想開始讀了
  • writerSem的場景: 一些讀G任務還沒結束, 寫G已經想開始寫了
  • readerCount字段: 執行中+堵塞中的讀G
  • readerWait字段: 只是執行中的讀G
const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		runtime_Semacquire(&rw.readerSem)
	}
}
複製代碼

一個新的讀G來拉! 不管能不能進行下去, 先給count加一:

  1. 若是readerCount小於零說明寫G正在寫, 這個時候讀G是不能讀的, 經過拿下讀鎖進入堵塞
func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_Semacquire(&rw.writerSem)
    }
}
複製代碼

首先咱們鎖上Mutex,防止其餘寫G跟我搶. 而後咱們將count減小1<<30:

  1. 減小這麼大的數字會致使count必定小於零,看看上面的RLock函數, 若是count小於零會致使鎖上讀鎖,從而致使新來的讀G堵塞住.
  2. 檢查readerWait, 這個數字表明正在讀的(不包含等的)G的數量, 若是這個數字不爲零, 表明有讀G正在工做, 經過拿下寫鎖進入堵塞
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            runtime_Semrelease(&rw.writerSem, false)
        }
    }
}
複製代碼
  1. 一個讀G結束了, 將readerCount減1, 若是小於零的話說明寫G正在工做, 可能也正在堵塞, 咱們想知道這個寫G究竟是在工做仍是在堵塞因而咱們將readerWait也減1
  2. 若是獲得的不是零, 說明寫G真的是在堵塞, 並且還有別的讀G還沒完工, 寫G在等大家讀G都結束了才能開始工做
  3. 若是獲得的是零, 說明寫G在堵塞, 並且最後一個讀G也已經結束了, 這時候釋放寫鎖, 剛剛卡在上鎖環節的寫G此時被喚醒開始工做
func (rw *RWMutex) Unlock() {
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    rw.w.Unlock()
}
複製代碼

寫G結束了, 這個時候咱們能夠容許讀G開始工做了, 剛剛由於寫G工做, 一些讀G堵塞着在:

  1. 將readerCount加回來, 這個時候獲得的數字表明正在堵塞中的讀G的數量. 經過sema_release放那些讀G開始工做
  2. 咱們須要將Mutex釋放, 容許其餘寫G開始工做

ABA問題

func pop(top) {
  for {
      this := top
      next := top.next
      if CompareAndSwap(top,this,next) {
          break
    }
  }
}
複製代碼

ABA是CompareAndSwap中潛在的隱患, 咱們以上面這張圖爲例, pop()函數刪除鏈表最頂端的節點, 同時指針移到下一個位置上去

  1. (圖一) 開始刪了, 預期是能刪走節點A, 同時top指針會指在節點X上
  2. (圖二) 就在執行CAS的瞬間, 調度了, 另外一個G進來, 先是刪了節點A, 而後將節點BC推動來了, 並且節點C的地址恰巧就是以前節點A的地址
  3. 結束調度, 回到以前的G, 你認爲這個時候會發生什麼?

根據CAS的原理, 先檢查top是否是this: top指向節點C, 地址0x0014, 對上了, 因而top指針變成了0x0018, 直接指向了節點X, 中間的節點B被無視了.

這裏面的問題出在哪兒呢? CAS只管表層, 也就是地址的值不變就好了, 至於裏面存了什麼, 存的東西有沒有變是無論的. 這就好像你拎了一箱子錢, 人家把你箱子裏錢都偷沒了, 你卻只知道箱子仍是那個箱子, 殊不知道錢已經沒了, 一個道理. 這就是問題出現的根本緣由

ABA問題的解決

doubleCAS被用於解決這種比較極端的例子, 仍是剛剛那個例子, 咱們要去CAS一個地址, 假設一個地址長32位, 那咱們就搞64位整數來存這個地址.

每次CAS的時候比較的不僅是地址, 而是地址+計數器這個64位整數, 只有兩個都對上了纔算經過, 結合上面的ABA問題, 若是這個時候A節點被刪了, 同時BC被推動來, 0x0014對應的count確定是不同的, 等到調度回來CAS沒法經過

Reference

相關文章
相關標籤/搜索