系列傳送門:java
注:本篇基於JDK1.8。node
在思考這個問題以前,咱們能夠思考:若是不用ConcurrentHashMap的話,有哪些其餘的容器供咱們選擇呢?而且它們的缺陷是什麼?算法
哈希表利用哈希算法可以花費O(1)的時間複雜度高效地根據key找到value值,可以知足這個需求的容器還有HashTable和HashMap。數組
HashTable安全
HashTable使用synchronized關鍵字保證了多線程環境下的安全性,但加鎖的實現方式是獨佔式的,全部訪問HashTable的線程都必須競爭同一把鎖,性能較爲低下。多線程
public synchronized V put(K key, V value) { // ... }
HashMap併發
JDK1.8版本的HashMap在讀取hash槽的時候讀取的是工做內存中引用指向的對象,在多線程環境下,其餘線程修改的值不能被及時讀到。app
這就引起出可能存在的一些問題:ide
好比在插入操做的時候,第一次將會根據key的hash值判斷當前的槽內是否被佔用,若是沒有的話就會插入value。在併發環境下,若是A線程判斷槽未被佔用,在執行寫入操做的時候正巧時間片耗盡,此時B線程正巧也執行了一樣的操做,率先插入了B的value,此時A正巧被CPU從新調度繼續執行寫入操做,進而將線程B的value覆蓋。工具
還有一種狀況是在同一個hash槽內,HashMap老是保持key惟一,在插入的時候,若是存在key,就會進行value覆蓋。併發狀況下,若是A線程判斷最後一個節點仍未發現重複的key,那麼將會執行插入操做,若是B線程在A判斷和插入之間執行了一樣的操做,也會發生數據的覆蓋,也就是數據的丟失。
固然,像這樣的併發問題其實還有一些,這裏就不細說了,剛興趣的小夥伴能夠查閱下資料。
在Java8以前,底層採用Segment+HashEntry的方式實現。
採用分段鎖的概念,底層使用Segment數組,Segment經過繼承ReentrantLock來進行加鎖,每次須要加鎖的操做會鎖住一個segment,分段保證每一個段是線程安全的。
JDK1.8以後採用CAS + Synchronized
的方式來保證併發安全。
採用【Node數組】加【鏈表】加【紅黑樹】的結構,與HashMap相似。
過一遍便可,不用過於糾結,有些字段也許是爲了兼容Java8以前的版本。
/* ---------------- Constants -------------- */ // 容許的最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; // 默認初始值16,必須是2的冪 private static final int DEFAULT_CAPACITY = 16; // toArray相關方法可能須要的量 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //爲了和Java8以前的分段相關內容兼容,並未使用 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 負載因子 private static final float LOAD_FACTOR = 0.75f; // 鏈表轉紅黑樹閥值> 8 鏈表轉換爲紅黑樹 static final int TREEIFY_THRESHOLD = 8; // 樹轉鏈表閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo)) static final int UNTREEIFY_THRESHOLD = 6; // 鏈表樹化的最小容量 treeifyBin的時候,容量若是不足64,會優先選擇擴容到64 static final int MIN_TREEIFY_CAPACITY = 64; // 每一步最小重綁定數量 private static final int MIN_TRANSFER_STRIDE = 16; // sizeCtl中用於生成標記的位數 private static int RESIZE_STAMP_BITS = 16; // 2^15-1,help resize的最大線程數 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 32-16=16,sizeCtl中記錄size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // forwarding nodes的hash值 static final int MOVED = -1; // 樹根節點的hash值 static final int TREEBIN = -2; // ReservationNode的hash值 static final int RESERVED = -3; // 提供給普通node節點hash用 static final int HASH_BITS = 0x7fffffff; // 可用處理器數量 static final int NCPU = Runtime.getRuntime().availableProcessors();
/* ---------------- Fields -------------- */ // 就是咱們說的底層的Node數組,懶初始化的,在第一次插入的時候才初始化,大小須要是2的冪 transient volatile Node<K,V>[] table; /** * 擴容resize的時候用的table */ private transient volatile Node<K,V>[] nextTable; /** * 基礎計數器,是經過CAS來更新的 */ private transient volatile long baseCount; /** * Table initialization and resizing control. * 若是爲負數:表示正在初始化或者擴容,具體以下、 * -1表示初始化, * -N,N-1表示正在進行擴容的線程數 * 默認爲0,初始化以後,保存下一次擴容的大小 */ private transient volatile int sizeCtl; /** * 擴容時分割table的索引 */ private transient volatile int transferIndex; /** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */ private transient volatile int cellsBusy; /** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells; // 視圖 private transient KeySetView<K,V> keySet; private transient ValuesView<K,V> values; private transient EntrySetView<K,V> entrySet;
和HashMap同樣,table數組的初始化是在第一次插入的時候才進行的。
/** * 建立一個新的,空的map,默認大小爲16 */ public ConcurrentHashMap() { } /** * 指定初始容量 */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : // hashmap中講過哦,用來返回的是大於等於傳入值的最小2的冪次方 // https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675#tableSizeFor_105 tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // sizeCtl 初始化爲容量 this.sizeCtl = cap; } /** * 接收一個map對象 */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } /** * 指定初始容量和負載因子 * @since 1.6 */ public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } /** * 最全的:容量、負載因子、併發級別 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
這裏的tableSizeFor方法在HashMap中有解析過:https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675#tableSizeFor_105
咱們經過註解能夠知道,這個方法返回的是大於等於傳入值的最小2的冪次方(傳入1時,爲1)。它究竟是怎麼實現的呢,咱們來看看具體的源碼:
static final int tableSizeFor(int cap) { int n = cap - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } 123456789
說實話,我再看到這個方法具體實現以後,感嘆了一句,數學好牛!我經過代入具體數字,翻閱了許多關於這部分的文章與視頻,經過簡單的例子,來作一下總結。
咱們先試想一下,咱們想獲得比n大的最小2次冪只須要在最高位的前一位置1,後面全置0就ok了吧。如0101表明的是5,1000就符合咱們的需求爲8。
咱們再傳入更大的數,爲了寫着方便,這裏就以8位爲例:
第一步int n = cap -1
這一步實際上是爲了防止cap自己爲2的冪次的狀況,若是沒有這一步的話,在一頓操做以後,會出現翻倍的狀況。好比傳入爲8,算出來會是16,因此事先減去1,保證結果。
最後n<0的狀況的斷定,排除了傳入容量爲0的狀況。
n>=MAXIMUM_CAPACITY的狀況的斷定,排除了移位和或運算以後所有爲1的狀況。
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 對key進行散列計算 : (h ^ (h >>> 16)) & HASH_BITS; int hash = spread(key.hashCode()); // 記錄鏈表長度 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //第一次put,就是這裏進行初始化的 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 找該 hash 值對應的數組下標,獲得第一個節點 f, // 這裏判斷f是否爲空,就是這個位置上有沒有節點佔着 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 若是沒有,用CAS嘗試將值放入,插入成功,則退出for循環 // 若是CAS失敗,則表示存在併發競爭,再次進入循環 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // hash是Node節點f的一個屬性,等於MOVED的狀況表示該節點處於遷移狀態 else if ((fh = f.hash) == MOVED) // 幫助遷移【內部其實調用了transfer,後面分析】 tab = helpTransfer(tab, f); else { // 進入這個分支表示:根據key計算出的hash,獲得的位置上是存在Node的,接着我將遍歷鏈表了 V oldVal = null; // 鎖住Node節點 synchronized (f) { // 加鎖後的二次校驗,針對tab可能被其餘線程修改的狀況 if (tabAt(tab, i) == f) { if (fh >= 0) { // 頭節點的hash屬性 >= 0 binCount = 1; // 記錄鏈表長度 // 鏈表的遍歷操做,你懂的 for (Node<K,V> e = f;; ++binCount) { K ek; // 找到同樣的key了 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // onlyIfAbsent爲false的話,這裏就要覆蓋了,默認是覆蓋的 if (!onlyIfAbsent) e.val = value; // 接着也就結束遍歷了 break; } Node<K,V> pred = e; // 遍歷到最後了,把Node插入尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 紅黑樹 Node<K,V> p; binCount = 2; // 紅黑樹putTreeVal if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 判斷是否須要將鏈表轉換爲紅黑樹 節點數 >= 8的時候 if (binCount >= TREEIFY_THRESHOLD) //樹化 後面單獨分析 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
其實你會發現,若是你看過HashMap的源碼,理解ConcurrentHashMap的操做其實仍是比較清晰的,相信看下來你已經基本瞭解了。接下來將會具體分析一下幾個關鍵的方法
採用延遲初始化,第一次put的時候,調用initTable()初始化Node數組。
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 若是小於0,表示已經有其餘線程搶着初始化了 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // 這裏就試着cas搶一下,搶到就將sc設置爲-1,聲明主權,搶不到就再次進入循環 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { // 設置初始容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 建立容量爲n的數組 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 賦值給volatile變量table底層數組 table = tab = nt; // 這裏其實就是 sc = n - n/4 = 0.75 * n sc = n - (n >>> 2); } } finally { // 就當12吧 sizeCtl = sc; } break; } } return tab; }
初始化的併發問題如何解決呢?
經過volatile的sizeCtl變量進行標識,在第一次初始化的時候,若是有多個線程同時進行初始化操做,他們將會判斷sizeCtl是否小於0,小於0表示已經有其餘線程在進行初始化了。
由於獲取到初始化權的線程已經經過cas操做將sizeCtl的值改成-1了,且volatile的特性保證了變量在各個線程之間的可見性。
接着,將會建立合適容量的數組,並將sizeCtl的值設置爲cap*loadFactor
。
這部分包含鏈表轉紅黑樹的邏輯,固然,須要知足一些前提條件:
>=TREEIFY_THRESHOLD
的時候啦,默認是8。if (binCount != 0) { // 判斷是否須要將鏈表轉換爲紅黑樹 節點數 >= 8的時候 if (binCount >= TREEIFY_THRESHOLD) // 樹化 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
< MIN_TREEIFY_CAPACITY
的時候,會優先調用tryPresize
進行數組擴容。private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { // 若是數組長度小於MIN_TREEIFY_CAPACITY 會優先擴容tryPresize if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 鎖住頭節點 synchronized (b) { if (tabAt(tab, index) == b) { // 遍歷鏈表, 建立紅黑樹 TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 將紅黑樹設置對應的位置上 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
數組擴容操做通常都是核心,仔細看看。
private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);// tableSizeFor(1.5*size) int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 這裏就是以前說的initTable部分的代碼 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { //1 0000 | 0000 0000 0000 0000 1000 0000 0000 0000 int rs = resizeStamp(n); // sc小於0表示已經有線程正在進行擴容操做 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // cas將sizeCtl加1, 若是成功, 則執行transfer操做 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 沒有線程在擴容,將sizeCtl的值改成(rs << RESIZE_STAMP_SHIFT) + 2) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } } static final int resizeStamp(int n) { // Integer.numberOfLeadingZeros(n) 實際上是返回n的前導零個數, 每次擴容翻倍,個數會少1 // 若是n = 16 , 返回27 // 1 << (RESIZE_STAMP_BITS - 1) 1 左移15位標識 第16位爲1,低15位全爲0 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
resizeStamp(int n)
方法能夠參照http://www.javashuo.com/article/p-fsgojpbx-b.html的解析。
這個方法涉及到數據遷移的操做,支持併發執行,第一個發起數據遷移的線程,nextTab參數傳null,以後再調用此方法時,nextTab不會爲null。
併發執行實現:使用stride將一次遷移任務拆分紅一個個的小任務,第一個發起數據遷移的線程將會將transferIndex指向原數組最後的位置,而後從後向前的stride分任務屬於第一個線程,再將transferIndex指向新的位置,再往前的stride個任務屬於第二個線程,依次類推。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 多核狀況下, stride爲 (n >>> 3) / NCPU , 單核狀況下,就是數組的容量 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 最小是16 stride = MIN_TRANSFER_STRIDE; // subdivide range // 第一個發起數據遷移的線程,nextTab參數傳null if (nextTab == null) { // initiating try { // n<<1 表示容量翻倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 爲nextTable 和transferIndex賦值,transferIndex從最後一個開始 nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 構造一個hash == MOVED 的節點,標記已經遷移完畢的位置 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 這裏的advance表示已經作完一個位置的遷移,能夠準備下一個位置了 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // advance爲true表示能夠準備下一個位置了 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; // nextIndex會等於transferIndex // transferIndex 一旦小於等於 0,說明原數組的全部位置都有相應的線程去處理了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // bound 指向了 transferIndex-stride bound = nextBound; // i指向 transferIndex - 1 // 從後向前執行遷移任務 i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; // 全部的遷移操做都完成了 if (finishing) { nextTable = null; table = nextTab; // 將nextTab賦值給table sizeCtl = (n << 1) - (n >>> 1); // 從新計算sizeCtl return; } // sizeCtl在前設置爲 (rs << RESIZE_STAMP_SHIFT) + 2 // 以後每有一個線程參與遷移就會 將sizeCtl加1 // 這裏能夠當作逆操做, 每次-1,表明完成了本身的任務 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 走到這裏表示(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT // 全部的任務完成, 下一循環就進入上面的finish分支了 finishing = advance = true; i = n; // recheck before commit } } // 下面是具體的遷移操做 // 若是i位置是null,那就將剛剛初始化hash=MOVED的節點cas放入 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 若是已是hash==MOVED ,表明這個位置已經遷移過了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 對這個位置單獨加鎖,處理該位置的遷移工做 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // 鏈表節點 if (fh >= 0) { // 將鏈表一分爲二 int runBit = fh & n; Node<K,V> lastRun = f; // 下面幾步都在尋找lastRun的位置,表示lastRun以後的節點須要放到一塊兒 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 其中一個鏈表放在新數組的i位置上 setTabAt(nextTab, i, ln); // 另外一個鏈表放在新數組的i + n 位置上 setTabAt(nextTab, i + n, hn); // 將原數組的i位置上設置爲fwd表示已經處理完畢 // 這裏的fwd是咱們以前建立的ForwardingNode, // 下一進行判斷的時候,就會將advance設置爲true了 setTabAt(tab, i, fwd); // 聲明該位置已經遷移完畢了 advance = true; } // 下面是紅黑樹的遷移 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 節點數小於6 將紅黑樹轉化爲鏈表untreeify ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
get方法相對來講就簡單不少了,根據key計算出的hash值,找到對應的位置,判斷頭節點是否是要的值,不是的話就從紅黑樹或者鏈表裏找。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 計算hash值 int h = spread(key.hashCode()); // 找到對應的hash桶的位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 正好在頭節點上 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 樹形結構上 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 在鏈表上 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
能夠看到get方法是無鎖的,經過volatile修飾的next來每次都獲取最新的值。
再提一下吧:
volatile可以保證變量在線程之間的可見性,可以被多線程同時讀且保證不會讀到過時的值,由於爲根據Java內存模型的happen before原則,對volatile字段的寫入操做先於讀操做,即便兩個線程同時修改和獲取volatile變量,get操做也能拿到最新的值。
但只能被單線程寫,【有一種狀況能夠被多線程寫,就是寫入的值不依賴於原值】,讀操做是不須要加鎖的。
Segment+HashEntry
的方式和分段鎖
的概念實現。Node + CAS + Synchronized
的方式實現併發,使用數組+鏈表+紅黑樹
的結構。f.hash == MOVED
,則先幫助節點完成遷移操做。synchronized
鎖住首節點,接下來判斷是鏈表節點或是紅黑樹節點,找到合適的位置,插入或覆蓋值。