ConcurrentHashMap和HashMap同樣是一個用來存儲鍵值對<key,value>的集合類,但和HashMap不一樣的是ConcurrentHashMap是線程安全的,也就是多個線程同時對ConcurrentHashMap進行修改或者刪除增長操做不會出現數據錯誤的問題.
和HashMap同樣採用數組+鏈表+紅黑樹實現但和HashMap不一樣的是,數組中存儲的節點類型有所增長,包括Node<key,value>,TreeNode<key,value>,ForwardingNode<key,value>,新增這個節點的目的就是爲了線程併發協助擴容時使用java
//01111111111111111111111111111111 該值能夠保證計算出來的哈希值爲正數 static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash//該屬性用在擴容時生成一個負值,表示正在擴容 //The number of bits used for generation stamp in sizeCtl. //sizeCtl中用於生成戳記的位數。 //Must be at least 6 for 32bit arrays. //對於32位數組,必須至少爲6。 private static int RESIZE_STAMP_BITS = 16;//和上面同樣,也是爲了在擴容時生成一個負值,具體在代碼中解釋 //The bit shift for recording size stamp in sizeCtl. //在sizeCtl中記錄大小戳的位移位。 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;//表示當前桶位正在被遷移 //Encodings for Node hash fields. See above for explanation. static final int MOVED = -1;//表示當前桶是以樹來存儲節點的 static final int TREEBIN = -2;//Number of CPUS, to place bounds on some sizings //cpu的數量,用來計算元素數量時限制CounterCell數組大小 static final int NCPU = Runtime.getRuntime().availableProcessors();/** * The next table to use; non-null only while resizing. * 用來擴容的哈希表 */ private transient volatile Node<K, V>[] nextTable;/** * Base counter value, used mainly when there is no contention, but also as a fallback during table initialization * races. Updated via CAS. * 哈希表元素數量,經過longAdder來維護 */ private transient volatile long baseCount;/** * Table initialization and resizing control. * 哈希表初始化和擴容大小控制. * When negative, the table is being initialized or resized: * 當這個值爲負數時,表示哈希表正在初始化或從新計算大小 * -1 for initialization, * -1 表示正在初始化了 * else -(1 + the number of active resizing threads). * 表示哈希表正在擴容,-(1+n),表示此時有n個線程正在共同完成哈希表的擴容 * Otherwise, when table is null, holds the initial table size to use upon creation,or 0 for default. * 不然,當哈希表爲空時, 保留要建立哈希表的大小0或默認(16) * After initialization, holds the next element count value upon which to resize the table. * 初始化完成以後,保留下一次須要擴容的閾值 */ private transient volatile int sizeCtl;/** * The next table index (plus one) to split while resizing. * 擴容時的當前轉移下標 */ private transient volatile int transferIndex;/** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. * 獲取計算集合元素容量的CounterCell對象的鎖 */ private transient volatile int cellsBusy;/** * Table of counter cells. When non-null, size is a power of 2. * 計算元素數量的數組 */ private transient volatile CounterCell[] counterCells;
/** * 和HashMap構造函數不一樣的是,數組容量的計算老是大於傳入容量的2的冪 * 即若是傳入32則數組初始容量爲64,而不是32,而HashMap計算出來爲32 */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
/** * 這個方法就是HashMap中的hash方法,用來計算哈希值 */ static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
獲取節點node
public V get(Object key) { Node<K, V>[] tab; Node<K, V> e, p; int n, eh; K ek; //計算散列值 int h = spread(key.hashCode()); //計算下標(這一塊同HashMap再也不贅述) if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) //哈希值小於0,表示爲樹節點,從樹中尋找,這一步和HashMap一致 return (p = e.find(h, key)) != null ? p.val : null; //在鏈表中尋找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
插入節點數組
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //計算哈希值 int hash = spread(key.hashCode()); //插入桶的節點數量 int binCount = 0; //使用死循環,目的是可能有的線程正在協助擴容,以後還須要插入或者更新,或者須要操做的節點所在的桶已經被其餘線程鎖定,須要等待其餘線程執行完以後再執行 for (Node<K, V>[] tab = table; ; ) { Node<K, V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //初始化哈希表 tab = initTable(); else if //計算下標,而且計算該下標是否有元素 ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //cas插入,這一步不須要鎖,由於當前桶爲空 if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null))) break; } else if //表明當前節點已經被移動,正在擴容,須要當前線程協助擴容 ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //鎖住頭節點,保證全部線程的插入都是線程安全的 synchronized (f) { //這一步判斷的緣由是,可能插入元素以後會形成鏈表樹化,須要插入的位置已經發生了變化 if (tabAt(tab, i) == f) { if (fh >= 0) { //當前桶上有多少個節點 binCount = 1; for (Node<K, V> e = f; ; ++binCount) { K ek; //查找到了key相同的節點,直接修改值並返回舊的值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K, V> pred = e; //沒有找到相同的key,直接向鏈表尾部插入節點 if ((e = e.next) == null) { pred.next = new Node<K, V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //給樹裏面插入節點 Node<K, V> p; binCount = 2; if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //須要樹化 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); //修改舊值,直接將舊值返回 if (oldVal != null) return oldVal; break; } } } //計算節點數量 addCount(1L, binCount); return null; }
初始化哈希表安全
private final Node<K, V>[] initTable() { Node<K, V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //小於0表示正在初始化或者正在擴容,讓出cpu if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if //判斷sc是否與SIZECTL是否相等,若是相等,則將SIZECTL設置爲-1,表示當前正在初始化(只有一個線程能進行此操做,其餘線程會被擋在前面的判斷上) (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //防止有線程已經初始化了1 if ((tab = table) == null || tab.length == 0) { //該sc若是在構造器上傳入了,則會被計算爲大於其的2次冪,不然會按照默認值初始化 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n]; table = tab = nt; //設置下一次擴容的閾值 n - (n >>> 2) = n - n / 4 = (3 / 4) * n = 0.75n,即下一次的擴容閾值爲當前哈希表數量的0.75*n sc = n - (n >>> 2); } } finally { //設置sizeCtl爲-1,表示初始化動做已經有線程在執行了 sizeCtl = sc; } break; } } return tab; }
計算節點數量多線程
private final void addCount(long x, int check) { CounterCell[] as; long b, s; /* * 維護數組長度 */ //嘗試cas直接修改值,若是修改失敗 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; //數組爲空或者長度小於0或者對應的位置爲空或者直接修改數組對應位置上的值失敗,則進行修改操做 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } //桶上的節點數量小於等於1,不須要判斷擴容,直接退出 if (check <= 1) return; //獲取當前數組的元素數量 s = sumCount(); } /* * 判斷是否須要擴容 */ if (check >= 0) { Node<K, V>[] tab, nt; int n, sc; //當前節點數量大於擴容閾值,而且數組不爲空而且數組長度小於最大值則須要擴容 while (s >= (long) (sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //獲取一個負值 int rs = resizeStamp(n); //若是sc小於0,說明正在擴容,須要協助擴容 if (sc < 0) { //判斷擴容是否完成 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //協助擴容,這裏sc+1表明新加入一個線程協助擴容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if /* * 假設 rs = 00000000 00000000 10000000 00000000 * 將其向左移16位結果爲 10000000 00000000 00000000 00000000 能夠看出該值爲負 * 這一步嘗試將sc設置爲負數 */ (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //將舊數組置空,裏面會建立一個新的數組 transfer(tab, null); //計算集合元素數量 s = sumCount(); } } }
private final void fullAddCount(long x, boolean wasUncontended) { int h; //獲取當前線程的hash值 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } //檢測是否有衝突,若是最後一個桶不爲null,則爲true boolean collide = false; for (; ; ) { CounterCell[] as; CounterCell a; int n; long v; //數組若是不爲空,則優先對CounterCell裏面的counterCell的value進行累加 if ((as = counterCells) != null && (n = as.length) > 0) { //當前位置爲空 if ((a = as[(n - 1) & h]) == null) { //當前沒有線程嘗試修改該值 if (cellsBusy == 0) { CounterCell r = new CounterCell(x); //搶佔修改的鎖 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { //釋放鎖 cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } //搶佔失敗 collide = false; } else if //桶位不爲空,從新計算線程hash值,繼續循環 (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash /* * 從新計算hash值以後,對應的桶位仍是不爲空,對value進行累加 * 嘗試cas對value加值 */ else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; //數組長度已經大於等於CPU的核數了,不須要再擴容了 else if (counterCells != as || n >= NCPU) collide = false; //當沒有衝突,修改成有衝突,從新計算hash值,繼續循環 else if (!collide) collide = true; else if //屢次循環沒有設置成功值,則對原數組進行擴容 (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale //數組長度沒有超過cpu核數,將數組擴容兩倍 CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) //擴容使用 rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //從新計算隨機值 h = ThreadLocalRandom.advanceProbe(h); } else if //初始進來數組爲空,須要初始化數組 (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if //數組爲空而且有其餘線程正在建立數組,嘗試直接對baseCount進行累加 (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
擴容並遷移併發
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) { //stride表示遷移數據的區間 int n = tab.length, stride; /* * 這裏計算每一個CPU負責遷移元素的個數 * 若是這裏的跨度區間小於16,則按照最小區間16來計算 */ if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; //這裏表示爲第一個線程來擴容 if (nextTab == null) { try { //擴容爲兩倍 @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //遷移數據的index transferIndex = n; } //新擴容數組的長度 int nextn = nextTab.length; //建立頭節點,該節點會被標識爲MOVED表示數據正在遷移中 ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab //從後向前遷移 for (int i = 0, bound = 0; ; ) { Node<K, V> f; int fh; while (advance) { int nextIndex, nextBound; //不屬於本身的遷移位置或者已經遷移完成直接退出 if (--i >= bound || finishing) advance = false; else if //下一個遷移位置小於等於0直接退出 ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if //計算遷移位置(多線程會劃分多個區間) (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; //判斷是否全部的線程都作完了任務 if (finishing) { nextTable = null; table = nextTab; //等於0.75 * 2n,也就是新數組擴容2倍*擴容因子 sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //判斷擴容是否成功 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if //若是當前位置爲空,直接插入fwd節點,表示當前節點正在被遷移 ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) //正在被遷移,須要從新計算位置 advance = true; // already processed else { //遷移代碼 synchronized (f) { if (tabAt(tab, i) == f) { Node<K, V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K, V> lastRun = f; for (Node<K, V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K, V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K, V>(ph, pk, pv, ln); else hn = new Node<K, V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //遷移完成,設置頭節點爲fwd setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K, V> t = (TreeBin<K, V>) f; TreeNode<K, V> lo = null, loTail = null; TreeNode<K, V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K, V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K, V> p = new TreeNode<K, V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K, V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K, V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //遷移完成,設置頭節點爲fwd setTabAt(tab, i, fwd); //從新計算位置繼續遷移 advance = true; } } } } } }
獲取節點數量less
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long) Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) n); }
/** * 獲取哈希表中節點的數量(非線程安全) * 這個方法返回的數據不必定準確,由於可能在調用該方法的時候,有其餘線程正在嘗試給數組中的節點加值 */ final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (CounterCell counterCell : as) { if ((a = counterCell) != null) sum += a.value; } } return sum; }