學習ConcurrentHashMap併發寫機制

1. 前言

上篇文章講了 Unsafe 類中 CAS 的實現,實際上是在爲這篇文章打基礎。不太熟悉的小夥伴請移步Unsafe 中 CAS 的實現。本篇文章主要基於 OpenJDK8 來作源碼解析。java

2. 源碼

ConcurrentHashMap 基於 HashMap 實現。數組

JDK1.7 和 JDK1.8 做爲併發容器在實現上是有差異的。JDK1.7 經過 Segment 分段鎖實現,而 JDK1.8 經過 CAS+synchronized 實現。安全

2.1 ConcurrentHashMap 幾個重要方法

在 ConcurrentHashMap 中使用了 unSafe 方法,經過直接操做內存的方式來保證併發處理的安全性,使用的是硬件的安全機制。網絡

private static final sun.misc.Unsafe U;
    private static final long SIZECTL;
    private static final long TRANSFERINDEX;
    private static final long BASECOUNT;
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    // CAS 將Node插入bucket
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

2.2 put()流程

仍是老規矩,先上流程圖幫助閱讀源碼。併發

主體源碼以下:spa

public V put(K key, V value) {
        return putVal(key, value, false);
    }


    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 基礎數組
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 若是bucket==null,即沒有hash衝突,CAS插入
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 若是在進行擴容操做,則先擴容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 不然,存在hash衝突
            else {
                V oldVal = null;
                // 加鎖同步
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 遍歷過程當中出現相同key直接覆蓋
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 尾插法插入
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 若是是樹節點,遍歷紅黑樹
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

put 操做過程以下:線程

  • 若是沒有初始化就先調用 initTable()方法來進行初始化過程
  • 若是沒有 hash 衝突就直接 CAS 插入
  • 若是還在進行擴容操做就先進行擴容
  • 若是存在 hash 衝突,就加鎖來保證線程安全,這裏有兩種狀況,一種是鏈表形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結構插入
  • 最後一個若是該鏈表的數量大於閾值 8,就要先轉換成黑紅樹的結構,break 再一次進入循環
  • 若是添加成功就調用 addCount()方法統計 size,而且檢查是否須要擴容

2.3 ConcurrentHashMap 的存儲結構

下邊的示意圖來自網絡設計

3. 結語

本文只分析了 ConcurrentHashMapput() 方法,並無分析 get()、擴容、刪除節點等方法。主要目的是初步瞭解 ConcurrentMap 確保併發寫的設計思路。至此,本篇文章結束,感謝你們的閱讀!歡迎你們關注公衆號【當我趕上你】。code

相關文章
相關標籤/搜索