ConcurrentHashMap 是併發中的重中之重,也是最經常使用的數據結構,以前的文章中,咱們介紹了 putVal 方法。併發編程之 ConcurrentHashMap(JDK 1.8) putVal 源碼分析。其中分析了 initTable 方法和 putVal 方法,但也留下了一句話:java
這篇文章僅僅是 ConcurrentHashMap 的開頭,關於 ConcurrentHashMap 裏面的精華太多,值得咱們好好學習。node
說道精華,他的擴容方法絕對是精華,要知道,ConcurrentHashMap 擴容是高度併發的。算法
今天來逐行分析源碼。編程
首先說結論。源碼加註釋我會放在後面。該方法的執行邏輯以下:數組
經過計算 CPU 核心數和 Map 數組的長度獲得每一個線程(CPU)要幫助處理多少個桶,而且這裏每一個線程處理都是平均的。默認每一個線程處理 16 個桶。所以,若是長度是 16 的時候,擴容的時候只會有一個線程擴容。緩存
初始化臨時變量 nextTable。將其在原有基礎上擴容兩倍。安全
死循環開始轉移。多線程併發轉移就是在這個死循環中,根據一個 finishing 變量來判斷,該變量爲 true 表示擴容結束,不然繼續擴容。數據結構
3.1 進入一個 while 循環,分配數組中一個桶的區間給線程,默認是 16. 從大到小進行分配。當拿到分配值後,進行 i-- 遞減。這個 i 就是數組下標。(其中有一個 bound 參數,這個參數指的是該線程這次能夠處理的區間的最小下標,超過這個下標,就須要從新領取區間或者結束擴容,還有一個 advance 參數,該參數指的是是否繼續遞減轉移下一個桶,若是爲 true,表示能夠繼續向後推動,反之,說明尚未處理好當前桶,不能推動
)
3.2 出 while 循環,進 if 判斷,判斷擴容是否結束,若是擴容結束,清空臨時變量,更新 table 變量,更新庫容閾值。若是沒完成,但已經沒法領取區間(沒了),該線程退出該方法,並將 sizeCtl 減一,表示擴容的線程少一個了。若是減完這個數之後,sizeCtl 迴歸了初始狀態,表示沒有線程再擴容了,該方法全部的線程擴容結束了。(這裏主要是判斷擴容任務是否結束,若是結束了就讓線程退出該方法,並更新相關變量
)。而後檢查全部的桶,防止遺漏。
3.3 若是沒有完成任務,且 i 對應的槽位是空,嘗試 CAS 插入佔位符,讓 putVal 方法的線程感知。
3.4 若是 i 對應的槽位不是空,且有了佔位符,那麼該線程跳過這個槽位,處理下一個槽位。
3.5 若是以上都是否是,說明這個槽位有一個實際的值。開始同步處理這個桶。
3.6 到這裏,都尚未對桶內數據進行轉移,只是計算了下標和處理區間,而後一些完成狀態判斷。同時,若是對應下標內沒有數據或已經被佔位了,就跳過了。多線程
處理每一個桶的行爲都是同步的。防止 putVal 的時候向鏈表插入數據。
4.1 若是這個桶是鏈表,那麼就將這個鏈表根據 length 取於拆成兩份,取於結果是 0 的放在新表的低位,取於結果是 1 放在新表的高位。
4.2 若是這個桶是紅黑數,那麼也拆成 2 份,方式和鏈表的方式同樣,而後,判斷拆分過的樹的節點數量,若是數量小於等於 6,改形成鏈表。反之,繼續使用紅黑樹結構。
4.3 到這裏,就完成了一個桶從舊錶轉移到新表的過程。併發
好,以上,就是 transfer 方法的整體邏輯。仍是挺複雜的。再進行精簡,分紅 3 步驟:
大致就是上面的 3 個步驟。
再來看看源碼和註釋。
源碼加註釋:
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. * * transferIndex 表示轉移時的下標,初始爲擴容前的 length。 * * 咱們假設長度是 32 */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 將 length / 8 而後除以 CPU核心數。若是獲得的結果小於 16,那麼就使用 16。 // 這裏的目的是讓每一個 CPU 處理的桶同樣多,避免出現轉移任務不均勻的現象,若是桶較少的話,默認一個 CPU(一個線程)處理 16 個桶 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range 細分範圍 stridea:TODO // 新的 table 還沒有初始化 if (nextTab == null) { // initiating try { // 擴容 2 倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 更新 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME // 擴容失敗, sizeCtl 使用 int 最大值。 sizeCtl = Integer.MAX_VALUE; return;// 結束 } // 更新成員變量 nextTable = nextTab; // 更新轉移下標,就是 老的 tab 的 length transferIndex = n; } // 新 tab 的 length int nextn = nextTab.length; // 建立一個 fwd 節點,用於佔位。當別的線程發現這個槽位中是 fwd 類型的節點,則跳過這個節點。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 首次推動爲 true,若是等於 true,說明須要再次推動一個下標(i--),反之,若是是 false,那麼就不能推動下標,須要將當前的下標處理完畢才能繼續推動 boolean advance = true; // 完成狀態,若是是 true,就結束此方法。 boolean finishing = false; // to ensure sweep before committing nextTab // 死循環,i 表示下標,bound 表示當前線程能夠處理的當前桶區間最小下標 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 若是當前線程能夠向後推動;這個循環就是控制 i 遞減。同時,每一個線程都會進入這裏取得本身須要轉移的桶的區間 while (advance) { int nextIndex, nextBound; // 對 i 減一,判斷是否大於等於 bound (正常狀況下,若是大於 bound 不成立,說明該線程上次領取的任務已經完成了。那麼,須要在下面繼續領取任務) // 若是對 i 減一大於等於 bound(還須要繼續作任務),或者完成了,修改推動狀態爲 false,不能推動了。任務成功後修改推動狀態爲 true。 // 一般,第一次進入循環,i-- 這個判斷會沒法經過,從而走下面的 nextIndex 賦值操做(獲取最新的轉移下標)。其他狀況都是:若是能夠推動,將 i 減一,而後修改爲不可推動。若是 i 對應的桶處理成功了,改爲能夠推動。 if (--i >= bound || finishing) advance = false;// 這裏設置 false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動 // 這裏的目的是:1. 當一個線程進入時,會選取最新的轉移下標。2. 當一個線程處理完本身的區間時,若是還有剩餘區間的沒有別的線程處理。再次獲取區間。 else if ((nextIndex = transferIndex) <= 0) { // 若是小於等於0,說明沒有區間了 ,i 改爲 -1,推動狀態變成 false,再也不推動,表示,擴容結束了,當前線程能夠退出了 // 這個 -1 會在下面的 if 塊裏判斷,從而進入完成狀態判斷 i = -1; advance = false;// 這裏設置 false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動 }// CAS 修改 transferIndex,即 length - 區間值,留下剩餘的區間值供後面的線程使用 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound;// 這個值就是當前線程能夠處理的最小當前區間最小下標 i = nextIndex - 1; // 初次對i 賦值,這個就是當前線程能夠處理的當前區間的最大下標 advance = false; // 這裏設置 false,是爲了防止在沒有成功處理一個桶的狀況下卻進行了推動,這樣對致使漏掉某個桶。下面的 if (tabAt(tab, i) == f) 判斷會出現這樣的狀況。 } }// 若是 i 小於0 (不在 tab 下標內,按照上面的判斷,領取最後一段區間的線程擴容結束) // 若是 i >= tab.length(不知道爲何這麼判斷) // 若是 i + tab.length >= nextTable.length (不知道爲何這麼判斷) if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 若是完成了擴容 nextTable = null;// 刪除成員變量 table = nextTab;// 更新 table sizeCtl = (n << 1) - (n >>> 1); // 更新閾值 return;// 結束方法。 }// 若是沒完成 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 嘗試將 sc -1. 表示這個線程結束幫助擴容了,將 sc 的低 16 位減一。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 若是 sc - 2 不等於標識符左移 16 位。若是他們相等了,說明沒有線程在幫助他們擴容了。也就是說,擴容結束了。 return;// 不相等,說明沒結束,當前線程結束方法。 finishing = advance = true;// 若是相等,擴容結束了,更新 finising 變量 i = n; // 再次循環檢查一下整張表 } } else if ((f = tabAt(tab, i)) == null) // 獲取老 tab i 下標位置的變量,若是是 null,就使用 fwd 佔位。 advance = casTabAt(tab, i, null, fwd);// 若是成功寫入 fwd 佔位,再次推動一個下標 else if ((fh = f.hash) == MOVED)// 若是不是 null 且 hash 值是 MOVED。 advance = true; // already processed // 說明別的線程已經處理過了,再次推動一個下標 else {// 到這裏,說明這個位置有實際值了,且不是佔位符。對這個節點上鎖。爲何上鎖,防止 putVal 的時候向鏈表插入數據 synchronized (f) { //擴容時,只在這個環節加鎖 // 判斷 i 下標處的桶節點是否和 f 相同 if (tabAt(tab, i) == f) { Node<K,V> ln, hn;// low, height 高位桶,低位桶 // 若是 f 的 hash 值大於 0 。TreeBin 的 hash 是 -2 if (fh >= 0) { // 對老長度進行與運算(第一個操做數的的第n位於第二個操做數的第n位若是都是1,那麼結果的第n爲也爲1,不然爲0) // 因爲 Map 的長度都是 2 的次方(000001000 這類的數字),那麼取於 length 只有 2 種結果,一種是 0,一種是1 // 若是是結果是0 ,Doug Lea 將其放在低位,反之放在高位,目的是將鏈表從新 hash,放到對應的位置上,讓新的取於算法可以擊中他。 int runBit = fh & n; Node<K,V> lastRun = f; // 尾節點,且和頭節點的 hash 值取於不相等 // 遍歷這個桶 for (Node<K,V> p = f.next; p != null; p = p.next) { // 取於桶中每一個節點的 hash 值 int b = p.hash & n; // 若是節點的 hash 值和首節點的 hash 值取於結果不一樣 if (b != runBit) { runBit = b; // 更新 runBit,用於下面判斷 lastRun 該賦值給 ln 仍是 hn。 lastRun = p; // 這個 lastRun 保證後面的節點與本身的取於值相同,避免後面沒有必要的循環 } } if (runBit == 0) {// 若是最後更新的 runBit 是 0 ,設置低位節點 ln = lastRun; hn = null; } else { hn = lastRun; // 若是最後更新的 runBit 是 1, 設置高位節點 ln = null; }// 再次循環,生成兩個鏈表,lastRun 做爲中止條件,這樣就是避免無謂的循環(lastRun 後面都是相同的取於結果) for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 若是與運算結果是 0,那麼就還在低位 if ((ph & n) == 0) // 若是是0 ,那麼建立低位節點 ln = new Node<K,V>(ph, pk, pv, ln); else // 1 則建立高位 hn = new Node<K,V>(ph, pk, pv, hn); } // 其實這裏相似 hashMap // 設置低位鏈表放在新鏈表的 i setTabAt(nextTab, i, ln); // 設置高位鏈表,在原有長度上加 n setTabAt(nextTab, i + n, hn); // 將舊的鏈表設置成佔位符 setTabAt(tab, i, fwd); // 繼續向後推動 advance = true; }// 若是是紅黑樹 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍歷 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // 和鏈表相同的判斷,與運算 == 0 的放在低位 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } // 不是 0 的放在高位 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 若是樹的節點數小於等於 6,那麼轉成鏈表,反之,建立一個新的樹 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位樹 setTabAt(nextTab, i, ln); // 高位數 setTabAt(nextTab, i + n, hn); // 舊的設置成佔位符 setTabAt(tab, i, fwd); // 繼續向後推動 advance = true; } } } } } }
代碼加註釋比較長,有興趣能夠逐行對照,有 2 個判斷樓主看不懂爲何這麼判斷,知道的同窗能夠提醒一下。
而後,說說精華的部分。
假設總長度是 64 ,每一個線程能夠分到 16 個桶,各自處理,不會互相影響。
擴容前的狀態。
當對 4 號桶或者 10 號桶進行轉移的時候,會將鏈表拆成兩份,規則是根據節點的 hash 值取於 length,若是結果是 0,放在低位,不然放在高位。
所以,10 號桶的數據,黑色節點會放在新表的 10 號位置,白色節點會放在新桶的 26 號位置。
下圖是循環處理桶中數據的邏輯:
處理完以後,新桶的數據是這樣的:
transfer 方法能夠說很牛逼,很精華,內部多線程擴容性能很高,
經過給每一個線程分配桶區間,避免線程間的爭用,經過爲每一個桶節點加鎖,避免 putVal 方法致使數據不一致。同時,在擴容的時候,也會將鏈表拆成兩份,這點和 HashMap 的 resize 方法相似。
而若是有新的線程想 put 數據時,也會幫助其擴容。鬼斧神工,使人讚歎。
https://mp.weixin.qq.com/s?__biz=MzAxMzE4MDI0NQ==&mid=2650336167&idx=1&sn=56f2583778afe80ce7a3476ad311e550&chksm=83aac79db4dd4e8b96aa46df3387157d1b8f7d279dfdb770a45a3fe9e17359a4e6e6ae499bb3&token=1182516540&lang=zh_CN#rd
在以前的文章提到ConcurrentHashMap是一個線程安全的,那麼我麼看一下ConcurrentHashMap如何進行操做的。
ConcurrentHashMap與HashTable區別?
HashTable
put()源代碼
從代碼能夠看出來在全部put 的操做的時候 都須要用 synchronized 關鍵字進行同步。而且key 不能爲空。
這樣至關於每次進行put 的時候都會進行同步 當10個線程同步進行操做的時候,就會發現當第一個線程進去 其餘線程必須等待第一個線程執行完成,才能夠進行下去。性能特別差。
CurrentHashMap
分段鎖技術:ConcurrentHashMap相比 HashTable而言解決的問題就是 的 它不是鎖所有數據,而是鎖一部分數據,這樣多個線程訪問的時候就不會出現競爭關係。不須要排隊等待了。
從圖中能夠看出來ConcurrentHashMap的主幹是個Segment數組。
這就是爲何ConcurrentHashMap支持容許多個修改同時併發進行,緣由就是採用的Segment分段鎖功能,每個Segment 都想的於小的hash table而且都有本身鎖,只要修改再也不同一個段上就不會引發併發問題。
final Segment<K,V>[] segments;
使用ConConcurrentHashMap時候 有時候會遇到跨段的問題,跨段的時候【size()、 containsValue()】,可能須要鎖定部分段或者全段,當操做結束以後,又回按照 順序 進行 釋放 每一段的鎖。注意是按照順序解鎖的。,每一個Segment又包含了多個HashEntry.
transient volatile HashEntry<K,V>[] table; static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; //其餘省略 }
須要注意的是 Segment 是一種可重入鎖(繼承ReentrantLock)
那麼我簡單說一下ReentrantLock 與synchronized有什麼區別?
synchronized 是一個同步鎖 synchronized (this)
同步鎖 當一個線程A 訪問 【資源】的代碼同步塊的時候,A線程就會持續持有當前鎖的狀態,若是其餘線程B-E 也要訪問【資源】的代碼同步塊的時候將會收到阻塞,所以須要排隊等待A線程釋放鎖的狀態。(如圖狀況1)可是注意的是,當一個線程B-E 只是不能方法 A線程 【資源】的代碼同步塊,仍然能夠訪問其餘的非資源同步塊。
ReentrantLock 可重入鎖 一般兩類:公平性、非公平性
公平性:根據線程請求鎖的順序依次獲取鎖,當一個線程A 訪問 【資源】的期間,線程A 獲取鎖資源,此時內部存在一個計數器num+1,在訪問期間,線程B、C請求 資源時,發現A 線程在持有當前資源,所以在後面生成節點排隊(B 處於待喚醒狀態),假如此時a線程再次請求資源時,不須要再次排隊,能夠直接再次獲取當前資源 (內部計數器+1 num=2) ,當A線程釋放全部鎖的時候(num=0),此時會喚醒B線程進行獲取鎖的操做,其餘C-E線程就同理。(狀況2)
非公平性:當A線程已經釋放所以後,準備喚醒線程B獲取資源的時候,此時線程M 獲取請求,此時會出現競爭,線程B 沒有競爭過M線程,測試M獲取的線程所以,M會有限得到資源,B繼續睡眠。(狀況2)
synchronized 是一個非公平性鎖。 非公平性 會比公平性鎖的效率要搞不少緣由,不須要通知等待。
ReentrantLock 提供了 new Condition能夠得到多個Condition對象,能夠簡單的實現比較複雜的線程同步的功能.經過await(),signal()以實現。
ReentrantLock 提供能夠中斷鎖的一個方法lock.lockInterruptibly()方法。
Jdk 1.8 synchronized和 ReentrantLock 比較的話,官方比較建議用synchronized。
在瞭解Segment 機制以後咱們繼續看一下ConcurrentHashMap核心構造方法代碼。
// 跟HashMap結構有點相似 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf;//負載因子 this.threshold = threshold;//閾值 this.table = tab;//主幹數組即HashEntry數組 }
構造方法
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //MAX_SEGMENTS 爲1<<16=65536,也就是最大併發數爲65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0; //ssize 爲segments數組長度,concurrentLevel計算得出 int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //segmentShift和segmentMask這兩個變量在定位segment時會用到 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //計算cap的大小,即Segment中HashEntry的數組長度,cap也必定爲2的n次方. int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; //建立segments數組並初始化第一個Segment,其他的Segment延遲初始化 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); this.segments = ss; }
從以上代碼能夠看出ConcurrentHashMap有比較重要的三個參數:
loadFactor 負載因子 0.75
threshold 初始 容量 16
concurrencyLevel 其實是Segment的實際數量。
ConcurrentHashMap如何發生ReHash?
ConcurrentLevel 一旦設定的話,就不會改變。ConcurrentHashMap當元素個數大於臨界值的時候,就會發生擴容。可是ConcurrentHashMap與其餘的HashMap不一樣的是,它不會對Segmengt 數量增大,只會增長Segmengt 後面的鏈表容量的大小。即對每一個Segmengt 的元素進行的ReHash操做。
咱們再看一下核心的ConcurrentHashMapput ()方法:
public V put(K key, V value) { Segment<K,V> s; //concurrentHashMap不容許key/value爲空 if (value == null) throw new NullPointerException(); //hash函數對key的hashCode從新散列,避免差勁的不合理的hashcode,保證散列均勻 int hash = hash(key); //返回的hash值無符號右移segmentShift位與段掩碼進行位運算,定位segment int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
主要注意的是 當前put 方法 當前key 爲空的時候 ,代碼報錯。
這個代碼主要是把Key 經過Hash函數計算出hash值 現計算出當前key屬於那個Segment 調用Segment.put 分段方法Segment.put()
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
//tryLock()是ReentrantLock獲取鎖一個方法。若是當前線程獲取鎖成功 返回true,若是別線程獲取了鎖返回false不成功時會遍歷定位到的HashEnry位置的鏈表(遍歷主要是爲了使CPU緩存鏈表),若找不到,則建立HashEntry。tryLock必定次數後(MAX_SCAN_RETRIES變量決定),則lock。若遍歷過程當中,因爲其餘線程的操做致使鏈表頭結點變化,則須要從新遍歷。
V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash;//定位HashEntry,能夠看到,這個hash值在定位Segment時和在Segment中定位HashEntry都會用到,只不過定位Segment時只用到高几位。 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //若c超出閾值threshold,須要擴容並rehash。擴容後的容量是當前容量的2倍。這樣能夠最大程度避免以前散列好的entry從新散列。擴容並rehash的這個過程是比較消耗資源的。 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
Put 時候 ,經過Hash函數將即將要put 的元素均勻的放到所須要的Segment 段中,調用Segment的put 方法進行數據。
Segment的put 是加鎖中完成的。若是當前元素數大於最大臨界值的的話將會產生rehash. 先經過 getFirst 找到鏈表的表頭部分,而後遍歷鏈表,調用equals 比配是否存在相同的key ,若是找到的話,則將最新的Key 對應value值。若是沒有找到,新增一個HashEntry 它加到整個Segment的頭部。
咱們先看一下Get 方法的源碼:
//計算Segment中元素的數量
transient volatile int count; *********************************************************** public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); } *********************************************************** final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; } ******************************************************** V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
1.讀取的時候 傳遞Key值,經過Hash函數計算出 對應Segment 的位置。
2.調用segmentFor(int hash) 方法,用於肯定操做應該在哪個segment中進行 ,經過 右無符號位運算 右移segmentShift位在與運算 segmentMask【偏移碼】 得到須要操做的Segment
肯定了須要操做的Segment 再調用 get 方法獲取對應的值。經過count 值先判斷當前值是否爲空。在調用getFirst()獲取頭節點,而後遍歷列表經過equals對比的方式進行比對返回值。
ConcurrentHashMap爲何讀的時候不加鎖?
ConcurrentHashMap是分段併發分段進行讀取數據的。
Segment 裏面有一個Count 字段,用來表示當前Segment中元素的個數 它的類型是volatile變量。全部的操做到最後都會 在最後一部更新count 這個變量,因爲volatile變量 happer-before的特性。致使get 方法可以幾乎準確的獲取最新的結構更新。
再看一下ConcurrentHashMapRemove()方法:
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
調用Segment 的remove 方法,先定位當前要刪除的元素C,此時須要把A、B元素所有複製一遍,一個一個接入到D上。
remove 也是在加鎖的狀況下進行的。
volatile 變量
咱們發現 對於CurrentHashMap而言的話,源碼裏面又不少地方都用到了這個變量。好比HashEntry 、value 、Segment元素個數Count。
volatile 屬於JMM 模型中的一個詞語。首先先簡單說一下 Java內存模型中的 幾個概念:
原子性:保證 Java內存模型中原子變量內存操做的。一般有 read、write、load、use、assign、store、lock、unlock等這些。
可見性:就是當一個線程對一個變量進行了修改,其餘線程便可當即獲得這個變量最新的修改數據。
有序性:若是在本線程內觀察,全部操做都是有序的;若是在一個線程中觀察另外一個線程,全部操做都是無序的。
先行發生:happen-before 先行發生原則是指Java內存模型中定義的兩項操做之間的依序關係,若是說操做A先行發生於操做B,其實就是說發生操做B以前.
傳遞性
volatile 變量 與普通變量的不一樣之處?
volatile 是有可見性,必定程度的有序性。
volatile 賦值的時候新值可以當即刷新到主內存中去,每次使用的時候可以馬上從內存中刷新。
作一個簡單例子看一下 這個功能
public class VolatileTest{ int a=1; int b=2; //賦值操做 public void change(){ a=3; b=a; } //打印操做 public void print(){ System.out.println("b:"+b+",a:"+a); } @Test public void testNorMal(){ VolatileTest vt=new VolatileTest(); for (int i = 0; i < 100000; i++) { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } vt.change(); } }).start(); new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } vt.print(); } }).start(); } } }