java cocurrent包提供了不少併發容器,在提供併發控制的前提下,經過優化,提高性能。本文主要討論常見的併發容器的實現機制和絕妙之處,但並不會對全部實現細節面面俱到。html
java collection framework提供了豐富的容器,有map、list、set、queue、deque。可是其存在一個不足:多數容器類都是非線程安全的,即便部分容器是線程安全的,因爲使用sychronized進行鎖控制,致使讀/寫均需進行鎖操做,性能很低。java
java collection framework能夠經過如下兩種方式實現容器對象讀寫的併發控制,可是都是基於sychronized鎖控制機制,性能低:node
1. 使用sychronized方法進行併發控制,如HashTable 和 Vector。如下代碼爲Vector.add(e)的java8實現代碼:算法
public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; }
2.使用工具類Collections將非線程安全容器包裝成線程安全容器。如下代碼是Collections.synchronizedMap(Map<K,V> m)將原始Map包裝爲線程安全的SynchronizedMap,可是實際上最終操做時,仍然是在被包裝的原始m上進行,只是SynchronizedMap的全部方法都加上了synchronized鎖控制。編程
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) { return new SynchronizedMap<>(m); //將原始Map包裝爲線程安全的SynchronizedMap }
private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private final Map<K,V> m; // Backing Map 原始的非線程安全的map對象 final Object mutex; // Object on which to synchronize 加鎖對象 SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } public V get(Object key) { synchronized (mutex) {return m.get(key);} //全部方法加上synchronized鎖控制 } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} //全部方法加上synchronized鎖控制 }
......
}
爲了提供高效地併發容器,java 5在java.util.cocurrent包中 引入了併發容器。api
本節對juc經常使用的幾個併發容器進行代碼分析,重點看下這些容器是如何高效地實現併發控制的。在進行具體的併發容器介紹以前,咱們提早搞清楚CAS理論是什麼東西。由於在juc併發容器的不少地方都使用到了CAS,他比加鎖處理更加高效。數組
CAS是一種無鎖的非阻塞算法,全稱爲:Compare-and-swap(比較並交換),大體思路是:先比較目標對象現值是否和舊值一致,若是一致,則更新對象爲新值;若是不一致,則代表對象已經被其餘線程修改,直接返回。算法實現的僞碼以下:安全
function cas(p : pointer to int, old : int, new : int) returns bool { if *p ≠ old { return false } *p ← new return true }
參考自wiki:Compare-and-swap數據結構
ConcurrentHashMap實現了HashTable的全部功能,線程安全,但卻在檢索元素時不須要鎖定,所以效率更高。多線程
ConcurrentHashMap的key 和 value都不容許null出現。緣由在於ConcurrentHashMap不能區分出value是null仍是沒有map上,相對的HashMap卻能夠容許null值,在於其使用在單線程環境下,可使用containKey(key)方法提早斷定是否能map上,從而區分這兩種狀況,可是ConcurrentHashMap在多線程使用上下文中則不能這麼斷定。參考:關於ConcurrentHashMap爲何不能put null
A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as
Hashtable
, and includes versions of methods corresponding to each method ofHashtable
. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable withHashtable
in programs that rely on its thread safety but not on its synchronization details.
ConcurrentHashMap個put和get方法,細節請看代碼對應位置的註釋。
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin,當前hash對應的bin(桶)還不存在時,使用cas寫入; 寫入失敗,則再次嘗試。 } else if ((fh = f.hash) == MOVED) //若是tab[i]不爲空而且hash值爲MOVED,說明該鏈表正在進行transfer操做,返回擴容完成後的table tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { // 加鎖保證線程安全,但不是對整個table加鎖,只對當前的Node加鎖,避免其餘線程對當前Node進行寫操做。 if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { //若是在鏈表中找到值爲key的節點e,直接設置e.val = value便可 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //若是沒有找到值爲key的節點,直接新建Node並加入鏈表便可 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //若是首節點爲TreeBin類型,說明爲紅黑樹結構,執行putTreeVal操做 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //若是節點數大於閾值,則轉換鏈表結構爲紅黑樹結構 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); //計數增長1,有可能觸發transfer操做(擴容)
return null;
}
transient volatile Node<K,V>[] table; //元素所在的table是volatile類型,線程間可見
public V get(Object key) { //get無需更改size和count等公共屬性,加上table是volatile類型,故而無需加鎖。 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
思考一個問題:爲何當新加Node對應的‘桶’不存在時能夠直接使用CAS操做新增該桶,並插入新節點,可是當新增Node對應的‘桶’存在時,則必須加鎖處理?
參考資料:Java併發編程總結4——ConcurrentHashMap在jdk1.8中的改進
附上HashMap jdk 1.8版本中的實現原理講解,講的很細也很通俗易懂:Jdk1.8中的HashMap實現原理
ConcurrentLinkedQueue使用鏈表做爲數據結構,它採用無鎖操做,能夠任務是高併發環境下性能最好的隊列。
ConcurrentLinkedQueue是非阻塞線程安全隊列,無界,故不太適合作生產者消費者模式,而LinkedBlockingQueue是阻塞線程安全隊列,能夠作到有界,一般用於生產者消費者模式。
下面看下其offer()方法的源碼,體會下:不使用鎖,只是用CAS操做來保證線程安全。細節參考代碼對應位置的註釋。
/**
* 不斷嘗試:找到最新的tail節點,不斷嘗試想最新的tail節點後面添加新節點
*/
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { //不斷嘗試:找到最新的tail節點,不斷嘗試想最新的tail節點後面添加新節點。 Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time //t引用有可能並非真實的tail節點的引用,多線程操做時,容許該狀況出現,只要能保證每次新增元素是在真實的tail節點上添加的便可。 casTail(t, newNode); // Failure is OK. 即便失敗,也不影響下次offer新的元素,反正後面會試圖尋找到最新的真實tail元素 return true; } // Lost CAS race to another thread; re-read next CAS競爭失敗,再次嘗試 } else if (p == q) //遇到哨兵節點(next和item相同,空節點或者刪除節點),從head節點從新遍歷。確保找到最新的tail節點 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; //java中'!='運算符不是原子操做,故使用t != (t = tail)作一次斷定,若是tail被其餘線程更改,則直接使用最新的tail節點返回。 } }
CopyOnWriteArrayList提供高效地讀取操做,使用在讀多寫少的場景。CopyOnWriteArrayList讀取操做不用加鎖,且是安全的;寫操做時,先copy一份原有數據數組,再對複製數據進行寫入操做,最後將複製數據替換原有數據,從而保證寫操做不影響讀操做。
下面看下CopyOnWriteArrayList的核心代碼,體會下CopyOnWrite的思想:
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { /** The array, accessed only via getArray/setArray. */ private transient volatile Object[] array; /** The lock protecting all mutators */ final transient ReentrantLock lock = new ReentrantLock(); /** * Sets the array. */ final void setArray(Object[] a) { array = a; } /** * Gets the array. Non-private so as to also be accessible * from CopyOnWriteArraySet class. */ final Object[] getArray() { return array; } public E get(int index) { return get(getArray(), index); } /** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); //寫 互斥 讀 try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; //對副本進行修改操做 setArray(newElements); //將修改後的副本替換原有的數據 return true; } finally { lock.unlock(); } } }
SkipList(跳錶)是一種隨機性的數據結構,用於替代紅黑樹,由於它在高併發的狀況下,性能優於紅黑樹。跳錶其實是以空間換取時間。跳錶的基本模型示意圖以下:
ConcurrentSkipListMap的實現就是實現了一個無鎖版的跳錶,主要是利用無鎖的鏈表的實現來管理跳表底層,一樣利用CAS來完成替換。