一文看懂ConcurrentHashMap

上次寫了一文看懂HashMap,談到HashMap的線程安全問題就不得不聊聊ConcurrentHashMap,若你不瞭解HashMap的話能夠看看上面那篇文章,ConcurrentHashMap和HashMap在不少地方是相似的,好比底層都是數組+鏈表+紅黑樹、數組大小都是2的冪次方等.......一些重複的知識點在這裏就不細講了。這篇文章主要會解決如下幾個問題:node

  • HashMap爲何多線程下會不安全
  • 什麼是CAS算法
  • ConcurrentHashMap是如何解決線程安全問題的
  • ConcurrentHashMap查找以及插入過程

其實ConcurrentHashMap相比HashMap複雜了許多,主要是由於會涉及到許多併發層面的知識點,好比CAS算法、volitale以及synchronized關鍵字等,本文會粗略介紹一下相關知識點,接下來咱們先聊聊HashMap的線程安全問題以及爲何要使用ConcurrentHashMap。面試

HashMap爲何線程不安全

HashMap在併發環境下主要有如下幾個問題:算法

  • 死循環(JDK1.7)

在1.7版本,當擴容後生成新數組,在轉移元素的過程當中,使用的是頭插法,也就是鏈表的順序會翻轉,當多個線程執行插入操做時可能會發生死循環。在1.8版本時將頭插法改爲了尾插法,解決了死循環的問題。segmentfault

  • 數據丟失

當兩個線程同時插入元素時可能會發生數據被覆蓋的狀況數組

先看下源碼安全

if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);

當兩個線程同時執行到以上代碼時,發現沒有發生哈希衝突,因而新建Node節點插入,這時先插入的節點會被後插入的節點覆蓋,致使數據丟失。數據結構

那麼有哪些解決方法呢?

  • Hashtable

給全部方法加synchronized鎖,很是低效,如今已經淘汰。多線程

  • Synchronized Map

Collections包提供的一個方法,會同步整個對象,也不推薦使用架構

  • ConcurrentHashMap

儘管沒有同步整個Map,可是它仍然是線程安全的,讀操做很是快,而寫操做則是經過加鎖完成的,推薦使用併發

在開始以前須要先介紹下CAS算法,這也是ConcurrentHashMap實現線程安全的一個關鍵點。

CAS

CAS能夠看作是樂觀鎖的一種實現方式,Java原子類中的遞增操做就經過CAS自旋實現的。
CAS全稱 Compare And Swap(比較與交換),是一種無鎖算法。在不使用鎖(沒有線程被阻塞)的狀況下實現多線程之間的變量同步。

CAS底層就是經過Unsafe類中的方法來實現的,以下所示:

unsafe.compareAndSwapInt(this, valueOffset, expect, update)

下面介紹一下各個參數

  • this:Unsafe對象自己,須要經過這個類來獲取value的內存偏移地址。
  • valueOffset:value變量的內存偏移地址。
  • expect:指望更新的值。
  • update:要更新的最新值。

經過valueOffset能夠拿到value的值,當且僅當value的值等於expect時,CAS經過原子方式用新值update來更新value的值,不然不會執行任何操做。

整個「比較+更新」操做封裝在 compareAndSwapInt()中,在JNI裏是藉助於一個CPU指令完成的,屬於原子操做,能夠保證多個線程都可以看到同一個變量的修改值。

ConcurrentHashMap的源碼中除了普通的CAS操做,還定義了三個原子操做,用於對指定位置的節點進行操做。正是這些原子操做保證了ConcurrentHashMap的線程安全,以下所示:

//獲取tab數組的第i個node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//使用CAS嘗試更新table[i]
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//寫入table[i]
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

ConcurrentHashMap

ConcurrentHashMap支持併發的讀寫。跟1.7版本相比,JDK1.8的實現已經摒棄了Segment的概念,而是直接用Node數組+鏈表+紅黑樹的數據結構來實現,併發控制使用SynchronizedCAS來操做,雖然源碼裏面還保留了,也只是爲了兼容性的考慮,所以本文主要講解的是JDK1.8版本的ConcurrentHashMap。

屬性

先來介紹一個核心屬性sizeCtl

private transient volatile int sizeCtl;

用途:控制table數組的初始化和擴容的操做,不一樣的值有不一樣的含義

  • 當爲負數時:-1表明正在初始化,-N表明有N-1個線程正在進行擴容
  • 當爲0時(默認值):表明table數組尚未被初始化
  • 當爲正數時:表示初始化或者下一次進行擴容的大小

其它屬性

transient volatile Node<K,V>[] table;//哈希數組,保存Ndode節點
private transient volatile Node<K,V>[] nextTable;//擴容用的數組,只有在擴容時纔不爲null

private static final int DEFAULT_CAPACITY = 16;//默認大小
private static final float LOAD_FACTOR = 0.75f;//負載因子

static final int MOVED = -1; //表示正在擴容

volatile

在上面咱們能夠看到volatile關鍵字,這裏先簡單介紹一下該關鍵字的做用:

  • 保證變量的可見性

在多線程環境下,某個共享變量若是被其中一個線程給修改了,其餘線程可以當即知道這個共享變量已經被修改了,當其餘線程要讀取這個變量的時候,最終會去內存中讀取,而不是從本身的工做空間中讀取

  • 保證有序性

虛擬機在進行代碼編譯優化的時候,對於那些改變順序以後不會對最終變量的值形成影響的代碼,是有可能將他們進行重排序的,可是在多線程下可能會引起線程安全問題,使用volatile能夠禁止重排序

注意:volatile關鍵字沒法保證變量的原子性。

在面試中volatile底層實現機制也是常考的一個知識點,因爲篇幅有限這裏只是簡單介紹一下概念,若是對原理感興趣的同窗能夠上網搜索一下相關資料。

數據結構

ConcurrentHashMap和HashMap都是由數組+鏈表+紅黑樹構成,不過有一個不一樣的是ConcurrentHashMap的數組中放入的不是TreeNode結點,而是將TreeNode包裝起來的TreeBin對象,以下圖所示:

concurrenthashmap.png

構造方法

public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
}

和HashMap實現差很少,也是用tableSizeFor方法來確保數組大小爲2的冪次方,​ 能夠看出構造函數主要是設定sizeCtl的值,並未對錶進行初始化。當表未初始化的時候,sizeCtl的值其實指定的是表的長度。
tableSizeFor方法用來保證數組爲2的冪次方,若是不瞭解其實現能夠參考一文看懂HashMap

初始化

在ConcurrentHashMap裏table數組第一次初始化是在initTable裏執行的,這點和HashMap有點不一樣,簡單看下初始化步驟:

  • 當數組table未初始化時,當 sizeCtl < 0 說明有別的線程正在初始化或擴容,自旋等待
  • 接着嘗試調用CAS去更新sizeCtl的值
  • 若更新成功初始化table數組,而且把sizeCtl設置爲容量閾值(也就是HashMap的threshold)
  • 若更新失敗則說明別的線程已經執行過初始化操做了,直接返回table數組便可
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    
    while ((tab = table) == null || tab.length == 0) {
        //當sizeCtl<0說明有別的線程正在初始化或擴容,自旋等待
        if ((sc = sizeCtl) < 0)
            Thread.yield(); 
         //SIZECTL:表示當前對象的內存偏移量,sc表示指望值,-1表示要替換的值,設定爲-1表示要初始化表
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //檢查table數組是否已經被初始化
                if ((tab = table) == null || tab.length == 0) {
                    //若sc=0則設置默認容量16,不然設置爲指定容量大小
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化數組
                    table = tab = nt;
                    sc = n - (n >>> 2);//n - (n >>> 2) = 0.75n,也就是說sc的值等於threshold
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

這裏須要注意的一點是在else if塊裏面須要從新判斷一次table是否未初始化,由於在finally塊裏改變了sizeCtl值,這時候其它線程是可以進入else if塊中的,這樣就會執行兩次初始化操做了。

查詢

在介紹get方法以前先來看看ConurrentHashMap如何計算key的hash值,ConcurrentHashMap用了spread函數來求hash值,它與HashMap的hash函數有略微不一樣,代碼以下:

static final int spread(int h) { 
     return (h ^ (h >>> 16)) & HASH_BITS; 
}

除了高16位和低16位或操做以外,最後還和HASH_BITS相與,其值爲0x7fffffff。它的做用主要是使hash值爲正數。在ConcurrentHashMap中,Hash值爲負數有特別的意義,如-1表示ForwardingNode結點,-2表示TreeBin結點

什麼是ForwardingNode結點和TreeBin結點?

//只在擴容時出現,實現了擴容時舊錶和新表的鏈接
 static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);//MOVED = -1
            this.nextTable = tab;
        }
     ......
}

ForwardingNode節點是Node節點的子類,hash值固定爲-1,只在擴容 transfer的時候出現,當舊數組中所有的節點都遷移到新數組中時,舊數組就在數組中放置一個ForwardingNode。讀操做或者迭代讀時碰到ForwardingNode時,將操做轉發到擴容後的新的table數組上去執行,寫操做遇見它時,則嘗試幫助擴容。

至於TreeBin節點也是繼承自Node,hash值固定爲-2,是紅黑樹的包裝結點。(有關紅黑樹因爲篇幅有限這裏就不展開講了)

查詢步驟

  • 首先計算出key的hash值
  • 判斷table是否已經初始化以及數組下標位置上是否有元素(和HashMap同樣使用(n-1)&hash計算下標)
  • 判斷第一個節點是否就是要查找的節點
  • hash = -1則調用ForwardingNode的find函數轉發到nextTable上查找;若 hash = -2 則調用TreeBin的find函數查找元素
  • 不然遍歷鏈表查詢元素

接下來看看get方法源碼

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());//計算hash值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {//判斷第一個節點是否就是目標節點
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //若是hash值小於0,有兩種狀況 
            //-1是ForwardingNode,則用find函數轉發到nextTable上查找  
            //-2是TreeBin,調用TreeBin的find函數。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //正常節點則遍歷查找
            while ((e = e.next) != null) {
                if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

tabAt方法使用volatile來獲取數組上的元素,在介紹CA時已經說過了,若是忘記了請翻到上面查看。

從代碼也能夠看出get方法是不加鎖的,這裏比較須要注意的一點是hash值爲-1的ForwardingNode節點,當讀操做碰到ForwardingNode時會調用find方法轉發到擴容後的新的table數組上去執行,咱們來看看find方法的實現:

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
       //到新數組上查找元素
        Node<K,V> find(int h, Object k) {
           //使用循環,避免屢次碰到ForwardingNode致使遞歸過深
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    //第一個節點就是要找的節點
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        //繼續遇見ForwardingNode的狀況,這裏至關因而遞歸調用一次本方法
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else//遇見特殊節點,調用其find方法進行查找
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)//普通節點直接循環遍歷鏈表
                        return null;
                }
            }
        }
    }

擴容時當數組爲空或完成擴容後將ForwardingNode結點插入數組槽中,而find操做在新表中進行查詢。巧妙利用ForwardingNode將舊錶和新錶鏈接起來,保證了其餘線程擴容時也能對結點正常訪問。

插入

仍是同樣先來看看插入過程:

  • 若key或value爲null則拋出NullPointerException異常,也就是說不容許key或value爲null
  • 判斷table是否須要初始化,根據key的hash值計算出在數組中的下標,用tabAt方法讀取節點,若沒有發生hash衝突則用CAS插入節點
  • 若發生了hash衝突,則判斷是否爲ForwardingNode節點,說明在擴容,調用hlepTransfer幫助擴容
  • 若不是ForwardingNode節點,則使用synchronized對節點加鎖,以後遍歷鏈表,若元素已存在則更新舊值,不然在尾部插入節點
  • 若是是TreeBin節點則調用putTreeVal方法插入
  • 最後判斷鏈表是否須要轉換成紅黑樹以及調用addCount方法對節點數量+1,在該方法裏面也會判斷是否須要擴容

put方法源碼:

public V put(K key, V value) {
        return putVal(key, value, false);//false表示若已存在則進行覆蓋
}

    
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //不容許key或value爲null,這點和HashMap不一樣
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;//記錄當前鏈表或紅黑樹長度
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //判斷是否須要初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //用tabAt方法讀取table[i],若沒有發生hash衝突則用CAS插入節點
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   
        }
        //若hash值爲-1,則爲ForwardingNode結點,說明在擴容,調用hlepTransfer幫助擴容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            //若是是普通鏈表結點或樹結點,使用synchronized對節點加鎖
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {//二次檢查,相似於單例模式的雙重檢查
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //若元素已存在則更新value值
                            if (e.hash == hash && ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            //插入節點到鏈表尾部
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }//若是是TreeBin節點則調用putTreeVal方法插入
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
               //若鏈表長度太長,則調用treeifyBin將鏈表轉換爲紅黑樹
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);//節點數量+1,檢查是否須要進行擴容
    return null;
}

插入過程也不是很難,不少地方和HashMap差很少,能夠照着註釋多看幾遍就懂了。

擴容

在put源碼最後會調用addCount方法來修改元素個數,在addCount方法裏面又會檢查是否須要調用transfer方法來擴容,ConcurrentHashMap的併發擴容是設計的一個精髓,因爲博主能力有限至今還未搞懂,如有興趣的能夠參考這篇:阿里十年架構師,教你深度分析ConcurrentHashMap原理分析

刪除

源碼不是很難,主要是replaceNode方法的幾個參數搞懂就行,這裏再也不細講。

public V remove(Object key) {
    return replaceNode(key, null, null);
}

//cv是指望值,當待刪除節點的值等於cv時,用value替換舊值
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //table未初始化
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        //正在擴容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {              
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;                              
                                //符合更新value或者刪除節點的條件
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    //更新value
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        //CAS設置節點
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            //當前節點不是目標節點,繼續遍歷下一個節點
                            pred = e;
                            if ((e = e.next) == null)
                                //到達鏈表尾部,依舊沒有找到,跳出循環
                                break;
                        }
                    }
                    //紅黑樹
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    //若是刪除了節點,更新size
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

總結

有關ConcurrentHashMap到這裏暫時先告一段落了,博主當初還覺得三天能夠寫完,到今天已經第五天了沒想到還沒搞定,其中還有一些經典的設計好比transfer擴容方法因爲能力有限沒列出來,不過本篇對於面試來講應該已經夠用了,若是有哪裏寫得不對歡迎各位指出來!

參考

ConcurrentHashMap源碼解析
一文完全搞定CAS

相關文章
相關標籤/搜索