jdk1.8 ConcurrentHashMap 源碼學習

 

上次講到HashMap,可是HashMap並非線程安全的,那麼有哪些線程安全的Map或者是實現線程安全的map呢?java

一、 HashTable(已棄用),使用的是內置對象鎖對map進行同步,併發執行的效率比較低(key和value均不能爲null,由於這是用在多線程的,當get返回null時,沒法肯定是不包含這個key仍是值爲null,hashMap容許key爲null,由於運行在單線程,能夠經過containKey來判斷是否存在key,而containKey在多線程中可能恰好在你調用以前remove了當前key致使當前key爲null);node

二、 ConcurrentHashMap(流行使用),JDK1.7使用分段鎖,JDK1.8使用的是CAS+synchronized實現併發訪問(key和value均不能爲null);數組

三、使用Collections的synchronizedMap(Map m) 進行包裝,使用的是傳入m的內置鎖,一樣併發執行效率低。安全

下面記錄一下JDK1.8的ConcurrentHashMap的源碼學習,花了挺長時間,比hashmap難多了多線程

屬性

  下面只列出了一些屬性併發

// 保存鍵值對總數
private transient volatile long baseCount;

/* 默認爲0,用來控制table的初始化和擴容操做, 小於0時表明正在擴容,而且 -n表示
* 有n – 1個線程在擴容,正數表明table容量
*/
private transient volatile int sizeCtl;
static final int MOVED     = -1; // hash for forwarding nodes,forwarding nodes 是擴容時用到的node
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient 

構造方法

// 帶容量的初始化,只是初始化了容量,並無創建桶數組
public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
}

這裏只列出了帶容量的構造方法,還有其餘的沒列出了。oracle

put()方法

public V put(K key, V value) {
        return putVal(key, value, false);
}
// onlyIfAbsent 爲 true的話表示若當前key-value不存在,進行插入;若存在,不對當前存在key-value進行更新
final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key 和value不能爲null
        if (key == null || value == null) throw new NullPointerException();
            //計算key的哈希值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                    // 初始化表數組(或者叫桶數組,默認容量爲16),和hashMap同樣,table是延遲加載的,initTable()經過CAS機制實現同步,稍後會講(看到這能夠先到後面看看initTable())
                tab = initTable();
                    
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                    
              // 經過CAS機制進行併發put,
              /*
               *param1 :桶數組
               *param2 :節點(node)位置偏移
               *param3 :節點當前預期內存值
               *param4 :要在當前節點內更新的值
              */
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
                // 常量 MOVED = -1,表示正在擴容
            else if ((fh = f.hash) == MOVED)
       //內置鎖是加在每一個桶上的,擴容其實是對每一個桶上的元素從新分配桶,擴容能夠在不一樣的桶上多線程併發執行
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
        // 對桶 f 進行同步put操做
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                            // f < 0 
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                       // 賦值爲2的意義只是爲了避免等於0?不太清楚,addCount(1L, binCount)看到若是在桶數組不爲空時,binCount <= 1會直接返回,是這個緣由?
                            binCount = 2;
               // 紅黑樹節點,加入紅黑樹中
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                        // TREEIFY_THRESHOLD=8,樹化閾值
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 數量+1,並進行是否擴容判斷
        addCount(1L, binCount);
        return null;
    }
View Code

initTable() 方法

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0) // 有線程在初始化,當前線程讓步
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
              //若CAS成功 把當前對象SIZECTL偏移位置修改成-1,即sizeConrol = -1
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
            // 用減法來替代 *0.75,計算機中乘法的操做比減法耗時
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
View Code

addCount()方法

private final void addCount(long x, int check) {
     // counterCells 在 currentMap 初始化和 put 過程都沒有進行初始化,本人暫時也不知道這是用來幹嗎的,doc註釋爲Table of counter cells. When non-null, size is a power of 2.
        CounterCell[] as; long b, s;
        // counterCells爲null,不進入if語句
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
    // 嘗試擴容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
            /* resizeStamp(n){ // RESIZE_STAMP_BITS = 16
             *return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
             *}
             */
                int rs = resizeStamp(n);
                if (sc < 0) {
                // RESIZE_STAMP_SHIFT = 16 ,下面的if語句判斷條件是jdk的bug,在網上查資料一大佬提到 oracle bug庫連接:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
                //下列的解釋參考自:https://www.jianshu.com/p/749d1b8db066
                // 若是 sc 的低 16 位不等於 標識符(校驗異常 sizeCtl 變化了)
                // 若是 sc == 標識符 + 1 (擴容結束了,再也不有線程進行擴容)(默認第一個線程設置 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等於 rs + 1)
                // 若是 sc == 標識符 + 65535(幫助線程數已經達到最大)
                // 若是 nextTable == null(結束擴容了)
                // 若是 transferIndex <= 0 (轉移狀態變化了)
                // 結束循環
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt); // 線程加入協同擴容
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null); //擴容
                s = sumCount();
            }
        }
    }
View Code

transfer()方法

  暫時沒有認真研讀,瞭解大概過程,這裏先給參考連接,下次再仔細研讀一下:dom

  https://www.jianshu.com/p/aaf769fdbd20ide

總結

  相比 hashMap ,concurrentHashMap複雜不少,在處理併發安全的問題上,ConcurrenthahMap用到了 CAS + syschroynized,CAS這就要求了要了解java的內存模型,計算機的底層,因此在這些上面花了一部分時間,對 synchronized如今我也仍是隻知其一;不知其二,要去啃啃源碼,下次博客就要記錄synchronized 和 lock、紅黑樹、線程池原理等等許多java知識,任重而道遠。。。學習

 

參考連接

https://www.jianshu.com/p/c0642afe03e0

https://www.jianshu.com/p/aaf769fdbd20

相關文章
相關標籤/搜索