上篇文章介紹了 HashMap 源碼後,在博客平臺廣受好評,讓原本己經不打算更新這個系列的我,彷彿被打了一頓雞血。真的,被讀者承認的感受,就是這麼奇妙。html
而後,有讀者但願我能出一版 ConcurrentHashMap 的解析。因此,今天的這篇文章,我準備講述一下 ConcurrentHashMap 分別在JDK1.7和 JDK1.8 的源碼。文章較長,建議小夥伴們能夠先收藏再看哦~java
說一下爲何我要把源碼解析寫的這麼詳細吧。一方面,能夠記錄下當時本身的思考過程,也方便後續本身複習翻閱;另外一方面,記錄下來還可以幫助看到文章的小夥伴加深對源碼的理解,簡直是一箭雙鵰的事情。node
另外,上一篇文章,有個錯誤點,卻沒有讀者給我指正出來。o(╥﹏╥)o 。所以,我只能本身在此更正一下。見下面截圖,算法
put 方法,在新值替換舊值那裏,應該是隻有一種狀況的,e 不包括新值。圖中的方框也標註出來了。由於,判斷 e=p.next==null , 而後新的節點是賦值給 p.next 了,並無賦值給 e,此時 e 依舊是空的。因此 e!=null,表明當前的 e 是已經存在的舊值。數組
文章編寫過程,不免出現做者考慮不周的地方,若是有朋友發現有錯誤的地方,還請不吝賜教,指正出來。知錯能改,善莫大焉,對於技術,咱們應該懷有一顆嚴謹的心態~安全
這篇文章,我打算從如下幾個方面來說。多線程
1)多線程下的 HashMap 有什麼問題?併發
2)怎樣保證線程安全,爲何選用 ConcurrentHashMap?less
3)ConcurrentHashMap 1.7 源碼解析dom
4)ConcurrentHashMap 1.8 源碼解析
在上一篇文章中,已經講解了 HashMap 1.7 死循環的成因,也正由於如此,咱們才說 HashMap 在多線程下是不安全的。可是,在JDK1.8 的 HashMap 改成採用尾插法,已經不存在死循環的問題了,爲何也會線程不安全呢?
咱們以 put 方法爲例(1.8),
假如如今有兩個線程都執行到了上圖中的劃線處。當線程一判斷爲空以後,CPU 時間片到了,被掛起。線程二也執行到此處判斷爲空,繼續執行下一句,建立了一個新節點,插入到此下標位置。而後,線程一解掛,一樣認爲此下標的元素爲空,所以也建立了一個新節點放在此下標處,所以形成了元素的覆蓋。
因此,能夠看到無論是 JDK1.7 仍是 1.8 的 HashMap 都存在線程安全的問題。那麼,在多線程環境下,應該怎樣去保證線程安全呢?
首先,你可能想到,在多線程環境下用 Hashtable 來解決線程安全的問題。這樣確實是能夠的,可是一樣的它也有缺點,咱們看下最經常使用的 put 方法和 get 方法。
能夠看到,無論是往 map 裏邊添加元素仍是獲取元素,都會用 synchronized 關鍵字加鎖。當有多個元素以前存在資源競爭時,只能有一個線程能夠獲取到鎖,操做資源。更不能忍的是,一個簡單的讀取操做,互相之間又不影響,爲何也不能同時進行呢?
因此,hashtable 的缺點顯而易見,它無論是 get 仍是 put 操做,都是鎖住了整個 table,效率低下,所以 並不適合高併發場景。
也許,你還會想起來一個集合工具類 Collections,生成一個SynchronizedMap。其實,它和 Hashtable 差很少,一樣的緣由,鎖住整張表,效率低下。
因此,思考一下,既然鎖住整張表的話,併發效率低下,那我把整張表分紅 N 個部分,並使元素儘可能均勻的分佈到每一個部分中,分別給他們加鎖,互相之間並不影響,這種方式豈不是更好 。這就是在 JDK1.7 中 ConcurrentHashMap 採用的方案,被叫作鎖分段技術,每一個部分就是一個 Segment(段)。
可是,在JDK1.8中,徹底重構了,採用的是 Synchronized + CAS ,把鎖的粒度進一步下降,而放棄了 Segment 分段。(此時的 Synchronized 已經升級了,效率獲得了很大提高,鎖升級能夠了解一下)
咱們看下在 JDK1.7中 ConcurrentHashMap 是怎麼實現的。牆裂建議,在本文以前瞭解一下多線程的基本知識,如JMM內存模型,volatile關鍵字做用,CAS和自旋,ReentranLock重入鎖。
在 JDK1.7中,本質上仍是採用鏈表+數組的形式存儲鍵值對的。可是,爲了提升併發,把原來的整個 table 劃分爲 n 個 Segment 。因此,從總體來看,它是一個由 Segment 組成的數組。而後,每一個 Segment 裏邊是由 HashEntry 組成的數組,每一個 HashEntry之間又能夠造成鏈表。咱們能夠把每一個 Segment 當作是一個小的 HashMap,其內部結構和 HashMap 是如出一轍的。
當對某個 Segment 加鎖時,如圖中 Segment2,並不會影響到其餘 Segment 的讀寫。每一個 Segment 內部本身操做本身的數據。這樣一來,咱們要作的就是儘量的讓元素均勻的分佈在不一樣的 Segment中。最理想的狀態是,全部執行的線程操做的元素都是不一樣的 Segment,這樣就能夠下降鎖的競爭。
廢話了這麼多,仍是來看底層源碼吧,由於全部的思想都在代碼裏體現。借用 Linus的一句話,「No BB . Show me the code 」 (改編版,哈哈)
先看下 1.7 中經常使用的變量和內部類都有哪些,這有助於咱們瞭解 ConcurrentHashMap 的總體結構。
//默認初始化容量,這個和 HashMap中的容量是一個概念,表示的是整個 Map的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //默認加載因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //默認的併發級別,這個參數決定了 Segment 數組的長度 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大的容量 static final int MAXIMUM_CAPACITY = 1 << 30; //每一個Segment中table數組的最小長度爲2,且必須是2的n次冪。 //因爲每一個Segment是懶加載的,用的時候纔會初始化,所以爲了不使用時當即調整大小,設定了最小容量2 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //用於限制Segment數量的最大值,必須是2的n次冪 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative //在size方法和containsValue方法,會優先採用樂觀的方式不加鎖,直到重試次數達到2,纔會對全部Segment加鎖 //這個值的設定,是爲了不無限次的重試。後邊size方法會詳講怎麼實現樂觀機制的。 static final int RETRIES_BEFORE_LOCK = 2; //segment掩碼值,用於根據元素的hash值定位所在的 Segment 下標。後邊會細講 final int segmentMask; //和 segmentMask 配合使用來定位 Segment 的數組下標,後邊講。 final int segmentShift; // Segment 組成的數組,每個 Segment 均可以看作是一個特殊的 HashMap final Segment<K,V>[] segments; //Segment 對象,繼承自 ReentrantLock 可重入鎖。 //其內部的屬性和方法和 HashMap 神似,只是多了一些拓展功能。 static final class Segment<K,V> extends ReentrantLock implements Serializable { //這是在 scanAndLockForPut 方法中用到的一個參數,用於計算最大重試次數 //獲取當前可用的處理器的數量,若大於1,則返回64,不然返回1。 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; //用於表示每一個Segment中的 table,是一個用HashEntry組成的數組。 transient volatile HashEntry<K,V>[] table; //Segment中的元素個數,每一個Segment單獨計數(下邊的幾個參數一樣的都是單獨計數) transient int count; //每次 table 結構修改時,如put,remove等,此變量都會自增 transient int modCount; //當前Segment擴容的閾值,同HashMap計算方法同樣也是容量乘以加載因子 //須要知道的是,每一個Segment都是單獨處理擴容的,互相之間不會產生影響 transient int threshold; //加載因子 final float loadFactor; //Segment構造函數 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } ... // put(),remove(),rehash() 方法都在此類定義 } // HashEntry,存在於每一個Segment中,它就相似於HashMap中的Node,用於存儲鍵值對的具體數據和維護單向鏈表的關係 static final class HashEntry<K,V> { //每一個key經過哈希運算後的結果,用的是 Wang/Jenkins hash 的變種算法,此處不細講,感興趣的可自行查閱相關資料 final int hash; final K key; //value和next都用 volatile 修飾,用於保證內存可見性和禁止指令重排序 volatile V value; //指向下一個節點 volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } }
ConcurrentHashMap 有五種構造函數,可是最終都會調用同一個構造函數,因此只須要搞明白這一個核心的構造函數就能夠了。
PS: 文章註釋中 (1)(2)(3) 等序號都是用來方便作標記,不是計算值
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //檢驗參數是否合法。值得說的是,併發級別必定要大於0,不然就沒辦法實現分段鎖了。 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //併發級別不能超過最大值 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments //偏移量,是爲了對hash值作位移操做,計算元素所在的Segment下標,put方法詳講 int sshift = 0; //用於設定最終Segment數組的長度,必須是2的n次冪 int ssize = 1; //這裏就是計算 sshift 和 ssize 值的過程 (1) while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; //Segment的掩碼 this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //c用於輔助計算cap的值 (2) int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // cap 用於肯定某個Segment的容量,即Segment中HashEntry數組的長度 int cap = MIN_SEGMENT_TABLE_CAPACITY; //(3) while (cap < c) cap <<= 1; // create segments and segments[0] //這裏用 loadFactor作爲加載因子,cap乘以加載因子做爲擴容閾值,建立長度爲cap的HashEntry數組, //三個參數,建立一個Segment對象,保存到S0對象中。後邊在 ensureSegment 方法會用到S0做爲原型對象去建立對應的Segment。 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); //建立出長度爲 ssize 的一個 Segment數組 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //把S0存到Segment數組中去。在這裏,咱們就能夠發現,此時只是建立了一個Segment數組, //可是並無把數組中的每一個Segment對象建立出來,僅僅建立了一個Segment用來做爲原型對象。 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
上邊的註釋中留了 (1)(2)(3) 三個地方尚未細說。咱們如今假設一組數據,把涉及到的幾個變量計算出來,就能明白這些參數的含義了。
//假設調用了默認構造,都用的是默認參數,即 initialCapacity 和 concurrencyLevel 都是16 //(1) sshift 和 ssize 值的計算過程爲,每次循環,都會把 sshift 自增1,而且 ssize 左移一位,即乘以2, //直到 ssize 的值大於等於 concurrencyLevel 的值 16。 sshfit=0,1,2,3,4 ssize=1,2,4,8,16 //能夠看到,初始他們的值分別是0和1,最終結果是4和16 //sshfit是爲了輔助計算segmentShift值,ssize是爲了肯定Segment數組長度。 //(2) 此時,計算c的值, c = 16/16 = 1; //判斷 c * 16 < 16 是否爲真,真的話 c 自增1,此處爲false,所以 c的值爲1不變。 //(3) 此時,因爲c爲1, cap爲2 ,所以判斷 cap < c 爲false,最終cap爲2。 //總結一下,以上三個步驟,最終都是爲了肯定如下幾個關鍵參數的值, //肯定 segmentShift ,這個用於後邊計算hash值的偏移量,此處即爲 32-4=28, //肯定 ssize,必須是一個大於等於 concurrencyLevel 的一個2的n次冪值 //肯定 cap,必須是一個大於等於2的一個2的n次冪值 //感興趣的小夥伴,還能夠用另外幾組參數來計算上邊的參數值,能夠加深理解參數的含義。 //例如initialCapacity和concurrencyLevel分別傳入10和5,或者傳入33和16
put 方法的整體流程是,
//這是Map的put方法 public V put(K key, V value) { Segment<K,V> s; //不支持value爲空 if (value == null) throw new NullPointerException(); //經過 Wang/Jenkins 算法的一個變種算法,計算出當前key對應的hash值 int hash = hash(key); //上邊咱們計算出的 segmentShift爲28,所以hash值右移28位,說明此時用的是hash的高4位, //而後把它和掩碼15進行與運算,獲得的值必定是一個 0000 ~ 1111 範圍內的值,即 0~15 。 int j = (hash >>> segmentShift) & segmentMask; //這裏是用Unsafe類的原子操做找到Segment數組中j下標的 Segment 對象 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //初始化j下標的Segment s = ensureSegment(j); //在此Segment中添加元素 return s.put(key, hash, value, false); }
上邊有一個這樣的方法, UNSAFE.getObject (segments, (j << SSHIFT) + SBASE。它是爲了經過Unsafe這個類,找到 j 最新的實際值。這個計算 (j << SSHIFT) + SBASE ,在後邊很是常見,咱們只須要知道它表明的是 j 的一個偏移量,經過偏移量,就能夠獲得 j 的實際值。能夠類比,AQS 中的 CAS 操做。 Unsafe中的操做,都須要一個偏移量,看下圖,
(j << SSHIFT) + SBASE 就至關於圖中的 stateOffset偏移量。只不過圖中是 CAS 設置新值,而咱們這裏是取 j 的最新值。 後邊不少這樣的計算方式,就不贅述了。接着看 s.put 方法,這纔是最終肯定元素位置的方法。
//Segment中的 put 方法 final V put(K key, int hash, V value, boolean onlyIfAbsent) { //這裏經過tryLock嘗試加鎖,若是加鎖成功,返回null,不然執行 scanAndLockForPut方法 //這裏說明一下,tryLock 和 lock 是 ReentrantLock 中的方法, //區別是 tryLock 不會阻塞,搶鎖成功就返回true,失敗就立馬返回false, //而 lock 方法是,搶鎖成功則返回,失敗則會進入同步隊列,阻塞等待獲取鎖。 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { //當前Segment的table數組 HashEntry<K,V>[] tab = table; //這裏就是經過hash值,與tab數組長度取模,找到其所在HashEntry數組的下標 int index = (tab.length - 1) & hash; //當前下標位置的第一個HashEntry節點 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { //若是第一個節點不爲空 if (e != null) { K k; //而且第一個節點,就是要插入的節點,則替換value值,不然繼續向後查找 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { //替換舊值 oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } //說明當前index位置不存在任何節點,此時first爲null, //或者當前index存在一條鏈表,而且已經遍歷完了還沒找到相等的key,此時first就是鏈表第一個元素 else { //若是node不爲空,則直接頭插 if (node != null) node.setNext(first); //不然,建立一個新的node,並頭插 else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //若是當前Segment中的元素大於閾值,而且tab長度沒有超過容量最大值,則擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); //不然,就把當前node設置爲index下標位置新的頭結點 else setEntryAt(tab, index, node); ++modCount; //更新count值 count = c; //這種狀況說明舊值確定爲空 oldValue = null; break; } } } finally { //須要注意ReentrantLock必須手動解鎖 unlock(); } //返回舊值 return oldValue; }
這裏說明一下計算 Segment 數組下標和計算 HashEntry 數組下標的不一樣點:
//下邊的hash值是經過哈希運算後的hash值,不是hashCode //計算 Segment 下標 (hash >>> segmentShift) & segmentMask //計算 HashEntry 數組下標 (tab.length - 1) & hash
思考一下,爲何它們的算法不同呢? 計算 Segment 數組下標是用的 hash值高几位(這裏以高 4 位爲例)和掩碼作與運算,而計算 HashEntry 數組下標是直接用的 hash 值和數組長度減1作與運算。
個人理解是,這是爲了儘可能避免當前 hash 值計算出來的 Segment 數組下標和計算出來的 HashEntry 數組下標趨於相同。簡單說,就是爲了不分配到同一個 Segment 中的元素扎堆現象,即避免它們都被分配到同一條鏈表上,致使鏈表過長。同時,也是爲了減小併發。下面作一個運算,幫助理解一下(假設不用高 4 位運算,而是正常狀況都用低位作運算)。
//咱們以併發級別16,HashEntry數組容量 4 爲例,則它們參與運算的掩碼分別爲 15 和 3 //hash值 0110 1101 0110 1111 0110 1110 0010 0010 //segmentMask = 15 ,標記爲 (1) 0000 0000 0000 0000 0000 0000 0000 1111 //tab.length - 1 = 3 ,標記爲 (2) 0000 0000 0000 0000 0000 0000 0000 0011 //用 hash 分別和 15 ,3 作與運算,會發現獲得的結果是同樣,都是十進制 2. //這代表,當前 hash值被分配到下標爲 2 的 Segment 中,同時,被分配到下標爲 2 的 HashEntry 數組中 //如今如有另一個 hash 值 h2,和第一個hash值,高位不一樣,可是低4位相同, 1010 1101 0110 1111 0110 1110 0010 0010 //咱們會發現,最後它也會被分配到下標爲 2 的 Segment 和 HashEntry 數組,就會和第一個元素造成鏈表。 //因此,爲了不這種扎堆現象,讓元素儘可能均勻分配,就讓 hash 的高 4 位和 (1)處作與 運算,而用低位和 (2)處作與運算 //這樣計算後,它們所在的Segment下標分別爲 6(0110), 10(1010),即便它們在HashEntry數組中的下標都爲 2(0010),也無所謂 //由於它們並不在一個 Segment 中,也就不會在同一個 HashEntry 數組中,更不會造成鏈表。 //更重要的是,它們不會有併發,由於在各自不一樣的 Segment 本身操做本身的加鎖解鎖,互不影響
可能有的小夥伴就會打岔了,那若是兩個 hash 值,低位和高位都相同,怎麼辦呢。若是是這樣,我只能說,這個 hash 算法也太爛了吧。(這裏的 hash 算法也會盡可能避免這種狀況,固然只是減小概率,並不能杜絕)
我有個大膽的想法,這裏的高低位不一樣的計算方式,是否是後邊 1.8 HashMap 讓 hash 高低位作異或運算的引子呢?不得而知。。
put 方法比較簡單,只要能看懂 HashMap 中的 put 方法,這裏也沒問題。主要是它調用的子方法比較複雜,下邊一個一個講解。
回到 Map的 put 方法,判斷 j 下標的 Segment爲空後,則須要調用此方法,初始化一個 Segment 對象,以確保拿到的對象必定是不爲空的,不然沒法執行s.put了。
//k爲 (hash >>> segmentShift) & segmentMask 算法計算出來的值 private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; //u表明 k 的偏移量,用於經過 UNSAFE 獲取主內存最新的實際 K 值 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; //從內存中取到最新的下標位置的 Segment 對象,判斷是否爲空,(1) if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //以前構造函數說了,s0是做爲一個原型對象,用於建立新的 Segment 對象 Segment<K,V> proto = ss[0]; // use segment 0 as prototype //容量 int cap = proto.table.length; //加載因子 float lf = proto.loadFactor; //擴容閾值 int threshold = (int)(cap * lf); //把 Segment 對應的 HashEntry 數組先建立出來 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //再次檢查 K 下標位置的 Segment 是否爲空, (2) if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //此處把 Segment 對象建立出來,並賦值給 s, Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //循環檢查 K 下標位置的 Segment 是否爲空, (3) //若不爲空,則說明有其它線程搶先建立成功,而且已經成功同步到主內存中了, //則把它取出來,並返回 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //CAS,若當前下標的Segment對象爲空,就把它替換爲最新建立出來的 s 對象。 //若成功,就跳出循環,不然,就一直自旋直到成功,或者 seg 不爲空(其餘線程成功緻使)。 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
能夠發現,我標註了上邊 (1)(2)(3) 個地方,每次都判斷最新的Segment是否爲空。可能有的小夥伴就會迷惑,爲何作這麼屢次判斷,我直接去自旋不就行了,反正最後都要自旋的。
個人理解是,在多線程環境下,由於不肯定是何時會有其它線程 CAS 成功,有可能發生在以上的任意時刻。因此,只要發現一旦內存中的對象已經存在了,則說明已經有其它線程把Segment對象建立好,並CAS成功同步到主內存了。此時,就能夠直接返回,而不須要往下執行了。這樣作,是爲了代碼執行效率考慮。
put 方法第一步搶鎖失敗以後,就會執行此方法,
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //根據hash值定位到它對應的HashEntry數組的下標位置,並找到鏈表的第一個節點 //注意,這個操做會從主內存中獲取到最新的狀態,以確保獲取到的first是最新值 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; //重試次數,初始化爲 -1 int retries = -1; // negative while locating node //若搶鎖失敗,就一直循環,直到成功獲取到鎖。有三種狀況 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below //1.若 retries 小於0, if (retries < 0) { if (e == null) { //若 e 節點和 node 都爲空,則建立一個 node 節點。這裏只是預測性的建立一個node節點 if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } //如當前遍歷到的 e 節點不爲空,則判斷它的key是否等於傳進來的key,如果則把 retries 設爲0 else if (key.equals(e.key)) retries = 0; //不然,繼續向後遍歷節點 else e = e.next; } //2.如果重試次數超過了最大嘗試次數,則調用lock方法加鎖。代表再也不重試,我下定決心了必定要獲取到鎖。 //要麼當前線程能夠獲取到鎖,要麼獲取不到就去排隊等待獲取鎖。獲取成功後,再 break。 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } //3.若 retries 的值爲偶數,而且從內存中再次獲取到最新的頭節點,判斷若不等於first //則說明有其餘線程修改了當前下標位置的頭結點,因而須要更新頭結點信息。 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { //更新頭結點信息,並把重試次數重置爲 -1,繼續下一次循環,從最新的頭結點遍歷當前鏈表。 e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
這個方法邏輯比較複雜,會一直循環嘗試獲取鎖,若獲取成功,則返回。不然的話,每次循環時,都會同時遍歷當前鏈表。若遍歷完了一次,還沒找到和key相等的節點,就會預先建立一個節點。注意,這裏只是預測性的建立一個新節點,也有可能在這以前,就已經獲取鎖成功了。
同時,當重試次每偶數次時,就會檢查一次當前最新的頭結點是否被改變。由於如有變化的話,還須要從最新的頭結點開始遍歷鏈表。
還有一種狀況,就是循環次數達到了最大限制,則中止循環,用阻塞的方式去獲取鎖。這時,也就中止了遍歷鏈表的動做,當前線程也不會再作其餘預熱(warm up)的事情。
關於爲何預測性的建立新節點,源碼中原話是這樣的:
Since traversal speed doesn't matter, we might as well help warm up the associated code and accesses as well.
解釋一下就是,由於遍歷速度無所謂,因此,咱們能夠預先(warm up)作一些相關聯代碼的準備工做。這裏相關聯代碼,指的就是循環中,在獲取鎖成功或者調用 lock 方法以前作的這些事情,固然也包括建立新節點。
在put 方法中能夠看到,有一句是判斷 node 是否爲空,若建立了,就直接頭插。不然的話,它也會本身建立這個新節點。
scanAndLockForPut 這個方法能夠確保返回時,當前線程必定是獲取到鎖的狀態。
當 put 方法時,發現元素個數超過了閾值,則會擴容。須要注意的是,每一個Segment只管它本身的擴容,互相之間並不影響。換句話說,能夠出現這個 Segment的長度爲2,另外一個Segment的長度爲4的狀況(只要是2的n次冪)。
//node爲建立的新節點 private void rehash(HashEntry<K,V> node) { //當前Segment中的舊錶 HashEntry<K,V>[] oldTable = table; //舊的容量 int oldCapacity = oldTable.length; //新容量爲舊容量的2倍 int newCapacity = oldCapacity << 1; //更新新的閾值 threshold = (int)(newCapacity * loadFactor); //用新的容量建立一個新的 HashEntry 數組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; //當前的掩碼,用於計算節點在新數組中的下標 int sizeMask = newCapacity - 1; //遍歷舊錶 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; //若是e不爲空,說明當前鏈表不爲空 if (e != null) { HashEntry<K,V> next = e.next; //計算hash值再新數組中的下標位置 int idx = e.hash & sizeMask; //若是e不爲空,且它的下一個節點爲空,則說明這條鏈表只有一個節點, //直接把這個節點放到新數組的對應下標位置便可 if (next == null) // Single node on list newTable[idx] = e; //不然,處理當前鏈表的節點遷移操做 else { // Reuse consecutive sequence at same slot //記錄上一次遍歷到的節點 HashEntry<K,V> lastRun = e; //對應上一次遍歷到的節點在新數組中的新下標 int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { //計算當前遍歷到的節點的新下標 int k = last.hash & sizeMask; //若 k 不等於 lastIdx,則說明這次遍歷到的節點和上次遍歷到的節點不在同一個下標位置 //須要把 lastRun 和 lastIdx 更新爲當前遍歷到的節點和下標值。 //若相同,則不處理,繼續下一次 for 循環。 if (k != lastIdx) { lastIdx = k; lastRun = last; } } //把和 lastRun 節點的下標位置相同的鏈表最末尾的幾個連續的節點放到新數組的對應下標位置 newTable[lastIdx] = lastRun; //再把剩餘的節點,複製到新數組 //從舊數組的頭結點開始遍歷,直到 lastRun 節點,由於 lastRun節點後邊的節點都已經遷移完成了。 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; //用的是複製節點信息的方式,並非把原來的節點直接遷移,區別於lastRun處理方式 newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } //全部節點都遷移完成以後,再處理傳進來的新的node節點,把它頭插到對應的下標位置 int nodeIndex = node.hash & sizeMask; // add the new node //頭插node節點 node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; //更新當前Segment的table信息 table = newTable; }
上邊的遷移過程和 lastRun 和 lastIdx 變量可能不太好理解,我畫個圖就明白了。以其中一條鏈表處理方式爲例。
從頭結點開始向後遍歷,找到當前鏈表的最後幾個下標相同的連續的節點。如上圖,雖然開頭出現了有兩個節點的下標都是 k2, 可是中間出現一個不一樣的下標 k1,打斷了下標連續相同,所以從下一個k2,又從新開始算。好在後邊三個連續的節點下標都是相同的,所以倒數第三個節點被標記爲 lastRun,且變量無變化。
從lastRun節點到尾結點的這部分就能夠總體遷移到新數組的對應下標位置了,由於它們的下標都是相同的,能夠這樣統一處理。
另外從頭結點到 lastRun 以前的節點,沒法統一處理,只能一個一個去複製了。且注意,這裏不是直接遷移,而是複製節點到新的數組,舊的節點會在不久的未來,由於沒有引用指向,被 JVM 垃圾回收處理掉。
(不知道爲啥這個方法名起爲 rehash,其實擴容時 hash 值並無從新計算,變化的只是它們所在的下標而已。我猜想,多是,借用了 1.7 HashMap 中的說法吧。。。)
put 方法搞明白了以後,其實 get 方法就很好理解了。也是先定位到 Segment,而後再定位到 HashEntry 。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; //計算hash值 int h = hash(key); //一樣的先定位到 key 所在的Segment ,而後從主內存中取出最新的節點 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { //若Segment不爲空,且鏈表也不爲空,則遍歷查找節點 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; //找到則返回它的 value 值,不然返回 null if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
remove 方法和 put 方法相似,也不用作過多特殊的介紹,
public V remove(Object key) { int hash = hash(key); //定位到Segment Segment<K,V> s = segmentForHash(hash); //若 s爲空,則返回 null,不然執行 remove return s == null ? null : s.remove(key, hash, null); } public boolean remove(Object key, Object value) { int hash = hash(key); Segment<K,V> s; return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null; } final V remove(Object key, int hash, Object value) { //嘗試加鎖,若失敗,則執行 scanAndLock ,此方法和 scanAndLockForPut 方法相似 if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; //從主內存中獲取對應 table 的最新的頭結點 HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; //匹配到 key if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; // value 爲空,或者 value 也匹配成功 if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
size 方法須要重點說明一下。愛思考的小夥伴可能就會想到,併發狀況下,有可能在統計期間,數組元素個數不停的變化,並且,整個表還被分紅了 N個 Segment,怎樣統計才能保證結果的準確性呢? 咱們一塊兒來看下吧。
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. //segment數組 final Segment<K,V>[] segments = this.segments; //統計全部Segment中元素的總個數 int size; //若是size大小超過32位,則標記爲溢出爲true boolean overflow; //統計每一個Segment中的 modcount 之和 long sum; //上次記錄的 sum 值 long last = 0L; //重試次數,初始化爲 -1 int retries = -1; try { for (;;) { //若是超太重試次數,則再也不重試,而是把全部Segment都加鎖,再統計 size if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) //強制加鎖 ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; //遍歷全部Segment for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); //若當前遍歷到的Segment不爲空,則統計它的 modCount 和 count 元素個數 if (seg != null) { //累加當前Segment的結構修改次數,如put,remove等操做都會影響modCount sum += seg.modCount; int c = seg.count; //若當前Segment的元素個數 c 小於0 或者 size 加上 c 的結果小於0,則認爲溢出 //由於若超過了 int 最大值,就會返回負數 if (c < 0 || (size += c) < 0) overflow = true; } } //當這次嘗試,統計的 sum 值和上次統計的值相同,則說明這段時間內, //並無任何一個 Segment 的結構發生改變,就能夠返回最後的統計結果 if (sum == last) break; //不相等,則說明有 Segment 結構發生了改變,則記錄最新的結構變化次數之和 sum, //並賦值給 last,用於下次重試的比較。 last = sum; } } finally { //若是超過了指定重試次數,則說明表中的全部Segment都被加鎖了,所以須要把它們都解鎖 if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } //若結果溢出,則返回 int 最大值,不然正常返回 size 值 return overflow ? Integer.MAX_VALUE : size; }
其實源碼中前兩行的註釋也說的很是清楚了。咱們先採用樂觀的方式,認爲在統計 size 的過程當中,並無發生 put, remove 等會改變 Segment 結構的操做。 可是,若是發生了,就須要重試。若是重試2次都不成功(執行三次,第一次不能叫作重試),就只能強制把全部 Segment 都加鎖以後,再統計了,以此來獲得準確的結果。
須要說明的是,JDK 1.8 的 CHM(ConcurrentHashMap) 實現,徹底重構了 1.7 。再也不有 Segment 的概念,只是爲了兼容 1.7 才申明瞭一下,並無用到。所以,再也不使用分段鎖,而是給數組中的每個頭節點(爲了方便,之後都叫桶)都加鎖,鎖的粒度下降了。而且,用的是 Synchronized 鎖。
可能有的小夥伴就有疑惑了,不是都說同步鎖是重量級鎖嗎,這樣不是會影響併發效率嗎?
確實以前同步鎖是一個重量級鎖,可是在 JDK1.6 以後進行了各類優化以後,它已經再也不那麼重了。引入了偏向鎖,輕量級鎖,以及鎖升級的概念,並且,聽說在更細粒度的代碼層面上,同步鎖已經能夠媲美 Lock 鎖,甚至是趕超了。 除此以外,它還有不少優勢,這裏再也不展開了。感興趣的能夠自行查閱同步鎖的鎖升級過程,以及它和 Lock 鎖的區別。
在 1.8 CHM 中,底層存儲結構和 1.8 的 HashMap 是同樣的,都是數組+鏈表+紅黑樹。不一樣的就是,多了一些併發的處理。
文章開頭咱們提到了,在 1.8 HashMap 中的線程安全問題,就是由於在多個線程同時操做同一個桶的頭結點時,會發生值的覆蓋狀況。那麼,順着這個思路,咱們看一下在 CHM 中它是怎麼避免這種狀況發生的吧。
PS: 因爲1.8的 CHM 和 HashMap 結構和基本屬性變量,還有初始化邏輯都差很少,只是多了一些併發狀況須要用到的參數和內部類,所以,再也不單獨拎出來介紹。在方法中用到的時候,再詳細解釋。
所以,從 put 方法開始,咱們看下,它在插入新元素的時候,是怎麼保證線程安全的吧。
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //能夠看到,在併發狀況下,key 和 value 都是不支持爲空的。 if (key == null || value == null) throw new NullPointerException(); //這裏和1.8 HashMap 的hash 方法大同小異,只是多了一個操做,以下 //( h ^ (h >>> 16)) & HASH_BITS; HASH_BITS = 0x7fffffff; // 0x7fffffff ,二進制爲 0111 1111 1111 1111 1111 1111 1111 1111 。 //因此,hash值除了作了高低位異或運算,還多了一步,保證最高位的 1 個 bit 位老是0。 //這裏,我並無明白它的意圖,僅僅是保證計算出來的hash值不超過 Integer 最大值,且不爲負數嗎。 //同 HashMap 的hash 方法對比一下,會發現連源碼註釋都是相同的,並無多說明其它的。 //我我的認爲意義不大,由於最後 hash 是爲了和 capacity -1 作與運算,而 capacity 最大值爲 1<<30, //即 0100 0000 0000 0000 0000 0000 0000 0000 ,減1爲 0011 1111 1111 1111 1111 1111 1111 1111。 //即便 hash 最高位爲 1(無所謂0),也不影響最後的結果,最高位也總會是0. int hash = spread(key.hashCode()); //用來計算當前鏈表上的元素個數 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //若是表爲空,則說明還未初始化。 if (tab == null || (n = tab.length) == 0) //初始化表,只有一個線程能夠初始化成功。 tab = initTable(); //若表已經初始化,則找到當前 key 所在的桶,而且判斷是否爲空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若當前桶爲空,則經過 CAS 原子操做,把新節點插入到此位置, //這保證了只有一個線程能夠 CAS 成功,其它線程都會失敗。 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //若所在桶不爲空,則判斷節點的 hash 值是否爲 MOVED(值是-1) else if ((fh = f.hash) == MOVED) //若爲-1,說明當前數組正在進行擴容,則須要當前線程幫忙遷移數據 tab = helpTransfer(tab, f); else { V oldVal = null; //這裏用加同步鎖的方式,來保證線程安全,給桶中第一個節點對象加鎖 synchronized (f) { //recheck 一下,保證當前桶的第一個節點無變化,後邊不少這樣相似的操做,再也不贅述 if (tabAt(tab, i) == f) { //若是hash值大於等於0,說明是正常的鏈表結構 if (fh >= 0) { binCount = 1; //從頭結點開始遍歷,每遍歷一次,binCount計數加1 for (Node<K,V> e = f;; ++binCount) { K ek; //若是找到了和當前 key 相同的節點,則用新值替換舊值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //若遍歷到了尾結點,則把新節點尾插進去 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //不然判斷是不是樹節點。這裏提一下,TreeBin只是頭結點對TreeNode的再封裝 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //注意下,這個判斷是在同步鎖外部,由於 treeifyBin內部也有同步鎖,並不影響 if (binCount != 0) { //若是節點個數大於等於 8,則轉化爲紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); //把舊節點值返回 if (oldVal != null) return oldVal; break; } } } //給元素個數加 1,並有可能會觸發擴容,比較複雜,稍後細講 addCount(1L, binCount); return null; }
先看下當數組爲空時,是怎麼初始化表的。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //循環判斷表是否爲空,直到初始化成功爲止。 while ((tab = table) == null || tab.length == 0) { //sizeCtl 這個值有不少狀況,默認值爲0, //當爲 -1 時,說明有其它線程正在對錶進行初始化操做 //當表初始化成功後,又會把它設置爲擴容閾值 //當爲一個小於 -1 的負數,用來表示當前有幾個線程正在幫助擴容(後邊細講) if ((sc = sizeCtl) < 0) //若 sc 小於0,其實在這裏就是-1,由於此時表是空的,不會發生擴容,sc只能爲正數或者-1 //所以,當前線程放棄 CPU 時間片,只是自旋。 Thread.yield(); // lost initialization race; just spin //經過 CAS 把 sc 的值設置爲-1,代表當前線程正在進行表的初始化,其它失敗的線程就會自旋 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //從新檢查一下表是否爲空 if ((tab = table) == null || tab.length == 0) { //若是sc大於0,則爲sc,不然返回默認容量 16。 //當調用有參構造建立 Map 時,sc的值是大於0的。 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //建立數組 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //n減去 1/4 n ,即爲 0.75n ,表示擴容閾值 sc = n - (n >>> 2); } } finally { //更新 sizeCtl 爲擴容閾值 sizeCtl = sc; } //若當前線程初始化表成功,則跳出循環。其它自旋的線程由於判斷數組不爲空,也會中止自旋 break; } } return tab; }
若 put 方法元素插入成功以後,則會調用此方法,傳入參數爲 addCount(1L, binCount)。這個方法的目的很簡單,就是把整個 table 的元素個數加 1 。可是,實現比較難。
咱們先思考一下,若是讓咱們本身去實現這樣的統計元素個數,怎麼實現?
類比 1.8 的 HashMap ,咱們能夠搞一個 size 變量來存儲個數統計。可是,這是在多線程環境下,須要考慮併發的問題。所以,能夠把 size 設置爲 volatile 的,保證可見性,而後經過 CAS 樂觀鎖來自增 1。
這樣雖然也能夠實現。可是,設想一下如今有很是多的線程,都在同一時間操做這個 size 變量,將會形成特別嚴重的競爭。因此,基於此,這裏作了更好的優化。讓這些競爭的線程,分散到不一樣的對象裏邊,單獨操做它本身的數據(計數變量),用這樣的方式儘可能下降競爭。到最後須要統計 size 的時候,再把全部對象裏邊的計數相加就能夠了。
上邊提到的 size ,在此用 baseCount 表示。分散到的對象用 CounterCell 表示,對象裏邊的計數變量用 value 表示。注意這裏的變量都是 volatile 修飾的。
當須要修改元素數量時,線程會先去 CAS 修改 baseCount 加1,若成功即返回。若失敗,則線程被分配到某個 CounterCell ,而後操做 value 加1。若成功,則返回。不然,給當前線程從新分配一個 CounterCell,再嘗試給 value 加1。(這裏簡略的說,實際更復雜)
CounterCell 會組成一個數組,也會涉及到擴容問題。因此,先畫一個示意圖幫助理解一下。
//線程被分配到的格子 @sun.misc.Contended static final class CounterCell { //此格子內記錄的 value 值 volatile long value; CounterCell(long x) { value = x; } } //用來存儲線程和線程生成的隨機數的對應關係 static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } // x爲1,check表明鏈表上的元素個數 private final void addCount(long x, int check) { CounterCell[] as; long b, s; //此處要進入if有兩種狀況 //1.數組不爲空,說明數組已經被建立好了。 //2.若數組爲空,說明數組還未建立,頗有可能競爭的線程很是少,所以就直接 CAS 操做 baseCount //若 CAS 成功,則方法跳轉到 (2)處,若失敗,則須要考慮給當前線程分配一個格子(指CounterCell對象) if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; //字面意思,是無競爭,這裏先標記爲 true,表示尚未產生線程競爭 boolean uncontended = true; //這裏有三種狀況,會進入 fullAddCount 方法 //1.若數組爲空,進方法 (1) //2.ThreadLocalRandom.getProbe() 方法會給當前線程生成一個隨機數(能夠簡單的認爲也是一個hash值) //而後用隨機數與數組長度取模,計算它所在的格子。若當前線程所分配到的格子爲空,進方法 (1)。 //3.若數組不爲空,且線程所在格子不爲空,則嘗試 CAS 修改此格子對應的 value 值加1。 //若修改爲功,則跳轉到 (3),若失敗,則把 uncontended 值設爲 fasle,說明產生了競爭,而後進方法 (1) if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //方法(1), 這個方法的目的是讓當前線程必定把 1 加成功。狀況更多,更復雜,稍後講。 fullAddCount(x, uncontended); return; } //(3)能走到這,說明數組不爲空,且修改 baseCount失敗, //且線程被分配到的格子不爲空,且修改 value 成功。 //可是這裏沒明白爲何小於等於1,就直接返回了,這裏我懷疑以前的方法漏掉了binCount=0的狀況。 //並且此處若返回了,後邊怎麼判斷擴容?(存疑) if (check <= 1) return; //計算總共的元素個數 s = sumCount(); } //(2)這裏用於檢查是否須要擴容(下邊這部分不少邏輯不懂的話,等後邊講完擴容,再回來看就理解了) if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //若元素個數達到擴容閾值,且tab不爲空,且tab數組長度小於最大容量 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //這裏假設數組長度n就爲16,這個方法返回的是一個固定值,用於當作一個擴容的校驗標識 //能夠跳轉到最後,看詳細計算過程,0000 0000 0000 0000 1000 0000 0001 1011 int rs = resizeStamp(n); //若sc小於0,說明正在擴容 if (sc < 0) { //sc的結構相似這樣,1000 0000 0001 1011 0000 0000 0000 0001 //sc的高16位是數據校驗標識,低16位表明當前有幾個線程正在幫助擴容,RESIZE_STAMP_SHIFT=16 //所以判斷校驗標識是否相等,不相等則退出循環 //sc == rs + 1,sc == rs + MAX_RESIZERS 這兩個應該是用來判斷擴容是否已經完成,可是計算方法存疑 //感興趣的能夠看這個地址,應該是一個 bug , // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427 //nextTable=null 說明須要擴容的新數組還未建立完成 //transferIndex這個參數小於等於0,說明已經不須要其它線程幫助擴容了, //可是並不說明已經擴容完成,由於有可能還有線程正在遷移元素。稍後擴容細講就明白了。 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //到這裏說明當前線程能夠幫助擴容,所以sc值加一,表明擴容的線程數加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //當sc大於0,說明sc表明擴容閾值,所以第一次擴容以前確定走這個分支,用於初始化新表 nextTable //rs<<16 //1000 0000 0001 1011 0000 0000 0000 0000 //+2 //1000 0000 0001 1011 0000 0000 0000 0010 //這個值,轉爲十進制就是 -2145714174,用於標識,這是擴容時,初始化新表的狀態, //擴容時,須要用到這個參數校驗是否全部線程都所有幫助擴容完成。 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //擴容,第二個參數表明新表,傳入null,則說明是第一次初始化新表(nextTable) transfer(tab, null); s = sumCount(); } } } //計算表中的元素總個數 final long sumCount() { CounterCell[] as = counterCells; CounterCell a; //baseCount,以這個值做爲累加基準 long sum = baseCount; if (as != null) { //遍歷 counterCells 數組,獲得每一個對象中的value值 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) //累加 value 值 sum += a.value; } } //此時獲得的就是元素總個數 return sum; } //擴容時的校驗標識 static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); } //Integer.numberOfLeadingZeros方法的做用是返回 n 的最高位爲1的前面的0的個數 //n=16, 0000 0000 0000 0000 0000 0000 0001 0000 //前面有27個0,即27 0000 0000 0000 0000 0000 0000 0001 1011 //RESIZE_STAMP_BITS爲16,而後 1<<(16-1),即 1<<15 0000 0000 0000 0000 1000 0000 0000 0000 //它們作或運算,獲得 rs 的值 0000 0000 0000 0000 1000 0000 0001 1011
上邊的 addCount 方法還沒完,別忘了有可能元素個數加 1 的操做還未成功,就走到 fullAddCount 這個方法了。看方法名,就知道了,全力增長計數值,必定要成功(奧利給)。 這個方法和擴容遷移方法是最難的,保持耐心~
//傳過來的參數分別爲 1 , false private final void fullAddCount(long x, boolean wasUncontended) { int h; //若是當前線程的隨機數爲0,則強制初始化一個值 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); //此時把 wasUncontended 設爲true,認爲無競爭 wasUncontended = true; } //用來表示比 contend(競爭)更嚴重的碰撞,若爲true,表示可能須要擴容,以減小碰撞衝突 boolean collide = false; // True if last slot nonempty //循環內,外層if判斷分三種狀況,內層判斷又分爲六種狀況 for (;;) { CounterCell[] as; CounterCell a; int n; long v; //1. 若counterCells數組不爲空。 建議先看下邊的2和3兩種狀況,再回頭看這個。 if ((as = counterCells) != null && (n = as.length) > 0) { // (1) 若當前線程所在的格子(CounterCell對象)爲空 if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { //若無鎖,則樂觀的建立一個 CounterCell 對象。 CounterCell r = new CounterCell(x); //嘗試加鎖 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; //加鎖成功後,再 recheck 一下數組是否不爲空,且當前格子爲空 try { CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //把新建立的對象賦值給當前格子 rs[j] = r; created = true; } } finally { //手動釋放鎖 cellsBusy = 0; } //若當前格子建立成功,且上邊的賦值成功,則說明加1成功,退出循環 if (created) break; //不然,繼續下次循環 continue; // Slot is now non-empty } } //若cellsBusy=1,說明有其它線程搶鎖成功。或者若搶鎖的 CAS 操做失敗,都會走到這裏, //則當前線程需跳轉到(9)從新生成隨機數,進行下次循環判斷。 collide = false; } /** *後邊這幾種狀況,都是數組和當前隨機到的格子都不爲空的狀況。 *且注意每種狀況,若執行成功,且不break,continue,則都會執行(9),從新生成隨機數,進入下次循環判斷 */ // (2) 到這,說明當前方法在被調用以前已經 CAS 失敗過一次,若不明白可回頭看下 addCount 方法, //爲了減小競爭,則跳轉到⑨處從新生成隨機數,並把 wasUncontended 設置爲true ,認爲下一次不會產生競爭 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // (3) 若 wasUncontended 爲 true 無競爭,則嘗試一次 CAS。若成功,則結束循環,若失敗則判斷後邊的 (4)(5)(6)。 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // (4) 結合 (6) 一塊兒看,(4)(5)(6)都是 wasUncontended=true,且CAS修改value失敗的狀況。 //若數組有變化,或者數組長度大於等於當前CPU的核心數,則把 collide 改成 false //由於數組如有變化,說明是由擴容引發的;長度超限,則說明已經沒法擴容,只能認爲無碰撞。 //這裏頗有意思,認真思考一下,當擴容超限後,則會達到一個平衡,即 (4)(5) 反覆執行,直到 (3) 中CAS成功,跳出循環。 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale // (5) 若數組無變化,且數組長度小於CPU核心數時,且 collide 爲 false,就把它改成 true,說明下次循環可能須要擴容 else if (!collide) collide = true; // (6) 若數組無變化,且數組長度小於CPU核心數時,且 collide 爲 true,說明衝突比較嚴重,須要擴容了。 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { //recheck if (counterCells == as) {// Expand table unless stale //建立一個容量爲原來兩倍的數組 CounterCell[] rs = new CounterCell[n << 1]; //轉移舊數組的值 for (int i = 0; i < n; ++i) rs[i] = as[i]; //更新數組 counterCells = rs; } } finally { cellsBusy = 0; } //認爲擴容後,下次不會產生衝突了,和(4)處邏輯照應 collide = false; //當次擴容後,就不須要從新生成隨機數了 continue; // Retry with expanded table } // (9),從新生成一個隨機數,進行下一次循環判斷 h = ThreadLocalRandom.advanceProbe(h); } //2.這裏的 cellsBusy 參數很是有意思,是一個volatile的 int值,用來表示自旋鎖的標誌, //能夠類比 AQS 中的 state 參數,用來控制鎖之間的競爭,而且是獨佔模式。簡化版的AQS。 //cellsBusy 若爲0,說明無鎖,線程均可以搶鎖,若爲1,表示已經有線程拿到了鎖,則其它線程不能搶鎖。 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { //這裏再從新檢測下 counterCells 數組引用是否有變化 if (counterCells == as) { //初始化一個長度爲 2 的數組 CounterCell[] rs = new CounterCell[2]; //根據當前線程的隨機數值,計算下標,只有兩個結果 0 或 1,並初始化對象 rs[h & 1] = new CounterCell(x); //更新數組引用 counterCells = rs; //初始化成功的標誌 init = true; } } finally { //別忘了,須要手動解鎖。 cellsBusy = 0; } //若初始化成功,則說明當前加1的操做也已經完成了,則退出整個循環。 if (init) break; } //3.到這,說明數組爲空,且 2 搶鎖失敗,則嘗試直接去修改 baseCount 的值, //若成功,也說明加1操做成功,則退出循環。 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
不得不佩服 Doug Lea 大神,思惟這麼縝密,若是是個人話,直接一個 CAS 完事。(手動攤手~)
須要說明的一點是,雖然咱們一直在說幫助擴容,其實更準確的說應該是幫助遷移元素。由於擴容的第一次初始化新表(擴容後的新表)這個動做,只能由一個線程完成。其餘線程都是在幫助遷移元素到新數組。
這裏仍是先看下遷移的示意圖,幫助理解。
爲了方便,上邊以原數組長度 8 爲例。在元素遷移的時候,全部線程都遵循從後向前推動的規則,即如圖A線程是第一個進來的線程,會從下標爲7的位置,開始遷移數據。
並且當前線程遷移時會肯定一個範圍,限定它這次遷移的數據範圍,如圖 A 線程只能遷移 bound=6到 i=7 這兩個數據。
此時,其它線程就不能遷移這部分數據了,只能繼續向前推動,尋找其它能夠遷移的數據範圍,且每次推動的步長爲固定值 stride(此處假設爲2)。如圖中 B線程發現 A 線程正在遷移6,7的數據,所以只能向前尋找,而後遷移 bound=4 到 i=5 的這兩個數據。
當每一個線程遷移完成它的範圍內數據時,都會繼續向前推動。那何時是個頭呢?
這就須要維護一個全局的變量 transferIndex,來表示全部線程總共推動到的元素下標位置。如圖,線程 A 第一次遷移成功後又向前推動,而後遷移2,3 的數據。此時,若沒有其餘線程在幫助遷移,則 transferIndex 即爲2。
剩餘部分等待下一個線程來遷移,或者有任何的 A 和B線程已經遷移完成,也能夠推動到這裏幫助遷移。直到 transferIndex=0 。(會作一些其餘校驗來判斷是否遷移所有完成,看代碼)。
//這個類是一個標誌,用來表明當前桶(數組中的某個下標位置)的元素已經所有遷移完成 static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { //把當前桶的頭結點的 hash 值設置爲 -1,代表已經遷移完成, //這個節點中並不存儲有效的數據 super(MOVED, null, null, null); this.nextTable = tab; } } //遷移數據 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //根據當前CPU核心數,肯定每次推動的步長,最小值爲16.(爲了方便咱們以2爲例) if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //從 addCount 方法,只會有一個線程跳轉到這裏,初始化新數組 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") //新數組長度爲原數組的兩倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } //用 nextTable 指代新數組 nextTable = nextTab; //這裏就把推動的下標值初始化爲原數組長度(以16爲例) transferIndex = n; } //新數組長度 int nextn = nextTab.length; //建立一個標誌類 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //是否向前推動的標誌 boolean advance = true; //是否全部線程都所有遷移完成的標誌 boolean finishing = false; // to ensure sweep before committing nextTab //i 表明當前線程正在遷移的桶的下標,bound表明它本次能夠遷移的範圍下限 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //須要向前推動 while (advance) { int nextIndex, nextBound; // (1) 先看 (3) 。i每次自減 1,直到 bound。若超過bound範圍,或者finishing標誌爲true,則不用向前推動。 //若未所有完成遷移,且 i 並未走到 bound,則跳轉到 (7),處理當前桶的元素遷移。 if (--i >= bound || finishing) advance = false; // (2) 每次執行,都會把 transferIndex 最新的值同步給 nextIndex //若 transferIndex小於等於0,則說明原數組中的每一個桶位置,都有線程在處理遷移了, //因而,須要跳出while循環,並把 i設爲 -1,以跳轉到④判斷在處理的線程是否已經所有完成。 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // (3) 第一個線程會先走到這裏,肯定它的數據遷移範圍。(2)處會更新 nextIndex爲 transferIndex 的最新值 //所以第一次 nextIndex=n=16,nextBound表明當次遷移的數據範圍下限,減去步長便可, //因此,第一次時,nextIndex=16,nextBound=16-2=14。後續,每次都會間隔一個步長。 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //bound表明當次數據遷移下限 bound = nextBound; //第一次的i爲15,由於長度16的數組,最後一個元素的下標爲15 i = nextIndex - 1; //代表不須要向前推動,只有當把當前範圍內的數據所有遷移完成後,才能夠向前推動 advance = false; } } // (4) if (i < 0 || i >= n || i + n >= nextn) { int sc; //若所有線程遷移完成 if (finishing) { nextTable = null; //更新table爲新表 table = nextTab; //擴容閾值改成原來數組長度的 3/2 ,即新長度的 3/4,也就是新數組長度的0.75倍 sizeCtl = (n << 1) - (n >>> 1); return; } //到這,說明當前線程已經完成了本身的全部遷移(不管參與了幾回遷移), //則把 sc 減1,代表參與擴容的線程數減小 1。 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //在 addCount 方法最後,咱們強調,遷移開始時,會設置 sc=(rs << RESIZE_STAMP_SHIFT) + 2 //每當有一個線程參與遷移,sc 就會加 1,每當有一個線程完成遷移,sc 就會減 1。 //所以,這裏就是去校驗當前 sc 是否和初始值是否相等。相等,則說明所有線程遷移完成。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; //只有此處,纔會把finishing 設置爲true。 finishing = advance = true; //這裏很是有意思,會把 i 從 -1 修改成16, //目的就是,讓 i 再從後向前掃描一遍數組,檢查是否全部的桶都已被遷移完成,參看 (6) i = n; // recheck before commit } } // (5) 若i的位置元素爲空,則說明當前桶的元素已經被遷移完成,就把頭結點設置爲fwd標誌。 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // (6) 若當前桶的頭結點是 ForwardingNode ,說明遷移完成,則向前推動 else if ((fh = f.hash) == MOVED) advance = true; // already processed //(7) 處理當前桶的數據遷移。 else { synchronized (f) { //給頭結點加鎖 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //若hash值大於等於0,則說明是普通鏈表節點 if (fh >= 0) { int runBit = fh & n; //這裏是 1.7 的 CHM 的 rehash 方法和 1.8 HashMap的 resize 方法的結合體。 //會分紅兩條鏈表,一條鏈表和原來的下標相同,另外一條鏈表是原來的下標加數組長度的位置 //而後找到 lastRun 節點,從它到尾結點總體遷移。 //lastRun前邊的節點則單個遷移,可是須要注意的是,這裏是頭插法。 //另外還有一點和1.7不一樣,1.7 lastRun前邊的節點是複製過去的,而這裏是直接遷移的,沒有複製操做。 //因此,最後會有兩條鏈表,一條鏈表從 lastRun到尾結點是正序的,而lastRun以前的元素是倒序的, //另一條鏈表,從頭結點開始就是倒敘的。看下圖。 Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //樹節點 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
遷移後的新數組鏈表方向示意圖,以 runBit =0 爲例。
最後再看 put 方法中的這個方法,就比較簡單了。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //頭結點爲 ForwardingNode ,而且新數組已經初始化 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //若校驗標識失敗,或者已經擴容完成,或推動下標到頭,則退出 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //當前線程須要幫助遷移,sc值加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
JDK1.8 的 CHM 最主要的邏輯基本上都講完了,其它方法原理類同。1.8 的 ConcurrentHashMap 實現原理仍是比較簡單的,可是代碼實現比較複雜。相對於 1.7 來講,鎖的粒度下降了,效率也提升了。
看到這裏的小夥伴,我想說你是最棒的!
求點贊:
上篇文章收藏比點贊還要多,我要哭了 o(╥﹏╥)o。
原創不易,小夥伴們若是以爲文章還不錯的話,動動手指,關注我,點個贊吧~