Java併發(9)- 從同步容器到併發容器

引言

容器是Java基礎類庫中使用頻率最高的一部分,Java集合包中提供了大量的容器類來幫組咱們簡化開發,我前面的文章中對Java集合包中的關鍵容器進行過一個系列的分析,但這些集合類都是非線程安全的,即在多線程的環境下,都須要其餘額外的手段來保證數據的正確性,最簡單的就是經過synchronized關鍵字將全部使用到非線程安全的容器代碼所有同步執行。這種方式雖然能夠達到線程安全的目的,但存在幾個明顯的問題:首先編碼上存在必定的複雜性,相關的代碼段都須要添加鎖。其次這種一刀切的作法在高併發狀況下性能並不理想,基本至關於串行執行。JDK1.5中爲咱們提供了一系列的併發容器,集中在java.util.concurrent包下,用來解決這兩個問題,先從同步容器提及。java

同步容器Vector和HashTable

爲了簡化代碼開發的過程,早期的JDK在java.util包中提供了Vector和HashTable兩個同步容器,這兩個容器的實現和早期的ArrayList和HashMap代碼實現基本同樣,不一樣在於Vector和HashTable在每一個方法上都添加了synchronized關鍵字來保證同一個實例同時只有一個線程能訪問,部分源碼以下:node

//Vector
public synchronized int size() {};
public synchronized E get(int index) {};

//HashTable 
public synchronized V put(K key, V value) {};
public synchronized V remove(Object key) {};
複製代碼

經過對每一個方法添加synchronized,保證了屢次操做的串行。這種方式雖然使用起來方便了,但並無解決高併發下的性能問題,與手動鎖住ArrayList和HashMap並無什麼區別,不論讀仍是寫都會鎖住整個容器。其次這種方式存在另外一個問題:當多個線程進行復合操做時,是線程不安全的。能夠經過下面的代碼來講明這個問題:算法

public static void deleteVector(){
    int index = vectors.size() - 1;
    vectors.remove(index);
}
複製代碼

代碼中對Vector進行了兩步操做,首先獲取size,而後移除最後一個元素,多線程狀況下若是兩個線程交叉執行,A線程調用size後,B線程移除最後一個元素,這時A線程繼續remove將會拋出索引超出的錯誤。數組

那麼怎麼解決這個問題呢?最直接的修改方案就是對代碼塊加鎖來防止多線程同時執行:安全

public static void deleteVector(){
    synchronized (vectors) {
        int index = vectors.size() - 1;
        vectors.remove(index);
    }
}
複製代碼

若是上面的問題經過加鎖來解決沒有太直觀的影響,那麼來看看對vectors進行迭代的狀況:bash

public static void foreachVector(){
    synchronized (vectors) {
        for (int i = 0; i < vectors.size(); i++) {
            System.out.println(vectors.get(i).toString());
        }
    }
}
複製代碼

爲了不多線程狀況下在迭代的過程當中其餘線程對vectors進行了修改,就不得不對整個迭代過程加鎖,想象這麼一個場景,若是迭代操做很是頻繁,或者vectors元素很大,那麼全部的修改和讀取操做將不得不在鎖外等待,這將會對多線程性能形成極大的影響。那麼有沒有什麼方式可以很好的對容器的迭代操做和修改操做進行分離,在修改時不影響容器的迭代操做呢?這就須要java.util.concurrent包中的各類併發容器了出場了。數據結構

併發容器CopyOnWrite

CopyOnWrite--寫時複製容器是一種經常使用的併發容器,它經過多線程下讀寫分離來達到提升併發性能的目的,和前面咱們講解StampedLock時所用的解決方案相似:任什麼時候候均可以進行讀操做,寫操做則須要加鎖。不一樣的是,在CopyOnWrite中,對容器的修改操做加鎖後,經過copy一個新的容器來進行修改,修改完畢後將容器替換爲新的容器便可。多線程

這種方式的好處顯而易見:經過copy一個新的容器來進行修改,這樣讀操做就不須要加鎖,能夠併發讀,由於在讀的過程當中是採用的舊的容器,即便新容器作了修改對舊容器也沒有影響,同時也很好的解決了迭代過程當中其餘線程修改致使的併發問題。併發

JDK中提供的併發容器包括CopyOnWriteArrayList和CopyOnWriteArraySet,下面經過CopyOnWriteArrayList的部分源碼來理解這種思想:app

//添加元素
public boolean add(E e) {
    //獨佔鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //複製一個新的數組newElements
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        //修改後指向新的數組
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

public E get(int index) {
    //未加鎖,直接獲取
    return get(getArray(), index);
}
複製代碼

代碼很簡單,在add操做中經過一個共享的ReentrantLock來獲取鎖,這樣能夠防止多線程下多個線程同時修改容器內容。獲取鎖後經過Arrays.copyOf複製了一個新的容器,而後對新的容器進行了修改,最後直接經過setArray將原數組引用指向了新的數組,避免了在修改過程當中迭代數據出現錯誤。get操做因爲是讀操做,未加鎖,直接讀取就行。CopyOnWriteArraySet相似,這裏不作過多講解。

CopyOnWrite容器雖然在多線程下使用是安全的,相比較Vector也大大提升了讀寫的性能,但它也有自身的問題。

首先就是性能,在講解ArrayList的文章中提到過,ArrayList的擴容因爲使用了Arrays.copyOf每次都須要申請更大的空間以及複製現有的元素到新的數組,對性能存在必定影響。CopyOnWrite容器也不例外,每次修改操做都會申請新的數組空間,而後進行替換。因此在高併發頻繁修改容器的狀況下,會不斷申請新的空間,同時會形成頻繁的GC,這時使用CopyOnWrite容器並非一個好的選擇。

其次還有一個數據一致性問題,因爲在修改中copy了新的數組進行替換,同時舊數組若是還在被使用,那麼新的數據就不能被及時讀取到,這樣就形成了數據不一致,若是須要強數據一致性,CopyOnWrite容器也不太適合。

併發容器ConcurrentHashMap

ConcurrentHashMap容器相較於CopyOnWrite容器在併發加鎖粒度上有了更大一步的優化,它經過修改對單個hash桶元素加鎖的達到了更細粒度的併發控制。在瞭解ConcurrentHashMap容器以前,推薦你們先閱讀我以前對HashMap源碼分析的文章--Java集合(5)一 HashMap與HashSet,由於在底層數據結構上,ConcurrentHashMap和HashMap都使用了數組+鏈表+紅黑樹的方式,只是在HashMap的基礎上添加了併發相關的一些控制,因此這裏只對ConcurrentHashMap中併發相關代碼作一些分析。

仍是先從ConcurrentHashMap的寫操做開始,這裏就是put方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); //計算桶的hash值
    int binCount = 0;
    //循環插入元素,避免併發插入失敗
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //若是當前桶無元素,則經過cas操做插入新節點
            if (casTabAt(tab, i, null,
                            new Node<K,V>(hash, key, value, null)))
                break;                   
        }
        //若是當前桶正在擴容,則協助擴容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //hash衝突時鎖住當前須要添加節點的頭元素,多是鏈表頭節點或者紅黑樹的根節點
            synchronized (f) { 
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                            value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
複製代碼

在put元素的過程當中,有幾個併發處理的關鍵點:

  • 若是當前桶對應的節點尚未元素插入,經過典型的無鎖cas操做嘗試插入新節點,減小加鎖的機率,併發狀況下若是插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)
  • 若是當前桶正在擴容,則協助擴容((fh = f.hash) == MOVED)。這裏是一個重點,ConcurrentHashMap的擴容和HashMap不同,它在多線程狀況下或使用多個線程同時擴容,每一個線程擴容指定的一部分hash桶,當前線程擴容完指定桶以後會繼續獲取下一個擴容任務,直到擴容所有完成。擴容的大小和HashMap同樣,都是翻倍,這樣能夠有效減小移動的元素數量,也就是使用2的冪次方的緣由,在HashMap中也同樣。
  • 在發生hash衝突時僅僅只鎖住當前須要添加節點的頭元素便可,多是鏈表頭節點或者紅黑樹的根節點,其餘桶節點都不須要加鎖,大大減少了鎖粒度。

經過ConcurrentHashMap添加元素的過程,知道了ConcurrentHashMap容器是經過CAS + synchronized一塊兒來實現併發控制的。這裏有個額外的問題:爲何使用synchronized而不使用ReentrantLock?前面個人文章也對synchronized以及ReentrantLock的實現方式和性能作過度析,在這裏個人理解是synchronized在後期優化空間上比ReentrantLock更大。

併發容器ConcurrentSkipListMap

java.util中對應的容器在java.util.concurrent包中基本均可以找到對應的併發容器:List和Set有對應的CopyOnWriteArrayList與CopyOnWriteArraySet,HashMap有對應的ConcurrentHashMap,可是有序的TreeMap或並無對應的ConcurrentTreeMap。

爲何沒有ConcurrentTreeMap呢?這是由於TreeMap內部使用了紅黑樹來實現,紅黑樹是一種自平衡的二叉樹,當樹被修改時,須要從新平衡,從新平衡操做可能會影響樹的大部分節點,若是併發量很是大的狀況下,這就須要在許多樹節點上添加互斥鎖,那併發就失去了意義。因此提供了另一種併發下的有序map實現:ConcurrentSkipListMap。

ConcurrentSkipListMap內部使用跳錶(SkipList)這種數據結構來實現,他的結構相對紅黑樹來講很是簡單理解,實現起來也相對簡單,並且在理論上它的查找、插入、刪除時間複雜度都爲log(n)。在併發上,ConcurrentSkipListMap採用無鎖的CAS+自旋來控制。

跳錶簡單來講就是一個多層的鏈表,底層是一個普通的鏈表,而後逐層減小,一般經過一個簡單的算法實現每一層元素是下一層的元素的二分之一,這樣當搜索元素時從最頂層開始搜索,能夠說是另外一種形式的二分查找。

一個簡單的獲取跳錶層數機率算法實現以下:

int random_level()  {  
    K = 1;  
    while (random(0,1))  
        K++;  
  
    return K;  
}  
複製代碼

經過簡單的0和1獲取機率,1層的機率爲50%,2層的機率爲25%,3層的機率爲12.5%,這樣逐級遞減。

一個三層的跳錶添加元素的過程以下:

插入值爲15的節點:
插入後:
維基百科中有一個添加節點的動圖,這裏也貼出來方便理解:

經過分析ConcurrentSkipListMap的put方法來理解跳錶以及CAS自旋併發控制:

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前繼節點
            if (n != null) { //查找到前繼節點
                Object v; int c;
                Node<K,V> f = n.next; //獲取後繼節點的後繼節點
                if (n != b.next)  //發生競爭,兩次節點獲取不一致,併發致使
                    break;
                if ((v = n.value) == null) {  // 節點已經被刪除
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) 
                    break;
                if ((c = cpr(cmp, key, n.key)) > 0) { //進行下一輪查找,比當前key大
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) { //相等時直接cas修改值
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
            if (!b.casNext(n, z)) //cas修改值 
                break;         // restart if lost race to append to b
            break outer;
        }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed(); //獲取隨機數
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0) // 獲取跳錶層級
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) { //若是獲取的調錶層級小於等於當前最大層級,則直接添加,並將它們組成一個上下的鏈表
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else { // try to grow by one level //不然增長一層level,在這裏體現爲Index<K,V>數組
            level = max + 1; // hold in array and later pick the one to use
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;
                int oldLevel = h.level;
                if (level <= oldLevel) // lost race to add level
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j) //新添加的level層的具體數據
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // 逐層插入數據過程
        splice: for (int insertionLevel = level;;) {
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))
                        break; // restart
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}
複製代碼

這裏的插入方法很複雜,能夠分爲3大步來理解:第一步獲取前繼節點後經過CAS來插入節點;第二步對level層數進行判斷,若是大於最大層數,則插入一層;第三步插入對應層的數據。整個插入過程所有經過CAS自旋的方式保證併發狀況下的數據正確性。

總結

JDK中提供了豐富的併發容器供咱們使用,文章中介紹的也並不全面,重點是要經過了解各類併發容器的原理,明白他們各自獨特的使用場景。這裏簡單作個總結:當併發讀遠多於修改的場景下須要使用List和Set時,能夠考慮使用CopyOnWriteArrayList和CopyOnWriteArraySet;當須要併發使用<Key, Value>鍵值對存取數據時,可使用ConcurrentHashMap;當要保證併發<Key, Value>鍵值對有序時可使用ConcurrentSkipListMap。

相關文章
相關標籤/搜索