本文將深刻剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的源碼,解析 HashMap 線程不安全的原理以及解決方案,最後以測試用例加以驗證。java
HashMap 的數據結構:node
從上圖中能夠看出,HashMap 底層就是一個數組結構,數組中的每一項又是一個鏈表。linux
經過查看 JDK 中的 HashMap 源碼,能夠看到其構造函數有一行代碼:數組
public HashMap(int initialCapacity, float loadFactor) { ... table = new Entry[capacity]; ... }
即建立了一個大小爲 capacity 的 Entry 數組,而 Entry 的結構以下:安全
static class Entry<K,V> implements Map.Entry<K,V> { final K key; V value; Entry<K,V> next; final int hash; …… }
能夠看到,Entry 是一個 static class,其中包含了 key 和 value ,也就是鍵值對,另外還包含了一個 next 的 Entry 指針。數據結構
public V put(K key, V value) { // 當插入第一個元素的時候,須要先初始化數組大小 if (table == EMPTY_TABLE) { inflateTable(threshold); } // 若是 key 爲 null,則這個 entry 放到 table[0] 中 if (key == null) return putForNullKey(value); // key 的 hash 值 int hash = hash(key); // 找到對應的數組下標 int i = indexFor(hash, table.length); // 遍歷一下對應下標處的鏈表,看是否有重複的 key 已經存在, // 若是有,直接覆蓋,put 方法返回舊值就結束了 for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; // 不存在重複的 key,將此 entry 添加到鏈表中 addEntry(hash, key, value, i); return null; }
這裏對一些方法作深刻解析。多線程
private void inflateTable(int toSize) { // 保證數組大小必定是 2^n int capacity = roundUpToPowerOf2(toSize); // 計算擴容閾值 threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); // 初始化數組 table = new Entry[capacity]; initHashSeedAsNeeded(capacity); }
static int indexFor(int hash, int length) { // 做用等價於取模運算,但這種方式效率更高 return hash & (length-1); }
由於HashMap的底層數組長度老是 2^n,當 length 爲 2 的 n 次方時,hash & (length-1) 就至關於對length取模,並且速度比直接取模要快的多。併發
void addEntry(int hash, K key, V value, int bucketIndex) { // 若是當前 HashMap 大小已經達到了閾值,而且新值要插入的數組位置已經有元素了,那麼要擴容 if ((size >= threshold) && (null != table[bucketIndex])) { // 擴容 resize(2 * table.length); // 從新計算 hash 值 hash = (null != key) ? hash(key) : 0; // 計算擴容後的新的下標 bucketIndex = indexFor(hash, table.length); } createEntry(hash, key, value, bucketIndex); } // 永遠都是在鏈表的表頭添加新元素 void createEntry(int hash, K key, V value, int bucketIndex) { // 獲取指定 bucketIndex 索引處的 Entry Entry<K,V> e = table[bucketIndex]; // 將新建立的 Entry 放入 bucketIndex 索引處,並讓新的 Entry 指向原來的 Entry table[bucketIndex] = new Entry<>(hash, key, value, e); size++; }
當系統決定存儲 HashMap 中的 key-value 對時,徹底沒有考慮 Entry 中的 value,僅僅只是 根據 key 來計算並決定每一個 Entry 的存儲位置 。咱們徹底能夠把 Map 集合中的 value 當成 key 的附屬,當系統決定了 key 的存儲位置以後,value 隨之保存在那裏便可。app
隨着 HashMap 中元素的數量愈來愈多,發生碰撞的機率將愈來愈大,所產生的子鏈長度就會愈來愈長,這樣勢必會影響 HashMap 的存取速度。爲了保證 HashMap 的效率,系統必需要在某個臨界點進行擴容處理,該臨界點 threshold。而在 HashMap 數組擴容以後,最消耗性能的點就出現了:原數組中的數據必須從新計算其在新數組中的位置,並放進去,這就是 resize。ssh
void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; // 若 oldCapacity 已達到最大值,直接將 threshold 設爲 Integer.MAX_VALUE if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; // 直接返回 } // 不然,建立一個更大的數組 Entry[] newTable = new Entry[newCapacity]; //將每條Entry從新哈希到新的數組中 transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; // 從新設定 threshold threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1); }
public V get(Object key) { // key 爲 null 的話,會被放到 table[0],因此只要遍歷下 table[0] 處的鏈表就能夠了 if (key == null) return getForNullKey(); // key 非 null 的狀況,詳見下文 Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue(); } final Entry<K,V> getEntry(Object key) { // The number of key-value mappings contained in this map. if (size == 0) { return null; } // 根據該 key 的 hashCode 值計算它的 hash 碼 int hash = (key == null) ? 0 : hash(key); // 肯定數組下標,而後從頭開始遍歷鏈表,直到找到爲止 for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; //若搜索的key與查找的key相同,則返回相對應的value if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; }
ConcurrentHashMap 的成員變量中,包含了一個 Segment 數組 final Segment<K,V>[] segments;,而 Segment 是ConcurrentHashMap 的內部類。
而後在 Segment 這個類中,包含了一個 HashEntry 的數組transient volatile HashEntry<K,V>[] table,而 HashEntry 也是 ConcurrentHashMap 的內部類。
HashEntry 中,包含了 key 和 value 以及 next 指針(相似於 HashMap 中的 Entry),因此 HashEntry 能夠構成一個鏈表。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { ... //初始的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //初始的加載因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //初始的併發等級,表示當前更新線程的估計數 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //最小的segment數量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大的segment數量 static final int MAX_SEGMENTS = 1 << 16; // static final int RETRIES_BEFORE_LOCK = 2; // segments 的掩碼值, key 的散列碼的高位用來選擇具體的 segment final int segmentMask; // 偏移量 final int segmentShift; final Segment<K,V>[] segments; ... // 建立一個帶有指定初始容量、加載因子和併發級別的新的空映射 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 尋找最佳匹配參數(不小於給定參數的最接近的 2^n) int sshift = 0; // 用來記錄向左按位移動的次數 int ssize = 1; // 用來記錄Segment數組的大小 // 計算並行級別 ssize,由於要保持並行級別是 2^n while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 若爲默認值,concurrencyLevel 爲 16,sshift 爲 4 // 那麼計算出 segmentShift 爲 28,segmentMask 爲 15 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 記錄每一個 Segment 上要放置多少個元素 int c = initialCapacity / ssize; // 假若有餘數,則Segment數量加1 if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
當用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:
根據 hash 值很快就能找到相應的 Segment,以後就是 Segment 內部的 put 操做。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); // 根據 hash 值找到 Segment 數組中的位置 j // hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位, // 而後和 segmentMask(15) 作一次與操做,也就是說 j 是 hash 值的最後 4 位,也就是槽的數組下標 int j = (hash >>> segmentShift) & segmentMask; // 剛剛說了,初始化的時候初始化了 segment[0],可是其餘位置仍是 null, // ensureSegment(j) 對 segment[j] 進行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); // 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
Segment 內部是由 數組+鏈表 組成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 先獲取該 segment 的獨佔鎖 // 每個Segment進行put時,都會加鎖 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // segment 內部的數組 HashEntry<K,V>[] tab = table; // 利用 hash 值,求應該放置的數組下標 int index = (tab.length - 1) & hash; // 數組該位置處的鏈表的表頭 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { // 若是鏈頭不爲 null if (e != null) { K k; //若是在該鏈中找到相同的key,則用新值替換舊值,並退出循環 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } //若是沒有和key相同的,一直遍歷到鏈尾,鏈尾的next爲null,進入到else e = e.next; } else { // node 究竟是不是 null,這個要看獲取鎖的過程,不過和這裏都沒有關係。 // 若是不爲 null,那就直接將它設置爲鏈表表頭;若是是null,初始化並設置爲鏈表表頭。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 若是超過了該 segment 的閾值,這個 segment 須要擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else // 沒有達到閾值,將 node 放到數組 tab 的 index 位置, // 其實就是將新的節點設置成原鏈表的表頭 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解鎖 unlock(); } return oldValue; }
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其餘槽來講,在插入第一個值的時候進行初始化。
這裏須要考慮併發,由於極可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就能夠。
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 這裏看到爲何以前要初始化 segment[0] 了, // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k] // 爲何要用「當前」,由於 segment[0] 可能早就擴容過了 Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 內部的數組 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment[k] 是否被其它線程初始化了 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循環,內部用 CAS,當前線程成功設值或其餘線程成功設值後,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
比較簡單,先找到 Segment 數組的位置,而後找到 HashEntry 數組的位置,最後順着鏈表查找便可。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
多個線程同時使用 put() 方法添加元素,若存在兩個或多個 put() 的 key 發生了碰撞,那麼有可能其中一個線程的數據被覆蓋。
當數據要插入 HashMap 時,都會檢查容量有沒有超過設定的 thredhold,若是超過,則須要擴容。而多線程會致使擴容後的鏈表造成環形數據結構,一旦造成環形數據結構,Entry 的 next 的節點永遠不爲 null,就會在獲取 Entry 時產生死循環。
例子可見文章《HashMap多線程死循環問題》。
不過要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 添加節點到鏈表中是先擴容後再添加,而例子中的源碼是:
void addEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; // 先添加節點 table[bucketIndex] = new Entry<K,V>(hash, key, value, e); // 而後擴容 if (size++ >= threshold) resize(2 * table.length); }
因此在 Java7 中此例子無效。而在 Java8 中,經過確保建新鏈與舊鏈的順序是相同的,便可避免產生死循環。
import java.util.HashMap; import java.util.Iterator; import java.util.Map; public class HashMapTest { private final static Map<Integer, Object> map = new HashMap<Integer, Object>(10000); private static final Object PRESENT = new Object(); public static void main(String args[]) { long startTime; long endTime; long totalTime; for (int i = 0; i < 7500; i++) { map.put(i, PRESENT); } // 方法一 startTime = System.nanoTime(); Iterator iter1 = map.entrySet().iterator(); while (iter1.hasNext()) { Map.Entry<Integer, Object> entry = (Map.Entry) iter1.next(); Integer key = entry.getKey(); Object val = entry.getValue(); } endTime = System.nanoTime(); totalTime = endTime - startTime; System.out.println("methor1 pays " + totalTime + " ms"); // 方法二 startTime = System.nanoTime(); Iterator iter2 = map.keySet().iterator(); while (iter2.hasNext()) { Object key = iter2.next(); Object val = map.get(key); } endTime = System.nanoTime(); totalTime = endTime - startTime; System.out.println("methor2 pays " + totalTime + " ms"); } }
運行結果:
線程安全的使用 HashMap 有三種方式,分別爲 Hashtable、SynchronizedMap()、ConcurrentHashMap。
使用 synchronized 來保證線程安全,幾乎全部的 public 的方法都是 synchronized 的,而有些方法也是在內部經過 synchronized 代碼塊來實現。
經過建立一個線程安全的 Map 對象,並把它做爲一個封裝的對象來返回。
支持多線程對 Map 作讀操做,而且不須要任何的 blocking 。這得益於 CHM 將 Map 分割成了不一樣的部分,在執行更新操做時只鎖住一部分。根據默認的併發級別, Map 被分割成 16 個部分,而且由不一樣的鎖控制。這意味着,同時最多能夠有 16個 寫線程操做 Map 。試想一下,由只能一個線程進入變成同時可由 16 個寫線程同時進入(讀線程幾乎不受限制),性能的提高是顯而易見的。但因爲一些更新操做,如 put(), remove(), putAll(), clear()只鎖住操做的部分,因此在檢索操做不能保證返回的是最新的結果。
在迭代遍歷 CHM 時, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不會返回某些最近的改變,而且在遍歷過程當中,若是已經遍歷的數組上的內容變化了,不會拋出 ConcurrentModificationExceptoin 的異常。
CHM 適用於讀者數量超過寫者時,當寫者數量大於等於讀者時,CHM 的性能是低於 Hashtable 和 synchronizedMap 的。這是由於當鎖住了整個 Map 時,讀操做要等待對同一部分執行寫操做的線程結束。
CHM 適用於作 cache ,在程序啓動時初始化,以後能夠被多個請求線程訪問。
CHM 是Hashtable一個很好的替代,但要記住, CHM 的比 Hashrable 的同步性稍弱。
Java8 對 HashMap 和 ConcurrentHashMap 作了一些修改:
Linux公社的RSS地址: https://www.linuxidc.com/rssFeed.aspx
本文永久更新連接地址: https://www.linuxidc.com/Linux/2018-09/154133.htm