Map你們族的那點事兒

Map


Map是一種用於快速查找的數據結構,它以鍵值對的形式存儲數據,每個鍵都是惟一的,且對應着一個值,若是想要查找Map中的數據,只須要傳入一個鍵,Map會對鍵進行匹配並返回鍵所對應的值,能夠說Map其實就是一個存放鍵值對的集合。Map被各類編程語言普遍使用,只不過在名稱上可能會有些混淆,像Python中叫作字典(Dictionary),也有些語言稱其爲關聯數組(Associative Array),但其實它們都是同樣的,都是一個存放鍵值對的集合。至於Java中常常用到的HashMap也是Map的一種,它被稱爲散列表,關於散列表的細節我會在本文中解釋HashMap的源碼時說起。html

Java還提供了一種與Map密切相關的數據結構:Set,它是數學意義上的集合,特性以下:java

  • 無序性:一個集合中,每一個元素的地位都是相同的,元素之間也都是無序的。不過Java中也提供了有序的Set,這點卻是沒有徹底遵循。node

  • 互異性:一個集合中,任何兩個元素都是不相同的。git

  • 肯定性:給定一個集合以及其任一元素,該元素屬於或者不屬於該集合是必須能夠肯定的。程序員

很明顯,Map中的key就很符合這些特性,Set的實現其實就是在內部使用Map。例如,HashSet就定義了一個類型爲HashMap的成員變量,向HashSet添加元素a,等同於向它內部的HashMap添加了一個key爲a,value爲一個Object對象的鍵值對,這個Object對象是HashSet的一個常量,它是一個虛擬值,沒有什麼實際含義,源碼以下:github

private transient HashMap<E,Object> map;

    // Dummy value to associate with an Object in the backing Map
    private static final Object PRESENT = new Object();

    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }
複製代碼

小插曲事後,讓咱們接着說Map,它是JDK的一個頂級接口,提供了三種集合視圖(Collection Views):包含全部key的集合、包含全部value的集合以及包含全部鍵值對的集合,Map中的元素順序與它所返回的集合視圖中的元素的迭代順序相關,也就是說,Map自己是不保證有序性的,固然也有例外,好比TreeMap就對有序性作出了保證,這主要由於它是基於紅黑樹實現的。算法

所謂的集合視圖就是由集合自己提供的一種訪問數據的方式,同時對視圖的任何修改也會影響到集合。比如Map.keySet()返回了它包含的key的集合,若是你調用了Map.remove(key)那麼keySet.contains(key)也將返回false,再好比說Arrays.asList(T)能夠把一個數組封裝成一個List,這樣你就能夠經過List的API來訪問和操做這些數據,以下列示例代碼:spring

String[] strings = {"a", "b", "c"};
List<String> list = Arrays.asList(strings);
System.out.println(list.get(0)); // "a"
strings[0] = "d";
System.out.println(list.get(0)); // "d"
list.set(0, "e");
System.out.println(strings[0]); // "e"
複製代碼

是否是感受很神奇,其實Arrays.asList()只是將傳入的數組與Arrays中的一個內部類ArrayList(注意,它與java.util包下的ArrayList不是同一個)作了一個」綁定「,在調用get()時會直接根據下標返回數組中的元素,而調用set()時也會直接修改數組中對應下標的元素。相對於直接複製來講,集合視圖的優勢是內存利用率更高,假設你有一個數組,又很想使用List的API來操做它,那麼你不用new一個ArrayList以拷貝數組中的元素,只須要一點額外的內存(經過Arrays.ArrayList對數組進行封裝),原始數據依然是在數組中的,並不會複製成多份。編程

Map接口規範了Map數據結構的通用API(也含有幾個用於簡化操做的default方法,default是JDK8的新特性,它是接口中聲明的方法的默認實現,即非抽象方法)而且還在內部定義了Entry接口(鍵值對的實體類),在JDK中提供的全部Map數據結構都實現了Map接口,下面爲Map接口的源碼(代碼中的註釋太長了,基本都是些實現的規範,爲了篇幅我就儘可能省略了)。api

package java.util;

import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.io.Serializable;

public interface Map<K,V> {
	
	// 查詢操做

    /** * 返回這個Map中所包含的鍵值對的數量,若是大於Integer.MAX_VALUE, * 則應該返回Integer.MAX_VALUE。 */
    int size();

    /** * Map是否爲空。 */
    boolean isEmpty();

    /** * Map中是否包含key,若是是返回true,不然false。 */
    boolean containsKey(Object key);

    /** * Map中是否包含value,若是是返回true,不然false。 */
    boolean containsValue(Object value);

    /** * 根據key查找value,若是Map不包含該key,則返回null。 */
    V get(Object key);

    // 修改操做

    /** * 添加一對鍵值對,若是Map中已含有這個key,那麼新value將覆蓋掉舊value, * 並返回舊value,若是Map中以前沒有這個key,那麼返回null。 */
    V put(K key, V value);

    /** * 刪除指定key並返回以前的value,若是Map中沒有該key,則返回null。 */
    V remove(Object key);


    // 批量操做

    /** * 將指定Map中的全部鍵值對批量添加到當前Map。 */
    void putAll(Map<? extends K, ? extends V> m);

    /** * 刪除Map中全部的鍵值對。 */
    void clear();


    // 集合視圖

    /** * 返回包含Map中全部key的Set,對該視圖的全部修改操做會對Map產生一樣的影響,反之亦然。 */
    Set<K> keySet();

    /** * 返回包含Map中全部value的集合,對該視圖的全部修改操做會對Map產生一樣的影響,反之亦然。 */
    Collection<V> values();

    /** * 返回包含Map中全部鍵值對的Set,對該視圖的全部修改操做會對Map產生一樣的影響,反之亦然。 */
    Set<Map.Entry<K, V>> entrySet();

    /** * Entry表明一對鍵值對,規範了一些基本函數以及幾個已實現的類函數(各類比較器)。 */
    interface Entry<K,V> {
       
        K getKey();

        V getValue();

        V setValue(V value);

        boolean equals(Object o);

        int hashCode();

        public static <K extends Comparable<? super K>, V> Comparator<Map.Entry<K,V>> comparingByKey() {
            return (Comparator<Map.Entry<K, V>> & Serializable)
                (c1, c2) -> c1.getKey().compareTo(c2.getKey());
        }

        public static <K, V extends Comparable<? super V>> Comparator<Map.Entry<K,V>> comparingByValue() {
            return (Comparator<Map.Entry<K, V>> & Serializable)
                (c1, c2) -> c1.getValue().compareTo(c2.getValue());
        }

        public static <K, V> Comparator<Map.Entry<K, V>> comparingByKey(Comparator<? super K> cmp) {
            Objects.requireNonNull(cmp);
            return (Comparator<Map.Entry<K, V>> & Serializable)
                (c1, c2) -> cmp.compare(c1.getKey(), c2.getKey());
        }

        public static <K, V> Comparator<Map.Entry<K, V>> comparingByValue(Comparator<? super V> cmp) {
            Objects.requireNonNull(cmp);
            return (Comparator<Map.Entry<K, V>> & Serializable)
                (c1, c2) -> cmp.compare(c1.getValue(), c2.getValue());
        }
    }

    // 比較和hashing

    /** * 將指定的對象與此Map進行比較是否相等。 */
    boolean equals(Object o);

    /** * 返回此Map的hash code。 */
    int hashCode();

    // 默認方法(非抽象方法)

    /** * 根據key查找value,若是該key不存在或等於null則返回defaultValue。 */
    default V getOrDefault(Object key, V defaultValue) {
        V v;
        return (((v = get(key)) != null) || containsKey(key)) ? v : defaultValue;
    }

    /** * 遍歷Map並對每一個鍵值對執行指定的操做(action)。 * BiConsumer是一個函數接口(具備一個抽象方法的接口,用於支持Lambda), * 它表明了一個接受兩個輸入參數的操做,且不返回任何結果。 * 至於它奇怪的名字,根據Java中的其餘函數接口的命名規範,Bi應該是Binary的縮寫,意思是二元的。 */
    default void forEach(BiConsumer<? super K, ? super V> action) {
        Objects.requireNonNull(action);
        for (Map.Entry<K, V> entry : entrySet()) {
            K k;
            V v;
            try {
                k = entry.getKey();
                v = entry.getValue();
            } catch(IllegalStateException ise) {
                // this usually means the entry is no longer in the map.
                throw new ConcurrentModificationException(ise);
            }
            action.accept(k, v);
        }
    }

    /** * 遍歷Map,而後調用傳入的函數function生成新value對舊value進行替換。 * BiFunction一樣是一個函數接口,它接受兩個輸入參數而且返回一個結果。 */
    default void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
        Objects.requireNonNull(function);
        for (Map.Entry<K, V> entry : entrySet()) {
            K k;
            V v;
            try {
                k = entry.getKey();
                v = entry.getValue();
            } catch(IllegalStateException ise) {
                // this usually means the entry is no longer in the map.
                throw new ConcurrentModificationException(ise);
            }

            // ise thrown from function is not a cme.
            v = function.apply(k, v);

            try {
                entry.setValue(v);
            } catch(IllegalStateException ise) {
                // this usually means the entry is no longer in the map.
                throw new ConcurrentModificationException(ise);
            }
        }
    }

    /** * 若是指定的key不存在或者關聯的value爲null,則添加鍵值對。 */
    default V putIfAbsent(K key, V value) {
        V v = get(key);
        if (v == null) {
            v = put(key, value);
        }

        return v;
    }

    /** * 當指定key關聯的value與傳入的參數value相等時刪除該key。 */
    default boolean remove(Object key, Object value) {
        Object curValue = get(key);
        if (!Objects.equals(curValue, value) ||
            (curValue == null && !containsKey(key))) {
            return false;
        }
        remove(key);
        return true;
    }

    /** * 當指定key關聯的value與oldValue相等時,使用newValue進行替換。 */
    default boolean replace(K key, V oldValue, V newValue) {
        Object curValue = get(key);
        if (!Objects.equals(curValue, oldValue) ||
            (curValue == null && !containsKey(key))) {
            return false;
        }
        put(key, newValue);
        return true;
    }

    /** * 當指定key關聯到某個value時進行替換。 */
    default V replace(K key, V value) {
        V curValue;
        if (((curValue = get(key)) != null) || containsKey(key)) {
            curValue = put(key, value);
        }
        return curValue;
    }

    /** * 當指定key沒有關聯到一個value或者value爲null時,調用mappingFunction生成值並添加鍵值對到Map。 * Function是一個函數接口,它接受一個輸入參數並返回一個結果,若是mappingFunction返回的結果 * 也爲null,那麼將不會調用put。 */
    default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        Objects.requireNonNull(mappingFunction);
        V v;
        if ((v = get(key)) == null) {
            V newValue;
            if ((newValue = mappingFunction.apply(key)) != null) {
                put(key, newValue);
                return newValue;
            }
        }

        return v;
    }

    /** * 當指定key關聯到一個value而且不爲null時,調用remappingFunction生成newValue, * 若是newValue不爲null,那麼進行替換,不然刪除該key。 */
    default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
        Objects.requireNonNull(remappingFunction);
        V oldValue;
        if ((oldValue = get(key)) != null) {
            V newValue = remappingFunction.apply(key, oldValue);
            if (newValue != null) {
                put(key, newValue);
                return newValue;
            } else {
                remove(key);
                return null;
            }
        } else {
            return null;
        }
    }

    /** * remappingFunction根據key與其相關聯的value生成newValue, * 當newValue等於null時刪除該key,不然添加或者替換舊的映射。 */
    default V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
        Objects.requireNonNull(remappingFunction);
        V oldValue = get(key);

        V newValue = remappingFunction.apply(key, oldValue);
        if (newValue == null) {
            // delete mapping
            if (oldValue != null || containsKey(key)) {
                // something to remove
                remove(key);
                return null;
            } else {
                // nothing to do. Leave things as they were.
                return null;
            }
        } else {
            // add or replace old mapping
            put(key, newValue);
            return newValue;
        }
    }

    /** * 當指定key沒有關聯到一個value或者value爲null,將它與傳入的參數value * 進行關聯。不然,調用remappingFunction生成newValue並進行替換。 * 若是,newValue等於null,那麼刪除該key。 */
    default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
        Objects.requireNonNull(remappingFunction);
        Objects.requireNonNull(value);
        V oldValue = get(key);
        V newValue = (oldValue == null) ? value :
                   remappingFunction.apply(oldValue, value);
        if(newValue == null) {
            remove(key);
        } else {
            put(key, newValue);
        }
        return newValue;
    }
}
複製代碼

須要注意一點,這些default方法都是非線程安全的,任何保證線程安全的擴展類都必須重寫這些方法,例如ConcurrentHashMap。

下圖爲Map的繼承關係結構圖,它也是本文接下來將要分析的Map實現類的大綱,這些實現類都是比較經常使用的,在JDK中Map的實現類有幾十個,大部分都是咱們用不到的,限於篇幅緣由就不一一講解了(本文包含許多源碼與對實現細節的分析,建議讀者抽出一段連續的空閒時間靜下心來慢慢閱讀)。

本文做者爲SylvanasSun(sylvanas.sun@gmail.com),首發於SylvanasSun’s Blog。 原文連接:https://sylvanassun.github.io/2018/03/16/2018-03-16-map_family/ (轉載請務必保留本段聲明,而且保留超連接。)

AbstractMap


AbstractMap是一個抽象類,它是Map接口的一個骨架實現,最小化實現了此接口提供的抽象函數。在Java的Collection框架中基本都遵循了這一規定,骨架實如今接口與實現類之間構建了一層抽象,其目的是爲了複用一些比較通用的函數以及方便擴展,例如List接口擁有骨架實現AbstractList、Set接口擁有骨架實現AbstractSet等。

下面咱們按照不一樣的操做類型來看看AbstractMap都實現了什麼,首先是查詢操做:

package java.util;
import java.util.Map.Entry;

public abstract class AbstractMap<K,V> implements Map<K,V> {
    
    protected AbstractMap() {
    }

    // Query Operations

    public int size() {
        return entrySet().size();
    }

    // 鍵值對的集合視圖留給具體的實現類實現
    public abstract Set<Entry<K,V>> entrySet();

    public boolean isEmpty() {
        return size() == 0;
    }

    /** * 遍歷entrySet,而後逐個進行比較。 */
    public boolean containsValue(Object value) {
        Iterator<Entry<K,V>> i = entrySet().iterator();
        if (value==null) {
            while (i.hasNext()) {
                Entry<K,V> e = i.next();
                if (e.getValue()==null)
                    return true;
            }
        } else {
            while (i.hasNext()) {
                Entry<K,V> e = i.next();
                if (value.equals(e.getValue()))
                    return true;
            }
        }
        return false;
    }

    /** * 跟containsValue()同理,只不過比較的是key。 */
    public boolean containsKey(Object key) {
        Iterator<Map.Entry<K,V>> i = entrySet().iterator();
        if (key==null) {
            while (i.hasNext()) {
                Entry<K,V> e = i.next();
                if (e.getKey()==null)
                    return true;
            }
        } else {
            while (i.hasNext()) {
                Entry<K,V> e = i.next();
                if (key.equals(e.getKey()))
                    return true;
            }
        }
        return false;
    }

    /** * 遍歷entrySet,而後根據key取出關聯的value。 */
    public V get(Object key) {
        Iterator<Entry<K,V>> i = entrySet().iterator();
        if (key==null) {
            while (i.hasNext()) {
                Entry<K,V> e = i.next();
                if (e.getKey()==null)
                    return e.getValue();
            }
        } else {
            while (i.hasNext()) {
                Entry<K,V> e = i.next();
                if (key.equals(e.getKey()))
                    return e.getValue();
            }
        }
        return null;
    }
}
複製代碼

能夠發現這些操做都是依賴於函數entrySet()的,它返回了一個鍵值對的集合視圖,因爲不一樣的實現子類的Entry實現可能也是不一樣的,因此通常是在內部實現一個繼承於AbstractSet且泛型爲Map.Entry的內部類做爲EntrySet,接下來是修改操做與批量操做:

// Modification Operations

    /** * 沒有提供實現,子類必須重寫該方法,不然調用put()會拋出異常。 */
    public V put(K key, V value) {
        throw new UnsupportedOperationException();
    }

    /** * 遍歷entrySet,先找到目標的entry,而後刪除。 *(還記得以前說過的嗎,集合視圖中的操做也會影響到實際數據) */
    public V remove(Object key) {
        Iterator<Entry<K,V>> i = entrySet().iterator();
        Entry<K,V> correctEntry = null;
        if (key==null) {
            while (correctEntry==null && i.hasNext()) {
                Entry<K,V> e = i.next();
                if (e.getKey()==null)
                    correctEntry = e;
            }
        } else {
            while (correctEntry==null && i.hasNext()) {
                Entry<K,V> e = i.next();
                if (key.equals(e.getKey()))
                    correctEntry = e;
            }
        }

        V oldValue = null;
        if (correctEntry !=null) {
            oldValue = correctEntry.getValue();
            i.remove();
        }
        return oldValue;
    }


    // Bulk Operations

    /** * 遍歷參數m,而後將每個鍵值對put到該Map中。 */
    public void putAll(Map<? extends K, ? extends V> m) {
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            put(e.getKey(), e.getValue());
    }

    /** * 清空entrySet等價於清空該Map。 */
    public void clear() {
        entrySet().clear();
    }

複製代碼

AbstractMap並無實現put()函數,這樣作是爲了考慮到也許會有不可修改的Map實現子類繼承它,而對於一個可修改的Map實現子類則必須重寫put()函數。

AbstractMap沒有提供entrySet()的實現,可是卻提供了keySet()values()集合視圖的默認實現,它們都是依賴於entrySet()返回的集合視圖實現的,源碼以下:

/** * keySet和values是lazy的,它們只會在第一次請求視圖時進行初始化, * 並且它們是無狀態的,因此只須要一個實例(初始化一次)。 */
    transient Set<K>        keySet;
    transient Collection<V> values;

    /** * 返回一個AbstractSet的子類,能夠發現它的行爲都委託給了entrySet返回的集合視圖 * 與當前的AbstractMap實例,因此說它自身是無狀態的。 */
    public Set<K> keySet() {
        Set<K> ks = keySet;
        if (ks == null) {
            ks = new AbstractSet<K>() {
                public Iterator<K> iterator() {
                    return new Iterator<K>() {
                        private Iterator<Entry<K,V>> i = entrySet().iterator();

                        public boolean hasNext() {
                            return i.hasNext();
                        }

                        public K next() {
                            return i.next().getKey();
                        }

                        public void remove() {
                            i.remove();
                        }
                    };
                }

                public int size() {
                    return AbstractMap.this.size();
                }

                public boolean isEmpty() {
                    return AbstractMap.this.isEmpty();
                }

                public void clear() {
                    AbstractMap.this.clear();
                }

                public boolean contains(Object k) {
                    return AbstractMap.this.containsKey(k);
                }
            };
            keySet = ks;
        }
        return ks;
    }

    /** * 與keySet()基本一致,惟一的區別就是返回的是AbstractCollection的子類, * 主要是由於value不須要保持互異性。 */
    public Collection<V> values() {
        Collection<V> vals = values;
        if (vals == null) {
            vals = new AbstractCollection<V>() {
                public Iterator<V> iterator() {
                    return new Iterator<V>() {
                        private Iterator<Entry<K,V>> i = entrySet().iterator();

                        public boolean hasNext() {
                            return i.hasNext();
                        }

                        public V next() {
                            return i.next().getValue();
                        }

                        public void remove() {
                            i.remove();
                        }
                    };
                }

                public int size() {
                    return AbstractMap.this.size();
                }

                public boolean isEmpty() {
                    return AbstractMap.this.isEmpty();
                }

                public void clear() {
                    AbstractMap.this.clear();
                }

                public boolean contains(Object v) {
                    return AbstractMap.this.containsValue(v);
                }
            };
            values = vals;
        }
        return vals;
    }
複製代碼

它還提供了兩個Entry的實現類:SimpleEntry與SimpleImmutableEntry,這兩個類的實現很是簡單,區別也只是前者是可變的,然後者是不可變的。

private static boolean eq(Object o1, Object o2) {
        return o1 == null ? o2 == null : o1.equals(o2);
    }

    public static class SimpleEntry<K,V> implements Entry<K,V>, java.io.Serializable {
        private static final long serialVersionUID = -8499721149061103585L;

        private final K key;
        private V value;

        public SimpleEntry(K key, V value) {
            this.key   = key;
            this.value = value;
        }

        public SimpleEntry(Entry<? extends K, ? extends V> entry) {
            this.key   = entry.getKey();
            this.value = entry.getValue();
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }

        public V setValue(V value) {
            V oldValue = this.value;
            this.value = value;
            return oldValue;
        }

        public boolean equals(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            return eq(key, e.getKey()) && eq(value, e.getValue());
        }

        public int hashCode() {
            return (key   == null ? 0 :   key.hashCode()) ^
                   (value == null ? 0 : value.hashCode());
        }

        public String toString() {
            return key + "=" + value;
        }

    }

    /** * 它與SimpleEntry的區別在於它是不可變的,value被final修飾,而且不支持setValue()。 */
    public static class SimpleImmutableEntry<K,V> implements Entry<K,V>, java.io.Serializable {
        private static final long serialVersionUID = 7138329143949025153L;

        private final K key;
        private final V value;

        public SimpleImmutableEntry(K key, V value) {
            this.key   = key;
            this.value = value;
        }

        public SimpleImmutableEntry(Entry<? extends K, ? extends V> entry) {
            this.key   = entry.getKey();
            this.value = entry.getValue();
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }

        public V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public boolean equals(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            return eq(key, e.getKey()) && eq(value, e.getValue());
        }

        public int hashCode() {
            return (key   == null ? 0 :   key.hashCode()) ^
                   (value == null ? 0 : value.hashCode());
        }

        public String toString() {
            return key + "=" + value;
        }

    }
複製代碼

咱們經過閱讀上述的源碼不難發現,AbstractMap實現的操做都依賴於entrySet()所返回的集合視圖。剩下的函數就沒什麼好說的了,有興趣的話能夠本身去看看。

TreeMap


TreeMap是基於紅黑樹(一種自平衡的二叉查找樹)實現的一個保證有序性的Map,在繼承關係結構圖中能夠得知TreeMap實現了NavigableMap接口,而該接口又繼承了SortedMap接口,咱們先來看看這兩個接口定義了一些什麼功能。

SortedMap


首先是SortedMap接口,實現該接口的實現類應當按照天然排序保證key的有序性,所謂天然排序便是根據key的compareTo()函數(須要實現Comparable接口)或者在構造函數中傳入的Comparator實現類來進行排序,集合視圖遍歷元素的順序也應當與key的順序一致。SortedMap接口還定義瞭如下幾個有效利用有序性的函數:

package java.util;

public interface SortedMap<K,V> extends Map<K,V> {
    /** * 用於在此Map中對key進行排序的比較器,若是爲null,則使用key的compareTo()函數進行比較。 */
    Comparator<? super K> comparator();

    /** * 返回一個key的範圍爲從fromKey到toKey的局部視圖(包括fromKey,不包括toKey,包左不包右), * 若是fromKey和toKey是相等的,則返回一個空視圖。 * 返回的局部視圖一樣是此Map的集合視圖,因此對它的操做是會與Map互相影響的。 */
    SortedMap<K,V> subMap(K fromKey, K toKey);

    /** * 返回一個嚴格地小於toKey的局部視圖。 */
    SortedMap<K,V> headMap(K toKey);

    /** * 返回一個大於或等於fromKey的局部視圖。 */
    SortedMap<K,V> tailMap(K fromKey);

    /** * 返回當前Map中的第一個key(最小)。 */
    K firstKey();

    /** * 返回當前Map中的最後一個key(最大)。 */
    K lastKey();

    Set<K> keySet();

    Collection<V> values();

    Set<Map.Entry<K, V>> entrySet();
}
複製代碼

NavigableMap


而後是SortedMap的子接口NavigableMap,該接口擴展了一些用於導航(Navigation)的方法,像函數lowerEntry(key)會根據傳入的參數key返回一個小於key的最大的一對鍵值對,例如,咱們以下調用lowerEntry(6),那麼將返回key爲5的鍵值對,若是沒有key爲5,則會返回key爲4的鍵值對,以此類推,直到返回null(實在找不到的狀況下)。

public static void main(String[] args) {
        NavigableMap<Integer, Integer> map = new TreeMap<>();
        for (int i = 0; i < 10; i++)
            map.put(i, i);
        
        assert map.lowerEntry(6).getKey() == 5;
        assert map.lowerEntry(5).getKey() == 4;
        assert map.lowerEntry(0).getKey() == null;
    }
複製代碼

NavigableMap定義的都是一些相似於lowerEntry(key)的方法和以逆序、升序排序的集合視圖,這些方法利用有序性實現了相比SortedMap接口更加靈活的操做。

package java.util;

public interface NavigableMap<K,V> extends SortedMap<K,V> {
    /** * 返回一個小於指定key的最大的一對鍵值對,若是找不到則返回null。 */
    Map.Entry<K,V> lowerEntry(K key);

    /** * 返回一個小於指定key的最大的一個key,若是找不到則返回null。 */
    K lowerKey(K key);

    /** * 返回一個小於或等於指定key的最大的一對鍵值對,若是找不到則返回null。 */
    Map.Entry<K,V> floorEntry(K key);

    /** * 返回一個小於或等於指定key的最大的一個key,若是找不到則返回null。 */
    K floorKey(K key);

    /** * 返回一個大於或等於指定key的最小的一對鍵值對,若是找不到則返回null。 */
    Map.Entry<K,V> ceilingEntry(K key);

    /** * 返回一個大於或等於指定key的最小的一個key,若是找不到則返回null。 */
    K ceilingKey(K key);

    /** * 返回一個大於指定key的最小的一對鍵值對,若是找不到則返回null。 */
    Map.Entry<K,V> higherEntry(K key);

    /** * 返回一個大於指定key的最小的一個key,若是找不到則返回null。 */
    K higherKey(K key);

    /** * 返回該Map中最小的鍵值對,若是Map爲空則返回null。 */
    Map.Entry<K,V> firstEntry();

    /** * 返回該Map中最大的鍵值對,若是Map爲空則返回null。 */
    Map.Entry<K,V> lastEntry();

    /** * 返回並刪除該Map中最小的鍵值對,若是Map爲空則返回null。 */
    Map.Entry<K,V> pollFirstEntry();

    /** * 返回並刪除該Map中最大的鍵值對,若是Map爲空則返回null。 */
    Map.Entry<K,V> pollLastEntry();

    /** * 返回一個以當前Map降序(逆序)排序的集合視圖 */
    NavigableMap<K,V> descendingMap();

    /** * 返回一個包含當前Map中全部key的集合視圖,該視圖中的key以升序(正序)排序。 */
    NavigableSet<K> navigableKeySet();

    /** * 返回一個包含當前Map中全部key的集合視圖,該視圖中的key以降序(逆序)排序。 */
    NavigableSet<K> descendingKeySet();

    /** * 與SortedMap.subMap基本一致,區別在於多的兩個參數fromInclusive和toInclusive, * 它們表明是否包含from和to,若是fromKey與toKey相等,而且fromInclusive與toInclusive * 都爲true,那麼不會返回空集合。 */
    NavigableMap<K,V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive);

    /** * 返回一個小於或等於(inclusive爲true的狀況下)toKey的局部視圖。 */
    NavigableMap<K,V> headMap(K toKey, boolean inclusive);

    /** * 返回一個大於或等於(inclusive爲true的狀況下)fromKey的局部視圖。 */
    NavigableMap<K,V> tailMap(K fromKey, boolean inclusive);

    /** * 等價於subMap(fromKey, true, toKey, false)。 */
    SortedMap<K,V> subMap(K fromKey, K toKey);

    /** * 等價於headMap(toKey, false)。 */
    SortedMap<K,V> headMap(K toKey);

    /** * 等價於tailMap(fromKey, true)。 */
    SortedMap<K,V> tailMap(K fromKey);
}
複製代碼

NavigableMap接口相對於SortedMap接口來講靈活了許多,正由於TreeMap也實現了該接口,因此在須要數據有序並且想靈活地訪問它們的時候,使用TreeMap就很是合適了。

紅黑樹


上文咱們提到TreeMap的內部實現基於紅黑樹,而紅黑樹又是二叉查找樹的一種。二叉查找樹是一種有序的樹形結構,優點在於查找、插入的時間複雜度只有O(log n),特性以下:

  • 任意節點最多含有兩個子節點。

  • 任意節點的左、右節點均可以看作爲一棵二叉查找樹。

  • 若是任意節點的左子樹不爲空,那麼左子樹上的全部節點的值均小於它的根節點的值。

  • 若是任意節點的右子樹不爲空,那麼右子樹上的全部節點的值均大於它的根節點的值。

  • 任意節點的key都是不一樣的。

二叉查找樹

儘管二叉查找樹看起來很美好,但事與願違,二叉查找樹在極端狀況下會變得並非那麼有效率,假設咱們有一個有序的整數序列:1,2,3,4,5,6,7,8,9,10,...,若是把這個序列按順序所有插入到二叉查找樹時會發生什麼呢?二叉查找樹會產生傾斜,序列中的每個元素都大於它的根節點(前一個元素),左子樹永遠是空的,那麼這棵二叉查找樹就跟一個普通的鏈表沒什麼區別了,查找操做的時間複雜度只有O(n)

爲了解決這個問題須要引入自平衡的二叉查找樹,所謂自平衡,便是在樹結構將要傾斜的狀況下進行修正,這個修正操做被稱爲旋轉,經過旋轉操做可讓樹趨於平衡。

紅黑樹是平衡二叉查找樹的一種實現,它的名字來自於它的子節點是着色的,每一個子節點非黑即紅,因爲只有兩種顏色(兩種狀態),通常使用boolean來表示,下面爲TreeMap中實現的Entry,它表明紅黑樹中的一個節點:

// Red-black mechanics

    private static final boolean RED   = false;
    private static final boolean BLACK = true;

    /** * Node in the Tree. Doubles as a means to pass key-value pairs back to * user (see Map.Entry). */

    static final class Entry<K,V> implements Map.Entry<K,V> {
        K key;
        V value;
        Entry<K,V> left;
        Entry<K,V> right;
        Entry<K,V> parent;
        boolean color = BLACK;

        /** * Make a new cell with given key, value, and parent, and with * {@code null} child links, and BLACK color. */
        Entry(K key, V value, Entry<K,V> parent) {
            this.key = key;
            this.value = value;
            this.parent = parent;
        }

        /** * Returns the key. * * @return the key */
        public K getKey() {
            return key;
        }

        /** * Returns the value associated with the key. * * @return the value associated with the key */
        public V getValue() {
            return value;
        }

        /** * Replaces the value currently associated with the key with the given * value. * * @return the value associated with the key before this method was * called */
        public V setValue(V value) {
            V oldValue = this.value;
            this.value = value;
            return oldValue;
        }

        public boolean equals(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;

            return valEquals(key,e.getKey()) && valEquals(value,e.getValue());
        }

        public int hashCode() {
            int keyHash = (key==null ? 0 : key.hashCode());
            int valueHash = (value==null ? 0 : value.hashCode());
            return keyHash ^ valueHash;
        }

        public String toString() {
            return key + "=" + value;
        }
    }
複製代碼

任何平衡二叉查找樹的查找操做都是與二叉查找樹是同樣的,由於查找操做並不會影響樹的結構,也就不須要進行修正,代碼以下:

public V get(Object key) {
        Entry<K,V> p = getEntry(key);
        return (p==null ? null : p.value);
    }

    final Entry<K,V> getEntry(Object key) {
        // 使用Comparator進行比較
        if (comparator != null)
            return getEntryUsingComparator(key);
        if (key == null)
            throw new NullPointerException();
        @SuppressWarnings("unchecked")
            Comparable<? super K> k = (Comparable<? super K>) key;
        Entry<K,V> p = root;
        // 從根節點開始,不斷比較key的大小進行查找
        while (p != null) {
            int cmp = k.compareTo(p.key);
            if (cmp < 0) // 小於,轉向左子樹
                p = p.left;
            else if (cmp > 0) // 大於,轉向右子樹
                p = p.right;
            else
                return p;
        }
        return null; // 沒有相等的key,返回null
    }
複製代碼

而插入和刪除操做與平衡二叉查找樹的細節是息息相關的,關於紅黑樹的實現細節,我以前寫過的一篇博客紅黑樹的那點事兒已經講的很清楚了,對這方面不瞭解的讀者建議去閱讀一下,就不在這裏重複敘述了。

集合視圖


最後看一下TreeMap的集合視圖的實現,集合視圖通常都是實現了一個封裝了當前實例的類,因此對集合視圖的修改本質上就是在修改當前實例,TreeMap也不例外。

TreeMap的headMap()tailMap()以及subMap()函數都返回了一個靜態內部類AscendingSubMap<K, V>,從名字上也能猜出來,爲了支持倒序,確定也還有一個DescendingSubMap<K, V>,它們都繼承於NavigableSubMap<K, V>,一個繼承AbstractMap<K, V>並實現了NavigableMap<K, V>的抽象類:

abstract static class NavigableSubMap<K,V> extends AbstractMap<K,V> implements NavigableMap<K,V>, java.io.Serializable {
        private static final long serialVersionUID = -2102997345730753016L;

        final TreeMap<K,V> m;

        /** * (fromStart, lo, loInclusive) 與 (toEnd, hi, hiInclusive)表明了兩個三元組, * 若是fromStart爲true,那麼範圍的下限(絕對)爲map(被封裝的TreeMap)的起始key, * 其餘值將被忽略。 * 若是loInclusive爲true,lo將會被包含在範圍內,不然lo是在範圍外的。 * toEnd與hiInclusive與上述邏輯類似,只不過考慮的是上限。 */
        final K lo, hi;
        final boolean fromStart, toEnd;
        final boolean loInclusive, hiInclusive;

        NavigableSubMap(TreeMap<K,V> m,
                        boolean fromStart, K lo, boolean loInclusive,
                        boolean toEnd,     K hi, boolean hiInclusive) {
            if (!fromStart && !toEnd) {
                if (m.compare(lo, hi) > 0)
                    throw new IllegalArgumentException("fromKey > toKey");
            } else {
                if (!fromStart) // type check
                    m.compare(lo, lo);
                if (!toEnd)
                    m.compare(hi, hi);
            }

            this.m = m;
            this.fromStart = fromStart;
            this.lo = lo;
            this.loInclusive = loInclusive;
            this.toEnd = toEnd;
            this.hi = hi;
            this.hiInclusive = hiInclusive;
        }

        // internal utilities

        final boolean tooLow(Object key) {
            if (!fromStart) {
                int c = m.compare(key, lo);
                // 若是key小於lo,或等於lo(須要lo不包含在範圍內)
                if (c < 0 || (c == 0 && !loInclusive))
                    return true;
            }
            return false;
        }

        final boolean tooHigh(Object key) {
            if (!toEnd) {
                int c = m.compare(key, hi);
                // 若是key大於hi,或等於hi(須要hi不包含在範圍內)
                if (c > 0 || (c == 0 && !hiInclusive))
                    return true;
            }
            return false;
        }

        final boolean inRange(Object key) {
            return !tooLow(key) && !tooHigh(key);
        }

        final boolean inClosedRange(Object key) {
            return (fromStart || m.compare(key, lo) >= 0)
                && (toEnd || m.compare(hi, key) >= 0);
        }

        // 判斷key是否在該視圖的範圍以內
        final boolean inRange(Object key, boolean inclusive) {
            return inclusive ? inRange(key) : inClosedRange(key);
        }

        /* * 以abs開頭的函數爲關係操做的絕對版本。 */

        /* * 得到最小的鍵值對: * 若是fromStart爲true,那麼直接返回當前map實例的第一個鍵值對便可, * 不然,先判斷lo是否包含在範圍內, * 若是是,則得到當前map實例中大於或等於lo的最小的鍵值對, * 若是不是,則得到當前map實例中大於lo的最小的鍵值對。 * 若是獲得的結果e超過了範圍的上限,那麼返回null。 */
        final TreeMap.Entry<K,V> absLowest() {
            TreeMap.Entry<K,V> e =
                (fromStart ?  m.getFirstEntry() :
                 (loInclusive ? m.getCeilingEntry(lo) :
                                m.getHigherEntry(lo)));
            return (e == null || tooHigh(e.key)) ? null : e;
        }

        // 與absLowest()相反
        final TreeMap.Entry<K,V> absHighest() {
            TreeMap.Entry<K,V> e =
                (toEnd ?  m.getLastEntry() :
                 (hiInclusive ?  m.getFloorEntry(hi) :
                                 m.getLowerEntry(hi)));
            return (e == null || tooLow(e.key)) ? null : e;
        }

        // 下面的邏輯就都很簡單了,注意會先判斷key是否越界,
        // 若是越界就返回絕對值。

        final TreeMap.Entry<K,V> absCeiling(K key) {
            if (tooLow(key))
                return absLowest();
            TreeMap.Entry<K,V> e = m.getCeilingEntry(key);
            return (e == null || tooHigh(e.key)) ? null : e;
        }

        final TreeMap.Entry<K,V> absHigher(K key) {
            if (tooLow(key)) 
                return absLowest();
            TreeMap.Entry<K,V> e = m.getHigherEntry(key);
            return (e == null || tooHigh(e.key)) ? null : e;
        }

        final TreeMap.Entry<K,V> absFloor(K key) {
            if (tooHigh(key))
                return absHighest();
            TreeMap.Entry<K,V> e = m.getFloorEntry(key);
            return (e == null || tooLow(e.key)) ? null : e;
        }

        final TreeMap.Entry<K,V> absLower(K key) {
            if (tooHigh(key))
                return absHighest();
            TreeMap.Entry<K,V> e = m.getLowerEntry(key);
            return (e == null || tooLow(e.key)) ? null : e;
        }

        /** 返回升序遍歷的絕對上限 */
        final TreeMap.Entry<K,V> absHighFence() {
            return (toEnd ? null : (hiInclusive ?
                                    m.getHigherEntry(hi) :
                                    m.getCeilingEntry(hi)));
        }

        /** 返回降序遍歷的絕對下限 */
        final TreeMap.Entry<K,V> absLowFence() {
            return (fromStart ? null : (loInclusive ?
                                        m.getLowerEntry(lo) :
                                        m.getFloorEntry(lo)));
        }

        // 剩下的就是實現NavigableMap的方法以及一些抽象方法
		// 和NavigableSubMap中的集合視圖函數。
        // 大部分操做都是靠當前實例map的方法和上述用於判斷邊界的方法提供支持
        .....
    }
複製代碼

一個局部視圖最重要的是要可以判斷出傳入的key是否屬於該視圖的範圍內,在上面的代碼中能夠發現NavigableSubMap提供了很是多的輔助函數用於判斷範圍,接下來咱們看看NavigableSubMap的迭代器是如何實現的:

/** * Iterators for SubMaps */
        abstract class SubMapIterator<T> implements Iterator<T> {
            TreeMap.Entry<K,V> lastReturned;
            TreeMap.Entry<K,V> next;
            final Object fenceKey;
            int expectedModCount;

            SubMapIterator(TreeMap.Entry<K,V> first,
                           TreeMap.Entry<K,V> fence) {
                expectedModCount = m.modCount; 
                lastReturned = null;
                next = first;
                // UNBOUNDED是一個虛擬值(一個Object對象),表示無邊界。
                fenceKey = fence == null ? UNBOUNDED : fence.key;
            }

            // 只要next不爲null而且沒有超過邊界
            public final boolean hasNext() {
                return next != null && next.key != fenceKey;
            }

            final TreeMap.Entry<K,V> nextEntry() {
                TreeMap.Entry<K,V> e = next;
                // 已經遍歷到頭或者越界了
                if (e == null || e.key == fenceKey)
                    throw new NoSuchElementException();
                // modCount是一個記錄操做數的計數器
                // 若是與expectedModCount不一致
                // 則表明當前map實例在遍歷過程當中已被修改過了(從其餘線程)
                if (m.modCount != expectedModCount)
                    throw new ConcurrentModificationException();
                // 向後移動next指針
                // successor()返回指定節點的繼任者
                // 它是節點e的右子樹的最左節點
                // 也就是比e大的最小的節點
                // 若是e沒有右子樹,則會試圖向上尋找
                next = successor(e);
                lastReturned = e; // 記錄最後返回的節點
                return e;
            }

            final TreeMap.Entry<K,V> prevEntry() {
                TreeMap.Entry<K,V> e = next;
                if (e == null || e.key == fenceKey)
                    throw new NoSuchElementException();
                if (m.modCount != expectedModCount)
                    throw new ConcurrentModificationException();
                // 向前移動next指針
                // predecessor()返回指定節點的前任
                // 它與successor()邏輯相反。
                next = predecessor(e);
                lastReturned = e;
                return e;
            }

            final void removeAscending() {
                if (lastReturned == null)
                    throw new IllegalStateException();
                if (m.modCount != expectedModCount)
                    throw new ConcurrentModificationException();
                // 被刪除的節點被它的繼任者取代
                // 執行完刪除後,lastReturned實際指向了它的繼任者
                if (lastReturned.left != null && lastReturned.right != null)
                    next = lastReturned;
                m.deleteEntry(lastReturned);
                lastReturned = null;
                expectedModCount = m.modCount;
            }

            final void removeDescending() {
                if (lastReturned == null)
                    throw new IllegalStateException();
                if (m.modCount != expectedModCount)
                    throw new ConcurrentModificationException();
                m.deleteEntry(lastReturned);
                lastReturned = null;
                expectedModCount = m.modCount;
            }

        }

        final class SubMapEntryIterator extends SubMapIterator<Map.Entry<K,V>> {
            SubMapEntryIterator(TreeMap.Entry<K,V> first,
                                TreeMap.Entry<K,V> fence) {
                super(first, fence);
            }
            public Map.Entry<K,V> next() {
                return nextEntry();
            }
            public void remove() {
                removeAscending();
            }
        }

        final class DescendingSubMapEntryIterator extends SubMapIterator<Map.Entry<K,V>> {
            DescendingSubMapEntryIterator(TreeMap.Entry<K,V> last,
                                          TreeMap.Entry<K,V> fence) {
                super(last, fence);
            }

            public Map.Entry<K,V> next() {
                return prevEntry();
            }
            public void remove() {
                removeDescending();
            }
        }
複製代碼

到目前爲止,咱們已經針對集合視圖討論了許多,想必你們也可以理解集合視圖的概念了,因爲SortedMap與NavigableMap的緣故,TreeMap中的集合視圖是很是多的,包括各類局部視圖和不一樣排序的視圖,有興趣的讀者能夠本身去看看源碼,後面的內容不會再對集合視圖進行過多的解釋了。

HashMap


光從名字上應該也能猜到,HashMap確定是基於hash算法實現的,這種基於hash實現的map叫作散列表(hash table)。

散列表中維護了一個數組,數組的每個元素被稱爲一個桶(bucket),當你傳入一個key = "a"進行查詢時,散列表會先把key傳入散列(hash)函數中進行尋址,獲得的結果就是數組的下標,而後再經過這個下標訪問數組便可獲得相關聯的值。

咱們都知道數組中數據的組織方式是線性的,它會直接分配一串連續的內存地址序列,要找到一個元素只須要根據下標來計算地址的偏移量便可(查找一個元素的起始地址爲:數組的起始地址加上下標乘以該元素類型佔用的地址大小)。所以散列表在理想的狀況下,各類操做的時間複雜度只有O(1),這甚至超過了二叉查找樹,雖然理想的狀況並不老是知足的,關於這點以後咱們還會說起。

爲何是hash?


hash算法是一種能夠從任何數據中提取出其「指紋」的數據摘要算法,它將任意大小的數據(輸入)映射到一個固定大小的序列(輸出)上,這個序列被稱爲hash code、數據摘要或者指紋。比較出名的hash算法有MD五、SHA。

hash是具備惟一性且不可逆的,惟一性指的是相同的輸入產生的hash code永遠是同樣的,而不可逆也比較容易理解,數據摘要算法並非壓縮算法,它只是生成了一個該數據的摘要,沒有將數據進行壓縮。壓縮算法通常都是使用一種更節省空間的編碼規則將數據從新編碼,解壓縮只須要按着編碼規則解碼就是了,試想一下,一個幾百MB甚至幾GB的數據生成的hash code都只是一個擁有固定長度的序列,若是再能逆向解壓縮,那麼其餘壓縮算法該情何以堪?

咱們上述討論的僅僅是在密碼學中的hash算法,而在散列表中所須要的散列函數是要可以將key尋址到buckets中的一個位置,散列函數的實現影響到整個散列表的性能。

一個完美的散列函數要可以作到均勻地將key分佈到buckets中,每個key分配到一個bucket,但這是不可能的。雖然hash算法具備惟一性,但同時它還具備重複性,惟一性保證了相同輸入的輸出是一致的,卻沒有保證不一樣輸入的輸出是不一致的,也就是說,徹底有可能兩個不一樣的key被分配到了同一個bucket(由於它們的hash code多是相同的),這叫作碰撞衝突。總之,理想很豐滿,現實很骨感,散列函數只能儘量地減小衝突,沒有辦法徹底消除衝突。

散列函數的實現方法很是多,一個優秀的散列函數要看它能不能將key分佈均勻。首先介紹一種最簡單的方法:除留餘數法,先對key進行hash獲得它的hash code,而後再用該hash code對buckets數組的元素數量取餘,獲得的結果就是bucket的下標,這種方法簡單高效,也能夠當作對集羣進行負載均衡的路由算法。

private int hash(Key key) {
   // & 0x7fffffff 是爲了屏蔽符號位,M爲bucket數組的長度
   return (key.hashCode() & 0x7fffffff) % M;
}
複製代碼

要注意一點,只有整數才能進行取餘運算,若是hash code是一個字符串或別的類型,那麼你須要將它轉換爲整數才能使用除留餘數法,不過Java在Object對象中提供了hashCode()函數,該函數返回了一個int值,因此任何你想要放入HashMap的自定義的抽象數據類型,都必須實現該函數和equals()函數,這兩個函數之間也遵照着一種約定:若是a.equals(b) == true,那麼a與b的hashCode()也必須是相同的。

下面爲String類的hashCode()函數,它先遍歷了內部的字符數組,而後在每一次循環中計算hash code(將hash code乘以一個素數並加上當前循環項的字符):

/** The value is used for character storage. */
    private final char value[];

    /** Cache the hash code for the string */
    private int hash; // Default to 0

    public int hashCode() {
        int h = hash;
        if (h == 0 && value.length > 0) {
            char val[] = value;

            for (int i = 0; i < value.length; i++) {
                h = 31 * h + val[i];
            }
            hash = h;
        }
        return h;
    }
複製代碼

HashMap沒有采用這麼簡單的方法,有一個緣由是HashMap中的buckets數組的長度永遠爲一個2的冪,而不是一個素數,若是長度爲素數,那麼可能會更適合簡單暴力的除留餘數法(固然除留餘數法雖然簡單卻並非那麼高效的),順便一提,時代的眼淚Hashtable就使用了除留餘數法,它沒有強制約束buckets數組的長度。

HashMap在內部實現了一個hash()函數,首先要對hashCode()的返回值進行處理:

static final int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }
複製代碼

該函數將key.hashCode()的低16位和高16位作了個異或運算,其目的是爲了擾亂低位的信息以實現減小碰撞衝突。以後還須要把hash()的返回值與table.length - 1作與運算(table爲buckets數組),獲得的結果便是數組的下標。

table.length - 1就像是一個低位掩碼(這個設計也優化了擴容操做的性能),它和hash()作與操做時必然會將高位屏蔽(由於一個HashMap不可能有特別大的buckets數組,至少在不斷自動擴容以前是不可能的,因此table.length - 1的大部分高位都爲0),只保留低位,看似沒什麼毛病,但這其實暗藏玄機,它會致使老是隻有最低的幾位是有效的,這樣就算你的hashCode()實現得再好也難以免發生碰撞。這時,hash()函數的價值就體現出來了,它對hash code的低位添加了隨機性而且混合了高位的部分特徵,顯著減小了碰撞衝突的發生(關於hash()函數的效果如何,能夠參考這篇文章An introduction to optimising a hashing strategy)。

HashMap的散列函數具體流程以下圖:

解決衝突


在上文中咱們已經屢次提到碰撞衝突,可是散列函數不多是完美的,key分佈徹底均勻的狀況是不存在的,因此碰撞衝突老是難以免。

那麼發生碰撞衝突時怎麼辦?總不能丟棄數據吧?必需要有一種合理的方法來解決這個問題,HashMap使用了叫作分離連接(Separate chaining,也有人翻譯成拉鍊法)的策略來解決衝突。它的主要思想是每一個bucket都應當是一個互相獨立的數據結構,當發生衝突時,只須要把數據放入bucket中(由於bucket自己也是一個能夠存放數據的數據結構),這樣查詢一個key所消耗的時間爲訪問bucket所消耗的時間加上在bucket中查找的時間。

HashMap的buckets數組其實就是一個鏈表數組,在發生衝突時只須要把Entry(還記得Entry嗎?HashMap的Entry實現就是一個簡單的鏈表節點,它包含了key和value以及hash code)放到鏈表的尾部,若是未發生衝突(位於該下標的bucket爲null),那麼就把該Entry作爲鏈表的頭部。並且HashMap還使用了Lazy策略,buckets數組只會在第一次調用put()函數時進行初始化,這是一種防止內存浪費的作法,像ArrayList也是Lazy的,它在第一次調用add()時纔會初始化內部的數組。

分離連接法

不過鏈表雖然實現簡單,可是在查找的效率上只有O(n),並且咱們大部分的操做都是在進行查找,在hashCode()設計的不是很是良好的狀況下,碰撞衝突可能會頻繁發生,鏈表也會變得愈來愈長,這個效率是很是差的。Java 8對其實現了優化,鏈表的節點數量在到達閾值時會轉化爲紅黑樹,這樣查找所需的時間就只有O(log n)了,閾值的定義以下:

/** * The bin count threshold for using a tree rather than list for a * bin. Bins are converted to trees when adding an element to a * bin with at least this many nodes. The value must be greater * than 2 and should be at least 8 to mesh with assumptions in * tree removal about conversion back to plain bins upon * shrinkage. */
    static final int TREEIFY_THRESHOLD = 8;
複製代碼

若是在插入Entry時發現一條鏈表超過閾值,就會執行如下的操做,對該鏈表進行樹化;相對的,若是在刪除Entry(或進行擴容)時發現紅黑樹的節點太少(根據閾值UNTREEIFY_THRESHOLD),也會把紅黑樹退化成鏈表。

/** * 替換指定hash所處位置的鏈表中的全部節點爲TreeNode, * 若是buckets數組過小,就進行擴容。 */
    final void treeifyBin(Node<K,V>[] tab, int hash) {
        int n, index; Node<K,V> e;
        // MIN_TREEIFY_CAPACITY = 64,小於該值表明數組中的節點並非不少
        // 因此選擇進行擴容,只有數組長度大於該值時纔會進行樹化。
        if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
            resize();
        else if ((e = tab[index = (n - 1) & hash]) != null) {
            TreeNode<K,V> hd = null, tl = null;
            // 轉換鏈表節點爲樹節點,注意要處理好鏈接關係
            do {
                TreeNode<K,V> p = replacementTreeNode(e, null);
                if (tl == null)
                    hd = p;
                else {
                    p.prev = tl;
                    tl.next = p;
                }
                tl = p;
            } while ((e = e.next) != null);
            if ((tab[index] = hd) != null)
                hd.treeify(tab); // 從頭部開始構造樹
        }
    }

        // 該函數定義在TreeNode中
        final void treeify(Node<K,V>[] tab) {
            TreeNode<K,V> root = null;
            for (TreeNode<K,V> x = this, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (root == null) { // 初始化root節點
                    x.parent = null;
                    x.red = false;
                    root = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = root;;) {
                        int dir, ph;
                        K pk = p.key;
                        // 肯定節點的方向
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        // 若是kc == null
                        // 而且k沒有實現Comparable接口
                        // 或者k與pk是沒有可比較性的(類型不一樣)
                        // 或者k與pk是相等的(返回0也有多是相等)
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);

                        // 肯定方向後插入節點,修正紅黑樹的平衡
                        TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            root = balanceInsertion(root, x);
                            break;
                        }
                    }
                }
            }
            // 確保給定的root是該bucket中的第一個節點
            moveRootToFront(tab, root);
        }

        static int tieBreakOrder(Object a, Object b) {
            int d;
            if (a == null || b == null ||
                (d = a.getClass().getName().
                 compareTo(b.getClass().getName())) == 0)
                // System.identityHashCode()將調用並返回傳入對象的默認hashCode()
                // 也就是說,不管是否重寫了hashCode(),都將調用Object.hashCode()。
                // 若是傳入的對象是null,那麼就返回0
                d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
                     -1 : 1);
            return d;
        }            
複製代碼

解決碰撞衝突的另外一種策略叫作開放尋址法(Open addressing),它與分離連接法的思想大相徑庭。在開放尋址法中,全部Entry都會存儲在buckets數組,一個明顯的區別是,分離連接法中的每一個bucket都是一個鏈表或其餘的數據結構,而開放尋址法中的每一個bucket就僅僅只是Entry自己。

開放尋址法是基於數組中的空位來解決衝突的,它的想法很簡單,與其使用鏈表等數據結構,不如直接在數組中留出空位來當作一個標記,反正都要佔用額外的內存。

當你查找一個key的時候,首先會從起始位置(經過散列函數計算出的數組索引)開始,不斷檢查當前bucket是否爲目標Entry(經過比較key來判斷),若是當前bucket不是目標Entry,那麼就向後查找(查找的間隔取決於實現),直到遇見一個空位(null),這表明你想要找的key不存在。

若是你想要put一個全新的Entry(Map中沒有這個key存在),依然會從起始位置開始進行查找,若是起始位置不是空的,則表明發生了碰撞衝突,只好不斷向後查找,直到發現一個空位。

開放尋址法的名字也是來源於此,一個Entry的位置並非徹底由hash值決定的,因此也叫作Closed hashing,相對的,分離連接法也被稱爲Open hashing或Closed addressing。

根據向後探測(查找)的算法不一樣,開放尋址法有多種不一樣的實現,咱們介紹一種最簡單的算法:線性探測法(Linear probing),在發生碰撞時,簡單地將索引加一,若是到達了數組的尾部就折回到數組的頭部,直到找到目標或一個空位。

線性探測法

基於線性探測法的查找操做以下:

private K[] keys; // 存儲key的數組
    private V[] vals; // 存儲值的數組 

    public V get(K key) {
    	// m是buckets數組的長度,即keys和vals的長度。
    	// 當i等於m時,取模運算會得0(折回數組頭部)
        for (int i = hash(key); keys[i] != null; i = (i + 1) % m) {
            if (keys[i].equals(key))
                return vals[i];
        }
        return null;
    }
複製代碼

插入操做稍微麻煩一些,須要在插入以前判斷當前數組的剩餘容量,而後決定是否擴容。數組的剩餘容量越多,表明Entry之間的間隔越大以及越早遇見空位(向後探測的次數就越少),效率天然就會變高。代價就是額外消耗的內存較多,這也是在用空間換取時間。

public void put(K key, V value) {
        // n是Entry的數量,若是n超過了數組長度的一半,就擴容一倍
        if (n >= m / 2) resize(2 * m);

        int i;
        for (i = hash(key); keys[i] != null; i = (i + 1) % m) {
            if (keys[i].equals(key)) {
                vals[i] = value;
                return;
            }
        }

        // 沒有找到目標,那麼就插入一對新的Entry
        keys[i] = key;
        vals[i] = value;
        n++;
    }
複製代碼

接下來是刪除操做,須要注意一點,咱們不能簡單地把目標key所在的位置(keys和vals數組)設置爲null,這樣會致使此位置以後的Entry沒法被探測到,因此須要將目標右側的全部Entry從新插入到散列表中:

public V delete(K key) {
    int i = hash(key);
    // 先找到目標的索引
    while (!key.equals(keys[i])) {
        i = (i + 1) % m;
    }

    V oldValue = vals[i];
    // 刪除目標key和value
    keys[i] = null;
    vals[i] = null;
    // 指針移動到下一個索引
    i = (i + 1) % m;
    while (keys[i] != null) {
        // 先刪除而後從新插入
        K keyToRehash = keys[i];
        V valToRehash = vals[i];
        keys[i] = null;
        vals[i] = null;
        n--;
        put(keyToRehash, valToRehash);
        i = (i + 1) % m;
    }

    n--;
    // 當前Entry小於等於數組長度的八分之一時,進行縮容
    if (n > 0 && n <= m / 8) resize(m / 2);

    return oldValue;
}
複製代碼

動態擴容


散列表以數組的形式組織bucket,問題在於數組是靜態分配的,爲了保證查找的性能,須要在Entry數量大於一個臨界值時進行擴容,不然就算散列函數的效果再好,也不免產生碰撞。

所謂擴容,其實就是用一個容量更大(在原容量上乘以二)的數組來替換掉當前的數組,這個過程須要把舊數組中的數據從新hash到新數組,因此擴容也能在必定程度上減緩碰撞。

HashMap經過負載因子(Load Factor)乘以buckets數組的長度來計算出臨界值,算法:threshold = load_factor * capacity。好比,HashMap的默認初始容量爲16(capacity = 16),默認負載因子爲0.75(load_factor = 0.75),那麼臨界值就爲threshold = 0.75 * 16 = 12,只要Entry的數量大於12,就會觸發擴容操做。

還能夠經過下列的構造函數來自定義負載因子,負載因子越小查找的性能就會越高,但同時額外佔用的內存就會越多,若是沒有特殊須要不建議修改默認值。

/** * 能夠發現構造函數中根本就沒初始化buckets數組。 * (以前說過buckets數組會推遲到第一次調用put()時進行初始化) */
    public HashMap(int initialCapacity, float loadFactor) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException("Illegal initial capacity: " +
                                               initialCapacity);
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        if (loadFactor <= 0 || Float.isNaN(loadFactor))
            throw new IllegalArgumentException("Illegal load factor: " +
                                               loadFactor);
        this.loadFactor = loadFactor;
        // tableSizeFor()確保initialCapacity必須爲一個2的N次方
        this.threshold = tableSizeFor(initialCapacity);
    }
複製代碼

buckets數組的大小約束對於整個HashMap都相當重要,爲了防止傳入一個不是2次冪的整數,必需要有所防範。tableSizeFor()函數會嘗試修正一個整數,並轉換爲離該整數最近的2次冪。

/** * Returns a power of two size for the given target capacity. */
    static final int tableSizeFor(int cap) {
        int n = cap - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
複製代碼

tableSizeFor()工做流程

還記得數組索引的計算方法嗎?index = (table.length - 1) & hash,這實際上是一種優化手段,因爲數組的大小永遠是一個2次冪,在擴容以後,一個元素的新索引要麼是在原位置,要麼就是在原位置加上擴容前的容量。這個方法的巧妙之處全在於&運算,以前提到過&運算只會關注n - 1(n = 數組長度)的有效位,當擴容以後,n的有效位相比以前會多增長一位(n會變成以前的二倍,因此確保數組長度永遠是2次冪很重要),而後只須要判斷hash在新增的有效位的位置是0仍是1就能夠算出新的索引位置,若是是0,那麼索引沒有發生變化,若是是1,索引就爲原索引加上擴容前的容量。

擴容前與擴容後的索引計算

這樣在每次擴容時都不用從新計算hash,省去了很多時間,並且新增有效位是0仍是1是帶有隨機性的,以前兩個碰撞的Entry又有可能在擴容時再次均勻地散佈開。下面是resize()的源碼:

final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table; // table就是buckets數組
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        // oldCap大於0,進行擴容,設置閾值與新的容量
        if (oldCap > 0) {
            // 超過最大值不會進行擴容,而且把閾值設置成Interger.MAX_VALUE
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            // 沒超過最大值,擴容爲原來的2倍
            // 向左移1位等價於乘2
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
        }
        // oldCap = 0,oldThr大於0,那麼就把閾值作爲新容量以進行初始化
        // 這種狀況發生在用戶調用了帶有參數的構造函數(會對threshold進行初始化)
        else if (oldThr > 0) // initial capacity was placed in threshold
            newCap = oldThr;
        // oldCap與oldThr都爲0,這種狀況發生在用戶調用了無參構造函數
        // 採用默認值進行初始化
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        // 若是newThr尚未被賦值,那麼就根據newCap計算出閾值
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        // 若是oldTab != null,表明這是擴容操做
        // 須要將擴容前的數組數據遷移到新數組
        if (oldTab != null) {
            // 遍歷oldTab的每個bucket,而後移動到newTab
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    // 索引j的bucket只有一個Entry(未發生過碰撞)
                    // 直接移動到newTab
                    if (e.next == null)
                        newTab[e.hash & (newCap - 1)] = e;
                    // 若是是一個樹節點(表明已經轉換成紅黑樹了)
                    // 那麼就將這個節點拆分爲lower和upper兩棵樹
                    // 首先會對這個節點進行遍歷
                    // 只要當前節點的hash & oldCap == 0就連接到lower樹
                    // 注意這裏是與oldCap進行與運算,而不是oldCap - 1(n - 1)
                    // oldCap就是擴容後新增有效位的掩碼
                    // 好比oldCap=16,二進制10000,n-1 = 1111,擴容後的n-1 = 11111
                    // 只要hash & oldCap == 0,就表明hash的新增有效位爲0
                    // 不然就連接到upper樹(新增有效位爲1)
                    // lower會被放入newTab[原索引j],upper樹會被放到newTab[原索引j + oldCap]
                    // 若是lower或者upper樹的節點少於閾值,會被退化成鏈表
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        // 下面操做的邏輯與分裂樹節點基本一致
                        // 只不過split()操做的是TreeNode
                        // 並且會將兩條TreeNode鏈表組織成紅黑樹
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }
複製代碼

使用HashMap時還須要注意一點,它不會動態地進行縮容,也就是說,你不該該保留一個已經刪除過大量Entry的HashMap(若是不打算繼續添加元素的話),此時它的buckets數組通過屢次擴容已經變得很是大了,這會佔用很是多的無用內存,這樣作的好處是不用屢次對數組進行擴容或縮容操做。不過通常也不會出現這種狀況,若是碰見了,請絕不猶豫地丟掉它,或者把數據轉移到一個新的HashMap。

添加元素


咱們已經瞭解了HashMap的內部實現與工做原理,它在內部維護了一個數組,每個key都會通過散列函數得出在數組的索引,若是兩個key的索引相同,那麼就使用分離連接法解決碰撞衝突,當Entry的數量大於臨界值時,對數組進行擴容。

接下來以一個添加元素(put())的過程爲例來梳理一下知識,下圖是put()函數的流程圖:

put()流程

而後是源碼:

public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        // table == null or table.length == 0
        // 第一次調用put(),初始化table
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        // 沒有發生碰撞,直接放入到數組
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            // 發生碰撞(頭節點就是目標節點)
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            // 節點爲紅黑樹
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            // 節點爲鏈表
            else {
                for (int binCount = 0; ; ++binCount) {
                    // 未找到目標節點,在鏈表尾部連接新節點
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            // 鏈表過長,轉換爲紅黑樹
                            treeifyBin(tab, hash);
                        break;
                    }
                    // 找到目標節點,退出循環
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            // 節點已存在,替換value
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                // afterNodeXXXXX是提供給LinkedHashMap重寫的函數
                // 在HashMap中沒有意義
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        // 超過臨界值,進行擴容
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }
複製代碼

WeakHashMap


WeakHashMap是一個基於Map接口實現的散列表,實現細節與HashMap相似(都有負載因子、散列函數等等,但沒有HashMap那麼多優化手段),它的特殊之處在於每一個key都是一個弱引用。

首先咱們要明白什麼是弱引用,Java將引用分爲四類(從JDK1.2開始),強度依次逐漸減弱:

  • 強引用: 就是日常使用的普通引用對象,例如Object obj = new Object(),這就是一個強引用,強引用只要還存在,就不會被垃圾收集器回收。

  • 軟引用: 軟引用表示一個還有用但並不是必需的對象,不像強引用,它還須要經過SoftReference類來間接引用目標對象(除了強引用都是如此)。被軟引用關聯的對象,在將要發生內存溢出異常以前,會被放入回收範圍之中以進行第二次回收(若是第二次回收以後依舊沒有足夠的內存,那麼就會拋出OOM異常)。

  • 弱引用: 一樣是表示一個非必需的對象,但要比軟引用的強度還要弱,須要經過WeakReference類來間接引用目標對象。被弱引用關聯的對象只能存活到下一次垃圾回收發生以前,當觸發垃圾回收時,不管當前內存是否足夠,都會回收掉只被弱引用關聯的對象(若是這個對象還被強引用所引用,那麼就不會被回收)。

  • 虛引用: 這是一種最弱的引用關係,須要經過PhantomReference類來間接引用目標對象。一個對象是否有虛引用的存在,徹底不會對其生存時間構成影響,也沒法經過虛引用來得到對象實例。虛引用的惟一做用就是能在這個對象被回收時收到一個系統通知(結合ReferenceQueue使用)。基於這點能夠經過虛引用來實現對象的析構函數,這比使用finalize()函數是要靠譜多了。

WeakHashMap適合用來當作一個緩存來使用。假設你的緩存系統是基於強引用實現的,那麼你就必須以手動(或者用一條線程來不斷輪詢)的方式來刪除一個無效的緩存項,而基於弱引用實現的緩存項只要沒被其餘強引用對象關聯,就會被直接放入回收隊列。

須要注意的是,只有key是被弱引用關聯的,而value通常都是一個強引用對象。所以,須要確保value沒有關聯到它的key,不然會對key的回收產生阻礙。在極端的狀況下,一個value對象A引用了另外一個key對象D,而與D相對應的value對象C又反過來引用了與A相對應的key對象B,這就會產生一個引用循環,致使D與B都沒法被正常回收。想要解決這個問題,就只能把value也變成一個弱引用,例如m.put(key, new WeakReference(value)),弱引用之間的互相引用不會產生影響。

查找操做的實現跟HashMap相比簡單了許多,只要讀懂了HashMap,基本都能看懂,源碼以下:

/** * Value representing null keys inside tables. */
    private static final Object NULL_KEY = new Object();

    /** * Use NULL_KEY for key if it is null. */
    private static Object maskNull(Object key) {
        return (key == null) ? NULL_KEY : key;
    }

    /** * Returns index for hash code h. */
    private static int indexFor(int h, int length) {
        return h & (length-1);
    }

    public V get(Object key) {
        // WeakHashMap容許null key與null value
        // null key會被替換爲一個虛擬值
        Object k = maskNull(key); 
        int h = hash(k);
        Entry<K,V>[] tab = getTable();
        int index = indexFor(h, tab.length);
        Entry<K,V> e = tab[index];
        // 遍歷鏈表
        while (e != null) {
            if (e.hash == h && eq(k, e.get()))
                return e.value;
            e = e.next;
        }
        return null;
    }
複製代碼

儘管key是一個弱引用,但仍需手動地回收那些已經無效的Entry。這個操做會在getTable()函數中執行,無論是查找、添加仍是刪除,都須要調用getTable()來得到buckets數組,因此這是種防止內存泄漏的被動保護措施。

/** * The table, resized as necessary. Length MUST Always be a power of two. */
    Entry<K,V>[] table;

    /** * Reference queue for cleared WeakEntries */
    private final ReferenceQueue<Object> queue = new ReferenceQueue<>();

    /** * Expunges stale entries from the table. */
    private void expungeStaleEntries() {
        // 遍歷ReferenceQueue,而後清理table中無效的Entry
        for (Object x; (x = queue.poll()) != null; ) {
            synchronized (queue) {
                @SuppressWarnings("unchecked")
                    Entry<K,V> e = (Entry<K,V>) x;
                int i = indexFor(e.hash, table.length);

                Entry<K,V> prev = table[i];
                Entry<K,V> p = prev;
                while (p != null) {
                    Entry<K,V> next = p.next;
                    if (p == e) {
                        if (prev == e)
                            table[i] = next;
                        else
                            prev.next = next;
                        // Must not null out e.next;
                        // stale entries may be in use by a HashIterator
                        e.value = null; // Help GC
                        size--;
                        break;
                    }
                    prev = p;
                    p = next;
                }
            }
        }
    }

    /** * Returns the table after first expunging stale entries. */
    private Entry<K,V>[] getTable() {
        expungeStaleEntries();
        return table;
    }
複製代碼

而後是插入操做與刪除操做,實現都比較簡單:

public V put(K key, V value) {
        Object k = maskNull(key);
        int h = hash(k);
        Entry<K,V>[] tab = getTable();
        int i = indexFor(h, tab.length);

        for (Entry<K,V> e = tab[i]; e != null; e = e.next) {
            if (h == e.hash && eq(k, e.get())) {
                V oldValue = e.value;
                if (value != oldValue)
                    e.value = value;
                return oldValue;
            }
        }

        modCount++;
        Entry<K,V> e = tab[i];
        // e被鏈接在new Entry的後面
        tab[i] = new Entry<>(k, value, queue, h, e);
        if (++size >= threshold)
            resize(tab.length * 2);
        return null;
    }

    public V remove(Object key) {
        Object k = maskNull(key);
        int h = hash(k);
        Entry<K,V>[] tab = getTable();
        int i = indexFor(h, tab.length);
        Entry<K,V> prev = tab[i];
        Entry<K,V> e = prev;

        while (e != null) {
            Entry<K,V> next = e.next;
            if (h == e.hash && eq(k, e.get())) {
                modCount++;
                size--;
                if (prev == e)
                    tab[i] = next;
                else
                    prev.next = next;
                return e.value;
            }
            prev = e;
            e = next;
        }

        return null;
    }
複製代碼

咱們並無在put()函數中發現key被轉換成弱引用,這是怎麼回事?key只有在第一次被放入buckets數組時才須要轉換成弱引用,也就是new Entry<>(k, value, queue, h, e),WeakHashMap的Entry實現其實就是WeakReference的子類。

/** * The entries in this hash table extend WeakReference, using its main ref * field as the key. */
    private static class Entry<K,V> extends WeakReference<Object> implements Map.Entry<K,V> {
        V value;
        final int hash;
        Entry<K,V> next;

        /** * Creates new entry. */
        Entry(Object key, V value,
              ReferenceQueue<Object> queue,
              int hash, Entry<K,V> next) {
            super(key, queue);
            this.value = value;
            this.hash  = hash;
            this.next  = next;
        }

        @SuppressWarnings("unchecked")
        public K getKey() {
            return (K) WeakHashMap.unmaskNull(get());
        }

        public V getValue() {
            return value;
        }

        public V setValue(V newValue) {
            V oldValue = value;
            value = newValue;
            return oldValue;
        }

        public boolean equals(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            K k1 = getKey();
            Object k2 = e.getKey();
            if (k1 == k2 || (k1 != null && k1.equals(k2))) {
                V v1 = getValue();
                Object v2 = e.getValue();
                if (v1 == v2 || (v1 != null && v1.equals(v2)))
                    return true;
            }
            return false;
        }

        public int hashCode() {
            K k = getKey();
            V v = getValue();
            return Objects.hashCode(k) ^ Objects.hashCode(v);
        }

        public String toString() {
            return getKey() + "=" + getValue();
        }
    }
複製代碼

有關使用WeakReference的一個典型案例是ThreadLocal,感興趣的讀者能夠參考我以前寫的博客聊一聊Spring中的線程安全性

LinkedHashMap


LinkedHashMap繼承HashMap並實現了Map接口,同時具備可預測的迭代順序(按照插入順序排序)。它與HashMap的不一樣之處在於,維護了一條貫穿其所有Entry的雙向鏈表(由於額外維護了鏈表的關係,性能上要略差於HashMap,不過集合視圖的遍歷時間與元素數量成正比,而HashMap是與buckets數組的長度成正比的),能夠認爲它是散列表與鏈表的結合。

/** * The head (eldest) of the doubly linked list. */
    transient LinkedHashMap.Entry<K,V> head;

    /** * The tail (youngest) of the doubly linked list. */
    transient LinkedHashMap.Entry<K,V> tail;

    /** * 迭代順序模式的標記位,若是爲true,採用訪問排序,不然,採用插入順序 * 默認插入順序(構造函數中默認設置爲false) */
    final boolean accessOrder;

    /** * Constructs an empty insertion-ordered <tt>LinkedHashMap</tt> instance * with the default initial capacity (16) and load factor (0.75). */
    public LinkedHashMap() {
        super();
        accessOrder = false;
    }
複製代碼

LinkedHashMap的Entry實現也繼承自HashMap,只不過多了指向先後的兩個指針。

/** * HashMap.Node subclass for normal LinkedHashMap entries. */
    static class Entry<K,V> extends HashMap.Node<K,V> {
        Entry<K,V> before, after;
        Entry(int hash, K key, V value, Node<K,V> next) {
            super(hash, key, value, next);
        }
    }
複製代碼

你也能夠經過構造函數來構造一個迭代順序爲訪問順序(accessOrder設爲true)的LinkedHashMap,這個訪問順序指的是按照最近被訪問的Entry的順序進行排序(從最近最少訪問到最近最多訪問)。基於這點能夠簡單實現一個採用LRU(Least Recently Used)策略的緩存。

public LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder) {
        super(initialCapacity, loadFactor);
        this.accessOrder = accessOrder;
    }
複製代碼

LinkedHashMap複用了HashMap的大部分代碼,因此它的查找實現是很是簡單的,惟一稍微複雜點的操做是保證訪問順序。

public V get(Object key) {
        Node<K,V> e;
        if ((e = getNode(hash(key), key)) == null)
            return null;
        if (accessOrder)
            afterNodeAccess(e);
        return e.value;
    }
複製代碼

還記得這些afterNodeXXXX命名格式的函數嗎?咱們以前已經在HashMap中見識過了,這些函數在HashMap中只是一個空實現,是專門用來讓LinkedHashMap重寫實現的hook函數。

// 在HashMap.removeNode()的末尾處調用
    // 將e從LinkedHashMap的雙向鏈表中刪除
    void afterNodeRemoval(Node<K,V> e) { // unlink
        LinkedHashMap.Entry<K,V> p =
            (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
        p.before = p.after = null;
        if (b == null)
            head = a;
        else
            b.after = a;
        if (a == null)
            tail = b;
        else
            a.before = b;
    }

    // 在HashMap.putVal()的末尾處調用
    // evict是一個模式標記,若是爲false表明buckets數組處於建立模式
    // HashMap.put()函數對此標記設置爲true
    void afterNodeInsertion(boolean evict) { // possibly remove eldest
        LinkedHashMap.Entry<K,V> first;
        // LinkedHashMap.removeEldestEntry()永遠返回false
        // 避免了最年長元素被刪除的可能(就像一個普通的Map同樣)
        if (evict && (first = head) != null && removeEldestEntry(first)) {
            K key = first.key;
            removeNode(hash(key), key, null, false, true);
        }
    }

    // HashMap.get()沒有調用此函數,因此LinkedHashMap重寫了get()
	// get()與put()都會調用afterNodeAccess()來保證訪問順序
    // 將e移動到tail,表明最近訪問到的節點
    void afterNodeAccess(Node<K,V> e) { // move node to last
        LinkedHashMap.Entry<K,V> last;
        if (accessOrder && (last = tail) != e) {
            LinkedHashMap.Entry<K,V> p =
                (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
            p.after = null;
            if (b == null)
                head = a;
            else
                b.after = a;
            if (a != null)
                a.before = b;
            else
                last = b;
            if (last == null)
                head = p;
            else {
                p.before = last;
                last.after = p;
            }
            tail = p;
            ++modCount;
        }
    }
複製代碼

注意removeEldestEntry()默認永遠返回false,這時它的行爲與普通的Map無異。若是你把removeEldestEntry()重寫爲永遠返回true,那麼就有可能使LinkedHashMap處於一個永遠爲空的狀態(每次put()或者putAll()都會刪除頭節點)。

一個比較合理的實現示例:

protected boolean removeEldestEntry(Map.Entry eldest){
    return size() > MAX_SIZE;
}
複製代碼

LinkedHashMap重寫了newNode()等函數,以初始化或鏈接節點到它內部的雙向鏈表:

// 連接節點p到鏈表尾部(或初始化鏈表)
    private void linkNodeLast(LinkedHashMap.Entry<K,V> p) {
        LinkedHashMap.Entry<K,V> last = tail;
        tail = p;
        if (last == null)
            head = p;
        else {
            p.before = last;
            last.after = p;
        }
    }

    // 用dst替換掉src
    private void transferLinks(LinkedHashMap.Entry<K,V> src, LinkedHashMap.Entry<K,V> dst) {
        LinkedHashMap.Entry<K,V> b = dst.before = src.before;
        LinkedHashMap.Entry<K,V> a = dst.after = src.after;
        // src是頭節點
        if (b == null)
            head = dst;
        else
            b.after = dst;
        // src是尾節點
        if (a == null)
            tail = dst;
        else
            a.before = dst;
    }   

    Node<K,V> newNode(int hash, K key, V value, Node<K,V> e) {
        LinkedHashMap.Entry<K,V> p =
            new LinkedHashMap.Entry<K,V>(hash, key, value, e);
        linkNodeLast(p);
        return p;
    }

    Node<K,V> replacementNode(Node<K,V> p, Node<K,V> next) {
        LinkedHashMap.Entry<K,V> q = (LinkedHashMap.Entry<K,V>)p;
        LinkedHashMap.Entry<K,V> t =
            new LinkedHashMap.Entry<K,V>(q.hash, q.key, q.value, next);
        transferLinks(q, t);
        return t;
    }

    TreeNode<K,V> newTreeNode(int hash, K key, V value, Node<K,V> next) {
        TreeNode<K,V> p = new TreeNode<K,V>(hash, key, value, next);
        linkNodeLast(p);
        return p;
    }

    TreeNode<K,V> replacementTreeNode(Node<K,V> p, Node<K,V> next) {
        LinkedHashMap.Entry<K,V> q = (LinkedHashMap.Entry<K,V>)p;
        TreeNode<K,V> t = new TreeNode<K,V>(q.hash, q.key, q.value, next);
        transferLinks(q, t);
        return t;
    }
複製代碼

遍歷LinkedHashMap所須要的時間與Entry數量成正比,這是由於迭代器直接對雙向鏈表進行迭代,而鏈表中只會含有Entry節點。迭代的順序是從頭節點開始一直到尾節點,插入操做會將新節點連接到尾部,因此保證了插入順序,而訪問順序會經過afterNodeAccess()來保證,訪問次數越多的節點越接近尾部。

abstract class LinkedHashIterator {
        LinkedHashMap.Entry<K,V> next;
        LinkedHashMap.Entry<K,V> current;
        int expectedModCount;

        LinkedHashIterator() {
            next = head;
            expectedModCount = modCount;
            current = null;
        }

        public final boolean hasNext() {
            return next != null;
        }

        final LinkedHashMap.Entry<K,V> nextNode() {
            LinkedHashMap.Entry<K,V> e = next;
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            if (e == null)
                throw new NoSuchElementException();
            current = e;
            next = e.after;
            return e;
        }

        public final void remove() {
            Node<K,V> p = current;
            if (p == null)
                throw new IllegalStateException();
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            current = null;
            K key = p.key;
            removeNode(hash(key), key, null, false, false);
            expectedModCount = modCount;
        }
    }

    final class LinkedKeyIterator extends LinkedHashIterator implements Iterator<K> {
        public final K next() { return nextNode().getKey(); }
    }

    final class LinkedValueIterator extends LinkedHashIterator implements Iterator<V> {
        public final V next() { return nextNode().value; }
    }

    final class LinkedEntryIterator extends LinkedHashIterator implements Iterator<Map.Entry<K,V>> {
        public final Map.Entry<K,V> next() { return nextNode(); }
    }
複製代碼

ConcurrentHashMap


咱們上述所講的Map都是非線程安全的,這意味着不該該在多個線程中對這些Map進行修改操做,輕則會產生數據不一致的問題,甚至還會由於併發插入元素而致使鏈表成環(插入會觸發擴容,而擴容操做須要將原數組中的元素rehash到新數組,這時併發操做就有可能產生鏈表的循環引用從而成環),這樣在查找時就會發生死循環,影響到整個應用程序。

Collections.synchronizedMap(Map<K,V> m)能夠將一個Map轉換成線程安全的實現,其實也就是經過一個包裝類,而後把全部功能都委託給傳入的Map實現,並且包裝類是基於synchronized關鍵字來保證線程安全的(時代的眼淚Hashtable也是基於synchronized關鍵字),底層使用的是互斥鎖(同一時間內只能由持有鎖的線程訪問,其餘競爭線程進入睡眠狀態),性能與吞吐量差強人意。

public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
        return new SynchronizedMap<>(m);
    }
    
    private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable {
        private static final long serialVersionUID = 1978198479659022715L;

        private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize

        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }

        SynchronizedMap(Map<K,V> m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }

        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }

        ............
    }
複製代碼

然而ConcurrentHashMap的實現細節遠沒有這麼簡單,所以性能也要高上許多。它沒有使用一個全局鎖來鎖住本身,而是採用了減小鎖粒度的方法,儘可能減小由於競爭鎖而致使的阻塞與衝突,並且ConcurrentHashMap的檢索操做是不須要鎖的。

在Java 7中,ConcurrentHashMap把內部細分紅了若干個小的HashMap,稱之爲段(Segment),默認被分爲16個段。對於一個寫操做而言,會先根據hash code進行尋址,得出該Entry應被存放在哪個Segment,而後只要對該Segment加鎖便可。

理想狀況下,一個默認的ConcurrentHashMap能夠同時接受16個線程進行寫操做(若是都是對不一樣Segment進行操做的話)。

分段鎖對於size()這樣的全局操做來講就沒有任何做用了,想要得出Entry的數量就須要遍歷全部Segment,得到全部的鎖,而後再統計總數。事實上,ConcurrentHashMap會先試圖使用無鎖的方式統計總數,這個嘗試會進行3次,若是在相鄰的2次計算中得到的Segment的modCount次數一致,表明這兩次計算過程當中都沒有發生過修改操做,那麼就能夠當作最終結果返回,不然,就要得到全部Segment的鎖,從新計算size。

本文主要討論的是Java 8的ConcurrentHashMap,它與Java 7的實現差異較大。徹底放棄了段的設計,而是變回與HashMap類似的設計,使用buckets數組與分離連接法(一樣會在超過閾值時樹化,對於構造紅黑樹的邏輯與HashMap差異不大,只不過須要額外使用CAS來保證線程安全),鎖的粒度也被細分到每一個數組元素(我的認爲這樣作的緣由是由於HashMap在Java 8中也實現了很多優化,即便碰撞嚴重,也能保證必定的性能,並且Segment不只臃腫還有弱一致性的問題存在),因此它的併發級別與數組長度相關(Java 7則是與段數相關)。

/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */
    transient volatile Node<K,V>[] table;
複製代碼

尋址


ConcurrentHashMap的散列函數與HashMap並無什麼區別,一樣是把key的hash code的高16位與低16位進行異或運算(由於ConcurrentHashMap的buckets數組長度也永遠是一個2的N次方),而後將擾亂後的hash code與數組的長度減一(實際可訪問到的最大索引)進行與運算,得出的結果便是目標所在的位置。

// 2^31 - 1,int類型的最大值
    // 該掩碼錶示節點hash的可用位,用來保證hash永遠爲一個正整數
    static final int HASH_BITS = 0x7fffffff;

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
複製代碼

下面是查找操做的源碼,實現比較簡單。

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                // 先嚐試判斷鏈表頭是否爲目標,若是是就直接返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                // eh < 0表明這是一個特殊節點(TreeBin或ForwardingNode)
                // 因此直接調用find()進行遍歷查找
                return (p = e.find(h, key)) != null ? p.val : null;
            // 遍歷鏈表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
複製代碼

一個普通的節點(鏈表節點)的hash不可能小於0(已經在spread()函數中修正過了),因此小於0的只多是一個特殊節點,它不能用while循環中遍歷鏈表的方式來進行遍歷。

TreeBin是紅黑樹的頭部節點(紅黑樹的節點爲TreeNode),它自己不含有key與value,而是指向一個TreeNode節點的鏈表與它們的根節點,同時使用CAS(ConcurrentHashMap並非徹底基於互斥鎖實現的,而是與CAS這種樂觀策略搭配使用,以提升性能)實現了一個讀寫鎖,迫使Writer(持有這個鎖)在樹重構操做以前等待Reader完成。

ForwardingNode是一個在數據轉移過程(由擴容引發)中使用的臨時節點,它會被插入到頭部。它與TreeBin(和TreeNode)都是Node類的子類。

爲了判斷出哪些是特殊節點,TreeBin和ForwardingNode的hash域都只是一個虛擬值:

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        ......

        /** * Virtualized support for map.get(); overridden in subclasses. */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

    /* * Encodings for Node hash fields. See above for explanation. */
    static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations 

    static final class TreeBin<K,V> extends Node<K,V> {
        ....

        TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            ....
        }   
        
        ....     
    }

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

        .....
    }        
複製代碼

可見性


咱們在get()函數中並無發現任何與鎖相關的代碼,那麼它是怎麼保證線程安全的呢?一個操做ConcurrentHashMap.get("a"),它的步驟基本分爲如下幾步:

  • 根據散列函數計算出的索引訪問table。

  • 從table中取出頭節點。

  • 遍歷頭節點直到找到目標節點。

  • 從目標節點中取出value並返回。

因此只要保證訪問table與節點的操做老是可以返回最新的數據就能夠了。ConcurrentHashMap並無採用鎖的方式,而是經過volatile關鍵字來保證它們的可見性。在上文貼出的代碼中能夠發現,table、Node.val和Node.next都是被volatile關鍵字所修飾的。

volatile關鍵字保證了多線程環境下變量的可見性與有序性,底層實現基於內存屏障(Memory Barrier)。

爲了優化性能,現代CPU工做時的指令執行順序與應用程序的代碼順序實際上是不一致的(有些編譯器也會進行這種優化),也就是所謂的亂序執行技術。亂序執行能夠提升CPU流水線的工做效率,只要保證數據符合程序邏輯上的正確性便可(遵循happens-before原則)。不過現在是多核時代,若是隨便亂序而不提供防禦措施那是會出問題的。每個cpu上都會進行亂序優化,單cpu所保證的邏輯次序可能會被其餘cpu所破壞。

內存屏障就是針對此狀況的防禦措施。能夠認爲它是一個同步點(但它自己也是一條cpu指令)。例如在IA32指令集架構中引入的SFENCE指令,在該指令以前的全部寫操做必須所有完成,讀操做仍能夠亂序執行。LFENCE指令則保證以前的全部讀操做必須所有完成,另外還有粒度更粗的MFENCE指令保證以前的全部讀寫操做都必須所有完成。

內存屏障就像是一個保護指令順序的柵欄,保護後面的指令不被前面的指令跨越。將內存屏障插入到寫操做與讀操做之間,就能夠保證以後的讀操做能夠訪問到最新的數據,由於屏障前的寫操做已經把數據寫回到內存(根據緩存一致性協議,不會直接寫回到內存,而是改變該cpu私有緩存中的狀態,而後通知給其餘cpu這個緩存行已經被修改過了,以後另外一個cpu在讀操做時就能夠發現該緩存行已是無效的了,這時它會從其餘cpu中讀取最新的緩存行,而後以前的cpu纔會更改狀態並寫回到內存)。

例如,讀一個被volatile修飾的變量V老是可以從JMM(Java Memory Model)主內存中得到最新的數據。由於內存屏障的緣由,每次在使用變量V(經過JVM指令use,後面說的也都是JVM中的指令而不是cpu)以前都必須先執行load指令(把從主內存中獲得的數據放入到工做內存),根據JVM的規定,load指令必須發生在read指令(從主內存中讀取數據)以後,因此每次訪問變量V都會先從主內存中讀取。相對的,寫操做也由於內存屏障保證的指令順序,每次都會直接寫回到主內存。

不過volatile關鍵字並不能保證操做的原子性,對該變量進行併發的連續操做是非線程安全的,所幸ConcurrentHashMap只是用來確保訪問到的變量是最新的,因此也不會發生什麼問題。

出於性能考慮,Doug Lea(java.util.concurrent包的做者)直接經過Unsafe類來對table進行操做。

Java號稱是安全的編程語言,而保證安全的代價就是犧牲程序員自由操控內存的能力。像在C/C++中能夠經過操做指針變量達到操做內存的目的(其實操做的是虛擬地址),但這種靈活性在新手手中也常常會帶來一些愚蠢的錯誤,好比內存訪問越界。

Unsafe從字面意思能夠看出是不安全的,它包含了許多本地方法(在JVM平臺上運行的其餘語言編寫的程序,主要爲C/C++,由JNI實現),這些方法支持了對指針的操做,因此它才被稱爲是不安全的。雖然不安全,但畢竟是由C/C++實現的,像一些與操做系統交互的操做確定是快過Java的,畢竟Java與操做系統之間還隔了一層抽象(JVM),不過代價就是失去了JVM所帶來的多平臺可移植性(本質上也只是一個c/cpp文件,若是換了平臺那就要從新編譯)。

對table進行操做的函數有如下三個,都使用到了Unsafe(在java.util.concurrent包隨處可見):

@SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        // 從tab數組中獲取一個引用,遵循Volatile語義
        // 參數2是一個在tab中的偏移量,用來尋找目標對象
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        // 經過CAS操做將tab數組中位於參數2偏移量位置的值替換爲v
        // c是指望值,若是指望值與實際值不符,返回false
        // 不然,v會成功地被設置到目標位置,返回true
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        // 設置tab數組中位於參數2偏移量位置的值,遵循Volatile語義
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
複製代碼

若是對Unsafe感興趣,能夠參考這篇文章:Java Magic. Part 4: sun.misc.Unsafe

初始化


ConcurrentHashMap與HashMap同樣是Lazy的,buckets數組會在第一次訪問put()函數時進行初始化,它的默認構造函數甚至是個空函數。

/** * Creates a new, empty map with the default initial table size (16). */
    public ConcurrentHashMap() {
    }
複製代碼

可是有一點須要注意,ConcurrentHashMap是工做在多線程併發環境下的,若是有多個線程同時調用了put()函數該怎麼辦?這會致使重複初始化,因此必需要有對應的防禦措施。

ConcurrentHashMap聲明瞭一個用於控制table的初始化與擴容的實例變量sizeCtl,默認值爲0。當它是一個負數的時候,表明table正處於初始化或者擴容的狀態。-1表示table正在進行初始化,-N則表示當前有N-1個線程正在進行擴容。

在其餘狀況下,若是table還未初始化(table == null),sizeCtl表示table進行初始化的數組大小(因此從構造函數傳入的initialCapacity在通過計算後會被賦給它)。若是table已經初始化過了,則表示下次觸發擴容操做的閾值,算法stzeCtl = n - (n >>> 2),也就是n的75%,與默認負載因子(0.75)的HashMap一致。

private transient volatile int sizeCtl;
複製代碼

初始化table的操做位於函數initTable(),源碼以下:

/** * Initializes table, using the size recorded in sizeCtl. */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl小於0,這意味着已經有其餘線程進行初始化了
            // 因此當前線程讓出CPU時間片
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            // 不然,經過CAS操做嘗試修改sizeCtl
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        // 默認構造函數,sizeCtl = 0,使用默認容量(16)進行初始化
                        // 不然,會根據sizeCtl進行初始化
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 計算閾值,n的75%
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 閾值賦給sizeCtl
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
複製代碼

sizeCtl是一個volatile變量,只要有一個線程CAS操做成功,sizeCtl就會被暫時地修改成-1,這樣其餘線程就可以根據sizeCtl得知table是否已經處於初始化狀態中,最後sizeCtl會被設置成閾值,用於觸發擴容操做。

擴容


ConcurrentHashMap觸發擴容的時機與HashMap相似,要麼是在將鏈表轉換成紅黑樹時判斷table數組的長度是否小於閾值(64),若是小於就進行擴容而不是樹化,要麼就是在添加元素的時候,判斷當前Entry數量是否超過閾值,若是超過就進行擴容。

private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            // 小於MIN_TREEIFY_CAPACITY,進行擴容
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    // 將鏈表轉換成紅黑樹...
                }
            }
        }
    }

    ...

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        ...
        addCount(1L, binCount); // 計數
        return null;
    }

    private final void addCount(long x, int check) {
        // 計數...
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // s(元素個數)大於等於sizeCtl,觸發擴容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // 擴容標誌位
                int rs = resizeStamp(n);
                // sizeCtl爲負數,表明正有其餘線程進行擴容
                if (sc < 0) {
                    // 擴容已經結束,中斷循環
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 進行擴容,並設置sizeCtl,表示擴容線程 + 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 觸發擴容(第一個進行擴容的線程)
                // 並設置sizeCtl告知其餘線程
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                // 統計個數,用於循環檢測是否還須要擴容
                s = sumCount();
            }
        }
    }
複製代碼

能夠看到有關sizeCtl的操做牽涉到了大量的位運算,咱們先來理解這些位運算的意義。首先是resizeStamp(),該函數返回一個用於數據校驗的標誌位,意思是對長度爲n的table進行擴容。它將n的前導零(最高有效位以前的零的數量)和1 << 15作或運算,這時低16位的最高位爲1,其餘都爲n的前導零。

static final int resizeStamp(int n) {
        // RESIZE_STAMP_BITS = 16
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
複製代碼

初始化sizeCtl(擴容操做被第一個線程首次進行)的算法爲(rs << RESIZE_STAMP_SHIFT) + 2,首先RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS = 16,那麼rs << 16等於將這個標誌位移動到了高16位,這時最高位爲1,因此sizeCtl此時是個負數,而後加二(至於爲何是2,還記得有關sizeCtl的說明嗎?1表明初始化狀態,因此實際的線程個數是要減去1的)表明當前有一個線程正在進行擴容,

這樣sizeCtl就被分割成了兩部分,高16位是一個對n的數據校驗的標誌位,低16位表示參與擴容操做的線程個數 + 1。

可能會有讀者有所疑惑,更新進行擴容的線程數量的操做爲何是sc + 1而不是sc - 1,這是由於對sizeCtl的操做都是基於位運算的,因此不會關心它自己的數值是多少,只關心它在二進制上的數值,而sc + 1會在低16位上加1。

tryPresize()函數跟addCount()的後半段邏輯相似,不斷地根據sizeCtl判斷當前的狀態,而後選擇對應的策略。

private final void tryPresize(int size) {
        // 對size進行修正
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        // sizeCtl是默認值或正整數
        // 表明table還未初始化
        // 或尚未其餘線程正在進行擴容
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                // 設置sizeCtl,告訴其餘線程,table如今正處於初始化狀態
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            // 計算下次觸發擴容的閾值
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        // 將閾值賦給sizeCtl
                        sizeCtl = sc;
                    }
                }
            }
            // 沒有超過閾值或者大於容量的上限,中斷循環
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            // 進行擴容,與addCount()後半段的邏輯一致
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
複製代碼

擴容操做的核心在於數據的轉移,在單線程環境下數據的轉移很簡單,無非就是把舊數組中的數據遷移到新的數組。可是這在多線程環境下是行不通的,須要保證線程安全性,在擴容的時候其餘線程也可能正在添加元素,這時又觸發了擴容怎麼辦?有人可能會說,這不難啊,用一個互斥鎖把數據轉移操做的過程鎖住不就行了?這確實是一種可行的解決方法,但一樣也會帶來極差的吞吐量。

互斥鎖會致使全部訪問臨界區的線程陷入阻塞狀態,這會消耗額外的系統資源,內核須要保存這些線程的上下文並放到阻塞隊列,持有鎖的線程耗時越長,其餘競爭線程就會一直被阻塞,所以吞吐量低下,致使響應時間緩慢。並且鎖老是會伴隨着死鎖問題,一旦發生死鎖,整個應用程序都會所以受到影響,因此加鎖永遠是最後的備選方案。

Doug Lea沒有選擇直接加鎖,而是基於CAS實現無鎖的併發同步策略,使人佩服的是他不只沒有把其餘線程拒之門外,甚至還邀請它們一塊兒來協助工做。

那麼如何才能讓多個線程協同工做呢?Doug Lea把整個table數組當作多個線程之間共享的任務隊列,而後只需維護一個指針,當有一個線程開始進行數據轉移,就會先移動指針,表示指針劃過的這片bucket區域由該線程負責。

這個指針被聲明爲一個volatile整型變量,它的初始位置位於table的尾部,即它等於table.length,很明顯這個任務隊列是逆向遍歷的。

/** * The next table index (plus one) to split while resizing. */
    private transient volatile int transferIndex;

    /** * 一個線程須要負責的最小bucket數 */
    private static final int MIN_TRANSFER_STRIDE = 16;
	
    /** * The next table to use; non-null only while resizing. */
    private transient volatile Node<K,V>[] nextTable;	
複製代碼

一個已經遷移完畢的bucket會被替換成ForwardingNode節點,用來標記此bucket已經被其餘線程遷移完畢了。咱們以前提到過ForwardingNode,它是一個特殊節點,能夠經過hash域的虛擬值來識別它,它一樣重寫了find()函數,用來在新數組中查找目標。

數據遷移的操做位於transfer()函數,多個線程之間依靠sizeCtl與transferIndex指針來協同工做,每一個線程都有本身負責的區域,一個完成遷移的bucket會被設置爲ForwardingNode,其餘線程碰見這個特殊節點就跳過該bucket,處理下一個bucket。

transfer()函數能夠大體分爲三部分,第一部分對後續須要使用的變量進行初始化:

/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // 根據當前機器的CPU數量來決定每一個線程負責的bucket數
        // 避免由於擴容線程過多,反而影響到性能
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // 初始化nextTab,容量爲舊數組的一倍
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n; // 初始化指針
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
複製代碼

第二部分爲當前線程分配任務和控制當前線程的任務進度,這部分是transfer()的核心邏輯,描述瞭如何與其餘線程協同工做:

// i指向當前bucket,bound表示當前線程所負責的bucket區域的邊界
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 這個循環使用CAS不斷嘗試爲當前線程分配任務
            // 直到分配成功或任務隊列已經被所有分配完畢
            // 若是當前線程已經被分配過bucket區域
            // 那麼會經過--i指向下一個待處理bucket而後退出該循環
            while (advance) {
                int nextIndex, nextBound;
                // --i表示將i指向下一個待處理的bucket
                // 若是--i >= bound,表明當前線程已經分配過bucket區域
                // 而且還留有未處理的bucket
                if (--i >= bound || finishing)
                    advance = false;
                // transferIndex指針 <= 0 表示全部bucket已經被分配完畢
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 移動transferIndex指針
                // 爲當前線程設置所負責的bucket區域的範圍
                // i指向該範圍的第一個bucket,注意i是逆向遍歷的
                // 這個範圍爲(bound, i),i是該區域最後一個bucket,遍歷順序是逆向的
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // 當前線程已經處理完了所負責的全部bucket
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 若是任務隊列已經所有完成
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    // 設置新的閾值
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 工做中的擴容線程數量減1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // (resizeStamp << RESIZE_STAMP_SHIFT) + 2表明當前有一個擴容線程
                    // 相對的,(sc - 2) != resizeStamp << RESIZE_STAMP_SHIFT
                    // 表示當前還有其餘線程正在進行擴容,因此直接返回
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    // 不然,當前線程就是最後一個進行擴容的線程
                    // 設置finishing標識
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 若是待處理bucket是空的
            // 那麼插入ForwardingNode,以通知其餘線程
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 若是待處理bucket的頭節點是ForwardingNode
            // 說明此bucket已經被處理過了,跳過該bucket
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
複製代碼

最後一部分是具體的遷移過程(對當前指向的bucket),這部分的邏輯與HashMap相似,拿舊數組的容量當作一個掩碼,而後與節點的hash進行與操做,能夠得出該節點的新增有效位,若是新增有效位爲0就放入一個鏈表A,若是爲1就放入另外一個鏈表B,鏈表A在新數組中的位置不變(跟在舊數組的索引一致),鏈表B在新數組中的位置爲原索引加上舊數組容量。

這個方法減小了rehash的計算量,並且還能達到均勻分佈的目的,若是不能理解請去看本文中HashMap擴容操做的解釋。

else {
                // 對於節點的操做仍是要加上鎖的
                // 不過這個鎖的粒度很小,只鎖住了bucket的頭節點
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // hash code不爲負,表明這是條鏈表
                        if (fh >= 0) {
                            // fh & n 得到hash code的新增有效位,用於將鏈表分離成兩類
                            // 要麼是0要麼是1,關於這個位運算的更多細節
                            // 請看本文中有關HashMap擴容操做的解釋
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            // 這個循環用於記錄最後一段連續的同一類節點
                            // 這個類別是經過fh & n來區分的
                            // 這段連續的同類節點直接被複用,不會產生額外的複製
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            // 0被放入ln鏈表,1被放入hn鏈表
                            // lastRun是連續同類節點的起始節點
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            // 將最後一段的連續同類節點以前的節點按類別複製到ln或hn
                            // 鏈表的插入方向是往頭部插入的,Node構造函數的第四個參數是next
                            // 因此就算遇到類別與lastRun一致的節點也只會被插入到頭部
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // ln鏈表被放入到原索引位置,hn放入到原索引 + 舊數組容量
                            // 這一點與HashMap一致,若是看不懂請去參考本文對HashMap擴容的講解
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd); // 標記該bucket已被處理
                            advance = true;
                        }
                        // 對紅黑樹的操做,邏輯與鏈表同樣,按新增有效位進行分類
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 元素數量沒有超過UNTREEIFY_THRESHOLD,退化成鏈表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
複製代碼

計數


在Java 7中ConcurrentHashMap對每一個Segment單獨計數,想要獲得總數就須要得到全部Segment的鎖,而後進行統計。因爲Java 8拋棄了Segment,顯然是不能再這樣作了,並且這種方法雖然簡單準確但也捨棄了性能。

Java 8聲明瞭一個volatile變量baseCount用於記錄元素的個數,對這個變量的修改操做是基於CAS的,每當插入元素或刪除元素時都會調用addCount()函數進行計數。

private transient volatile long baseCount;

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 嘗試使用CAS更新baseCount失敗
        // 轉用CounterCells進行更新
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 在CounterCells未初始化
            // 或嘗試經過CAS更新當前線程的CounterCell失敗時
            // 調用fullAddCount(),該函數負責初始化CounterCells和更新計數
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            // 統計總數
            s = sumCount();
        }
        if (check >= 0) {
        	// 判斷是否須要擴容,在上文中已經講過了
        }
    }    
複製代碼

counterCells是一個元素爲CounterCell的數組,該數組的大小與當前機器的CPU數量有關,而且它不會被主動初始化,只有在調用fullAddCount()函數時纔會進行初始化。

CounterCell是一個簡單的內部靜態類,每一個CounterCell都是一個用於記錄數量的單元:

/** * Table of counter cells. When non-null, size is a power of 2. */
    private transient volatile CounterCell[] counterCells;

    /** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
複製代碼

註解@sun.misc.Contended用於解決僞共享問題。所謂僞共享,便是在同一緩存行(CPU緩存的基本單位)中存儲了多個變量,當其中一個變量被修改時,就會影響到同一緩存行內的其餘變量,致使它們也要跟着被標記爲失效,其餘變量的緩存命中率將會受到影響。解決僞共享問題的方法通常是對該變量填充一些無心義的佔位數據,從而使它獨享一個緩存行。

ConcurrentHashMap的計數設計與LongAdder相似。在一個低併發的狀況下,就只是簡單地使用CAS操做來對baseCount進行更新,但只要這個CAS操做失敗一次,就表明有多個線程正在競爭,那麼就轉而使用CounterCell數組進行計數,數組內的每一個ConuterCell都是一個獨立的計數單元。

每一個線程都會經過ThreadLocalRandom.getProbe() & m尋址找到屬於它的CounterCell,而後進行計數。ThreadLocalRandom是一個線程私有的僞隨機數生成器,每一個線程的probe都是不一樣的(這點基於ThreadLocalRandom的內部實現,它在內部維護了一個probeGenerator,這是一個類型爲AtomicInteger的靜態常量,每當初始化一個ThreadLocalRandom時probeGenerator都會先自增一個常量而後返回的整數即爲當前線程的probe,probe變量被維護在Thread對象中),能夠認爲每一個線程的probe就是它在CounterCell數組中的hash code。

這種方法將競爭數據按照線程的粒度進行分離,相比全部競爭線程對一個共享變量使用CAS不斷嘗試在性能上要效率多了,這也是爲何在高併發環境下LongAdder要優於AtomicInteger的緣由。

fullAddCount()函數根據當前線程的probe尋找對應的CounterCell進行計數,若是CounterCell數組未被初始化,則初始化CounterCell數組和CounterCell。該函數的實現與Striped64類(LongAdder的父類)的longAccumulate()函數是同樣的,把CounterCell數組當成一個散列表,每一個線程的probe就是hash code,散列函數也僅僅是簡單的(n - 1) & probe

CounterCell數組的大小永遠是一個2的n次方,初始容量爲2,每次擴容的新容量都是以前容量乘以二,處於性能考慮,它的最大容量上限是機器的CPU數量。

因此說CounterCell數組的碰撞衝突是很嚴重的,由於它的bucket基數過小了。而發生碰撞就表明着一個CounterCell會被多個線程競爭,爲了解決這個問題,Doug Lea使用無限循環加上CAS來模擬出一個自旋鎖來保證線程安全,自旋鎖的實現基於一個被volatile修飾的整數變量,該變量只會有兩種狀態:0和1,當它被設置爲0時表示沒有加鎖,當它被設置爲1時表示已被其餘線程加鎖。這個自旋鎖用於保護初始化CounterCell、初始化CounterCell數組以及對CounterCell數組進行擴容時的安全。

CounterCell更新計數是依賴於CAS的,每次循環都會嘗試經過CAS進行更新,若是成功就退出無限循環,不然就調用ThreadLocalRandom.advanceProbe()函數爲當前線程更新probe,而後從新開始循環,以指望下一次尋址到的CounterCell沒有被其餘線程競爭。

若是連着兩次CAS更新都沒有成功,那麼會對CounterCell數組進行一次擴容,這個擴容操做只會在當前循環中觸發一次,並且只能在容量小於上限時觸發。

fullAddCount()函數的主要流程以下:

  • 首先檢查當前線程有沒有初始化過ThreadLocalRandom,若是沒有則進行初始化。ThreadLocalRandom負責更新線程的probe,而probe又是在數組中進行尋址的關鍵。

  • 檢查CounterCell數組是否已經初始化,若是已初始化,那麼就根據probe找到對應的CounterCell。

    • 若是這個CounterCell等於null,須要先初始化CounterCell,經過把計數增量傳入構造函數,因此初始化只要成功就說明更新計數已經完成了。初始化的過程須要獲取自旋鎖。

    • 若是不爲null,就按上文所說的邏輯對CounterCell實施更新計數。

  • CounterCell數組未被初始化,嘗試獲取自旋鎖,進行初始化。數組初始化的過程會附帶初始化一個CounterCell來記錄計數增量,因此只要初始化成功就表示更新計數完成。

  • 若是自旋鎖被其餘線程佔用,沒法進行數組的初始化,只好經過CAS更新baseCount。

private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        // 當前線程的probe等於0,證實該線程的ThreadLocalRandom還未被初始化
        // 以及當前線程是第一次進入該函數
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            // 初始化ThreadLocalRandom,當前線程會被設置一個probe
            ThreadLocalRandom.localInit();      // force initialization
            // probe用於在CounterCell數組中尋址
            h = ThreadLocalRandom.getProbe();
            // 未競爭標誌
            wasUncontended = true;
        }
        // 衝突標誌
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            // CounterCell數組已初始化
            if ((as = counterCells) != null && (n = as.length) > 0) {
                // 若是尋址到的Cell爲空,那麼建立一個新的Cell
                if ((a = as[(n - 1) & h]) == null) {
                    // cellsBusy是一個只有0和1兩個狀態的volatile整數
                    // 它被當作一個自旋鎖,0表明無鎖,1表明加鎖
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        // 將傳入的x做爲初始值建立一個新的CounterCell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        // 經過CAS嘗試對自旋鎖加鎖
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            // 加鎖成功,聲明Cell是否建立成功的標誌
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                // 再次檢查CounterCell數組是否不爲空
                                // 而且尋址到的Cell爲空
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    // 將以前建立的新Cell放入數組
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                // 釋放鎖
                                cellsBusy = 0;
                            }
                            // 若是已經建立成功,中斷循環
                            // 由於新Cell的初始值就是傳入的增量,因此計數已經完畢了
                            if (created)
                                break;
                            // 若是未成功
                            // 表明as[(n - 1) & h]這個位置的Cell已經被其餘線程設置
                            // 那麼就從循環頭從新開始
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                // as[(n - 1) & h]非空
                // 在addCount()函數中經過CAS更新當前線程的Cell進行計數失敗
                // 會傳入wasUncontended = false,表明已經有其餘線程進行競爭
                else if (!wasUncontended)       // CAS already known to fail
                    // 設置未競爭標誌,以後會從新計算probe,而後從新執行循環
                    wasUncontended = true;      // Continue after rehash
                // 嘗試進行計數,若是成功,那麼就退出循環
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                // 嘗試更新失敗,檢查counterCell數組是否已經擴容
                // 或者容量達到最大值(CPU的數量)
                else if (counterCells != as || n >= NCPU)
                    // 設置衝突標誌,防止跳入下面的擴容分支
                    // 以後會從新計算probe
                    collide = false;            // At max size or stale
                // 設置衝突標誌,從新執行循環
                // 若是下次循環執行到該分支,而且衝突標誌仍然爲true
                // 那麼會跳過該分支,到下一個分支進行擴容
                else if (!collide)
                    collide = true;
                // 嘗試加鎖,而後對counterCells數組進行擴容
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        // 檢查是否已被擴容
                        if (counterCells == as) {// Expand table unless stale
                            // 新數組容量爲以前的1倍
                            CounterCell[] rs = new CounterCell[n << 1];
                            // 遷移數據到新數組
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        // 釋放鎖
                        cellsBusy = 0;
                    }
                    collide = false;
                    // 從新執行循環
                    continue;                   // Retry with expanded table
                }
                // 爲當前線程從新計算probe
                h = ThreadLocalRandom.advanceProbe(h);
            }
            // CounterCell數組未初始化,嘗試獲取自旋鎖,而後進行初始化
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        // 初始化CounterCell數組,初始容量爲2
                        CounterCell[] rs = new CounterCell[2];
                        // 初始化CounterCell
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                // 初始化CounterCell數組成功,退出循環
                if (init)
                    break;
            }
            // 若是自旋鎖被佔用,則只好嘗試更新baseCount
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }
複製代碼

對於統計總數,只要可以理解CounterCell的思想,就很簡單了。仔細想想,每次計數的更新都會被分攤在baseCount和CounterCell數組中的某一CounterCell,想要得到總數,把它們統計相加就是了。

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

     final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }   
複製代碼

其實size()函數返回的總數可能並非百分百精確的,試想若是前一個遍歷過的CounterCell又進行了更新會怎麼樣?儘管只是一個估算值,但在大多數場景下都還能接受,並且性能上是要比Java 7好上太多了。

添加元素


添加元素的主要邏輯與HashMap沒什麼區別,有所區別的複雜操做如擴容和計數咱們上文都已經深刻解析過了,因此總體來講putVal()函數仍是比較簡單的,可能惟一須要注意的就是在對節點進行操做的時候須要經過互斥鎖保證線程安全,這個互斥鎖的粒度很小,只對須要操做的這個bucket加鎖。

public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0; // 節點計數器,用於判斷是否須要樹化
        // 無限循環+CAS,無鎖的標準套路
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 初始化table
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // bucket爲null,經過CAS建立頭節點,若是成功就結束循環
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // bucket爲ForwardingNode
            // 當前線程前去協助進行擴容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 節點是鏈表
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 找到目標,設置value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 未找到節點,插入新節點到鏈表尾部
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 節點是紅黑樹
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 根據bucket中的節點數決定是否樹化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // oldVal不等於null,說明沒有新節點
                    // 因此直接返回,不進行計數
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 計數
        addCount(1L, binCount);
        return null;
    }
複製代碼

至於刪除元素的操做位於函數replaceNode(Object key, V value, Object cv),當table[key].val等於指望值cv時(或cv等於null),更新節點的值爲value,若是value等於null,那麼刪除該節點。

remove()函數經過調用replaceNode(key, null, null)來達成刪除目標節點的目的,replaceNode()的具體實現與putVal()沒什麼差異,只不過對鏈表的操做有所不一樣而已,因此就很少敘述了。

並行計算


Java 8除了對ConcurrentHashMap從新設計之外,還引入了基於Lambda表達式的Stream API。它是對集合對象功能上的加強(因此不止ConcurrentHashMap,其餘集合也都實現了該API),以一種優雅的方式來批量操做、聚合或遍歷集合中的數據。

最重要的是,它還提供了並行模式,充分利用了多核CPU的優點實現並行計算。讓咱們看看以下的示例代碼:

public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        String keys = "ABCDEFG";
        for (int i = 1; i <= keys.length(); i++) {
            map.put(String.valueOf(keys.charAt(i - 1)), i);
        }

        map.forEach(2,
                (k, v) -> System.out.println("key-" + k + ":value-" + v + ". by thread->" + Thread.currentThread().getName()));
    }
複製代碼

這段代碼經過兩個線程(包括主線程)並行地遍歷map中的元素,而後輸出到控制檯,輸出以下:

key-A:value-1. by thread->main
key-D:value-4. by thread->ForkJoinPool.commonPool-worker-2
key-B:value-2. by thread->main
key-E:value-5. by thread->ForkJoinPool.commonPool-worker-2
key-C:value-3. by thread->main
key-F:value-6. by thread->ForkJoinPool.commonPool-worker-2
key-G:value-7. by thread->ForkJoinPool.commonPool-worker-2
複製代碼

很明顯,有兩個線程在進行工做,那麼這是怎麼實現的呢?咱們先來看看forEach()函數:

public void forEach(long parallelismThreshold, BiConsumer<? super K,? super V> action) {
        if (action == null) throw new NullPointerException();
        new ForEachMappingTask<K,V>
            (null, batchFor(parallelismThreshold), 0, 0, table,
             action).invoke();
    }
複製代碼

parallelismThreshold是須要並行執行該操做的線程數量,action則是回調函數(咱們想要執行的操做)。action的類型爲BiConsumer,是一個用於支持Lambda表達式的FunctionalInterface,它接受兩個輸入參數並返回0個結果。

@FunctionalInterface
public interface BiConsumer<T, U> {

    /** * Performs this operation on the given arguments. * * @param t the first input argument * @param u the second input argument */
    void accept(T t, U u);
複製代碼

看來實現並行計算的關鍵在於ForEachMappingTask對象,經過它的繼承關係結構圖能夠發現,ForEachMappingTask其實就是ForkJoinTask。

集合的並行計算是基於Fork/Join框架實現的,工做線程交由ForkJoinPool線程池維護。它推崇分而治之的思想,將一個大的任務分解成多個小的任務,經過fork()函數(有點像Linux的fork()系統調用來建立子進程)來開啓一個工做線程執行其中一個小任務,經過join()函數等待工做線程執行完畢(須要等全部工做線程執行完畢才能合併最終結果),只要全部的小任務都已經處理完成,就表明這個大的任務也完成了。

像上文中的示例代碼就是將遍歷這個大任務分解成了N個小任務,而後交由兩個工做線程進行處理。

static final class ForEachMappingTask<K,V> extends BulkTask<K,V,Void> {
        final BiConsumer<? super K, ? super V> action;
        ForEachMappingTask
            (BulkTask<K,V,?> p, int b, int i, int f, Node<K,V>[] t,
             BiConsumer<? super K,? super V> action) {
            super(p, b, i, f, t);
            this.action = action;
        }
        public final void compute() {
            final BiConsumer<? super K, ? super V> action;
            if ((action = this.action) != null) {
                for (int i = baseIndex, f, h; batch > 0 &&
                         (h = ((f = baseLimit) + i) >>> 1) > i;) {
                    // 記錄待完成任務的數量
                    addToPendingCount(1);
                    // 開啓一個工做線程執行任務
                    // 其他參數是任務的區間以及table和回調函數
                    new ForEachMappingTask<K,V>
                        (this, batch >>>= 1, baseLimit = h, f, tab,
                         action).fork();
                }
                for (Node<K,V> p; (p = advance()) != null; )
                    // 調用回調函數
                    action.accept(p.key, p.val);
                // 與addToPendingCount()相反
                // 它會減小待完成任務的計數器
                // 若是該計數器爲0,表明全部任務已經完成了
                propagateCompletion();
            }
        }
    }
複製代碼

其餘並行計算函數的實現也都差很少,只不過具體的Task實現不一樣,例如search()

public <U> U search(long parallelismThreshold, BiFunction<? super K, ? super V, ? extends U> searchFunction) {
        if (searchFunction == null) throw new NullPointerException();
        return new SearchMappingsTask<K,V,U>
            (null, batchFor(parallelismThreshold), 0, 0, table,
             searchFunction, new AtomicReference<U>()).invoke();
    }
複製代碼

爲了節省篇幅(說實話如今彷佛不多有人能耐心看完一篇長文_(:з」∠)_),有關Stream API是如何使用Fork/Join框架進行工做以及實現細節就很少講了,之後有機會再說吧。

參考文獻


相關文章
相關標籤/搜索