前文講了HashMap的源碼分析,從中能夠看到下面的問題:html
基於上述問題,均可以使用ConcurrentHashMap進行解決,ConcurrentHashMap使用分段鎖技術解決了併發訪問效率,在遍歷迭代獲取時進行修改操做也不會發生併發修改異常等等問題。node
//最大容量大小 private static final int MAXIMUM_CAPACITY = 1 << 30; //默認容量大小 private static final int DEFAULT_CAPACITY = 16; /** *控制標識符,用來控制table的初始化和擴容的操做,不一樣的值有不一樣的含義 * 多線程之間,以volatile方式讀取sizeCtl屬性,來判斷ConcurrentHashMap當前所處的狀態。 * 經過cas設置sizeCtl屬性,告知其餘線程ConcurrentHashMap的狀態變動 *未初始化: * sizeCtl=0:表示沒有指定初始容量。 * sizeCtl>0:表示初始容量。 *初始化中: * sizeCtl=-1,標記做用,告知其餘線程,正在初始化 *正常狀態: * sizeCtl=0.75n ,擴容閾值 *擴容中: * sizeCtl < 0 : 表示有其餘線程正在執行擴容 * sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此時只有一個線程在執行擴容 */ private transient volatile int sizeCtl; //併發級別 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //建立一個新的空map,默認大小是16 public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); //調整table的大小,tableSizeFor的實現查看前文HashMap源碼分析的構造方法模塊 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } /** * concurrencyLevel:併發度,預估同時操做數據的線程數量 * 表示可以同時更新ConccurentHashMap且不產生鎖競爭的最大線程數。 * 默認值爲16,(即容許16個線程併發可能不會產生競爭)。 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //至少使用一樣多的桶容納一樣多的更新線程來操做元素 if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
public V put(K key, V value) { return putVal(key, value, false); } static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash普通節點哈希的可用位 //把位數控制在int最大整數以內,h ^ (h >>> 16)的含義查看前文的put源碼解析 static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } final V putVal(K key, V value, boolean onlyIfAbsent) { //key和value爲空拋出異常 if (key == null || value == null) throw new NullPointerException(); //獲得hash值 int hash = spread(key.hashCode()); int binCount = 0; //自旋對table進行遍歷 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //若是hash計算出的槽位元素爲null,CAS將元素填充進當前槽位並結束遍歷 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //hash爲-1,說明正在擴容,那麼就幫助其擴容。以加快速度 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) {// 同步 f 節點,防止增長鏈表的時候致使鏈表成環 if (tabAt(tab, i) == f) {// 若是對應的下標位置的節點沒有改變 if (fh >= 0) {//f節點的hash值大於0 binCount = 1;//鏈表初始長度 // 死循環,直到將值添加到鏈表尾部,並計算鏈表的長度 for (Node<K,V> e = f;; ++binCount) { K ek; //hash和key相同,值進行覆蓋 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //hash和key不一樣,添加到鏈表後面 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //是樹節點,添加到樹中 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //若是節點添加到鏈表和樹中 if (binCount != 0) { //鏈表長度大於等於8時,將鏈表轉換成紅黑樹樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 判斷是否須要擴容 addCount(1L, binCount); return null; }
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //若是一個線程發現sizeCtl<0,意味着另外的線程執行CAS操做成功,當前線程只須要讓出cpu時間片,即保證只有一個線程初始化 //因爲sizeCtl是volatile的,保證了順序性和可見性 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//cas操做判斷並置爲-1 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//若沒有參數則默認容量爲16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//建立數組 table = tab = nt;//數組賦值給當前ConcurrentHashMap //計算下一次元素到達擴容的閥值,若是n爲16的話,那麼這裏 sc = 12,其實就是 0.75 * n sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { /**getObjectVolatile:獲取obj對象中offset偏移地址對應的object型field的值,支持volatile load語義。 * 數組的尋址計算方式:a[i]_address = base_address + i * data_type_size * base_address:起始地址;i:索引;data_type_size:數據類型長度大小 */ return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }
private static int RESIZE_STAMP_BITS = 16; /** * numberOfLeadingZeros()的具體算法邏輯請參考:https://www.jianshu.com/p/2c1be41f6e59 * numberOfLeadingZeros(n)返回的是n的二進制標識的從高位開始到第一個非0的數字的之間0的個數,好比numberOfLeadingZeros(8)返回的就是28 ,由於0000 0000 0000 0000 0000 0000 0000 1000在1前面有28個0 * RESIZE_STAMP_BITS 的值是16,1 << (RESIZE_STAMP_BITS - 1)就是將1左移位15位,0000 0000 0000 0000 1000 0000 0000 0000 * 而後將兩個數字再按位或,將至關於 將移位後的 兩個數相加。 * 好比: * 8的二進制表示是: 0000 0000 0000 0000 0000 0000 0000 1000 = 8 * 7的二進制表示是: 0000 0000 0000 0000 0000 0000 0000 0111 = 7 * 按位或的結果是: 0000 0000 0000 0000 0000 0000 0000 1111 = 15 * 至關於 8 + 7 =15 * 爲何會出現這種效果呢?由於8是2的整數次冪,也就是說8的二進制表示只會在某個高位上是1,其他地位都是0,因此在按位或的時候,低位表示的全是7的位值,因此出現了這種效果。 */ static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); } final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //若是table不是空,且node節點是轉移類型,且node節點的nextTable(新 table)不是空,嘗試幫助擴容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //根據length獲得一個標識符號 int rs = resizeStamp(tab.length); //若是nextTab沒有被併發修改,且tab也沒有被併發修改,且sizeCtl<0(說明還在擴容) while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { /** * 若是 sizeCtl 無符號右移16不等於rs( sc前16位若是不等於標識符,則標識符變化了) * 或者 sizeCtl == rs + 1(擴容結束了,再也不有線程進行擴容)(默認第一個線程設置 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減1。這個時候,sc 就等於 rs + 1) * 或者 sizeCtl == rs + 65535 (若是達到最大幫助線程的數量,即 65535) * 或者轉移下標正在調整 (擴容結束) * 結束循環,返回 table * 【即若是還在擴容,判斷標識符是否變化,判斷擴容是否結束,判斷是否達到最大線程數,判斷擴容轉移下標是否在調整(擴容結束),若是知足任意條件,結束循環。】 */ if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 若是以上都不是, 將 sizeCtl + 1, (表示增長了一個線程幫助其擴容) if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab);//進行擴容和數據遷移 break; } } return nextTab;//返回擴容後的數組 } return table;//沒有擴容,返回原數組 }
//擴容索引,表示已經分配給擴容線程的table數組索引位置。主要用來協調多個線程,併發安全地獲取遷移任務(hash桶)。 private transient volatile int transferIndex;
在擴容以前,transferIndex 在數組的最右邊 。此時有一個線程發現已經到達擴容閾值,準備開始擴容。算法
發現transferIndex=0,即全部node均已分配數組
發現擴容線程已經達到最大擴容線程數緩存
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //先判斷CPU核數,若是是多核,將數組長度/8,再/核數,獲得stride,不然stride=數組長度,若是stride<16,則stride=16 //這裏的目的是讓每一個CPU處理的桶同樣多,避免出現轉移任務不均勻的現象,若是桶較少的話,默認一個CPU(一個線程)處理16個桶,即確保每次至少獲取16個桶(遷移任務) if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //未初始化進行初始化 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//擴容2倍 nextTab = nt;//更新 } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE;//擴容失敗,sizeCtl使用int最大值。 return; } nextTable = nextTab;//更新成員變量 //transferIndex默認=table.length transferIndex = n; } int nextn = nextTab.length;//新tab的長度 //建立一個fwd節點,用於佔位。當別的線程發現這個槽位中是fwd類型的節點,表示其餘線程正在擴容,而且此節點已經擴容完畢,跳過這個節點。關聯了nextTab,能夠經過ForwardingNode.find()訪問已經遷移到nextTab的數據。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //首次推動爲 true,若是等於true,說明須要再次推動一個下標(i--),反之,若是是false,那麼就不能推動下標,須要將當前的下標處理完畢才能繼續推動 boolean advance = true; //完成狀態,若是是true,就結束此方法。 boolean finishing = false; // to ensure sweep before committing nextTab //自旋,i表示當前線程能夠處理的當前桶區間最大下標,bound表示當前線程能夠處理的當前桶區間最小下標 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //while:若是當前線程能夠向後推動;這個循環就是控制i遞減。同時,每一個線程都會進入這裏取得本身須要轉移的桶的區間 //分析場景:table.length=32,此時執行到這個地方nextTab.length=64 A,B線程同時進行擴容。 //A,B線程同時執行到while循環中cas這段代碼 //A線程獲第一時間搶到資源,設置bound=nextBound=16,i = nextIndex - 1=31 A線程搬運table[31]~table[16]中間16個元素 //B線程再次回到while起點,而後在次獲取到 bound = nextBound-0,i=nextIndex - 1=15,B線程搬運table[15]~table[0]中間16個元素 //當transferIndex=0的時候,說明table裏面全部搬運任務都已經完成,沒法在分配任務。 while (advance) { int nextIndex, nextBound; // 對i減1,判斷是否大於等於bound(正常狀況下,若是大於bound不成立,說明該線程上次領取的任務已經完成了。那麼,須要在下面繼續領取任務) // 若是對i減1大於等於 bound,或者完成了,修改推動狀態爲 false,不能推動了。任務成功後修改推動狀態爲 true。 // 一般,第一次進入循環,i-- 這個判斷會沒法經過,從而走下面的nextIndex = transferIndex(獲取最新的轉移下標)。其他狀況都是:若是能夠推動,將i減1,而後修改爲不可推動。若是i對應的桶處理成功了,改爲能夠推動。 if (--i >= bound || finishing) advance = false;//這裏設置false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動 // 這裏的目的是:1. 當一個線程進入時,會選取最新的轉移下標。 // 2. 當一個線程處理完本身的區間時,若是還有剩餘區間的沒有別的線程處理,再次CAS獲取區間。 else if ((nextIndex = transferIndex) <= 0) { // 若是小於等於0,說明沒有區間能夠獲取了,i改爲-1,推動狀態變成false,再也不推動 // 這個-1會在下面的if塊裏判斷,從而進入完成狀態判斷 i = -1; advance = false;//這裏設置false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動 } // CAS修改transferIndex,即 length - 區間值,留下剩餘的區間值供後面的線程使用 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound;//這個值就是當前線程能夠處理的最小當前區間最小下標 i = nextIndex - 1;//初次對i賦值,這個就是當前線程能夠處理的當前區間的最大下標 advance = false;// 這裏設置false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動,這樣致使漏掉某個桶。下面的 if(tabAt(tab, i) == f) 判斷會出現這樣的狀況。 } } //i<0(不在 tab 下標內,按照上面的判斷,領取最後一段區間的線程結束) if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) {// 若是完成了擴容和數據遷移 nextTable = null;//刪除成員遍歷 table = nextTab;//更新table sizeCtl = (n << 1) - (n >>> 1);//更新閥值 return;//結束transfer } //若是沒完成,嘗試將sc -1. 表示這個線程結束幫助擴容了,將 sc 的低 16 位減一。 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //若是 sc - 2 不等於標識符左移 16 位。若是他們相等了,說明沒有線程在幫助他們擴容了。也就是說,擴容結束了。 /** *第一個擴容的線程,執行transfer方法以前(helpTransfer方法中),會設置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) *後續幫其擴容的線程,執行transfer方法以前,會設置 sizeCtl = sizeCtl+1 *每個退出transfer的方法的線程,退出以前,會設置 sizeCtl = sizeCtl-1 *那麼最後一個線程退出時: *必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT */ if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return;// 不相等,說明不到最後一個線程,直接退出transfer方法 finishing = advance = true;// 若是相等,擴容結束了,更新 finising 變量 i = n; // recheck before commit,最後退出的線程要從新check下是否所有遷移完畢 } } else if ((f = tabAt(tab, i)) == null) // 獲取老tab的i下標位置的變量,若是是 null,就使用 fwd 佔位。 advance = casTabAt(tab, i, null, fwd);// 若是成功寫入 fwd 佔位,再次推動一個下標 else if ((fh = f.hash) == MOVED)// 若是不是 null 且 hash 值是 MOVED。 advance = true; // already processed,說明別的線程已經處理過了,再次推動一個下標 else {// 到這裏,說明這個位置有實際值了,且不是佔位符。對這個節點上鎖。爲何上鎖,防止 putVal 的時候向鏈表插入數據 synchronized (f) { // 判斷 i 下標處的桶節點是否和 f 相同 if (tabAt(tab, i) == f) { Node<K,V> ln, hn;// low, height 高位桶,低位桶 // 若是 f 的 hash 值大於 0 。TreeBin 的 hash 是 -2 if (fh >= 0) { // 對老長度進行與運算(第一個操做數的的第n位於第二個操做數的第n位若是都是1,那麼結果的第n爲也爲1,不然爲0) // 因爲 Map 的長度都是 2 的次方(000001000 這類的數字),那麼取於 length 只有 2 種結果,一種是 0,一種是1 // 若是是結果是0 ,Doug Lea 將其放在低位,反之放在高位,目的是將鏈表從新 hash,放到對應的位置上,讓新的取於算法可以擊中他。 int runBit = fh & n; Node<K,V> lastRun = f; // 尾節點,且和頭節點的 hash 值取於不相等 // 遍歷這個桶 for (Node<K,V> p = f.next; p != null; p = p.next) { // 取於桶中每一個節點的 hash 值 int b = p.hash & n; // 若是節點的 hash 值和首節點的 hash 值取於結果不一樣 if (b != runBit) { runBit = b; // 更新 runBit,用於下面判斷 lastRun 該賦值給 ln 仍是 hn。 lastRun = p; // 這個 lastRun 保證後面的節點與本身的取於值相同,避免後面沒有必要的循環 } } if (runBit == 0) {// 若是最後更新的 runBit 是 0 ,設置低位節點 ln = lastRun; hn = null; } else { hn = lastRun; // 若是最後更新的 runBit 是 1, 設置高位節點 ln = null; }// 再次循環,生成兩個鏈表,lastRun 做爲中止條件,這樣就是避免無謂的循環(lastRun 後面都是相同的取於結果) for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 若是與運算結果是 0,那麼就還在低位 if ((ph & n) == 0) // 若是是0 ,那麼建立低位節點 ln = new Node<K,V>(ph, pk, pv, ln); else // 1 則建立高位 hn = new Node<K,V>(ph, pk, pv, hn); } // 其實這裏相似 hashMap // 設置低位鏈表放在新數組的 i setTabAt(nextTab, i, ln); // 設置高位鏈表,在原有長度上加 n setTabAt(nextTab, i + n, hn); // 將舊的鏈表設置成佔位符,表示處理過了 setTabAt(tab, i, fwd); // 繼續向後推動 advance = true; }// 若是是紅黑樹 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍歷 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // 和鏈表相同的判斷,與運算 == 0 的放在低位 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } // 不是 0 的放在高位 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 若是樹的節點數小於等於 6,那麼轉成鏈表,反之,建立一個新的樹 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位樹 setTabAt(nextTab, i, ln); // 高位數 setTabAt(nextTab, i + n, hn); // 舊的設置成佔位符 setTabAt(tab, i, fwd); // 繼續向後推動 advance = true; } } } } } }
// 從 putVal 傳入的參數是x=1,check=binCount默認是0,只有hash衝突了纔會大於1,且他的大小是鏈表的長度(若是不是紅黑樹結構的話,紅黑樹=2)。 private final void addCount(long x, int check) { CounterCell[] as; long b, s; //若是計數盒子不是空或者修改 baseCount 失敗 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 若是計數盒子是空(還沒有出現併發) // 若是隨機取餘一個數組位置爲空 或者 // 修改這個槽位的變量失敗(出現併發了) // 執行 fullAddCount 方法,在fullAddCount自旋直到CAS操做成功才結束退出 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } // 檢查是否須要擴容,在 putVal 方法調用時,默認就是要檢查的(check默認是0,鏈表是鏈表長度,紅黑樹是2),若是是值覆蓋了,就忽略 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 若是map.size() 大於 sizeCtl(達到擴容閾值須要擴容) 且 // table 不是空;且 table 的長度小於 1 << 30。(能夠擴容) while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 根據 length 獲得一個標識 int rs = resizeStamp(n); if (sc < 0) {//代表此時有別的線程正在進行擴容 // 若是 sc 的低 16 位不等於 標識符(校驗異常 sizeCtl 變化了) // 若是 sc == 標識符 + 1 (擴容結束了,再也不有線程進行擴容)(默認第一個線程設置 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等於 rs + 1) // 若是 sc == 標識符 + 65535(幫助線程數已經達到最大) // 若是 nextTable == null(結束擴容了) // 若是 transferIndex <= 0 (轉移狀態變化了) // 結束循環 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 不知足前面5個條件時,嘗試參與這次擴容,把正在執行transfer任務的線程數加1,+2表明有1個,+1表明有0個,表示多了一個線程在幫助擴容,執行transfer if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //若是不在擴容,將 sc 更新:標識符左移 16 位 而後 + 2. 也就是變成一個負數。高 16 位是標識符,低 16 位初始是 2. //試着讓本身成爲第一個執行transfer任務的線程 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount();// 從新計數,判斷是否須要開啓下一輪擴容 } } }
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //獲得hash int h = spread(key.hashCode()); //table有值,且查找到的槽位有值(tabAt方法經過valatile讀) if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //hash、key、value都相同返回當前查找到節點的值 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //遍歷特殊節點:紅黑樹、已經遷移的節點(ForwardingNode)等 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //遍歷node鏈表(e.next也是valitle變量) while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; }
public V remove(Object key) { return replaceNode(key, null, null); } //經過volatile設置第i個節點的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); } final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); //自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //數組或查找的槽位爲空,結束自旋返回null if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //正在擴容,幫助擴容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null;//返回的舊值 boolean validated = false;//是否進行刪除鏈表或紅黑樹節點 synchronized (f) {//槽位加鎖 //getObjectVolatile獲取tab[i],若是此時tab[i]!=f,說明其餘線程修改了tab[i]。回到for循環開始處,從新執行 if (tabAt(tab, i) == f) {//槽位節點沒有變化 if (fh >= 0) {//槽位節點是鏈表 validated = true; //遍歷鏈表 for (Node<K,V> e = f, pred = null;;) { K ek; //hash、key、value相同 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val;//臨時節點緩存當前節點值 //值相同 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev;//給舊值賦值 if (value != null)//值覆蓋,replace()調用 e.val = value; else if (pred != null)//有前節點,表示當前節點不是頭節點 pred.next = e.next;//刪除當前節點 else setTabAt(tab, i, e.next);//刪除頭節點,即更新當前槽位(數組槽位)節點爲頭節點的下一節點 } break; } //當前節點不是目標節點,繼續遍歷下一個節點 pred = e; //到達鏈表尾部,依舊沒有找到,跳出循環 if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) {//紅黑樹 validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; //樹有節點且查找的節點不爲null if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; //值相同 if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv;//給舊值賦值 if (value != null)//值覆蓋,replace()調用 p.val = value; else if (t.removeTreeNode(p))//刪除節點成功 setTabAt(tab, i, untreeify(t.first));//更新當前槽位(數組槽位)節點爲樹的第一個節點 } } } } } if (validated) { //若是刪除了節點,更新size if (oldVal != null) { if (value == null) addCount(-1L, -1);//數量-1 return oldVal; } break; } } } return null; }