深刻理解Go 1.9 sync.Map

Go官方的faq已經提到內建的map不是線程(goroutine)安全的。在Go 1.6以前, 內置的map類型是部分goroutine安全的,併發的讀沒有問題,併發的寫可能有問題。自go 1.6以後, 併發地讀寫map會報錯,這在一些知名的開源庫中都存在這個問題,因此go 1.9以前的解決方案是額外綁定一個鎖,封裝成一個新的struct或者單獨使用鎖均可以。另外筆者在go 1.9以前一般是使用concurrent-map來解決這類問題,可是不是全部的第三方庫都以此來解決問題。git

咱們先來看看這個代碼樣例:程序中一個goroutine一直讀,一個goroutine一直寫同一個鍵值,即便讀寫的鍵不相同,並且map也沒有"擴容"等操做,代碼仍是會報錯的,錯誤信息是: fatal error: concurrent map read and map write。github

package main
func main() {
	m := make(map[int]int)
	go func() {
		for {
			_ = m[1]
		}
	}()
	go func() {
		for {
			m[2] = 2
		}
	}()
	select {}
}

問題的根源在Go的源代碼: hashmap_fast.go#L118,會看到讀的時候會檢查hashWriting標誌, 若是有這個標誌,就會報併發錯誤。golang

寫的時候會設置這個標誌: hashmap.go#L542安全

h.flags |= hashWriting

hashmap.go#L628設置完以後會取消這個標記。這樣併發讀寫的檢查有不少處, 好比寫的時候也會檢查是否是有併發的寫,刪除鍵的時候相似寫,遍歷的時候併發讀寫問題等。map的併發問題並非那麼容易被發現, 你能夠利用-race參數來檢查。數據結構

併發地使用map對象是咱們平常開發中一個很常見的需求,特別是在一些大項目中。map總會保存goroutine共享的數據。Go 1.9以前在Go官方blog的Go maps in action一文中,給出了一種簡便的解決方案。併發

首先,經過嵌入struct爲map增長一個讀寫鎖性能

var counter = struct{
    sync.RWMutex
    m map[string]int
}{m: make(map[string]int)}

讀寫數據時,能夠很方便的加鎖測試

counter.RLock()
n := counter.m["some_key"]
counter.RUnlock()
fmt.Println("some_key:", n)

counter.Lock()
counter.m["some_key"]++
counter.Unlock()

固然,你也可使用concurrent-map來解決問題優化

// Create a new map.
map := cmap.New()
	
// Sets item within map, sets "bar" under key "foo"
map.Set("foo", "bar")

// Retrieve item from map.
if tmp, ok := map.Get("foo"); ok {
	bar := tmp.(string)
}

// Removes item under key "foo"
map.Remove("foo")

二者本質上都是使用sync.RWMutex來保障線程(goroutine)安全的。這種解決方案至關簡潔,而且利用讀寫鎖而不是Mutex能夠進一步減小讀寫的時候由於鎖帶來的性能。但在map的數據很是大的狀況下,一把鎖會致使大併發的客戶端共爭一把鎖,這時,在Go 1.9中sync.Map就很是實用。(除了以上這些以外,還有一個筆者想提到的庫,cmap也是一個至關好,安全且性能出色的第三方庫)atom

Go 1.9中sync.Map的實現有如下優化點:

  1. 空間換時間。 經過冗餘的兩個數據結構(read、dirty),實現加鎖對性能的影響。
  2. 使用只讀數據(read),避免讀寫衝突。
  3. 動態調整,miss次數多了以後,將dirty數據提高爲read。
  4. double-checking。
  5. 延遲刪除。 刪除一個鍵值只是打標記,只有在提高dirty的時候才清理刪除的數據。
  6. 優先從read讀取、更新、刪除,由於對read的讀取不須要鎖。

sync.Map數據結構很簡單,包含四個字段:readmudirtymisses

type Map struct {
	// 當涉及到dirty數據的操做的時候,須要使用此鎖
	mu Mutex
	// 一個只讀的數據結構,由於只讀,因此不會有讀寫衝突。
	// 因此從這個數據中讀取老是安全的。
	// 實際上,實際也會更新這個數據的entries,若是entry是未刪除的(unexpunged), 並不須要加鎖。若是entry已經被刪除了,須要加鎖,以便更新dirty數據。
	read atomic.Value // readOnly
	// dirty數據包含當前的map包含的entries,它包含最新的entries(包括read中未刪除的數據,雖有冗餘,可是提高dirty字段爲read的時候很是快,不用一個一個的複製,而是直接將這個數據結構做爲read字段的一部分),有些數據還可能沒有移動到read字段中。
	// 對於dirty的操做須要加鎖,由於對它的操做可能會有讀寫競爭。
	// 當dirty爲空的時候, 好比初始化或者剛提高完,下一次的寫操做會複製read字段中未刪除的數據到這個數據中。
	dirty map[interface{}]*entry
	// 當從Map中讀取entry的時候,若是read中不包含這個entry,會嘗試從dirty中讀取,這個時候會將misses加一,
	// 當misses累積到 dirty的長度的時候, 就會將dirty提高爲read,避免從dirty中miss太屢次。由於操做dirty須要加鎖。
	misses int
}

read的數據結構

type readOnly struct {
	m       map[interface{}]*entry
	amended bool // 若是Map.dirty有些數據不在其中的時候,這個值爲true
}

這裏的精髓是,使用了冗餘的數據結構readdirtydirty中會包含read中未刪除的entries,新增長的entries會加入到dirty中。amended指明Map.dirty中有readOnly.m未包含的數據,因此若是從Map.read找不到數據的話,還要進一步到Map.dirty中查找。而對Map.read的修改是經過原子操做進行的。雖然readdirty有冗餘數據,但這些數據是經過指針指向同一個數據,因此儘管Map的value會很大,可是冗餘的空間佔用仍是有限的。readOnly.mMap.dirty存儲的值類型是*entry,它包含一個指針p, 指向用戶存儲的value值。

type entry struct {
	p unsafe.Pointer // *interface{}
}

p有三種值:

  • nil: entry已被刪除了,而且m.dirty爲nil
  • expunged: entry已被刪除了,而且m.dirty不爲nil,並且這個entry不存在於m.dirty中
  • 其它: entry是一個正常的值

理解了sync.Map的數據結構,那麼咱們先來看看sync.Map的Load方法實現

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	// 1.首先從m.read中獲得只讀readOnly,從它的map中查找,不須要加鎖
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	// 2. 若是沒找到,而且m.dirty中有新數據,須要從m.dirty查找,這個時候須要加鎖
	if !ok && read.amended {
		m.mu.Lock()
		// 雙檢查,避免加鎖的時候m.dirty提高爲m.read,這個時候m.read可能被替換了。
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		// 若是m.read中仍是不存在,而且m.dirty中有新數據
		if !ok && read.amended {
			// 從m.dirty查找
			e, ok = m.dirty[key]
			// 無論m.dirty中存不存在,都將misses計數加一
			// missLocked()中知足條件後就會提高m.dirty
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

Load加載方法,提供一個鍵key,查找對應的值value,若是不存在,經過ok反映。這裏的精髓是從m.read中加載,不存在的狀況下,而且m.dirty中有新數據,加鎖,而後從m.dirty中加載。另一點是這裏使用了雙檢查的處理,由於在下面的兩個語句中,這兩行語句並非一個原子操做。

if !ok && read.amended {
		m.mu.Lock()

雖然第一句執行的時候條件知足,可是在加鎖以前,m.dirty可能被提高爲m.read,因此加鎖後還得再檢查m.read,後續的方法中都使用了這個方法。若是咱們查詢的鍵值正好存在於m.read中,則無須加鎖,直接返回,理論上性能優異。即便不存在於m.read中,通過miss幾回以後,m.dirty會被提高爲m.read,又會從m.read中查找。因此對於更新/增長較少,加載存在的key不少的場景,性能基本和無鎖的map相差無幾。

通過miss幾回以後,m.dirty會被提高爲m.read,那麼m.dirty又是如何被提高的呢?重點在missLocked方法中。

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

最後三行代碼就是提高m.dirty的,很簡單的將m.dirty做爲readOnlym字段,原子更新m.read。提高後m.dirtym.misses重置, 而且m.read.amended爲false。

sync.Map的Store方法實現

func (m *Map) Store(key, value interface{}) {
	// 若是m.read存在這個鍵,而且這個entry沒有被標記刪除,嘗試直接存儲。
	// 由於m.dirty也指向這個entry,因此m.dirty也保持最新的entry。
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}
	// 若是`m.read`不存在或者已經被標記刪除
	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() { //標記成未被刪除
			m.dirty[key] = e //m.dirty中不存在這個鍵,因此加入m.dirty
		}
		e.storeLocked(&value) //更新
	} else if e, ok := m.dirty[key]; ok { // m.dirty存在這個鍵,更新
		e.storeLocked(&value)
	} else { //新鍵值
		if !read.amended { //m.dirty中沒有新的數據,往m.dirty中增長第一個新鍵
			m.dirtyLocked() //從m.read中複製未刪除的數據
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value) //將這個entry加入到m.dirty中
	}
	m.mu.Unlock()
}

func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}
	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[interface{}]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		// 將已經刪除標記爲nil的數據標記爲expunged
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}

Store方法是更新或者新增一個entry。以上操做都是先從操做m.read開始的,不知足條件再加鎖,而後操做m.dirty。Store可能會在某種狀況下(初始化或者m.dirty剛被提高後)從m.read中複製數據,若是這個時候m.read中數據量很是大,可能會影響性能。

sync.Map的Delete方法實現

func (m *Map) Delete(key interface{}) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
	if ok {
		e.delete()
	}
}

func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		// 已標記爲刪除
		if p == nil || p == expunged {
			return false
		}
		// 原子操做,e.p標記爲nil
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

Delete方法刪除一個鍵值。和Store方法同樣,刪除操做仍是從m.read中開始, 若是這個entry不存在於m.read中,而且m.dirty中有新數據,則加鎖嘗試從m.dirty中刪除。注意,仍是要雙檢查的。 從m.dirty中直接刪除便可,就當它沒存在過,可是若是是從m.read中刪除,並不會直接刪除,而是打標記而已。

sync.Map的Range方法實現

func (m *Map) Range(f func(key, value interface{}) bool) {
	read, _ := m.read.Load().(readOnly)
	// 若是m.dirty中有新數據,則提高m.dirty,而後在遍歷
	if read.amended {
		//提高m.dirty
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly) //雙檢查
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}
	// 遍歷, for range是安全的
	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

在Go語言中,for ... range map是內建的語言特性,因此沒有辦法使用for range遍歷sync.Map, 因而變通的有了Range方法,經過回調的方式遍歷。Range方法調用前可能會作一個m.dirty的提高,不過提高m.dirty不是一個耗時的操做。

sync.Map的LoadOrStore 方法實現

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		actual, loaded, ok := e.tryLoadOrStore(value)
		if ok {
			return actual, loaded
		}
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			m.dirty[key] = e
		}
		actual, loaded, _ = e.tryLoadOrStore(value)
	} else if e, ok := m.dirty[key]; ok {
		actual, loaded, _ = e.tryLoadOrStore(value)
		m.missLocked()
	} else {
		if !read.amended {
			// 給dirty添加一個新key,
			// 標記只讀爲不完整
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
		actual, loaded = value, false
	}
	m.mu.Unlock()

	return actual, loaded
}

func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == expunged {
		return nil, false, false
	}
	if p != nil {
		return *(*interface{})(p), true, true
	}
	ic := i
	for {
		if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
			return i, false, true
		}
		p = atomic.LoadPointer(&e.p)
		if p == expunged {
			return nil, false, false
		}
		if p != nil {
			return *(*interface{})(p), true, true
		}
	}
}

LoadOrStore方法若是提供的key存在,則返回已存在的值(Load),不然保存提供的鍵值(Store)。一樣是從m.read開始,而後是m.dirty,最後還有雙檢查機制。

Go 1.9源代碼中提供了性能的測試: map_bench_test.gomap_reference_test.go,和之前的解決方案比較,性能會有很多的提高。

最後sync.Map沒有Len方法,而且目前沒有跡象要加上 (issue#20680),因此若是想獲得當前Map中有效的entries的數量,須要使用Range方法遍歷一次。

相關文章
相關標籤/搜索