ConcurrentHashMap是線程安全的HashMap;在併發的狀況下使用HashMap可能會致使死循環,在進行put操做時致使CPU利用率接近100%。是由於在多線程會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形數據結構,Entry的next結點永遠不能爲空,就會產生死循環獲取Entry。java
在JDk1.8中ConcurrentHashMap採用Node + CAS + Synchronized來保證併發狀況下的更新不會出現問題。其底層的數據結構是:數組 + 鏈表 + 紅黑樹 的方式來實現的。node
注:點擊瞭解紅黑樹。git
/**
* 最大容量,32位的Hash值的最高兩位用做控制的目的,
* 這個值必須剛好是1<<30(2的30次方),這樣分配的java數組
* 在索引範圍內(2的整數次冪)。
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* Hash表默認的初始容量。必須是2的整數次冪,
* 最小爲1,最大爲MAXIMUM_CAPACITY.
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 最大的數組大小(非2次冪)。
* toArray 和 related方法使用。
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 表默認的併發級別,未使用。爲與該類的之前版本兼容而定義
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* T表的默認加載因子,在構造函數中重寫此值隻影響初始表容量。
* 浮點值一般不被使用,
* 當前表中的容量 = 初始化容量 - (初始化容量無符號右移2位)時擴容
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 鏈表轉紅黑樹閥值,該值必須大於2,而且應該至少爲8。
* 以便與樹移除中關於收縮後轉換回普通Bin的假設相吻合。
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 用於在調整大小操做期間反樹化(拆分)bin的bin計數閾值,
* 應該小於TREEIFY_THRESHOLD, 最多爲6.
*/
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* 用於生成戳記的位的數目,單位爲sizeCtl。
* 32位數組必須至少爲6.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* 2^15-1,help resize的最大線程數
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* 32-16=16,sizeCtl中記錄size大小的偏移量
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/* forwarding nodes的hash值*/
static final int MOVED = -1;
/* 樹根節點的hash值*/
static final int TREEBIN = -2;
/* ReservationNode的hash值*/
static final int RESERVED = -3;
/* 普通節點哈希的可用位*/
static final int HASH_BITS = 0x7fffffff;
複製代碼
/**
* 裝載Node的數組,做爲ConcurrentHashMap的數據容器,
* 採用懶加載的方式,直到第一次插入數據的時候纔會進行初始化操做,
* 數組的大小老是爲2的冪次方。
*/
transient volatile Node<K,V>[] table;
/**
* 擴容時使用,只有在擴容的時候才爲非null
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 控制Table的初始化與擴容。
* 當值爲負數時table正在被初始化或擴容
* -1表示正在初始化
* -N則表示當前正有N-1個線程進行擴容操做
* 正數或0表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小
*/
private transient volatile int sizeCtl;
複製代碼
Node是最核心的內部類,它包裝了key-value鍵值對,全部插入ConcurrentHashMap的數據都包裝在這裏面。Node類實現了Map.Entry<K,V>接口,Node類中包含有屬性有key,value以及下一節點的引用,其中value和next屬性使用volatile關鍵字修飾,保證其在多線程下的可見性。不容許調用setValue方法直接改變Node的value域,它增長了find方法輔助map.get()方法。github
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
/** * Node結點的構造方法 */
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/** * Node結點中提供的find方法,在子類中可重寫 */
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
複製代碼
樹節點類,另一個核心的數據結構,包含父接點,左連接的結點,右連接的結點,前驅結點的引用,以及結點的顏色(默認紅色)。當鏈表長度過長的時候,會轉換爲TreeNode在TreeBins中使用。TreeNode是上述Node類的子類。數組
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/** * 經過給定的key從指定的根節點開始(在其子樹)查找 * 對應的TreeNode結點,沒有返回null * h 表示當前能夠的Hash值 * k 要查找的鍵(key) * kc k的Class對象,該Class應該是實現了Comparable<K>的,不然應該是null */
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
// 判斷對應的鍵是否爲null
if (k != null) {
//獲取當前結點
TreeNode<K,V> p = this;
do { //循環
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
/** * 當前結點的Hash值大於要查找的Key的Hash值H * 在當前節點的左子樹中查找,反之在右子樹中 * 進行下一輪循環 */
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
/** * 當前結點的key等於要查找的key, * 或當前結點的key不爲null且equals()方法爲true * 返回當前的結點 */
return p;
/** * 執行到這裏說明 hash比對相同, * 但當前節點的key與要查找的k不相等 */
else if (pl == null)
/** * 左孩子爲空,指向當前節點右孩子,繼續循環 */
p = pr;
else if (pr == null)
/** * 右孩子爲空,指向當前節點左孩子,繼續循環 */
p = pl;
/** * 左右孩子都不爲空,再次進行比較, * 肯定在左子樹仍是右子樹中查找 */
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
/** * comparable方法來比較pk和k的大小 * dir小於0,p指向左孩子,不然指向右孩子 */
p = (dir < 0) ? pl : pr;
/** * 沒法經過上一步驟肯定是在左/右子樹中查找 * 從右子樹中遞歸調用findTreeNode()方法查找 */
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
//在右子樹中沒有找到,到左子樹中查找
p = pl;
} while (p != null);
}
return null;
}
}
複製代碼
紅黑樹結構。該類並不包裝key-value鍵值對,而是TreeNode的列表和它們的根節點。它代替了TreeNode的根節點,也就是說在實際的ConcurrentHashMap「數組」中,存放的是TreeBin對象,而不是TreeNode對象。這個類含有讀寫鎖。 這裏咱們先看紅黑樹相關操做的方法。安全
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
/** * 經過結點b構造紅黑樹,鏈表轉紅黑樹 */
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
// 將給定的節點指向頭結點
this.first = b;
TreeNode<K,V> r = null;
/** * 定義X節點 爲 b 結點;next結點也爲b結點 * next 節點初始化爲頭結點,用來控制遍歷 */
for (TreeNode<K,V> x = b, next; x != null; x = next) {
//指向下一結點
next = (TreeNode<K,V>)x.next;
//將x結點的鏈接屬性清空
x.left = x.right = null;
if (r == null) {
/** * r 爲null 說明是紅黑樹中沒有結點 * x 結點就是紅黑樹的根結點 * 根結點的父結點爲null,顏色爲黑色 */
x.parent = null;
x.red = false;
r = x;
}
else {
// 當前結點的關鍵字
K k = x.key;
// 當前結點的hash值
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
/** * 從紅黑樹的根結點開始遍歷,查找當前結點對應的位置 * dir 控制查找的方向 * ph 記錄當前結點的hash值 */
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
/** * 當前節點的hash值大於要插入結點的hash值, * 在當前結點的左子樹中查找 */
dir = -1;
else if (ph < h)
/** * 當前節點的hash值小於要插入結點的hash值, * 在當前結點的右子樹中查找 */
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
/** * 若是hash值相等,則比較k值,用其Compare, * 若是還相等,則走tieBreakOrder方法 */
dir = tieBreakOrder(k, pk);
// 暫存當前節點
TreeNode<K,V> xp = p;
/** * 根據dir控制查找方向 */
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
// 插入後平衡紅黑樹的性質
r = balanceInsertion(r, x);
break;
}
}
}
}
//指定紅黑樹的根結點
this.root = r;
assert checkInvariants(root);
}
/** * 左旋轉過程 */
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) {
TreeNode<K,V> r, pp, rl;
/** * 結點P不爲null且p的右結點不爲null */
if (p != null && (r = p.right) != null) {
if ((rl = p.right = r.left) != null)
/** * p.right = r.left p的右結點爲r的左結點 * 而後將其賦值給rl * 當rl 不爲空的時候,肯定p與rl的關係: * 父結點與左子結點 */
rl.parent = p;
if ((pp = r.parent = p.parent) == null)
/** * 結合 r = p.right * r.parent = p.parent 將p的右子樹連接到 p的父結點 * 若是P的父結點爲null,說明當前p結點爲紅黑樹的根結點 * 通過上述r.parent = p.parent 將紅黑樹的根節點轉爲r * 根結點爲r結點,顏色尾黑色 */
(root = r).red = false;
else if (pp.left == p)
/** * 到這一步說明 p 結點不是紅黑樹的根結點 * 且p爲其父結點的左子樹, * 將r替換原來P結點的位置(左旋轉) */
pp.left = r;
else
/** * 到這一步說明 p 結點不是紅黑樹的根結點 * 且p爲其父結點的右子樹, * 將r替換原來P結點的位置(左旋轉) */
pp.right = r;
/** * 上述過程只是完成了將原先以P 結點爲紅黑樹子樹 * 的根結點,替換爲以P的右結點爲根結點的部分 * 即 p.right = r.left 將原先r的左連接替換 * 成 p的右連接的過程。 */
//r的左結點爲p
r.left = p;
// p的父結點爲r節點
p.parent = r;
}
return root;
}
/** * 右旋轉 爲上述左旋轉的逆過程 */
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) {
TreeNode<K,V> l, pp, lr;
if (p != null && (l = p.left) != null) {
if ((lr = p.left = l.right) != null)
lr.parent = p;
if ((pp = l.parent = p.parent) == null)
(root = l).red = false;
else if (pp.right == p)
pp.right = l;
else
pp.left = l;
l.right = p;
p.parent = l;
}
return root;
}
/** * 紅黑樹中插入結點後會打破紅黑樹性質須要平衡 * TreeNode<K,V> root 根結點 * TreeNode<K,V> x 要插入的結點 */
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) {
//默認插入結點爲紅色
x.red = true;
for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
// xp爲當前節點的父結點
if ((xp = x.parent) == null) {
/** * 當前結點的父結點爲空,說明紅黑樹中只有一個結點 * 當前結點即爲根結點,顏色爲黑色 */
x.red = false;
return x;
}
/** * 當前結點的父結點(xp)不爲null * 父結點爲黑色,沒有打破紅黑樹的平衡性(着色可能有問題) * 父結點的的父結點(xpp)爲null,紅黑樹中只有兩個節點 * 上述兩種狀況直接返回root結點 */
else if (!xp.red || (xpp = xp.parent) == null)
return root;
/** * 當前結點的父結點(xp) 爲 其父節點(xpp)的左孩子 */
if (xp == (xppl = xpp.left)) {
if ((xppr = xpp.right) != null && xppr.red) {
/** * 當前結點(x)得父結點(xp)的父結點(xpp)的右孩子(xppr) * 不爲null 且 顏色爲紅色(此時顏色的性質不知足) * 變換顏色 */
xppr.red = false; // 將xppr變爲黑色
xp.red = false; // 將xp變爲黑色
xpp.red = true; // 將xpp變爲紅色
x = xpp; // 將xpp指向x 繼續循環
}
/** * 當前結點的父結點的父結點右孩子爲null或顏色爲黑色 */
else {
// 若是(當前結點)x爲父結點的右孩子
if (x == xp.right) {
//左旋轉
root = rotateLeft(root, x = xp);
// 從新指定xpp
xpp = (xp = x.parent) == null ? null : xp.parent;
}
// 若是當前結點的父結點不爲null
if (xp != null) {
// 將xp的顏色置爲黑色
xp.red = false;
// 父結點的父結點(xpp)不爲null
if (xpp != null) {
//將xpp顏色置爲紅色
xpp.red = true;
// 有旋轉
root = rotateRight(root, xpp);
}
}
}
}
/** * 當前結點的父結點(xp) 爲 其父節點(xpp)的右孩子 */
else {
//xppl 爲當前結點(x)的父結點(xp)的父結點(xpp)的左孩子
if (xppl != null && xppl.red) {
/** * xppl不爲null 且是紅結點 * xpp * / \ * red xppl xp red * ---> x結點在這一層 * | 變爲 * xpp red ---> 變換事後x結位置 * / \ * black xppl xp black * */
xppl.red = false;
xp.red = false;
xpp.red = true;
// 控制循環
x = xpp;
}
else {
//若是左叔叔爲空或者是黑色
if (x == xp.left) {
//若是當前節點是個左孩子 右旋轉
root = rotateRight(root, x = xp);
//獲取爺爺結點
xpp = (xp = x.parent) == null ? null : xp.parent;
}
if (xp != null) {
/** * 父結點不爲null 設置父結點爲黑色 */
xp.red = false;
if (xpp != null) {
/** * 爺爺結點不爲null * 將其置爲紅色 * 對其進行左旋轉 */
xpp.red = true;
root = rotateLeft(root, xpp);
}
}
}
}
}
}
/** * 紅黑樹中刪除節點後會打破紅黑樹的性質須要平衡 * TreeNode<K,V> root 根結點 * TreeNode<K,V> x 要刪除的節點 */
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x) {
for (TreeNode<K,V> xp, xpl, xpr;;) {
if (x == null || x == root)
// x 爲 null 或者 x 爲根結點 無須平衡
return root;
else if ((xp = x.parent) == null) {
/** * xp null * \ * ---> x 結點位置(可爲左結點也可爲右結點) * 此時x 爲紅黑樹的根結點 */
x.red = false;
return x;
}
else if (x.red) {
/** * xp * \ * ---> x 結點位置(且爲red) * 將其變爲黑色結點 */
x.red = false;
return root;
}
else if ((xpl = xp.left) == x) {
// x 爲其父結點的左結點
if ((xpr = xp.right) != null && xpr.red) {
/** * xp * / \ * x xpr red * / \ * xprl xprr */
xpr.red = false; // 將xpr置爲黑色
xp.red = true; // 將xp置爲紅色
// 左旋轉 xp
root = rotateLeft(root, xp);
/** * xpr * / \ * xp xprr * / \ * x xprl */
// 獲取 新的xpr
xpr = (xp = x.parent) == null ? null : xp.right;
}
if (xpr == null)
// 控制循環
x = xp;
else {
/** * xpr * / \ * xp xprr * / \ * x xprl * / \ * sl sr */
TreeNode<K,V> sl = xpr.left, sr = xpr.right;
if ((sr == null || !sr.red) &&
(sl == null || !sl.red)) {
/** * 樹中xpr(即上圖的xprl) 的葉子結點不存在 或爲黑色結點 * 樹中xpr(上圖中的xprl) 置爲紅色 * 將x 指向 xp 繼續循環 */
xpr.red = true;
x = xp;
}
else {
if (sr == null || !sr.red) {
/** * xpr * / \ * xp xprr * / \ * x xprl * / \ * sr null || black */
if (sl != null)
// sl 不爲null 將其置爲黑色
sl.red = false;
// 樹中的xpr(上圖xprl)置爲紅色
xpr.red = true;
// 右旋轉
root = rotateRight(root, xpr);
// 從新獲取xp的右子樹
xpr = (xp = x.parent) == null ?
null : xp.right;
}
if (xpr != null) {
/** * 從新獲取的xpr 不爲null * xpr 的顏色與 父結點的顏色相同 */
xpr.red = (xp == null) ? false : xp.red;
if ((sr = xpr.right) != null)
// 從新獲取的xpr 的右節點不爲null,將其置黑
sr.red = false;
}
if (xp != null) {
/** * xp 不爲null * 將xp置爲紅色 * 左旋轉xp */
xp.red = false;
root = rotateLeft(root, xp);
}
x = root;
}
}
}
else { // x 爲其父結點的右結點
if (xpl != null && xpl.red) {
/** * xp * / \ * xpl x * / \ * */
xpl.red = false;
xp.red = true;
/** * xp xpl * / \ / \ * xpl x ==> xpll xp * / \ / \ / \ * xpll xplr xplr x */
root = rotateRight(root, xp);
/** * 從新獲取xpl * xpl * / \ * xpll xp * / \ * xplr x * |__ 新的xpl指向這裏 */
xpl = (xp = x.parent) == null ? null : xp.left;
}
if (xpl == null)
// 新的xpl爲null x 指向器父結點
x = xp;
else {
// 獲取xpl的左結點與右結點
TreeNode<K,V> sl = xpl.left, sr = xpl.right;
if ((sl == null || !sl.red) &&
(sr == null || !sr.red)) {
/** * 左子結點 爲空或 爲黑色 * 且 * 右子結點 爲空或 爲黑色 * * 將 xpl 置爲紅色 */
xpl.red = true;
// 控制循環
x = xp;
}
else {
if (sl == null || !sl.red) {
/** * xp * / \ * xpl x * / \ / \ * sl(null || black) */
if (sr != null)
// 若是sr不爲null 設置爲黑色
sr.red = false;
//xpl置爲紅色
xpl.red = true;
//左旋轉xpl
root = rotateLeft(root, xpl);
//從新獲取xpl
xpl = (xp = x.parent) == null ?
null : xp.left;
}
if (xpl != null) {
// xpl 不爲null xpl的顏色與xp的顏色相同
xpl.red = (xp == null) ? false : xp.red;
if ((sl = xpl.left) != null)
sl.red = false;
}
if (xp != null) {
/** * xp不爲null * xp爲黑色 * 右旋轉xp */
xp.red = false;
root = rotateRight(root, xp);
}
x = root;
}
}
}
}
}
}
複製代碼
總結:bash
在上一篇文章中,系統的學習了紅黑樹相關的知識,包括性質,以及爲了維護紅黑樹的性質須要進行相應的左旋轉,右旋轉,顏色轉換等子過程。在學習ConcurrentHashMap以前,要先對紅黑樹的操做有必定的瞭解。這篇文章,從ConcurrentHashMap的底層,經過其內部定義的一些常量,以及相關的內部類看起,從新回顧了一下紅黑樹的操做,以及併發大師的實現方式。在下一篇文章中,將學習ConcurrentHashMap相關的操做以及實現原理。微信
我的微信公衆號: 數據結構