咱們從零開始想象mutex是怎麼上鎖的, 假設咱們規定一種遊戲規則: "你必須把這個數字從0變成1". 改爲的人算贏, 沒改爲的人就等着. 等剛剛贏的人再把數字改回0, 這樣你就有機會再搶一把了. 這就是mutex上鎖的基本原理. 再進一步的, 有以下兩個細節:html
回到問題1, 問題出現的關鍵就是這個操做並非原子的, 也就是說是兩我的同時搶同時改, 若是咱們能讓這兩我的排一下隊, 他搶完了你再搶問題是否是就不會有這種問題了? 說的沒錯, 但感受都是廢話, 這種並行的東西如何保證讓他排隊按順序來呢?java
說到底核心就是兩個彙編指令: CMPXCHG
與 LOCK
, 設想咱們在單核心的狀況下, 也就是說一次只有一個線程在運行, 這種狀況下沒有真正的並行, 只有多核心的狀況下才會出現真正的並行去搶. 以上兩個命令箇中, 第一個搶到而且決定上鎖的人執行LOCK
, 先冷凍住其餘核心, 而後CMPXHG
負責作比對而後把修改過的數據存回那個內存裏golang
以上, 就是實現鎖最核心的一步, 如何讓他們排隊去修改一個內存, 對應Go裏面處處都是的atomic.CompareAndSwap(*addr,old,new)
, 若是你想要修改*addr
裏的東西, 從old修改爲new, 你必須排隊. 這些東西看起來像不像C++裏的volatile關鍵字? 由於兩者都使用了相似的冷凍+修改的把戲shell
咱們雖然解決了排隊搶鎖的問題, 可是這個鎖離真正的實用還有必定距離, 好比咱們就沒解決如何喚醒的問題, 咱們都知道有一種東西叫semaphore信號標, 這種東西就能作到沉睡/喚醒G, 比較神奇的是在Go語言裏面semaphore的本質上只是一個uint32
函數
type semaRoot struct {
treap *sudog
nwait int32
}
func getSemaRoot(addr *uint32) *semaRoot {
return &semtable[uintptr(unsafe.Pointer(addr))%semaTableSize]
}
複製代碼
一個數字固然不可能存的下又是沉睡線程又是喚醒這麼多功能的, 所以這玩意兒只是一個"index", 只是一個索引, 咱們用這個索引召集一個semaphore結構體: 咱們有一個全局變量用於存全部的semaphore, 而後用這個數字的地址做爲下標, 取出對應真正的結構體, 完成seamphore全部功能的正是這個結構體.源碼分析
func semacquire1(addr) {
s := acquireSudog()
root := semroot(addr)
atomic.Xadd(&root.nwait, 1)
for {
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
break
}
root.queue(addr, s)
goparkunlock(&root.lock)
}
releaseSudog(s)
}
複製代碼
這就是semaphore上鎖的過程, 首先咱們將當前的G打包成sudog, 而後利用這個uint32
獲取對應的semaphore, 並將信號標的等待計數+1, 而後進入for沉睡/喚醒循環:post
func semarelease1(addr) {
root := semroot(addr)
if atomic.Load(&root.nwait) == 0 {
return
}
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
readyWithTime(s)
goyield()
}
}
複製代碼
到了解鎖的時候, 咱們一樣拿着這個uint32
先獲取semaphore, 若是等待着隊列長度爲0, 咱們不用喚醒任何人, 直接退出, 不然取出一個沉睡的G, 將等待隊列長度減1, 經過goready喚醒這個G並放到P的run_next
, 最後經過yield()
完成一次調度, 直接切換到run_next
去執行ui
如今咱們已經搞定一個鎖的兩個核心組件了, 一個是排隊, 一個是沉睡+喚醒功能. 有了semaphore做爲做爲喚醒的基礎, 咱們惟一須要理解的就只有三種狀態是怎麼轉換的this
type Mutex struct {
state int32 // 狀態
sema uint32 // 用於計算休眠G數量的信號量
}
複製代碼
能夠看到鎖的狀態有S/W/L三項, 若是有兩個G在爭搶使用, 假設這兩個G分別是X與Y, 他們的狀態遷移是這樣的:atom
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
複製代碼
RWMutex是讀/寫互斥鎖。鎖能夠由任意數量的讀者或單個寫者持有
mutex
的目的是防止兩個寫G同時寫入readerSem
的場景: 一個寫G任務還沒結束, 讀G已經想開始讀了writerSem
的場景: 一些讀G任務還沒結束, 寫G已經想開始寫了readerCount
字段: 執行中+堵塞中的讀GreaderWait
字段: 只是執行中的讀Gconst rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_Semacquire(&rw.readerSem)
}
}
複製代碼
一個新的讀G來拉! 不管能不能進行下去, 先給count加一:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
}
複製代碼
首先咱們鎖上Mutex,防止其餘寫G跟我搶. 而後咱們將count減小1<<30:
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false)
}
}
}
複製代碼
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
rw.w.Unlock()
}
複製代碼
寫G結束了, 這個時候咱們能夠容許讀G開始工做了, 剛剛由於寫G工做, 一些讀G堵塞着在:
func pop(top) {
for {
this := top
next := top.next
if CompareAndSwap(top,this,next) {
break
}
}
}
複製代碼
ABA是CompareAndSwap
中潛在的隱患, 咱們以上面這張圖爲例, pop()
函數刪除鏈表最頂端的節點, 同時指針移到下一個位置上去
根據CAS的原理, 先檢查top是否是this: top指向節點C, 地址0x0014, 對上了, 因而top指針變成了0x0018, 直接指向了節點X, 中間的節點B被無視了.
這裏面的問題出在哪兒呢? CAS只管表層, 也就是地址的值不變就好了, 至於裏面存了什麼, 存的東西有沒有變是無論的. 這就好像你拎了一箱子錢, 人家把你箱子裏錢都偷沒了, 你卻只知道箱子仍是那個箱子, 殊不知道錢已經沒了, 一個道理. 這就是問題出現的根本緣由
doubleCAS被用於解決這種比較極端的例子, 仍是剛剛那個例子, 咱們要去CAS一個地址, 假設一個地址長32位, 那咱們就搞64位整數來存這個地址.
每次CAS的時候比較的不僅是地址, 而是地址+計數器這個64位整數, 只有兩個都對上了纔算經過, 結合上面的ABA問題, 若是這個時候A節點被刪了, 同時BC被推動來, 0x0014對應的count確定是不同的, 等到調度回來CAS沒法經過