集合(Collection)是編程中經常使用的數據結構,而併發也是服務器端編程經常使用的技術之一,併發老是離不開集合這類高級數據結構的支持。好比兩個線程須要同時訪問一箇中間臨界區(Queue),好比常會用緩存做爲外部文件的副本(HashMap)。而Map這種以鍵值對爲元素的數據結構也是集合中最經常使用到的。Map家族中的三大類:HashMap、HashTable、ConcurrentHashMap。前者非線程安全的,後二者是線程安全的,而HashTable的實現原理與HashMap很類似,只是在公開的方法上(例如get)加入了synchronized關鍵字,保證同步。實現簡單方便的劣勢就是帶來了性能的降低,所以目前HashTable已被淘汰,被同時知足線程安全並且性能更高(讀寫更快)的ConcurrentHashMap代替。html
Java7和Java8的HashMap差異仍是比較大的,源碼角度來看也愈來愈複雜,本文將依次從Java7的HashMap以及Java8的HashMap,Java7的ConcurrentHashMap和Java8的ConcurrentHashMap的順序來逐步分析源碼,深刻理解他們的原理。java
Hashmap的key和value容許存放null值node
而Hashtable和ConcurrentHashmap由於併發的緣由,key和value不容許爲null值。編程
由於HashMap不支持併發操做,因此它的代碼也比較簡單。先從下面這張圖來了解HashMap的結構,由數組+鏈表而組成。數組
大方向上,HashMap 裏面是一個數組,而後數組中每一個元素是一個單向鏈表。緩存
上圖中,每一個綠色的實體是嵌套類 Entry 的實例,Entry 包含四個屬性:key, value, hash 值和用於單向鏈表的 next。安全
size的大小始終爲2的n次方,服務器
loadFactor:負載因子,默認爲 0.75。數據結構
threshold:擴容的閾值,等於 capacity * loadFactor多線程
咱們知道在Java中最經常使用的兩種結構是數組和模擬指針(引用),幾乎全部的數據結構均可以利用這兩種來組合實現。數組的存儲方式在內存的地址是連續的,大小固定,一旦分配不能被其餘引用佔用。它的特色是查詢快,時間複雜度是O(1),插入和刪除的操做比較慢,時間複雜度是O(n),鏈表的存儲方式是非連續的,大小不固定,特色與數組相反,插入和刪除快,查詢速度慢。HashMap能夠說是一種折中的方案吧:外層是數組,每一個數組下面連着鏈表。
put的過程比較簡單,跟着代碼看一遍就能看懂
public V put(K key, V value) { // 當插入第一個元素的時候,須要先初始化數組大小 if (table == EMPTY_TABLE) { inflateTable(threshold); } // 若是 key 爲 null,感興趣的能夠往裏看,最終會將這個 entry 放到 table[0] 中 if (key == null) return putForNullKey(value); // 1. 求 key 的 hash 值 int hash = hash(key); // 2. 找到對應的數組下標 int i = indexFor(hash, table.length); // 3. 遍歷一下對應下標處的鏈表,看是否有重複的 key 已經存在, // 若是有,直接覆蓋,put 方法返回舊值就結束了 for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; // 4. 不存在重複的 key,將此 entry 添加到鏈表中,細節後面說 addEntry(hash, key, value, i); return null; }
在第一個元素插入 HashMap 的時候作一次數組的初始化,就是先肯定初始的數組大小,並計算數組擴容的閾值。
private void inflateTable(int toSize) { // 保證數組大小必定是 2 的 n 次方。 // 好比這樣初始化:new HashMap(20),那麼處理成初始數組大小是 32 int capacity = roundUpToPowerOf2(toSize); // 計算擴容閾值:capacity * loadFactor threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); // 算是初始化數組吧 table = new Entry[capacity]; initHashSeedAsNeeded(capacity); //ignore }
這裏有一個將數組大小保持爲 2 的 n 次方的作法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應的要求,只不過實現的代碼稍微有些不一樣,後面再看到的時候就知道了。
這個比較簡單,使用 key 的 hash 值對數組長度進行取模就能夠了。
static int indexFor(int hash, int length) { // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2"; return hash & (length-1); }
這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在數組長度爲 32 的時候,其實取的就是 key 的 hash 值的低 5 位,做爲它在數組中的下標位置。
找到數組下標後,會先進行 key 判重,若是沒有重複,說明發生了哈希碰撞就準備將新值放入到鏈表的表頭(頭插法)。
void addEntry(int hash, K key, V value, int bucketIndex) { // 若是當前 HashMap 大小已經達到了閾值,而且新值要插入的數組位置已經有元素了,那麼要擴容 if ((size >= threshold) && (null != table[bucketIndex])) { // 擴容,後面會介紹一下 resize(2 * table.length); // 擴容之後,從新計算 hash 值 hash = (null != key) ? hash(key) : 0; // 從新計算擴容後的新的下標 bucketIndex = indexFor(hash, table.length); } // 往下看 createEntry(hash, key, value, bucketIndex); } // 這個很簡單,其實就是將新值放到鏈表的表頭,而後 size++ void createEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value, e); size++; }
這個方法的主要邏輯就是先判斷是否須要擴容,須要的話先擴容,而後再將這個新的數據插入到擴容後的數組的相應位置處的鏈表的表頭。
前面咱們看到,在插入新值的時候,若是當前的 size 已經達到了閾值,而且要插入的數組位置上已經有元素,那麼就會觸發擴容,擴容後,數組大小爲原來的 2 倍。
void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; } // 新的數組 Entry[] newTable = new Entry[newCapacity]; // 將原來數組中的值遷移到新的更大的數組中 transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
咱們接下來看一下遷移數組用到的transfer函數,這也是致使HashMap線程不安全的主要緣由,再接下來會詳細講到
void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K,V> e : table) { while(null != e) { Entry<K,V> next = e.next; if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } } }
大概作了一下幾個事情:
相對於 put 過程,get 過程是很是簡單的。
public V get(Object key) { // 以前說過,key 爲 null 的話,會被放到 table[0],因此只要遍歷下 table[0] 處的鏈表就能夠了 if (key == null) return getForNullKey(); // Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue();
getEntry(key):
final Entry<K,V> getEntry(Object key) { if (size == 0) { return null; } int hash = (key == null) ? 0 : hash(key); // 肯定數組下標,而後從頭開始遍歷鏈表,直到找到爲止 for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; }
Java8 對 HashMap 進行了一些修改,最大的不一樣就是利用了紅黑樹,因此其由 數組+鏈表+紅黑樹 組成。
根據 Java7 HashMap 的介紹,咱們知道,查找的時候,根據 hash 值咱們可以快速定位到數組的具體下標,可是以後的話,須要順着鏈表一個個比較下去才能找到咱們須要的,時間複雜度取決於鏈表的長度,爲 O(n)。
爲了下降這部分的開銷,在 Java8 中,當鏈表中的元素超過了 8 個之後,會將鏈表轉換爲紅黑樹,在這些位置進行查找的時候能夠下降時間複雜度爲 O(logN)。
Java7 中使用 Entry 來表明每一個 HashMap 中的數據節點,Java8 中使用 Node,基本沒有區別,都是 key,value,hash 和 next 這四個屬性,不過,Node 只能用於鏈表的狀況,紅黑樹的狀況須要使用 TreeNode。
咱們根據數組元素中,第一個節點數據類型是 Node 仍是 TreeNode 來判斷該位置下是鏈表仍是紅黑樹的。
public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } // 第三個參數 onlyIfAbsent 若是是 true,那麼只有在不存在該 key 時纔會進行 put 操做 // 第四個參數 evict 咱們這裏不關心 final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; // 第一次 put 值的時候,會觸發下面的 resize(),相似 java7 的第一次 put 也要初始化數組長度 // 第一次 resize 和後續的擴容有些不同,由於此次是數組從 null 初始化到默認的 16 或自定義的初始容量 if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; // 找到具體的數組下標,若是此位置沒有值,那麼直接初始化一下 Node 並放置在這個位置就能夠了 if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null); else {// 數組該位置有數據 Node<K,V> e; K k; // 首先,判斷該位置的第一個數據和咱們要插入的數據,key 是否是"相等",若是是,取出這個節點 if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; // 若是該節點是表明紅黑樹的節點,調用紅黑樹的插值方法,本文不展開說紅黑樹 else if (p instanceof TreeNode) e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { // 到這裏,說明數組該位置上是一個鏈表 for (int binCount = 0; ; ++binCount) { // 插入到鏈表的最後面(Java7 是插入到鏈表的最前面) if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); // TREEIFY_THRESHOLD 爲 8,因此,若是新插入的值是鏈表中的第 9 個 // 會觸發下面的 treeifyBin,也就是將鏈表轉換爲紅黑樹 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st treeifyBin(tab, hash); break; } // 若是在該鏈表中找到了"相等"的 key(== 或 equals) if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) // 此時 break,那麼 e 爲鏈表中[與要插入的新值的 key "相等"]的 node break; p = e; } } // e!=null 說明存在舊值的key與要插入的key"相等" // 對於咱們分析的put操做,下面這個 if 其實就是進行 "值覆蓋",而後返回舊值 if (e != null) { V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } ++modCount; // 若是 HashMap 因爲新插入這個值致使 size 已經超過了閾值,須要進行擴容 if (++size > threshold) resize(); afterNodeInsertion(evict); return null; }
和 Java7 稍微有點不同的地方就是,Java7 是先擴容後插入新值的,Java8 先插值再擴容,不過這個不重要。
resize() 方法用於初始化數組或數組擴容,每次擴容後,容量爲原來的 2 倍,並進行數據遷移。
1 final Node<K,V>[] resize() { 2 Node<K,V>[] oldTab = table; 3 int oldCap = (oldTab == null) ? 0 : oldTab.length; 4 int oldThr = threshold; 5 int newCap, newThr = 0; 6 if (oldCap > 0) { // 對應數組擴容 7 if (oldCap >= MAXIMUM_CAPACITY) { 8 threshold = Integer.MAX_VALUE; 9 return oldTab; 10 } 11 // 將數組大小擴大一倍 12 else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && 13 oldCap >= DEFAULT_INITIAL_CAPACITY) 14 // 將閾值擴大一倍 15 newThr = oldThr << 1; // double threshold 16 } 17 else if (oldThr > 0) // 對應使用 new HashMap(int initialCapacity) 初始化後,第一次 put 的時候 18 newCap = oldThr; 19 else {// 對應使用 new HashMap() 初始化後,第一次 put 的時候 20 newCap = DEFAULT_INITIAL_CAPACITY; 21 newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); 22 } 23 24 if (newThr == 0) { 25 float ft = (float)newCap * loadFactor; 26 newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? 27 (int)ft : Integer.MAX_VALUE); 28 } 29 threshold = newThr; 30 31 // 用新的數組大小初始化新的數組 32 Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; 33 table = newTab; // 若是是初始化數組,到這裏就結束了,返回 newTab 便可 34 35 if (oldTab != null) { 36 // 開始遍歷原數組,進行數據遷移。 37 for (int j = 0; j < oldCap; ++j) { 38 Node<K,V> e; 39 if ((e = oldTab[j]) != null) { 40 oldTab[j] = null; 41 // 若是該數組位置上只有單個元素,那就簡單了,簡單遷移這個元素就能夠了 42 if (e.next == null) 43 newTab[e.hash & (newCap - 1)] = e; 44 // 若是是紅黑樹,具體咱們就不展開了 45 else if (e instanceof TreeNode) 46 ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); 47 else { 48 // 這塊是處理鏈表的狀況, 49 // 須要將此鏈表拆成兩個鏈表,放到新的數組中,而且保留原來的前後順序 50 // loHead、loTail 對應一條鏈表,hiHead、hiTail 對應另外一條鏈表,代碼仍是比較簡單的 51 Node<K,V> loHead = null, loTail = null; 52 Node<K,V> hiHead = null, hiTail = null; 53 Node<K,V> next; 54 do { 55 next = e.next; 56 if ((e.hash & oldCap) == 0) { 57 if (loTail == null) 58 loHead = e; 59 else 60 loTail.next = e; 61 loTail = e; 62 } 63 else { 64 if (hiTail == null) 65 hiHead = e; 66 else 67 hiTail.next = e; 68 hiTail = e; 69 } 70 } while ((e = next) != null); 71 if (loTail != null) { 72 loTail.next = null; 73 // 第一條鏈表 74 newTab[j] = loHead; 75 } 76 if (hiTail != null) { 77 hiTail.next = null; 78 // 第二條鏈表的新的位置是 j + oldCap,這個很好理解 79 newTab[j + oldCap] = hiHead; 80 } 81 } 82 } 83 } 84 } 85 return newTab; 86 }
相對於 put 來講,get 比較簡單了。
public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { // 判斷第一個節點是否是就是須要的 if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { // 判斷是不是紅黑樹 if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); // 鏈表遍歷 do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }
HashMap衆所周知是線程不安全的,全部方法沒采用同步處理,沒法作到線程同步(線程安全)也是能夠想象獲得的。HashMap的線程不安全性主要體如今如下三個方面:
在多線程訪問的狀況下:
1.在hashmap作put操做的時候會調用下面方法:
// 新增Entry。將「key-value」插入指定位置,bucketIndex是位置索引。 void addEntry(int hash, K key, V value, int bucketIndex) { // 保存「bucketIndex」位置的值到「e」中 Entry<K,V> e = table[bucketIndex]; // 設置「bucketIndex」位置的元素爲「新Entry」, // 設置「e」爲「新Entry的下一個節點」 table[bucketIndex] = new Entry<K,V>(hash, key, value, e); // 若HashMap的實際大小 不小於 「閾值」,則調整HashMap的大小 if (size++ >= threshold) resize(2 * table.length); }
在hashmap作put操做的時候會調用到以上的方法。如今假如A線程和B線程同時對同一個數組位置調用addEntry,兩個線程會同時獲得如今的頭結點,而後A寫入新的頭結點以後,B也寫入新的頭結點,那B的寫入操做就會覆蓋A的寫入操做形成A的寫入操做丟失
2.刪除鍵值對的代碼
final Entry<K,V> removeEntryForKey(Object key) { // 獲取哈希值。若key爲null,則哈希值爲0;不然調用hash()進行計算 int hash = (key == null) ? 0 : hash(key.hashCode()); int i = indexFor(hash, table.length); Entry<K,V> prev = table[i]; Entry<K,V> e = prev; // 刪除鏈表中「鍵爲key」的元素 // 本質是「刪除單向鏈表中的節點」 while (e != null) { Entry<K,V> next = e.next; Object k; if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) { modCount++; size--; if (prev == e) table[i] = next; else prev.next = next; e.recordRemoval(this); return e; } prev = e; e = next; }
當多個線程同時操做同一個數組位置的時候,也都會先取得如今狀態下該位置存儲的頭結點,而後各自去進行計算操做,以後再把結果寫會到該數組位置去,其實寫回的時候可能其餘的線程已經就把這個位置給修改過了,就會覆蓋其餘線程的修改。
3.addEntry中當加入新的鍵值對後鍵值對總數量超過門限值的時候會調用一個resize操做,代碼以下:
這個操做會新生成一個新的容量的數組,而後對原數組的全部鍵值對從新進行計算和寫入新的數組,以後指向新生成的數組。
當多個線程同時檢測到總數量超過門限值的時候就會同時調用resize操做,各自生成新的數組並rehash後賦給該map底層的數組table,結果最終只有最後一個線程生成的新數組被賦給table變量,其餘線程的均會丟失。並且某些線程已經完成賦值而其餘線程剛開始的時候,就會用已經被賦值的table做爲原始數組,這樣也會有問題,多個線程同時進行resize操做也有可能使鏈表產生環,致使死鎖,詳細圖文詳解見http://www.importnew.com/22011.html
ConcurrentHashMap 和 HashMap 思路是差很少的,可是由於它支持併發操做,因此要複雜一些。
整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 表明」部分「或」一段「的意思,因此不少地方都會將其描述爲分段鎖。注意,行文中,我不少地方用了「槽」來表明一個 segment。
簡單理解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 經過繼承 ReentrantLock 來進行加鎖,因此每次須要加鎖的操做鎖住的是一個 segment,這樣只要保證每一個 Segment 是線程安全的,也就實現了全局的線程安全。
concurrencyLevel:並行級別、併發數、Segment 數,怎麼翻譯不重要,理解它。默認是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,因此理論上,這個時候,最多能夠同時支持 16 個線程併發寫,只要它們的操做分別分佈在不一樣的 Segment 上。這個值能夠在初始化的時候設置爲其餘值,可是一旦初始化之後,它是不能夠擴容的。
再具體到每一個 Segment 內部,其實每一個 Segment 很像以前介紹的 HashMap,不過它要保證線程安全,因此處理起來要麻煩些。
initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操做的時候須要平均分給每一個 Segment。
loadFactor:負載因子,以前咱們說了,Segment 數組不能夠擴容,因此這個負載因子是給每一個 Segment 內部使用的。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; // 計算並行級別 ssize,由於要保持並行級別是 2 的 n 次方 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 咱們這裏先不要那麼燒腦,用默認值,concurrencyLevel 爲 16,sshift 爲 4 // 那麼計算出 segmentShift 爲 28,segmentMask 爲 15,後面會用到這兩個值 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // initialCapacity 是設置整個 map 初始的大小, // 這裏根據 initialCapacity 計算 Segment 數組中每一個位置能夠分到的大小 // 如 initialCapacity 爲 64,那麼每一個 Segment 或稱之爲"槽"能夠分到 4 個 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 默認 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,由於這樣的話,對於具體的槽上, // 插入一個元素不至於擴容,插入第二個的時候纔會擴容 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 建立 Segment 數組, // 並建立數組的第一個元素 segment[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 往數組寫入 segment[0] UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
初始化完成,咱們獲得了一個 Segment 數組。
咱們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:
咱們先看 put 的主流程,對於其中的一些關鍵細節操做,後面會進行詳細介紹。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 1. 計算 key 的 hash 值 int hash = hash(key); // 2. 根據 hash 值找到 Segment 數組中的位置 j // hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位, // 而後和 segmentMask(15) 作一次與操做,也就是說 j 是 hash 值的最後 4 位,也就是槽的數組下標 int j = (hash >>> segmentShift) & segmentMask; // 剛剛說了,初始化的時候初始化了 segment[0],可是其餘位置仍是 null, // ensureSegment(j) 對 segment[j] 進行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); // 3. 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,以後就是 Segment 內部的 put 操做了。
Segment 內部是由 數組+鏈表 組成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 在往該 segment 寫入前,須要先獲取該 segment 的獨佔鎖 // 先看主流程,後面還會具體介紹這部份內容 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // 這個是 segment 內部的數組 HashEntry<K,V>[] tab = table; // 再利用 hash 值,求應該放置的數組下標 int index = (tab.length - 1) & hash; // first 是數組該位置處的鏈表的表頭 HashEntry<K,V> first = entryAt(tab, index); // 下面這串 for 循環雖然很長,不過也很好理解,想一想該位置沒有任何元素和已經存在一個鏈表這兩種狀況 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { // 覆蓋舊值 e.value = value; ++modCount; } break; } // 繼續順着鏈表走 e = e.next; } else { // node 究竟是不是 null,這個要看獲取鎖的過程,不過和這裏都沒有關係。 // 若是不爲 null,那就直接將它設置爲鏈表表頭;若是是null,初始化並設置爲鏈表表頭。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 若是超過了該 segment 的閾值,這個 segment 須要擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 擴容後面也會具體分析 else // 沒有達到閾值,將 node 放到數組 tab 的 index 位置, // 其實就是將新的節點設置成原鏈表的表頭 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解鎖 unlock(); } return oldValue; }
總體流程仍是比較簡單的,因爲有獨佔鎖的保護,因此 segment 內部的操做並不複雜。至於這裏面的併發問題,咱們稍後再進行介紹。
到這裏 put 操做就結束了,接下來,咱們說一說其中幾步關鍵的操做。
前面咱們看到,在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,若是失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。
下面咱們來具體分析這個方法中是怎麼控制加鎖的。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 循環獲取鎖 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 進到這裏說明數組該位置的鏈表是空的,沒有任何元素 // 固然,進到這裏的另外一個緣由是 tryLock() 失敗,因此該槽存在併發,不必定是該位置 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else // 順着鏈表往下走 e = e.next; } // 重試次數若是超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞隊列等待鎖 // lock() 是阻塞方法,直到獲取鎖後返回 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && // 這個時候是有大問題了,那就是有新的元素進到了鏈表,成爲了新的表頭 // 因此這邊的策略是,至關於從新走一遍這個 scanAndLockForPut 方法 (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另外一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨佔鎖。
這個方法就是看似複雜,可是其實就是作了一件事,那就是獲取該 segment 的獨佔鎖,若是須要的話順便實例化了一下 node。
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其餘槽來講,在插入第一個值的時候進行初始化。
這裏須要考慮併發,由於極可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就能夠。
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 這裏看到爲何以前要初始化 segment[0] 了, // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k] // 爲何要用「當前」,由於 segment[0] 可能早就擴容過了 Segment<K,V> proto = ss[0]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 內部的數組 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次檢查一遍該槽是否被其餘線程初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循環,內部用 CAS,當前線程成功設值或其餘線程成功設值後,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
重複一下,segment 數組不能擴容,擴容是 segment 數組某個位置內部的數組 HashEntry\[] 進行擴容,擴容後,容量爲原來的 2 倍。
首先,咱們要回顧一下觸發擴容的地方,put 的時候,若是判斷該值的插入會致使該 segment 的元素個數超過閾值,那麼先進行擴容,再插值,讀者這個時候能夠回去 put 方法看一眼。
該方法不須要考慮併發,由於到這裏的時候,是持有該 segment 的獨佔鎖的。
// 方法參數上的 node 是此次擴容後,須要添加到新的數組中的數據。 private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 2 倍 int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); // 建立新數組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 爲 31,對應二進制 ‘000...00011111’ int sizeMask = newCapacity - 1; // 遍歷原數組,老套路,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置 for (int i = 0; i < oldCapacity ; i++) { // e 是鏈表的第一個元素 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 計算應該放置在新數組中的位置, // 假設原數組長度爲 16,e 在 oldTable[3] 處,那麼 idx 只多是 3 或者是 3 + 16 = 19 int idx = e.hash & sizeMask; if (next == null) // 該位置處只有一個元素,那比較好辦 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // e 是鏈表表頭 HashEntry<K,V> lastRun = e; // idx 是當前鏈表的頭結點 e 的新位置 int lastIdx = idx; // 下面這個 for 循環會找到一個 lastRun 節點,這個節點以後的全部元素是將要放到一塊兒的 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // 將 lastRun 及其以後的全部節點組成的這個鏈表放到 lastIdx 這個位置 newTable[lastIdx] = lastRun; // 下面的操做是處理 lastRun 以前的節點, // 這些節點可能分配在另外一個鏈表中,也可能分配到上面的那個鏈表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
這裏的擴容比以前的 HashMap 要複雜一些,代碼難懂一點。上面有兩個挨着的 for 循環,第一個 for 有什麼用呢?
仔細一看發現,若是沒有第一個 for 循環,也是能夠工做的,可是,這個 for 循環下來,若是 lastRun 的後面還有比較多的節點,那麼此次就是值得的。由於咱們只須要克隆 lastRun 前面的節點,後面的一串節點跟着 lastRun 走就是了,不須要作任何操做。
我以爲 Doug Lea 的這個想法也是挺有意思的,不過比較壞的狀況就是每次 lastRun 都是鏈表的最後一個元素或者很靠後的元素,那麼此次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,若是使用默認的閾值,大約只有 1/6 的節點須要克隆。
相對於 put 來講,get因爲不會受到併發的影響,因此不涉及到同步操做,簡單了不少。
public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { // 判斷第一個節點是否是就是須要的 if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { // 判斷是不是紅黑樹 if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); // 鏈表遍歷 do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }
在Java6和Java7中ConcurrentHashMap使用鎖分段技術提升併發訪問效率。首先將數據分紅一段一段地存儲,而後給每一段數據配一個鎖,當一個線程佔用鎖訪問其中一段數據時,其餘段的數據也能被其餘線程訪問。然而在Java8中的實現已經拋棄了Segment分段鎖機制,利用CAS+Synchronized來保證併發更新的安全,底層依然採用數組+鏈表+紅黑樹的存儲結構。
Java 8 CouncurrentHashMap的結構和Java 8 HashMap很像,不過要保證線程安全,源碼上要複雜不少。
Java 8的ConcurrentHashMap取消segments字段,直接採用transient volatile HashEntry<K,V> table
保存數據,採用table數組元素做爲鎖,從而實現了對每一行數據進行加鎖,進一步減小併發衝突的機率。
Node節點的value和next都用volatile修飾,保證併發性。
// 這構造函數裏,什麼都不幹 public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
這個初始化方法有點意思,經過提供初始容量,計算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),而後向上取最近的 2 的 n 次方】。如 initialCapacity 爲 10,那麼獲得 sizeCtl 爲 16,若是 initialCapacity 爲 11,獲得 sizeCtl 爲 32。
主要由樂觀鎖CAS和悲觀鎖synchronized來代替segment保證併發性。
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 獲得 hash 值 int hash = spread(key.hashCode()); // 用於記錄相應鏈表的長度 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 若是數組"空",進行數組初始化 if (tab == null || (n = tab.length) == 0) // 初始化數組,後面會詳細介紹 tab = initTable(); // 找該 hash 值對應的數組下標,獲得第一個節點 f else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 若是數組該位置爲空, // 用一次 CAS 操做將這個新值放入其中便可,這個 put 操做差很少就結束了,能夠拉到最後面了 // 若是 CAS 失敗,那就是有併發操做,進到下一個循環就行了 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // hash 竟然能夠等於 MOVED,這個須要到後面才能看明白,不過從名字上也能猜到,確定是由於在擴容 else if ((fh = f.hash) == MOVED) // 幫助數據遷移,這個等到看完數據遷移部分的介紹後,再理解這個就很簡單了 tab = helpTransfer(tab, f); else { // 到這裏就是說,f 是該位置的頭結點,並且不爲空 V oldVal = null; // 獲取數組該位置的頭結點的監視器鎖 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { // 頭結點的 hash 值大於 0,說明是鏈表 // 用於累加,記錄鏈表的長度 binCount = 1; // 遍歷鏈表 for (Node<K,V> e = f;; ++binCount) { K ek; // 若是發現了"相等"的 key,判斷是否要進行值覆蓋,而後也就能夠 break 了 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 到了鏈表的最末端,將這個新值放到鏈表的最後面 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 紅黑樹 Node<K,V> p; binCount = 2; // 調用紅黑樹的插值方法插入新節點 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // binCount != 0 說明上面在作鏈表操做 if (binCount != 0) { // 判斷是否要將鏈表轉換爲紅黑樹,臨界值和 HashMap 同樣,也是 8 if (binCount >= TREEIFY_THRESHOLD) // 這個方法和 HashMap 中稍微有一點點不一樣,那就是它不是必定會進行紅黑樹轉換, // 若是當前數組的長度小於 64,那麼會選擇進行數組擴容,而不是轉換爲紅黑樹 // 具體源碼咱們就不看了,擴容部分後面說 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // addCount(1L, binCount); return null; }
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 判斷頭結點是否就是咱們須要的節點 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 若是頭結點的 hash 小於 0,說明 正在擴容,或者該位置是紅黑樹 else if (eh < 0) // 參考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k) return (p = e.find(h, key)) != null ? p.val : null; // 遍歷鏈表 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
http://www.importnew.com/28263.html