全面解析Golang之map設計

因爲本文篇幅較長,故將目錄整理以下java

image

什麼是Map

維基百科的定義python

In computer science, an associative array, map, symbol table, or dictionary is an abstract data type composed of a collection of (key, value) pairs, such that each possible key appears at most once in the collection.git

說明:在計算機科學中,包含鍵值對(key-value)集合的抽象數據結構(關聯數組、符號表或字典),其每一個可能的鍵在該集合中最多出現一次,這樣的數據結構就是一種Map。github

操做

對Map的操做主要是增刪改查:golang

  • 在集合中增長鍵值對
  • 在集合中移除鍵值對
  • 修改某個存在的鍵值對
  • 根據特定的鍵尋找對應的值

實現

Map的實現主要有兩種方式:哈希表(hash table)和搜索樹(search tree)。例如Java中的hashMap是基於哈希表實現,而C++中的Map是基於一種平衡搜索二叉樹——紅黑樹而實現的。如下是不一樣實現方式的時間複雜度對比。算法

image

能夠看到,對於元素查找而言,二叉搜索樹的平均和最壞效率都是O(log n),哈希表實現的平均效率是O(1),但最壞狀況下能達到O(n),不過若是哈希表設計優秀,最壞狀況基本不會出現(因此,讀者不想知道Go是如何設計的Map嗎)。另外二叉搜索樹返回的key是有序的,而哈希表則是亂序。api

哈希表

因爲Go中map的基於哈希表(也被稱爲散列表)實現,本文不探討搜索樹的map實現。如下是Go官方博客對map的說明。數組

One of the most useful data structures in computer science is the hash table. Many hash table implementations exist with varying properties, but in general they offer fast lookups, adds, and deletes. Go provides a built-in map type that implements a hash table.緩存

學習哈希表首先要理解兩個概念:哈希函數和哈希衝突。安全

哈希函數

哈希函數(常被稱爲散列函數)是能夠用於將任意大小的數據映射到固定大小值的函數,常見的包括MD五、SHA系列等。

image

一個設計優秀的哈希函數應該包含如下特性:

  • 均勻性:一個好的哈希函數應該在其輸出範圍內儘量均勻地映射,也就是說,應以大體相同的機率生成輸出範圍內的每一個哈希值。
  • 效率高:哈希效率要高,即便很長的輸入參數也能快速計算出哈希值。
  • 可肯定性:哈希過程必須是肯定性的,這意味着對於給定的輸入值,它必須始終生成相同的哈希值。
  • 雪崩效應:微小的輸入值變化也會讓輸出值發生巨大的變化。
  • 不可逆:從哈希函數的輸出值不可反向推導出原始的數據。

哈希衝突

重複一遍,哈希函數是將任意大小的數據映射到固定大小值的函數。那麼,咱們能夠預見到,即便哈希函數設計得足夠優秀,幾乎每一個輸入值都能映射爲不一樣的哈希值。可是,當輸入數據足夠大,大到能超過固定大小值的組合能表達的最大數量數,衝突將不可避免!(參見抽屜原理)

image

抽屜原理:桌上有十個蘋果,要把這十個蘋果放到九個抽屜裏,不管怎樣放,咱們會發現至少會有一個抽屜裏面放很多於兩個蘋果。抽屜原理有時也被稱爲鴿巢原理。

如何解決哈希衝突

比較經常使用的Has衝突解決方案有鏈地址法和開放尋址法。

在講鏈地址法以前,先說明兩個概念。

  1. 哈希桶。哈希桶(也稱爲槽,相似於抽屜原理中的一個抽屜)能夠先簡單理解爲一個哈希值,全部的哈希值組成了哈希空間。
  2. 裝載因子。裝載因子是表示哈希表中元素的填滿程度。它的計算公式:裝載因子=填入哈希表中的元素個數/哈希表的長度。裝載因子越大,填入的元素越多,空間利用率就越高,但發生哈希衝突的概率就變大。反之,裝載因子越小,填入的元素越少,衝突發生的概率減少,但空間浪費也會變得更多,並且還會提升擴容操做的次數。裝載因子也是決定哈希表是否進行擴容的關鍵指標,在java的HashMap的中,其默認裝載因子爲0.75;Python的dict默認裝載因子爲2/3。
鏈地址法

鏈地址法的思想就是將映射在一個桶裏的全部元素用鏈表串起來。

下面以一個簡單的哈希函數 H(key) = key MOD 7(除數取餘法)對一組元素 [50, 700, 76, 85, 92, 73, 101] 進行映射,經過圖示來理解鏈地址法處理Hash衝突的處理邏輯。

image
鏈地址法解決衝突的方式與圖的鄰接表存儲方式在樣式上很類似,發生衝突,就用單鏈表組織起來。

開放尋址法

對於鏈地址法而言,槽位數m與鍵的數目n是沒有直接關係的。可是對於開放尋址法而言,全部的元素都是存儲在Hash表當中的,因此不管任什麼時候候都要保證哈希表的槽位數m大於或等於鍵的數據n(必要時,須要對哈希表進行動態擴容)。

開放尋址法有多種方式:線性探測法、平方探測法、隨機探測法和雙重哈希法。這裏以線性探測法來幫助讀者理解開放尋址法思想。

  • 線性探測法

Hash(key) 表示關鍵字 key 的哈希值, 表示哈希表的槽位數(哈希表的大小)。

線性探測法則能夠表示爲:

若是 Hash(x) % M 已經有數據,則嘗試 (Hash(x) + 1) % M ;

若是 (Hash(x) + 1) % M 也有數據了,則嘗試 (Hash(x) + 2) % M ;

若是 (Hash(x) + 2) % M 也有數據了,則嘗試 (Hash(x) + 3) % M ;

......

咱們一樣以哈希函數 H(key) = key MOD 7 (除數取餘法)對 [50, 700, 76, 85, 92, 73, 101] 進行映射,經過圖示來理解線性探測法處理 Hash 碰撞。

image

其中,empty表明槽位爲空,occupied表明槽位已被佔(後續映射到該槽,則須要線性向下繼續探測),而lazy delete則表明將槽位裏面的數據清除,並不釋放存儲空間。

兩種解決方案比較

對於開放尋址法而言,它只有數組一種數據結構就可完成存儲,繼承了數組的優勢,對CPU緩存友好,易於序列化操做。可是它對內存的利用率不如鏈地址法,且發生衝突時代價更高。當數據量明確、裝載因子小,適合採用開放尋址法。

鏈表節點能夠在須要時再建立,沒必要像開放尋址法那樣事先申請好足夠內存,所以鏈地址法對於內存的利用率會比開方尋址法高。鏈地址法對裝載因子的容忍度會更高,而且適合存儲大對象、大數據量的哈希表。並且相較於開放尋址法,它更加靈活,支持更多的優化策略,好比可採用紅黑樹代替鏈表。可是鏈地址法須要額外的空間來存儲指針。

值得一提的是,在Python中dict在發生哈希衝突時採用的開放尋址法,而java的HashMap採用的是鏈地址法。

Go Map實現

同python與java同樣,Go語言中的map是也基於哈希表實現的,它解決哈希衝突的方式是鏈地址法,即經過使用數組+鏈表的數據結構來表達map。

注意:本文後續出現的map統一代指Go中實現的map類型。

map數據結構

map中的數據被存放於一個數組中的,數組的元素是桶(bucket),每一個桶至多包含8個鍵值對數據。哈希值低位(low-order bits)用於選擇桶,哈希值高位(high-order bits)用於在一個獨立的桶中區別出鍵。哈希值高低位示意圖以下

image

本文基於go 1.15.2 darwin/amd64分析,源碼位於src/runtime/map.go.

  • map的結構體爲hmap
// A header for a Go map.
type hmap struct {
    count     int // 表明哈希表中的元素個數,調用len(map)時,返回的就是該字段值。
    flags     uint8 // 狀態標誌,下文常量中會解釋四種狀態位含義。
    B         uint8  // buckets(桶)的對數log_2(哈希表元素數量最大可達到裝載因子*2^B)
    noverflow uint16 // 溢出桶的大概數量。
    hash0     uint32 // 哈希種子。

    buckets    unsafe.Pointer // 指向buckets數組的指針,數組大小爲2^B,若是元素個數爲0,它爲nil。
    oldbuckets unsafe.Pointer // 若是發生擴容,oldbuckets是指向老的buckets數組的指針,老的buckets數組大小是新的buckets的1/2。非擴容狀態下,它爲nil。
    nevacuate  uintptr        // 表示擴容進度,小於此地址的buckets表明已搬遷完成。

    extra *mapextra // 這個字段是爲了優化GC掃描而設計的。當key和value均不包含指針,而且均可以inline時使用。extra是指向mapextra類型的指針。
  • mapextra的結構體
// mapextra holds fields that are not present on all maps.
type mapextra struct {
    // 若是 key 和 value 都不包含指針,而且能夠被 inline(<=128 字節)
    // 就使用 hmap的extra字段 來存儲 overflow buckets,這樣能夠避免 GC 掃描整個 map
    // 然而 bmap.overflow 也是個指針。這時候咱們只能把這些 overflow 的指針
    // 都放在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中了
    // overflow 包含的是 hmap.buckets 的 overflow 的 buckets
    // oldoverflow 包含擴容時的 hmap.oldbuckets 的 overflow 的 bucket
    overflow    *[]*bmap
    oldoverflow *[]*bmap

    // 指向空閒的 overflow bucket 的指針
    nextOverflow *bmap
}
  • bmap結構體
// A bucket for a Go map.
type bmap struct {
    // tophash包含此桶中每一個鍵的哈希值最高字節(高8位)信息(也就是前面所述的high-order bits)。
    // 若是tophash[0] < minTopHash,tophash[0]則表明桶的搬遷(evacuation)狀態。
    tophash [bucketCnt]uint8
}

bmap也就是bucket(桶)的內存模型圖解以下(相關代碼邏輯可查看src/cmd/compile/internal/gc/reflect.go中的bmap()函數)。

image

在以上圖解示例中,該桶的第7位cell和第8位cell還未有對應鍵值對。須要注意的是,key和value是各自存儲起來的,並不是想象中的key/value/key/value...的形式。這樣作雖然會讓代碼組織稍顯複雜,可是它的好處是能讓咱們消除例如map[int64]int所須要的填充(padding)。此外,在8個鍵值對數據後面有一個overflow指針,由於桶中最多隻能裝8個鍵值對,若是有多餘的鍵值對落到了當前桶,那麼就須要再構建一個桶(稱爲溢出桶),經過overflow指針連接起來。

  • 重要常量標誌
const (
    // 一個桶中最多能裝載的鍵值對(key-value)的個數爲8
    bucketCntBits = 3
    bucketCnt     = 1 << bucketCntBits

  // 觸發擴容的裝載因子爲13/2=6.5
    loadFactorNum = 13
    loadFactorDen = 2

    // 鍵和值超過128個字節,就會被轉換爲指針
    maxKeySize  = 128
    maxElemSize = 128

    // 數據偏移量應該是bmap結構體的大小,它須要正確地對齊。
  // 對於amd64p32而言,這意味着:即便指針是32位的,也是64位對齊。
    dataOffset = unsafe.Offsetof(struct {
        b bmap
        v int64
    }{}.v)


  // 每一個桶(若是有溢出,則包含它的overflow的連接桶)在搬遷完成狀態(evacuated* states)下,要麼會包含它全部的鍵值對,要麼一個都不包含(但不包括調用evacuate()方法階段,該方法調用只會在對map發起write時發生,在該階段其餘goroutine是沒法查看該map的)。簡單的說,桶裏的數據要麼一塊兒搬走,要麼一個都還未搬。
  // tophash除了放置正常的高8位hash值,還會存儲一些特殊狀態值(標誌該cell的搬遷狀態)。正常的tophash值,最小應該是5,如下列出的就是一些特殊狀態值。
    emptyRest      = 0 // 表示cell爲空,而且比它高索引位的cell或者overflows中的cell都是空的。(初始化bucket時,就是該狀態)
    emptyOne       = 1 // 空的cell,cell已經被搬遷到新的bucket
    evacuatedX     = 2 // 鍵值對已經搬遷完畢,key在新buckets數組的前半部分
    evacuatedY     = 3 // 鍵值對已經搬遷完畢,key在新buckets數組的後半部分
    evacuatedEmpty = 4 // cell爲空,整個bucket已經搬遷完畢
    minTopHash     = 5 // tophash的最小正常值

    // flags
    iterator     = 1 // 可能有迭代器在使用buckets
    oldIterator  = 2 // 可能有迭代器在使用oldbuckets
    hashWriting  = 4 // 有協程正在向map寫人key
    sameSizeGrow = 8 // 等量擴容

    // 用於迭代器檢查的bucket ID
    noCheck = 1<<(8*sys.PtrSize) - 1
)

綜上,咱們以B等於4爲例,展現一個完整的map結構圖。

image

建立map

map初始化有如下兩種方式

make(map[k]v)
// 指定初始化map大小爲hint
make(map[k]v, hint)

對於不指定初始化大小,和初始化值hint<=8(bucketCnt)時,go會調用makemap_small函數(源碼位置src/runtime/map.go),並直接從堆上進行分配。

func makemap_small() *hmap {
    h := new(hmap)
    h.hash0 = fastrand()
    return h
}

當hint>8時,則調用makemap函數

// 若是編譯器認爲map和第一個bucket能夠直接建立在棧上,h和bucket可能都是非空
// 若是h != nil,那麼map能夠直接在h中建立
// 若是h.buckets != nil,那麼h指向的bucket能夠做爲map的第一個bucket使用
func makemap(t *maptype, hint int, h *hmap) *hmap {
  // math.MulUintptr返回hint與t.bucket.size的乘積,並判斷該乘積是否溢出。
    mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
// maxAlloc的值,根據平臺系統的差別而不一樣,具體計算方式參照src/runtime/malloc.go
    if overflow || mem > maxAlloc {
        hint = 0
    }

// initialize Hmap
    if h == nil {
        h = new(hmap)
    }
  // 經過fastrand獲得哈希種子
    h.hash0 = fastrand()

    // 根據輸入的元素個數hint,找到能裝下這些元素的B值
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B

    // 分配初始哈希表
  // 若是B爲0,那麼buckets字段後續會在mapassign方法中lazily分配
    if h.B != 0 {
        var nextOverflow *bmap
    // makeBucketArray建立一個map的底層保存buckets的數組,它最少會分配h.B^2的大小。
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        if nextOverflow != nil {
    h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow
        }
    }

    return h
}

分配buckets數組的makeBucketArray函數

// makeBucket爲map建立用於保存buckets的數組。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
    base := bucketShift(b)
    nbuckets := base
  // 對於小的b值(小於4),即桶的數量小於16時,使用溢出桶的可能性很小。對於此狀況,就避免計算開銷。
    if b >= 4 {
    // 當桶的數量大於等於16個時,正常狀況下就會額外建立2^(b-4)個溢出桶
        nbuckets += bucketShift(b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }

 // 這裏,dirtyalloc分兩種狀況。若是它爲nil,則會分配一個新的底層數組。若是它不爲nil,則它指向的是曾經分配過的底層數組,該底層數組是由以前一樣的t和b參數經過makeBucketArray分配的,若是數組不爲空,須要把該數組以前的數據清空並複用。
    if dirtyalloc == nil {
        buckets = newarray(t.bucket, int(nbuckets))
    } else {
        buckets = dirtyalloc
        size := t.bucket.size * nbuckets
        if t.bucket.ptrdata != 0 {
            memclrHasPointers(buckets, size)
        } else {
            memclrNoHeapPointers(buckets, size)
        }
}

  // 即b大於等於4的狀況下,會預分配一些溢出桶。
  // 爲了把跟蹤這些溢出桶的開銷降至最低,使用瞭如下約定:
  // 若是預分配的溢出桶的overflow指針爲nil,那麼能夠經過指針碰撞(bumping the pointer)得到更多可用桶。
  // (關於指針碰撞:假設內存是絕對規整的,全部用過的內存都放在一邊,空閒的內存放在另外一邊,中間放着一個指針做爲分界點的指示器,那所分配內存就僅僅是把那個指針向空閒空間那邊挪動一段與對象大小相等的距離,這種分配方式稱爲「指針碰撞」)
  // 對於最後一個溢出桶,須要一個安全的非nil指針指向它。
    if base != nbuckets {
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}

根據上述代碼,咱們能肯定在正常狀況下,正常桶和溢出桶在內存中的存儲空間是連續的,只是被 hmap 中的不一樣字段引用而已。

哈希函數

在初始化go程序運行環境時(src/runtime/proc.go中的schedinit),就須要經過alginit方法完成對哈希的初始化。

func schedinit() {
    lockInit(&sched.lock, lockRankSched)
    
    ...
    
    tracebackinit()
    moduledataverify()
    stackinit()
    mallocinit()
    fastrandinit() // must run before mcommoninit
    mcommoninit(_g_.m, -1)
    cpuinit()       // must run before alginit
    // 這裏調用alginit()
    alginit()       // maps must not be used before this call
    modulesinit()   // provides activeModules
    typelinksinit() // uses maps, activeModules
    itabsinit()     // uses activeModules
    
    ...

    goargs()
    goenvs()
    parsedebugvars()
    gcinit()
  
  ...
}

對於哈希算法的選擇,程序會根據當前架構判斷是否支持AES,若是支持就使用AES hash,其實現代碼位於src/runtime/asm_{386,amd64,arm64}.s中;若不支持,其hash算法則根據xxhash算法(https://code.google.com/p/xxh...)和cityhash算法(https://code.google.com/p/cit...)啓發而來,代碼分別對應於32位(src/runtime/hash32.go)和64位機器(src/runtime/hash32.go)中,對這部份內容感興趣的讀者能夠深刻研究。

func alginit() {
    // Install AES hash algorithms if the instructions needed are present.
    if (GOARCH == "386" || GOARCH == "amd64") &&
        cpu.X86.HasAES && // AESENC
        cpu.X86.HasSSSE3 && // PSHUFB
        cpu.X86.HasSSE41 { // PINSR{D,Q}
        initAlgAES()
        return
    }
    if GOARCH == "arm64" && cpu.ARM64.HasAES {
        initAlgAES()
        return
    }
    getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
    hashkey[0] |= 1 // make sure these numbers are odd
    hashkey[1] |= 1
    hashkey[2] |= 1
    hashkey[3] |= 1
}

上文在建立map的時候,咱們能夠知道map的哈希種子是經過h.hash0 = fastrand()獲得的。它是在如下maptype中的hasher中被使用到,在下文內容中會看到hash值的生成。

type maptype struct {
    typ    _type
    key    *_type
    elem   *_type
    bucket *_type
  // hasher的第一個參數就是指向key的指針,h.hash0 = fastrand()獲得的hash0,就是hasher方法的第二個參數。
  // hasher方法返回的就是hash值。
    hasher     func(unsafe.Pointer, uintptr) uintptr
    keysize    uint8  // size of key slot
    elemsize   uint8  // size of elem slot
    bucketsize uint16 // size of bucket
    flags      uint32
}

map操做

假定key通過哈希計算後獲得64bit位的哈希值。若是B=5,buckets數組的長度,即桶的數量是32(2的5次方)。

例如,現要置一key於map中,該key通過哈希後,獲得的哈希值以下:

image

前面咱們知道,哈希值低位(low-order bits)用於選擇桶,哈希值高位(high-order bits)用於在一個獨立的桶中區別出鍵。當B等於5時,那麼咱們選擇的哈希值低位也是5位,即01010,它的十進制值爲10,表明10號桶。再用哈希值的高8位,找到此key在桶中的位置。最開始桶中尚未key,那麼新加入的key和value就會被放入第一個key空位和value空位。

注意:對於高低位的選擇,該操做的實質是取餘,可是取餘開銷很大,在實際代碼實現中採用的是位操做。如下是tophash的實現代碼。

func tophash(hash uintptr) uint8 {
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    return top
}

當兩個不一樣的key落在了同一個桶中,這時就發生了哈希衝突。go的解決方式是鏈地址法(這裏爲了讓讀者更好理解,只描述非擴容且該key是第一次添加的狀況):在桶中按照順序尋到第一個空位並記錄下來,後續在該桶和它的溢出桶中均未發現存在的該key,將key置於第一個空位;不然,去該桶的溢出桶中尋找空位,若是沒有溢出桶,則添加溢出桶,並將其置溢出桶的第一個空位(由於是第一次添加,因此不描述已存在該key的狀況)。

image

上圖中的B值爲5,因此桶的數量爲32。經過哈希函數計算出待插入key的哈希值,低5位哈希00110,對應於6號桶;高8位10010111,十進制爲151,因爲桶中前6個cell已經有正常哈希值填充了(遍歷),因此將151對應的高位哈希值放置於第7位cell,對應將key和value分別置於相應的第七個空位。

若是是查找key,那麼咱們會根據高位哈希值去桶中的每一個cell中找,若在桶中沒找到,而且overflow不爲nil,那麼繼續去溢出桶中尋找,直至找到,若是全部的cell都找過了,還未找到,則返回key類型的默認值(例如key是int類型,則返回0)。

查找key

對於map的元素查找,其源碼實現以下

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  // 若是開啓了競態檢測 -race
    if raceenabled && h != nil {
        callerpc := getcallerpc()
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
  // 若是開啓了memory sanitizer -msan
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
  // 若是map爲空或者元素個數爲0,返回零值
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue 23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
  // 注意,這裏是按位與操做
  // 當h.flags對應的值爲hashWriting(表明有其餘goroutine正在往map中寫key)時,那麼位計算的結果不爲0,所以拋出如下錯誤。
  // 這也代表,go的map是非併發安全的
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
  // 不一樣類型的key,會使用不一樣的hash算法,可詳見src/runtime/alg.go中typehash函數中的邏輯
    hash := t.hasher(key, uintptr(h.hash0))
    m := bucketMask(h.B)
  // 按位與操做,找到對應的bucket
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
  // 若是oldbuckets不爲空,那麼證實map發生了擴容
  // 若是有擴容發生,老的buckets中的數據可能還未搬遷至新的buckets裏
  // 因此須要先在老的buckets中找
    if c := h.oldbuckets; c != nil {
        if !h.sameSizeGrow() {
            m >>= 1
        }
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
    // 若是在oldbuckets中tophash[0]的值,爲evacuatedX、evacuatedY,evacuatedEmpty其中之一
    // 則evacuated()返回爲true,表明搬遷完成。
    // 所以,只有當搬遷未完成時,纔會今後oldbucket中遍歷
        if !evacuated(oldb) {
            b = oldb
        }
    }
  // 取出當前key值的tophash值
    top := tophash(hash)
  // 如下是查找的核心邏輯
  // 雙重循環遍歷:外層循環是從桶到溢出桶遍歷;內層是桶中的cell遍歷
  // 跳出循環的條件有三種:第一種是已經找到key值;第二種是當前桶再無溢出桶;
  // 第三種是當前桶中有cell位的tophash值是emptyRest,這個值在前面解釋過,它表明此時的桶後面的cell還未利用,因此無需再繼續遍歷。
bucketloop:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
      // 判斷tophash值是否相等
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
      }
      // 由於在bucket中key是用連續的存儲空間存儲的,所以能夠經過bucket地址+數據偏移量(bmap結構體的大小)+ keysize的大小,獲得k的地址
      // 同理,value的地址也是類似的計算方法,只是再要加上8個keysize的內存地址
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
      // 判斷key是否相等
            if t.key.equal(key, k) {
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    e = *((*unsafe.Pointer)(e))
                }
                return e
            }
        }
    }
  // 全部的bucket都未找到,則返回零值
    return unsafe.Pointer(&zeroVal[0])
}

如下是mapaccess1的查找過程圖解

image

map的元素查找,對應go代碼有兩種形式

// 形式一
    v := m[k]
    // 形式二
    v, ok := m[k]

形式一的代碼實現,就是上述的mapaccess1方法。此外,在源碼中還有個mapaccess2方法,它的函數簽名以下。

func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {}

與mapaccess1相比,mapaccess2多了一個bool類型的返回值,它表明的是是否在map中找到了對應的key。由於和mapaccess1基本一致,因此詳細代碼就再也不貼出。

同時,源碼中還有mapaccessK方法,它的函數簽名以下。

func mapaccessK(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, unsafe.Pointer) {}

與mapaccess1相比,mapaccessK同時返回了key和value,其代碼邏輯也一致。

賦值key

對於寫入key的邏輯,其源碼實現以下

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  // 若是h是空指針,賦值會引發panic
  // 例如如下語句
  // var m map[string]int
    // m["k"] = 1
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
  // 若是開啓了競態檢測 -race
    if raceenabled {
        callerpc := getcallerpc()
        pc := funcPC(mapassign)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
  // 若是開啓了memory sanitizer -msan
    if msanenabled {
        msanread(key, t.key.size)
    }
  // 有其餘goroutine正在往map中寫key,會拋出如下錯誤
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
  // 經過key和哈希種子,算出對應哈希值
    hash := t.hasher(key, uintptr(h.hash0))

  // 將flags的值與hashWriting作按位或運算
  // 由於在當前goroutine可能還未完成key的寫入,再次調用t.hasher會發生panic。
    h.flags ^= hashWriting

    if h.buckets == nil {
        h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}

again:
  // bucketMask返回值是2的B次方減1
  // 所以,經過hash值與bucketMask返回值作按位與操做,返回的在buckets數組中的第幾號桶
    bucket := hash & bucketMask(h.B)
  // 若是map正在搬遷(即h.oldbuckets != nil)中,則先進行搬遷工做。
    if h.growing() {
        growWork(t, h, bucket)
    }
  // 計算出上面求出的第幾號bucket的內存位置
  // post = start + bucketNumber * bucketsize
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    top := tophash(hash)

    var inserti *uint8
    var insertk unsafe.Pointer
    var elem unsafe.Pointer
bucketloop:
    for {
    // 遍歷桶中的8個cell
        for i := uintptr(0); i < bucketCnt; i++ {
      // 這裏分兩種狀況,第一種狀況是cell位的tophash值和當前tophash值不相等
      // 在 b.tophash[i] != top 的狀況下
      // 理論上有可能會是一個空槽位
      // 通常狀況下 map 的槽位分佈是這樣的,e 表示 empty:
      // [h0][h1][h2][h3][h4][e][e][e]
      // 但在執行過 delete 操做時,可能會變成這樣:
      // [h0][h1][e][e][h5][e][e][e]
      // 因此若是再插入的話,會盡可能往前面的位置插
      // [h0][h1][e][e][h5][e][e][e]
      //          ^
      //          ^
      //       這個位置
      // 因此在循環的時候還要順便把前面的空位置先記下來
      // 由於有可能在後面會找到相等的key,也可能找不到相等的key
            if b.tophash[i] != top {
        // 若是cell位爲空,那麼就能夠在對應位置進行插入
                if isEmpty(b.tophash[i]) && inserti == nil {
                    inserti = &b.tophash[i]
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                }
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
      // 第二種狀況是cell位的tophash值和當前的tophash值相等
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
      // 注意,即便當前cell位的tophash值相等,不必定它對應的key也是相等的,因此還要作一個key值判斷
            if !t.key.equal(key, k) {
                continue
            }
            // 若是已經有該key了,就更新它
            if t.needkeyupdate() {
                typedmemmove(t.key, k, key)
            }
      // 這裏獲取到了要插入key對應的value的內存地址
      // pos = start + dataOffset + 8*keysize + i*elemsize
            elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
      // 若是順利到這,就直接跳到done的結束邏輯中去
            goto done
        }
    // 若是桶中的8個cell遍歷完,還未找到對應的空cell或覆蓋cell,那麼就進入它的溢出桶中去遍歷
        ovf := b.overflow(t)
    // 若是連溢出桶中都沒有找到合適的cell,跳出循環。
        if ovf == nil {
            break
        }
        b = ovf
    }

    // 在已有的桶和溢出桶中都未找到合適的cell供key寫入,那麼有可能會觸發如下兩種狀況
  // 狀況一:
  // 判斷當前map的裝載因子是否達到設定的6.5閾值,或者當前map的溢出桶數量是否過多。若是存在這兩種狀況之一,則進行擴容操做。
  // hashGrow()實際並未完成擴容,對哈希表數據的搬遷(複製)操做是經過growWork()來完成的。
  // 從新跳入again邏輯,在進行完growWork()操做後,再次遍歷新的桶。
    if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }

  // 狀況二:
// 在不知足狀況一的條件下,會爲當前桶再新建溢出桶,並將tophash,key插入到新建溢出桶的對應內存的0號位置
    if inserti == nil {
        // all current buckets are full, allocate a new one.
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        elem = add(insertk, bucketCnt*uintptr(t.keysize))
    }

  // 在插入位置存入新的key和value
    if t.indirectkey() {
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectelem() {
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(elem) = vmem
    }
    typedmemmove(t.key, insertk, key)
    *inserti = top
  // map中的key數量+1
    h.count++

done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectelem() {
        elem = *((*unsafe.Pointer)(elem))
    }
    return elem
}

經過對mapassign的代碼分析以後,發現該函數並無將插入key對應的value寫入對應的內存,而是返回了value應該插入的內存地址。爲了弄清楚value寫入內存的操做是發生在何時,分析以下map.go代碼。

package main

func main() {
    m := make(map[int]int)
    for i := 0; i < 100; i++ {
        m[i] = 666
    }
}

m[i] = 666對應的彙編代碼

$ go tool compile -S map.go
...
        0x0098 00152 (map.go:6) LEAQ    type.map[int]int(SB), CX
        0x009f 00159 (map.go:6) MOVQ    CX, (SP)
        0x00a3 00163 (map.go:6) LEAQ    ""..autotmp_2+184(SP), DX
        0x00ab 00171 (map.go:6) MOVQ    DX, 8(SP)
        0x00b0 00176 (map.go:6) MOVQ    AX, 16(SP)
        0x00b5 00181 (map.go:6) CALL    runtime.mapassign_fast64(SB) // 調用函數runtime.mapassign_fast64,該函數實質就是mapassign(上文示例源代碼是該mapassign系列的通用邏輯)
        0x00ba 00186 (map.go:6) MOVQ    24(SP), AX 24(SP), AX // 返回值,即 value 應該存放的內存地址
        0x00bf 00191 (map.go:6) MOVQ    $666, (AX) // 把 666 放入該地址中
...

賦值的最後一步其實是編譯器額外生成的彙編指令來完成的,可見靠 runtime 有些工做是沒有作完的。因此,在go中,編譯器和 runtime 配合,才能完成一些複雜的工做。同時說明,在平時學習go的源代碼實現時,必要時還須要看一些彙編代碼。

刪除key

理解了賦值key的邏輯,刪除key的邏輯就比較簡單了。本文就再也不討論該部份內容了,讀者感興趣能夠自行查看src/runtime/map.go的mapdelete方法邏輯。

遍歷map

結論:迭代 map 的結果是無序的

m := make(map[int]int)
    for i := 0; i < 10; i++ {
        m[i] = i
    }
    for k, v := range m {
        fmt.Println(k, v)
    }

運行以上代碼,咱們會發現每次輸出順序都是不一樣的。

map遍歷的過程,是按序遍歷bucket,同時按需遍歷bucket中和其overflow bucket中的cell。可是map在擴容後,會發生key的搬遷,這形成原來落在一個bucket中的key,搬遷後,有可能會落到其餘bucket中了,從這個角度看,遍歷map的結果就不多是按照原來的順序了(詳見下文的map擴容內容)。

但其實,go爲了保證遍歷map的結果是無序的,作了如下事情:map在遍歷時,並非從固定的0號bucket開始遍歷的,每次遍歷,都會從一個隨機值序號的bucket,再從其中隨機的cell開始遍歷。而後再按照桶序遍歷下去,直到回到起始桶結束。

image

上圖的例子,是遍歷一個處於未擴容狀態的map。若是map正處於擴容狀態時,須要先判斷當前遍歷bucket是否已經完成搬遷,若是數據還在老的bucket,那麼就去老bucket中拿數據。

注意:在下文中會講解到增量擴容和等量擴容。當發生了增量擴容時,一個老的bucket數據可能會分裂到兩個不一樣的bucket中去,那麼此時,若是須要從老的bucket中遍歷數據,例如1號,則不能將老1號bucket中的數據所有取出,僅僅只能取出老 1 號 bucket 中那些在裂變以後,分配到新 1 號 bucket 中的那些 key(這個內容,請讀者看完下文map擴容的講解以後再回頭理解)。

鑑於篇幅緣由,本文再也不對map遍歷的詳細源碼進行註釋貼出。讀者可自行查看源碼src/runtime/map.go的mapiterinit()mapiternext()方法邏輯。

這裏註釋一下mapiterinit()中隨機保證的關鍵代碼

// 生成隨機數
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
   r += uintptr(fastrand()) << 31
}
// 決定了從哪一個隨機的bucket開始
it.startBucket = r & bucketMask(h.B)
// 決定了每一個bucket中隨機的cell的位置
it.offset = uint8(r >> h.B & (bucketCnt - 1))

map擴容

在文中講解裝載因子時,咱們提到裝載因子是決定哈希表是否進行擴容的關鍵指標。在go的map擴容中,除了裝載因子會決定是否須要擴容,溢出桶的數量也是擴容的另外一關鍵指標。

爲了保證訪問效率,當map將要添加、修改或刪除key時,都會檢查是否須要擴容,擴容其實是以空間換時間的手段。在以前源碼mapassign中,其實已經註釋map擴容條件,主要是兩點:

  1. 判斷已經達到裝載因子的臨界點,即元素個數 >= 桶(bucket)總數 * 6.5,這時候說明大部分的桶可能都快滿了(即平均每一個桶存儲的鍵值對達到6.5個),若是插入新元素,有大機率須要掛在溢出桶(overflow bucket)上。
func overLoadFactor(count int, B uint8) bool {
    return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
  1. 判斷溢出桶是否太多,當桶總數 < 2 ^ 15 時,若是溢出桶總數 >= 桶總數,則認爲溢出桶過多。當桶總數 >= 2 ^ 15 時,直接與 2 ^ 15 比較,當溢出桶總數 >= 2 ^ 15 時,即認爲溢出桶太多了。
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
    if B > 15 {
        B = 15
    }
    return noverflow >= uint16(1)<<(B&15)
}

對於第2點,其實算是對第 1 點的補充。由於在裝載因子比較小的狀況下,有可能 map 的查找和插入效率也很低,而第 1 點識別不出來這種狀況。表面現象就是計算裝載因子的分子比較小,即 map 裏元素總數少,可是桶數量多(真實分配的桶數量多,包括大量的溢出桶)。

在某些場景下,好比不斷的增刪,這樣會形成overflow的bucket數量增多,但負載因子又不高,未達不到第 1 點的臨界值,就不能觸發擴容來緩解這種狀況。這樣會形成桶的使用率不高,值存儲得比較稀疏,查找插入效率會變得很是低,所以有了第 2 點判斷指標。這就像是一座空城,房子不少,可是住戶不多,都分散了,找起人來很困難。

image

如上圖所示,因爲對map的不斷增刪,以0號bucket爲例,該桶鏈中就形成了大量的稀疏桶。

兩種狀況官方採用了不一樣的解決方案

  • 針對 1,將 B + 1,新建一個buckets數組,新的buckets大小是原來的2倍,而後舊buckets數據搬遷到新的buckets。該方法咱們稱之爲增量擴容
  • 針對 2,並不擴大容量,buckets數量維持不變,從新作一遍相似增量擴容的搬遷動做,把鬆散的鍵值對從新排列一次,以使bucket的使用率更高,進而保證更快的存取。該方法咱們稱之爲等量擴容

對於 2 的解決方案,其實存在一個極端的狀況:若是插入 map 的 key 哈希都同樣,那麼它們就會落到同一個 bucket 裏,超過 8 個就會產生 overflow bucket,結果也會形成 overflow bucket 數過多。移動元素其實解決不了問題,由於這時整個哈希表已經退化成了一個鏈表,操做效率變成了 O(n)。但 Go 的每個 map 都會在初始化階段的 makemap時定一個隨機的哈希種子,因此要構造這種衝突是沒那麼容易的。

在源碼中,和擴容相關的主要是hashGrow()函數與growWork()函數。hashGrow() 函數實際上並無真正地「搬遷」,它只是分配好了新的 buckets,並將老的 buckets 掛到了 oldbuckets 字段上。真正搬遷 buckets 的動做在 growWork() 函數中,而調用 growWork() 函數的動做是在 mapassign()mapdelete() 函數中。也就是插入(包括修改)、刪除 key 的時候,都會嘗試進行搬遷 buckets 的工做。它們會先檢查 oldbuckets 是否搬遷完畢(檢查 oldbuckets 是否爲 nil),再決定是否進行搬遷工做。

hashGrow()函數

func hashGrow(t *maptype, h *hmap) {
  // 若是達到條件 1,那麼將B值加1,至關因而原來的2倍
  // 不然對應條件 2,進行等量擴容,因此 B 不變
    bigger := uint8(1)
    if !overLoadFactor(h.count+1, h.B) {
        bigger = 0
        h.flags |= sameSizeGrow
    }
  // 記錄老的buckets
    oldbuckets := h.buckets
  // 申請新的buckets空間
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
  // 注意&^ 運算符,這塊代碼的邏輯是轉移標誌位
    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // 提交grow (atomic wrt gc)
    h.B += bigger
    h.flags = flags
    h.oldbuckets = oldbuckets
    h.buckets = newbuckets
  // 搬遷進度爲0
    h.nevacuate = 0
  // overflow buckets 數爲0
    h.noverflow = 0

  // 若是發現hmap是經過extra字段 來存儲 overflow buckets時
    if h.extra != nil && h.extra.overflow != nil {
        if h.extra.oldoverflow != nil {
            throw("oldoverflow is not nil")
        }
        h.extra.oldoverflow = h.extra.overflow
        h.extra.overflow = nil
    }
    if nextOverflow != nil {
        if h.extra == nil {
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }
}

growWork()函數

func growWork(t *maptype, h *hmap, bucket uintptr) {
  // 爲了確認搬遷的 bucket 是咱們正在使用的 bucket
  // 即若是當前key映射到老的bucket1,那麼就搬遷該bucket1。
    evacuate(t, h, bucket&h.oldbucketmask())

    // 若是還未完成擴容工做,則再搬遷一個bucket。
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}

growWork()函數能夠知道,搬遷的核心邏輯是evacuate()函數。這裏讀者能夠思考一個問題:爲何每次至多搬遷2個bucket?這實際上是一種性能考量,若是map存儲了數以億計的key-value,一次性搬遷將會形成比較大的延時,所以才採用逐步搬遷策略。

在講解該邏輯以前,須要讀者先理解如下兩個知識點。

  • 知識點1:bucket序號的變化

前面講到,增量擴容(條件1)和等量擴容(條件2)都須要進行bucket的搬遷工做。對於等量擴容而言,因爲buckets的數量不變,所以能夠按照序號來搬遷。例如老的的0號bucket,仍然搬至新的0號bucket中。

image

可是,對於增量擴容而言,就會有所不一樣。例如原來的B=5,那麼增量擴容時,B就會變成6。那麼決定key值落入哪一個bucket的低位哈希值就會發生變化(從取5位變爲取6位),取新的低位hash值得過程稱爲rehash。

image

所以,在增量擴容中,某個 key 在搬遷先後 bucket 序號可能和原來相等,也多是相比原來加上 2^B(原來的 B 值),取決於低 hash 值第倒數第B+1位是 0 仍是 1。

image

如上圖所示,當原始的B = 3時,舊buckets數組長度爲8,在編號爲2的bucket中,其2號cell和5號cell,它們的低3位哈希值相同(不相同的話,也就不會落在同一個桶中了),可是它們的低4位分別是00十、1010。當發生了增量擴容,2號就會被搬遷到新buckets數組的2號bucket中去,5號被搬遷到新buckets數組的10號bucket中去,它們的桶號差距是2的3次方。

  • 知識點2:肯定搬遷區間

在源碼中,有bucket x 和bucket y的概念,其實就是增量擴容到原來的 2 倍,桶的數量是原來的 2 倍,前一半桶被稱爲bucket x,後一半桶被稱爲bucket y。一個 bucket 中的 key 可能會分裂到兩個桶中去,分別位於bucket x的桶,或bucket y中的桶。因此在搬遷一個 cell 以前,須要知道這個 cell 中的 key 是落到哪一個區間(而對於同一個桶而言,搬遷到bucket x和bucket y桶序號的差異是老的buckets大小,即2^old_B)。

這裏留一個問題:爲何肯定key落在哪一個區間很重要?

image
肯定了要搬遷到的目標 bucket 後,搬遷操做就比較好進行了。將源 key/value 值 copy 到目的地相應的位置。設置 key 在原始 buckets 的 tophash 爲 evacuatedX 或是 evacuatedY,表示已經搬遷到了新 map 的bucket x或是bucket y,新 map 的 tophash 則正常取 key 哈希值的高 8 位。

下面正式解讀搬遷核心代碼evacuate()函數。

evacuate()函數

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
  // 首先定位老的bucket的地址
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
  // newbit表明擴容以前老的bucket個數
    newbit := h.noldbuckets()
  // 判斷該bucket是否已經被搬遷
    if !evacuated(b) {
    // 官方TODO,後續版本也許會實現
        // TODO: reuse overflow buckets instead of using new ones, if there
        // is no iterator using the old buckets.  (If !oldIterator.)

    // xy 包含了高低區間的搬遷目的地內存信息
    // x.b 是對應的搬遷目的桶
    // x.k 是指向對應目的桶中存儲當前key的內存地址
    // x.e 是指向對應目的桶中存儲當前value的內存地址
        var xy [2]evacDst
        x := &xy[0]
        x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        x.k = add(unsafe.Pointer(x.b), dataOffset)
        x.e = add(x.k, bucketCnt*uintptr(t.keysize))

    // 只有當增量擴容時才計算bucket y的相關信息(和後續計算useY相呼應)
        if !h.sameSizeGrow() {
            y := &xy[1]
            y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            y.k = add(unsafe.Pointer(y.b), dataOffset)
            y.e = add(y.k, bucketCnt*uintptr(t.keysize))
        }

    // evacuate 函數每次只完成一個 bucket 的搬遷工做,所以要遍歷完此 bucket 的全部的 cell,將有值的 cell copy 到新的地方。
    // bucket 還會連接 overflow bucket,它們一樣須要搬遷。
    // 所以一樣會有 2 層循環,外層遍歷 bucket 和 overflow bucket,內層遍歷 bucket 的全部 cell。
    
    // 遍歷當前桶bucket和其以後的溢出桶overflow bucket
    // 注意:初始的b是待搬遷的老bucket
        for ; b != nil; b = b.overflow(t) {
            k := add(unsafe.Pointer(b), dataOffset)
            e := add(k, bucketCnt*uintptr(t.keysize))
      // 遍歷桶中的cell,i,k,e分別用於對應tophash,key和value
            for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
                top := b.tophash[i]
        // 若是當前cell的tophash值是emptyOne或者emptyRest,則表明此cell沒有key。並將其標記爲evacuatedEmpty,表示它「已經被搬遷」。
                if isEmpty(top) {
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
        // 正常不會出現這種狀況
        // 未被搬遷的 cell 只多是emptyOne、emptyRest或是正常的 top hash(大於等於 minTopHash)
                if top < minTopHash {
                    throw("bad map state")
                }
                k2 := k
        // 若是 key 是指針,則解引用
                if t.indirectkey() {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                var useY uint8
        // 若是是增量擴容
                if !h.sameSizeGrow() {
          // 計算哈希值,判斷當前key和vale是要被搬遷到bucket x仍是bucket y
                    hash := t.hasher(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
            // 有一個特殊狀況:有一種 key,每次對它計算 hash,獲得的結果都不同。
            // 這個 key 就是 math.NaN() 的結果,它的含義是 not a number,類型是 float64。
            // 當它做爲 map 的 key時,會遇到一個問題:再次計算它的哈希值和它當初插入 map 時的計算出來的哈希值不同!
            // 這個 key 是永遠不會被 Get 操做獲取的!當使用 m[math.NaN()] 語句的時候,是查不出來結果的。
            // 這個 key 只有在遍歷整個 map 的時候,才能被找到。
            // 而且,能夠向一個 map 插入多個數量的 math.NaN() 做爲 key,它們並不會被互相覆蓋。
            // 當搬遷碰到 math.NaN() 的 key 時,只經過 tophash 的最低位決定分配到 X part 仍是 Y part(若是擴容後是原來 buckets 數量的 2 倍)。若是 tophash 的最低位是 0 ,分配到 X part;若是是 1 ,則分配到 Y part。
                        useY = top & 1
                        top = tophash(hash)
          // 對於正常key,進入如下else邏輯  
                    } else {
                        if hash&newbit != 0 {
                            useY = 1
                        }
                    }
                }

                if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
                    throw("bad evacuatedN")
                }

        // evacuatedX + 1 == evacuatedY
                b.tophash[i] = evacuatedX + useY
        // useY要麼爲0,要麼爲1。這裏就是選取在bucket x的起始內存位置,或者選擇在bucket y的起始內存位置(只有增量同步纔會有這個選擇可能)。
                dst := &xy[useY]

        // 若是目的地的桶已經裝滿了(8個cell),那麼須要新建一個溢出桶,繼續搬遷到溢出桶上去。
                if dst.i == bucketCnt {
                    dst.b = h.newoverflow(t, dst.b)
                    dst.i = 0
                    dst.k = add(unsafe.Pointer(dst.b), dataOffset)
                    dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
                }
                dst.b.tophash[dst.i&(bucketCnt-1)] = top
        // 若是待搬遷的key是指針,則複製指針過去
                if t.indirectkey() {
                    *(*unsafe.Pointer)(dst.k) = k2 // copy pointer
        // 若是待搬遷的key是值,則複製值過去  
                } else {
                    typedmemmove(t.key, dst.k, k) // copy elem
                }
        // value和key同理
                if t.indirectelem() {
                    *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
                } else {
                    typedmemmove(t.elem, dst.e, e)
                }
        // 將當前搬遷目的桶的記錄key/value的索引值(也能夠理解爲cell的索引值)加一
                dst.i++
        // 因爲桶的內存佈局中在最後還有overflow的指針,多以這裏不用擔憂更新有可能會超出key和value數組的指針地址。
                dst.k = add(dst.k, uintptr(t.keysize))
                dst.e = add(dst.e, uintptr(t.elemsize))
            }
        }
    // 若是沒有協程在使用老的桶,就對老的桶進行清理,用於幫助gc
        if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
            b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
      // 只清除bucket 的 key,value 部分,保留 top hash 部分,指示搬遷狀態
            ptr := add(b, dataOffset)
            n := uintptr(t.bucketsize) - dataOffset
            memclrHasPointers(ptr, n)
        }
    }

  // 用於更新搬遷進度
    if oldbucket == h.nevacuate {
        advanceEvacuationMark(h, t, newbit)
    }
}

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
  // 搬遷桶的進度加一
    h.nevacuate++
  // 實驗代表,1024至少會比newbit高出一個數量級(newbit表明擴容以前老的bucket個數)。因此,用當前進度加上1024用於確保O(1)行爲。
    stop := h.nevacuate + 1024
    if stop > newbit {
        stop = newbit
    }
  // 計算已經搬遷完的桶數
    for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
        h.nevacuate++
    }
  // 若是h.nevacuate == newbit,則表明全部的桶都已經搬遷完畢
    if h.nevacuate == newbit {
    // 搬遷完畢,因此指向老的buckets的指針置爲nil
        h.oldbuckets = nil
    // 在講解hmap的結構中,有過說明。若是key和value均不包含指針,則均可以inline。
    // 那麼保存它們的buckets數組實際上是掛在hmap.extra中的。因此,這種狀況下,其實咱們是搬遷的extra的buckets數組。
    // 所以,在這種狀況下,須要在搬遷完畢後,將hmap.extra.oldoverflow指針置爲nil。
        if h.extra != nil {
            h.extra.oldoverflow = nil
        }
    // 最後,清除正在擴容的標誌位,擴容完畢。
        h.flags &^= sameSizeGrow
    }
}

代碼比較長,可是文中註釋已經比較清晰了,若是對map的擴容還不清楚,能夠參見如下圖解。

image

針對上圖的map,其B爲3,因此原始buckets數組爲8。當map元素數變多,加載因子超過6.5,因此引發了增量擴容。

以3號bucket爲例,能夠看到,因爲B值加1,因此在新選取桶時,須要取低4位哈希值,這樣就會形成cell會被搬遷到新buckets數組中不一樣的桶(3號或11號桶)中去。注意,在一個桶中,搬遷cell的工做是有序的:它們是依序填進對應新桶的cell中去的。

固然,實際狀況中3號桶極可能還有溢出桶,在這裏爲了簡化繪圖,假設3號桶沒有溢出桶,若是有溢出桶,則相應地添加到新的3號桶和11號桶中便可,若是對應的3號和11號桶均裝滿,則給新的桶添加溢出桶來裝載。

image

對於上圖的map,其B也爲3。假設整個map中的overflow過多,觸發了等量擴容。注意,等量擴容時,新的buckets數組大小和舊buckets數組是同樣的。

以6號桶爲例,它有一個bucket和3個overflow buckets,可是咱們可以發現桶裏的數據很是稀疏,等量擴容的目的就是爲了把鬆散的鍵值對從新排列一次,以使bucket的使用率更高,進而保證更快的存取。搬遷完畢後,新的6號桶中只有一個基礎bucket,暫時並不須要溢出桶。這樣,和原6號桶相比,數據變得緊密,使後續的數據存取變快。

最後回答一下上文中留下的問題:爲何肯定key落在哪一個區間很重要?由於對於增量擴容而言,本來一個bucket中的key會被分裂到兩個bucket中去,它們分別處於bucket x和bucket y中,可是它們之間存在關係 bucket x + 2^B = bucket y (其中,B是老bucket對應的B值)。假設key所在的老bucket序號爲n,那麼若是key落在新的bucket x,則它應該置入 bucket x起始位置 + n\bucket 的內存中去;若是key落在新的bucket y,則它應該置入 bucket y起始位置 + n\bucket的內存中去。所以,肯定key落在哪一個區間,這樣就很方便進行內存地址計算,快速找到key應該插入的內存地址。

總結

Go語言的map,底層是哈希表實現的,經過鏈地址法解決哈希衝突,它依賴的核心數據結構是數組加鏈表。

map中定義了2的B次方個桶,每一個桶中可以容納8個key。根據key的不一樣哈希值,將其散落到不一樣的桶中。哈希值的低位(哈希值的後B個bit位)決定桶序號,高位(哈希值的前8個bit位)標識同一個桶中的不一樣 key。

當向桶中添加了不少 key,形成元素過多,超過了裝載因子所設定的程度,或者屢次增刪操做,形成溢出桶過多,均會觸發擴容。

擴容分爲增量擴容和等量擴容。增量擴容,會增長桶的個數(增長一倍),把原來一個桶中的 keys 被從新分配到兩個桶中。等量擴容,不會更改桶的個數,只是會將桶中的數據變得緊湊。無論是增量擴容仍是等量擴容,都須要建立新的桶數組,並非原地操做的。

擴容過程是漸進性的,主要是防止一次擴容須要搬遷的 key 數量過多,引起性能問題。觸發擴容的時機是增長了新元素, 桶搬遷的時機則發生在賦值、刪除期間,每次最多搬遷兩個 桶。查找、賦值、刪除的一個很核心的內容是如何定位到 key 所在的位置,須要重點理解。一旦理解,關於 map 的源碼就能夠看懂了。

使用建議

從map設計能夠知道,它並非一個併發安全的數據結構。同時對map進行讀寫時,程序很容易出錯。所以,要想在併發狀況下使用map,請加上鎖(sync.Mutex或者sync.RwMutex)。其實,Go標準庫中已經爲咱們實現了併發安全的map——sync.Map,我以前寫過文章對它的實現進行講解,詳情能夠查看公號:Golang技術分享——《深刻理解sync.Map》一文。

遍歷map的結果是無序的,在使用中,應該注意到該點。

經過map的結構體能夠知道,它實際上是經過指針指向底層buckets數組。因此和slice同樣,儘管go函數都是值傳遞,可是,當map做爲參數被函數調用時,在函數內部對map的操做一樣會影響到外部的map。

另外,有個特殊的key值math.NaN,它每次生成的哈希值是不同的,這會形成m[math.NaN]是拿不到值的,並且屢次對它賦值,會讓map中存在多個math.NaN的key。不過這個基本用不到,知道有這個特殊狀況就能夠了。

參考連接

https://en.wikipedia.org/wiki...

https://blog.golang.org/maps

https://mp.weixin.qq.com/s/OH...

https://www.cse.cuhk.edu.hk/i...

https://github.com/cch123/gol...

https://zhuanlan.zhihu.com/p/...

https://draveness.me/golang/d...

https://github.com/talkgo/nig...

https://my.oschina.net/renhc/...

相關文章
相關標籤/搜索