深刻剖析 Java 7 中的 HashMap 和 ConcurrentHashMap

本文將深刻剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的源碼,解析 HashMap 線程不安全的原理以及解決方案,最後以測試用例加以驗證。java

1 Java7 HashMap

HashMap 的數據結構:node

從上圖中能夠看出,HashMap 底層就是一個數組結構,數組中的每一項又是一個鏈表。linux

經過查看 JDK 中的 HashMap 源碼,能夠看到其構造函數有一行代碼:數組

public HashMap(int initialCapacity, float loadFactor) {
    ...
    table = new Entry[capacity];
    ...
}

即建立了一個大小爲 capacity 的 Entry 數組,而 Entry 的結構以下:安全

static class Entry<K,V> implements Map.Entry<K,V> {
    final K key;
    V value;
    Entry<K,V> next;
    final int hash;
    ……
}

能夠看到,Entry 是一個 static class,其中包含了 key 和 value ,也就是鍵值對,另外還包含了一個 next 的 Entry 指針。數據結構

  • capacity:當前數組容量,始終保持 2^n,能夠擴容,擴容後數組大小爲當前的 2 倍。默認初始容量爲 16。
  • loadFactor:負載因子,默認爲 0.75。
  • threshold:擴容的閾值,等於 capacity * loadFactor

1.1 put過程分析

public V put(K key, V value) {
    // 當插入第一個元素的時候,須要先初始化數組大小
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    // 若是 key 爲 null,則這個 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
    // key 的 hash 值
    int hash = hash(key);
    // 找到對應的數組下標
    int i = indexFor(hash, table.length);
    // 遍歷一下對應下標處的鏈表,看是否有重複的 key 已經存在,
    // 若是有,直接覆蓋,put 方法返回舊值就結束了
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
 
    modCount++;
    // 不存在重複的 key,將此 entry 添加到鏈表中
    addEntry(hash, key, value, i);
    return null;
}

這裏對一些方法作深刻解析。多線程

  • 數組初始化
private void inflateTable(int toSize) {
    // 保證數組大小必定是 2^n
    int capacity = roundUpToPowerOf2(toSize);
    // 計算擴容閾值
    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
    // 初始化數組
    table = new Entry[capacity];
    initHashSeedAsNeeded(capacity);
}
  • 找到對應的數組下標
static int indexFor(int hash, int length) {
    // 做用等價於取模運算,但這種方式效率更高
    return hash & (length-1);
}

由於HashMap的底層數組長度老是 2^n,當 length 爲 2 的 n 次方時,hash & (length-1) 就至關於對length取模,並且速度比直接取模要快的多。併發

  • 添加節點到鏈表中
void addEntry(int hash, K key, V value, int bucketIndex) {
    // 若是當前 HashMap 大小已經達到了閾值,而且新值要插入的數組位置已經有元素了,那麼要擴容
    if ((size >= threshold) && (null != table[bucketIndex])) {
        // 擴容
        resize(2 * table.length);
        // 從新計算 hash 值
        hash = (null != key) ? hash(key) : 0;
        // 計算擴容後的新的下標
        bucketIndex = indexFor(hash, table.length);
    }
    createEntry(hash, key, value, bucketIndex);
}
// 永遠都是在鏈表的表頭添加新元素
void createEntry(int hash, K key, V value, int bucketIndex) {
    // 獲取指定 bucketIndex 索引處的 Entry
    Entry<K,V> e = table[bucketIndex];
    // 將新建立的 Entry 放入 bucketIndex 索引處,並讓新的 Entry 指向原來的 Entry
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

當系統決定存儲 HashMap 中的 key-value 對時,徹底沒有考慮 Entry 中的 value,僅僅只是 根據 key 來計算並決定每一個 Entry 的存儲位置 。咱們徹底能夠把 Map 集合中的 value 當成 key 的附屬,當系統決定了 key 的存儲位置以後,value 隨之保存在那裏便可。app

  • 數組擴容

隨着 HashMap 中元素的數量愈來愈多,發生碰撞的機率將愈來愈大,所產生的子鏈長度就會愈來愈長,這樣勢必會影響 HashMap 的存取速度。爲了保證 HashMap 的效率,系統必需要在某個臨界點進行擴容處理,該臨界點 threshold。而在 HashMap 數組擴容以後,最消耗性能的點就出現了:原數組中的數據必須從新計算其在新數組中的位置,並放進去,這就是 resize。ssh

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 若 oldCapacity 已達到最大值,直接將 threshold 設爲 Integer.MAX_VALUE
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return; // 直接返回
    }
    // 不然,建立一個更大的數組
    Entry[] newTable = new Entry[newCapacity];
    //將每條Entry從新哈希到新的數組中
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    // 從新設定 threshold
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

1.2 get過程分析

public V get(Object key) {
    // key 爲 null 的話,會被放到 table[0],因此只要遍歷下 table[0] 處的鏈表就能夠了
    if (key == null)
        return getForNullKey();
    // key 非 null 的狀況,詳見下文
    Entry<K,V> entry = getEntry(key);
 
    return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
    // The number of key-value mappings contained in this map.
    if (size == 0) {
        return null;
    }
 
    // 根據該 key 的 hashCode 值計算它的 hash 碼 
    int hash = (key == null) ? 0 : hash(key);
    // 肯定數組下標,而後從頭開始遍歷鏈表,直到找到爲止
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        //若搜索的key與查找的key相同,則返回相對應的value
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

2 Java7 ConcurrentHashMap

ConcurrentHashMap 的成員變量中,包含了一個 Segment 數組 final Segment<K,V>[] segments;,而 Segment 是ConcurrentHashMap 的內部類。

而後在 Segment 這個類中,包含了一個 HashEntry 的數組transient volatile HashEntry<K,V>[] table,而 HashEntry 也是 ConcurrentHashMap 的內部類。

HashEntry 中,包含了 key 和 value 以及 next 指針(相似於 HashMap 中的 Entry),因此 HashEntry 能夠構成一個鏈表。

2.1 成員變量及構造函數

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable { 
    ...
    //初始的容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //初始的加載因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //初始的併發等級,表示當前更新線程的估計數
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小的segment數量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大的segment數量
    static final int MAX_SEGMENTS = 1 << 16; 
    //
    static final int RETRIES_BEFORE_LOCK = 2;
    // segments 的掩碼值, key 的散列碼的高位用來選擇具體的 segment
    final int segmentMask; 
    // 偏移量
    final int segmentShift; 
    final Segment<K,V>[] segments; 
    ...
    // 建立一個帶有指定初始容量、加載因子和併發級別的新的空映射
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 尋找最佳匹配參數(不小於給定參數的最接近的 2^n)
        int sshift = 0; // 用來記錄向左按位移動的次數
        int ssize = 1; // 用來記錄Segment數組的大小
        // 計算並行級別 ssize,由於要保持並行級別是 2^n
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 若爲默認值,concurrencyLevel 爲 16,sshift 爲 4
        // 那麼計算出 segmentShift 爲 28,segmentMask 爲 15
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 記錄每一個 Segment 上要放置多少個元素
        int c = initialCapacity / ssize;
        // 假若有餘數,則Segment數量加1
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

當用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:

  • Segment 數組長度爲 16,不能夠擴容
  • Segment[i] 的默認大小爲 2,負載因子是 0.75,得出初始閾值爲 1.5,也就是之後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裏初始化了 segment[0],其餘位置仍是 null,至於爲何要初始化 segment[0],後面的代碼會介紹
  • 當前 segmentShift 的值爲 32 – 4 = 28,segmentMask 爲 16 – 1 = 15,姑且把它們簡單翻譯爲移位數和掩碼,這兩個值立刻就會用到

2.2 put過程分析

根據 hash 值很快就能找到相應的 Segment,以後就是 Segment 內部的 put 操做。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 根據 hash 值找到 Segment 數組中的位置 j
    // hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,
    // 而後和 segmentMask(15) 作一次與操做,也就是說 j 是 hash 值的最後 4 位,也就是槽的數組下標
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時候初始化了 segment[0],可是其餘位置仍是 null,
    // ensureSegment(j) 對 segment[j] 進行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

Segment 內部是由 數組+鏈表 組成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先獲取該 segment 的獨佔鎖
    // 每個Segment進行put時,都會加鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 內部的數組
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求應該放置的數組下標
        int index = (tab.length - 1) & hash;
        // 數組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            // 若是鏈頭不爲 null
            if (e != null) {
                K k;
                //若是在該鏈中找到相同的key,則用新值替換舊值,並退出循環
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                //若是沒有和key相同的,一直遍歷到鏈尾,鏈尾的next爲null,進入到else
                e = e.next;
            }
            else {
                // node 究竟是不是 null,這個要看獲取鎖的過程,不過和這裏都沒有關係。
                // 若是不爲 null,那就直接將它設置爲鏈表表頭;若是是null,初始化並設置爲鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 若是超過了該 segment 的閾值,這個 segment 須要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
                    // 其實就是將新的節點設置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}

2.3 初始化Segment

ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其餘槽來講,在插入第一個值的時候進行初始化。

這裏須要考慮併發,由於極可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就能夠。

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 這裏看到爲何以前要初始化 segment[0] 了,
        // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k]
        // 爲何要用「當前」,由於 segment[0] 可能早就擴容過了
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        // 初始化 segment[k] 內部的數組
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck Segment[k] 是否被其它線程初始化了
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 循環,內部用 CAS,當前線程成功設值或其餘線程成功設值後,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

2.4 get過程分析

比較簡單,先找到 Segment 數組的位置,而後找到 HashEntry 數組的位置,最後順着鏈表查找便可。

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

3 線程不安全

3.1 哈希碰撞

多個線程同時使用 put() 方法添加元素,若存在兩個或多個 put() 的 key 發生了碰撞,那麼有可能其中一個線程的數據被覆蓋。

3.2 擴容

當數據要插入 HashMap 時,都會檢查容量有沒有超過設定的 thredhold,若是超過,則須要擴容。而多線程會致使擴容後的鏈表造成環形數據結構,一旦造成環形數據結構,Entry 的 next 的節點永遠不爲 null,就會在獲取 Entry 時產生死循環。

例子可見文章《HashMap多線程死循環問題》。

不過要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 添加節點到鏈表中是先擴容後再添加,而例子中的源碼是:

void addEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    // 先添加節點
    table[bucketIndex] = new Entry<K,V>(hash, key, value, e);
    
    // 而後擴容
    if (size++ >= threshold)
        resize(2 * table.length);
}

因此在 Java7 中此例子無效。而在 Java8 中,經過確保建新鏈與舊鏈的順序是相同的,便可避免產生死循環。

4 HashMap遍歷方式

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class HashMapTest {
    private final static Map<Integer, Object> map = new HashMap<Integer, Object>(10000);
    private static final Object PRESENT = new Object();
    public static void main(String args[]) {
        long startTime;
        long endTime;
        long totalTime;
        for (int i = 0; i < 7500; i++) {
            map.put(i, PRESENT);
        }
        // 方法一
        startTime = System.nanoTime();
        Iterator iter1 = map.entrySet().iterator();
        while (iter1.hasNext()) {
            Map.Entry<Integer, Object> entry = (Map.Entry) iter1.next();
            Integer key = entry.getKey();
            Object val = entry.getValue();
        }
        endTime = System.nanoTime();
        totalTime = endTime - startTime;
        System.out.println("methor1 pays " + totalTime + " ms");
        
        // 方法二
        startTime = System.nanoTime();
        Iterator iter2 = map.keySet().iterator();
        while (iter2.hasNext()) {
            Object key = iter2.next();
            Object val = map.get(key);
        }
        endTime = System.nanoTime();
        totalTime = endTime - startTime;
        System.out.println("methor2 pays " + totalTime + " ms");
    }
}

運行結果:

5 性能對比

線程安全的使用 HashMap 有三種方式,分別爲 Hashtable、SynchronizedMap()、ConcurrentHashMap。

Hashtable

使用 synchronized 來保證線程安全,幾乎全部的 public 的方法都是 synchronized 的,而有些方法也是在內部經過 synchronized 代碼塊來實現。

synchronizedMap()

經過建立一個線程安全的 Map 對象,並把它做爲一個封裝的對象來返回。

ConcurrentHashMap

支持多線程對 Map 作讀操做,而且不須要任何的 blocking 。這得益於 CHM 將 Map 分割成了不一樣的部分,在執行更新操做時只鎖住一部分。根據默認的併發級別, Map 被分割成 16 個部分,而且由不一樣的鎖控制。這意味着,同時最多能夠有 16個 寫線程操做 Map 。試想一下,由只能一個線程進入變成同時可由 16 個寫線程同時進入(讀線程幾乎不受限制),性能的提高是顯而易見的。但因爲一些更新操做,如 put(), remove(), putAll(), clear()只鎖住操做的部分,因此在檢索操做不能保證返回的是最新的結果。

在迭代遍歷 CHM 時, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不會返回某些最近的改變,而且在遍歷過程當中,若是已經遍歷的數組上的內容變化了,不會拋出 ConcurrentModificationExceptoin 的異常。

何時使用 ConcurrentHashMap ?

CHM 適用於讀者數量超過寫者時,當寫者數量大於等於讀者時,CHM 的性能是低於 Hashtable 和 synchronizedMap 的。這是由於當鎖住了整個 Map 時,讀操做要等待對同一部分執行寫操做的線程結束。

CHM 適用於作 cache ,在程序啓動時初始化,以後能夠被多個請求線程訪問。

CHM 是Hashtable一個很好的替代,但要記住, CHM 的比 Hashrable 的同步性稍弱。

6 拓展:Java8 HashMap & ConcurrentHashMap

Java8 對 HashMap 和 ConcurrentHashMap 作了一些修改:

  • 兩者均利用了紅黑樹,因此其數據結構由 數組 + 鏈表 + 紅黑樹 組成。咱們知道,鏈表上的數據須要一個一個比較下去才能找到咱們須要的,時間複雜度取決於鏈表的長度,爲 O(n)。爲了下降這一部分的開銷,在 Java8 中,當鏈表中的元素超過了 8 個之後,會將鏈表轉換爲紅黑樹,這個時候時間複雜度就降爲了 O(logN)
  • Java8 中 ConcurrentHashMap 摒棄 Java7 中的 Segment 的概念,使用了另外一種方式實現保證線程安全。

Linux公社的RSS地址: https://www.linuxidc.com/rssFeed.aspx

本文永久更新連接地址: https://www.linuxidc.com/Linux/2018-09/154133.htm

相關文章
相關標籤/搜索