其實ConcurrentHashMap我本身已經看過不少遍了,可是今天在面試阿里的時候本身在描述ConcurrentHashMap發現本身根本講不清楚什麼是ConcurrentHashMap,以及裏面是怎麼實現的,搞的我忽然發現本身什麼都不懂,因此我想要再次的來分析一下這個源碼,徹底理解ConcurrentHashMap,而不是覺得本身懂了,實際上本身不懂。html
首先咱們看一下put方法,put方法會調用到putVal方法上面。java
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //若是put進去的是個鏈表,這個參數表示鏈表的大小 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //初始化鏈表 tab = initTable(); //若是這個槽位沒有數據 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS將這個新的node設置到hash桶裏面去 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //幫助遷移 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { //獲取鎖 V oldVal = null; synchronized (f) { //雙重檢查鎖 if (tabAt(tab, i) == f) { //若是hash值大於等於0,那麼表明這個節點裏的數據是鏈表 if (fh >= 0) { binCount = 1; //每次遍歷完後binCount加1,表示鏈表長度 for (Node<K,V> e = f;; ++binCount) { K ek; //若是hash值和key值都相同,那麼覆蓋,break結束循環 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //下一個節點爲null,說明遍歷到尾節點了,那麼直接在尾節點設值一個新的值 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //若是是紅黑樹 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //若是鏈表個數大於8,那麼就調用這個方法 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
解釋一下上面的源碼作了什麼:node
接下來咱們先看initTable 方法,再看treeifyBin和helpTransfer面試
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //一開始的時候sizeCtl爲0 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //將sizeCtl用CAS設置成-1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //由於sc一開始爲0,因此n取DEFAULT_CAPACITY爲16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //將table賦值爲大小爲16的Node數組 table = tab = nt; //將sc的設置爲總容量的75%,若是 n 爲 16 的話,那麼這裏 sc = 12 sc = n - (n >>> 2); } } finally { //最後將sizeCtl設置爲sc的值 sizeCtl = sc; } break; } } return tab; }
這個方法裏面初始化了一個很重要的變量sizeCtl,初始值爲總容量的75%,table初始化爲一個容量爲16的數組數組
下面咱們在看看treeifyBin方法安全
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //若是數據的長度小於64,那麼調用tryPresize進行擴容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //若是這個槽位裏面的元素是鏈表 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //給鏈表頭加上鎖 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; //遍歷鏈表,而後初始化紅黑樹對象 for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //給tab槽位爲index的元素設置新的對象 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
treeifyBin這個方法裏面並非只是將鏈表轉化爲紅黑樹,而是當tab的長度大於64的時候纔會將鏈表轉成紅黑樹,不然的話,會調用tryPresize方法。多線程
而後咱們進入到tryPresize方法裏面看看,tryPresize傳入的參數是當前tab數組長度的兩倍。ide
private final void tryPresize(int size) { //本來傳進來的size已是兩倍了,這裏會再往上取最近的 2 的 n 次方 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 這個 if 分支和以前說的初始化數組的代碼基本上是同樣的,在這裏,咱們能夠不用管這塊代碼 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); //一開始進來的時候sc是大於0的 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //將SIZECTL設置爲一個很大的複數 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
這個方法裏面,會對tab數據進行校驗,若是沒有初始化的話會從新進行初始化大小,若是是第一次進來的話會將SIZECTL設置成一個很大的複數,而後調用transfer方法,傳如當前的tab數據和null。源碼分析
接着咱們來看transfer方法,這個方法比較長,主要的擴容和轉移節點都在這個方法裏面實現,咱們將這個長方法分紅代碼塊,一步步分析:this
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { //若是當前tab數組長度爲16 int n = tab.length, stride; //那麼(n >>> 3) / NCPU = 0 小於MIN_TRANSFER_STRIDE if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //將stride設置爲 16 stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //若是n是16,那麼nextTab就是一個容量爲32的空數組 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //將transferIndex賦值爲16 transferIndex = n; } ... }
這個代碼塊主要是作nextTable、transferIndex 、stride的賦值操做。
... //初始化nextn爲32 int nextn = nextTab.length; //新建一個ForwardingNode對象,裏面放入長度爲32的nextTab數組 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; //初始化bound爲0 for (int i = 0, bound = 0;;) { ... }
下面的代碼會所有包裹在這個for循環裏面,因此咱們來分析一下這個for循環裏面的代碼
for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; //將nextIndex設置爲transferIndex,一開始16 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //一開始的時候nextIndex是和stride相同,那麼nextBound爲0,TRANSFERINDEX也爲0 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //這裏bound也直接爲0 bound = nextBound; //i = 15 i = nextIndex - 1; advance = false; } } ... }
這個方法是爲了設置transferIndex這個屬性,transferIndex一開始是原tab數組的長度,每次會向前移動stride大小的值,若是transferIndex減到了0或小於0,那麼就設置I等於-1,i在下面的代碼會說到。
for (int i = 0, bound = 0;;) { ... //在上面一段代碼塊中,若是transferIndex已經小於等於0了,就會把i設置爲-1 if (i < 0 || i >= n || i + n >= nextn) { int sc; //表示遷移已經完成 if (finishing) { //將nextTable置空,表示不須要遷移了 nextTable = null; //將table設置爲新的數組 table = nextTab; //sizeCtl設置爲n的 1.5倍 sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 到這裏,說明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT, // 也就是說,全部的遷移任務都作完了,也就會進入到上面的 if(finishing){} 分支了 finishing = advance = true; i = n; // recheck before commit } } ... }
這個方法是用來表示已經遷移完畢了,能夠退出。
for (int i = 0, bound = 0;;) { ... //若是該槽位沒有元素,那麼直接把tab的i槽位設置爲fwd else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //說明這個槽位已經有其餘線程遷移過了 else if ((fh = f.hash) == MOVED) advance = true; // already processed //走到這裏,說明tab的這個槽位裏面有數據,那麼咱們須要得到槽位的頭節點的監視器鎖 else { synchronized (f) { if (tabAt(tab, i) == f) { ... } } } ... }
在這個代碼塊中,i會從最後一個元素一個個往前移動,而後根據i這個index來判斷tab裏面槽位的狀況。
下面的代碼咱們來分析監視器鎖裏面的內容:
synchronized (f) { if (tabAt(tab, i) == f) { //fh是當前節點的hash值 if (fh >= 0) { int runBit = fh & n; //lastRun設置爲頭節點 Node<K,V> lastRun = f; // 須要將鏈表一分爲二, // 找到原鏈表中的 lastRun,而後 lastRun 及其以後的節點是一塊兒進行遷移的 // lastRun 以前的節點須要進行克隆,而後分到兩個鏈表中 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //其中的一個鏈表放在新數組的位置 i setTabAt(nextTab, i, ln); //另外一個鏈表放在新數組的位置 i+n setTabAt(nextTab, i + n, hn); //將原數組該位置處設置爲 fwd,表明該位置已經處理完畢 //其餘線程一旦看到該位置的 hash 值爲 MOVED,就不會進行遷移了 setTabAt(tab, i, fwd); //advance 設置爲 true,表明該位置已經遷移完畢 advance = true; } //下面紅黑樹的遷移和上面差很少 else if (f instanceof TreeBin) { .... } } }
這個方法主要是將頭節點裏面的鏈表拆分紅兩個鏈表,而後設置到新的數組中去,再給老的數組設置爲fwd,表示這個節點已經遷移過了。
到這裏transfer方法已經分析完畢了。 這裏我再舉個例子,讓你們根據透徹的明白多線程之間是怎麼進行遷移工做的。
咱們假設stride仍是默認的16,第一次進來nextTab爲null,可是tab的長度爲32。 一開始的賦值: 1. n會設置成32,而且n只會賦值一次,表明被遷移的數組長度 2. nextTab會被設置成一個大小爲64的數組,並塞入到新的ForwardingNode對象中去。 3. transferIndex會被賦值爲32 進入循環: 初始化i爲0,bound爲0; 第一次循環: 1. 因爲advance初始化爲true,因此會進入到while循環中,循環出來後,transferIndex會被設置成16,bound被設置成16,i設置成31。這裏你可能會問 2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置爲fwd,將advance設置成爲true 第二次循環: 1. --i,變爲30,--i >= bound成立,並將advance設置成false 2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置爲fwd,將advance設置成爲true 。。。 第十六次循環: 1. --i,變爲15,將transferIndex設置爲0,bound也設置爲0,i設置爲15 2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置爲fwd,將advance設置成爲true 第三十二次循環: 1. 這個時候--i等於-1,而且(nextIndex = transferIndex) <= 0成立,那麼會將i設置爲-1,advance設置爲false 2. 會把SIZECTL用CAS設置爲原來的值加1,而後設置finishing爲true 第三十三次循環: 1. 因爲finishing爲true,那麼nextTable設置爲null,table設置爲新的數組值,sizeCtl設置爲舊tab的長度的1.5倍
原文出處:https://www.cnblogs.com/luozhiyun/p/11406557.html