上次講到HashMap,可是HashMap並非線程安全的,那麼有哪些線程安全的Map或者是實現線程安全的map呢?java
一、 HashTable(已棄用),使用的是內置對象鎖對map進行同步,併發執行的效率比較低(key和value均不能爲null,由於這是用在多線程的,當get返回null時,沒法肯定是不包含這個key仍是值爲null,hashMap容許key爲null,由於運行在單線程,能夠經過containKey來判斷是否存在key,而containKey在多線程中可能恰好在你調用以前remove了當前key致使當前key爲null);node
二、 ConcurrentHashMap(流行使用),JDK1.7使用分段鎖,JDK1.8使用的是CAS+synchronized實現併發訪問(key和value均不能爲null);數組
三、使用Collections的synchronizedMap(Map m) 進行包裝,使用的是傳入m的內置鎖,一樣併發執行效率低。安全
下面記錄一下JDK1.8的ConcurrentHashMap的源碼學習,花了挺長時間,比hashmap難多了多線程
下面只列出了一些屬性併發
// 保存鍵值對總數 private transient volatile long baseCount; /* 默認爲0,用來控制table的初始化和擴容操做, 小於0時表明正在擴容,而且 -n表示 * 有n – 1個線程在擴容,正數表明table容量 */ private transient volatile int sizeCtl; static final int MOVED = -1; // hash for forwarding nodes,forwarding nodes 是擴容時用到的node static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient
// 帶容量的初始化,只是初始化了容量,並無創建桶數組 public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
這裏只列出了帶容量的構造方法,還有其餘的沒列出了。oracle
public V put(K key, V value) { return putVal(key, value, false); } // onlyIfAbsent 爲 true的話表示若當前key-value不存在,進行插入;若存在,不對當前存在key-value進行更新 final V putVal(K key, V value, boolean onlyIfAbsent) { // key 和value不能爲null if (key == null || value == null) throw new NullPointerException(); //計算key的哈希值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 初始化表數組(或者叫桶數組,默認容量爲16),和hashMap同樣,table是延遲加載的,initTable()經過CAS機制實現同步,稍後會講(看到這能夠先到後面看看initTable()) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 經過CAS機制進行併發put, /* *param1 :桶數組 *param2 :節點(node)位置偏移 *param3 :節點當前預期內存值 *param4 :要在當前節點內更新的值 */ if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 常量 MOVED = -1,表示正在擴容 else if ((fh = f.hash) == MOVED) //內置鎖是加在每一個桶上的,擴容其實是對每一個桶上的元素從新分配桶,擴容能夠在不一樣的桶上多線程併發執行 tab = helpTransfer(tab, f); else { V oldVal = null; // 對桶 f 進行同步put操做 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // f < 0 else if (f instanceof TreeBin) { Node<K,V> p; // 賦值爲2的意義只是爲了避免等於0?不太清楚,addCount(1L, binCount)看到若是在桶數組不爲空時,binCount <= 1會直接返回,是這個緣由? binCount = 2; // 紅黑樹節點,加入紅黑樹中 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // TREEIFY_THRESHOLD=8,樹化閾值 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 數量+1,並進行是否擴容判斷 addCount(1L, binCount); return null; }
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 有線程在初始化,當前線程讓步 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //若CAS成功 把當前對象SIZECTL偏移位置修改成-1,即sizeConrol = -1 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 用減法來替代 *0.75,計算機中乘法的操做比減法耗時 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
private final void addCount(long x, int check) { // counterCells 在 currentMap 初始化和 put 過程都沒有進行初始化,本人暫時也不知道這是用來幹嗎的,doc註釋爲Table of counter cells. When non-null, size is a power of 2. CounterCell[] as; long b, s; // counterCells爲null,不進入if語句 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } // 嘗試擴容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { /* resizeStamp(n){ // RESIZE_STAMP_BITS = 16 *return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); *} */ int rs = resizeStamp(n); if (sc < 0) { // RESIZE_STAMP_SHIFT = 16 ,下面的if語句判斷條件是jdk的bug,在網上查資料一大佬提到 oracle bug庫連接:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427 //下列的解釋參考自:https://www.jianshu.com/p/749d1b8db066 // 若是 sc 的低 16 位不等於 標識符(校驗異常 sizeCtl 變化了) // 若是 sc == 標識符 + 1 (擴容結束了,再也不有線程進行擴容)(默認第一個線程設置 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等於 rs + 1) // 若是 sc == 標識符 + 65535(幫助線程數已經達到最大) // 若是 nextTable == null(結束擴容了) // 若是 transferIndex <= 0 (轉移狀態變化了) // 結束循環 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); // 線程加入協同擴容 } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); //擴容 s = sumCount(); } } }
暫時沒有認真研讀,瞭解大概過程,這裏先給參考連接,下次再仔細研讀一下:dom
https://www.jianshu.com/p/aaf769fdbd20ide
相比 hashMap ,concurrentHashMap複雜不少,在處理併發安全的問題上,ConcurrenthahMap用到了 CAS + syschroynized,CAS這就要求了要了解java的內存模型,計算機的底層,因此在這些上面花了一部分時間,對 synchronized如今我也仍是隻知其一;不知其二,要去啃啃源碼,下次博客就要記錄synchronized 和 lock、紅黑樹、線程池原理等等許多java知識,任重而道遠。。。學習
參考連接
https://www.jianshu.com/p/c0642afe03e0
https://www.jianshu.com/p/aaf769fdbd20