工做中,常常會碰到併發讀寫 map 而形成 panic 的狀況,爲何在併發讀寫的時候,會 panic 呢?由於在併發讀寫的狀況下,map 裏的數據會被寫亂,以後就是 Garbage in, garbage out
,還不如直接 panic 了。git
Go 語言原生 map 並非線程安全的,對它進行併發讀寫操做的時候,須要加鎖。而 sync.map
則是一種併發安全的 map,在 Go 1.9 引入。github
sync.map
是線程安全的,讀取,插入,刪除也都保持着常數級的時間複雜度。
sync.map
的零值是有效的,而且零值是一個空的 map。在第一次使用以後,不容許被拷貝。
通常狀況下解決併發讀寫 map 的思路是加一把大鎖,或者把一個 map 分紅若干個小 map,對 key 進行哈希,只操做相應的小 map。前者鎖的粒度比較大,影響效率;後者實現起來比較複雜,容易出錯。golang
而使用 sync.map
以後,對 map 的讀寫,不須要加鎖。而且它經過空間換時間的方式,使用 read 和 dirty 兩個 map 來進行讀寫分離,下降鎖時間來提升效率。面試
使用很是簡單,和普通 map 相比,僅遍歷的方式略有區別:編程
package main import ( "fmt" "sync" ) func main() { var m sync.Map // 1. 寫入 m.Store("qcrao", 18) m.Store("stefno", 20) // 2. 讀取 age, _ := m.Load("qcrao") fmt.Println(age.(int)) // 3. 遍歷 m.Range(func(key, value interface{}) bool { name := key.(string) age := value.(int) fmt.Println(name, age) return true }) // 4. 刪除 m.Delete("qcrao") age, ok := m.Load("qcrao") fmt.Println(age, ok) // 5. 讀取或寫入 m.LoadOrStore("stefno", 100) age, _ = m.Load("stefno") fmt.Println(age) }
第 1 步,寫入兩個 k-v 對;segmentfault
第 2 步,使用 Load 方法讀取其中的一個 key;緩存
第 3 步,遍歷全部的 k-v 對,並打印出來;安全
第 4 步,刪除其中的一個 key,再讀這個 key,獲得的就是 nil;數據結構
第 5 步,使用 LoadOrStore,嘗試讀取或寫入 "Stefno",由於這個 key 已經存在,所以寫入不成功,而且讀出原值。併發
程序輸出:
18 stefno 20 qcrao 18 <nil> false 20
sync.map
適用於讀多寫少的場景。對於寫多的場景,會致使 read map 緩存失效,須要加鎖,致使衝突變多;並且因爲未命中 read map 次數過多,致使 dirty map 提高爲 read map,這是一個 O(N) 的操做,會進一步下降性能。
先來看下 map 的數據結構。去掉大段的註釋後:
type Map struct { mu Mutex read atomic.Value // readOnly dirty map[interface{}]*entry misses int }
互斥量 mu
保護 read 和 dirty。
read
是 atomic.Value 類型,能夠併發地讀。但若是須要更新 read
,則須要加鎖保護。對於 read 中存儲的 entry 字段,可能會被併發地 CAS 更新。可是若是要更新一個以前已被刪除的 entry,則須要先將其狀態從 expunged 改成 nil,再拷貝到 dirty 中,而後再更新。
dirty
是一個非線程安全的原始 map。包含新寫入的 key,而且包含 read
中的全部未被刪除的 key。這樣,能夠快速地將 dirty
提高爲 read
對外提供服務。若是 dirty
爲 nil,那麼下一次寫入時,會新建一個新的 dirty
,這個初始的 dirty
是 read
的一個拷貝,但除掉了其中已被刪除的 key。
每當從 read 中讀取失敗,都會將 misses
的計數值加 1,當加到必定閾值之後,須要將 dirty 提高爲 read,以期減小 miss 的情形。
read map
和dirty map
的存儲方式是不一致的。
前者使用 atomic.Value,後者只是單純的使用 map。
緣由是 read map 使用 lock free 操做,必須保證 load/store 的原子性;而 dirty map 的 load+store 操做是由 lock(就是 mu)來保護的。
真正存儲 key/value
的是 read 和 dirty 字段。read
使用 atomic.Value,這是 lock-free 的基礎,保證 load/store 的原子性。dirty
則直接用了一個原始的 map,對於它的 load/store 操做須要加鎖。
read
字段裏其實是存儲的是:
// readOnly is an immutable struct stored atomically in the Map.read field. type readOnly struct { m map[interface{}]*entry amended bool // true if the dirty map contains some key not in m. }
注意到 read 和 dirty 裏存儲的東西都包含 entry
,來看一下:
type entry struct { p unsafe.Pointer // *interface{} }
很簡單,它是一個指針,指向 value。看來,read 和 dirty 各自維護一套 key,key 指向的都是同一個 value。也就是說,只要修改了這個 entry,對 read 和 dirty 都是可見的。這個指針的狀態有三種:
當 p == nil
時,說明這個鍵值對已被刪除,而且 m.dirty == nil,或 m.dirty[k] 指向該 entry。
當 p == expunged
時,說明這條鍵值對已被刪除,而且 m.dirty != nil,且 m.dirty 中沒有這個 key。
其餘狀況,p 指向一個正常的值,表示實際 interface{}
的地址,而且被記錄在 m.read.m[key] 中。若是這時 m.dirty 不爲 nil,那麼它也被記錄在 m.dirty[key] 中。二者實際上指向的是同一個值。
當刪除 key 時,並不實際刪除。一個 entry 能夠經過原子地(CAS 操做)設置 p 爲 nil 被刪除。若是以後建立 m.dirty,nil 又會被原子地設置爲 expunged,且不會拷貝到 dirty 中。
若是 p 不爲 expunged,和 entry 相關聯的這個 value 能夠被原子地更新;若是 p == expunged
,那麼僅當它初次被設置到 m.dirty 以後,才能夠被更新。
總體用一張圖來表示:
先來看 expunged:
var expunged = unsafe.Pointer(new(interface{}))
它是一個指向任意類型的指針,用來標記從 dirty map 中刪除的 entry。
// Store sets the value for a key. func (m *Map) Store(key, value interface{}) { // 若是 read map 中存在該 key 則嘗試直接更改(因爲修改的是 entry 內部的 pointer,所以 dirty map 也可見) read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { // 若是 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 而且 m.dirty 中不存在該 key 值 此時: // a. 將 p 的狀態由 expunged 更改成 nil // b. dirty map 插入 key m.dirty[key] = e } // 更新 entry.p = value (read map 和 dirty map 指向同一個 entry) e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { // 若是 read map 中不存在該 key,但 dirty map 中存在該 key,直接寫入更新 entry(read map 中仍然沒有這個 key) e.storeLocked(&value) } else { // 若是 read map 和 dirty map 中都不存在該 key,則: // a. 若是 dirty map 爲空,則須要建立 dirty map,並從 read map 中拷貝未刪除的元素到新建立的 dirty map // b. 更新 amended 字段,標識 dirty map 中存在 read map 中沒有的 key // c. 將 kv 寫入 dirty map 中,read 不變 if !read.amended { // 到這裏就意味着,當前的 key 是第一次被加到 dirty map 中。 // store 以前先判斷一下 dirty map 是否爲空,若是爲空,就把 read map 淺拷貝一次。 m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } // 寫入新 key,在 dirty 中存儲 value m.dirty[key] = newEntry(value) } m.mu.Unlock() }
總體流程:
p == expunged
,說明 m.dirty != nil 而且 m.dirty 中不存在該 key 值 此時: a. 將 p 的狀態由 expunged 更改成 nil;b. dirty map 插入 key。而後,直接更新對應的 value。再來看一些子函數:
// 若是 entry 沒被刪,tryStore 存儲值到 entry 中。若是 p == expunged,即 entry 被刪,那麼返回 false。 func (e *entry) tryStore(i *interface{}) bool { for { p := atomic.LoadPointer(&e.p) if p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } } }
tryStore
在 Store 函數最開始的時候就會調用,是比較常見的 for
循環加 CAS 操做,嘗試更新 entry,讓 p 指向新的值。
unexpungeLocked
函數確保了 entry 沒有被標記成已被清除:
// unexpungeLocked 函數確保了 entry 沒有被標記成已被清除。 // 若是 entry 先前被清除過了,那麼在 mutex 解鎖以前,它必定要被加入到 dirty map 中 func (e *entry) unexpungeLocked() (wasExpunged bool) { return atomic.CompareAndSwapPointer(&e.p, expunged, nil) }
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 若是沒在 read 中找到,而且 amended 爲 true,即 dirty 中存在 read 中沒有的 key if !ok && read.amended { m.mu.Lock() // dirty map 不是線程安全的,因此須要加上互斥鎖 // double check。避免在上鎖的過程當中 dirty map 提高爲 read map。 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] // 仍然沒有在 read 中找到這個 key,而且 amended 爲 true if !ok && read.amended { e, ok = m.dirty[key] // 從 dirty 中找 // 無論 dirty 中有沒有找到,都要"記一筆",由於在 dirty 提高爲 read 以前,都會進入這條路徑 m.missLocked() } m.mu.Unlock() } if !ok { // 若是沒找到,返回空,false return nil, false } return e.load() }
處理路徑分爲 fast path 和 slow path,總體流程以下:
這裏主要看下 missLocked
的函數的實現:
func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } // dirty map 晉升 m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
直接將 misses 的值加 1,表示一次未命中,若是 misses 值小於 m.dirty 的長度,就直接返回。不然,將 m.dirty 晉升爲 read,並清空 dirty,清空 misses 計數值。這樣,以前一段時間新加入的 key 都會進入到 read 中,從而可以提高 read 的命中率。
再來看下 entry 的 load 方法:
func (e *entry) load() (value interface{}, ok bool) { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return nil, false } return *(*interface{})(p), true }
對於 nil 和 expunged 狀態的 entry,直接返回 ok=false
;不然,將 p 轉成 interface{}
返回。
// Delete deletes the value for a key. func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 若是 read 中沒有這個 key,且 dirty map 不爲空 if !ok && read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { delete(m.dirty, key) // 直接從 dirty 中刪除這個 key } m.mu.Unlock() } if ok { e.delete() // 若是在 read 中找到了這個 key,將 p 置爲 nil } }
能夠看到,基本套路仍是和 Load,Store 相似,都是先從 read 裏查是否有這個 key,若是有則執行 entry.delete
方法,將 p 置爲 nil,這樣 read 和 dirty 都能看到這個變化。
若是沒在 read 中找到這個 key,而且 dirty 不爲空,那麼就要操做 dirty 了,操做以前,仍是要先上鎖。而後進行 double check,若是仍然沒有在 read 裏找到此 key,則從 dirty 中刪掉這個 key。但不是真正地從 dirty 中刪除,而是更新 entry 的狀態。
來看下 entry.delete
方法:
func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
它真正作的事情是將正常狀態(指向一個 interface{})的 p 設置成 nil。沒有設置成 expunged 的緣由是,當 p 爲 expunged 時,表示它已經不在 dirty 中了。這是 p 的狀態機決定的,在 tryExpungeLocked
函數中,會將 nil 原子地設置成 expunged。
tryExpungeLocked
是在新建立 dirty 時調用的,會將已被刪除的 entry.p 從 nil 改爲 expunged,這個 entry 就不會寫入 dirty 了。
func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { // 若是原來是 nil,說明原 key 已被刪除,則將其轉爲 expunged。 if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
注意到若是 key 同時存在於 read 和 dirty 中時,刪除只是作了一個標記,將 p 置爲 nil;而若是僅在 dirty 中含有這個 key 時,會直接刪除這個 key。緣由在於,若二者都存在這個 key,僅作標記刪除,能夠在下次查找這個 key 時,命中 read,提高效率。若只有在 dirty 中存在時,read 起不到「緩存」的做用,直接刪除。
這個函數結合了 Load 和 Store 的功能,若是 map 中存在這個 key,那麼返回這個 key 對應的 value;不然,將 key-value 存入 map。這在須要先執行 Load 查看某個 key 是否存在,以後再更新此 key 對應的 value 時頗有效,由於 LoadOrStore 能夠併發執行。
具體的過程再也不一一分析了,可參考 Load 和 Store 的源碼分析。
Range 的參數是一個函數:
f func(key, value interface{}) bool
由使用者提供實現,Range 將遍歷調用時刻 map 中的全部 k-v 對,將它們傳給 f 函數,若是 f 返回 false,將中止遍歷。
func (m *Map) Range(f func(key, value interface{}) bool) { read, _ := m.read.Load().(readOnly) if read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended { read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }
當 amended 爲 true 時,說明 dirty 中含有 read 中沒有的 key,由於 Range 會遍歷全部的 key,是一個 O(n) 操做。將 dirty 提高爲 read,會將開銷分攤開來,因此這裏直接就提高了。
以後,遍歷 read,取出 entry 中的值,調用 f(k, v)。
關於爲什麼 sync.map
沒有 Len 方法,參考資料裏給出了 issue,bcmills
認爲對於併發的數據結構和非併發的數據結構並不必定要有相同的方法。例如,map 有 Len 方法,sync.map 卻不必定要有。就像 sync.map 有 LoadOrStore 方法,map 就沒有同樣。
有些實現增長了一個計數器,並原子地增長或減小它,以此來表示 sync.map 中元素的個數。但 bcmills
提出這會引入競爭:atomic
並非 contention-free
的,它只是把競爭下沉到了 CPU 層級。這會給其餘不須要 Len 方法的場景帶來負擔。
sync.map
是線程安全的,讀取,插入,刪除也都保持着常數級的時間複雜度。k,v
,返回值是一個布爾值:f func(key, value interface{}) bool
。【德志大佬-設計併發安全的 map】https://halfrost.com/go_map_chapter_one/
【德志大佬-設計併發安全的 map】https://halfrost.com/go_map_chapter_two/
【關於 sync.map 爲何沒有 len 方法的 issue】https://github.com/golang/go/issues/20680
【芮神增長了 len 方法】http://xiaorui.cc/archives/4972
【圖解 map 操做】https://wudaijun.com/2018/02/go-sync-map-implement/
【從一道面試題開始】http://www.javashuo.com/article/p-vvbdwsqz-v.html
【源碼分析】https://zhuanlan.zhihu.com/p/44585993
【行文通暢,流程圖清晰】http://www.javashuo.com/article/p-kdgzgony-gp.html