衆所周知,go普通的map是不支持併發的,換而言之,不是線程(goroutine)安全的。博主是從golang 1.4開始使用的,那時候map的併發讀是沒有支持,可是併發寫會出現髒數據。golang 1.6以後,併發地讀寫會直接panic:git
fatal error: concurrent map read and map write
package main func main() { m := make(map[int]int) go func() { for { _ = m[1] } }() go func() { for { m[2] = 2 } }() select {} }
因此須要支持對map的併發讀寫時候,博主使用兩種方法:github
golang 1.9以後,go 在sync包下引入了併發安全的map,也爲博主提供了第三種方法。本文重點也在此,爲了時效性,本文基於golang 1.10源碼進行分析。golang
type Map struct { mu Mutex //互斥鎖,用於鎖定dirty map read atomic.Value //優先讀map,支持原子操做,註釋中有readOnly不是說read是隻讀,而是它的結構體。read實際上有寫的操做 dirty map[interface{}]*entry // dirty是一個當前最新的map,容許讀寫 misses int // 主要記錄read讀取不到數據加鎖讀取read map以及dirty map的次數,當misses等於dirty的長度時,會將dirty複製到read }
readOnly 主要用於存儲,經過原子操做存儲在Map.read中元素。安全
type readOnly struct { m map[interface{}]*entry amended bool // 若是數據在dirty中但沒有在read中,該值爲true,做爲修改標識 }
type entry struct { // nil: 表示爲被刪除,調用Delete()能夠將read map中的元素置爲nil // expunged: 也是表示被刪除,可是該鍵只在read而沒有在dirty中,這種狀況出如今將read複製到dirty中,即複製的過程會先將nil標記爲expunged,而後不將其複製到dirty // 其餘: 表示存着真正的數據 p unsafe.Pointer // *interface{} }
若是你接觸過大Java,那你必定對CocurrentHashMap利用鎖分段技術增長了鎖的數目,從而使爭奪同一把鎖的線程的數目獲得控制的原理記憶深入。
那麼Golang的sync.Map是否也是使用了相同的原理呢?sync.Map的原理很簡單,使用了空間換時間策略,經過冗餘的兩個數據結構(read、dirty),實現加鎖對性能的影響。
經過引入兩個map將讀寫分離到不一樣的map,其中read map提供併發讀和已存元素原子寫,而dirty map則負責讀寫。 這樣read map就能夠在不加鎖的狀況下進行併發讀取,當read map中沒有讀取到值時,再加鎖進行後續讀取,並累加未命中數,當未命中數大於等於dirty map長度,將dirty map上升爲read map。從以前的結構體的定義能夠發現,雖然引入了兩個map,可是底層數據存儲的是指針,指向的是同一份值。數據結構
開始時sync.Map寫入數據併發
X=1 Y=2 Z=3
dirty map主要接受寫請求,read map沒有數據,此時read map與dirty map數據以下圖。app
讀取數據的時候從read map中讀取,此時read map並無數據,miss記錄從read map讀取失敗的次數,當misses>=len(dirty map)時,將dirty map直接升級爲read map,這裏直接對dirty map進行地址拷貝而且dirty map被清空,misses置爲0。此時read map與dirty map數據以下圖。函數
如今有需求對Z元素進行修改Z=4,sync.Map會直接修改read map的元素。源碼分析
新加元素K=5,新加的元素就須要操做dirty map了,若是misses達到閥值後dirty map直接升級爲read map而且dirty map爲空map(read的amended==false),則dirty map須要從read map複製數據。 性能
升級後的效果以下。
若是須要刪除Z,須要分幾種狀況:
一種read map存在該元素且read的amended==false:直接將read中的元素置爲nil。
另外一種爲元素剛剛寫入dirty map且未升級爲read map:直接調用golang內置函數delete刪除dirty map的元素;
還有一種是read map和dirty map同時存在該元素:將read map中的元素置爲nil,由於read map和dirty map 使用的均爲元素地址,因此均被置爲nil。
Load返回存儲在映射中的鍵值,若是沒有值,則返回nil。ok結果指示是否在映射中找到值。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { // 第一次檢測元素是否存在 read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { // 爲dirty map 加鎖 m.mu.Lock() // 第二次檢測元素是否存在,主要防止在加鎖的過程當中,dirty map轉換成read map,從而致使讀取不到數據 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // 從dirty map中獲取是爲了應對read map中不存在的新元素 e, ok = m.dirty[key] // 不論元素是否存在,均須要記錄miss數,以便dirty map升級爲read map m.missLocked() } // 解鎖 m.mu.Unlock() } // 元素不存在直接返回 if !ok { return nil, false } return e.load() }
dirty map升級爲read map
func (m *Map) missLocked() { // misses自增1 m.misses++ // 判斷dirty map是否能夠升級爲read map if m.misses < len(m.dirty) { return } // dirty map升級爲read map m.read.Store(readOnly{m: m.dirty}) // dirty map 清空 m.dirty = nil // misses重置爲0 m.misses = 0 }
元素取值
func (e *entry) load() (value interface{}, ok bool) { p := atomic.LoadPointer(&e.p) // 元素不存在或者被刪除,則直接返回 if p == nil || p == expunged { return nil, false } return *(*interface{})(p), true }
read map主要用於讀取,每次Load都先從read讀取,當read中不存在且amended爲true,就從dirty讀取數據 。不管dirty map中是否存在該元素,都會執行missLocked函數,該函數將misses+1,當m.misses < len(m.dirty)
時,便會將dirty複製到read,此時再將dirty置爲nil,misses=0。
設置Key=>Value。
func (m *Map) Store(key, value interface{}) { // 若是read存在這個鍵,而且這個entry沒有被標記刪除,嘗試直接寫入,寫入成功,則結束 // 第一次檢測 read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } // dirty map鎖 m.mu.Lock() // 第二次檢測 read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { // unexpungelocc確保元素沒有被標記爲刪除 // 判斷元素被標識爲刪除 if e.unexpungeLocked() { // 這個元素以前被刪除了,這意味着有一個非nil的dirty,這個元素不在裏面. m.dirty[key] = e } // 更新read map 元素值 e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { // 此時read map沒有該元素,可是dirty map有該元素,並需修改dirty map元素值爲最新值 e.storeLocked(&value) } else { // read.amended==false,說明dirty map爲空,須要將read map 複製一份到dirty map if !read.amended { m.dirtyLocked() // 設置read.amended==true,說明dirty map有數據 m.read.Store(readOnly{m: read.m, amended: true}) } // 設置元素進入dirty map,此時dirty map擁有read map和最新設置的元素 m.dirty[key] = newEntry(value) } // 解鎖,有人認爲鎖的範圍有點大,假設read map數據很大,那麼執行m.dirtyLocked()會耗費花時間較多,徹底能夠在操做dirty map時才加鎖,這樣的想法是不對的,由於m.dirtyLocked()中有寫入操做 m.mu.Unlock() }
嘗試存儲元素。
func (e *entry) tryStore(i *interface{}) bool { // 獲取對應Key的元素,判斷是否標識爲刪除 p := atomic.LoadPointer(&e.p) if p == expunged { return false } for { // cas嘗試寫入新元素值 if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } // 判斷是否標識爲刪除 p = atomic.LoadPointer(&e.p) if p == expunged { return false } } }
unexpungelocc確保元素沒有被標記爲刪除。若是這個元素以前被刪除了,它必須在未解鎖前被添加到dirty map上。
func (e *entry) unexpungeLocked() (wasExpunged bool) { return atomic.CompareAndSwapPointer(&e.p, expunged, nil) }
從read map複製到dirty map。
func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { // 若是標記爲nil或者expunged,則不復制到dirty map if !e.tryExpungeLocked() { m.dirty[k] = e } } }
若是對應的元素存在,則返回該元素的值,若是不存在,則將元素寫入到sync.Map。若是已加載值,則加載結果爲true;若是已存儲,則爲false。
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { // 不加鎖的狀況下讀取read map // 第一次檢測 read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok { // 若是元素存在(是否標識爲刪除由tryLoadOrStore執行處理),嘗試獲取該元素已存在的值或者將元素寫入 actual, loaded, ok := e.tryLoadOrStore(value) if ok { return actual, loaded } } m.mu.Lock() // 第二次檢測 // 如下邏輯參看Store read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { m.dirty[key] = e } actual, loaded, _ = e.tryLoadOrStore(value) } else if e, ok := m.dirty[key]; ok { actual, loaded, _ = e.tryLoadOrStore(value) m.missLocked() } else { if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) actual, loaded = value, false } m.mu.Unlock() return actual, loaded }
若是沒有刪除元素,tryLoadOrStore將自動加載或存儲一個值。若是刪除元素,tryLoadOrStore保持條目不變並返回ok= false。
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { p := atomic.LoadPointer(&e.p) // 元素標識刪除,直接返回 if p == expunged { return nil, false, false } // 存在該元素真實值,則直接返回原來的元素值 if p != nil { return *(*interface{})(p), true, true } // 若是p爲nil(此處的nil,並是否是指元素的值爲nil,而是atomic.LoadPointer(&e.p)爲nil,元素的nil在unsafe.Pointer是有值的),則更新該元素值 ic := i for { if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { return i, false, true } p = atomic.LoadPointer(&e.p) if p == expunged { return nil, false, false } if p != nil { return *(*interface{})(p), true, true } } }
刪除元素,採用延遲刪除,當read map存在元素時,將元素置爲nil,只有在提高dirty的時候才清理刪除的數,延遲刪除能夠避免後續獲取刪除的元素時候須要加鎖。當read map不存在元素時,直接刪除dirty map中的元素
func (m *Map) Delete(key interface{}) { // 第一次檢測 read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() // 第二次檢測 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // 不論dirty map是否存在該元素,都會執行刪除 delete(m.dirty, key) } m.mu.Unlock() } if ok { // 若是在read中,則將其標記爲刪除(nil) e.delete() } }
元素值置爲nil
func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
遍歷獲取sync.Map中全部的元素,使用的爲快照方式,因此不必定是準確的。
func (m *Map) Range(f func(key, value interface{}) bool) { // 第一檢測 read, _ := m.read.Load().(readOnly) // read.amended=true,說明dirty map包含全部有效的元素(含新加,不含被刪除的),使用dirty map if read.amended { // 第二檢測 m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended { // 使用dirty map而且升級爲read map read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } // 一向原則,使用read map做爲讀 for k, e := range read.m { v, ok := e.load() // 被刪除的不計入 if !ok { continue } // 函數返回false,終止 if !f(k, v) { break } } }
通過了上面的分析能夠獲得,sync.Map並不適合同時存在大量讀寫的場景,大量的寫會致使read map讀取不到數據從而加鎖進行進一步讀取,同時dirty map不斷升級爲read map。 從而致使總體性能較低,特別是針對cache場景.針對append-only以及大量讀,少許寫場景使用sync.Map則相對比較合適。
sync.Map沒有提供獲取元素個數的Len()方法,不過能夠經過Range()實現。
func Len(sm sync.Map) int { lengh := 0 f := func(key, value interface{}) bool { lengh++ return true } one:=lengh lengh=0 sm.Range(f) if one != lengh { one = lengh lengh=0 sm.Range(f) if one <lengh { return lengh } } return one }
參考