java併發編程——併發容器

概述

java cocurrent包提供了不少併發容器,在提供併發控制的前提下,經過優化,提高性能。本文主要討論常見的併發容器的實現機制和絕妙之處,但並不會對全部實現細節面面俱到。html

爲何JUC須要提供併發容器?

java collection framework提供了豐富的容器,有map、list、set、queue、deque。可是其存在一個不足:多數容器類都是非線程安全的,即便部分容器是線程安全的,因爲使用sychronized進行鎖控制,致使讀/寫均需進行鎖操做,性能很低。java

java collection framework能夠經過如下兩種方式實現容器對象讀寫的併發控制,可是都是基於sychronized鎖控制機制,性能低:node

1. 使用sychronized方法進行併發控制,如HashTable 和 Vector。如下代碼爲Vector.add(e)的java8實現代碼:算法

    public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }

2.使用工具類Collections將非線程安全容器包裝成線程安全容器。如下代碼是Collections.synchronizedMap(Map<K,V> m)將原始Map包裝爲線程安全的SynchronizedMap,可是實際上最終操做時,仍然是在被包裝的原始m上進行,只是SynchronizedMap的全部方法都加上了synchronized鎖控制。編程

    public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
        return new SynchronizedMap<>(m);   //將原始Map包裝爲線程安全的SynchronizedMap
    }
    private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {

        private final Map<K,V> m;       // Backing Map 原始的非線程安全的map對象
        final Object      mutex;        // Object on which to synchronize  加鎖對象

        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }

        public V get(Object key) {      
            synchronized (mutex) {return m.get(key);} //全部方法加上synchronized鎖控制
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);} //全部方法加上synchronized鎖控制
        }
......
}

爲了提供高效地併發容器,java 5在java.util.cocurrent包中 引入了併發容器。api

JUC併發容器

本節對juc經常使用的幾個併發容器進行代碼分析,重點看下這些容器是如何高效地實現併發控制的。在進行具體的併發容器介紹以前,咱們提早搞清楚CAS理論是什麼東西。由於在juc併發容器的不少地方都使用到了CAS,他比加鎖處理更加高效。數組

CAS

CAS是一種無鎖的非阻塞算法,全稱爲:Compare-and-swap(比較並交換),大體思路是:先比較目標對象現值是否和舊值一致,若是一致,則更新對象爲新值;若是不一致,則代表對象已經被其餘線程修改,直接返回。算法實現的僞碼以下:安全

function cas(p : pointer to int, old : int, new : int) returns bool {
    if *p ≠ old {
        return false
    }
    *p ← new
    return true
}

參考自wiki:Compare-and-swap數據結構

ConcurrentHashMap

ConcurrentHashMap實現了HashTable的全部功能,線程安全,但卻在檢索元素時不須要鎖定,所以效率更高。多線程

ConcurrentHashMap的key 和 value都不容許null出現。緣由在於ConcurrentHashMap不能區分出value是null仍是沒有map上,相對的HashMap卻能夠容許null值,在於其使用在單線程環境下,可使用containKey(key)方法提早斷定是否能map上,從而區分這兩種狀況,可是ConcurrentHashMap在多線程使用上下文中則不能這麼斷定。參考:關於ConcurrentHashMap爲何不能put null

A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.

ConcurrentHashMap個put和get方法,細節請看代碼對應位置的註釋。

public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin,當前hash對應的bin(桶)還不存在時,使用cas寫入; 寫入失敗,則再次嘗試。
            }
            else if ((fh = f.hash) == MOVED) //若是tab[i]不爲空而且hash值爲MOVED,說明該鏈表正在進行transfer操做,返回擴容完成後的table
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {          // 加鎖保證線程安全,但不是對整個table加鎖,只對當前的Node加鎖,避免其餘線程對當前Node進行寫操做。 if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) { //若是在鏈表中找到值爲key的節點e,直接設置e.val = value便可
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e; //若是沒有找到值爲key的節點,直接新建Node並加入鏈表便可
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {  //若是首節點爲TreeBin類型,說明爲紅黑樹結構,執行putTreeVal操做
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD) //若是節點數大於閾值,則轉換鏈表結構爲紅黑樹結構
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); //計數增長1,有可能觸發transfer操做(擴容)
     return null; 
    }
  transient volatile Node<K,V>[] table; //元素所在的table是volatile類型,線程間可見


public V get(Object key) { //get無需更改size和count等公共屬性,加上table是volatile類型,故而無需加鎖。 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

思考一個問題:爲何當新加Node對應的‘桶’不存在時能夠直接使用CAS操做新增該桶,並插入新節點,可是當新增Node對應的‘桶’存在時,則必須加鎖處理?

參考資料:Java併發編程總結4——ConcurrentHashMap在jdk1.8中的改進

               java1.7 ConcurrentHashMap實現細節

附上HashMap jdk 1.8版本中的實現原理講解,講的很細也很通俗易懂:Jdk1.8中的HashMap實現原理

ConcurrentLinkedQueue

ConcurrentLinkedQueue使用鏈表做爲數據結構,它採用無鎖操做,能夠任務是高併發環境下性能最好的隊列。

ConcurrentLinkedQueue是非阻塞線程安全隊列,無界,故不太適合作生產者消費者模式,而LinkedBlockingQueue是阻塞線程安全隊列,能夠作到有界,一般用於生產者消費者模式。

下面看下其offer()方法的源碼,體會下:不使用鎖,只是用CAS操做來保證線程安全。細節參考代碼對應位置的註釋。

    
/**
* 不斷嘗試:找到最新的tail節點,不斷嘗試想最新的tail節點後面添加新節點
     */
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { //不斷嘗試:找到最新的tail節點,不斷嘗試想最新的tail節點後面添加新節點。 Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time //t引用有可能並非真實的tail節點的引用,多線程操做時,容許該狀況出現,只要能保證每次新增元素是在真實的tail節點上添加的便可。 casTail(t, newNode); // Failure is OK. 即便失敗,也不影響下次offer新的元素,反正後面會試圖尋找到最新的真實tail元素 return true; } // Lost CAS race to another thread; re-read next CAS競爭失敗,再次嘗試 } else if (p == q) //遇到哨兵節點(next和item相同,空節點或者刪除節點),從head節點從新遍歷。確保找到最新的tail節點 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; //java中'!='運算符不是原子操做,故使用t != (t = tail)作一次斷定,若是tail被其餘線程更改,則直接使用最新的tail節點返回。 } }

CopyOnWriteArrayList

CopyOnWriteArrayList提供高效地讀取操做,使用在讀多寫少的場景。CopyOnWriteArrayList讀取操做不用加鎖,且是安全的;寫操做時,先copy一份原有數據數組,再對複製數據進行寫入操做,最後將複製數據替換原有數據,從而保證寫操做不影響讀操做。

下面看下CopyOnWriteArrayList的核心代碼,體會下CopyOnWrite的思想:

public class CopyOnWriteArrayList<E>    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();
    /**
     * Sets the array.
     */
    final void setArray(Object[] a) {
        array = a;
    }

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        return array;
    }

    public E get(int index) {
        return get(getArray(), index);
    }

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();   //寫 互斥 讀
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;  //對副本進行修改操做
            setArray(newElements); //將修改後的副本替換原有的數據
            return true;
        } finally {
            lock.unlock();
        }
    }

}

 ConcurrentSkipListMap

SkipList(跳錶)是一種隨機性的數據結構,用於替代紅黑樹,由於它在高併發的狀況下,性能優於紅黑樹。跳錶其實是以空間換取時間。跳錶的基本模型示意圖以下:

ConcurrentSkipListMap的實現就是實現了一個無鎖版的跳錶,主要是利用無鎖的鏈表的實現來管理跳表底層,一樣利用CAS來完成替換。

參考資料

從零單排 Java Concurrency, SkipList&ConcurrnetSkipListMap

相關文章
相關標籤/搜索