上篇文章講了 Unsafe 類中 CAS 的實現,實際上是在爲這篇文章打基礎。不太熟悉的小夥伴請移步Unsafe 中 CAS 的實現。本篇文章主要基於 OpenJDK8
來作源碼解析。java
ConcurrentHashMap 基於 HashMap 實現。數組
JDK1.7 和 JDK1.8 做爲併發容器在實現上是有差異的。JDK1.7 經過 Segment 分段鎖實現,而 JDK1.8 經過 CAS+synchronized 實現。安全
在 ConcurrentHashMap 中使用了 unSafe 方法,經過直接操做內存的方式來保證併發處理的安全性,使用的是硬件的安全機制。網絡
private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } // CAS 將Node插入bucket static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
仍是老規矩,先上流程圖幫助閱讀源碼。併發
主體源碼以下:線程
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; // 基礎數組 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 初始化 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 若是bucket==null,即沒有hash衝突,CAS插入 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 若是在進行擴容操做,則先擴容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 不然,存在hash衝突 else { V oldVal = null; // 加鎖同步 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 遍歷過程當中出現相同key直接覆蓋 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 尾插法插入 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 若是是樹節點,遍歷紅黑樹 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
put 操做過程以下:設計
下邊的示意圖來自網絡code
本文只分析了 ConcurrentHashMap
的 put()
方法,並無分析 get()、擴容、刪除節點等方法。主要目的是初步瞭解 ConcurrentMap 確保併發寫的設計思路。至此,本篇文章結束,感謝你們的閱讀!歡迎你們關注公衆號【當我趕上你】。內存