ConcurrentHashMap:源碼分析到面試題

在多線程狀況下,咱們的HashMap在JDK1.8以前最大的問題就是會形成環鏈,在JDK1.8開始以後雖然解決了環鏈,可是仍是會由於併發的狀況下,致使數據覆蓋而丟失。雖然咱們有HashTable和Collections下的同步器能夠解決這個問題,可是這兩種方案都不能算是一個優秀的解決方案,因此就有了咱們要介紹的ConcurrentHashMap。本文主要是針對JDK1.8的源碼進行分析,可是在介紹以前也會簡單提一下,1.8以前是如何設計的!html

在瞭解ConcurrentHashMap不妨先了解一下HashMapjava

1 JDK1.7

咱們在JDK1.8以前採用的是SegmentHashEntry的方式實現的。結構以下:node

咱們是採用分段鎖來實現併發的更新。Segment是繼承自咱們的ReentrantLock來充當鎖的角色,每個Segment都對應一個鎖。從圖中咱們也能夠看到,咱們的每個Segment對象都對應了哈希表的若干個哈希桶,至關於一小段哈希表!面試

這樣咱們在實現併發更新的時候,就不會鎖住這個哈希表,而是鎖住Segment對應的那一個對象那一部分,就會提升了咱們的性能和效率。具體的源碼這裏就不分析了,由於咱們主要是介紹1.8的ConcurrentHashMap。數組

2 JDK1.8

咱們的ConcurrentHashMap在1.8以後就放棄了分段鎖的解決方案,而是採用了CAS+Synchronized來保證併發更新的安全。底層和咱們的HashMap同樣,採用的是數組+鏈表+紅黑樹的存儲結構!安全

咱們在上面說到了1.8是採用CAS+Synchronized來保證併發安全,因此在若是對CAS還不瞭解的話,能夠先看個人關於CAS的博客。(點擊跳轉數據結構

好了接下來咱們就開始對源碼進行分析了。多線程

2.1 基本屬性

ConcurrentHashMap不少基本屬性都和咱們的HashMap同樣,因此這裏我只介紹幾個不同的,並且後面咱們分析源碼會用到的。併發

//咱們的哈希表,但是使用迭代器來進行迭代
transient volatile Node<K,V>[] table;
//默認爲null,擴容的時候新生成的數組,其大小爲原數組的兩倍。
private transient volatile Node<K,V>[] nextTable;
//基礎計數器,經過CAS來進行更新
private transient volatile long baseCount;
/*
*默認爲0,用來控制table的初始化和擴容操做的
*當爲負數時,它正在進行初始化或者在擴容:
*-1,表示正在進行初始化;-N表示N-1個線程在進行擴容
*當爲正數的時候:
*若是table未初始化,表示須要初始化的大小;
*若是table初始化完成,表示table的容量,默認是table的0.75倍,
*/
private transient volatile int sizeCtl;

還有就是對比咱們的HashMap,咱們的Node也進行了重寫,將咱們的值和下一個結點都用了Volatile來修飾,線程修改後馬上刷回主存,增長了內存的可見性。源碼分析

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
...部分代碼省略...
}

2.2 構造方法

ConcurrentHashMap有五個構造方法,其中四個與HashMap相似,因此咱們主要介紹這個多了一個參數的構造方法

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

咱們第一個參數是容量大小,能夠指定;第二個參數是咱們的負載裝載因子;第三個是指定咱們的更新的併發線程數量;而後進行一些邊界處理和賦值處理。最後就將咱們的要擴容的大小賦值給了sizeCtl(上面介紹了,咱們下次要擴容的大小),注意這裏咱們並無進行初始化table,而是在第一次put的時候纔會進行初始化,下面會講到。

咱們一樣會在上面的構造方法裏面看到一個方法tableSizeFor,咱們點進去看,原來和咱們的HashMap的那個設計容量爲2的整數次冪方法同樣,至於爲何要設置成2的整數次冪,我在HashMap方法裏面也提到了。

private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

2.3 put方法

咱們知道在進行第一次put的時候會進行擴容,那麼若是有多個線程同時進來,咱們是如何保證只有一個線程成功的進行了擴容呢?咱們在第一次put的時候putVal方法裏面有這麼一行代碼

if (tab == null || (n = tab.length) == 0)
      tab = initTable();

咱們調用了initTable方法,在下面註釋上給出解析

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //若是sizectl(sc)小於0,說明已經有線程進行在初始化了,咱們的其餘進來的線程做罷
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //使用cas操做,將咱們的sc更新爲-1,表明在進行初始化了
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //右移兩位再操做,至關於0.75*n,設置了一個擴容的閾值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

回來咱們繼續看一下咱們的完整的putVal方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
     //key的散列,獲取哈希值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //若是表沒有進行初始化,則進行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //若是能夠直接將咱們的哈希值插入數組,則直接存進去,不用加鎖
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //若是插入的點是咱們的table的鏈接點,說明在擴容,咱們就幫助當前線程擴容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //而後在進行具體的增長操做的時候,加鎖
                synchronized (f) {
                    //肯定f在tab中是鏈表的頭結點
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //若是哈希桶是樹,按照樹的方式插入新值
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //若是節點大於等於8,進行變換紅黑樹
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        //調用生成樹的方法
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
     //能執行到這一步,說明節點不是被替換的,是被插入的,因此要將map的元素數量加1
        addCount(1L, binCount);
        return null;
    }

當table容量不足的時候,即table的元素數量達到容量閾值sizeCtl,須要對table進行擴容。
整個擴容分爲兩部分:

  1. 構建一個nextTable,大小爲table的兩倍。
  2. 把table的數據複製到nextTable中。

這兩個過程在單線程下實現比較簡單,可是在多線程下比較複雜。咱們的ConcurrentHashMap是支持併發插入的,這裏用圖文簡單分析一下:

多線程遍歷節點,處理了一個節點,就把對應點的值set爲forward,另外一個線程看到forward,就向後遍歷。這樣交叉就完成了複製工做。

(這裏具體的addCount方法和transfer方法暫時看的不是大懂,後面會補上!)

2.4 get方法

get方法比較簡單,就是若是是在桶第一個就返回;若是是樹的結構調用樹的方法去遍歷查找;若是是鏈表就遍歷下去查找;若是都沒找到就返回null;

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法這麼簡單貼上來只是爲了說明,咱們的get方法是沒有加鎖的,無阻塞的。之因此可以正確的讀取值是由於咱們在上面也說到了,重寫了node,裏面的變量都用了volatile關鍵字來進行修飾。並且經過代碼能夠得出ConcurrentHashMap的key和Value都不能爲null。

3 面試題分析

一樣的,再進行了稍微的源碼分析,咱們試着來解決一些面試題。

一、ConcurrentHashMap使用什麼技術來保證線程安全?

咱們在上面分析過了,1.7的時候採用的Segment分段鎖來實現,1.8採用的是CAS+Synchronized來實現的。具體實現細節,balabala簡單描述一下。

二、ConcurrentHashMap的get方法是否要加鎖,爲何?

不用,咱們說過了,get方法是無阻塞不加鎖的。由於咱們重寫了node類,裏面的變量都用了volatile關鍵字來進行修飾,能夠保證最新值的獲取!

三、ConcurrentHashMap1.7和1.8的區別?

數據結構

  • 1.7:SegmentHashEntry
  • 1.8:數組+鏈表+紅黑樹

併發安全實現

  • 1.7:分段式鎖(鎖的對象是一個Segment)
  • 1.8:CAS+Synchronized(下降了鎖的粒度,對象是一個Node)

其餘的面試題,無非與HashMap大徑類似,能夠看看個人HashMap分析,裏面也有面試題詳解。(點擊跳轉)

4 總結

關於源碼其實還有不少都沒有分析,由於這比HashMap要複雜也難。因此挑一些高頻考點來進行分析。感謝下面的參考資料!

5 參考資料

https://www.jianshu.com/p/e694f1e868ec

公衆號《Java3y》多線程系列文章

http://www.javashuo.com/article/p-exjabbki-ed.html

相關文章
相關標籤/搜索