golang 併發安全Map以及分段鎖的實現

涉及概念

  1. 併發安全Map
  2. 分段鎖
  3. sync.Map
  4. CAS ( Compare And Swap )
  5. 雙檢查

分斷鎖

type SimpleCache struct {
    mu    sync.RWMutex
    items map[interface{}]*simpleItem
}

    在平常開發中, 上述這種數據結構確定很多見,由於golang的原生map是非併發安全的,因此爲了保證map的併發安全,最簡單的方式就是給map加鎖。
    以前使用過兩個本地內存緩存的開源庫, gcache, cache2go,其中存儲緩存對象的結構都是這樣,對於輕量級的緩存庫,爲了設計簡潔(包含清理過時對象等 ) 再加上當須要緩存大量數據時有redis,memcache等明星項目解決。 可是若是拋開這些因素遇到真正數量巨大的數據量時,直接對一個map加鎖,當map中的值愈來愈多,訪問map的請求愈來愈多,你們都競爭這一把鎖顯得併發訪問控制變重。 在go1.9引入sync.Map 以前,比較流行的作法就是使用分段鎖,顧名思義就是將鎖分段,將鎖的粒度變小,將存儲的對象分散到各個分片中,每一個分片由一把鎖控制,這樣使得當須要對在A分片上的數據進行讀寫時不會影響B分片的讀寫。
圖片描述git

分段鎖的實現

// Map 分片
type ConcurrentMap []*ConcurrentMapShared

// 每個Map 是一個加鎖的併發安全Map
type ConcurrentMapShared struct {
    items map[string]interface{}
    sync.RWMutex    // 各個分片Map各自的鎖
}

    主流的分段鎖,即經過hash取模的方式找到當前訪問的key處於哪個分片之上,再對該分片進行加鎖以後再讀寫。分片定位時,經常使用有BKDR, FNV32等hash算法獲得key的hash值。github

func New() ConcurrentMap {
    // SHARD_COUNT  默認32個分片
    m := make(ConcurrentMap, SHARD_COUNT)
    for i := 0; i < SHARD_COUNT; i++ {
        m[i] = &ConcurrentMapShared{
            items: make(map[string]interface{}),
        }
    }
    return m
}

    在初始化好分片後, 對分片上的數據進行讀寫時就須要用hash取模進行分段定位來確認即將要讀寫的分片。golang

獲取段定位

func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
    return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

// FNV hash
func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    const prime32 = uint32(16777619)
    for i := 0; i < len(key); i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}

以後對於map的GET SET 就簡單順利成章的完成redis

Set And Get

func (m ConcurrentMap) Set(key string, value interface{}) {
    shard := m.GetShard(key) // 段定位找到分片
    shard.Lock()              // 分片上鎖
    shard.items[key] = value // 分片操做 
    shard.Unlock()              // 分片解鎖
}

func (m ConcurrentMap) Get(key string) (interface{}, bool) {
    shard := m.GetShard(key)
    shard.RLock()
    val, ok := shard.items[key]
    shard.RUnlock()
    return val, ok
}

    由此一個分段鎖Map就實現了, 可是比起普通的Map, 經常使用到的方法好比獲取全部key, 獲取全部Val 操做是要比原生Map複雜的,由於要遍歷每個分片的每個數據, 好在golang的併發特性使得解決這類問題變得很是簡單算法

Keys

// 統計當前分段map中item的個數
func (m ConcurrentMap) Count() int {
    count := 0
    for i := 0; i < SHARD_COUNT; i++ {
        shard := m[i]
        shard.RLock()
        count += len(shard.items)
        shard.RUnlock()
    }
    return count
}

// 獲取全部的key
func (m ConcurrentMap) Keys() []string {
    count := m.Count()
    ch := make(chan string, count)

    // 每個分片啓動一個協程 遍歷key
    go func() {
        wg := sync.WaitGroup{}
        wg.Add(SHARD_COUNT)
        for _, shard := range m {

            go func(shard *ConcurrentMapShared) {
                defer wg.Done()
                
                shard.RLock()

                // 每一個分片中的key遍歷後都寫入統計用的channel
                for key := range shard.items {
                    ch <- key
                }

                shard.RUnlock()
            }(shard)
        }
        wg.Wait()
        close(ch)
    }()

    keys := make([]string, count)
    // 統計各個協程併發讀取Map分片的key
    for k := range ch {
        keys = append(keys, k)
    }
    return keys
}

    這裏寫了一個benchMark來對該分段鎖Map和原生的Map加鎖方式進行壓測, 場景爲將一萬個不重複的鍵值對同時以100萬次寫和100萬次讀,分別進行5次壓測, 以下壓測代碼編程

func BenchmarkMapShared(b *testing.B) {
    num := 10000
    testCase := genNoRepetTestCase(num) // 10000個不重複的鍵值對
    m := New()
    for _, v := range testCase {
        m.Set(v.Key, v.Val)
    }
    b.ResetTimer()

    for i := 0; i < 5; i++ {
        b.Run(strconv.Itoa(i), func(b *testing.B) {

            b.N = 1000000

            wg := sync.WaitGroup{}
            wg.Add(b.N * 2)
            for i := 0; i < b.N; i++ {
                e := testCase[rand.Intn(num)]

                go func(key string, val interface{}) {
                    m.Set(key, val)
                    wg.Done()
                }(e.Key, e.Val)

                go func(key string) {
                    _, _ = m.Get(key)
                    wg.Done()
                }(e.Key)

            }
            wg.Wait()
        })
    }
}

原生Map加鎖壓測結果
圖片描述緩存

分段鎖壓測結果
圖片描述安全

能夠看出在將鎖的粒度細化後再面對大量須要控制併發安全的訪問時,分段鎖Map的耗時比原生Map加鎖要快3倍有餘數據結構

Sync.Map

    go1.9以後加入了支持併發安全的Map sync.Map, sync.Map 經過一份只使用原子操做的數據和一份冗餘了只讀數據的加鎖數據實現必定程度上的讀寫分離,使得大多數讀操做和更新操做是原子操做,寫入新數據才加鎖的方式來提高性能。如下是 sync.Map源碼剖析, 結構體中的註釋都會在具體實現代碼中提示相呼應多線程

type Map struct {
    // 保護dirty的鎖
    mu Mutex                        
    // 只讀數據(修改採用原子操做)
    read atomic.Value                
    // 包含只讀中全部數據(冗餘),寫入新數據時也在dirty中操做
    dirty map[interface{}]*entry  
    // 當原子操做訪問只讀read時找不到數據時會去dirty中尋找,此時misses+1,dirty及做爲存儲新寫入的數據,又冗餘了只讀結構中的數據,因此當misses > dirty 的長度時, 會將dirty升級爲read,同時將老的dirty置nil
    misses int 
}

// Map struct 中的 read 就是readOnly 的指針
type readOnly struct {
    // 基礎Map
    m   map[interface{}]*entry 
    // 用於表示當前dirty中是否有read中不存在的數據, 在寫入數據時, 若是發現dirty中沒有新數據且dirty爲nil時,會將read中未被刪除的數據拷貝一份冗餘到dirty中, 過程與Map struct中的 misses相呼應
    amended bool 
}

// 數據項
type entry struct {
    p unsafe.Pointer 
}

// 用於標記數據項已被刪除(主要保證數據冗餘時的併發安全)
// 上述Map結構中說到有一個將read數據拷貝冗餘至dirty的過程, 由於刪除數據項是將*entry置nil, 爲了不冗餘過程當中因併發問題致使*entry改變而影響到拷貝後的dirty正確性,因此sync.Map使用expunged來標記entry是否被刪除
var expunged = unsafe.Pointer(new(interface{}))

    在下面sync.Map具體實現中將會看到不少「雙檢查」代碼,由於經過原子操做獲取的值可能在進行其餘非原子操做過程當中已改變,因此再非原子操做後須要使用以前原子操做獲取的值須要再次進行原子操做獲取。

    compareAndSwap 交換並比較, 用於在多線程編程中實現不被打斷的數據交換操做,從而避免多線程同時改寫某一數據時致使數據不一致問題。

sync.Map Write

func (m *Map) Store(key, value interface{}) {
    // 先不上鎖,而是從只讀數據中按key讀取, 若是已存在以compareAndSwap操做進行覆蓋(update)
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }
    
    m.mu.Lock()
    // 雙檢查獲取read
    read, _ = m.read.Load().(readOnly)
    // 若是data在read中,更新entry
    if e, ok := read.m[key]; ok {
        // 若是原子操做讀到的數據是被標記刪除的, 則視爲新數據寫入dirty
        if e.unexpungeLocked() {
            m.dirty[key] = e
        }
        // 原子操做寫新數據
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 原子操做寫新數據
        e.storeLocked(&value)
    } else {
        // 新數據 
        // 當dirty中沒有新數據時,將read中數據冗餘到dirty
        if !read.amended {
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}


// 在dirty中沒有比read多出的新數據時觸發冗餘
func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        // 檢查entry是否被刪除, 被刪除的數據不冗餘
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        // 將被刪除(置nil)的數據以cas原子操做標記爲expunged(防止因併發狀況下其餘操做致使冗餘進dirty的數據不正確)
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

sync.Map Read

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]

    // 只讀數據中沒有,而且dirty有比read多的數據,加鎖在dirty中找
    if !ok && read.amended {
        m.mu.Lock()
        // 雙檢查, 由於上鎖以前的語句是非原子性的
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 只讀中沒有讀取到的次數+1
            e, ok = m.dirty[key]
            // 檢查是否達到觸發dirty升級read的條件
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    // atomic.Load 但被標記爲刪除的會返回nil
    return e.load()
}

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

sync.Map DELETE

func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 只讀中不存在須要到dirty中去刪除
    if !ok && read.amended {
        m.mu.Lock() 
        // 雙檢查, 由於上鎖以前的語句是非原子性的
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

一樣以剛剛壓測原生加鎖Map和分段鎖的方式來壓測sync.Map
圖片描述

    壓測平均下來sync.Map和分段鎖差異不大,可是比起分段鎖, sync.Map則將鎖的粒度更加的細小到對數據的狀態上,使得大多數據能夠無鎖化操做, 同時比分段鎖擁有更好的拓展性,由於分段鎖使用前老是要定一個分片數量, 在作擴容或者縮小時很麻煩, 但要達到sync.Map這種性能既好又能動態擴容的程度,代碼就相對複雜不少。

    還有注意在使用sync.Map時切忌不要將其拷貝, go源碼中有對sync.Map註釋到」 A Map must not be copied after first use.」由於當sync.Map被拷貝以後, Map類型的dirty仍是那個map 可是read 和 鎖卻不是以前的read和鎖(都不在一個世界你拿什麼保護我), 因此必然致使併發不安全(爲了寫博我把sync.Map代碼複製出來一份把私有成員改爲可外部訪問的打印指針)
圖片描述

圖片描述

相關文章
相關標籤/搜索