Java容器(一)—CurrentHashMap詳解(JDK1.8)

摘要

在涉及到Java多線程開發時,若是咱們使用HashMap可能會致使死鎖問題,使用HashTable效率又不高。而ConcurrentHashMap既能夠保持同步也能夠提升併發效率,因此這個時候ConcurrentHashmap是咱們最好的選擇。java

爲何使用ConcurrentHashMap

  • 在多線程環境中使用HashMap的put方法有可能致使程序死循環,由於多線程可能會致使HashMap造成環形鏈表,即鏈表的一個節點的next節點永不爲null,就會產生死循環。這時,CPU的利用率接近100%,因此併發狀況下不能使用HashMap。node

  • HashTable經過使用synchronized保證線程安全,但在線程競爭激烈的狀況下效率低下。由於當一個線程訪問HashTable的同步方法時,其餘線程只能阻塞等待佔用線程操做完畢。算法

  • ConcurrentHashMap使用分段鎖的思想,對於不一樣的數據段使用不一樣的鎖,能夠支持多個線程同時訪問不一樣的數據段,這樣線程之間就不存在鎖競爭,從而提升了併發效率。數組

簡介

在閱讀ConcurrentHashMap的源碼時,有一段相關描述。安全

The primary design goal of this hash table is to maintain concurrent readability(typically method get(), but also iterators and related methods) while minimizing update contention. Secondary goals are to keep space consumption about the same or better than java.util.HashMap, and to support high initial insertion rates on an empty table by many threads.bash

大體意思就是: ConcurrentHashMap的主要設計目的是保持併發的可讀性(一般是指的get()方法的使用,同時也包括迭代器和相關方法),同時最小化更新徵用(即在進行插入操做或者擴容時也能夠保持其餘數據段的訪問)。第二個目標就是在空間利用方面保持與HashMap一致或者更好,而且支持多線程在空表的初始插入速率。數據結構

Java7與Java8中的ConcurrentHashMap:

在ConcurrentHashMap中主要經過鎖分段技術實現上述目標。多線程

在Java7中,ConcurrentHashMap由Segment數組結構和HashEntry數組組成。Segment是一種可重入鎖,是一種數組和鏈表的結構,一個Segment中包含一個HashEntry數組,每一個HashEntry又是一個鏈表結構。正是經過Segment分段鎖,ConcurrentHashMap實現了高效率的併發。併發

在Java8中,ConcurrentHashMap去除了Segment分段鎖的數據結構,主要是基於CAS操做保證保證數據的獲取以及使用synchronized關鍵字對相應數據段加鎖實現了主要功能,這進一步提升了併發性。同時同時爲了提升哈希碰撞下的尋址性能,Java 8在鏈表長度超過必定閾值(8)時將鏈表(尋址時間複雜度爲O(N))轉換爲紅黑樹(尋址時間複雜度爲O(long(N)))。app

Java8中ConcurrentHashMap的結構

在Java8中,ConcurrentHashMap棄用了Segment類,可是保留了Segment屬性,用於序列化。目前ConcurrentHashMap採用Node類做爲基本的存儲單元,每一個鍵值對(key-value)都存儲在一個Node中。同時Node也有一些子類,TreeNodes用於樹結構中(當鏈表長度大於8時轉化爲紅黑樹);TreeBins用於維護TreeNodes。當鏈表轉樹時,用於封裝TreeNode。也就是說,ConcurrentHashMap的紅黑樹存放的是TreeBin,而不是treeNode;ForwordingNodes是一個重要的結構,它用於ConcurrentHashMap擴容時,是一個標誌節點,內部有一個指向nextTable的屬性,同時也提供了查找的方法;

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;  //使用了volatile屬性
        volatile Node<K,V> next;  //使用了volatile屬性

        Node(int hash, K key, V val) {
            this.hash = hash;
            this.key = key;
            this.val = val;
        }
        Node(int hash, K key, V val, Node<K,V> next) {
            this(hash, key, val);
            this.next = next;
        }
        public final K getKey()     { return key; }
        public final V getValue()   { return val; }
        public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
        public final String toString() {...}
        public final V setValue(V value) {...}

        public final boolean equals(Object o) {...}

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {...}
    }
複製代碼

處理Node以外,Node的一個子類ForwardingNodes也是一個重要的結構,它主要做爲一個標記,在處理併發時起着關鍵做用,有了ForwardingNodes,也是ConcurrentHashMap有了分段的特性,提升了併發效率。

/**
     * A node inserted at head of bins during transfer operations.
     */
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
	        //hash值默認爲MOVED(-1)
            super(MOVED, null, null);
            this.nextTable = tab;
        }

        Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            //在nextTable中查找,nextTable能夠看作是當前hash表的一個副本
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }
複製代碼

ConcurrentHashMap中的原子操做

在ConcurrentHashMap中經過原子操做查找元素、替換元素和設置元素。這些原子操做起着很是關鍵的做用,你能夠在全部ConcurrentHashMap的基本功能中看到它們的身影。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }
複製代碼

ConcurrentHashMap的功能實現

1.ConcurrentHashMap初始化

在介紹初始化以前先介紹一個重要的參數sizeCtl,含義以下:

/**
     * Table initialization and resizing control.  When negative,the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads). Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     * Hash表的初始化和調整大小的控制標誌。爲負數,Hash表正在初始化或者擴容;
     * (-1表示正在初始化,-N表示有N-1個線程在進行擴容)
     * 不然,當表爲null時,保存建立時使用的初始化大小或者默認0;
     * 初始化之後保存下一個調整大小的尺寸。
     */
    private transient volatile int sizeCtl;
複製代碼

這個參數起到一個控制標誌的做用,在ConcurrentHashMap初始化和擴容都有用到。 ConcurrentHashMap構造函數只是設置了一些參數,並無對Hash表進行初始化。當在從插入元素時,纔會初始化Hash表。在開始初始化的時候,首先判斷sizeCtl的值,若是sizeCtl < 0,說明有線程在初始化,當前線程便放棄初始化操做。不然,將SIZECTL設置爲-1,Hash表進行初始化。初始化成功之後,將sizeCtl的值設置爲當前的容量值。

/**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
	        //sizeCtl小於0,正在初始化
            if ((sc = sizeCtl) < 0)
                //調用yield()函數,使線程讓出CPU資源
                Thread.yield(); // lost initialization race; just spin
            //設置SIZECTL爲-1,表示正在初始化
            else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2); //n-(1/4)n,即默認的容量(n * loadFactor)
                    }
                } finally {
                    sizeCtl = sc; //從新設置sizeCtl
                }
                break;
            }
        }
        return tab;
    }
複製代碼

2.肯定元素在Hash表的索引

經過hash算法能夠將元素分散到哈希桶中。在ConcurrentHashMap中經過以下方法肯定數組索引: 第一步:

static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS; 
    }
複製代碼

第二步:(length-1) & (h ^ (h >>> 16)) & HASH_BITS);

ConcurrentHashMap的put方法

  • 若是key或者value爲null,則拋出空指針異常;

  • 若是table爲null或者table的長度爲0,則初始化table,調用initTable()方法。

  • 計算當前鍵值的索引位置,若是Hash表中當前節點爲null,則將元素直接插入。(注意,這裏使用的就是前面鎖說的CAS操做)

  • 若是當前位置的節點元素的hash值爲-1,說明這是一個ForwaringNodes節點,即正在進行擴容。那麼當前線程加入擴容。

  • 當前節點不爲null,對當前節點加鎖,將元素插入到當前節點。在Java8中,當節點長度大於8時,就將節點轉爲樹的結構。

/** Implementation for put and putIfAbsent */
	final V putVal(K key, V value, boolean onlyIfAbsent) {
		//數據不合法,拋出異常
        if (key == null || value == null) throw new NullPointerException();
        //計算索引的第一步,傳入鍵值的hash值
        int hash = spread(key.hashCode());
        int binCount = 0; //保存當前節點的長度
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); //初始化Hash表
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
	            //利用CAS操做將元素插入到Hash表中
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;  // no lock when adding to empty bin(插入null的節點,無需加鎖)
            }
            else if ((fh = f.hash) == MOVED) //f.hash == -1 
	            //正在擴容,當前線程加入擴容
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent && fh == hash &&  // check first node
                     ((fk = f.key) == key || fk != null && key.equals(fk)) &&
                     (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                //當前節點加鎖
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //插入的元素鍵值的hash值有節點中元素的hash值相同,替換當前元素的值
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
	                                    //替換當前元素的值
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //沒有相同的值,直接插入到節點中
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        //節點爲樹
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
	                                //替換舊值
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
	                //若是節點長度大於8,轉化爲樹
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal; 
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
複製代碼

ConcurrentHashMap的擴容機制

當ConcurrentHashMap中元素的數量達到cap * loadFactor時,就須要進行擴容。擴容主要經過transfer()方法進行,當有線程進行put操做時,若是正在進行擴容,能夠經過helpTransfer()方法加入擴容。也就是說,ConcurrentHashMap支持多線程擴容,多個線程處理不一樣的節點。

  • 開始擴容,首先計算步長,也就是每一個線程分配到的擴容的節點數(默認是16)。這個值是根據當前容量和CPU的數量來計算(stride = (NCPU > 1) ? (n >>> 3) / NCPU : n),最小是16。

  • 接下來初始化臨時的Hash表nextTable,若是nextTable爲null,初始化nextTable長度爲原來的2倍;

  • 經過計算出的步長開始遍歷Hash表,其中座標是經過一個原子操做(compareAndSetInt)記錄。經過一個while循環,若是在一個線程的步長內便跳過此節點。不然轉下一步;

  • 若是當前節點爲空,之間將此節點在舊的Hash表中設置爲一個ForwardingNodes節點,表示這個節點已經被處理過了。

  • 若是當前節點元素的hash值爲MOVED(f.hash == -1),表示這是一個ForwardingNodes節點,則直接跳過。不然,開始從新處理節點;

  • 對當前節點進行加鎖,在這一步的擴容操做中,從新計算元素位置的操做與HashMap中是同樣的,即當前元素鍵值的hash與長度進行&操做,若是結果爲0則保持位置不變,爲1位置就是i+n。其中進行處理的元素是最後一個符合條件的元素,因此擴容後多是一種倒序,但在Hash表中這種順序也沒有太大的影響。

  • 最後若是是鏈表結構直接得到高位與低位的新鏈表節點,若是是樹結構,一樣計算高位與低位的節點,可是須要根據節點的長度進行判斷是否須要轉化爲樹的結構。

/**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //根據長度和CPU的數量計算步長,最小是16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                //初始化新的Hash表,長度爲原來的2倍
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //初始化ForwardingNodes節點
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true; //是否跨過節點的標記
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //根據步長判斷是否須要跨過節點
            while (advance) {
                int nextIndex, nextBound;
                //到達沒有處理的節點下標
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                //全部節點都已經接收處理
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSetInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                //更新下表transferIndex,在步長的範圍內都忽略
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //全部節點都被接收處理或者已經處理完畢
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //處理完畢
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    //更新sizeCtl
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //判斷全部節點是否所有被處理
                if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //若是節點爲null,直接標記爲已接收處理
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //鍵值的hash爲-1,表示這是一個ForwardingNodes節點,已經被處理
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
	            //對當前節點進行加鎖
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
	                        //索引位置是否改變的標誌
                            int runBit = fh & n;
                            Node<K,V> lastRun = f; //最後一個元素
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                //從新計算更新直到最後一個元素
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //runBit = 0,保持位置不變
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            //runBit = 1,位置時i+n
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //從新遍歷節點元素
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //構建低位(位置不變)新的鏈表
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                    //構建高位(i+n)新的鏈表
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //將新的鏈表設置到新的Hash表中相應的位置
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //將原來的Hash表中相應位置的節點設置爲ForwardingNodes節點
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //若是節點是樹的結構
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //一樣的方式計算新的索引位置
                                if ((h & n) == 0) {
	                                //構建新的鏈表結構
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                //構建新的鏈表結構
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //判斷是否須要轉化爲樹
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            //判斷是否須要轉化爲樹
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            //將新的鏈表設置到新的Hash表中相應的位置
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //將原來的Hash表中相應位置的節點設置爲ForwardingNodes節點
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
複製代碼

ConcurrentHashMap的get方法

ConcurrentHashMap的get方法就是從Hash表中讀取數據,並且與擴容不衝突。該方法沒有同步鎖。

  • 經過鍵值的hash計算索引位置,若是知足條件,直接返回對應的值;

  • 若是相應節點的hash值小於0 ,即該節點在進行擴容,直接在調用ForwardingNodes節點的find方法進行查找。

  • 不然,遍歷當前節點直到找到對應的元素。

/**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        //知足條件直接返回對應的值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //e.hash<0,正在擴容
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //遍歷當前節點
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
複製代碼

總結

ConcurrentHashMap就介紹到這裏,以上主要是對ConcurrentHashMap中常用到的特性進行分析,若是對其餘內容感興趣,能夠閱讀相應的源碼。

相關文章
相關標籤/搜索