如何設計並實現一個線程安全的 Map ?(上篇)

Map 是一種很常見的數據結構,用於存儲一些無序的鍵值對。在主流的編程語言中,默認就自帶它的實現。C、C++ 中的 STL 就實現了 Map,JavaScript 中也有 Map,Java 中有 HashMap,Swift 和 Python 中有 Dictionary,Go 中有 Map,Objective-C 中有 NSDictionary、NSMutableDictionary。html

上面這些 Map 都是線程安全的麼?答案是否認的,並不是全是線程安全的。那如何能實現一個線程安全的 Map 呢?想回答這個問題,須要先從如何實現一個 Map 提及。java

一. 選用什麼數據結構實現 Map ?

Map 是一個很是經常使用的數據結構,一個無序的 key/value 對的集合,其中 Map 全部的 key 都是不一樣的,而後經過給定的 key 能夠在常數時間 O(1) 複雜度內查找、更新或刪除對應的 value。mysql

要想實現常數級的查找,應該用什麼來實現呢?讀者應該很快會想到哈希表。確實,Map 底層通常都是使用數組來實現,會借用哈希算法輔助。對於給定的 key,通常先進行 hash 操做,而後相對哈希表的長度取模,將 key 映射到指定的地方。git

哈希算法有不少種,選哪種更加高效呢?es6

1. 哈希函數

MD5 和 SHA1 能夠說是目前應用最普遍的 Hash 算法,而它們都是以 MD4 爲基礎設計的。github

MD4(RFC 1320) 是 MIT 的Ronald L. Rivest 在 1990 年設計的,MD 是 Message Digest(消息摘要) 的縮寫。它適用在32位字長的處理器上用高速軟件實現——它是基於 32位操做數的位操做來實現的。
MD5(RFC 1321) 是 Rivest 於1991年對 MD4 的改進版本。它對輸入仍以512位分組,其輸出是4個32位字的級聯,與 MD4 相同。MD5 比 MD4 來得複雜,而且速度較之要慢一點,但更安全,在抗分析和抗差分方面表現更好。算法

SHA1 是由 NIST NSA 設計爲同 DSA 一塊兒使用的,它對長度小於264的輸入,產生長度爲160bit 的散列值,所以抗窮舉 (brute-force)
性更好。SHA-1 設計時基於和 MD4 相同原理,而且模仿了該算法。sql

經常使用的 hash 函數有 SHA-1,SHA-256,SHA-512,MD5 。這些都是經典的 hash 算法。在現代化生產中,還會用到現代的 hash 算法。下面列舉幾個,進行性能對比,最後再選其中一個源碼分析一下實現過程。數據庫

(1) Jenkins Hash 和 SpookyHash

1997年 Bob Jenkins 在《 Dr. Dobbs Journal》雜誌上發表了一片關於散列函數的文章《A hash function for hash Table lookup》。這篇文章中,Bob 普遍收錄了不少已有的散列函數,這其中也包括了他本身所謂的「lookup2」。隨後在2006年,Bob 發佈了 lookup3。lookup3 即爲 Jenkins Hash。更多有關 Bob’s 散列函數的內容請參閱維基百科:Jenkins hash function。memcached的 hash 算法,支持兩種算法:jenkins, murmur3,默認是 jenkins。apache

2011年 Bob Jenkins 發佈了他本身的一個新散列函數
SpookyHash(這樣命名是由於它是在萬聖節發佈的)。它們都擁有2倍於 MurmurHash 的速度,但他們都只使用了64位數學函數而沒有32位版本,SpookyHash 給出128位輸出。

(2) MurmurHash

MurmurHash 是一種非加密哈希函數,適用於通常的哈希檢索操做。
Austin Appleby 在2008年發佈了一個新的散列函數——MurmurHash。其最新版本大約是 lookup3 速度的2倍(大約爲1 byte/cycle),它有32位和64位兩個版本。32位版本只使用32位數學函數並給出一個32位的哈希值,而64位版本使用了64位的數學函數,並給出64位哈希值。根據Austin的分析,MurmurHash具備優異的性能,雖然 Bob Jenkins 在《Dr. Dobbs article》雜誌上聲稱「我預測 MurmurHash 比起lookup3要弱,可是我不知道具體值,由於我還沒測試過它」。MurmurHash可以迅速走紅得益於其出色的速度和統計特性。當前的版本是MurmurHash3,Redis、Memcached、Cassandra、HBase、Lucene都在使用它。

下面是用 C 實現 MurmurHash 的版本:

uint32_t murmur3_32(const char *key, uint32_t len, uint32_t seed) {
        static const uint32_t c1 = 0xcc9e2d51;
        static const uint32_t c2 = 0x1b873593;
        static const uint32_t r1 = 15;
        static const uint32_t r2 = 13;
        static const uint32_t m = 5;
        static const uint32_t n = 0xe6546b64;

        uint32_t hash = seed;

        const int nblocks = len / 4;
        const uint32_t *blocks = (const uint32_t *) key;
        int i;
        for (i = 0; i < nblocks; i++) {
                uint32_t k = blocks[i];
                k *= c1;
                k = (k << r1) | (k >> (32 - r1));
                k *= c2;

                hash ^= k;
                hash = ((hash << r2) | (hash >> (32 - r2))) * m + n;
        }

        const uint8_t *tail = (const uint8_t *) (key + nblocks * 4);
        uint32_t k1 = 0;

        switch (len & 3) {
        case 3:
                k1 ^= tail[2] << 16;
        case 2:
                k1 ^= tail[1] << 8;
        case 1:
                k1 ^= tail[0];

                k1 *= c1;
                k1 = (k1 << r1) | (k1 >> (32 - r1));
                k1 *= c2;
                hash ^= k1;
        }

        hash ^= len;
        hash ^= (hash >> 16);
        hash *= 0x85ebca6b;
        hash ^= (hash >> 13);
        hash *= 0xc2b2ae35;
        hash ^= (hash >> 16);

        return hash;
}複製代碼

(3) CityHash 和 FramHash

這兩種算法都是 Google 發佈的字符串算法。

CityHash 是2011年 Google 發佈的字符串散列算法,和 murmurhash 同樣,屬於非加密型 hash 算法。CityHash 算法的開發是受到了 MurmurHash 的啓發。其主要優勢是大部分步驟包含了至少兩步獨立的數學運算。現代 CPU 一般能從這種代碼得到最佳性能。CityHash 也有其缺點:代碼較同類流行算法複雜。Google 但願爲速度而不是爲了簡單而優化,所以沒有照顧較短輸入的特例。Google發佈的有兩種算法:cityhash64 與 cityhash128。它們分別根據字串計算 64 和 128 位的散列值。這些算法不適用於加密,但適合用在散列表等處。CityHash 的速度取決於CRC32 指令,目前爲SSE 4.2(Intel Nehalem及之後版本)。

相比 Murmurhash 支持3二、6四、128bit, Cityhash 支持6四、12八、256bit 。

2014年 Google 又發佈了 FarmHash,一個新的用於字符串的哈希函數系列。FarmHash 從 CityHash 繼承了許多技巧和技術,是它的後繼。FarmHash 有多個目標,聲稱從多個方面改進了 CityHash。與 CityHash 相比,FarmHash 的另外一項改進是在多個特定於平臺的實現之上提供了一個接口。這樣,當開發人員只是想要一個用於哈希表的、快速健壯的哈希函數,而不須要在每一個平臺上都同樣時,FarmHash 也能知足要求。目前,FarmHash 只包含在3二、64和128位平臺上用於字節數組的哈希函數。將來開發計劃包含了對整數、元組和其它數據的支持。

(4) xxHash

xxHash 是由 Yann Collet 建立的非加密哈希函數。它最初用於 LZ4 壓縮算法,做爲最終的錯誤檢查簽名的。該 hash 算法的速度接近於 RAM 的極限。並給出了32位和64位的兩個版本。如今它被普遍使用在PrestoDBRocksDBMySQLArangoDBPGroongaSpark 這些數據庫中,還用在了 Cocos2DDolphinCxbx-reloaded 這些遊戲框架中,

下面這有一個性能對比的實驗。測試環境是 Open-Source SMHasher program by Austin Appleby ,它是在 Windows 7 上經過 Visual C 編譯出來的,而且它只有惟一一個線程。CPU 內核是 Core 2 Duo @3.0GHz。

上表裏面的 hash 函數並非全部的 hash 函數,只列舉了一些常見的算法。第二欄是速度的對比,能夠看出來速度最快的是 xxHash 。第三欄是哈希的質量,哈希質量最高的有5個,全是5星,xxHash、MurmurHash 3a、CityHash6四、MD5-3二、SHA1-32 。從表裏的數據看,哈希質量最高,速度最快的仍是 xxHash。

(4) memhash

這個哈希算法筆者沒有在網上找到很明確的做者信息。只在 Google 的 Go 的文檔上有這麼幾行註釋,說明了它的靈感來源:

// Hashing algorithm inspired by
// xxhash: https://code.google.com/p/xxhash/
// cityhash: https://code.google.com/p/cityhash/複製代碼

它說 memhash 的靈感來源於 xxhash 和 cityhash。那麼接下來就來看看 memhash 是怎麼對字符串進行哈希的。

const (
    // Constants for multiplication: four random odd 32-bit numbers.
    m1 = 3168982561
    m2 = 3339683297
    m3 = 832293441
    m4 = 2336365089
)

func memhash(p unsafe.Pointer, seed, s uintptr) uintptr {
    if GOARCH == "386" && GOOS != "nacl" && useAeshash {
        return aeshash(p, seed, s)
    }
    h := uint32(seed + s*hashkey[0])
tail:
    switch {
    case s == 0:
    case s < 4:
        h ^= uint32(*(*byte)(p))
        h ^= uint32(*(*byte)(add(p, s>>1))) << 8
        h ^= uint32(*(*byte)(add(p, s-1))) << 16
        h = rotl_15(h*m1) * m2
    case s == 4:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
    case s <= 8:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2
    case s <= 16:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, 4))
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-8))
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2
    default:
        v1 := h
        v2 := uint32(seed * hashkey[1])
        v3 := uint32(seed * hashkey[2])
        v4 := uint32(seed * hashkey[3])
        for s >= 16 {
            v1 ^= readUnaligned32(p)
            v1 = rotl_15(v1*m1) * m2
            p = add(p, 4)
            v2 ^= readUnaligned32(p)
            v2 = rotl_15(v2*m2) * m3
            p = add(p, 4)
            v3 ^= readUnaligned32(p)
            v3 = rotl_15(v3*m3) * m4
            p = add(p, 4)
            v4 ^= readUnaligned32(p)
            v4 = rotl_15(v4*m4) * m1
            p = add(p, 4)
            s -= 16
        }
        h = v1 ^ v2 ^ v3 ^ v4
        goto tail
    }
    h ^= h >> 17
    h *= m3
    h ^= h >> 13
    h *= m4
    h ^= h >> 16
    return uintptr(h)
}

// Note: in order to get the compiler to issue rotl instructions, we
// need to constant fold the shift amount by hand.
// TODO: convince the compiler to issue rotl instructions after inlining.
func rotl_15(x uint32) uint32 {
    return (x << 15) | (x >> (32 - 15))
}複製代碼

m一、m二、m三、m4 是4個隨機選的奇數,做爲哈希的乘法因子。

// used in hash{32,64}.go to seed the hash function
var hashkey [4]uintptr

func alginit() {
    // Install aes hash algorithm if we have the instructions we need
    if (GOARCH == "386" || GOARCH == "amd64") &&
        GOOS != "nacl" &&
        cpuid_ecx&(1<<25) != 0 && // aes (aesenc)
        cpuid_ecx&(1<<9) != 0 && // sse3 (pshufb)
        cpuid_ecx&(1<<19) != 0 { // sse4.1 (pinsr{d,q})
        useAeshash = true
        algarray[alg_MEM32].hash = aeshash32
        algarray[alg_MEM64].hash = aeshash64
        algarray[alg_STRING].hash = aeshashstr
        // Initialize with random data so hash collisions will be hard to engineer.
        getRandomData(aeskeysched[:])
        return
    }
    getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
    hashkey[0] |= 1 // make sure these numbers are odd
    hashkey[1] |= 1
    hashkey[2] |= 1
    hashkey[3] |= 1
}複製代碼

在這個初始化的函數中,初始化了2個數組,數組裏面裝的都是隨機的 hashkey。在 38六、 amd6四、非 nacl 的平臺上,會用 aeshash 。這裏會把隨機的 key 生成好,存入到 aeskeysched 數組中。同理,hashkey 數組裏面也會隨機好4個數字。最後都按位與了一個1,就是爲了保證生成出來的隨機數都是奇數。

接下來舉個例子,來看看 memhash 到底是如何計算哈希值的。

func main() {
    r := [8]byte{'h', 'a', 'l', 'f', 'r', 'o', 's', 't'}
    pp := memhashpp(unsafe.Pointer(&r), 3, 7)
    fmt.Println(pp)
}複製代碼

爲了簡單起見,這裏用筆者的名字爲例算出哈希值,種子簡單一點設置成3。

第一步計算 h 的值。

h := uint32(seed + s*hashkey[0])複製代碼

這裏假設 hashkey[0] = 1,那麼 h 的值爲 3 + 7 * 1 = 10 。因爲 s < 8,那麼就會進行如下的處理:

case s <= 8:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2複製代碼

readUnaligned32()函數會把傳入的 unsafe.Pointer 指針進行2次轉換,先轉成 *uint32 類型,而後再轉成 *(*uint32) 類型。

接着進行異或操做:

接着第二步 h m1 = 1718378850 3168982561 = 3185867170

因爲是32位的乘法,最終結果是64位的,高32位溢出,直接捨棄。

乘出來的結果當作 rotl_15() 入參。

func rotl_15(x uint32) uint32 {
    return (x << 15) | (x >> (32 - 15))
}複製代碼

這個函數裏面對入參進行了兩次位移操做。

最後將兩次位移的結果進行邏輯或運算:

接着再進行一次 readUnaligned32() 轉換:

轉換完再進行一次異或。此時 h = 2615762644。

而後還要再進行一次 rotl_15() 變換。這裏就不畫圖演示了。變換完成之後 h = 2932930721。

最後執行 hash 的最後一步:

h ^= h >> 17
    h *= m3
    h ^= h >> 13
    h *= m4
    h ^= h >> 16複製代碼

先右移17位,而後異或,再乘以m3,再右移13位,再異或,再乘以m4,再右移16位,最後再異或。

經過這樣一系列的操做,最後就能生成出 hash 值了。最後 h = 1870717864。感興趣的同窗能夠算一算。

(5)AES Hash

在上面分析 Go 的 hash 算法的時候,咱們能夠看到它對 CPU 是否支持 AES 指令集進行了判斷,當 CPU 支持 AES 指令集的時候,它會選用 AES Hash 算法,當 CPU 不支持 AES 指令集的時候,換成 memhash 算法。

AES 指令集全稱是高級加密標準指令集(或稱英特爾高級加密標準新指令,簡稱AES-NI)是一個 x86指令集架構 的擴展,用於 IntelAMD微處理器

利用 AES 實現 Hash 算法性能會很優秀,由於它能提供硬件加速。

具體代碼實現以下,彙編程序,註釋見下面程序中:


// aes hash 算法經過 AES 硬件指令集實現
TEXT runtime·aeshash(SB),NOSPLIT,$0-32
    MOVQ    p+0(FP), AX    // 把ptr移動到data數據段中
    MOVQ    s+16(FP), CX    // 長度
    LEAQ    ret+24(FP), DX
    JMP    runtime·aeshashbody(SB)

TEXT runtime·aeshashstr(SB),NOSPLIT,$0-24
    MOVQ    p+0(FP), AX    // 把ptr移動到字符串的結構體中
    MOVQ    8(AX), CX    // 字符串長度
    MOVQ    (AX), AX    // 字符串的數據
    LEAQ    ret+16(FP), DX
    JMP    runtime·aeshashbody(SB)複製代碼

最終的 hash 的實現都在 aeshashbody 中:


// AX: 數據
// CX: 長度
// DX: 返回的地址
TEXT runtime·aeshashbody(SB),NOSPLIT,$0-0
    // SSE 寄存器中裝填入咱們的隨機數種子
    MOVQ    h+8(FP), X0            // 每一個table中hash種子有64 位
    PINSRW    $4, CX, X0            // 長度佔16位
    PSHUFHW $0, X0, X0            // 壓縮高位字亂序,重複長度4次
    MOVO    X0, X1                // 保存加密前的種子
    PXOR    runtime·aeskeysched(SB), X0    // 對每個處理中的種子進行邏輯異或
    AESENC    X0, X0                // 加密種子

    CMPQ    CX, $16
    JB    aes0to15
    JE    aes16
    CMPQ    CX, $32
    JBE    aes17to32
    CMPQ    CX, $64
    JBE    aes33to64
    CMPQ    CX, $128
    JBE    aes65to128
    JMP    aes129plus

// aes 從 0 - 15
aes0to15:
    TESTQ    CX, CX
    JE    aes0

    ADDQ    $16, AX
    TESTW    $0xff0, AX
    JE    endofpage

    //當前加載的16位字節的地址不會越過一個頁面邊界,因此咱們能夠直接加載它。
    MOVOU    -16(AX), X1
    ADDQ    CX, CX
    MOVQ    $masks<>(SB), AX
    PAND    (AX)(CX*8), X1
final1:
    PXOR    X0, X1    // 異或數據和種子
    AESENC    X1, X1    // 連續加密3次
    AESENC    X1, X1
    AESENC    X1, X1
    MOVQ    X1, (DX)
    RET

endofpage:
    // 地址結尾是1111xxxx。 這樣就可能超過一個頁面邊界,因此在加載完最後一個字節後中止加載。而後使用pshufb將字節向下移動。
    MOVOU    -32(AX)(CX*1), X1
    ADDQ    CX, CX
    MOVQ    $shifts<>(SB), AX
    PSHUFB    (AX)(CX*8), X1
    JMP    final1

aes0:
    // 返回輸入的而且已經加密過的種子
    AESENC    X0, X0
    MOVQ    X0, (DX)
    RET

aes16:
    MOVOU    (AX), X1
    JMP    final1

aes17to32:
    // 開始處理第二個起始種子
    PXOR    runtime·aeskeysched+16(SB), X1
    AESENC    X1, X1

    // 加載要被哈希算法處理的數據
    MOVOU    (AX), X2
    MOVOU    -16(AX)(CX*1), X3

    // 異或種子
    PXOR    X0, X2
    PXOR    X1, X3

    // 連續加密3次
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X2, X2
    AESENC    X3, X3

    // 拼接並生成結果
    PXOR    X3, X2
    MOVQ    X2, (DX)
    RET

aes33to64:
    // 處理第三個以上的起始種子
    MOVO    X1, X2
    MOVO    X1, X3
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    AESENC    X1, X1
    AESENC    X2, X2
    AESENC    X3, X3

    MOVOU    (AX), X4
    MOVOU    16(AX), X5
    MOVOU    -32(AX)(CX*1), X6
    MOVOU    -16(AX)(CX*1), X7

    PXOR    X0, X4
    PXOR    X1, X5
    PXOR    X2, X6
    PXOR    X3, X7

    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    PXOR    X6, X4
    PXOR    X7, X5
    PXOR    X5, X4
    MOVQ    X4, (DX)
    RET

aes65to128:
    // 處理第七個以上的起始種子
    MOVO    X1, X2
    MOVO    X1, X3
    MOVO    X1, X4
    MOVO    X1, X5
    MOVO    X1, X6
    MOVO    X1, X7
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    PXOR    runtime·aeskeysched+64(SB), X4
    PXOR    runtime·aeskeysched+80(SB), X5
    PXOR    runtime·aeskeysched+96(SB), X6
    PXOR    runtime·aeskeysched+112(SB), X7
    AESENC    X1, X1
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    // 加載數據
    MOVOU    (AX), X8
    MOVOU    16(AX), X9
    MOVOU    32(AX), X10
    MOVOU    48(AX), X11
    MOVOU    -64(AX)(CX*1), X12
    MOVOU    -48(AX)(CX*1), X13
    MOVOU    -32(AX)(CX*1), X14
    MOVOU    -16(AX)(CX*1), X15

    // 異或種子
    PXOR    X0, X8
    PXOR    X1, X9
    PXOR    X2, X10
    PXOR    X3, X11
    PXOR    X4, X12
    PXOR    X5, X13
    PXOR    X6, X14
    PXOR    X7, X15

    // 連續加密3次
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    // 拼裝結果
    PXOR    X12, X8
    PXOR    X13, X9
    PXOR    X14, X10
    PXOR    X15, X11
    PXOR    X10, X8
    PXOR    X11, X9
    PXOR    X9, X8
    MOVQ    X8, (DX)
    RET

aes129plus:
    // 處理第七個以上的起始種子
    MOVO    X1, X2
    MOVO    X1, X3
    MOVO    X1, X4
    MOVO    X1, X5
    MOVO    X1, X6
    MOVO    X1, X7
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    PXOR    runtime·aeskeysched+64(SB), X4
    PXOR    runtime·aeskeysched+80(SB), X5
    PXOR    runtime·aeskeysched+96(SB), X6
    PXOR    runtime·aeskeysched+112(SB), X7
    AESENC    X1, X1
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    // 逆序開始,從最後的block開始處理,由於可能會出現重疊的狀況
    MOVOU    -128(AX)(CX*1), X8
    MOVOU    -112(AX)(CX*1), X9
    MOVOU    -96(AX)(CX*1), X10
    MOVOU    -80(AX)(CX*1), X11
    MOVOU    -64(AX)(CX*1), X12
    MOVOU    -48(AX)(CX*1), X13
    MOVOU    -32(AX)(CX*1), X14
    MOVOU    -16(AX)(CX*1), X15

    // 異或種子
    PXOR    X0, X8
    PXOR    X1, X9
    PXOR    X2, X10
    PXOR    X3, X11
    PXOR    X4, X12
    PXOR    X5, X13
    PXOR    X6, X14
    PXOR    X7, X15

    // 計算剩餘128字節塊的數量
    DECQ    CX
    SHRQ    $7, CX

aesloop:
    // 加密狀態
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    // 在同一個block塊中加密狀態,進行異或運算
    MOVOU    (AX), X0
    MOVOU    16(AX), X1
    MOVOU    32(AX), X2
    MOVOU    48(AX), X3
    AESENC    X0, X8
    AESENC    X1, X9
    AESENC    X2, X10
    AESENC    X3, X11
    MOVOU    64(AX), X4
    MOVOU    80(AX), X5
    MOVOU    96(AX), X6
    MOVOU    112(AX), X7
    AESENC    X4, X12
    AESENC    X5, X13
    AESENC    X6, X14
    AESENC    X7, X15

    ADDQ    $128, AX
    DECQ    CX
    JNE    aesloop

    // 最後一步,進行3次以上的加密
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    PXOR    X12, X8
    PXOR    X13, X9
    PXOR    X14, X10
    PXOR    X15, X11
    PXOR    X10, X8
    PXOR    X11, X9
    PXOR    X9, X8
    MOVQ    X8, (DX)
    RET複製代碼

2. 哈希衝突處理

(1)鏈表數組法

鏈表數組法比較簡單,每一個鍵值對錶長取模,若是結果相同,用鏈表的方式依次日後插入。

假設待插入的鍵值集合是{ 2,3,5,7,11,13,19},表長 MOD 8。假設哈希函數在[0,9)上均勻分佈。如上圖。

接下來重點進行性能分析:

查找鍵值 k,假設鍵值 k 不在哈希表中,h(k) 在 [0,M) 中均勻分佈,即 P(h(k) = i) = 1/M 。令 Xi 爲哈希表 ht[ i ] 中包含鍵值的個數。若是 h(k) = i ,則不成功查找 k 的鍵值比較次數是 Xi,因而:

成功查找的分析稍微複雜一點。要考慮添加哈希表的次序,不考慮有相同鍵值的狀況,假設 K = {k1,k2,……kn},而且假設從空哈希表開始按照這個次序添加到哈希表中。引入隨機變量,若是 h(ki) = h(kj),那麼 Xij = 1;若是 h(ki) != h(kj),那麼 Xij = 0 。

因爲以前的假設哈希表是均勻分佈的,因此 P(Xij = i) = E(Xij) = 1/M ,這裏的 E(X) 表示隨機變量 X 的數學指望。再假設每次添加鍵值的時候都是把添加在鏈表末端。令 Ci 爲查找 Ki 所需的鍵值比較次數,因爲不能事先肯定查找 Ki 的機率,因此假定查找不一樣鍵值的機率都是相同的,都是 1/n ,則有:

由此咱們能夠看出,哈希表的性能和表中元素的多少關係不大,而和填充因子 α 有關。若是哈希表長和哈希表中元素個數成正比,則哈希表查找的複雜度爲 O(1) 。

綜上所述,鏈表數組的成功與不成功的平均鍵值比較次數以下:

(2)開放地址法 —— 線性探測

線性探測的規則是 hi = ( h(k) + i ) MOD M。舉個例子,i = 1,M = 9。

這種處理衝突的方法,一旦發生衝突,就把位置日後加1,直到找到一個空的位置。

舉例以下,假設待插入的鍵值集合是{2,3,5,7,11,13,19},線性探測的發生衝突之後添加的值爲1,那麼最終結果以下:

線性探測哈希表的性能分析比較複雜,這裏就僅給出結果。

(3)開放地址法 —— 平方探測

線性探測的規則是 h0 = h(k) ,hi = ( h0 + i * i ) MOD M。

舉例以下,假設待插入的鍵值集合是{2,3,5,7,11,13,20},平方探測的發生衝突之後添加的值爲查找次數的平方,那麼最終結果以下:

平方探測在線性探測的基礎上,加了一個二次曲線。當發生衝突之後,再也不是加一個線性的參數,而是加上探測次數的平方。

平方探測有一個須要注意的是,M的大小有講究。若是M不是奇素數,那麼就可能出現下面這樣的問題,即便哈希表裏面還有空的位置,可是卻有元素找不到要插入的位置。

舉例,假設 M = 10,待插入的鍵值集合是{0,1,4,5,6,9,10},當前面6個鍵值插入哈希表中之後,10就再也沒法插入了。

因此在平方探測中,存在下面這則規律:

若是 M 爲奇素數,則下面的 ⌈M / 2⌉ 位置 h0,h1,h2 …… h⌊M/2⌋ 互不相同。其中,hi = (h0 + i * i ) MOD M。

這面這則規律能夠用反證法證實。假設 hi = hj,i > j;0<=i,j<= ⌊M/2⌋,則 h0 + i i = ( h0 + j j ) MOD M,從而 M 能夠整除 ( i + j )( i - j )。因爲 M 爲素數,而且 0 < i + j,i - j < M,當且僅當 i = j 的時候才能知足。

上述規則也就說明了一點,只要 M 爲奇素數,平方探測至少能夠遍歷哈希表通常的位置。因此只要哈希表的填充因子 α <= 1 / 2 ,平方探測總能找到可插入的位置。

上述舉的例子,之因此鍵值10沒法插入,緣由也由於 α > 1 / 2了,因此不能保證有可插入的位置了。

(4)開放地址法 —— 雙哈希探測

雙哈希探測是爲了解決彙集的現象。不管是線性探測仍是平方探測,若是 h(k1) 和 h(k2) 相鄰,則它們的探測序列也都是相鄰的,這就是所謂的彙集現象。爲了不這種現象,因此引入了雙哈希函數 h2,使得兩次探測之間的距離爲 h2(k)。因此探測序列爲 h0 = h1(k),hi = ( h0 + i * h2(k) ) MOD M 。實驗代表,雙哈希探測的性能相似於隨機探測。

關於雙哈希探測和平方探測的平均查找長度比線性探測更加困難。因此引入隨機探測的概念來近似這兩種探測。隨機探測是指探測序列 { hi } 在區間 [0,M]中等機率獨立隨機選取,這樣 P(hi = j) = 1/M 。

假設探測序列爲 h0,h1,……,hi。在哈希表的 hi 位置爲空,在 h0,h1,……,hi-1 的位置上哈希表不是空,這次查找的鍵值比較次數爲 i。令隨機變量 X 爲一次不成功查找所需的鍵值比較次數。因爲哈希表的填充因子爲 α,因此在一個位置上哈希表爲空值的機率爲 1 - α ,爲非空值的機率爲 α,因此 P( X = i ) = α^i * ( 1 - α ) 。

在機率論中,上述的分佈叫幾何分佈。

假定哈希表元素的添加順序爲 {k1,k2,…… ,kn},令 Xi 爲當哈希表只包含 {k1,k2,…… ,ki} 時候一次不成功查找的鍵值比較次數,注意,這個時候哈希表的填充因子爲 i/M ,則查找 k(i+1) 的鍵值次數爲 Yi = 1 + Xi。假定查找任意一個鍵值的機率爲 1/n,則一次成功查找的平均鍵值比較次數爲:

綜上所述,平方探測和雙哈希探測的成功與不成功的平均鍵值比較次數以下:

總的來講,在數據量很是大的狀況下,簡單的 hash 函數不可避免不產生碰撞,即便採用了合適的處理碰撞的方法,依舊有必定時間複雜度。因此想盡量的避免碰撞,仍是要選擇高性能的 hash 函數,或者增長 hash 的位數,好比64位,128位,256位,這樣碰撞的概率會小不少。

3. 哈希表的擴容策略

隨着哈希表裝載因子的變大,發生碰撞的次數變得越來也多,哈希表的性能變得愈來愈差。對於單獨鏈表法實現的哈希表,尚能夠容忍,可是對於開放尋址法,這種性能的降低是不能接受的,所以對於開放尋址法須要尋找一種方法解決這個問題。

在實際應用中,解決這個問題的辦法是動態的增大哈希表的長度,當裝載因子超過某個閾值時增長哈希表的長度,自動擴容。每當哈希表的長度發生變化以後,全部 key 在哈希表中對應的下標索引須要所有從新計算,不能直接從原來的哈希表中拷貝到新的哈希表中。必須一個一個計算原來哈希表中的 key 的哈希值並插入到新的哈希表中。這種方式確定是達不到生產環境的要求的,由於時間複雜度過高了,O(n),數據量一旦大了,性能就會不好。Redis 想了一種方法,就算是觸發增加時也只須要常數時間 O(1) 便可完成插入操做。解決辦法是分屢次、漸進式地完成的舊哈希表到新哈希表的拷貝而不是一次拷貝完成。

接下來以 Redis 爲例,來談談它是哈希表是如何進行擴容而且不太影響性能的。

Redis 對字典的定義以下:

/* * 字典 * * 每一個字典使用兩個哈希表,用於實現漸進式 rehash */
typedef struct dict {
    // 特定於類型的處理函數
    dictType *type;
    // 類型處理函數的私有數據
    void *privdata;
    // 哈希表(2 個)
    dictht ht[2];
    // 記錄 rehash 進度的標誌,值爲 -1 表示 rehash 未進行
    int rehashidx;
    // 當前正在運做的安全迭代器數量
    int iterators;
} dict;複製代碼

從定義上咱們能夠看到,Redis 字典保存了2個哈希表,哈希表ht[1]就是用來 rehash 的。

在 Redis 中定義了以下的哈希表的數據結構:

/* * 哈希表 */
typedef struct dictht {
    // 哈希表節點指針數組(俗稱桶,bucket)
    dictEntry **table;
    // 指針數組的大小
    unsigned long size;
    // 指針數組的長度掩碼,用於計算索引值
    unsigned long sizemask;
    // 哈希表現有的節點數量
    unsigned long used;

} dictht;複製代碼

table 屬性是個數組, 數組的每一個元素都是個指向 dictEntry 結構的指針。

每一個 dictEntry 都保存着一個鍵值對, 以及一個指向另外一個 dictEntry 結構的指針:

/* * 哈希表節點 */
typedef struct dictEntry {
    // 鍵
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
    } v;
    // 鏈日後繼節點
    struct dictEntry *next;

} dictEntry;複製代碼

next 屬性指向另外一個 dictEntry 結構, 多個 dictEntry 能夠經過 next 指針串連成鏈表, 從這裏能夠看出, dictht 使用鏈地址法來處理鍵碰撞問題的。

dictAdd 在每次向字典添加新鍵值對以前, 都會對哈希表 ht[0] 進行檢查, 對於 ht[0] 的 size 和 used 屬性, 若是它們之間的比率 ratio = used / size 知足如下任何一個條件的話,rehash 過程就會被激活:

天然 rehash : ratio >= 1 ,且變量 dict_can_resize 爲真。
強制 rehash : ratio 大於變量 dict_force_resize_ratio (目前版本中, dict_force_resize_ratio 的值爲 5 )。

假設當前的字典須要擴容 rehash,那麼 Redis 會先設置字典的 rehashidx 爲 0 ,標識着 rehash 的開始;再爲 ht[1]->table 分配空間,大小至少爲 ht[0]->used 的兩倍。

如上圖, ht[1]->table 已經分配空間了8個空間了。

接着,開始 rehash 。將 ht[0]->table 內的鍵值移動到 ht[1]->table 中,鍵值的移動不是一次完成的,分屢次進行。

上圖能夠看出來, ht[0] 中的一部分鍵值已經遷移到 ht[1] 中了,而且此時還有新的鍵值插入進來,是直接插入到 ht[1] 中的,不會再插入到 ht[0] 中了。保證了 ht[0] 只減不增。

在 rehash 進行的過程當中,不斷的有新的鍵值插入進來,也不斷的把 ht[0] 中的鍵值都遷移過來,直到 ht[0] 中的鍵值都遷移過來爲止。注意 Redis 用的是頭插法,新值永遠都插在鏈表的第一個位置,這樣也不用遍歷到鏈表的最後,省去了 O(n) 的時間複雜度。進行到上圖這種狀況,全部的節點也就遷移完畢了。

rehash 在結束以前會進行清理工做,釋放 ht[0] 的空間;用 ht[1] 來代替 ht[0] ,使原來的 ht[1] 成爲新的 ht[0] ;建立一個新的空哈希表,並將它設置爲 ht[1] ;將字典的 rehashidx 屬性設置爲 -1 ,標識 rehash 已中止;

最終 rehash 結束之後狀況如上圖。若是還下次還須要 rehash ,重複上述過程便可。這種分屢次,漸進式 rehash 的方式也成就了 Redis 的高性能。

值得一提的是,Redis 是支持字典的 reshrink 操做的。操做步驟就是
rehash 的逆過程。

二. 紅黑樹優化

讀到這裏,讀者應該已經明白了到底用什麼方式來控制 map 使得
Hash 碰撞的機率又小,哈希桶數組佔用空間又少了吧,答案就是選擇好的 Hash 算法和增長擴容機制。

Java 在 JDK1.8 對 HashMap 底層的實現再次進行了優化。

上圖是來自美團博客總結的。從這裏咱們能夠發現:

Java 底層初始桶的個數是16個,負載因子默認是0.75,也就是說當鍵值第一次達到12個的時候就會進行擴容 resize。擴容的臨界值在64,當超過了64之後,而且衝突節點爲8或者大於8,這個時候就會觸發紅黑樹轉換。爲了防止底層鏈表過長,鏈表就轉換爲紅黑樹。

換句話說,當桶的總個數沒有到64個的時候,即便鏈表長爲8,也不會進行紅黑樹轉換。

若是節點小於6個,紅黑樹又會從新退化成鏈表。

固然這裏之因此選擇用紅黑樹來進行優化,保證最壞狀況不會退化成
O(n),紅黑樹能保證最壞時間複雜度也爲 O(log n)。

在美團博客中也提到了,Java 在 JDK1.8 中還有一個值得學習的優化。Java 在 rehash 的鍵值節點遷移過程當中,不須要再次計算一次 hash 計算!

因爲使用了2次冪的擴展(指長度擴爲原來2倍),因此,元素的位置要麼是在原位置,要麼是在原位置再移動2次冪的位置。看下圖能夠明白這句話的意思,n 爲 table 的長度,圖(a)表示擴容前的 key1 和
key2 兩種 key 肯定索引位置的示例,圖(b)表示擴容後 key1 和
key2 兩種 key 肯定索引位置的示例,其中 hash1 是 key1 對應的哈希與高位運算結果。

元素在從新計算 hash 以後,由於 n 變爲2倍,那麼 n-1 的 mask 範圍在高位多1bit(紅色),所以新的 index 就會發生這樣的變化:

因此在擴容之後,就只須要看擴容容量之後那個位上的值爲0,仍是爲1,若是是0,表明索引不變,若是是1,表明的是新的索引值等於原來的索引值加上 oldCap 便可,這樣就不須要再次計算一次 hash 了。

上圖是把16擴容到32的狀況。

三. Go 中 Map 的具體實現舉例

讀到這裏,讀者對如何設計一個 Map 應該有一些本身的想法了。選擇一個優秀的哈希算法,用鏈表 + 數組 做爲底層數據結構,如何擴容和優化,這些應該都有了解了。讀到這裏也許讀者認爲本篇文章內容已通過半了,不過前面這些都是偏理論,接下來也許纔到了本文的重點部分 —— 從零開始分析一下完整的 Map 實現。

接下來筆者對 Go 中的 Map 的底層實現進行分析,也算是對一個 Map 的具體實現和重要的幾個操做,添加鍵值,刪除鍵值,擴容策略進行舉例。

Go 的 map 實如今 /src/runtime/hashmap.go 這個文件中。

map 底層實質仍是一個 hash table。

先來看看 Go 定義了一些常量。

const (
    // 一個桶裏面最多能夠裝的鍵值對的個數,8對。
    bucketCntBits = 3
    bucketCnt     = 1 << bucketCntBits

    // 觸發擴容操做的最大裝載因子的臨界值
    loadFactor = 6.5

    // 爲了保持內聯,鍵 和 值 的最大長度都是128字節,若是超過了128個字節,就存儲它的指針
    maxKeySize   = 128
    maxValueSize = 128

    // 數據偏移應該是 bmap 的整數倍,可是須要正確的對齊。
    dataOffset = unsafe.Offsetof(struct {
        b bmap
        v int64
    }{}.v)

    // tophash 的一些值
    empty          = 0 // 沒有鍵值對
    evacuatedEmpty = 1 // 沒有鍵值對,而且桶內的鍵值被遷移走了。
    evacuatedX     = 2 // 鍵值對有效,而且已經遷移了一個表的前半段
    evacuatedY     = 3 // 鍵值對有效,而且已經遷移了一個表的後半段
    minTopHash     = 4 // 最小的 tophash

    // 標記
    iterator     = 1 // 當前桶的迭代子
    oldIterator  = 2 // 舊桶的迭代子
    hashWriting  = 4 // 一個goroutine正在寫入map
    sameSizeGrow = 8 // 當前字典增加到新字典而且保持相同的大小

    // 迭代子檢查桶ID的哨兵
    noCheck = 1<<(8*sys.PtrSize) - 1
)複製代碼

這裏值得說明的一點是觸發擴容操做的臨界值6.5是怎麼得來的。這個值太大會致使overflow buckets過多,查找效率下降,太小會浪費存儲空間。

據 Google 開發人員稱,這個值是一個測試的程序,測量出來選擇的一個經驗值。

%overflow :
溢出率,平均一個 bucket 有多少個 鍵值kv 的時候會溢出。

bytes/entry :
平均存一個 鍵值kv 須要額外存儲多少字節的數據。

hitprobe :
查找一個存在的 key 平均查找次數。

missprobe :
查找一個不存在的 key 平均查找次數。

通過這幾組測試數據,最終選定 6.5 做爲臨界的裝載因子。

接着看看 Go 中 map header 的定義:

type hmap struct {
    count     int // map 的長度
    flags     uint8
    B         uint8  // log以2爲底,桶個數的對數 (總共能存 6.5 * 2^B 個元素)
    noverflow uint16 // 近似溢出桶的個數
    hash0     uint32 // 哈希種子

    buckets    unsafe.Pointer // 有 2^B 個桶的數組. count==0 的時候,這個數組爲 nil.
    oldbuckets unsafe.Pointer // 舊的桶數組一半的元素
    nevacuate  uintptr        // 擴容增加過程當中的計數器

    extra *mapextra // 可選字段
}複製代碼

在 Go 的 map header 結構中,也包含了2個指向桶數組的指針,buckets 指向新的桶數組,oldbuckets 指向舊的桶數組。這點和 Redis 字典中也有兩個 dictht 數組相似。

hmap 的最後一個字段是一個指向 mapextra 結構的指針,它的定義以下:

type mapextra struct {
    overflow [2]*[]*bmap
    nextOverflow *bmap
}複製代碼

若是一個鍵值對沒有找到對應的指針,那麼就會把它們先存到溢出桶
overflow 裏面。在 mapextra 中還有一個指向下一個可用的溢出桶的指針。

溢出桶 overflow 是一個數組,裏面存了2個指向 *bmap 數組的指針。overflow[0] 裏面裝的是 hmap.buckets 。overflow[1] 裏面裝的是 hmap.oldbuckets。

再看看桶的數據結構的定義,bmap 就是 Go 中 map 裏面桶對應的結構體類型。

type bmap struct {
    tophash [bucketCnt]uint8
}複製代碼

桶的定義比較簡單,裏面就只是包含了一個 uint8 類型的數組,裏面包含8個元素。這8個元素存儲的是 hash 值的高8位。

在 tophash 以後的內存佈局裏還有2塊內容。緊接着 tophash 以後的是8對 鍵值 key- value 對。而且排列方式是 8個 key 和 8個 value 放在一塊兒。

8對 鍵值 key- value 對結束之後緊接着一個 overflow 指針,指向下一個 bmap。今後也能夠看出 Go 中 map是用鏈表的方式處理 hash 衝突的。

爲什麼 Go 存儲鍵值對的方式不是普通的 key/value、key/value、key/value……這樣存儲的呢?它是鍵 key 都存儲在一塊兒,而後緊接着是 值value 都存儲在一塊兒,爲何會這樣呢?

在 Redis 中,當使用 REDIS_ENCODING_ZIPLIST 編碼哈希表時, 程序經過將鍵和值一同推入壓縮列表, 從而造成保存哈希表所需的鍵-值對結構,如上圖。新添加的 key-value 對會被添加到壓縮列表的表尾。

這種結構有一個弊端,若是存儲的鍵和值的類型不一樣,在內存中佈局中所佔字節不一樣的話,就須要對齊。好比說存儲一個 map[int64]int8 類型的字典。

Go 爲了節約內存對齊的內存消耗,因而把它設計成上圖所示那樣。

若是 map 裏面存儲了上萬億的大數據,這裏節約出來的內存空間仍是比較可觀的。

1. 新建 Map

makemap 新建了一個 Map,若是入參 h 不爲空,那麼 map 的 hmap 就是入參的這個 hmap,若是入參 bucket 不爲空,那麼這個 bucket 桶就做爲第一個桶。

func makemap(t *maptype, hint int64, h *hmap, bucket unsafe.Pointer) *hmap {
    // hmap 的 size 大小的值非法
    if sz := unsafe.Sizeof(hmap{}); sz > 48 || sz != t.hmap.size {
        println("runtime: sizeof(hmap) =", sz, ", t.hmap.size =", t.hmap.size)
        throw("bad hmap size")
    }

    // 超過範圍的 hint 值都爲0
    if hint < 0 || hint > int64(maxSliceCap(t.bucket.size)) {
        hint = 0
    }

    // key 值的類型不是 Go 所支持的
    if !ismapkey(t.key) {
        throw("runtime.makemap: unsupported map key type")
    }

    // 經過編譯器和反射檢車 key 值的 size 是否合法
    if t.key.size > maxKeySize && (!t.indirectkey || t.keysize != uint8(sys.PtrSize)) ||
        t.key.size <= maxKeySize && (t.indirectkey || t.keysize != uint8(t.key.size)) {
        throw("key size wrong")
    }
    // 經過編譯器和反射檢車 value 值的 size 是否合法
    if t.elem.size > maxValueSize && (!t.indirectvalue || t.valuesize != uint8(sys.PtrSize)) ||
        t.elem.size <= maxValueSize && (t.indirectvalue || t.valuesize != uint8(t.elem.size)) {
        throw("value size wrong")
    }

    // 雖然如下的變量咱們不依賴,並且能夠在編譯階段檢查下面這些值的合法性,
    // 可是咱們仍是在這裏檢測。

    // key 值對齊超過桶的個數
    if t.key.align > bucketCnt {
        throw("key align too big")
    }
    // value 值對齊超過桶的個數
    if t.elem.align > bucketCnt {
        throw("value align too big")
    }
    // key 值的 size 不是 key 值對齊的倍數
    if t.key.size%uintptr(t.key.align) != 0 {
        throw("key size not a multiple of key align")
    }
    // value 值的 size 不是 value 值對齊的倍數
    if t.elem.size%uintptr(t.elem.align) != 0 {
        throw("value size not a multiple of value align")
    }
    // 桶個數過小,沒法正確對齊
    if bucketCnt < 8 {
        throw("bucketsize too small for proper alignment")
    }
    // 數據偏移量不是 key 值對齊的整數倍,說明須要在桶中填充 key
    if dataOffset%uintptr(t.key.align) != 0 {
        throw("need padding in bucket (key)")
    }
    // 數據偏移量不是 value 值對齊的整數倍,說明須要在桶中填充 value
    if dataOffset%uintptr(t.elem.align) != 0 {
        throw("need padding in bucket (value)")
    }

    B := uint8(0)
    for ; overLoadFactor(hint, B); B++ {
    }

    // 分配內存並初始化哈希表
    // 若是此時 B = 0,那麼 hmap 中的 buckets 字段稍後分配
    // 若是 hint 值很大,初始化這塊內存須要一段時間。
    buckets := bucket
    var extra *mapextra
    if B != 0 {
        var nextOverflow *bmap
        // 初始化 bucket 和 nextOverflow 
        buckets, nextOverflow = makeBucketArray(t, B)
        if nextOverflow != nil {
            extra = new(mapextra)
            extra.nextOverflow = nextOverflow
        }
    }

    // 初始化 hmap
    if h == nil {
        h = (*hmap)(newobject(t.hmap))
    }
    h.count = 0
    h.B = B
    h.extra = extra
    h.flags = 0
    h.hash0 = fastrand()
    h.buckets = buckets
    h.oldbuckets = nil
    h.nevacuate = 0
    h.noverflow = 0

    return h
}複製代碼

新建一個 map 最重要的就是分配內存並初始化哈希表,在 B 不爲0的狀況下,還會初始化 mapextra 而且會 buckets 會被從新生成。

func makeBucketArray(t *maptype, b uint8) (buckets unsafe.Pointer, nextOverflow *bmap) {
    base := uintptr(1 << b)
    nbuckets := base
    if b >= 4 {
        nbuckets += 1 << (b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        // 若是申請 sz 大小的桶,系統只能返回 up 大小的內存空間,那麼桶的個數爲 up / t.bucket.size
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }
    buckets = newarray(t.bucket, int(nbuckets))
    // 當 b > 4 而且計算出來桶的個數與 1 << b 個數不等的時候,
    if base != nbuckets {
        // 此時 nbuckets 比 base 大,那麼會預先分配 nbuckets - base 個 nextOverflow 桶
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}複製代碼

這裏的 newarray 就已是 mallocgc 了。

從上述代碼裏面能夠看出,只有當 B >=4 的時候,makeBucketArray 纔會生成 nextOverflow 指針指向 bmap,從而在 Map 生成 hmap 的時候纔會生成 mapextra 。

當 B = 3 ( B < 4 ) 的時候,初始化 hmap 只會生成8個桶。

當 B = 4 ( B >= 4 ) 的時候,初始化 hmap 的時候還會額外生成 mapextra ,並初始化 nextOverflow。mapextra 的 nextOverflow 指針會指向第16個桶結束,第17個桶的首地址。第17個桶(從0開始,也就是下標爲16的桶)的 bucketsize - sys.PtrSize 地址開始存一個指針,這個指針指向當前整個桶的首地址。這個指針就是 bmap 的 overflow 指針。

當 B = 5 ( B >= 4 ) 的時候,初始化 hmap 的時候還會額外生成 mapextra ,並初始化 nextOverflow。這個時候就會生成總共34個桶了。同理,最後一個桶大小減去一個指針的大小的地址開始存儲一個 overflow 指針。

2. 查找 Key

在 Go 中,若是字典裏面查找一個不存在的 key ,查找不到並不會返回一個 nil ,而是返回當前類型的零值。好比,字符串就返回空字符串,int 類型就返回 0 。

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if raceenabled && h != nil {
        // 獲取 caller 的 程序計數器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 獲取 mapaccess1 的程序計數器 program counter
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size) 
    }
    if h == nil || h.count == 0 {
        return unsafe.Pointer(&zeroVal[0])
    }
    // 若是多線程讀寫,直接拋出異常
    // 併發檢查 go hashmap 不支持併發訪問
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
    alg := t.key.alg
    // 計算 key 的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))
    m := uintptr(1)<<h.B - 1
    // hash % (1<<B - 1) 求出 key 在哪一個桶
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    // 若是當前還存在 oldbuckets 桶
    if c := h.oldbuckets; c != nil {
        // 當前擴容不是等量擴容
        if !h.sameSizeGrow() {
            // 若是 oldbuckets 未遷移完成 則找找 oldbuckets 中對應的 bucket(低 B-1 位)
            // 不然爲 buckets 中的 bucket(低 B 位)
            // 把 mask 縮小 1 倍
            m >>= 1
        }

        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        if !evacuated(oldb) {
            // 若是 oldbuckets 桶存在,而且尚未擴容遷移,就在老的桶裏面查找 key 
            b = oldb
        }
    }
    // 取出 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    // 若是 top 小於 minTopHash,就讓它加上 minTopHash 的偏移。
    // 由於 0 - minTopHash 這區間的數都已經用來做爲標記位了
    if top < minTopHash {
        top += minTopHash
    }
    for {
        for i := uintptr(0); i < bucketCnt; i++ {
            // 若是 hash 的高8位和當前 key 記錄的不同,就找下一個
            // 這樣比較很高效,由於只用比較高8位,不用比較全部的 hash 值
            // 若是高8位都不相同,hash 值確定不一樣,可是高8位若是相同,那麼就要比較整個 hash 值了
            if b.tophash[i] != top {
                continue
            }
            // 取出 key 值的方式是用偏移量,bmap 首地址 + i 個 key 值大小的偏移量
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey {
                k = *((*unsafe.Pointer)(k))
            }
            // 比較 key 值是否相等
            if alg.equal(key, k) {
                // 若是找到了 key,那麼取出 value 值
                // 取出 value 值的方式是用偏移量,bmap 首地址 + 8 個 key 值大小的偏移量 + i 個 value 值大小的偏移量
                v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                if t.indirectvalue {
                    v = *((*unsafe.Pointer)(v))
                }
                return v
            }
        }
        // 若是當前桶裏面沒有找到相應的 key ,那麼就去下一個桶去找
        b = b.overflow(t)
        // 若是 b == nil,說明桶已經都找完了,返回對應type的零值
        if b == nil {
            return unsafe.Pointer(&zeroVal[0])
        }
    }
}複製代碼

具體實現代碼如上,詳細解釋見代碼。

如上圖,這是一個查找 key 的全過程。

首先計算出 key 對應的 hash 值,hash 值對 B 取餘。

這裏有一個優化點。m % n 這步計算,若是 n 是2的倍數,那麼能夠省去這一步取餘操做。

m % n = m & ( n - 1 )複製代碼

這樣優化就能夠省去耗時的取餘操做了。這裏例子中計算完取出來是 0010 ,也就是2,因而對應的是桶數組裏面的第3個桶。爲何是第3個桶呢?首地址指向第0個桶,往下偏移2個桶的大小,因而偏移到了第3個桶的首地址了,具體實現能夠看上述代碼。

hash 的低 B 位決定了桶數組裏面的第幾個桶,hash 值的高8位決定了這個桶數組 bmap 裏面 key 存在 tophash 數組的第幾位了。如上圖,hash 的高8位用來和 tophash 數組裏面的每一個值進行對比,若是高8位和 tophash[i] 不等,就直接比下一個。若是相等,則取出 bmap 裏面對應完整的 key,再比較一次,看是否徹底一致。

整個查找過程優先在 oldbucket 裏面找(若是存在 lodbucket 的話),找完再去新 bmap 裏面找。

有人可能會有疑問,爲什麼這裏要加入 tophash 多一次比較呢?

tophash 的引入是爲了加速查找的。因爲它只存了 hash 值的高8位,比查找完整的64位要快不少。經過比較高8位,迅速找到高8位一致hash 值的索引,接下來再進行一次完整的比較,若是還一致,那麼就斷定找到該 key 了。

若是找到了 key 就返回對應的 value。若是沒有找到,還會繼續去 overflow 桶繼續尋找,直到找到最後一個桶,若是尚未找到就返回對應類型的零值。

3. 插入 Key

插入 key 的過程和查找 key 的過程大致一致。

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
    if raceenabled {
        // 獲取 caller 的 程序計數器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 獲取 mapassign 的程序計數器 program counter
        pc := funcPC(mapassign)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled {
        msanread(key, t.key.size)
    }
    // 若是多線程讀寫,直接拋出異常
    // 併發檢查 go hashmap 不支持併發訪問
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
    alg := t.key.alg
    // 計算 key 值的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))

    // 在計算完 hash 值之後當即設置 hashWriting 變量的值,若是在計算 hash 值的過程當中沒有徹底寫完,可能會致使 panic
    h.flags |= hashWriting

    // 若是 hmap 的桶的個數爲0,那麼就新建一個桶
    if h.buckets == nil {
        h.buckets = newarray(t.bucket, 1)
    }

again:
    // hash 值對 B 取餘,求得所在哪一個桶
    bucket := hash & (uintptr(1)<<h.B - 1)
    // 若是還在擴容中,繼續擴容
    if h.growing() {
        growWork(t, h, bucket)
    }
    // 根據 hash 值的低 B 位找到位於哪一個桶
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    // 計算 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }

    var inserti *uint8
    var insertk unsafe.Pointer
    var val unsafe.Pointer
    for {
        // 遍歷當前桶全部鍵值,查找 key 對應的 value
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == empty && inserti == nil {
                    // 若是日後找都沒有找到,這裏先記錄一個標記,方便找不到之後插入到這裏
                    inserti = &b.tophash[i]
                    // 計算出偏移 i 個 key 值的位置
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    // 計算出 val 所在的位置,當前桶的首地址 + 8個 key 值所佔的大小 + i 個 value 值所佔的大小
                    val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                }
                continue
            }
            // 依次取出 key 值
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // 若是 key 值是一個指針,那麼就取出改指針對應的 key 值
            if t.indirectkey {
                k = *((*unsafe.Pointer)(k))
            }
            // 比較 key 值是否相等
            if !alg.equal(key, k) {
                continue
            }
            // 若是須要更新,那麼就把 t.key 拷貝從 k 拷貝到 key
            if t.needkeyupdate {
                typedmemmove(t.key, k, key)
            }
            // 計算出 val 所在的位置,當前桶的首地址 + 8個 key 值所佔的大小 + i 個 value 值所佔的大小
            val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
            goto done
        }
        ovf := b.overflow(t)
        if ovf == nil {
            break
        }
        b = ovf
    }

    // 沒有找到當前的 key 值,而且檢查最大負載因子,若是達到了最大負載因子,或者存在不少溢出的桶
    if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        // 開始擴容
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }
    // 若是找不到一個空的位置能夠插入 key 值
    if inserti == nil {
        // all current buckets are full, allocate a new one.
        // 意味着當前桶已經所有滿了,那麼就生成一個新的
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        val = add(insertk, bucketCnt*uintptr(t.keysize))
    }

    // store new key/value at insert position
    if t.indirectkey {
        // 若是是存儲 key 值的指針,這裏就用 insertk 存儲 key 值的地址
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectvalue {
        // 若是是存儲 value 值的指針,這裏就用 val 存儲 key 值的地址
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(val) = vmem
    }
    // 將 t.key 從 insertk 拷貝到 key 的位置
    typedmemmove(t.key, insertk, key)
    *inserti = top
    // hmap 中保存的總 key 值的數量 + 1
    h.count++

done:
    // 禁止併發寫
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectvalue {
        // 若是 value 裏面存儲的是指針,那麼取值該指針指向的 value 值
        val = *((*unsafe.Pointer)(val))
    }
    return val
}複製代碼

插入 key 的過程當中和查找 key 有幾點不一樣,須要注意:

    1. 若是找到要插入的 key ,只須要直接更新對應的 value 值就行了。
    1. 若是沒有在 bmap 中沒有找到待插入的 key ,這麼這時分幾種狀況。
      狀況一: bmap 中還有空位,在遍歷 bmap 的時候預先標記空位,一旦查找結束也沒有找到 key,就把 key 放到預先遍歷時候標記的空位上。
      狀況二:bmap中已經沒有空位了。這個時候 bmap 裝的很滿了。此時須要檢查一次最大負載因子是否已經達到了。若是達到了,當即進行擴容操做。擴容之後在新桶裏面插入 key,流程和上述的一致。若是沒有達到最大負載因子,那麼就在新生成一個 bmap,並把前一個 bmap 的 overflow 指針指向新的 bmap。
    1. 在擴容過程當中,oldbucke t是被凍結的,查找 key 時會在
      oldbucket 中查找,但不會在 oldbucket 中插入數據。若是在
      oldbucket 是找到了相應的key,作法是將它遷移到新 bmap 後加入
      evalucated 標記。

其餘流程和查找 key 基本一致,這裏就再也不贅述了。

3. 刪除 Key

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
    if raceenabled && h != nil {
        // 獲取 caller 的 程序計數器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 獲取 mapdelete 的程序計數器 program counter
        pc := funcPC(mapdelete)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
    if h == nil || h.count == 0 {
        return
    }
    // 若是多線程讀寫,直接拋出異常
    // 併發檢查 go hashmap 不支持併發訪問
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }

    alg := t.key.alg
    // 計算 key 值的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))

    // 在計算完 hash 值之後當即設置 hashWriting 變量的值,若是在計算 hash 值的過程當中沒有徹底寫完,可能會致使 panic
    h.flags |= hashWriting

    bucket := hash & (uintptr(1)<<h.B - 1)
    // 若是還在擴容中,繼續擴容
    if h.growing() {
        growWork(t, h, bucket)
    }
    // 根據 hash 值的低 B 位找到位於哪一個桶
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    // 計算 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    for {
        // 遍歷當前桶全部鍵值,查找 key 對應的 value
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // 若是 k 是指向 key 的指針,那麼這裏須要取出 key 的值
            k2 := k
            if t.indirectkey {
                k2 = *((*unsafe.Pointer)(k2))
            }
            if !alg.equal(key, k2) {
                continue
            }
            // key 的指針置空
            if t.indirectkey {
                *(*unsafe.Pointer)(k) = nil
            } else {
            // 清除 key 的內存
                typedmemclr(t.key, k)
            }
            v := unsafe.Pointer(uintptr(unsafe.Pointer(b)) + dataOffset + bucketCnt*uintptr(t.keysize) + i*uintptr(t.valuesize))
            // value 的指針置空
            if t.indirectvalue {
                *(*unsafe.Pointer)(v) = nil
            } else {
            // 清除 value 的內存
                typedmemclr(t.elem, v)
            }
            // 清空 tophash 裏面的值
            b.tophash[i] = empty
            // map 裏面 key 的總個數減1
            h.count--
            goto done
        }
        // 若是沒有找到,那麼就繼續查找 overflow 桶,一直遍歷到最後一個
        b = b.overflow(t)
        if b == nil {
            goto done
        }
    }

done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
}複製代碼

刪除操做主要流程和查找 key 流程也差很少,找到對應的 key 之後,若是是指針指向原來的 key,就把指針置爲 nil。若是是值就清空它所在的內存。還要清理 tophash 裏面的值最後把 map 的 key 總個數計數器減1 。

若是在擴容過程當中,刪除操做會在擴容之後在新的 bmap 裏面刪除。

查找的過程依舊會一直遍歷到鏈表的最後一個 bmap 桶。

4. 增量翻倍擴容

這部分算是整個 Map 實現比較核心的部分了。咱們都知道 Map 在不斷的裝載 Key 值的時候,查找效率會變的愈來愈低,若是此時不進行擴容操做的話,哈希衝突使得鏈表變得愈來愈長,性能也就愈來愈差。擴容勢在必行。

可是擴容過程當中若是阻斷了 Key 值的寫入,在處理大數據的時候會致使有一段不響應的時間,若是用在高實時的系統中,那麼每次擴容都會卡幾秒,這段時間都不能相應任何請求。這種性能明顯是不能接受的。因此要既不影響寫入,也同時要進行擴容。這個時候就應該增量擴容了。

這裏增量擴容其實用途已經很普遍了,以前舉例的 Redis 就採用的增量擴容策略。

接下來看看 Go 是怎麼進行增量擴容的。

在 Go 的 mapassign 插入 Key 值、mapdelete 刪除 key 值的時候都會檢查當前是否在擴容中。

func growWork(t *maptype, h *hmap, bucket uintptr) {
    // 確保咱們遷移了全部 oldbucket
    evacuate(t, h, bucket&h.oldbucketmask())

    // 再遷移一個標記過的桶
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}複製代碼

從這裏咱們能夠看到,每次執行一次 growWork 會遷移2個桶。一個是當前的桶,這算是局部遷移,另一個是 hmap 裏面指向的 nevacuate 的桶,這算是增量遷移。

在插入 Key 值的時候,若是當前在擴容過程當中,oldbucket 是被凍結的,查找時會先在 oldbucket 中查找,但不會在oldbucket中插入數據。只有在 oldbucket 找到了相應的 key,那麼將它遷移到新 bucket 後加入 evalucated 標記。

在刪除 Key 值的時候,若是當前在擴容過程當中,優先查找 bucket,即新桶,找到一個之後把它對應的 Key、Value 都置空。若是 bucket 裏面找不到,纔會去 oldbucket 中去查找。

每次插入 Key 值的時候,都會判斷一下當前裝載因子是否超過了 6.5,若是達到了這個極限,就當即執行擴容操做 hashGrow。這是擴容以前的準備工做。

func hashGrow(t *maptype, h *hmap) {
    // 若是達到了最大裝載因子,就須要擴容。
    // 否則的話,一個桶後面鏈表跟着一大堆的 overflow 桶
    bigger := uint8(1)
    if !overLoadFactor(int64(h.count), h.B) {
        bigger = 0
        h.flags |= sameSizeGrow
    }
    // 把 hmap 的舊桶的指針指向當前桶
    oldbuckets := h.buckets
    // 生成新的擴容之後的桶,hmap 的 buckets 指針指向擴容之後的桶。
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)

    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // B 加上新的值
    h.B += bigger
    h.flags = flags
    // 舊桶指針指向當前桶
    h.oldbuckets = oldbuckets
    // 新桶指針指向擴容之後的桶
    h.buckets = newbuckets
    h.nevacuate = 0
    h.noverflow = 0

    if h.extra != nil && h.extra.overflow[0] != nil {
        if h.extra.overflow[1] != nil {
            throw("overflow is not nil")
        }
        // 交換 overflow[0] 和 overflow[1] 的指向
        h.extra.overflow[1] = h.extra.overflow[0]
        h.extra.overflow[0] = nil
    }
    if nextOverflow != nil {
        if h.extra == nil {
            // 生成 mapextra
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }

    // 實際拷貝鍵值對的過程在 evacuate() 中
}複製代碼

用圖表示出它的流程:

hashGrow 操做算是擴容以前的準備工做,實際拷貝的過程在 evacuate 中。

hashGrow 操做會先生成擴容之後的新的桶數組。新的桶數組的大小是以前的2倍。而後 hmap 的 buckets 會指向這個新的擴容之後的桶,而 oldbuckets 會指向當前的桶數組。

處理完 hmap 之後,再處理 mapextra,nextOverflow 的指向原來的 overflow 指針,overflow 指針置爲 null。

到此就作好擴容以前的準備工做了。

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
    // 在準備擴容以前桶的個數
    newbit := h.noldbuckets()
    alg := t.key.alg
    if !evacuated(b) {
        // TODO: reuse overflow buckets instead of using new ones, if there
        // is no iterator using the old buckets. (If !oldIterator.)

        var (
            x, y   *bmap          // 在新桶裏面 低位桶和高位桶
            xi, yi int            // key 和 value 值的索引值分別爲 xi , yi 
            xk, yk unsafe.Pointer // 指向 x 和 y 的 key 值的指針 
            xv, yv unsafe.Pointer // 指向 x 和 y 的 value 值的指針 
        )
        // 新桶中低位的一些桶
        x = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        xi = 0
        // 擴容之後的新桶中低位的第一個 key 值
        xk = add(unsafe.Pointer(x), dataOffset)
        // 擴容之後的新桶中低位的第一個 key 值對應的 value 值
        xv = add(xk, bucketCnt*uintptr(t.keysize))
        // 若是不是等量擴容
        if !h.sameSizeGrow() {
            y = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            yi = 0
            yk = add(unsafe.Pointer(y), dataOffset)
            yv = add(yk, bucketCnt*uintptr(t.keysize))
        }
        // 依次遍歷溢出桶
        for ; b != nil; b = b.overflow(t) {
            k := add(unsafe.Pointer(b), dataOffset)
            v := add(k, bucketCnt*uintptr(t.keysize))
            // 遍歷 key - value 鍵值對
            for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
                top := b.tophash[i]
                if top == empty {
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
                if top < minTopHash {
                    throw("bad map state")
                }
                k2 := k
                // key 值若是是指針,則取出指針裏面的值
                if t.indirectkey {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                useX := true
                if !h.sameSizeGrow() {
                    // 若是不是等量擴容,則須要從新計算 hash 值,無論是高位桶 x 中,仍是低位桶 y 中
                    hash := alg.hash(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 {
                        if !t.reflexivekey && !alg.equal(k2, k2) {
                            // 若是兩個 key 不相等,那麼他們倆極大可能舊的 hash 值也不相等。
                            // tophash 對要遷移的 key 值也是沒有多大意義的,因此咱們用低位的 tophash 輔助擴容,標記一些狀態。
                            // 爲下一個級 level 從新計算一些新的隨機的 hash 值。以致於這些 key 值在屢次擴容之後依舊能夠均勻分佈在全部桶中
                            // 判斷 top 的最低位是否爲1
                            if top&1 != 0 {
                                hash |= newbit
                            } else {
                                hash &^= newbit
                            }
                            top = uint8(hash >> (sys.PtrSize*8 - 8))
                            if top < minTopHash {
                                top += minTopHash
                            }
                        }
                    }
                    useX = hash&newbit == 0
                }
                if useX {
                    // 標記低位桶存在 tophash 中
                    b.tophash[i] = evacuatedX
                    // 若是 key 的索引值到了桶最後一個,就新建一個 overflow
                    if xi == bucketCnt {
                        newx := h.newoverflow(t, x)
                        x = newx
                        xi = 0
                        xk = add(unsafe.Pointer(x), dataOffset)
                        xv = add(xk, bucketCnt*uintptr(t.keysize))
                    }
                    // 把 hash 的高8位再次存在 tophash 中
                    x.tophash[xi] = top
                    if t.indirectkey {
                        // 若是是指針指向 key ,那麼拷貝指針指向
                        *(*unsafe.Pointer)(xk) = k2 // copy pointer
                    } else {
                        // 若是是指針指向 key ,那麼進行值拷貝
                        typedmemmove(t.key, xk, k) // copy value
                    }
                    // 同理拷貝 value
                    if t.indirectvalue {
                        *(*unsafe.Pointer)(xv) = *(*unsafe.Pointer)(v)
                    } else {
                        typedmemmove(t.elem, xv, v)
                    }
                    // 繼續遷移下一個
                    xi++
                    xk = add(xk, uintptr(t.keysize))
                    xv = add(xv, uintptr(t.valuesize))
                } else {
                    // 這裏是高位桶 y,遷移過程和上述低位桶 x 一致,下面就再也不贅述了
                    b.tophash[i] = evacuatedY
                    if yi == bucketCnt {
                        newy := h.newoverflow(t, y)
                        y = newy
                        yi = 0
                        yk = add(unsafe.Pointer(y), dataOffset)
                        yv = add(yk, bucketCnt*uintptr(t.keysize))
                    }
                    y.tophash[yi] = top
                    if t.indirectkey {
                        *(*unsafe.Pointer)(yk) = k2
                    } else {
                        typedmemmove(t.key, yk, k)
                    }
                    if t.indirectvalue {
                        *(*unsafe.Pointer)(yv) = *(*unsafe.Pointer)(v)
                    } else {
                        typedmemmove(t.elem, yv, v)
                    }
                    yi++
                    yk = add(yk, uintptr(t.keysize))
                    yv = add(yv, uintptr(t.valuesize))
                }
            }
        }
        // Unlink the overflow buckets & clear key/value to help GC.
        if h.flags&oldIterator == 0 {
            b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
            // Preserve b.tophash because the evacuation
            // state is maintained there.
            if t.bucket.kind&kindNoPointers == 0 {
                memclrHasPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            } else {
                memclrNoHeapPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            }
        }
    }

    // Advance evacuation mark
    if oldbucket == h.nevacuate {
        h.nevacuate = oldbucket + 1
        // Experiments suggest that 1024 is overkill by at least an order of magnitude.
        // Put it in there as a safeguard anyway, to ensure O(1) behavior.
        stop := h.nevacuate + 1024
        if stop > newbit {
            stop = newbit
        }
        for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
            h.nevacuate++
        }
        if h.nevacuate == newbit { // newbit == # of oldbuckets
            // Growing is all done. Free old main bucket array.
            h.oldbuckets = nil
            // Can discard old overflow buckets as well.
            // If they are still referenced by an iterator,
            // then the iterator holds a pointers to the slice.
            if h.extra != nil {
                h.extra.overflow[1] = nil
            }
            h.flags &^= sameSizeGrow
        }
    }
}複製代碼

上述函數就是遷移過程最核心的拷貝工做了。

整個遷移過程並不難。這裏須要說明的是 x ,y 表明的意義。因爲擴容之後,新的桶數組是原來桶數組的2倍。用 x 表明新的桶數組裏面低位的那一半,用 y 表明高位的那一半。其餘的變量就是一些標記了,遊標和標記 key - value 原來所在的位置。詳細的見代碼註釋。

上圖中表示了遷移開始以後的過程。能夠看到舊的桶數組裏面的桶在遷移到新的桶中,而且新的桶也在不斷的寫入新的 key 值。

一直拷貝鍵值對,直到舊桶中全部的鍵值都拷貝到了新的桶中。

最後一步就是釋放舊桶,oldbuckets 的指針置爲 null。到此,一次遷移過程就徹底結束了。

5. 等量擴容

嚴格意義上這種方式並不能算是擴容。可是函數名是 Grow,姑且暫時就這麼叫吧。

在 go1.8 的版本開始,添加了 sameSizeGrow,當 overflow buckets
的數量超過必定數量 (2^B) 但裝載因子又未達到 6.5 的時候,此時可能存在部分空的bucket,即 bucket 的使用率低,這時會觸發sameSizeGrow,即 B 不變,但走數據遷移流程,將 oldbuckets 的數據從新緊湊排列提升 bucket 的利用率。固然在 sameSizeGrow 過程當中,不會觸發 loadFactorGrow。

四. Map 實現中的一些優化

讀到這裏,相信讀者內心應該很清楚如何設計並實現一個 Map 了吧。包括 Map 中的各類操做的實現。在探究如何實現一個線程安全的 Map 以前,先把以前說到個一些亮點優化點,小結一下。

在 Redis 中,採用增量式擴容的方式處理哈希衝突。當平均查找長度超過 5 的時候就會觸發增量擴容操做,保證 hash 表的高性能。

同時 Redis 採用頭插法,保證插入 key 值時候的性能。

在 Java 中,當桶的個數超過了64個之後,而且衝突節點爲8或者大於8,這個時候就會觸發紅黑樹轉換。這樣能保證鏈表在很長的狀況下,查找長度依舊不會太長,而且紅黑樹保證最差狀況下也支持 O(log n) 的時間複雜度。

Java 在遷移以後有一個很是好的設計,只須要比較遷移以後桶個數的最高位是否爲0,若是是0,key 在新桶內的相對位置不變,若是是1,則加上桶的舊的桶的個數 oldCap 就能夠獲得新的位置。

在 Go 中優化的點比較多:

  1. 哈希算法選用高效的 memhash 算法 和 CPU AES指令集。AES 指令集充分利用 CPU 硬件特性,計算哈希值的效率超高。
  2. key - value 的排列設計成 key 放在一塊兒,value 放在一塊兒,而不是key,value成對排列。這樣方便內存對齊,數據量大了之後節約內存對齊形成的一些浪費。
  3. key,value 的內存大小超過128字節之後自動轉成存儲一個指針。
  4. tophash 數組的設計加速了 key 的查找過程。tophash 也被複用,用來標記擴容操做時候的狀態。
  5. 用位運算轉換求餘操做,m % n ,當 n = 1 << B 的時候,能夠轉換成 m & (1<<B - 1) 。
  6. 增量式擴容。
  7. 等量擴容,緊湊操做。
  8. Go 1.9 版本之後,Map 原生就已經支持線程安全。(在下一章中重點討論這個問題)

固然 Go 中還有一些須要再優化的地方:

  1. 在遷移的過程當中,當前版本不會重用 overflow 桶,而是直接從新申請一個新的桶。這裏能夠優化成優先重用沒有指針指向的 overflow 桶,當沒有可用的了,再去申請一個新的。這一點做者已經寫在了 TODO 裏面了。
  2. 動態合併多個 empty 的桶。
  3. 當前版本中沒有 shrink 操做,Map 只能增加而不能收縮。這塊 Redis 有相關的實現。

(鑑於單篇文章的長度,線程安所有分所有放到下篇去講,稍後更新下篇)


Reference:
《算法與數據結構》
《Redis 設計與實現》
xxHash
字符串hash函數
General Purpose Hash Function Algorithms
Java 8系列之從新認識HashMap

GitHub Repo:Halfrost-Field

Follow: halfrost · GitHub

Source: halfrost.com/go_map/

相關文章
相關標籤/搜索