Java8較java7的改進:java
改進一:取消segments字段,直接採用transient volatile HashEntry<K,V> table保存數據,採用table數組元素做爲鎖,從而實現了對每一行數據進行加鎖,進一步減小併發衝突的機率。node
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two(2的冪). Accessed directly by iterators. */ transient volatile Node<K,V>[] table;
改進二:將原先table數組+單向鏈表的數據結構,變動爲table數組+單向鏈表+紅黑樹的結構。對於hash表來講,最核心的能力在於將key hash以後能均勻的分佈在數組中。若是hash以後散列的很均勻,那麼table數組中的每一個隊列長度主要爲0或者1。但實際狀況並不是老是如此理想,雖然ConcurrentHashMap類默認的加載因子爲0.75,可是在數據量過大或者運氣不佳的狀況下,仍是會存在一些隊列長度過長的狀況,若是仍是採用單向列表方式,那麼查詢某個節點的時間複雜度爲O(n);所以,對於個數超過8(默認值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度能夠下降到O(logN),能夠改進性能。算法
重要屬性:數組
/** * 這個sizeCtl是volatile的,那麼他是線程可見的,一個思考:它是全部修改都在CAS中進行,可是sizeCtl爲何不設計成LongAdder(jdk8出現的)類型呢? * 或者設計成AtomicLong(在高併發的狀況下比LongAdder低效),這樣就能減小本身操做CAS了。 * * 默認爲0,用來控制table的初始化和擴容操做,具體應用在後續會體現出來。 * -1 表明table正在初始化 * -N 表示有N-1個線程正在進行擴容操做 * 其他狀況: *一、若是table未初始化,表示table須要初始化的大小。 *二、若是table初始化完成,表示table的容量,默認是table大小的0.75 倍,竟然用這個公式算0.75(n - (n >>> 2))。 **/ private static final long SIZECTL; private static final long TRANSFERINDEX; /** * races. Updated via CAS. * 記錄容器的容量大小,經過CAS更新 */ private static final long BASECOUNT; /** * 自旋鎖 (鎖定經過 CAS) 在調整大小和/或建立 CounterCells 時使用。 在CounterCell類更新value中會使用,功能相似顯示鎖和內置鎖,性能更好 * 在Striped64類也有應用 */ private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; /** * Node:保存key,value及key的hash值的數據結構。其中value和next都用volatile修飾,保證併發的可見性。 * @param <K> * @param <V> */ static class Node<K,V> implements Entry<K,V> { final int hash; final K key; volatile V val; .. volatile Node<K,V> next; } /** * ForwardingNode:一個特殊的Node節點,hash值爲-1,其中存儲nextTable的引用。 * @param <K> * @param <V> */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; … }
構造函數:數據結構
/** *initialCapacity 初始化容量 **/ public ConcurrentHashMap(int initialCapacity) /** * *建立與給定map具備相同映射的新map **/ public ConcurrentHashMap(Map<? extends K, ? extends V> m) /** *initialCapacity 初始容量 *loadFactor 負載因子,當容量達到initialCapacity*loadFactor時,執行擴容 **/ public ConcurrentHashMap(int initialCapacity, float loadFactor) /** *initialCapacity 初始容量 *loadFactor 負載因子 *concurrencyLevel 預估的併發更新線程數 **/ public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
方法:多線程
put:併發
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());//對hashCode進行再散列,算法爲(h ^ (h >>> 16)) & HASH_BITS int binCount = 0; //這邊加了一個循環,就是不斷的嘗試,由於在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject //由於若是其餘線程正在修改tab,那麼嘗試就會失敗,因此這邊要加一個for循環,不斷的嘗試 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 若是table爲空,初始化;不然,根據hash值計算獲得數組索引i,若是tab[i]爲空,直接新建節點Node便可。注:tab[i]實質爲鏈表或者紅黑樹的首節點。 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 若是tab[i]不爲空而且hash值爲MOVED(-1),說明該鏈表正在進行transfer操做,返回擴容完成後的table else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 針對首個節點進行加鎖操做,而不是segment,進一步減小線程衝突 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 若是在鏈表中找到值爲key的節點e,直接設置e.val = value便可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 若是沒有找到值爲key的節點,直接新建Node並加入鏈表便可。 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 若是首節點爲TreeBin類型,說明爲紅黑樹結構,執行putTreeVal操做。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 若是節點數>=8,那麼轉換鏈表結構爲紅黑樹結構。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 計數增長1,有可能觸發transfer操做(擴容)。 addCount(1L, binCount); return null; }
helpTransfer:app
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //下面幾種狀況和addCount的方法同樣,請參考addCount的備註 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
tabAt:dom
@SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } /* *可是這邊爲何i要等於((long)i << ASHIFT) + ABASE呢,計算偏移量 *ASHIFT是指tab[i]中第i個元素在相對於數組第一個元素的偏移量,而ABASE就算第一數組的內存素的偏移地址 *因此呢,((long)i << ASHIFT) + ABASE就算i最後的地址 * 那麼compareAndSwapObject的做用就算tab[i]和c比較,若是相等就tab[i]=v不然tab[i]=c; */ static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
addCount:ide
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次進來都baseCount都加1由於x=1 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//1 CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //多線程CAS發生失敗的時候執行 fullAddCount(x, uncontended);//2 return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //當條件知足開始擴容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) {//若是小於0說明已經有線程在進行擴容操做了 //一下的狀況說明已經有在擴容或者多線程進行了擴容,其餘線程直接break不要進入擴容操做 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//若是相等說明擴容已經完成,能夠繼續擴容 transfer(tab, nt); } //這個時候sizeCtl已經等於(rs << RESIZE_STAMP_SHIFT) + 2等於一個大的負數, // 這邊加上2很巧妙,由於transfer後面對sizeCtl--操做的時候,最多隻能減兩次就結束 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
上面註釋1,每次都會對baseCount 加1,若是併發競爭太大,那麼可能致使U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失敗,那麼爲了提升高併發的時候baseCount可見性失敗的問題,又避免一直重試,這樣性能會有很大的影響,那麼在jdk8的時候是有引入一個類Striped64,其中LongAdder和DoubleAdder就是對這個類的實現。這兩個方法都是爲解決高併發場景而生的,是AtomicLong的增強版,AtomicLong在高併發場景性能會比LongAdder差。可是LongAdder的空間複雜度會高點。
咱們每次進來都對baseCount進行加1當達到必定的容量時,就須要對table進行擴容。擴容方法就是transfer
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; //構建一個連節點的指針,用於標識位 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab //循環的關鍵變量,判斷是否已經擴容完成,完成就return,退出循環 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //循環的關鍵i,i--操做保證了倒序遍歷數組 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) {//nextIndex=transferIndex=n=tab.length(默認16) i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //i<0說明已經遍歷完舊的數組tab;i>=n何時有可能呢?在下面看到i=n,因此目前i最大應該是n吧。 //i+n>=nextn,nextn=nextTab.length,因此若是知足i+n>=nextn說明已經擴容完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) {// a nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //利用CAS方法更新這個擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做,參考sizeCtl的註釋 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //若是有多個線程進行擴容,那麼這個值在第二個線程之後就不會相等,由於sizeCtl已經被減1了, // 因此後面的線程就只能直接返回,始終保證只有一個線程執行了 a(上面註釋a) if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true;//finishing和advance保證線程已經擴容完成了能夠退出循環 i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null)//若是tab[i]爲null,那麼就把fwd插入到tab[i],代表這個節點已經處理過了 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED)//那麼若是f.hash=-1的話說明該節點爲ForwardingNode,說明該節點已經處理過了 advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; //這邊還對鏈表進行遍歷,這邊的的算法和hashmap的算法又不同了,這班是有點對半拆分的感受 //把鏈表分表拆分爲,hash&n等於0和不等於0的,而後分別放在新表的i和i+n位置 //次方法同hashmap的resize for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //把已經替換的節點的舊tab的i的位置用fwd替換,fwd包含nextTab setTabAt(tab, i, fwd); advance = true; }//下面紅黑樹基本和鏈表差很少 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //判斷擴容後是否還須要紅黑樹結構 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
注意:若是鏈表結構中元素超過TREEIFY_THRESHOLD閾值,默認爲8個,則把鏈表轉化爲紅黑樹,提升遍歷查詢效率.接下來咱們看看如何構造樹結構,代碼以下:
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
能夠看出,生成樹節點的代碼塊是同步的,進入同步代碼塊以後,再次驗證table中index位置元素是否被修改過。
一、根據table中index位置Node鏈表,從新生成一個hd爲頭結點的TreeNode鏈表。
二、根據hd頭結點,生成TreeBin樹結構,並把樹結構的root節點寫到table的index位置的內存中,具體實現以下:
/** * Creates bin with initial set of nodes headed by b. */ TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); }
Get:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0)//若是eh=-1就說明e節點爲ForWordingNode,這說明什麼,說明這個節點已經不存在了,被另外一個線程正則擴容 //因此要查找key對應的值的話,直接到新newtable找 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
這個get請求,咱們須要cas來保證變量的原子性。若是tab[i]正被鎖住,那麼CAS就會失敗,失敗以後就會不斷的重試。這也保證了get在高併發狀況下不會出錯。
咱們來分析下到底有多少種狀況會致使get在併發的狀況下可能取不到值。一、一個線程在get的時候,另外一個線程在對同一個key的node進行remove操做;二、一個線程在get的時候,另外一個線程正則重排table。可能致使舊table取不到值。
那麼本質是,我在get的時候,有其餘線程在對同一桶的鏈表或樹進行修改。那麼get是怎麼保證同步性的呢?咱們看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt究竟是幹嗎的:
@SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }
它是對tab[i]進行原子性的讀取,由於咱們知道putVal等對table的桶操做是有加鎖的,那麼通常狀況下咱們對桶的讀也是要加鎖的,可是咱們這邊爲何不須要加鎖呢?由於咱們用了Unsafe的getObjectVolatile,由於table是volatile類型,因此對tab[i]的原子請求也是可見的。由於若是同步正確的狀況下,根據happens-before原則,對volatile域的寫入操做happens-before於每個後續對同一域的讀操做。因此無論其餘線程對table鏈表或樹的修改,都對get讀取可見。
sun.misc.Unsafe類:
Unsafe類是什麼呢?java不能直接訪問操做系統底層,而是經過本地方法來訪問。Unsafe類提供了硬件級別的原子操做。Unsafe類在jdk 源碼的多個類中用到,這個類的提供了一些繞開JVM的更底層功能,基於它的實現能夠提升效率。可是,它是一把雙刃劍:正如它的名字所預示的那樣,它是Unsafe的,它所分配的內存須要手動free(不被GC回收)。Unsafe類,提供了JNI某些功能的簡單替代:確保高效性的同時,使事情變得更簡單。
//在o的offset偏移地址處,獲取volatile類型的對象 public native java.lang.Object getObjectVolatile(java.lang.Object o, long l); //原子性的更新java變量 public final native boolean compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2); /** * Stores a reference value into a given Java variable, with volatile store * semantics. Otherwise identical to * {@link #putObject(Object, long, Object)} */ public native void putObjectVolatile(java.lang.Object o, long l, java.lang.Object o1); /** * Atomically update Java variable to <tt>x</tt> if it is currently holding * <tt>expected</tt>. * * @return <tt>true</tt> if successful */ public final native boolean compareAndSwapLong(java.lang.Object o, long l, long l1, long l2);