前兩已經分析過幾篇關於CHM的源碼,本篇分析下1.8中的實現,已經棄用以前 segment 雙桶的機制。可是本質仍是將鎖細化達到性能的提高,可是不是以前版本中定義的segment 上鎖,而是用了synchronized關鍵字鎖住 table (1.8中只維護了和hashmap中相似的數據結構。一個數據,數組內的結構是鏈表或者紅黑樹)中本次所操做對象的鏈表頭。一樣大量使用了UnSafe的 本地方法。如CAS,putOr而且最大的改進在於實現了併發的擴容。同時內部數據結構與Java8中HashMap同樣,增長了紅黑樹。當鏈表中的長度大於必定值後,轉化爲紅黑樹(紅黑樹結構的同時也仍然保持了鏈表的結構,下面會詳細介紹)。html
其實對於1.8的源碼分析 我所引用 的兩篇分章已經分析的十分詳細了,本篇本章就不對每一個方法或參數在詳細說明了,只對其中我認爲難以理解的對方加以歸納,重點對如何實現併發擴容進行了分析。若有不對的地方,歡迎你們一塊兒來交流。node
先簡單分析下put操做。咱們能夠猜測,在以前的實現中,首先定位segment,上鎖。然後操做在對segment中的map進一步處理。如今的實現中並無用segment,而是延用 hashMap中單桶,定位到鏈表後,直接 上鎖,然後對鏈表就行操做,一樣是將鎖細化。簡化了不少的計算操做。 從put的源碼能夠看出整體與hashmap的put操做相差不大,除了加鎖,另外就是增長了判斷當前結點是不是ForwardingNode。表示當前正在擴容,且該結點已經擴容完畢。然後經過helpTransfer 判斷是否參與擴容的過程。數組
與1.8中的HashMap同樣,當鏈表長度超過必定長度時,會轉換成紅黑樹。可是 二者之間有一點細微的區別。以下代碼,表示HashMap的樹節點的數據結構。安全
HashMap /** * Entry for Tree bins. Extends LinkedHashMap.Entry (which in turn * extends Node) so can be used as extension of either regular or * linked node. */ 也是繼承於HashMap中的Node,能夠將該節點作爲數組中的鏈表節點。 static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; 。。。}
HashMap中TreeNode的實現了紅黑樹功能的方法。數據結構
ConcurrentHashMap中的TreeNode相對來講很簡單,只定義了基本的數據結構和必定查找的方法。以下併發
/** * Nodes for use in TreeBins */ ConcurrentHashMap 樹的成員與上面相似,可是 static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; }
並無實現紅黑樹應實現的方法。可是二者有一個共通點,就是與單純的紅黑樹的數據結構多了一個變量,就是prev,增長這個變量的意義在於刪除紅黑樹的node時,須要找到被刪node的上一個結點,由於若是隻是單向鏈表與紅黑樹的結構。刪除紅黑樹的結點時,單向鏈表就不能維護了,由於找不到被冊結點的上一個結點。所以這裏增長了一個prev的結構,造成雙向鏈表。less
而這樣作是其增長了一個TreeBin來包裝TreeNode,而這個容器不直接保存用戶的key,value信息。hash值爲定值-2,在遍歷時可經過hash值判斷是當前Node是哪一種結構。-1表示正在ForwardingNode,-2爲TreeBin,大於0爲鏈表。ide
ConcurrentHashMap /** * TreeNodes used at the heads of bins. TreeBins do not hold user * keys or values, but instead point to list of TreeNodes and * their root. They also maintain a parasitic read-write lock * forcing writers (who hold bin lock) to wait for readers (who do * not) to complete before tree restructuring operations. */ static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; //紅黑樹的根結點。 volatile TreeNode<K,V> first; // 指向鏈表的頭部,雖然爲紅黑樹,但保持了鏈表的結構。在unTree化時簡化。 volatile Thread waiter; volatile int lockState; 。。。}
接下來介紹下由鏈表是怎麼轉化爲紅黑樹的。首先經過以下的方法將原來的Node轉換成TreeNode,從且從單身鏈表轉成雙向鏈表。此時尚未發生紅黑樹的操做。源碼分析
/* ---------------- Conversion from/to TreeBins -------------- */ /** * Replaces all linked nodes in bin at given index unless table is * too small, in which case resizes instead. */ private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
setTabAt(tab, index, new TreeBin<K,V>(hd));
而後new封裝轉換的TreeNode,在構造 方法裏轉換成紅黑樹。hd爲原鏈表的頭部。如今作爲紅黑樹的first。在構造方法裏執行的過程就和hashMap中轉紅黑樹的過程相似 ,根據key的 systemCode的大小,決定在樹中的位置,造成一個顆二叉樹。最後經過r = balanceInsertion(r, x); 由二叉樹轉爲紅黑樹。r爲root指針。性能
而在HashMap中就沒有first指針的概念,雖然其內部一樣仍是個雙向鏈表(經過prev實現)。它是在轉換成紅黑樹以後 ,經過將紅黑樹的root結點作爲first node。
// HashMap 中treeify後執行的方法,將root作爲鏈表的first node /** * Ensures that the given root is the first node of its bin. */ static <K,V> void moveRootToFront(Node<K,V>[] tab, TreeNode<K,V> root) { int n; if (root != null && tab != null && (n = tab.length) > 0) { int index = (n - 1) & root.hash; TreeNode<K,V> first = (TreeNode<K,V>)tab[index]; if (root != first) { Node<K,V> rn; tab[index] = root; TreeNode<K,V> rp = root.prev; if ((rn = root.next) != null) ((TreeNode<K,V>)rn).prev = rp; if (rp != null) rp.next = rn; if (first != null) first.prev = root; root.next = first; root.prev = null; } assert checkInvariants(root); } }
以下,這三個操做是實現併發訪問的關鍵。經過UNSAFE的本地方法,1.7中就已經引入過,而在1.8中繼續扮演着重要的角色。而且大量使用了CAS的操做。
關於 UNSAFE方法 在以前1.7就已經分析過 @SuppressWarnings("unchecked") //定位tab中index爲i的 Node static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //CAS設置tab中index爲i的結點爲node static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //Volatile 寫 tab中的 Node。在加鎖的狀況下調用 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
擴容這一操做相比於以前的實現,1.8中最大的不一樣在於實現了併發擴容。其核心 是經過了一個特殊的Node。其定義以下 若是遍歷時發現node的 hash值是 -1,表示當前正在擴容。且當前table中的該node已經容完成。遍歷下一個node進行擴容。
/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { 。。。。 } }
而其併發到底又是怎麼實現的呢 ? 接下來看,詳細代碼就不貼了,這裏分析關鍵地方,
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range 。。。。。。。 }
stride 很是關鍵,大體的意思 是每一個處理器處理的node個數不小於16。而這一步的實如今下面的方法 。
i=0開始,第一步CAS 設置 transferIndex,該值初始爲原表的大小,假設n爲64,第一個線程將其CAS設爲64-16等於48.則第一個線程就先執行64到48以前的node的擴容操做,若是在此期間第二個線程要執行到transfer方法,則transferIndex爲 32.其執行48到32之間。可是若是沒有其它線程進來執行,則它們就接着往下,爭取下一個stride數量的擴容操做。可是當最後transferIndex的值小於等於0時。表示此時已經不須要參與擴容了。如此經過CAS設置transferIndex的值,解決併發的衝突。同時每一個node擴容時,要上鎖。以防其它操做改變該鏈表的結構。這樣就能夠有多個線程併發擴容,且不會產生衝突。
Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } }
以下是在transfer過程當中,原node是鏈表的狀況下擴容過程。
if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; }
下圖表示遍歷到第i個結點,上鎖後,判斷是鏈表後,首先是找到lastRun,其表示尾部 hash&n 相同的排在最前面的node。1表示擴容後在高位hn,0表示在低位ln(也就是原來的位置)。而之因此 用hash&n 的 表達式來決定原node在擴容後的位置,而沒有經過hash計算新數組的大小 來決定 位置 。這個實現十分的巧妙,直接將hash值與原表的大小& 一下。 這樣的作的緣由是由於, put操做時決定結點所在位置時經過 hash& n-1。而n咱們這裏定義的是2的x次方。n-1顯然就是 11..11的結點,而n就是 1..00。舉個例子n爲16,則n-1爲 1111。而n爲10000。對應擴容後的大小爲100000,原結點在新數組的位置就是hash& 11111。如今咱們發現 原以前的差異就在於 原數組的&操做。也就是 11111中第一個1。因此擴容時,不須要從新計算。而只須要將原來的hash值與n進行&操做(這樣的操做簡化了計算的複雜度)。就能夠肯定在新table的位置。0表示原索引,而1表示 i+n的位置。且不須要考慮併發的問題,nextTable的 i和 i+n的位置 只會由原table的i中的node給從新佔據,而咱們一開始就對table的 node i已經上鎖了。因此是安全的。
上面分析了鏈表的反序處理。紅黑樹相似,可是須要判讀是否unTree。
總結:進一步將鎖細化,不在是設置的併發級別,隨着擴容以後 ,鎖粒度進一步細化,而且提供了併發擴容,且大量使用了UNSAFE的本地方法,性能也進一步提高。
本文沒有對CHM的全部操做進行分析 ,如get,size,remove等,在下面引用的兩篇文章已經很詳細的分析,上文只是對部門關鍵點代表了個人一點點的見解。若有不正之處但願你們指出來。本文後面接着在完善。
http://www.importnew.com/22007.html
http://blog.csdn.net/u010723709/article/details/48007881