ConcurrentHashMap與紅黑樹

ConcurrentHashMap

  ConcurrentHashMap是線程安全的HashMap;在併發的狀況下使用HashMap可能會致使死循環,在進行put操做時致使CPU利用率接近100%。是由於在多線程會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形數據結構,Entry的next結點永遠不能爲空,就會產生死循環獲取Entry。java

  在JDk1.8中ConcurrentHashMap採用Node + CAS + Synchronized來保證併發狀況下的更新不會出現問題。其底層的數據結構是:數組 + 鏈表 + 紅黑樹 的方式來實現的。node

注:點擊瞭解紅黑樹git

ConcurrentHashMap中的成員

關鍵常量

/** 
  * 最大容量,32位的Hash值的最高兩位用做控制的目的,
  * 這個值必須剛好是1<<30(2的30次方),這樣分配的java數組
  * 在索引範圍內(2的整數次冪)。
  */
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
 * Hash表默認的初始容量。必須是2的整數次冪,
 * 最小爲1,最大爲MAXIMUM_CAPACITY.
 */
private static final int DEFAULT_CAPACITY = 16;

/**
 * 最大的數組大小(非2次冪)。
 * toArray 和 related方法使用。
 */
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
 * 表默認的併發級別,未使用。爲與該類的之前版本兼容而定義
 */
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * T表的默認加載因子,在構造函數中重寫此值隻影響初始表容量。
 * 浮點值一般不被使用,
 * 當前表中的容量 = 初始化容量 - (初始化容量無符號右移2位)時擴容
 */
private static final float LOAD_FACTOR = 0.75f;

/**
 * 鏈表轉紅黑樹閥值,該值必須大於2,而且應該至少爲8。
 * 以便與樹移除中關於收縮後轉換回普通Bin的假設相吻合。
 */
static final int TREEIFY_THRESHOLD = 8;

/**
 * 用於在調整大小操做期間反樹化(拆分)bin的bin計數閾值,
 * 應該小於TREEIFY_THRESHOLD, 最多爲6.
 */
static final int UNTREEIFY_THRESHOLD = 6;

static final int MIN_TREEIFY_CAPACITY = 64;

private static final int MIN_TRANSFER_STRIDE = 16;

/**
 * 用於生成戳記的位的數目,單位爲sizeCtl。
 * 32位數組必須至少爲6.
 */
private static int RESIZE_STAMP_BITS = 16;

/**
 * 2^15-1,help resize的最大線程數
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
 * 32-16=16,sizeCtl中記錄size大小的偏移量
 */
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

/* forwarding nodes的hash值*/
static final int MOVED     = -1; 

/* 樹根節點的hash值*/
static final int TREEBIN   = -2; 

/* ReservationNode的hash值*/
static final int RESERVED  = -3; 

/* 普通節點哈希的可用位*/
static final int HASH_BITS = 0x7fffffff;
複製代碼

關鍵屬性

/**
 * 裝載Node的數組,做爲ConcurrentHashMap的數據容器,
 * 採用懶加載的方式,直到第一次插入數據的時候纔會進行初始化操做,
 * 數組的大小老是爲2的冪次方。
 */
transient volatile Node<K,V>[] table;

/**
 * 擴容時使用,只有在擴容的時候才爲非null
 */
private transient volatile Node<K,V>[] nextTable;


/**
 * 控制Table的初始化與擴容。
 *   當值爲負數時table正在被初始化或擴容
 *     -1表示正在初始化
 *     -N則表示當前正有N-1個線程進行擴容操做
 *   正數或0表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小
 */
private transient volatile int sizeCtl;
複製代碼

內部類

Node 類

   Node是最核心的內部類,它包裝了key-value鍵值對,全部插入ConcurrentHashMap的數據都包裝在這裏面。Node類實現了Map.Entry<K,V>接口,Node類中包含有屬性有key,value以及下一節點的引用,其中value和next屬性使用volatile關鍵字修飾,保證其在多線程下的可見性。不容許調用setValue方法直接改變Node的value域,它增長了find方法輔助map.get()方法。github

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    /** * Node結點的構造方法 */
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey() { return key; }
    public final V getValue() { return val; }
    public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }
    
    /** * Node結點中提供的find方法,在子類中可重寫 */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}
複製代碼
TreeNode類

  樹節點類,另一個核心的數據結構,包含父接點,左連接的結點,右連接的結點,前驅結點的引用,以及結點的顏色(默認紅色)。當鏈表長度過長的時候,會轉換爲TreeNode在TreeBins中使用。TreeNode是上述Node類的子類。數組

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    
    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }

    Node<K,V> find(int h, Object k) {
        return findTreeNode(h, k, null);
    }

    /** * 經過給定的key從指定的根節點開始(在其子樹)查找 * 對應的TreeNode結點,沒有返回null * h 表示當前能夠的Hash值 * k 要查找的鍵(key) * kc k的Class對象,該Class應該是實現了Comparable<K>的,不然應該是null */
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
        // 判斷對應的鍵是否爲null
        if (k != null) {
            //獲取當前結點
            TreeNode<K,V> p = this;
            do  { //循環
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    /** * 當前結點的Hash值大於要查找的Key的Hash值H * 在當前節點的左子樹中查找,反之在右子樹中 * 進行下一輪循環 */
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    /** * 當前結點的key等於要查找的key, * 或當前結點的key不爲null且equals()方法爲true * 返回當前的結點 */
                    return p;
                
                    
                /** * 執行到這裏說明 hash比對相同, * 但當前節點的key與要查找的k不相等 */ 
                else if (pl == null)
                    /** * 左孩子爲空,指向當前節點右孩子,繼續循環 */
                    p = pr;
                else if (pr == null)
                    /** * 右孩子爲空,指向當前節點左孩子,繼續循環 */
                    p = pl;
                /** * 左右孩子都不爲空,再次進行比較, * 肯定在左子樹仍是右子樹中查找 */    
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    /** * comparable方法來比較pk和k的大小 * dir小於0,p指向左孩子,不然指向右孩子 */
                    p = (dir < 0) ? pl : pr;
                    
                /** * 沒法經過上一步驟肯定是在左/右子樹中查找 * 從右子樹中遞歸調用findTreeNode()方法查找 */    
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    //在右子樹中沒有找到,到左子樹中查找
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}
複製代碼
TreeBin類

  紅黑樹結構。該類並不包裝key-value鍵值對,而是TreeNode的列表和它們的根節點。它代替了TreeNode的根節點,也就是說在實際的ConcurrentHashMap「數組」中,存放的是TreeBin對象,而不是TreeNode對象。這個類含有讀寫鎖。 這裏咱們先看紅黑樹相關操做的方法。安全

static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock

        /** * 經過結點b構造紅黑樹,鏈表轉紅黑樹 */
        TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            // 將給定的節點指向頭結點
            this.first = b;
            TreeNode<K,V> r = null;
            /** * 定義X節點 爲 b 結點;next結點也爲b結點 * next 節點初始化爲頭結點,用來控制遍歷 */
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                //指向下一結點
                next = (TreeNode<K,V>)x.next;
                //將x結點的鏈接屬性清空
                x.left = x.right = null;
                if (r == null) {
                    /** * r 爲null 說明是紅黑樹中沒有結點 * x 結點就是紅黑樹的根結點 * 根結點的父結點爲null,顏色爲黑色 */
                    x.parent = null;
                    x.red = false;
                    r = x;
                }
                else {
                    // 當前結點的關鍵字
                    K k = x.key;
                    // 當前結點的hash值
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                        /** * 從紅黑樹的根結點開始遍歷,查找當前結點對應的位置 * dir 控制查找的方向 * ph 記錄當前結點的hash值 */
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            /** * 當前節點的hash值大於要插入結點的hash值, * 在當前結點的左子樹中查找 */
                            dir = -1;
                        else if (ph < h)
                            /** * 當前節點的hash值小於要插入結點的hash值, * 在當前結點的右子樹中查找 */
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            /** * 若是hash值相等,則比較k值,用其Compare, * 若是還相等,則走tieBreakOrder方法 */
                            dir = tieBreakOrder(k, pk);
                            // 暫存當前節點
                            TreeNode<K,V> xp = p;
                            
                        /** * 根據dir控制查找方向 */    
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            
                            // 插入後平衡紅黑樹的性質
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }
            }
            //指定紅黑樹的根結點
            this.root = r;
            assert checkInvariants(root);
        }
        
        /** * 左旋轉過程 */
        static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) {
            
            TreeNode<K,V> r, pp, rl;
            /** * 結點P不爲null且p的右結點不爲null */
            if (p != null && (r = p.right) != null) {
                
                if ((rl = p.right = r.left) != null)
                    /** * p.right = r.left p的右結點爲r的左結點 * 而後將其賦值給rl * 當rl 不爲空的時候,肯定p與rl的關係: * 父結點與左子結點 */
                    rl.parent = p;
                
                if ((pp = r.parent = p.parent) == null)
                    /** * 結合 r = p.right * r.parent = p.parent 將p的右子樹連接到 p的父結點 * 若是P的父結點爲null,說明當前p結點爲紅黑樹的根結點 * 通過上述r.parent = p.parent 將紅黑樹的根節點轉爲r * 根結點爲r結點,顏色尾黑色 */
                    (root = r).red = false;
                else if (pp.left == p)
                    /** * 到這一步說明 p 結點不是紅黑樹的根結點 * 且p爲其父結點的左子樹, * 將r替換原來P結點的位置(左旋轉) */
                    pp.left = r;
                else
                    /** * 到這一步說明 p 結點不是紅黑樹的根結點 * 且p爲其父結點的右子樹, * 將r替換原來P結點的位置(左旋轉) */
                    pp.right = r;
                    
                /** * 上述過程只是完成了將原先以P 結點爲紅黑樹子樹 * 的根結點,替換爲以P的右結點爲根結點的部分 * 即 p.right = r.left 將原先r的左連接替換 * 成 p的右連接的過程。 */    
                
                //r的左結點爲p
                r.left = p;
                // p的父結點爲r節點
                p.parent = r;
            }
            return root;
        }

        /** * 右旋轉 爲上述左旋轉的逆過程 */
        static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) {
            TreeNode<K,V> l, pp, lr;
            if (p != null && (l = p.left) != null) {
                if ((lr = p.left = l.right) != null)
                    lr.parent = p;
                if ((pp = l.parent = p.parent) == null)
                    (root = l).red = false;
                else if (pp.right == p)
                    pp.right = l;
                else
                    pp.left = l;
                l.right = p;
                p.parent = l;
            }
            return root;
        }
        
        /** * 紅黑樹中插入結點後會打破紅黑樹性質須要平衡 * TreeNode<K,V> root 根結點 * TreeNode<K,V> x 要插入的結點 */
        static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) {
            //默認插入結點爲紅色
            x.red = true;
            for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
                // xp爲當前節點的父結點
                if ((xp = x.parent) == null) {
                    /** * 當前結點的父結點爲空,說明紅黑樹中只有一個結點 * 當前結點即爲根結點,顏色爲黑色 */
                    x.red = false;
                    return x;
                }
                /** * 當前結點的父結點(xp)不爲null * 父結點爲黑色,沒有打破紅黑樹的平衡性(着色可能有問題) * 父結點的的父結點(xpp)爲null,紅黑樹中只有兩個節點 * 上述兩種狀況直接返回root結點 */
                else if (!xp.red || (xpp = xp.parent) == null)
                    return root;
                
                /** * 當前結點的父結點(xp) 爲 其父節點(xpp)的左孩子 */
                if (xp == (xppl = xpp.left)) {
                    if ((xppr = xpp.right) != null && xppr.red) {
                        /** * 當前結點(x)得父結點(xp)的父結點(xpp)的右孩子(xppr) * 不爲null 且 顏色爲紅色(此時顏色的性質不知足) * 變換顏色 */
                        xppr.red = false; // 將xppr變爲黑色
                        xp.red = false;   // 將xp變爲黑色 
                        xpp.red = true;   // 將xpp變爲紅色 
                        x = xpp; // 將xpp指向x 繼續循環
                    }
                    /** * 當前結點的父結點的父結點右孩子爲null或顏色爲黑色 */
                    else {
                        // 若是(當前結點)x爲父結點的右孩子
                        if (x == xp.right) {
                            //左旋轉
                            root = rotateLeft(root, x = xp);
                            // 從新指定xpp
                            xpp = (xp = x.parent) == null ? null : xp.parent;
                        }
                        // 若是當前結點的父結點不爲null
                        if (xp != null) {
                            // 將xp的顏色置爲黑色
                            xp.red = false;
                            // 父結點的父結點(xpp)不爲null
                            if (xpp != null) {
                                //將xpp顏色置爲紅色
                                xpp.red = true;
                                // 有旋轉
                                root = rotateRight(root, xpp);
                            }
                        }
                    }
                }
                
                /** * 當前結點的父結點(xp) 爲 其父節點(xpp)的右孩子 */
                else {
                    //xppl 爲當前結點(x)的父結點(xp)的父結點(xpp)的左孩子
                    if (xppl != null && xppl.red) {
                        /** * xppl不爲null 且是紅結點 * xpp * / \ * red xppl xp red * ---> x結點在這一層 * | 變爲 * xpp red ---> 變換事後x結位置 * / \ * black xppl xp black * */
                        xppl.red = false;
                        xp.red = false;
                        xpp.red = true;
                        // 控制循環
                        x = xpp;
                    }
                    else {
                        //若是左叔叔爲空或者是黑色
                        if (x == xp.left) {
                            //若是當前節點是個左孩子 右旋轉 
                            root = rotateRight(root, x = xp);
                            //獲取爺爺結點
                            xpp = (xp = x.parent) == null ? null : xp.parent;
                        }
                        if (xp != null) {
                            /** * 父結點不爲null 設置父結點爲黑色 */
                            xp.red = false;
                            if (xpp != null) {
                                /** * 爺爺結點不爲null * 將其置爲紅色 * 對其進行左旋轉 */
                                xpp.red = true;
                                root = rotateLeft(root, xpp);
                            }
                        }
                    }
                }
            }
        }
        
        /** * 紅黑樹中刪除節點後會打破紅黑樹的性質須要平衡 * TreeNode<K,V> root 根結點 * TreeNode<K,V> x 要刪除的節點 */
        static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x) {
            for (TreeNode<K,V> xp, xpl, xpr;;)  {
                if (x == null || x == root)
                    // x 爲 null 或者 x 爲根結點 無須平衡
                    return root;
                else if ((xp = x.parent) == null) {
                    /** * xp null * \ * ---> x 結點位置(可爲左結點也可爲右結點) * 此時x 爲紅黑樹的根結點 */
                    x.red = false;
                    return x;
                }
                else if (x.red) {
                     /** * xp * \ * ---> x 結點位置(且爲red) * 將其變爲黑色結點 */
                    x.red = false;
                    return root;
                }
                
                else if ((xpl = xp.left) == x) {
                    // x 爲其父結點的左結點
                    if ((xpr = xp.right) != null && xpr.red) {
                        /** * xp * / \ * x xpr red * / \ * xprl xprr */
                        xpr.red = false; // 將xpr置爲黑色
                        xp.red = true;   // 將xp置爲紅色
                        // 左旋轉 xp
                        root = rotateLeft(root, xp);
                        /** * xpr * / \ * xp xprr * / \ * x xprl */
                        // 獲取 新的xpr
                        xpr = (xp = x.parent) == null ? null : xp.right;
                    }
                    
                    if (xpr == null)
                        // 控制循環
                        x = xp;
                    else {
                        /** * xpr * / \ * xp xprr * / \ * x xprl * / \ * sl sr */
                        TreeNode<K,V> sl = xpr.left, sr = xpr.right;
                        if ((sr == null || !sr.red) &&
                            (sl == null || !sl.red)) {
                            /** * 樹中xpr(即上圖的xprl) 的葉子結點不存在 或爲黑色結點 * 樹中xpr(上圖中的xprl) 置爲紅色 * 將x 指向 xp 繼續循環 */
                            xpr.red = true;
                            x = xp;
                        }
                        else {
                            if (sr == null || !sr.red) {
                                /** * xpr * / \ * xp xprr * / \ * x xprl * / \ * sr null || black */
                                if (sl != null)
                                    // sl 不爲null 將其置爲黑色
                                    sl.red = false;
                                // 樹中的xpr(上圖xprl)置爲紅色
                                xpr.red = true;
                                // 右旋轉
                                root = rotateRight(root, xpr);
                                // 從新獲取xp的右子樹
                                xpr = (xp = x.parent) == null ?
                                    null : xp.right;
                            }
                            if (xpr != null) {
                                /** * 從新獲取的xpr 不爲null * xpr 的顏色與 父結點的顏色相同 */
                                xpr.red = (xp == null) ? false : xp.red;
                                if ((sr = xpr.right) != null)
                                // 從新獲取的xpr 的右節點不爲null,將其置黑
                                sr.red = false; 
                            }
                            if (xp != null) {
                                /** * xp 不爲null * 將xp置爲紅色 * 左旋轉xp */
                                xp.red = false;
                                root = rotateLeft(root, xp);
                            }
                            x = root;
                    }
                }
            }
            else { // x 爲其父結點的右結點
                if (xpl != null && xpl.red) {
                    /** * xp * / \ * xpl x * / \ * */
                    xpl.red = false;
                    xp.red = true;
                    /** * xp xpl * / \ / \ * xpl x ==> xpll xp * / \ / \ / \ * xpll xplr xplr x */
                    root = rotateRight(root, xp);
                    /** * 從新獲取xpl * xpl * / \ * xpll xp * / \ * xplr x * |__ 新的xpl指向這裏 */
                    xpl = (xp = x.parent) == null ? null : xp.left;
                }
                if (xpl == null)
                    // 新的xpl爲null x 指向器父結點
                    x = xp;
                else {
                    // 獲取xpl的左結點與右結點
                    TreeNode<K,V> sl = xpl.left, sr = xpl.right;
                    if ((sl == null || !sl.red) &&
                        (sr == null || !sr.red)) {
                        /** * 左子結點 爲空或 爲黑色 * 且 * 右子結點 爲空或 爲黑色 * * 將 xpl 置爲紅色 */
                        xpl.red = true;
                        // 控制循環
                        x = xp;
                    }
                    else {
                        if (sl == null || !sl.red) {
                            /** * xp * / \ * xpl x * / \ / \ * sl(null || black) */
                            if (sr != null)
                                // 若是sr不爲null 設置爲黑色
                                sr.red = false;
                            //xpl置爲紅色
                            xpl.red = true;
                            //左旋轉xpl
                            root = rotateLeft(root, xpl);
                            //從新獲取xpl
                            xpl = (xp = x.parent) == null ?
                                null : xp.left;
                        }
                        if (xpl != null) {
                            // xpl 不爲null xpl的顏色與xp的顏色相同
                            xpl.red = (xp == null) ? false : xp.red;
                            if ((sl = xpl.left) != null)
                                sl.red = false;
                        }
                        if (xp != null) {
                            /** * xp不爲null * xp爲黑色 * 右旋轉xp */
                            xp.red = false;
                            root = rotateRight(root, xp);
                        }
                        x = root;
                    }
                }
            }
        }
    }
}
複製代碼

總結:bash

  在上一篇文章中,系統的學習了紅黑樹相關的知識,包括性質,以及爲了維護紅黑樹的性質須要進行相應的左旋轉,右旋轉,顏色轉換等子過程。在學習ConcurrentHashMap以前,要先對紅黑樹的操做有必定的瞭解。這篇文章,從ConcurrentHashMap的底層,經過其內部定義的一些常量,以及相關的內部類看起,從新回顧了一下紅黑樹的操做,以及併發大師的實現方式。在下一篇文章中,將學習ConcurrentHashMap相關的操做以及實現原理。微信


我的微信公衆號: 數據結構

我的github:

github.com/FunCheney多線程

相關文章
相關標籤/搜索