目錄 [−]
在Go 1.6以前, 內置的map類型是部分goroutine安全的,併發的讀沒有問題,併發的寫可能有問題。自go 1.6以後, 併發地讀寫map會報錯,這在一些知名的開源庫中都存在這個問題,因此go 1.9以前的解決方案是額外綁定一個鎖,封裝成一個新的struct或者單獨使用鎖均可以。java
本文帶你深刻到sync.Map
的具體實現中,看看爲了增長一個功能,代碼是如何變的複雜的,以及做者在實現sync.Map
的一些思想。git
有併發問題的map
官方的faq 已經提到內建的map
不是線程(goroutine)安全的。程序員
首先,讓咱們看一段併發讀寫的代碼,下列程序中一個goroutine一直讀,一個goroutine一隻寫同一個鍵值,即即便讀寫的鍵不相同,並且map也沒有"擴容"等操做,代碼仍是會報錯。github
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main
func main() {
m := make (map [int ]int )
go func () {
for {
_ = m[1 ]
}
}()
go func () {
for {
m[2 ] = 2
}
}()
select {}
}
錯誤信息是: fatal error: concurrent map read and map write
。golang
若是你查看Go的源代碼: hashmap_fast.go#L118 ,會看到讀的時候會檢查hashWriting
標誌, 若是有這個標誌,就會報併發錯誤。安全
寫的時候會設置這個標誌: hashmap.go#L542 微信
hashmap.go#L628 設置完以後會取消這個標記。數據結構
固然,代碼中還有好幾處併發讀寫的檢查, 好比寫的時候也會檢查是否是有併發的寫,刪除鍵的時候相似寫,遍歷的時候併發讀寫問題等。併發
有時候,map的併發問題不是那麼容易被發現, 你能夠利用-race
參數來檢查。ide
Go 1.9以前的解決方案
可是,不少時候,咱們會併發地使用map對象,尤爲是在必定規模的項目中,map總會保存goroutine共享的數據。在Go官方blog的Go maps in action 一文中,提供了一種簡便的解決方案。
1
2
3
4
var counter = struct {
sync.RWMutex
m map [string ]int
}{m: make (map [string ]int )}
它使用嵌入struct爲map增長一個讀寫鎖。
讀數據的時候很方便的加鎖:
1
2
3
4
counter.RLock()
n := counter.m["some_key" ]
counter.RUnlock()
fmt.Println("some_key:" , n)
寫數據的時候:
1
2
3
counter.Lock()
counter.m["some_key" ]++
counter.Unlock()
sync.Map
能夠說,上面的解決方案至關簡潔,而且利用讀寫鎖而不是Mutex能夠進一步減小讀寫的時候由於鎖帶來的性能。
可是,它在一些場景下也有問題,若是熟悉Java的同窗,能夠對比一下java的ConcurrentHashMap
的實現,在map的數據很是大的狀況下,一把鎖會致使大併發的客戶端共爭一把鎖,Java的解決方案是shard
, 內部使用多個鎖,每一個區間共享一把鎖,這樣減小了數據共享一把鎖帶來的性能影響,orcaman 提供了這個思路的一個實現: concurrent-map ,他也詢問了Go相關的開發人員是否在Go中也實現這種方案 ,因爲實現的複雜性,答案是Yes, we considered it.
,可是除非有特別的性能提高和應用場景,不然沒有進一步的開發消息。
那麼,在Go 1.9中sync.Map
是怎麼實現的呢?它是如何解決併發提高性能的呢?
sync.Map
的實現有幾個優化點,這裏先列出來,咱們後面慢慢分析。
空間換時間。 經過冗餘的兩個數據結構(read、dirty),實現加鎖對性能的影響。
使用只讀數據(read),避免讀寫衝突。
動態調整,miss次數多了以後,將dirty數據提高爲read。
double-checking。
延遲刪除。 刪除一個鍵值只是打標記,只有在提高dirty的時候才清理刪除的數據。
優先從read讀取、更新、刪除,由於對read的讀取不須要鎖。
下面咱們介紹sync.Map
的重點代碼,以便理解它的實現思想。
首先,咱們看一下sync.Map
的數據結構:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Map struct {
mu Mutex
read atomic.Value
dirty map [interface {}]*entry
misses int
}
它的數據結構很簡單,值包含四個字段:read
、mu
、dirty
、misses
。
它使用了冗餘的數據結構read
、dirty
。dirty
中會包含read
中爲刪除的entries,新增長的entries會加入到dirty
中。
read
的數據結構是:
1
2
3
4
type readOnly struct {
m map [interface {}]*entry
amended bool
}
amended
指明Map.dirty
中有readOnly.m
未包含的數據,因此若是從Map.read
找不到數據的話,還要進一步到Map.dirty
中查找。
對Map.read的修改是經過原子操做進行的。
雖然read
和dirty
有冗餘數據,但這些數據是經過指針指向同一個數據,因此儘管Map的value會很大,可是冗餘的空間佔用仍是有限的。
readOnly.m
和Map.dirty
存儲的值類型是*entry
,它包含一個指針p, 指向用戶存儲的value值。
1
2
3
type entry struct {
p unsafe.Pointer
}
p有三種值:
nil: entry已被刪除了,而且m.dirty爲nil
expunged: entry已被刪除了,而且m.dirty不爲nil,並且這個entry不存在於m.dirty中
其它: entry是一個正常的值
以上是sync.Map
的數據結構,下面咱們重點看看Load
、Store
、Delete
、Range
這四個方法,其它輔助方法能夠參考這四個方法來理解。
Load
加載方法,也就是提供一個鍵key
,查找對應的值value
,若是不存在,經過ok
反映:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Load(key interface {}) (value interface {}, ok bool ) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil , false
}
return e.load()
}
這裏有兩個值的關注的地方。一個是首先從m.read
中加載,不存在的狀況下,而且m.dirty
中有新數據,加鎖,而後從m.dirty
中加載。
二是這裏使用了雙檢查的處理,由於在下面的兩個語句中,這兩行語句並非一個原子操做。
1
2
if !ok && read.amended {
m.mu.Lock()
雖然第一句執行的時候條件知足,可是在加鎖以前,m.dirty
可能被提高爲m.read
,因此加鎖後還得再檢查m.read
,後續的方法中都使用了這個方法。
雙檢查的技術Java程序員很是熟悉了,單例模式的實現之一就是利用雙檢查的技術。
能夠看到,若是咱們查詢的鍵值正好存在於m.read
中,無須加鎖,直接返回,理論上性能優異。即便不存在於m.read
中,通過miss
幾回以後,m.dirty
會被提高爲m.read
,又會從m.read
中查找。因此對於更新/增長較少,加載存在的key不少的case,性能基本和無鎖的map相似。
下面看看m.dirty
是如何被提高的。 missLocked
方法中可能會將m.dirty
提高。
1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
m.misses++
if m.misses < len (m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
上面的最後三行代碼就是提高m.dirty
的,很簡單的將m.dirty
做爲readOnly
的m
字段,原子更新m.read
。提高後m.dirty
、m.misses
重置, 而且m.read.amended
爲false。
Store
這個方法是更新或者新增一個entry。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (m *Map) Store(key, value interface {}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true })
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make (map [interface {}]*entry, len (read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool ) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil , expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
你能夠看到,以上操做都是先從操做m.read
開始的,不知足條件再加鎖,而後操做m.dirty
。
Store
可能會在某種狀況下(初始化或者m.dirty剛被提高後)從m.read
中複製數據,若是這個時候m.read
中數據量很是大,可能會影響性能。
Delete
刪除一個鍵值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (m *Map) Delete(key interface {}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete (m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete ()
}
}
一樣,刪除操做仍是從m.read
中開始, 若是這個entry不存在於m.read
中,而且m.dirty
中有新數據,則加鎖嘗試從m.dirty
中刪除。
注意,仍是要雙檢查的。 從m.dirty
中直接刪除便可,就當它沒存在過,可是若是是從m.read
中刪除,並不會直接刪除,而是打標記:
1
2
3
4
5
6
7
8
9
10
11
12
13
func (e *entry) delete () (hadValue bool ) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil ) {
return true
}
}
}
Range
由於for ... range map
是內建的語言特性,因此沒有辦法使用for range
遍歷sync.Map
, 可是可使用它的Range
方法,經過回調的方式遍歷。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (m *Map) Range(f func (key, value interface {}) bool ) {
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
Range方法調用前可能會作一個m.dirty
的提高,不過提高m.dirty
不是一個耗時的操做。
sync.Map的性能
Go 1.9源代碼中提供了性能的測試: map_bench_test.go 、map_reference_test.go
我也基於這些代碼修改了一下,獲得下面的測試數據,相比較之前的解決方案,性能多少回有些提高,若是你特別關注性能,能夠考慮sync.Map
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
BenchmarkHitAll/*sync .RWMutexMap-4 20000000 83.8 ns/op
BenchmarkHitAll/*sync .Map-4 30000000 59.9 ns/op
BenchmarkHitAll_WithoutPrompting/*sync .RWMutexMap-4 20000000 96.9 ns/op
BenchmarkHitAll_WithoutPrompting/*sync .Map-4 20000000 64.1 ns/op
BenchmarkHitNone/*sync .RWMutexMap-4 20000000 79.1 ns/op
BenchmarkHitNone/*sync .Map-4 30000000 43.3 ns/op
BenchmarkHit_WithoutPrompting/*sync .RWMutexMap-4 20000000 81.5 ns/op
BenchmarkHit_WithoutPrompting/*sync .Map-4 30000000 44.0 ns/op
BenchmarkUpdate/*sync .RWMutexMap-4 5000000 328 ns/op
BenchmarkUpdate/*sync .Map-4 10000000 146 ns/op
BenchmarkUpdate_WithoutPrompting/*sync .RWMutexMap-4 5000000 336 ns/op
BenchmarkUpdate_WithoutPrompting/*sync .Map-4 5000000 324 ns/op
BenchmarkDelete/*sync .RWMutexMap-4 10000000 155 ns/op
BenchmarkDelete/*sync .Map-4 30000000 55.0 ns/op
BenchmarkDelete_WithoutPrompting/*sync .RWMutexMap-4 10000000 173 ns/op
BenchmarkDelete_WithoutPrompting/*sync .Map-4 10000000 147 ns/op
其它
sync.Map
沒有Len
方法,而且目前沒有跡象要加上 (issue#20680 ),因此若是想獲得當前Map中有效的entries的數量,須要使用Range
方法遍歷一次, 比較X疼。
LoadOrStore
方法若是提供的key存在,則返回已存在的值(Load),不然保存提供的鍵值(Store)。