文章簡介java
想必你們對HashMap數據結構並不陌生,JDK1.7採用的是數組+鏈表的方式,JDK1.8採用的是數組+鏈表+紅黑樹的方式。雖然JDK1.8對於HashMap有了很大的改進,提升了存取效率,可是線程安全的問題不可忽視,因此就有了線程安全的解決方案,好比在方法上加synchronized同步鎖的HashTable,或者併發包中的ConcurrentHashMap線程安全類,本文就來和你們一塊兒探討一下關於ConcurrentHashMap的源碼,版本是JDK1.8,下面讓咱們正式開始吧。數組
備註:你們須要對HashMap1.8源碼有一些瞭解,在原來HashMap1.8源碼中比較常見的知識點本文不會具體展開。安全
不妨先以一段你們熟悉的代碼開始本文的旅程數據結構
ConcurrentHashMap<Integer,String> map=new ConcurrentHashMap<Integer, String>(); map.put(1,"Zhang");
當咱們在put元素時,點開put方法的源碼會發現,這裏調用了一個putVal()的方法,同時將key和value做爲參數傳入併發
public V put(K key, V value) { return putVal(key, value, false); }
繼續點擊putVal()方法,而後咱們看看這裏到底實現了什麼ide
//key或者value都不能爲空 if (key == null || value == null) throw new NullPointerException(); //計算hash值,實際上就是獲得一個int類型的數,只是須要對這個數進行處理,目的是爲了肯定key,value組成的Node節點在數組下標中的位置 int hash = spread(key.hashCode());
不妨先看下spread(key.hashCode())的實現this
key.hashCode()實際上調用的是native的方法,目的是獲得一個整形數,爲了使得這個整形數儘量不同,因此要對高16位和低16位進行異或運算,儘量利用好每一位的值 static final int spread(int h) { //對key.hashCode的結果進行高16位和低16位的運算 return (h ^ (h >>> 16)) & HASH_BITS; }
接下來就是要初始化這個數組的大小,由於數組不初始化,表明key,value的每一個Node類也不能放到對應的位置線程
if (tab == null || (n = tab.length) == 0) //初始化數組的大小 tab = initTable();
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //只有當數組爲空或者大小爲0的時候纔對數組進行初始化 while ((tab = table) == null || tab.length == 0) { //這裏其實就是用一個sizeCtl記錄是否已經有線程在進行初始化操做,若是有,則讓出CPU的資源,也就是保證只有一個線程對數組進行初始化操做,從而保證線程安全。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //使用CAS樂觀鎖機制比較SIZECTL和sc是否相等,只有當前值和內存中最新值相等的時候,纔會將當前值賦值爲-1,一旦被賦值爲-1,上面有其餘線程進來,就直接執行了Thread.yeild()方法了 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //三元運算符獲得數組默認大小,點擊DEFAULT_CAPACITY發現是16,這點和HashMap是同樣的 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //建立Node類型的數組,真正初始化的地方 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //計算擴容的標準,採用的是位移運算,由於效率更高,sc最終結果爲12 sc = n - (n >>> 2); } } finally { //無論不管最終將sc賦值爲sizeCtl,這時候sizeCtl結果爲12 sizeCtl = sc; } break; } } return tab; }
當數組初始化完成以後,就須要將key,value建立出來的Node節點放到數組中對應的位置了,分爲幾種狀況,下面這種是原來某個位置就沒有元素值,可是爲了保證線程安全,放到多個線程同時添加,也使用CAS樂觀鎖的機制進行添加。code
//根據(n-1)&hash的結果確認當前節點所在的位置是否有元素,效果和hash%n是同樣的,只是&運算效率更高,這裏hashmap也是這樣作的,就不作更多贅述了 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS樂觀鎖機制向對應的下標中添加對應的Node if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
//f其實是當前數組下標的Node節點,這裏判斷它的hash值是否爲MOVED,也就是-1,若是是-1,就調用helpTransfer(tab,f)方法幫助其餘線程完成擴容操做,而後再添加元素。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
接下來就要考慮數組具體下標位置有元素的狀況,這時候就須要把Node節點向當前節點下進行順延,造成鏈表或者紅黑樹的結構,還有一種狀況就是key值相同,value值不能,這時候只須要進行一個value值的替換便可。對象
V oldVal = null; //數組初始化和在數組下標中插入Node時,爲了保證線程安全使用的是CAS無鎖化機制 //那元素繼續往下插入時,線程安全的問題怎麼保證呢?可使用synchronized關鍵字 //發現同步代碼塊中鎖的對象是f,也就是當前數組下標的元素,這樣不一樣的數組下標之間彼此互相不影響。 synchronized (f) { //再次確認當前頭結點是否爲f if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //第一種狀況,發現是key值相同,只須要替換掉oldValue便可 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //第二種狀況,按照鏈表的方式進行插入 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //第三種狀況,按照紅黑樹的方式進行插入 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //通常鏈表轉紅黑樹是節點數>8的時候,但不是一旦某個數組下標的節點數大於8就轉成紅黑樹,也能夠經過調整數組的容量來解決,好比treeifyBin中進行的 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); //說明上面有須要替換掉舊值的節點 if (oldVal != null) return oldVal; break; }
當添加完一個key,value方式的Node以後,就須要檢查是否整個數據結構中的節點數超過擴容標準好比12,若是超過了就須要進行數組大小的擴容,先調用addCount()方法,由於第二個參數check大於0,因此直接看裏面這段代碼。
if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //經過resizeStamp(n),n是數組大小,獲得一個int的結果,賦值給rs保存 int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //由於sc<0不成立,因此會來到這段代碼 //這裏經過CAS的方式比較SIZECTL和sc的值,當二者相等時,會執行rs<<RESIZE<STAMP_SHIFT+2賦值操做,這個結果值是一個負數,表示當前正在執行擴容操做的線程數量 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //調用transfer方法進行真正的擴容操做 transfer(tab, null); s = sumCount(); } }
在concurrenthashmap中的擴容操做可能不止一個線程,因此每一個線程就須要分工合做完成擴容,也就是每一個線程須要領取本身負責的task,固然前提是得要有一個新的數組,這樣才能將老數組中的Node節點搬移到新數組中。
int n = tab.length, stride; //肯定線程負責數組大小的範圍 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //判斷新的數組是否爲null,爲空則進行建立,好比數組原來的大小是16,2的N次冪,擴容也須要雙倍擴容 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") //採用位移運算進行雙倍擴容 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //使用transferIndex變量記錄數組的大小,表示線程進行擴容的時候,是從後往前進行的 transferIndex = n; }
接下來就要進行搬移工做了,咱們須要用一些標識記錄一下搬移的完成狀態,同時線程將某個數組下標的節點搬移完成以後也要讓別人知道,同時也能知道有線程正在進行擴容操做。
int nextn = nextTab.length; //某個下標節點完成以後的節點類型,實際上就是繼承了Node節點,只不過點進去發現它的hash值爲MOVED也就是-1 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab
Node<K,V> f; int fh; //i指向當前數組的下標,經過while循環遍歷--i,從而知道當前線程拿到的一個區間範圍 while (advance) { int nextIndex, nextBound; //一個數組下標一個數組下標的處理 if (--i >= bound || finishing) advance = false; //表示已經沒有須要搬運的節點了,將advance賦值爲false else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //不一樣的線程搬運的內容,不斷地將transferindex的值變小 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
if (i < 0 || i >= n || i + n >= nextn) { int sc; //finishing等於true就表示全部的線程都搬運完了,作最後的收尾工做 //好比將新數組的內容賦值到table,擴容標準由原來的12變成24 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //這裏是每次有一個線程完成搬運工做,就將線程總數量-1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } }
//若是某個線程的某個數組下標搬運完成,則將該頭節點賦值爲fwd類型的,其實就是hash值爲MOVED else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //表示已經搬運完成 else if ((fh = f.hash) == MOVED) advance = true; // already processed
接下來就是每一個線程真正在搬運代碼的過程,其實這塊和hashmap1.8中的resize後面的過程很相似
synchronized (f) { //再次檢查當前數組下標的節點是否爲f if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { //新節點的位置要麼在原來的位置,要麼在原來的位置+原來數組的大小,這點和hashmap中同樣 //p.hash&n 也就是判斷這個結果是否等於0 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //等於0會走這邊 if (runBit == 0) { ln = lastRun; hn = null; } //不等於0會走這邊 else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //將鏈表總體遷移到nextTable中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //標識原桶標識位已經處理,頭節點標記爲fw,hash值爲-1 setTabAt(tab, i, fwd); advance = true; } //這裏是紅黑樹遷移的狀況 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } }
前面說到,當鏈表長度超過8會轉成紅黑樹,可是節點總數若是小於64,會用擴容的方式代替轉紅黑樹,代碼以下
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //tryPresize進行擴容 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
點擊tryPresize方法,最終也會來到下面這段代碼,和前面addCount中的同樣
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //注意這裏的第二個參數爲null,表示新的數組尚未建立,以前也是null transfer(tab, null);
在以前put的時候,中間跳過了這段話,這段話是當前線程發現有其餘線程正在進行擴容操做,協助其餘線程擴容完成以後再繼續put元素。
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
/** * Helps transfer if a resize is in progress. */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //每當來一個線程幫助擴容,此時就會sc+1,表示多了一個線程 //其實這塊也能和transfer方法中的sc-1對應上,一個線程完成以後就數量-1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //擴容的方法,注意第二個參數有傳入nextTab,緣由是當前線程只是協助其餘線程擴容 //既然其餘線程正在擴容,說明這個新數組已經建立好了 transfer(tab, nextTab); break; } } return nextTab; } return table; }
到目前爲止,咱們分析了put過程當中會遇到線程安全的點,好比數組初始化,數組頭元素添加,put完成過程等。同時還分析了transfer擴容每一個線程領取的任務,搬運結果的方式,協助擴容等方面的內容。若是對你們有幫助,請幫忙轉發。