ConcurrentHashMap 和 HashMap 思路是差很少的,可是由於它支持併發操做,因此要複雜一些。java
整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 表明」部分「或」一段「的意思,因此不少地方都會將其描述爲分段鎖。注意,行文中,我不少地方用了**「槽」**來表明一個 segment。node
簡單理解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 經過繼承 ReentrantLock 來進行加鎖,因此每次須要加鎖的操做鎖住的是一個 segment,這樣只要保證每一個 Segment 是線程安全的,也就實現了全局的線程安全。數組
concurrencyLevel:並行級別、併發數、Segment 數,怎麼翻譯不重要,理解它。默認是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,因此理論上,這個時候,最多能夠同時支持 16 個線程併發寫,只要它們的操做分別分佈在不一樣的 Segment 上。這個值能夠在初始化的時候設置爲其餘值,可是一旦初始化之後,它是不能夠擴容的。安全
再具體到每一個 Segment 內部,其實每一個 Segment 很像以前介紹的 HashMap,不過它要保證線程安全,因此處理起來要麻煩些。多線程
initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操做的時候須要平均分給每一個 Segment。併發
loadFactor:負載因子,以前咱們說了,Segment 數組不能夠擴容,因此這個負載因子是給每一個 Segment 內部使用的。ssh
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; // 計算並行級別 ssize,由於要保持並行級別是 2 的 n 次方 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 咱們這裏先不要那麼燒腦,用默認值,concurrencyLevel 爲 16,sshift 爲 4 // 那麼計算出 segmentShift 爲 28,segmentMask 爲 15,後面會用到這兩個值 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // initialCapacity 是設置整個 map 初始的大小, // 這裏根據 initialCapacity 計算 Segment 數組中每一個位置能夠分到的大小 // 如 initialCapacity 爲 64,那麼每一個 Segment 或稱之爲"槽"能夠分到 4 個 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 默認 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,由於這樣的話,對於具體的槽上, // 插入一個元素不至於擴容,插入第二個的時候纔會擴容 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 建立 Segment 數組, // 並建立數組的第一個元素 segment[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 往數組寫入 segment[0] UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
初始化完成,咱們獲得了一個 Segment 數組。ide
咱們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:函數
##put 過程分析oop
咱們先看 put 的主流程,對於其中的一些關鍵細節操做,後面會進行詳細介紹。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 1. 計算 key 的 hash 值 int hash = hash(key); // 2. 根據 hash 值找到 Segment 數組中的位置 j // hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位, // 而後和 segmentMask(15) 作一次與操做,也就是說 j 是 hash 值的高 4 位,也就是槽的數組下標 int j = (hash >>> segmentShift) & segmentMask; // 剛剛說了,初始化的時候初始化了 segment[0],可是其餘位置仍是 null, // ensureSegment(j) 對 segment[j] 進行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); // 3. 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,以後就是 Segment 內部的 put 操做了。
Segment 內部是由 數組+鏈表
組成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 在往該 segment 寫入前,須要先獲取該 segment 的獨佔鎖 // 先看主流程,後面還會具體介紹這部份內容 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // 這個是 segment 內部的數組 HashEntry<K,V>[] tab = table; // 再利用 hash 值,求應該放置的數組下標 int index = (tab.length - 1) & hash; // first 是數組該位置處的鏈表的表頭 HashEntry<K,V> first = entryAt(tab, index); // 下面這串 for 循環雖然很長,不過也很好理解,想一想該位置沒有任何元素和已經存在一個鏈表這兩種狀況 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { // 覆蓋舊值 e.value = value; ++modCount; } break; } // 繼續順着鏈表走 e = e.next; } else { // node 究竟是不是 null,這個要看獲取鎖的過程,不過和這裏都沒有關係。 // 若是不爲 null,那就直接將它設置爲鏈表表頭;若是是null,初始化並設置爲鏈表表頭。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 若是超過了該 segment 的閾值,這個 segment 須要擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 擴容後面也會具體分析 else // 沒有達到閾值,將 node 放到數組 tab 的 index 位置, // 其實就是將新的節點設置成原鏈表的表頭 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解鎖 unlock(); } return oldValue; }
總體流程仍是比較簡單的,因爲有獨佔鎖的保護,因此 segment 內部的操做並不複雜。至於這裏面的併發問題,咱們稍後再進行介紹。
到這裏 put 操做就結束了,接下來,咱們說一說其中幾步關鍵的操做。
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其餘槽來講,在插入第一個值的時候進行初始化。
這裏須要考慮併發,由於極可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就能夠。
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 這裏看到爲何以前要初始化 segment[0] 了, // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k] // 爲何要用「當前」,由於 segment[0] 可能早就擴容過了 Segment<K,V> proto = ss[0]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 內部的數組 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次檢查一遍該槽是否被其餘線程初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循環,內部用 CAS,當前線程成功設值或其餘線程成功設值後,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
總的來講,ensureSegment(int k) 比較簡單,對於併發操做使用 CAS 進行控制。
我沒搞懂這裏爲何要搞一個 while 循環,CAS 失敗不就表明有其餘線程成功了嗎,爲何要再進行判斷? 感謝評論區的李子木,若是當前線程 CAS 失敗,這裏的 while 循環是爲了將 seg 賦值返回。
前面咱們看到,在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,若是失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。
下面咱們來具體分析這個方法中是怎麼控制加鎖的。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 循環獲取鎖 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 進到這裏說明數組該位置的鏈表是空的,沒有任何元素 // 固然,進到這裏的另外一個緣由是 tryLock() 失敗,因此該槽存在併發,不必定是該位置 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else // 順着鏈表往下走 e = e.next; } // 重試次數若是超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞隊列等待鎖 // lock() 是阻塞方法,直到獲取鎖後返回 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && // 這個時候是有大問題了,那就是有新的元素進到了鏈表,成爲了新的表頭 // 因此這邊的策略是,至關於從新走一遍這個 scanAndLockForPut 方法 (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另外一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨佔鎖。
這個方法就是看似複雜,可是其實就是作了一件事,那就是獲取該 segment 的獨佔鎖,若是須要的話順便實例化了一下 node。
重複一下,segment 數組不能擴容,擴容是 segment 數組某個位置內部的數組 HashEntry[] 進行擴容,擴容後,容量爲原來的 2 倍。
首先,咱們要回顧一下觸發擴容的地方,put 的時候,若是判斷該值的插入會致使該 segment 的元素個數超過閾值,那麼先進行擴容,再插值,讀者這個時候能夠回去 put 方法看一眼。
該方法不須要考慮併發,由於到這裏的時候,是持有該 segment 的獨佔鎖的。
// 方法參數上的 node 是此次擴容後,須要添加到新的數組中的數據。 private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 2 倍 int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); // 建立新數組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 爲 31,對應二進制 ‘000...00011111’ int sizeMask = newCapacity - 1; // 遍歷原數組,老套路,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置 for (int i = 0; i < oldCapacity ; i++) { // e 是鏈表的第一個元素 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 計算應該放置在新數組中的位置, // 假設原數組長度爲 16,e 在 oldTable[3] 處,那麼 idx 只多是 3 或者是 3 + 16 = 19 int idx = e.hash & sizeMask; if (next == null) // 該位置處只有一個元素,那比較好辦 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // e 是鏈表表頭 HashEntry<K,V> lastRun = e; // idx 是當前鏈表的頭結點 e 的新位置 int lastIdx = idx; // 下面這個 for 循環會找到一個 lastRun 節點,這個節點以後的全部元素是將要放到一塊兒的 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // 將 lastRun 及其以後的全部節點組成的這個鏈表放到 lastIdx 這個位置 newTable[lastIdx] = lastRun; // 下面的操做是處理 lastRun 以前的節點, // 這些節點可能分配在另外一個鏈表中,也可能分配到上面的那個鏈表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
這裏的擴容比以前的 HashMap 要複雜一些,代碼難懂一點。上面有兩個挨着的 for 循環,第一個 for 有什麼用呢?
仔細一看發現,若是沒有第一個 for 循環,也是能夠工做的,可是,這個 for 循環下來,若是 lastRun 的後面還有比較多的節點,那麼此次就是值得的。由於咱們只須要克隆 lastRun 前面的節點,後面的一串節點跟着 lastRun 走就是了,不須要作任何操做。
我以爲 Doug Lea 的這個想法也是挺有意思的,不過比較壞的狀況就是每次 lastRun 都是鏈表的最後一個元素或者很靠後的元素,那麼此次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,若是使用默認的閾值,大約只有 1/6 的節點須要克隆。
相對於 put 來講,get 真的不要太簡單。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; // 1. hash 值 int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 2. 根據 hash 找到對應的 segment if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // 3. 找到segment 內部數組相應位置的鏈表,遍歷 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
如今咱們已經說完了 put 過程和 get 過程,咱們能夠看到 get 過程當中是沒有加鎖的,那天然咱們就須要去考慮併發問題。
添加節點的操做 put 和刪除節點的操做 remove 都是要加 segment 上的獨佔鎖的,因此它們之間天然不會有問題,咱們須要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操做。
remove 操做咱們沒有分析源碼,因此這裏說的讀者感興趣的話仍是須要到源碼中去求實一下的。
get 操做須要遍歷鏈表,可是 remove 操做會"破壞"鏈表。
若是 remove 破壞的節點 get 操做已通過去了,那麼這裏不存在任何問題。
若是 remove 先破壞了一個節點,分兩種狀況考慮。 一、若是此節點是頭結點,那麼須要將頭結點的 next 設置爲數組該位置的元素,table 雖然使用了 volatile 修飾,可是 volatile 並不能提供數組內部操做的可見性保證,因此源碼中使用了 UNSAFE 來操做數組,請看方法 setEntryAt。二、若是要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裏的併發保證就是 next 屬性是 volatile 的。
Java7 中實現的 ConcurrentHashMap 說實話仍是比較複雜的,Java8 對 ConcurrentHashMap 進行了比較大的改動。建議讀者能夠參考 Java8 中 HashMap 相對於 Java7 HashMap 的改動,對於 ConcurrentHashMap,Java8 也引入了紅黑樹。
說實話,Java8 ConcurrentHashMap 源碼真心不簡單,最難的在於擴容,數據遷移操做不容易看懂。
咱們先用一個示意圖來描述下其結構:
結構上和 Java8 的 HashMap 基本上同樣,不過它要保證線程安全性,因此在源碼上確實要複雜一些。
// 這構造函數裏,什麼都不幹 public ConcurrentHashMap() { }
public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
這個初始化方法有點意思,經過提供初始容量,計算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),而後向上取最近的 2 的 n 次方】。如 initialCapacity 爲 10,那麼獲得 sizeCtl 爲 16,若是 initialCapacity 爲 11,獲得 sizeCtl 爲 32。
sizeCtl 這個屬性使用的場景不少,不過只要跟着文章的思路來,就不會被它搞暈了。
若是你愛折騰,也能夠看下另外一個有三個參數的構造方法,這裏我就不說了,大部分時候,咱們會使用無參構造函數進行實例化,咱們也按照這個思路來進行源碼分析吧。
仔細地一行一行代碼看下去:
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 獲得 hash 值 int hash = spread(key.hashCode()); // 用於記錄相應鏈表的長度 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 若是數組"空",進行數組初始化 if (tab == null || (n = tab.length) == 0) // 初始化數組,後面會詳細介紹 tab = initTable(); // 找該 hash 值對應的數組下標,獲得第一個節點 f else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 若是數組該位置爲空, // 用一次 CAS 操做將這個新值放入其中便可,這個 put 操做差很少就結束了,能夠拉到最後面了 // 若是 CAS 失敗,那就是有併發操做,進到下一個循環就行了 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // hash 竟然能夠等於 MOVED,這個須要到後面才能看明白,不過從名字上也能猜到,確定是由於在擴容 else if ((fh = f.hash) == MOVED) // 幫助數據遷移,這個等到看完數據遷移部分的介紹後,再理解這個就很簡單了 tab = helpTransfer(tab, f); else { // 到這裏就是說,f 是該位置的頭結點,並且不爲空 V oldVal = null; // 獲取數組該位置的頭結點的監視器鎖 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { // 頭結點的 hash 值大於 0,說明是鏈表 // 用於累加,記錄鏈表的長度 binCount = 1; // 遍歷鏈表 for (Node<K,V> e = f;; ++binCount) { K ek; // 若是發現了"相等"的 key,判斷是否要進行值覆蓋,而後也就能夠 break 了 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 到了鏈表的最末端,將這個新值放到鏈表的最後面 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 紅黑樹 Node<K,V> p; binCount = 2; // 調用紅黑樹的插值方法插入新節點 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 判斷是否要將鏈表轉換爲紅黑樹,臨界值和 HashMap 同樣,也是 8 if (binCount >= TREEIFY_THRESHOLD) // 這個方法和 HashMap 中稍微有一點點不一樣,那就是它不是必定會進行紅黑樹轉換, // 若是當前數組的長度小於 64,那麼會選擇進行數組擴容,而不是轉換爲紅黑樹 // 具體源碼咱們就不看了,擴容部分後面說 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // addCount(1L, binCount); return null; }
put 的主流程看完了,可是至少留下了幾個問題,第一個是初始化,第二個是擴容,第三個是幫助數據遷移,這些咱們都會在後面進行一一介紹。
這個比較簡單,主要就是初始化一個合適大小的數組,而後會設置 sizeCtl。
初始化方法中的併發問題是經過對 sizeCtl 進行一個 CAS 操做來控制的。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 初始化的"功勞"被其餘線程"搶去"了 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // CAS 一下,將 sizeCtl 設置爲 -1,表明搶到了鎖 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { // DEFAULT_CAPACITY 默認初始容量是 16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化數組,長度爲 16 或初始化時提供的長度 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 將這個數組賦值給 table,table 是 volatile 的 table = tab = nt; // 若是 n 爲 16 的話,那麼這裏 sc = 12 // 其實就是 0.75 * n sc = n - (n >>> 2); } } finally { // 設置 sizeCtl 爲 sc,咱們就當是 12 吧 sizeCtl = sc; } break; } } return tab; }
前面咱們在 put 源碼分析也說過,treeifyBin 不必定就會進行紅黑樹轉換,也多是僅僅作數組擴容。咱們仍是進行源碼分析吧。
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { // MIN_TREEIFY_CAPACITY 爲 64 // 因此,若是數組長度小於 64 的時候,其實也就是 32 或者 16 或者更小的時候,會進行數組擴容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 後面咱們再詳細分析這個方法 tryPresize(n << 1); // b 是頭結點 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 加鎖 synchronized (b) { if (tabAt(tab, index) == b) { // 下面就是遍歷鏈表,創建一顆紅黑樹 TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 將紅黑樹設置到數組相應位置中 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
若是說 Java8 ConcurrentHashMap 的源碼不簡單,那麼說的就是擴容操做和遷移操做。
這個方法要完徹底全看懂還須要看以後的 transfer 方法,讀者應該提早知道這點。
這裏的擴容也是作翻倍擴容的,擴容後數組容量爲原來的 2 倍。
// 首先要說明的是,方法參數 size 傳進來的時候就已經翻了倍了 private final void tryPresize(int size) { // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 這個 if 分支和以前說的初始化數組的代碼基本上是同樣的,在這裏,咱們能夠不用管這塊代碼 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); // 0.75 * n } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { // 我沒看懂 rs 的真正含義是什麼,不過也關係不大 int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 2. 用 CAS 將 sizeCtl 加 1,而後執行 transfer 方法 // 此時 nextTab 不爲 null if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 1. 將 sizeCtl 設置爲 (rs << RESIZE_STAMP_SHIFT) + 2) // 我是沒看懂這個值真正的意義是什麼?不過能夠計算出來的是,結果是一個比較大的負數 // 調用 transfer 方法,此時 nextTab 參數爲 null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
這個方法的核心在於 sizeCtl 值的操做,首先將其設置爲一個負數,而後執行 transfer(tab, null),再下一個循環將 sizeCtl 加 1,並執行 transfer(tab, nt),以後多是繼續 sizeCtl 加 1,並執行 transfer(tab, nt)。
因此,可能的操做就是執行 1 次 transfer(tab, null) + 屢次 transfer(tab, nt),這裏怎麼結束循環的須要看完 transfer 源碼才清楚。
下面這個方法很點長,將原來的 tab 數組的元素遷移到新的 nextTab 數組中。
雖然咱們以前說的 tryPresize 方法中屢次調用 transfer 不涉及多線程,可是這個 transfer 方法能夠在其餘地方被調用,典型地,咱們以前在說 put 方法的時候就說過了,請往上看 put 方法,是否是有個地方調用了 helpTransfer 方法,helpTransfer 方法會調用 transfer 方法的。
此方法支持多線程執行,外圍調用此方法的時候,會保證第一個發起數據遷移的線程,nextTab 參數爲 null,以後再調用此方法的時候,nextTab 不會爲 null。
閱讀源碼以前,先要理解併發操做的機制。原數組長度爲 n,因此咱們有 n 個遷移任務,讓每一個線程每次負責一個小任務是最簡單的,每作完一個任務再檢測是否有其餘沒作完的任務,幫助遷移就能夠了,而 Doug Lea 使用了一個 stride,簡單理解就是步長,每一個線程每次負責遷移其中的一部分,如每次遷移 16 個小任務。因此,咱們就須要一個全局的調度者來安排哪一個線程執行哪幾個任務,這個就是屬性 transferIndex 的做用。
第一個發起數據遷移的線程會將 transferIndex 指向原數組最後的位置,而後從後往前的 stride 個任務屬於第一個線程,而後將 transferIndex 指向新的位置,再往前的 stride 個任務屬於第二個線程,依此類推。固然,這裏說的第二個線程不是真的必定指代了第二個線程,也能夠是同一個線程,這個讀者應該能理解吧。其實就是將一個大的遷移任務分爲了一個個任務包。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // stride 在單核下直接等於 n,多核模式下爲 (n>>>3)/NCPU,最小值是 16 // stride 能夠理解爲」步長「,有 n 個位置是須要進行遷移的, // 將這 n 個任務分爲多個任務包,每一個任務包有 stride 個任務 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 若是 nextTab 爲 null,先進行一次初始化 // 前面咱們說了,外圍會保證第一個發起遷移的線程調用此方法時,參數 nextTab 爲 null // 以後參與遷移的線程調用此方法時,nextTab 不會爲 null if (nextTab == null) { try { // 容量翻倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // nextTable 是 ConcurrentHashMap 中的屬性 nextTable = nextTab; // transferIndex 也是 ConcurrentHashMap 的屬性,用於控制遷移的位置 transferIndex = n; } int nextn = nextTab.length; // ForwardingNode 翻譯過來就是正在被遷移的 Node // 這個構造方法會生成一個Node,key、value 和 next 都爲 null,關鍵是 hash 爲 MOVED // 後面咱們會看到,原數組中位置 i 處的節點完成遷移工做後, // 就會將位置 i 處設置爲這個 ForwardingNode,用來告訴其餘線程該位置已經處理過了 // 因此它其實至關因而一個標誌。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // advance 指的是作完了一個位置的遷移工做,能夠準備作下一個位置的了 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab /* * 下面這個 for 循環,最難理解的在前面,而要看懂它們,應該先看懂後面的,而後再倒回來看 * */ // i 是位置索引,bound 是邊界,注意是從後往前 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 下面這個 while 真的是很差理解 // advance 爲 true 表示能夠進行下一個位置的遷移了 // 簡單理解結局:i 指向了 transferIndex,bound 指向了 transferIndex-stride while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; // 將 transferIndex 值賦給 nextIndex // 這裏 transferIndex 一旦小於等於 0,說明原數組的全部位置都有相應的線程去處理了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // 看括號中的代碼,nextBound 是此次遷移任務的邊界,注意,是從後往前 bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 全部的遷移操做已經完成 nextTable = null; // 將新的 nextTab 賦值給 table 屬性,完成遷移 table = nextTab; // 從新計算 sizeCtl:n 是原數組長度,因此 sizeCtl 得出的值將是新數組長度的 0.75 倍 sizeCtl = (n << 1) - (n >>> 1); return; } // 以前咱們說過,sizeCtl 在遷移前會設置爲 (rs << RESIZE_STAMP_SHIFT) + 2 // 而後,每有一個線程參與遷移就會將 sizeCtl 加 1, // 這裏使用 CAS 操做對 sizeCtl 進行減 1,表明作完了屬於本身的任務 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 任務結束,方法退出 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 到這裏,說明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT, // 也就是說,全部的遷移任務都作完了,也就會進入到上面的 if(finishing){} 分支了 finishing = advance = true; i = n; // recheck before commit } } // 若是位置 i 處是空的,沒有任何節點,那麼放入剛剛初始化的 ForwardingNode 」空節點「 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 該位置處是一個 ForwardingNode,表明該位置已經遷移過了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 對數組該位置處的結點加鎖,開始處理數組該位置處的遷移工做 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // 頭結點的 hash 大於 0,說明是鏈表的 Node 節點 if (fh >= 0) { // 下面這一塊和 Java7 中的 ConcurrentHashMap 遷移是差很少的, // 須要將鏈表一分爲二, // 找到原鏈表中的 lastRun,而後 lastRun 及其以後的節點是一塊兒進行遷移的 // lastRun 以前的節點須要進行克隆,而後分到兩個鏈表中 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 其中的一個鏈表放在新數組的位置 i setTabAt(nextTab, i, ln); // 另外一個鏈表放在新數組的位置 i+n setTabAt(nextTab, i + n, hn); // 將原數組該位置處設置爲 fwd,表明該位置已經處理完畢, // 其餘線程一旦看到該位置的 hash 值爲 MOVED,就不會進行遷移了 setTabAt(tab, i, fwd); // advance 設置爲 true,表明該位置已經遷移完畢 advance = true; } else if (f instanceof TreeBin) { // 紅黑樹的遷移 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 若是一分爲二後,節點數少於 8,那麼將紅黑樹轉換回鏈表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 將 ln 放置在新數組的位置 i setTabAt(nextTab, i, ln); // 將 hn 放置在新數組的位置 i+n setTabAt(nextTab, i + n, hn); // 將原數組該位置處設置爲 fwd,表明該位置已經處理完畢, // 其餘線程一旦看到該位置的 hash 值爲 MOVED,就不會進行遷移了 setTabAt(tab, i, fwd); // advance 設置爲 true,表明該位置已經遷移完畢 advance = true; } } } } } }
說到底,transfer 這個方法並無實現全部的遷移任務,每次調用這個方法只實現了 transferIndex 往前 stride 個位置的遷移工做,其餘的須要由外圍來控制。
這個時候,再回去仔細看 tryPresize 方法可能就會更加清晰一些了。
get 方法歷來都是最簡單的,這裏也不例外:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 判斷頭結點是否就是咱們須要的節點 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 若是頭結點的 hash 小於 0,說明 正在擴容,或者該位置是紅黑樹 else if (eh < 0) // 參考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k) return (p = e.find(h, key)) != null ? p.val : null; // 遍歷鏈表 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
簡單說一句,此方法的大部份內容都很簡單,只有正好碰到擴容的狀況,ForwardingNode.find(int h, Object k) 稍微複雜一些,不過在瞭解了數據遷移的過程後,這個也就不難了,因此限於篇幅這裏也不展開說了。
來源: javadoop