輕鬆理解 Java HashMap 和 ConcurrentHashMap

前言java

Map 這樣的 Key Value 在軟件開發中是很是經典的結構,經常使用於在內存中存放數據。node

本篇主要想討論 ConcurrentHashMap 這樣一個併發容器,在正式開始以前我以爲有必要談談 HashMap,沒有它就不會有後面的 ConcurrentHashMap。面試

HashMap數組

衆所周知 HashMap 底層是基於 數組 + 鏈表 組成的,不過在 jdk1.7 和 1.8 中具體實現稍有不一樣。安全

Base 1.7

1.7 中的數據結構圖:數據結構

先來看看 1.7 中的實現。併發

這是 HashMap 中比較核心的幾個成員變量;看看分別是什麼意思?app

  1. 初始化桶大小,由於底層是數組,因此這是數組默認的大小。dom

  2. 桶最大值。ide

  3. 默認的負載因子(0.75)

  4. table 真正存放數據的數組。

  5. Map 存放數量的大小。

  6. 桶大小,可在初始化時顯式指定。

  7. 負載因子,可在初始化時顯式指定。

 

重點解釋下負載因子:

因爲給定的 HashMap 的容量大小是固定的,好比默認初始化:

public HashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
}
public HashMap(int initialCapacity, float loadFactor) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException("Illegal initial capacity: " +
                                           initialCapacity);
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    if (loadFactor <= 0 || Float.isNaN(loadFactor))
        throw new IllegalArgumentException("Illegal load factor: " +
                                           loadFactor);
    this.loadFactor = loadFactor;
    threshold = initialCapacity;
    init();
}

給定的默認容量爲 16,負載因子爲 0.75。Map 在使用過程當中不斷的往裏面存放數據,當數量達到了 16 * 0.75 = 12 就須要將當前 16 的容量進行擴容,而擴容這個過程涉及到 rehash、複製數據等操做,因此很是消耗性能。

所以一般建議能提早預估 HashMap 的大小最好,儘可能的減小擴容帶來的性能損耗。

根據代碼能夠看到其實真正存放數據的是

transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;

這個數組,那麼它又是如何定義的呢?

Entry 是 HashMap 中的一個內部類,從他的成員變量很容易看出:

  • key 就是寫入時的鍵。

  • value 天然就是值。

  • 開始的時候就提到 HashMap 是由數組和鏈表組成,因此這個 next 就是用於實現鏈表結構。

  • hash 存放的是當前 key 的 hashcode。

 

知曉了基本結構,那來看看其中重要的寫入、獲取函數:

put 方法

public V put(K key, V value) {
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
}
  • 判斷當前數組是否須要初始化。

  • 若是 key 爲空,則 put 一個空值進去。

  • 根據 key 計算出 hashcode。

  • 根據計算出的 hashcode 定位出所在桶。

  • 若是桶是一個鏈表則須要遍歷判斷裏面的hashcode、key 是否和傳入 key 相等,若是相等則進行覆蓋,並返回原來的值。

  • 若是桶是空的,說明當前位置沒有數據存入;新增一個 Entry 對象寫入當前位置。

void addEntry(int hash, K key, V value, int bucketIndex) {
    if ((size >= threshold) && (null != table[bucketIndex])) {
        resize(2 * table.length);
        hash = (null != key) ? hash(key) : 0;
        bucketIndex = indexFor(hash, table.length);
    }
    createEntry(hash, key, value, bucketIndex);
}
void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

當調用 addEntry 寫入 Entry 時須要判斷是否須要擴容。

若是須要就進行兩倍擴充,並將當前的 key 從新 hash 並定位。

而在 createEntry 中會將當前位置的桶傳入到新建的桶中,若是當前桶有值就會在位置造成鏈表。

get 方法

再來看看 get 函數:

public V get(Object key) {
    if (key == null)
        return getForNullKey();
    Entry<K,V> entry = getEntry(key);
    return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }
    int hash = (key == null) ? 0 : hash(key);
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}
  • 首先也是根據 key 計算出 hashcode,而後定位到具體的桶中。

  • 判斷該位置是否爲鏈表。

  • 不是鏈表就根據 key、key 的 hashcode 是否相等來返回值。

  • 爲鏈表則須要遍歷直到 key 及 hashcode 相等時候就返回值。

  • 啥都沒取到就直接返回 null 。

 

Base 1.8

不知道 1.7 的實現你們看出須要優化的點沒有?

其實一個很明顯的地方就是:

當 Hash 衝突嚴重時,在桶上造成的鏈表會變的愈來愈長,這樣在查詢時的效率就會愈來愈低;時間複雜度爲 O(N)

所以 1.8 中重點優化了這個查詢效率。

1.8 HashMap 結構圖:

先來看看幾個核心的成員變量:

static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
/**
 * The maximum capacity, used if a higher value is implicitly specified
 * by either of the constructors with arguments.
 * MUST be a power of two <= 1<<30.
 */
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
 * The load factor used when none specified in constructor.
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
transient Node<K,V>[] table;
/**
 * Holds cached entrySet(). Note that AbstractMap fields are used
 * for keySet() and values().
 */
transient Set<Map.Entry<K,V>> entrySet;
/**
 * The number of key-value mappings contained in this map.
 */
transient int size;

和 1.7 大致上都差很少,仍是有幾個重要的區別:

  • TREEIFY_THRESHOLD 用於判斷是否須要將鏈表轉換爲紅黑樹的閾值。

  • HashEntry 修改成 Node。

 

Node 的核心組成其實也是和 1.7 中的 HashEntry 同樣,存放的都是 key value hashcode next 等數據。

再來看看核心方法。

put 方法

看似要比 1.7 的複雜,咱們一步步拆解:

  1. 判斷當前桶是否爲空,空的就須要初始化(resize 中會判斷是否進行初始化)。

  2. 根據當前 key 的 hashcode 定位到具體的桶中並判斷是否爲空,爲空代表沒有 Hash 衝突就直接在當前位置建立一個新桶便可。

  3. 若是當前桶有值( Hash 衝突),那麼就要比較當前桶中的 key、key 的 hashcode 與寫入的 key 是否相等,相等就賦值給 e,在第 8 步的時候會統一進行賦值及返回。

  4. 若是當前桶爲紅黑樹,那就要按照紅黑樹的方式寫入數據。

  5. 若是是個鏈表,就須要將當前的 key、value 封裝成一個新節點寫入到當前桶的後面(造成鏈表)。

  6. 接着判斷當前鏈表的大小是否大於預設的閾值,大於時就要轉換爲紅黑樹。

  7. 若是在遍歷過程當中找到 key 相同時直接退出遍歷。

  8. 若是 e != null 就至關於存在相同的 key,那就須要將值覆蓋。

  9. 最後判斷是否須要進行擴容。

 

get 方法

public V get(Object key) {
    Node<K,V> e;
    return (e = getNode(hash(key), key)) == null ? null : e.value;
}
final Node<K,V> getNode(int hash, Object key) {
    Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (first = tab[(n - 1) & hash]) != null) {
        if (first.hash == hash && // always check first node
            ((k = first.key) == key || (key != null && key.equals(k))))
            return first;
        if ((e = first.next) != null) {
            if (first instanceof TreeNode)
                return ((TreeNode<K,V>)first).getTreeNode(hash, key);
            do {
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    return e;
            } while ((e = e.next) != null);
        }
    }
    return null;
}

get 方法看起來就要簡單許多了。

  • 首先將 key hash 以後取得所定位的桶。

  • 若是桶爲空則直接返回 null 。

  • 不然判斷桶的第一個位置(有多是鏈表、紅黑樹)的 key 是否爲查詢的 key,是就直接返回 value。

  • 若是第一個不匹配,則判斷它的下一個是紅黑樹仍是鏈表。

  • 紅黑樹就按照樹的查找方式返回值。

  • 否則就按照鏈表的方式遍歷匹配返回值。

 

從這兩個核心方法(get/put)能夠看出 1.8 中對大鏈表作了優化,修改成紅黑樹以後查詢效率直接提升到了 O(logn)

可是 HashMap 原有的問題也都存在,好比在併發場景下使用時容易出現死循環。

final HashMap<String, String> map = new HashMap<String, String>();
for (int i = 0; i < 1000; i++) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            map.put(UUID.randomUUID().toString(), "");
        }
    }).start();
}

可是爲何呢?簡單分析下。

看過上文的還記得在 HashMap 擴容的時候會調用 resize() 方法,就是這裏的併發操做容易在一個桶上造成環形鏈表;這樣當獲取一個不存在的 key 時,計算出的 index 正好是環形鏈表的下標就會出現死循環。

以下圖:

遍歷方式

還有一個值得注意的是 HashMap 的遍歷方式,一般有如下幾種:

Iterator<Map.Entry<String, Integer>> entryIterator = map.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<String, Integer> next = entryIterator.next();
            System.out.println("key=" + next.getKey() + " value=" + next.getValue());
        }

Iterator<String> iterator = map.keySet().iterator();
        while (iterator.hasNext()){
            String key = iterator.next();
            System.out.println("key=" + key + " value=" + map.get(key));
        }

強烈建議使用第一種 EntrySet 進行遍歷。

第一種能夠把 key value 同時取出,第二種還得須要經過 key 取一次 value,效率較低。

簡單總結下 HashMap:不管是 1.7 仍是 1.8 其實都能看出 JDK 沒有對它作任何的同步操做,因此併發會出問題,甚至 1.7 中出現死循環致使系統不可用(1.8 已經修復死循環問題)。

所以 JDK 推出了專項專用的 ConcurrentHashMap ,該類位於 java.util.concurrent 包下,專門用於解決併發問題。

堅持看到這裏的朋友算是已經把 ConcurrentHashMap 的基礎已經打牢了,下面正式開始分析。

ConcurrentHashMap

ConcurrentHashMap 一樣也分爲 1.7 、1.8 版,二者在實現上略有不一樣。

Base 1.7

先來看看 1.7 的實現,下面是他的結構圖:

如圖所示,是由 Segment 數組、HashEntry 組成,和 HashMap 同樣,仍然是數組加鏈表。

它的核心成員變量:

/**
 * Segment 數組,存放數據時首先須要定位到具體的 Segment 中。
 */
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;

Segment 是 ConcurrentHashMap 的一個內部類,主要的組成以下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
       private static final long serialVersionUID = 2249069246763182397L;

       // 和 HashMap 中的 HashEntry 做用同樣,真正存放數據的桶
       transient volatile HashEntry<K,V>[] table;
       transient int count;
       transient int modCount;
       transient int threshold;
       final float loadFactor;

}

看看其中 HashEntry 的組成:

和 HashMap 很是相似,惟一的區別就是其中的核心數據如 value ,以及鏈表都是 Volatile 修飾的,保證了獲取時的可見性。

原理上來講:ConcurrentHashMap 採用了分段鎖技術,其中 Segment 繼承於 ReentrantLock。不會像 HashTable 那樣不論是 put 仍是 get 操做都須要作同步處理,理論上 ConcurrentHashMap 支持 CurrencyLevel (Segment 數組數量)的線程併發。每當一個線程佔用鎖訪問一個 Segment 時,不會影響到其餘的 Segment。

下面也來看看核心的 put get 方法。

put 方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

首先是經過 key 定位到 Segment,以後在對應的 Segment 中進行具體的 put。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

雖然 HashEntry 中的 value 是用 volatile 關鍵詞修飾的,可是並不能保證併發的原子性,因此 put 操做時仍然須要加鎖處理。

首先第一步的時候會嘗試獲取鎖,若是獲取失敗確定就有其餘線程存在競爭,則利用 scanAndLockForPut() 自旋獲取鎖。

  1. 嘗試自旋獲取鎖。

  2. 若是重試的次數達到了 MAX_SCAN_RETRIES 則改成阻塞鎖獲取,保證能獲取成功。

再結合圖看看 put 的流程。

  1. 將當前 Segment 中的 table 經過 key 的 hashcode 定位到 HashEntry。

  2. 遍歷該 HashEntry,若是不爲空則判斷傳入的 key 和當前遍歷的 key 是否相等,相等則覆蓋舊的value。

  3. 不爲空則須要新建一個 HashEntry 並加入到 Segment 中,同時會先判斷是否須要擴容。

  4. 最後會解除在 1 中所獲取當前 Segment 的鎖。

 

get 方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

get 邏輯比較簡單:

只須要將 Key 經過 Hash 以後定位到具體的 Segment ,再經過一次 Hash 定位到具體的元素上。

因爲 HashEntry 中的 value 屬性是用 volatile 關鍵詞修飾的,保證了內存可見性,因此每次獲取時都是最新值。

ConcurrentHashMap 的 get 方法是很是高效的,由於整個過程都不須要加鎖

Base 1.8

1.7 已經解決了併發問題,而且能支持 N 個 Segment 這麼屢次數的併發,但依然存在 HashMap 在 1.7 版本中的問題。

那就是查詢遍歷鏈表效率過低。

所以 1.8 作了一些數據結構上的調整。

首先來看下底層的組成結構:

看起來是否是和 1.8 HashMap 結構相似?

其中拋棄了原有的 Segment 分段鎖,而採用了 CAS + synchronized 來保證併發安全性。

也將 1.7 中存放數據的 HashEntry 改成 Node,但做用都是相同的。

其中的 val next 都用了 volatile 修飾,保證了可見性。

put 方法

重點來看看 put 函數:

  • 根據 key 計算出 hashcode 。

  • 判斷是否須要進行初始化。

  • f 即爲當前 key 定位出的 Node,若是爲空表示當前位置能夠寫入數據,利用 CAS 嘗試寫入,失敗則自旋保證成功。

  • 若是當前位置的 hashcode == MOVED == -1,則須要進行擴容。

  • 若是都不知足,則利用 synchronized 鎖寫入數據。

  • 若是數量大於 TREEIFY_THRESHOLD 則要轉換爲紅黑樹。

 

get 方法

  • 根據計算出來的 hashcode 尋址,若是就在桶上那麼直接返回值。

  • 若是是紅黑樹那就按照樹的方式獲取值。

  • 就不知足那就按照鏈表的方式遍歷獲取值。

1.8 在 1.7 的數據結構上作了大的改動,採用紅黑樹以後能夠保證查詢效率(O(logn)),甚至取消了 ReentrantLock 改成了 synchronized,這樣能夠看出在新版的 JDK 中對 synchronized 優化是很到位的。

總結

看完了整個 HashMap 和 ConcurrentHashMap 在 1.7 和 1.8 中不一樣的實現方式相信你們對他們的理解應該會更加到位。

其實這塊也是面試的重點內容,一般的套路是:

  1. 談談你理解的 HashMap,講講其中的 get put 過程。

  2. 1.8 作了什麼優化?

  3. 是線程安全的嘛?

  4. 不安全會致使哪些問題?

  5. 如何解決?有沒有線程安全的併發容器?

  6. ConcurrentHashMap 是如何實現的? 1.七、1.8 實現有何不一樣?爲何這麼作?

 

這一串問題相信你們仔細看完都能懟回面試官。

除了面試會問到以外平時的應用其實也蠻多,像以前談到的 Guava 中 Cache 的實現就是利用 ConcurrentHashMap 的思想。

同時也能學習 JDK 做者大牛們的優化思路以及併發解決方案。

相關文章
相關標籤/搜索