面試:爲了進阿里,死磕了ConcurrentHashMap源碼和麪試題(一)

該系列文章收錄在公衆號【Ccww技術博客】,原創技術文章早於博客推出

前言

在平時中集合使用中,當涉及多線程開發時,若是使用HashMap可能會致使死鎖問題,使用HashTable效率又不高。而ConcurrentHashMap在保持同步同時併發效率比較高,ConcurrentHashmap是最好的選擇,那面試中也會被經常問到,那可能的問題是:java

  • ConcurrentHashMap的實現原理node

    • ConcurrentHashMap1.7和1.8的區別?
    • ConcurrentHashMap使用什麼技術來保證線程安全
  • ConcurrentHashMap的put()方法面試

    • ConcurrentHashmap 不支持 key 或者 value 爲 null 的緣由?
    • put()方法如何實現線程安全呢?
  • ConcurrentHashMap擴容機制
  • ConcurrentHashMap的get方法是否要加鎖,爲何?
  • 其餘問題算法

    • 爲何使用ConcurrentHashMap
    • ConcurrentHashMap迭代器是強一致性仍是弱一致性?HashMap呢?
    • JDK1.7與JDK1.8中ConcurrentHashMap的區別

ConcurrentHashMap的實現原理

ConcurrentHashMap的出現主要爲了解決hashmap在併發環境下不安全,JDK1.8ConcurrentHashMap的設計與實現很是精巧,大量的利用了volatile,CAS等樂觀鎖技術來減小鎖競爭對於性能的影響,ConcurrentHashMap保證線程安全的方案是:segmentfault

  • JDK1.8:synchronized+CAS+HashEntry+紅黑樹;
  • JDK1.7:ReentrantLock+Segment+HashEntry。

JDK7 ConcurrentHashMap

在JDK1.7中ConcurrentHashMap由Segment(分段鎖)數組結構和HashEntry數組組成,且主要經過Segment(分段鎖)段技術實現線程安全。數組

Segment是一種可重入鎖,是一種數組和鏈表的結構,一個Segment中包含一個HashEntry數組,每一個HashEntry又是一個鏈表結構,所以在ConcurrentHashMap查詢一個元素的過程須要進行兩次Hash操做,以下所示:安全

  • 第一次Hash定位到Segment,
  • 第二次Hash定位到元素所在的鏈表的頭部

正是經過Segment分段鎖技術,將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問,可以實現真正的併發訪問。數據結構

這樣結構會使Hash的過程要比普通的HashMap要長,影響性能,但寫操做的時候能夠只對元素所在的Segment進行加鎖便可,不會影響到其餘的Segment,ConcurrentHashMap提高了併發能力。多線程

JDK8 ConcurrentHashMap

在JDK8ConcurrentHashMap內部機構:數組+鏈表+紅黑樹,Java 8在鏈表長度超過必定閾值(8)時將鏈表(尋址時間複雜度爲O(N))轉換爲紅黑樹(尋址時間複雜度爲O(long(N))),結構基本上與功能和JDK8的HashMap同樣,只不過ConcurrentHashMap保證線程安全性。併發

但在JDK1.8中摒棄了Segment分段鎖的數據結構,基於CAS操做保證數據的獲取以及使用synchronized關鍵字對相應數據段加鎖來實現線程安全,這進一步提升了併發性。(CAS原理詳情《面試:爲了進阿里,又把併發CAS(Compare and Swap)實現從新精讀一遍》)
))

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;  //使用了volatile屬性
        volatile Node<K,V> next;  //使用了volatile屬性
        ...
    }

ConcurrentHashMap採用Node類做爲基本的存儲單元,每一個鍵值對(key-value)都存儲在一個Node中,使用了volatile關鍵字修飾value和next,保證併發的可見性。其中Node子類有:

  • ForwardingNode:擴容節點,只是在擴容階段使用的節點,主要做爲一個標記,在處理併發時起着關鍵做用,有了ForwardingNodes,也是ConcurrentHashMap有了分段的特性,提升了併發效率
  • TreeBin:TreeNode的代理節點,用於維護TreeNodes,ConcurrentHashMap的紅黑樹存放的是TreeBin
  • TreeNode:用於樹結構中,紅黑樹的節點(當鏈表長度大於8時轉化爲紅黑樹),此節點不能直接放入桶內,只能是做爲紅黑樹的節點
  • ReservationNode:保留結點

ConcurrentHashMap中查找元素、替換元素和賦值元素都是基於sun.misc.Unsafe原子操做實現多併發的無鎖化操做。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }

ConcurrentHashMap的put()方法

ConcurrentHashMap的put的流程步驟

  1. 若是key或者value爲null,則拋出空指針異常,和HashMap不一樣的是HashMap單線程是容許爲Null;

    if (key == null || value == null) throw new NullPointerException();

  2. for的死循環,爲了實現CAS的無鎖化更新,若是table爲null或者table的長度爲0,則初始化table,調用initTable()方法(第一次put數據,調用默認參數實現,其中重要的sizeCtl參數)。

    //計算索引的第一步,傳入鍵值的hash值
        int hash = spread(key.hashCode());
        int binCount = 0; //保存當前節點的長度
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); //初始化Hash表
            ...
        }
  3. 肯定元素在Hash表的索引

    經過hash算法能夠將元素分散到哈希桶中。在ConcurrentHashMap中經過以下方法肯定數組索引:

    第一步:

    static final int spread(int h) {
            return (h ^ (h >>> 16)) & HASH_BITS; 
        }

    第二步:(length-1) & (h ^ (h >>> 16)) & HASH_BITS);

  4. 經過tableAt()方法找到位置tab[i]Node,當Node爲null時爲沒有hash衝突的話,使用casTabAt()方法CAS操做將元素插入到Hash表中,ConcurrentHashmap使用CAS無鎖化操做,這樣在高併發hash衝突低的狀況下,性能良好。

    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    //利用CAS操做將元素插入到Hash表中
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                        break;  // no lock when adding to empty bin(插入null的節點,無需加鎖)
                }
  5. 當f不爲null時,說明發生了hash衝突,當f.hash == MOVED==-1 時,說明ConcurrentHashmap正在發生resize操做,使用helpTransfer()方法幫助正在進行resize操做。

    else if ((fh = f.hash) == MOVED) //f.hash == -1 
            //hash爲-1 說明是一個forwarding nodes節點,代表正在擴容
            tab = helpTransfer(tab, f);
  6. 以上狀況都不知足的時,使用synchronized同步塊上鎖當前節點Node ,並判斷有沒有線程對數組進行了修改,若是沒有則進行:

    • 遍歷該鏈表並統計該鏈表長度binCount,查找是否有和key相同的節點,若是有則將查找到節點的val值替換爲新的value值,並返回舊的value值,不然根據key,value,hash建立新Node並將其放在鏈表的尾部
    • 若是Node fTreeBin的類型,則使用紅黑樹的方式進行插入。而後則退出synchronized(f)鎖住的代碼塊
    //當前節點加鎖
     synchronized (f) {
     //判斷下有沒有線程對數組進行了修改
     if (tabAt(tab, i) == f) {
           //若是hash值是大於等於0的說明是鏈表
            if (fh >= 0) {
                  binCount = 1;
                  for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                       //插入的元素鍵值的hash值有節點中元素的hash值相同,替換當前元素的值
                          if (e.hash == hash &&
                               ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    //替換當前元素的值
                                    e.val = value;
                                break;
                            }
                         Node<K,V> pred = e;
                          //若是循環到鏈表結尾還沒發現,那麼進行插入操做
                         if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                               }
                    }
              }else if (f instanceof TreeBin) { //節點爲樹
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    //替換舊值
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
  7. 執行完synchronized(f)同步代碼塊以後會先檢查binCount,若是大於等於TREEIFY_THRESHOLD = 8則進行treeifyBin操做嘗試將該鏈表轉換爲紅黑樹。

    if (binCount != 0) {
                  //若是節點長度大於8,轉化爲樹
                  if (binCount >= TREEIFY_THRESHOLD)
                       treeifyBin(tab, i);
                  if (oldVal != null)
                       return oldVal; 
                   break;
             }
  8. 執行了一個addCount方法,主要用於統計數量以及決定是否須要擴容.

    addCount(1L, binCount);

ConcurrentHashmap 不支持 key 或者 value 爲 null 的緣由?

ConcurrentHashmaphashMap不一樣的是,concurrentHashMapkeyvalue都不容許爲null,

由於concurrenthashmap它們是用於多線程的,併發的 ,若是map.get(key)獲得了null,不能判斷究竟是映射的value是null,仍是由於沒有找到對應的key而爲空,

而用於單線程狀態的hashmap卻能夠用containKey(key) 去判斷究竟是否包含了這個null。

put()方法如何實現線程安全呢?

  1. 在第一次put數據時,調用initTable()方法
/**  
 * Hash表的初始化和調整大小的控制標誌。爲負數,Hash表正在初始化或者擴容;  
 * (-1表示正在初始化,-N表示有N-1個線程在進行擴容)  
 * 不然,當表爲null時,保存建立時使用的初始化大小或者默認0;  
 * 初始化之後保存下一個調整大小的尺寸。  
 */  
 private transient volatile int sizeCtl;  
     //第一次put,初始化數組  
     private final Node<K,V>[] initTable() {  
         Node<K,V>[] tab; int sc;  
         while ((tab = table) == null || tab.length == 0) {  
             //若是已經有別的線程在初始化了,這裏等待一下  
             if ((sc = sizeCtl) < 0)  
             Thread.yield(); // lost initialization race; just spin  
             //-1 表示正在初始化  
             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  
             ...  
         } finally {  
            sizeCtl = sc;  
         }  
            break;  
         }  
     }  
     return tab;  
 }

使用sizeCtl參數做爲控制標誌的做用,當在從插入元素時,纔會初始化Hash表。在開始初始化的時候,

  • 首先判斷sizeCtl的值,若是sizeCtl < 0,說明有線程在初始化當前線程便放棄初始化操做。不然,將SIZECTL設置爲-1Hash表進行初始化
  • 初始化成功之後,將sizeCtl的值設置爲當前的容量值
  1. 在不存在hash衝突的時
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
     //利用CAS操做將元素插入到Hash表中  
     if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))  
     break;  // no lock when adding to empty bin(插入null的節點,無需加鎖)  
 }

(f = tabAt(tab, i = (n - 1) & hash)) == null中使用tabAt原子操做獲取數組,並利用casTabAt(tab, i, null, new Node<K,V>(hash, key, value))CAS操做將元素插入到Hash表中

  1. 在存在hash衝突時,先把當前節點使用關鍵字synchronized加鎖,而後再使用tabAt()原子操做判斷下有沒有線程對數組進行了修改,最後再進行其餘操做。

爲何要鎖住更新操做的代碼塊?

由於發生了哈希衝突,當前線程正在f所在的鏈表上進行更新操做,假如此時另一個線程也須要到這個鏈表上進行更新操做,則須要等待當前線程更新完後再執行

//當前節點加鎖  
synchronized (f) {  
     //這裏判斷下有沒有線程對數組進行了修改  
     if (tabAt(tab, i) == f) {  
     ......//do something  
 }
}

因爲篇幅過於長,分紅兩部分來說講,接下來的內容請看[《面試:爲了進阿里,死磕了ConcurrentHashMap源碼和麪試題(二)》]()

各位看官還能夠嗎?喜歡的話,動動手指點個💗,點個關注唄!!謝謝支持
歡迎關注公衆號【Ccww技術博客】,原創技術文章第一時間推出
相關文章
相關標籤/搜索