帶你走進Java集合之ConcurrentHashMap

1、概述

上一篇文章《帶你走進java集合之HashMap》分析了HashMap的實現原理,重點分析了HashMap是怎麼樣的一種數據結構,以及如何去插入,查詢,擴容等操做。相信通過上一篇文章的學習,你們應該對HashMap有了必定的瞭解,可是咱們知道HashMap是一個線程不安全的集合,在多線程狀況下使用HashMap會有不少問題,那麼咱們如何使用一個線程安全的HashMap呢,接下來咱們就介紹一個線程安全的Map,ConcurrentHashMap,咱們來看看這個線程安全的Map道理如何使用,又是如何實現的。java

2、HashMap和ConcurrentHashMap的對比

咱們用一段代碼證實下HashMap的線程不安全,以及ConcurrentHashMap的線程安全性。代碼邏輯很簡單,開啓10000個線程,每一個線程作很簡單的操做,就是put一個key,而後刪除一個key,理論上線程安全的狀況下,最後map的size()確定爲0。node

Map<Object,Object> myMap=new HashMap<>();
        // ConcurrentHashMap myMap=new ConcurrentHashMap();
        for (int i = 0; i <10000 ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                        double a=Math.random();
                        myMap.put(a,a);
                        myMap.remove(a);
                }
            }).start();
        }
        Thread.sleep(15000l);//多休眠下,保證上面的線程操做完畢。
        System.out.println(myMap.size());
複製代碼

結果:編程

這裏顯示Map的size=13,可是實際上map裏還有一個key。 一樣的代碼咱們用ConcurrentHashMap來運行下,結果以下:

這裏就證實了ConcurrentHashMap是線程安全的,咱們接下來從源碼分析下ConcurrentHashMap是如何保證線程安全的,本次源碼jdk版本爲1.8。

3、ConcurrentHashMap源碼分析

3.1 ConcurrentHashMap的基礎屬性

//默認最大的容量 
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默認初始化的容量
private static final int DEFAULT_CAPACITY = 16;
//最大的數組可能長度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默認的併發級別,目前並無用,只是爲了保持兼容性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//和hashMap同樣,負載因子
private static final float LOAD_FACTOR = 0.75f;
//和HashMap同樣,鏈表轉換爲紅黑樹的閾值,默認是8
static final int TREEIFY_THRESHOLD = 8;
//紅黑樹轉換鏈表的閥值,默認是6
static final int UNTREEIFY_THRESHOLD = 6;
//進行鏈表轉換最少須要的數組長度,若是沒有達到這個數字,只能進行擴容
static final int MIN_TREEIFY_CAPACITY = 64;
//table擴容時, 每一個線程最少遷移table的槽位個數
private static final int MIN_TRANSFER_STRIDE = 16;
//感受是用來計算偏移量和線程數量的標記
private static int RESIZE_STAMP_BITS = 16;
//可以調整的最大線程數量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//記錄偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//值爲-1, 當Node.hash爲MOVED時, 表明着table正在擴容
static final int MOVED     = -1;
//TREEBIN, 置爲-2, 表明此元素後接紅黑樹
static final int TREEBIN   = -2;
//感受是佔位符,目前沒看出來明顯的做用
static final int RESERVED  = -3;
//主要用來計算Hash值的
static final int HASH_BITS = 0x7fffffff; 
//節點數組
transient volatile Node<K,V>[] table;
//table遷移過程臨時變量, 在遷移過程當中將元素所有遷移到nextTable上
private transient volatile Node<K,V>[] nextTable;
//基礎計數器
private transient volatile long baseCount;
//table擴容和初始化的標記,不一樣的值表明不一樣的含義,默認爲0,表示未初始化
//-1: table正在初始化;小於-1,表示table正在擴容;大於0,表示初始化完成後下次擴容的大小
private transient volatile int sizeCtl;
//table容量從n擴到2n時, 是從索引n->1的元素開始遷移, transferIndex表明當前已經遷移的元素下標
private transient volatile int transferIndex;
//擴容時候,CAS鎖標記
private transient volatile int cellsBusy;
//計數器表,大小是2次冪
private transient volatile CounterCell[] counterCells;
複製代碼

上面就是ConcurrentHashMap的基本屬性,咱們大部分和HashMap同樣,只是增長了部分屬性,後面咱們來分析增長的部分屬性是起到如何的做用的。數組

3.2 ConcurrentHashMap的經常使用方法屬性

  • put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key和value不容許爲null
        if (key == null || value == null) throw new NullPointerException();
        //計算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //若是table沒有初始化,進行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //計算數組的位置    
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //若是該位置爲空,構造新節點添加便可
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }//若是正在擴容
            else if ((fh = f.hash) == MOVED)
            //幫着一塊兒擴容
                tab = helpTransfer(tab, f);
            else {
            //開始真正的插入
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                    //若是已經初始化完成了
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //這裏key相同直接覆蓋原來的節點
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //不然添加到節點的最後面
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,  value, null);
                                    break;
                                }
                            }
                        }//若是節點是樹節點,就進行樹節點添加操做
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }//判斷節點是否要轉換成紅黑樹
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);//紅黑樹轉換
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //計數器,採用CAS計算size大小,而且檢查是否須要擴容
        addCount(1L, binCount);
        return null;
    }
複製代碼

咱們發現ConcurrentHashMap的put方法和HashMap的邏輯相差不大,主要是新增了線程安所有分,在添加元素時候,採用synchronized來保證線程安全,而後計算size的時候採用CAS操做進行計算。整個put流程比較簡單,總結下就是:安全

1.判斷key和vaule是否爲空,若是爲空,直接拋出異常。bash

2.判斷table數組是否已經初始化完畢,若是沒有初始化,進行初始化。數據結構

3.計算key的hash值,若是該位置爲空,直接構造節點放入。多線程

4.若是table正在擴容,進入幫助擴容方法。併發

5.最後開啓同步鎖,進行插入操做,若是開啓了覆蓋選項,直接覆蓋,不然,構造節點添加到尾部,若是節點數超過紅黑樹閾值,進行紅黑樹轉換。若是當前節點是樹節點,進行樹插入操做。dom

6.最後統計size大小,並計算是否須要擴容。

  • get方法
public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //計算hash值
        int h = spread(key.hashCode());
        //若是table已經初始化,而且計算hash值的索引位置node不爲空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //若是hash相等,key相等,直接返回該節點的value
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }//若是hash值爲負值表示正在擴容,這個時候查的是ForwardingNode的find方法來定位到節點。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //循環遍歷鏈表,查詢key和hash值相等的節點。
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
複製代碼

get方法比較簡單,主要流程以下:

1.直接計算hash值,查找的節點若是key和hash值相等,直接返回該節點的value就行。

2.若是table正在擴容,就調用ForwardingNode的find方法查找節點。

3.若是沒有擴容,直接循環遍歷鏈表,查找到key和hash值同樣的節點值便可。

  • ConcurrentHashMap的擴容

ConcurrentHashMap的擴容相對於HashMap的擴容相對複雜,由於涉及到了多線程操做,這裏擴容方法主要是transfer,咱們來分析下這個方法的源碼,研究下是如何擴容的。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //保證每一個線程擴容最少是16,
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
            //擴容2倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                //出現異常狀況就不擴容了。
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //用新數組對象接收
            nextTable = nextTab;
            //初始化擴容下表爲原數組的長度
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //擴容期間的過渡節點
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                //若是該線程已經完成了
                if (--i >= bound || finishing)
                    advance = false;
                //設置擴容轉移下標,若是下標小於0,說明已經沒有區間能夠操做了,線程能夠退出了
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }CAS操做設置區間
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //若是計算的區間小於0了,說明區間分配已經完成,沒有剩餘區間分配了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//若是擴容完成了,進行收尾工做
                    nextTable = null;//清空臨時數組
                    table = nextTab;//賦值原數組
                    sizeCtl = (n << 1) - (n >>> 1);//從新賦值sizeCtl
                    return;
                }//若是擴容還在進行,本身任務完成就進行sizeCtl-1,這裏是由於,擴容是經過helpTransfer()和addCount()方法來調用的,在調用transfer()真正擴容以前,sizeCtl都會+1,因此這裏每一個線程完成後就進行-1。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //這裏應該是判斷擴容是否結束
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //結束,賦值狀態
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }//若是在table中沒找到,就用過渡節點
            else if ((f = tabAt(tab, i)) == null)
                //成功設置就進入下一個節點
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                //若是節點不爲空,而且該位置的hash值爲-1,表示已經處理了,直接進入下一個循環便可
                advance = true; // already processed
            else {
            //這裏說明老table該位置不爲null,也沒有被處理過,進行真正的處理邏輯。進行同步鎖
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //若是hash值大於0
                        if (fh >= 0) {
                        //爲運算結果
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //這裏的邏輯和hashMap是同樣的,都是採用2個鏈表進行處理,具體分析能夠查看我分析HashMap的文章
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }//若是是樹節點,執行樹節點的擴容數據轉移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //也是經過位運算判斷兩個鏈表的位置    
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //這裏判斷是否進行樹轉換
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                           //這裏把新處理的鏈表賦值到新數組中
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
    
複製代碼

ConcurrentHashMap的擴容仍是比較複雜,複雜主要表如今,控制多線程擴容層面上,擴容的源碼我沒有解析的很細,一方面是確實比較複雜,本人有某些地方也不是太明白,另外一方面是我以爲咱們研究主要是弄懂其思想,能搞明白關鍵代碼和關鍵思路便可,只要不是從新實現一套相似的功能,我想就不用糾結其所有細節了。總結下ConcurrentHashMap的擴容步驟以下:

1.獲取線程擴容處理步長,最少是16,也就是單個線程處理擴容的節點個數。

2.新建一個原來容量2倍的數組,構造過渡節點,用於擴容期間的查詢操做。

3.進行死循環進行轉移節點,主要根據finishing變量判斷是否擴容結束,在擴容期間經過給不一樣的線程設置不一樣的下表索引進行擴容操做,就是不一樣的線程,操做的數組分段不同,同時利用synchronized同步鎖鎖住操做的節點,保證了線程安全。

4.真正進行節點在新數組的位置是和HashMap擴容邏輯同樣的,經過位運算計算出新鏈表是否位於原位置或者位於原位置+擴容的長度位置,具體分析能夠查看個人這篇文章

4、總結

1.ConcurrentHashMap大部分的邏輯代碼和HashMap是同樣的,主要經過synchronized和來保證節點插入擴容的線程安全,這裏確定有同窗會問,爲啥使用synchronized呢?而不用採起樂觀鎖,或者lock呢?我我的以爲可能緣由有2點:

  • a.樂觀鎖比較適用於競爭衝突比較少的場景,若是衝突比較多,那麼就會致使不停的重試,這樣反而性能更低。
  • b.synchronized在經歷了優化以後,其實性能已經和lock沒啥差別了,某些場景可能還比lock快。因此,我以爲這是採用synchronized來同步的緣由。

2.ConcurrentHashMap的擴容核心邏輯主要是給不一樣的線程分配不一樣的數組下標,而後每一個線程處理各自下表區間的節點。同時處理節點複用了hashMap的邏輯,經過位運行,能夠知道節點擴容後的位置,要麼在原位置,要麼在原位置+oldlength位置,最後直接賦值便可。

5、參考

更好地理解jdk1.8中ConcurrentHashMap實現機制

6、推薦閱讀

Java鎖之ReentrantLock(一)

Java鎖之ReentrantLock(二)

Java鎖之ReentrantReadWriteLock

JAVA NIO編程入門(一)

JAVA NIO 編程入門(二)

JAVA NIO 編程入門(三)

帶你走進java集合之ArrayList

帶你走進java集合之HashMap

相關文章
相關標籤/搜索