做者:郭雨田 原文地址:https://mp.weixin.qq.com/s/c_...程序員
golang中map是一個kv對集合。底層使用hash table,用鏈表來解決衝突 ,出現衝突時,不是每個key都申請一個結構經過鏈表串起來,而是以bmap爲最小粒度掛載,一個bmap能夠放8個kv。在哈希函數的選擇上,會在程序啓動時,檢測 cpu 是否支持 aes,若是支持,則使用 aes hash,不然使用 memhash。每一個map的底層結構是hmap,是有若干個結構爲bmap的bucket組成的數組。每一個bucket底層都採用鏈表結構。接下來,咱們來詳細看下map的結構:**golang
// A header for a Go map. type hmap struct { count int // 元素個數 flags uint8 B uint8 // 擴容常量相關字段B是buckets數組的長度的對數 2^B noverflow uint16 // 溢出的bucket個數 hash0 uint32 // hash seed buckets unsafe.Pointer // buckets 數組指針 oldbuckets unsafe.Pointer // 結構擴容的時候用於賦值的buckets數組 nevacuate uintptr // 搬遷進度 extra *mapextra // 用於擴容的指針 } type mapextra struct { overflow *[]*bmap oldoverflow *[]*bmap nextOverflow *bmap } // A bucket for a Go map. type bmap struct { tophash [bucketCnt]uint8 // len爲8的數組 } //底層定義的常量 const ( // Maximum number of key/value pairs a bucket can hold. bucketCntBits = 3 bucketCnt = 1 << bucketCntBits )
但這只是表面(src/runtime/hashmap.go)的結構,編譯期間會給它加料,動態地建立一個新的結構:算法
type bmap struct { topbits [8]uint8 keys [8]keytype values [8]valuetype pad uintptr overflow uintptr }
hmap和bmap的結構是這樣的 :api
bmap
就是咱們常說的「桶」,桶裏面會最多裝 8 個 key,這些 key 之因此會落入同一個桶,是由於它們通過哈希計算後,哈希結果是「一類」的,關於key的定位咱們在map的查詢和賦值中詳細說明。在桶內,又會根據 key 計算出來的 hash 值的高 8 位來決定 key 到底落入桶內的哪一個位置(一個桶內最多有8個位置)。數組
當 map 的 key 和 value 都不是指針,而且 size 都小於 128 字節的狀況下,會把 bmap 標記爲不含指針,這樣能夠避免 gc 時掃描整個 hmap。可是,咱們看 bmap 其實有一個 overflow 的字段,是指針類型的,破壞了 bmap 不含指針的設想,這時會把 overflow 移動到 hmap的extra 字段來。這部分咱們在分析擴容操做的時候再詳細說明。下面咱們看下bmap的內部組成圖:安全
HOBHash
指的就是 top hash,每一個bucket中topHash惟一。key 和 value 是各自放在一塊兒的,並非 key/value/...
這樣的形式。能夠省略掉 padding 字段,節省內存空間。數據結構
例如,有這樣一個類型的 map:map[int64]int8
,若是按照 key/value...
這樣的模式存儲,那在每個 key/value 對以後都要額外 padding 7 個字節;而將全部的 key,value 分別綁定到一塊兒,這種形式 key/key/.../value/value/...
,則只須要在最後添加 padding,每一個 bucket 設計成最多隻能放 8 個 key-value 對,若是有第 9 個 key-value 落入當前的 bucket,那就須要再構建一個 bucket ,經過 overflow
指針鏈接起來。併發
一、map初始化:app
方法1: var m map[string]string // 聲明變量 --nil map 支持查詢 返回類型默認值 賦值、delete操做會panic m = make(map[string]string, 10) // 初始化 --empty map 能夠進行賦值操做了 方法2: m := make(map[string]string,10) // 容量參數可省略 方法3: m := map[string]string{ // 經過直接賦值進行初始化 "test": "test", "name": "lili", "age": "one", }
第一步:入參校驗,判斷key的類型是否合法,必須爲可比較類型。ide
第二步:底層調用makemap函數,計算獲得合適的B,map容量最多可容納6.5*2^B個元素,6.5爲裝載因子閾值常量。裝載因子的計算公式是:裝載因子=填入表中的元素個數/散列表的長度,裝載因子越大,說明空閒位置越少,衝突越多,散列表的性能會降低。
func makemap(t *maptype, hint int, h *hmap) *hmap { //邊界校驗 if hint < 0 || hint > int(maxSliceCap(t.bucket.size)) { hint = 0 } // initialize Hmap if h == nil { h = new(hmap) } //生成hash種子 h.hash0 = fastrand() // find size parameter which will hold the requested # of elements B := uint8(0) //計算獲得合適的B for overLoadFactor(hint, B) { B++ } h.B = B // allocate initial hash table // if B == 0, the buckets field is allocated lazily later (in mapassign) // If hint is large zeroing this memory could take a while. //申請桶空間 if h.B != 0 { var nextOverflow *bmap h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) if nextOverflow != nil { h.extra = new(mapextra) h.extra.nextOverflow = nextOverflow } } return h } //常量loadFactorNum=13 ,loadFactorDen=2 func overLoadFactor(count int, B uint8) bool { return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
makemap函數會經過 fastrand
建立一個隨機的哈希種子,而後根據傳入的 hint
計算出須要的最小須要的桶的數量,最後再使用 makeBucketArray
建立用於保存桶的數組,這個方法其實就是根據傳入的 B
計算出的須要建立的桶數量在內存中分配一片連續的空間用於存儲數據,在建立桶的過程當中還會額外建立一些用於保存溢出數據的桶,數量是 2^(B-4)
個。初始化完成返回hmap指針。
二、查找操做
Go 語言中讀取 map 有兩種語法:帶 comma 和 不帶 comma。當要查詢的 key 不在 map 裏,帶 comma 的用法會返回一個 bool 型變量提示 key 是否在 map 中;而不帶 comma 的語句則會返回一個 value 類型的零值。若是 value 是 int 型就會返回 0,若是 value 是 string 類型,就會返回空字符串。
value := m["name"] fmt.Printf("value:%s", value) value, ok := m["name"] if ok { fmt.Printf("value:%s", value) }
兩種語法對應到底層兩個不一樣的函數,那麼在底層是如何定位到key的呢?稍後咱們對函數進行源碼分析。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool)
key的定位:
key 通過哈希計算後獲得哈希值,共 64 個 bit 位(64位機,32位機就不討論了,如今主流都是64位機),計算它到底要落在哪一個桶時,只會用到最後 B 個 bit 位。還記得前面提到過的 B 嗎?若是 B = 5,那麼桶的數量,也就是 buckets 數組的長度是 2^5 = 32。例如,如今有一個 key 通過哈希函數計算後,獲得的哈希結果是:
用最後的 5 個 bit 位,也就是 01010
,值爲 10,也就是 10 號桶。這個操做實際上就是取餘操做,可是取餘開銷太大,因此代碼實現上用的位操做代替。
再用哈希值的高 8 位,找到此 key 在 bucket 中的位置,這是在尋找已有的 key。最開始桶內尚未 key,新加入的 key 會找到第一個空位放入。
buckets 編號就是桶編號,當兩個不一樣的 key 落在同一個桶中,也就是發生了哈希衝突。衝突的解決手段是用鏈表法:在 bucket 中,從前日後找到第一個空位。這樣,在查找某個 key 時,先找到對應的桶,再去遍歷 bucket 中的 key。
上圖中,假定 B = 5,因此 bucket 總數就是 2^5 = 32。首先計算出待查找 key 的哈希,使用低 5 位 00110
,找到對應的 6 號 bucket,使用高 8 位 10010111
,對應十進制 151,在 6 號 bucket 中尋找 tophash 值(HOB hash)爲 151 的 key,找到了 2 號槽位,這樣整個查找過程就結束了。
若是在 bucket 中沒找到,而且 overflow 不爲空,還要繼續去 overflow bucket 中尋找,直到找到或是全部的 key 槽位都找遍了,包括全部的 overflow bucket。
接下來咱們看下底層函數源碼:
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { //... // 若是 h 什麼都沒有,返回零值 if h == nil || h.count == 0 { return unsafe.Pointer(&zeroVal[0]) } // 寫和讀衝突 if h.flags&hashWriting != 0 { throw("concurrent map read and map write") } // 不一樣類型 key 使用的 hash 算法在編譯期肯定 alg := t.key.alg // 計算哈希值,而且加入 hash0 引入隨機性 hash := alg.hash(key, uintptr(h.hash0)) // 好比 B=5,那 m 就是31,二進制是全 1 // 求 bucket num 時,將 hash 與 m 相與, // 達到 bucket num 由 hash 的低 8 位決定的效果 m := bucketMask(h.B) // b 就是 bucket 的地址 b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) // oldbuckets 不爲 nil,說明發生了擴容 if c := h.oldbuckets; c != nil { // 若是不是同 size 擴容(看後面擴容的內容) // 對應條件 1 的解決方案 if !h.sameSizeGrow() { // 新 bucket 數量是老的 2 倍 m >>= 1 } // 求出 key 在老的 map 中的 bucket 位置 oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) // 若是 oldb 沒有搬遷到新的 bucket // 那就在老的 bucket 中尋找 if !evacuated(oldb) { b = oldb } } // 計算出高 8 位的 hash // 至關於右移 56 位,只取高8位 top := tophash(hash) //開始尋找key for ; b != nil; b = b.overflow(t) { // 遍歷 8 個 bucket for i := uintptr(0); i < bucketCnt; i++ { // tophash 不匹配,繼續 if b.tophash[i] != top { continue } // tophash 匹配,定位到 key 的位置 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) // key 是指針 if t.indirectkey { // 解引用 k = *((*unsafe.Pointer)(k)) } // 若是 key 相等 if alg.equal(key, k) { // 定位到 value 的位置 v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize)) // value 解引用 if t.indirectvalue { v = *((*unsafe.Pointer)(v)) } return v } } } return unsafe.Pointer(&zeroVal[0]) }
這裏咱們再詳細分析下key/value值是如何獲取的:
// key 定位公式 k :=add(unsafe.Pointer(b),dataOffset+i*uintptr(t.keysize)) // value 定位公式 v:= add(unsafe.Pointer(b),dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize)) //對於 bmap 起始地址的偏移: dataOffset = unsafe.Offsetof(struct{ b bmap v int64 }{}.v)
bucket 裏 key 的起始地址就是 unsafe.Pointer(b)+dataOffset。第 i 個 key 的地址就要在此基礎上跨過 i 個 key 的大小;而咱們又知道,value 的地址是在全部 key 以後,所以第 i 個 value 的地址還須要加上全部 key 的偏移。
三、賦值操做
m := make(map[int32]int32) m[0] = 6666666
接下來咱們將分紅幾個部分去看看底層在賦值的時候,進行了什麼操做:
第一階段:校驗和初始化
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { //判斷 hmap 是否已經初始化(是否爲 nil) if h == nil { panic(plainError("assignment to entry in nil map")) } //... //判斷是否併發讀寫 map,如果則拋出異常 if h.flags&hashWriting != 0 { throw("concurrent map writes") } //根據 key 的不一樣類型調用不一樣的 hash 方法計算得出 hash 值 alg := t.key.alg hash := alg.hash(key, uintptr(h.hash0)) //設置 flags 標誌位,表示有一個 goroutine 正在寫入數據。由於 alg.hash 有可能出現 panic 致使異常 h.flags |= hashWriting //判斷 buckets 是否爲 nil,如果則調用 newobject 根據當前 bucket 大小進行分配 //初始化時沒有初始 buckets,那麼它在第一次賦值時就會對 buckets 分配 if h.buckets == nil { h.buckets = newobject(t.bucket) // newarray(t.bucket, 1) } }
第二階段:尋找可插入位和更新既有值
//根據低八位計算獲得 bucket 的內存地址 bucket := hash & bucketMask(h.B) //判斷是否正在擴容,若正在擴容中則先遷移再接着處理 if h.growing() { growWork(t, h, bucket) } //計算並獲得 bucket 的 bmap 指針地址 b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize))) //計算 key hash 高八位用於查找 Key top := tophash(hash) var inserti *uint8 var insertk unsafe.Pointer var val unsafe.Pointer for { //迭代 buckets 中的每個 bucket(共 8 個) for i := uintptr(0); i < bucketCnt; i++ { //對比 bucket.tophash 與 top(高八位)是否一致 if b.tophash[i] != top { //若不一致,判斷是否爲空槽 if b.tophash[i] == empty && inserti == nil { //有兩種狀況,第一種是沒有插入過。第二種是插入後被刪除 inserti = &b.tophash[i] insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) //把該位置標識爲可插入 tophash 位置,這裏就是第一個能夠插入數據的地方 val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize)) } continue } //如果匹配(也就是本來已經存在),則進行更新。最後跳出並返回 value 的內存地址 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey { k = *((*unsafe.Pointer)(k)) } if !alg.equal(key, k) { continue } // already have a mapping for key. Update it. if t.needkeyupdate { typedmemmove(t.key, k, key) } val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize)) goto done } //判斷是否迭代完畢,如果則結束迭代 buckets 並更新當前桶位置 ovf := b.overflow(t) if ovf == nil { break } b = ovf } //若知足三個條件:觸發最大 LoadFactor 、存在過多溢出桶 overflow buckets、沒有正在進行擴容。就會進行擴容動做(以確保後續的動做) if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) goto again // Growing the table invalidates everything, so try again }
第三階段:申請新的插入位和插入新值
//通過前面迭代尋找動做,若沒有找到可插入的位置,意味着當前的全部桶都滿了,將從新分配一個新溢出桶用於插入動做。最後再在上一步申請的新插入位置,存儲鍵值對,返回該值的內存地址 if inserti == nil { // all current buckets are full, allocate a new one. newb := h.newoverflow(t, b) inserti = &newb.tophash[0] insertk = add(unsafe.Pointer(newb), dataOffset) val = add(insertk, bucketCnt*uintptr(t.keysize)) } // store new key/value at insert position if t.indirectkey { kmem := newobject(t.key) *(*unsafe.Pointer)(insertk) = kmem insertk = kmem } if t.indirectvalue { vmem := newobject(t.elem) *(*unsafe.Pointer)(val) = vmem } typedmemmove(t.key, insertk, key) *inserti = top h.count++ done ... return val
第四階段:寫入
最後返回的是內存地址。是怎麼進行寫入的呢?這是由於隱藏的最後一步寫入動做(將值拷貝到指定內存區域)是經過底層彙編配合來完成的,在 runtime 中只完成了絕大部分的動做。 mapassign
函數和拿到值存放的內存地址,再將 6666666 這個值存放進該內存地址中。另外咱們看到 PCDATA
指令,主要是包含一些垃圾回收的信息,由編譯器產生。
... 0x0099 00153 (test.go:6) CALL runtime.mapassign_fast32(SB) 0x009e 00158 (test.go:6) PCDATA $2, $2 0x009e 00158 (test.go:6) MOVQ 24(SP), AX 0x00a3 00163 (test.go:6) PCDATA $2, $0 0x00a3 00163 (test.go:6) MOVL $6666666, (AX)
擴容:
關於上文中一直提到的擴容是怎麼回事呢,如今咱們來具體分析下:
還記得bucket中的topHash字段嗎?如今咱們來補充知識點minTopHash:當一個 cell 的 tophash 值小於 minTopHash 時,標誌這個 cell 的遷移狀態。由於這個狀態值是放在 tophash 數組裏,爲了和正常的哈希值區分開,會給 key 計算出來的哈希值一個增量:minTopHash。這樣就能區分正常的 top hash 值和表示狀態的哈希值。
下面的這幾種狀態就表徵了 bucket 的狀況:
// 空的 cell,也是初始時 bucket 的狀態 empty = 0 // 空的 cell,表示 cell 已經被遷移到新的 bucket evacuatedEmpty = 1 // key,value 已經搬遷完畢,可是 key 都在新 bucket 前半部分, evacuatedX = 2 // 同上,key 在後半部分 evacuatedY = 3 // tophash 的最小正常值 minTopHash = 4
爲了不計算出的topHash與minTopHash 衝突,底層作了相關操做:
func tophash(hash uintptr) uint8 { top := uint8(hash >> (sys.PtrSize*8 - 8)) if top < minTopHash { top += minTopHash } return top }
隨着向 map 中添加的 key 愈來愈多,key 發生碰撞的機率也愈來愈大。bucket 中的 8 個 cell 會被逐漸塞滿,查找、插入、刪除 key 的效率也會愈來愈低。最理想的狀況是一個 bucket 只裝一個 key,這樣,就能達到 O(1)
的效率,但這樣空間消耗太大,用空間換時間的代價過高。Go 語言採用一個 bucket 裏裝載 8 個 key,定位到某個 bucket 後,還須要再定位到具體的 key,這實際上又用了時間換空間。固然,這樣作,要有一個度,否則全部的 key 都落在了同一個 bucket 裏,直接退化成了鏈表,各類操做的效率直接降爲 O(n),是不行的。所以,須要有一個指標來衡量前面描述的狀況,這就是 裝載因子
。
Go 源碼裏這樣定義: loadFactor := count/(2^B)
count 就是 map 的元素個數,2^B 表示 bucket 數量。
再來講觸發 map 擴容的時機:在向 map 插入新 key 的時候,會進行條件檢測,符合下面這 2 個條件,就會觸發擴容:
一、裝載因子超過閾值,源碼裏定義的閾值是 6.5
二、overflow 的 bucket 數量過多
經過彙編語言能夠找到賦值操做對應源碼中的函數是 mapassign
,對應擴容條件的源碼以下:
//觸發擴容的時機 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) goto again // Growing the table invalidates everything, so try again } // 裝載因子超過 6.5 func overLoadFactor(count int, B uint8) bool { return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen) } // overflow buckets 太多 func tooManyOverflowBuckets(noverflow uint16, B uint8) bool { if B > 15 { B = 15 } return noverflow >= uint16(1)<<(B&15) }
第 1 點:咱們知道,每一個 bucket 有 8 個空位,在沒有溢出,且全部的桶都裝滿了的狀況下,裝載因子算出來的結果是 8。所以當裝載因子超過 6.5 時,代表不少 bucket 都快要裝滿了,查找效率和插入效率都變低了。在這個時候進行擴容是有必要的。
第 2 點:是對第 1 點的補充。就是說在裝載因子比較小的狀況下,這時候 map 的查找和插入效率也很低,而第 1 點識別不出來這種狀況。表面現象就是計算裝載因子的分子比較小,即 map 裏元素總數少,可是 bucket 數量多(真實分配的 bucket 數量多,包括大量的 overflow bucket)。
不難想像形成這種狀況的緣由:不停地插入、刪除元素。先插入不少元素,致使建立了不少 bucket,可是裝載因子達不到第 1 點的臨界值,未觸發擴容來緩解這種狀況。以後,刪除元素下降元素總數量,再插入不少元素,致使建立不少的 overflow bucket,但就是不會觸犯第 1 點的規定,你能拿我怎麼辦?overflow bucket 數量太多,致使 key 會很分散,查找插入效率低得嚇人,所以出臺第 2 點規定。這就像是一座空城,房子不少,可是住戶不多,都分散了,找起人來很困難。
對於命中條件 1,2 的限制,都會發生擴容。可是擴容的策略並不相同,畢竟兩種條件應對的場景不一樣。
對於條件 1,元素太多,而 bucket 數量太少,很簡單:將 B 加 1,bucket 最大數量(2^B
)直接變成原來 bucket 數量的 2 倍。因而,就有新老 bucket 了。注意,這時候元素都在老 bucket 裏,還沒遷移到新的 bucket 來。新 bucket 只是最大數量變爲原來最大數量的 2 倍(2^B*2
) 。
對於條件 2,其實元素沒那麼多,可是 overflow bucket 數特別多,說明不少 bucket 都沒裝滿。解決辦法就是開闢一個新 bucket 空間,將老 bucket 中的元素移動到新 bucket,使得同一個 bucket 中的 key 排列地更緊密。這樣,原來,在 overflow bucket 中的 key 能夠移動到 bucket 中來。結果是節省空間,提升 bucket 利用率,map 的查找和插入效率天然就會提高。
因爲 map 擴容須要將原有的 key/value 從新搬遷到新的內存地址,若是有大量的 key/value 須要搬遷,會很是影響性能。所以 Go map 的擴容採起了一種稱爲「漸進式」的方式,原有的 key 並不會一次性搬遷完畢,每次最多隻會搬遷 2 個 bucket。
上面說的 hashGrow()
函數實際上並無真正地「搬遷」,它只是分配好了新的 buckets,並將老的 buckets 掛到了 oldbuckets 字段上。真正搬遷 buckets 的動做在 growWork()
函數中,而調用 growWork()
函數的動做是在 mapassign 和 mapdelete 函數中。也就是插入或修改、刪除 key 的時候,都會嘗試進行搬遷 buckets 的工做。先檢查 oldbuckets 是否搬遷完畢,具體來講就是檢查 oldbuckets 是否爲 nil。
func hashGrow(t *maptype, h *hmap) { // B+1 至關因而原來 2 倍的空間 bigger := uint8(1) // 對應條件 2 if !overLoadFactor(h.count+1, h.B) { // 進行等量的內存擴容,因此 B 不變 bigger = 0 h.flags |= sameSizeGrow } // 將老 buckets 掛到 buckets 上 oldbuckets := h.buckets // 申請新的 buckets 空間 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) //先把 h.flags 中 iterator 和 oldIterator 對應位清 0 //若是 iterator 位爲 1,把它轉接到 oldIterator 位,使得 oldIterator 標誌位變成1 //能夠理解爲buckets 如今掛到了 oldBuckets 名下了,將對應的標誌位也轉接過去 flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } // commit the grow (atomic wrt gc) h.B += bigger h.flags = flags h.oldbuckets = oldbuckets h.buckets = newbuckets // 搬遷進度爲 0 h.nevacuate = 0 // overflow buckets 數爲 0 h.noverflow = 0 }
幾個標誌位以下:
// 可能有迭代器使用 buckets iterator = 1 // 可能有迭代器使用 oldbuckets oldIterator = 2 // 有協程正在向 map 中寫入 key hashWriting = 4 // 等量擴容(對應條件 2) sameSizeGrow = 8
再來看看真正執行搬遷工做的 growWork() 函數
func growWork(t *maptype, h *hmap, bucket uintptr) { // 搬遷正在使用的舊 bucket evacuate(t, h, bucket&h.oldbucketmask()) // 再搬遷一個 bucket,以加快搬遷進程 if h.growing() { evacuate(t, h, h.nevacuate) } } func (h *hmap) growing() bool { return h.oldbuckets != nil }
搬遷過程evacuate源碼:
type evacDst struct { b *bmap // 表示bucket 移動的目標地址 i int // 指向 x,y 中 key/val 的 index k unsafe.Pointer // 指向 x,y 中的 key v unsafe.Pointer // 指向 x,y 中的 value } func evacuate(t *maptype, h *hmap, oldbucket uintptr) { // 定位老的 bucket 地址 b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) // 計算容量 結果是 2^B,如 B = 5,結果爲32 newbit := h.noldbuckets() // 若是 b 沒有被搬遷過 if !evacuated(b) { // 默認是等 size 擴容,先後 bucket 序號不變 var xy [2]evacDst // 使用 x 來進行搬遷 x := &xy[0] x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize))) x.k = add(unsafe.Pointer(x.b), dataOffset) x.v = add(x.k, bucketCnt*uintptr(t.keysize)) // 若是不是等 size 擴容,先後 bucket 序號有變 if !h.sameSizeGrow() { // 使用 y 來進行搬遷 y := &xy[1] // y 表明的 bucket 序號增長了 2^B y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize))) y.k = add(unsafe.Pointer(y.b), dataOffset) y.v = add(y.k, bucketCnt*uintptr(t.keysize)) } // 遍歷全部的 bucket,包括 overflow buckets b 是老的 bucket 地址 for ; b != nil; b = b.overflow(t) { k := add(unsafe.Pointer(b), dataOffset) v := add(k, bucketCnt*uintptr(t.keysize)) // 遍歷 bucket 中的全部 cell for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) { // 當前 cell 的 top hash 值 top := b.tophash[i] // 若是 cell 爲空,即沒有 key if top == empty { // 那就標誌它被"搬遷"過 b.tophash[i] = evacuatedEmpty continue } // 正常不會出現這種狀況 // 未被搬遷的 cell 只多是 empty 或是 // 正常的 top hash(大於 minTopHash) if top < minTopHash { throw("bad map state") } // 若是 key 是指針,則解引用 k2 := k if t.indirectkey { k2 = *((*unsafe.Pointer)(k2)) } var useY uint8 // 若是不是等量擴容 if !h.sameSizeGrow() { // 計算 hash 值,和 key 第一次寫入時同樣 hash := t.key.alg.hash(k2, uintptr(h.hash0)) // 若是有協程正在遍歷 map 若是出現 相同的 key 值,算出來的 hash 值不一樣 if h.flags&iterator != 0 && !t.reflexivekey && !t.key.alg.equal(k2, k2) { // useY =1 使用位置Y useY = top & 1 top = tophash(hash) } else { // 第 B 位置 不是 0 if hash&newbit != 0 { //使用位置Y useY = 1 } } } if evacuatedX+1 != evacuatedY { throw("bad evacuatedN") } //決定key是裂變到 X 仍是 Y b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY dst := &xy[useY] // evacuation destination // 若是 xi 等於 8,說明要溢出了 if dst.i == bucketCnt { // 新建一個 bucket dst.b = h.newoverflow(t, dst.b) // xi 從 0 開始計數 dst.i = 0 //key移動的位置 dst.k = add(unsafe.Pointer(dst.b), dataOffset) //value 移動的位置 dst.v = add(dst.k, bucketCnt*uintptr(t.keysize)) } // 設置 top hash 值 dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check // key 是指針 if t.indirectkey { // 將原 key(是指針)複製到新位置 *(*unsafe.Pointer)(dst.k) = k2 // copy pointer } else { // 將原 key(是值)複製到新位置 typedmemmove(t.key, dst.k, k) // copy value } //value同上 if t.indirectvalue { *(*unsafe.Pointer)(dst.v) = *(*unsafe.Pointer)(v) } else { typedmemmove(t.elem, dst.v, v) } // 定位到下一個 cell dst.i++ dst.k = add(dst.k, uintptr(t.keysize)) dst.v = add(dst.v, uintptr(t.valuesize)) } } // Unlink the overflow buckets & clear key/value to help GC. // bucket搬遷完畢 若是沒有協程在使用老的 buckets,就把老 buckets 清除掉,幫助gc if h.flags&oldIterator == 0 && t.bucket.kind&kindNoPointers == 0 { b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)) ptr := add(b, dataOffset) n := uintptr(t.bucketsize) - dataOffset memclrHasPointers(ptr, n) } } // 更新搬遷進度 if oldbucket == h.nevacuate { advanceEvacuationMark(h, t, newbit) } }
擴容後,B 增長了 1,意味着 buckets 總數是原來的 2 倍,原來 1 號的桶「裂變」到兩個桶,某個 key 在搬遷先後 bucket 序號可能和原來相等,也多是相比原來加上 2^B(原來的 B 值),取決於 hash 值 第 6 bit 位是 0 仍是 1。原理看下圖:
四、遍歷操做:
1.只獲取key for key := range m { fmt.Println(key) } 2.只獲取value for _, value := range m { fmt.Println(value) } 3.有序遍歷map,獲取kv keys := []string{} for k, _ := range m { keys = append(keys, k) } // 排序 sort.Strings(keys) // 有序遍歷 for _, k := range keys { fmt.Println(k, m[k]) }
理解了上面 bucket 序號的變化,咱們就能夠回答另外一個問題了:爲何遍歷 map 是無序的?
遍歷的過程,就是按順序遍歷 bucket,同時按順序遍歷 bucket 中的 key。搬遷後,key 的位置發生了重大的變化,有些 key 飛上高枝,有些 key 則原地不動。這樣,遍歷 map 的結果就不可能按原來的順序了。固然,若是我就一個 hard code 的 map,我也不會向 map 進行插入刪除的操做,按理說每次遍歷這樣的 map 都會返回一個固定順序的 key/value 序列吧。的確是這樣,可是 Go 杜絕了這種作法,由於這樣會給新手程序員帶來誤解,覺得這是必定會發生的事情,在某些狀況下,可能會釀成大錯。
固然,Go 作得更絕,當咱們在遍歷 map 時,並非固定地從 0 號 bucket 開始遍歷,每次都是從一個隨機值序號的 bucket 開始遍歷,而且是從這個 bucket 的一個隨機序號的 cell 開始遍歷。這樣,即便你是一個寫死的 map,僅僅只是遍歷它,也不太可能會返回一個固定序列的 key/value 對了。
//runtime.mapiterinit 遍歷時選用初始桶的函數 func mapiterinit(t *maptype, h *hmap, it *hiter) { ... it.t = t it.h = h it.B = h.B it.buckets = h.buckets if t.bucket.kind&kindNoPointers != 0 { h.createOverflow() it.overflow = h.extra.overflow it.oldoverflow = h.extra.oldoverflow } r := uintptr(fastrand()) if h.B > 31-bucketCntBits { r += uintptr(fastrand()) << 31 } it.startBucket = r & bucketMask(h.B) it.offset = uint8(r >> h.B & (bucketCnt - 1)) it.bucket = it.startBucket ... mapiternext(it) }
重點是fastrand
的部分,是一個生成隨機數的方法:它生成了隨機數。用於決定從哪裏開始循環迭代。更具體的話就是根據隨機數,選擇一個桶位置做爲起始點進行遍歷迭代所以每次從新 for range map
,你見到的結果都是不同的。那是由於它的起始位置根本就不固定!
... // decide where to start r := uintptr(fastrand()) if h.B > 31-bucketCntBits { r += uintptr(fastrand()) << 31 } it.startBucket = r & bucketMask(h.B) it.offset = uint8(r >> h.B & (bucketCnt - 1)) // iterator state it.bucket = it.startBucket
五、更新操做:
底層操做原理參考上文
m["age"] = "two" m["name"] = "lily"
六、刪除操做:
delete(m, "name")
寫操做底層的執行函數是 mapdelete
:
*func mapdelete(t *maptype, h _hmap, key unsafe.Pointer)_
它首先會檢查 h.flags 標誌,若是發現寫標位是 1,直接 panic,由於這代表有其餘協程同時在進行寫操做。計算 key 的哈希,找到落入的 bucket。檢查此 map 若是正在擴容的過程當中,直接觸發一次搬遷操做。刪除操做一樣是兩層循環,核心仍是找到 key 的具體位置。尋找過程都是相似的,在 bucket 中挨個 cell 尋找。找到對應位置後,對 key 或者 value 進行「清零」操做,將 count 值減 1,將對應位置的 tophash 值置成 Empty
。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) { if raceenabled && h != nil { callerpc := getcallerpc() pc := funcPC(mapdelete) racewritepc(unsafe.Pointer(h), callerpc, pc) raceReadObjectPC(t.key, key, callerpc, pc) } if msanenabled && h != nil { msanread(key, t.key.size) } if h == nil || h.count == 0 { return } if h.flags&hashWriting != 0 { throw("concurrent map writes") } alg := t.key.alg hash := alg.hash(key, uintptr(h.hash0)) // Set hashWriting after calling alg.hash, since alg.hash may panic, // in which case we have not actually done a write (delete). h.flags |= hashWriting bucket := hash & bucketMask(h.B) if h.growing() { growWork(t, h, bucket) } b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) top := tophash(hash) search: for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) k2 := k if t.indirectkey { k2 = *((*unsafe.Pointer)(k2)) } if !alg.equal(key, k2) { continue } // Only clear key if there are pointers in it. // 對key清零 if t.indirectkey { *(*unsafe.Pointer)(k) = nil } else if t.key.kind&kindNoPointers == 0 { memclrHasPointers(k, t.key.size) } v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize)) // 對value清零 if t.indirectvalue { *(*unsafe.Pointer)(v) = nil } else if t.elem.kind&kindNoPointers == 0 { memclrHasPointers(v, t.elem.size) } else { memclrNoHeapPointers(v, t.elem.size) } // 高位hash清零 b.tophash[i] = empty // 個數減一 h.count-- break search } } if h.flags&hashWriting == 0 { throw("concurrent map writes") } h.flags &^= hashWriting }
七、併發操做
map 並非一個線程安全的數據結構。同時讀寫一個 map 是不安全的,若是被檢測到,會直接 panic。
解決方法1:讀寫鎖 sync.RWMutex。
type TestMap struct { M map[int]string Lock sync.RWMutex } func main() { testMap := TestMap{} testMap.M = map[int]string{1: "lili"} go func() { i := 0 for i < 10000 { testMap.Lock.RLock() fmt.Println(i, testMap.M[1]) testMap.Lock.RUnlock() i++ } }() go func() { i := 0 for i < 10000 { testMap.Lock.Lock() testMap.M[1] = "lily" testMap.Lock.Unlock() i++ } }() for { runtime.GC() } }
解決方法2:使用golang提供的 sync.Map
func main() { m := sync.Map{} m.Store(1, 1) i := 0 go func() { for i < 1000 { m.Store(1, 1) i++ } }() go func() { for i < 1000 { m.Store(2, 2) i++ } }() go func() { for i < 1000 { fmt.Println(m.Load(1)) i++ } }() for { runtime.GC() } }
參考文獻:
【2】《解剖Go語言map底層實現》