JDK源碼那些事兒之併發ConcurrentHashMap上篇

前面已經說明了HashMap以及紅黑樹的一些基本知識,對JDK8的HashMap也有了必定的瞭解,本篇就開始看看併發包下的ConcurrentHashMap,說實話,仍是比較複雜的,筆者在這裏也不會過多深刻,源碼層次上了解一些主要流程便可,清楚多線程環境下整個Map的運做過程就算是很大進步了,更細的底層部分須要時間和精力來研究,暫不深刻java

前言

jdk版本:1.8

JDK7中,ConcurrentHashMap把內部細分紅了若干個小的HashMap,稱之爲段(Segment),默認被分爲16個段。多線程寫操做對每一個段進行加鎖,段與段之間互不影響。而JDK8則拋棄了這種結構,相似HashMap,多線程下爲了保證線程安全,經過CAS和synchronized進行併發控制,下降鎖顆粒度,性能上也就提升許多node

同時因爲下降鎖粒度,同時須要兼顧讀操做的正確性,增長了許多內部類來幫助完成併發控制,保證讀操做的正確執行,同時支持了併發擴容操做,算是至關複雜了,因爲過於複雜,對ConcurrentHashMap的說明將分爲兩章說明,本章就對ConcurrentHashMap的常量,變量,內部類和構造方法進行說明,下一章將重點分析其中的重要方法編程

這裏先提早說明下,有個總體印象:數組

  • ConcurrentHashMap結構上相似HashMap,即數組+鏈表+紅黑樹
  • 鎖顆粒度下降,複雜度提高
  • 多線程併發擴容
  • 多線程下保證讀操做正確性
  • 計數方式處理:分段處理
  • 函數式編程(不是本文重點,自行查閱)

類定義

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

繼承AbstractMap,實現了ConcurrentMap接口,ConcurrentMap也能夠看看源碼,加入了併發操做方法,是一個實現了併發訪問的集合接口安全

ConcurrentHashMap繼承關係

常量

有些常量和變量可能不是很好理解,在後邊到方法時會盡可能詳細說明多線程

/**
     * 最大容量
     */
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * 默認初始化容量,同HashMap,必須爲2的倍數
     */
    private static final int DEFAULT_CAPACITY = 16;

    /**
     * 可能達到的最大的數組大小值(非2的次冪),2的31次方-8
     */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 默認併發級別,新版本無用,爲了兼容舊版本,使用的地方在序列化方法writeObject中
     */
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * 負載因子,只是爲了兼容性
     * 構造方法裏指定的負載因子只會影響初始化的table容量
     * 通常也不使用浮點數計算
     * ConcurrentHashMap不會使用這個常量,而使用相似 n -(n >>> 2) 的方式來進行調整大小
     */
    private static final float LOAD_FACTOR = 0.75f;

    /**
     * 樹化閾值
     * 同HashMap
     */
    static final int TREEIFY_THRESHOLD = 8;

    /**
     * 調整大小時樹轉化爲鏈表的閾值
     */
    static final int UNTREEIFY_THRESHOLD = 6;

    /**
     * 能夠轉化爲紅黑樹的最小數組容量,即調整爲紅黑樹時數組長度最小值必須爲MIN_TREEIFY_CAPACITY
     * 若是bin包含太多節點,則會調整表的大小
     * 該值應至少爲4 * TREEIFY_THRESHOLD,避免擴容和樹化閾值之間的衝突。
     */
    static final int MIN_TREEIFY_CAPACITY = 64;

    /**
     * 
     * 擴容的每一個線程每次最少要遷移16個hash桶
     * 每一個線程均可參與遷移任務,每一個線程至少要連續遷移MIN_TRANSFER_STRIDE個hash桶
     * 幫助擴容提升了效率,固然複雜性也提升了不少,要處理的事情更多
     */
    private static final int MIN_TRANSFER_STRIDE = 16;

    /**
     * 與移位量和最大線程數相關
     * 先了解就好,後邊涉及到方法會進行說明
     */
    private static int RESIZE_STAMP_BITS = 16;

    /**
     * 幫助擴容的最大線程數
     */
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    /**
     * 移位量
     */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /** 
     * Node hash值編碼,規定了各自的含義
     */
    // forwarding nodes節點hash值
    // 臨時節點,擴容時出現,不存儲實際數據
    // 若是舊數組的一個hash桶中所有的節點都遷移到新數組中,舊數組就在這個hash桶中放置一個ForwardingNode
    // 讀操做碰見該節點時,轉到新的table數組上執行,寫操做碰見時,則幫助擴容
    static final int MOVED     = -1; // hash for forwarding nodes
    // TREEBIN節點
    // TreeBin是ConcurrentHashMap中用於代理操做TreeNode的特殊節點
    // 保存實際的紅黑樹根節點,在紅黑樹插入節點時會對讀操做形成影響,該對象維護了一個讀寫鎖來保證多線程的正確性
    static final int TREEBIN   = -2; // hash for roots of trees
    // ReservationNode節點hash值
    // 保留節點,JDK8的新特性會用到,這裏不過多說明
    static final int RESERVED  = -3; // hash for transient reservations
    
    // 負數轉正數,定位hash桶時用到了,負數Hash值有特殊含義,具體看後邊
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

    // CPU數量,限制邊界,計算一個線程須要作多少遷移量
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    // 序列化兼容性
    private static final ObjectStreamField[] serialPersistentFields = {
        new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
    };

變量

/**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     * 
     * volatile保證可見性
     * Node類和HashMap中的相似,val和next屬性經過volatile保證可見性
     */
    transient volatile Node<K,V>[] table;

    /**
     * The next table to use; non-null only while resizing.
     *
     * 擴容時使用的數組,只在擴容時非空,擴容時會建立
     */
    private transient volatile Node<K,V>[] nextTable;

    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     *
     * sizeCtl = -1,表示有線程在進行初始化操做
     * sizeCtl < 0且不爲-1表示有多個線程正在進行擴容操做,jdk源碼解釋部分感受有點問題
     * 每次第一個線程參與擴容時,會將sizeCtl設置爲一個與當前table長度相關的數值,避免出現問題,講解方法時進行說明
     * sizeCtl > 0,表示第一次初始化操做中使用的容量,或者初始化/擴容完成後的閾值
     * sizeCtl = 0,默認值,此時在真正的初始化操做中使用默認容量
     */
    private transient volatile int sizeCtl;

    /**
     * 多線程幫助擴容相關
     * 下一個transfer任務的起始下標index + 1 的值
     * transfer時下標index從length - 1到0遞減
     * 擴容index從後往前和迭代從前日後爲了不衝突
     */
    private transient volatile int transferIndex;

    /**
     * Base counter value, used mainly when there is no contention,
     * but also as a fallback during table initialization
     * races. Updated via CAS.
     *
     * 計數器基礎值,記錄元素個數,經過CAS操做進行更新
     */
    private transient volatile long baseCount;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
     *
     * CAS自旋鎖標誌位,counterCells擴容或初始化時使用
     */
    private transient volatile int cellsBusy;

    /**
     * Table of counter cells. When non-null, size is a power of 2.
     *
     * 高併發下計數數組,counterCells數組非空時大小是2的n次冪
     */
    private transient volatile CounterCell[] counterCells;

    // views
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

同時須要注意靜態代碼塊中已經獲取了一些變量在對象中的內存偏移量,這個是爲了方便咱們在CAS中的使用,若是不明白CAS,請自行查閱資料學習,源碼以下:併發

// Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long SIZECTL;
    private static final long TRANSFERINDEX;
    private static final long BASECOUNT;
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            // 獲取sizeCtl在對象中的內存偏移量,下同
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

內部類

ConcurrentHashMap中增長了許多內部類來幫助完成併發下的操做函數式編程

Node

普通Entry,與HashMap.Node相似,不一樣主要在於val和next屬性設置爲了volatile,同時不支持setValue方法,直接拋錯,增長了find方法函數

TreeNode

繼承自Node節點,與HashMap.TreeNode相似,以前文章專門介紹過這個內部類,能夠去看看,同時須要說明的是,在ConcurrentHashMap中並非直接操做TreeNode節點,而是經過TreeBin來代理操做的高併發

TreeBin

代理操做TreeNode節點,保存紅黑樹的根節點,Hash值固定爲-2,構造方法裏傳入對應的節點建立一棵紅黑樹。故在數組中保存的hash桶爲紅黑樹結構時,在數組上的節點類型爲TreeBin,該節點類自帶讀寫鎖

static final class TreeBin<K,V> extends Node<K,V> {
        // 保存樹的根節點
        TreeNode<K,V> root;
        // 鏈表頭節點
        volatile TreeNode<K,V> first;
        // 最近一次waiter狀態的線程
        volatile Thread waiter;
        // 鎖狀態
        volatile int lockState;
        // values for lockState
        // 寫鎖,二進制001
        static final int WRITER = 1; // set while holding write lock
        // 等待獲取寫鎖的狀態,二進制010
        static final int WAITER = 2; // set when waiting for write lock
        // 讀鎖狀態,二進制100
        static final int READER = 4; // increment value for setting read lock

        /**
         * 紅黑樹結構重構時須要獲取寫鎖
         */
        private final void lockRoot() {
            // CAS嘗試獲取寫鎖
            if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
                // 當線程獲取到寫鎖時,纔會返回
                contendedLock(); // offload to separate method
        }

        /**
         * 釋放寫鎖
         */
        private final void unlockRoot() {
            lockState = 0;
        }

        /**
         * 寫線程不斷嘗試獲取寫鎖,出現的幾種狀況的處理,這裏不會有寫線程和寫線程競爭的狀況出現
         * 在寫線程進入這個方法時,這個紅黑樹對應的桶已經被外部鎖住,不會讓寫線程進入,故不須要考慮寫寫競爭的狀況
         * 故當前線程爲寫線程,則判斷讀線程便可
         */
        private final void contendedLock() {
            boolean waiting = false;
            for (int s;;) {
                // 取反與操做,其餘線程未持有讀鎖
                if (((s = lockState) & ~WAITER) == 0) {
                    // CAS嘗試修改狀態爲寫鎖狀態
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                        // 當前線程曾經處於waiting狀態,將其清除
                        if (waiting)
                            waiter = null;
                        return;
                    }
                }
                // 有讀線程
                else if ((s & WAITER) == 0) {
                    // 當前線程置爲wait狀態,waiter記錄線程
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                        waiting = true;
                        waiter = Thread.currentThread();
                    }
                }
                // 當前線程處於waiting狀態
                else if (waiting)
                    // 阻塞線程
                    LockSupport.park(this);
            }
        }

        /**
         * 從根節點開始進行遍歷,查找到對應匹配的節點
         * 同時須要注意多線程環境下若是有寫鎖則經過鏈表結構(不是用紅黑樹結構查詢)進行查詢
         */
        final Node<K,V> find(int h, Object k) {
            if (k != null) {
                // 從first開始
                for (Node<K,V> e = first; e != null; ) {
                    int s; K ek;
                    // 1.有寫線程操做,經過鏈表查詢不進行阻塞
                    // 2.WAITER狀態
                    if (((s = lockState) & (WAITER|WRITER)) != 0) {
                        if (e.hash == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                            return e;
                        e = e.next;
                    }
                    // 讀線程累加
                    else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                                 s + READER)) {
                        TreeNode<K,V> r, p;
                        try {
                            p = ((r = root) == null ? null :
                                 r.findTreeNode(h, k, null));
                        } finally {
                            Thread w;
                            // 當線程爲最後一個讀線程而且有寫鎖被阻塞,此時應該通知阻塞的寫線程從新嘗試得到寫鎖
                            // getAndAddInt更新以後返回舊值
                            if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                                (READER|WAITER) && (w = waiter) != null)
                                // 喚醒阻塞的線程
                                LockSupport.unpark(w);
                        }
                        return p;
                    }
                }
            }
            return null;
        }

        /**
         * 大部分與HashMap.TreeNode中的putTreeVal操做相似
         * 這裏只說下不一樣的部分
         * 多線程環境下主要是在平衡時加鎖操做,防止讀線程操做時樹結構在變化
         */
        final TreeNode<K,V> putTreeVal(int h, K k, V v) {
                ......
                TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    TreeNode<K,V> x, f = first;
                    first = x = new TreeNode<K,V>(h, k, v, f, xp);
                    if (f != null)
                        f.prev = x;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    // 新添加節點父節點爲黑色,添加節點置爲紅色便可保證平衡
                    if (!xp.red)
                        x.red = true;
                    else {
                        //在進行插入平衡操做時會對讀線程形成影響,須要加鎖,讓讀線程以鏈表方式進行查詢操做
                        lockRoot();
                        try {
                            root = balanceInsertion(root, x);
                        } finally {
                            unlockRoot();
                        }
                    }
                    break;
                }
            }
            assert checkInvariants(root);
            return null;
        }

        /**
         * 刪除時修改鏈表關係和刪除平衡操做時須要添加鎖,防止讀線程讀取錯誤
         * 代碼操做參考HashMap.TreeNode,這裏再也不詳細說明了
         */
        final boolean removeTreeNode(TreeNode<K,V> p) {
            ......
            lockRoot();
            try {
                TreeNode<K,V> replacement;
                TreeNode<K,V> pl = p.left;
                TreeNode<K,V> pr = p.right;
                if (pl != null && pr != null) {
                    TreeNode<K,V> s = pr, sl;
                    while ((sl = s.left) != null) // find successor
                        s = sl;
                    boolean c = s.red; s.red = p.red; p.red = c; // swap colors
                    TreeNode<K,V> sr = s.right;
                    TreeNode<K,V> pp = p.parent;
                    if (s == pr) { // p was s's direct parent
                        p.parent = s;
                        s.right = p;
                    }
                    else {
                        TreeNode<K,V> sp = s.parent;
                        if ((p.parent = sp) != null) {
                            if (s == sp.left)
                                sp.left = p;
                            else
                                sp.right = p;
                        }
                        if ((s.right = pr) != null)
                            pr.parent = s;
                    }
                    p.left = null;
                    if ((p.right = sr) != null)
                        sr.parent = p;
                    if ((s.left = pl) != null)
                        pl.parent = s;
                    if ((s.parent = pp) == null)
                        r = s;
                    else if (p == pp.left)
                        pp.left = s;
                    else
                        pp.right = s;
                    if (sr != null)
                        replacement = sr;
                    else
                        replacement = p;
                }
                else if (pl != null)
                    replacement = pl;
                else if (pr != null)
                    replacement = pr;
                else
                    replacement = p;
                if (replacement != p) {
                    TreeNode<K,V> pp = replacement.parent = p.parent;
                    if (pp == null)
                        r = replacement;
                    else if (p == pp.left)
                        pp.left = replacement;
                    else
                        pp.right = replacement;
                    p.left = p.right = p.parent = null;
                }

                root = (p.red) ? r : balanceDeletion(r, replacement);

                if (p == replacement) {  // detach pointers
                    TreeNode<K,V> pp;
                    if ((pp = p.parent) != null) {
                        if (p == pp.left)
                            pp.left = null;
                        else if (p == pp.right)
                            pp.right = null;
                        p.parent = null;
                    }
                }
            } finally {
                unlockRoot();
            }
            assert checkInvariants(root);
            return false;
        }

Traverser

首先須要明白的是在併發下遍歷讀操做和擴容操做同時進行時,讀操做有可能不會正確的進行,而爲了解決這個問題,引入了Traverser這個內部類來完成併發操做下的讀操做。在涉及到讀取全部hash桶時,好比containsValue操做,不能使用HashMap的方式進行,一個一個hash桶遍歷進行讀取,此時擴容時,ConcurrentHashMap會在轉移完一個hash桶後將一個ForwardingNode節點填入,若是仍是非併發遍歷,致使這個hash桶將不會被遍歷到,正確的處理方式也就是遇到ForwardingNode節點時將當前數組信息和索引保存,而後在ForwardingNode節點的nextTable數組(也就是擴容後的數組)上進行遍歷。這裏還使用了棧結構TableStack,是爲了處理多個ForwardingNode節點遍歷的狀況,同時須要注意的是,參考HashMap,在nextTable上須要遍歷舊的hash桶和新的對應的擴容hash桶(index+size)

static class Traverser<K,V> {
        // 當前table
        Node<K,V>[] tab;        // current table; updated if resized
        // 下一個entry
        Node<K,V> next;         // the next entry to use
        // 保存/恢復ForwardingNodes
        TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
        // 下一個遍歷的hash桶index
        int index;              // index of bin to use next
        // 開始index
        int baseIndex;          // current index of initial table
        // 結尾index
        int baseLimit;          // index bound for initial table
        // 數組長度
        final int baseSize;     // initial table size

        Traverser(Node<K,V>[] tab, int size, int index, int limit) {
            this.tab = tab;
            this.baseSize = size;
            this.baseIndex = this.index = index;
            this.baseLimit = limit;
            this.next = null;
        }

        /**
         * 遍歷到下一個有數據的節點並返回,無則返回null
         */
        final Node<K,V> advance() {
            Node<K,V> e;
            // next值不爲空,則直接獲取其下一個節點
            if ((e = next) != null)
                e = e.next;
            for (;;) {
                Node<K,V>[] t; int i, n;  // must use locals in checks
                // e非空則直接返回當前值e並將next賦值爲e便於下次再次調用該方法使用
                if (e != null)
                    return next = e;
                // 判斷邊界同時作了一些賦值操做
                // 數組 t = tab
                // 數組長度 n = t.length
                // 數組index i = index
                // 不知足直接返回null
                if (baseIndex >= baseLimit || (t = tab) == null ||
                    (n = t.length) <= (i = index) || i < 0)
                    return next = null;
                // e賦值爲t的第i處hash桶節點值,判斷是否爲特殊節點
                if ((e = tabAt(t, i)) != null && e.hash < 0) {
                    // ForwardingNode節點說明正在擴容,此時遍歷讀須要轉移到擴容數組上進行
                    if (e instanceof ForwardingNode) {
                        // tab指向擴容數組
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        // e置空,繼續循環
                        e = null;
                        // 保存此時的未擴容前的數組狀態,至關於入棧操做
                        pushState(t, i, n);
                        continue;
                    }
                    // TreeBin節點,e置爲first節點
                    else if (e instanceof TreeBin)
                        e = ((TreeBin<K,V>)e).first;
                    else
                        // 保留節點
                        e = null;
                }
                if (stack != null)
                    // 出棧操做,恢復tab到未擴容以前的數組繼續遍歷
                    recoverState(n);
                else if ((index = i + baseSize) >= n)
                    // 更新爲下一個hash桶索引
                    index = ++baseIndex; // visit upper slots if present
            }
        }

        /**
         * 入棧操做,保存當前數組的狀態
         */
        private void pushState(Node<K,V>[] t, int i, int n) {
            TableStack<K,V> s = spare;  // reuse if possible
            if (s != null)
                spare = s.next;
            else
                s = new TableStack<K,V>();
            s.tab = t;
            s.length = n;
            s.index = i;
            s.next = stack;
            stack = s;
        }

        /**
         * 出棧操做,恢復擴容以前的數組狀態
         */
        private void recoverState(int n) {
            TableStack<K,V> s; int len;
            while ((s = stack) != null && (index += (len = s.length)) >= n) {
                n = len;
                index = s.index;
                tab = s.tab;
                s.tab = null;
                TableStack<K,V> next = s.next;
                s.next = spare; // save for reuse
                stack = next;
                spare = s;
            }
            if (s == null && (index += baseSize) >= n)
                index = ++baseIndex;
        }
    }

遍歷過程可參考下圖(圖中數據非真實數據):

ConcurrentHashMap繼承關係

ForwardingNode

ForwardingNode節點在擴容時被使用,hash值固定爲-1,當進行擴容操做時,轉移hash桶時會在原數組位置上放置一個ForwardingNode節點,表示當前位置hash桶已轉移到新擴容數組上,當讀線程讀到ForwardingNode節點時則須要到新擴容數組對應的位置進行讀操做,若是是寫線程,則須要幫助進行擴容操做

/**
     * A node inserted at head of bins during transfer operations.
     */
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            // MOVED hash值固定爲-1
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        
        // 提供find方法查找,這裏經過nextTable進行遍歷
        Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                // 前置判斷
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    // 判斷是hash桶第一個節點,返回這個節點
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        // 遞歸
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            // 特殊節點調用其自身方法
                            return e.find(h, k);
                    }
                    // 找到最後返回空
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }

ReservationNode

保留節點,源碼僅compute,computeIfAbsent中使用,經過ReservationNode對hash桶進行加鎖處理,寫操做時正常狀況下須要對第一個節點進行加鎖處理,put操做使用CAS不須要加鎖,而在compute,computeIfAbsent方法中比較複雜,需加鎖處理,在hash桶節點爲空時,添加一個ReservationNode節點,同時對其加鎖處理

static final class ReservationNode<K,V> extends Node<K,V> {
        ReservationNode() {
            super(RESERVED, null, null, null);
        }

        Node<K,V> find(int h, Object k) {
            return null;
        }
    }

構造方法

在有參構造函數中,傳入初始容量參數,調用tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)),這裏和HashMap就不同了,可自行查閱源碼,ConcurrentHashMap先擴大了1.5倍再調用tableSizeFor方法,能夠回想下tableSizeFor方法的含義,和HashMap是相同的處理方式,同時構造方法也不是table進行初始化的地方,和HashMap相似,都是在插入K-V值時進行數組的初始化,構造方法僅僅是來設置屬性值的,重要的在於sizeCtl,此時是用來記錄數組初始化使用容量的,在initTable方法中使用

public ConcurrentHashMap() {
    }
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        // 設置sizeCtl,此時表示數組初始化使用容量,由於在初始化時將會使用這個變量
        this.sizeCtl = cap;
    }
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        // sizeCtl設置爲默認容量,此時表示數組初始化使用容量
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    // concurrencyLevel 併發級別 兼容舊版本
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        // 一樣進行設置sizeCtl,此時表示數組初始化使用容量
        this.sizeCtl = cap;
    }

initTable

構造方法並無建立Node數組,最終執行是在put方法中的initTable方法,這個方法中也能看出sizeCtl在不一樣值下表明瞭不一樣的含義

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 在初始化時不能併發執行,在已經有線程進入初始化狀態時,非初始化線程需讓步操做,等待完成
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            // CAS將sizeCtl置爲-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 檢查是否已經初始化,不然進入初始化過程
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // sc = 3n/4,至關於0.75n   
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // sizeCtl表明了閾值,相似threshold
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

總結

本文從總體上對ConcurrentHashMap進行了介紹,其中常量和變量以及其中重要的內部類都進行了相關的解釋說明,固然,限於篇幅以及複雜度暫未涉及方法的源碼部分,下一篇將結合本文的相關知識重點說明其中涉及的方法,在看本文時請徹底理解HashMap的源碼,將會事半功倍,畢竟結構上和HashMap很是類似

ConcurrentHashMap從HashMap的基礎上進行演變,適應多線程併發狀況下的操做,理解下列幾點:

  • 初始化數組,單線程建立
  • 擴容時,多線程可幫助擴容(下一章將詳細說明)
  • hash桶頭節點有4種類型,ForwardingNode節點代表當前hash桶正在擴容移動中,TreeBin節點代表爲紅黑樹結構,ReservationNode節點暫很少說,還有就是正常鏈表節點
  • 遍歷操做經過一個特殊的內部類Traverser實現
  • CAS操做大大增長了併發執行的效率

水平有限,若有錯誤,請及時指出,謝謝!

相關文章
相關標籤/搜索