https://my.oschina.net/hosee/blog/675884javascript
http://www.javashuo.com/article/p-fhbpvnnb-a.htmlhtml
https://blog.csdn.net/jianghuxiaojin/article/details/52006118java
併發編程實踐中,ConcurrentHashMap是一個常常被使用的數據結構,相比於Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在線程安全的基礎上提供了更好的寫併發能力,但同時下降了對讀一致性的要求(這點好像CAP理論啊 O(∩_∩)O)。ConcurrentHashMap的設計與實現很是精巧,大量的利用了volatile,final,CAS等lock-free技術來減小鎖競爭對於性能的影響,不管對於Java併發編程的學習仍是Java內存模型的理解,ConcurrentHashMap的設計以及源碼都值得很是仔細的閱讀與揣摩。node
這篇日誌記錄了本身對ConcurrentHashMap的一些總結,因爲JDK6,7,8中實現都不一樣,須要分開闡述在不一樣版本中的ConcurrentHashMap。nginx
以前已經在ConcurrentHashMap原理分析中解釋了ConcurrentHashMap的原理,主要是從代碼的角度來闡述是源碼是如何寫的,本文仍然從源碼出發,挑選我的以爲重要的點(會用紅色標註)再次進行回顧,以及闡述ConcurrentHashMap的一些注意點。c++
ConcurrentHashMap採用了分段鎖的設計,只有在同一個分段內才存在競態關係,不一樣的分段鎖之間沒有鎖競爭。相比於對整個Map加鎖的設計,分段鎖大大的提升了高併發環境下的處理能力。但同時,因爲不是對整個Map加鎖,致使一些須要掃描整個Map的方法(如size(), containsValue())須要使用特殊的實現,另一些方法(如clear())甚至放棄了對一致性的要求(ConcurrentHashMap是弱一致性的,具體請查看ConcurrentHashMap能徹底替代HashTable嗎?)。git
ConcurrentHashMap中的分段鎖稱爲Segment,它即相似於HashMap(JDK7與JDK8中HashMap的實現)的結構,即內部擁有一個Entry數組,數組中的每一個元素又是一個鏈表;同時又是一個ReentrantLock(Segment繼承了ReentrantLock)。ConcurrentHashMap中的HashEntry相對於HashMap中的Entry有必定的差別性:HashEntry中的value以及next都被volatile修飾,這樣在多線程讀寫過程當中可以保持它們的可見性,代碼以下:github
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next;
併發度能夠理解爲程序運行時可以同時更新ConccurentHashMap且不產生鎖競爭的最大線程數,實際上就是ConcurrentHashMap中的分段鎖個數,即Segment[]的數組長度。ConcurrentHashMap默認的併發度爲16,但用戶也能夠在構造函數中設置併發度。當用戶設置併發度時,ConcurrentHashMap會使用大於等於該值的最小2冪指數做爲實際併發度(假如用戶設置併發度爲17,實際併發度則爲32)。運行時經過將key的高n位(n = 32 – segmentShift)和併發度減1(segmentMask)作位與運算定位到所在的Segment。segmentShift與segmentMask都是在構造過程當中根據concurrency level被相應的計算出來。算法
若是併發度設置的太小,會帶來嚴重的鎖競爭問題;若是併發度設置的過大,本來位於同一個Segment內的訪問會擴散到不一樣的Segment中,CPU cache命中率會降低,從而引發程序性能降低。(文檔的說法是根據你併發的線程數量決定,太多會導性能下降)數據庫
和JDK6不一樣,JDK7中除了第一個Segment以外,剩餘的Segments採用的是延遲初始化的機制:每次put以前都須要檢查key對應的Segment是否爲null,若是是則調用ensureSegment()以確保對應的Segment被建立。
ensureSegment可能在併發環境下被調用,但與想象中不一樣,ensureSegment並未使用鎖來控制競爭,而是使用了Unsafe對象的getObjectVolatile()提供的原子讀語義結合CAS來確保Segment建立的原子性。代碼段以下:
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } }
和JDK6同樣,ConcurrentHashMap的put方法被代理到了對應的Segment(定位Segment的原理以前已經描述過)中。與JDK6不一樣的是,JDK7版本的ConcurrentHashMap在得到Segment鎖的過程當中,作了必定的優化 - 在真正申請鎖以前,put方法會經過tryLock()方法嘗試得到鎖,在嘗試得到鎖的過程當中會對對應hashcode的鏈表進行遍歷,若是遍歷完畢仍然找不到與key相同的HashEntry節點,則爲後續的put操做提早建立一個HashEntry。當tryLock必定次數後仍沒法得到鎖,則經過lock申請鎖。
須要注意的是,因爲在併發環境下,其餘線程的put,rehash或者remove操做可能會致使鏈表頭結點的變化,所以在過程當中須要進行檢查,若是頭結點發生變化則從新對錶進行遍歷。而若是其餘線程引發了鏈表中的某個節點被刪除,即便該變化由於是非原子寫操做(刪除節點後連接後續節點調用的是Unsafe.putOrderedObject(),該方法不提供原子寫語義)可能致使當前線程沒法觀察到,但由於不影響遍歷的正確性因此忽略不計。
之因此在獲取鎖的過程當中對整個鏈表進行遍歷,主要目的是但願遍歷的鏈表被CPU cache所緩存,爲後續實際put過程當中的鏈表遍歷操做提高性能。
在得到鎖以後,Segment對鏈表進行遍歷,若是某個HashEntry節點具備相同的key,則更新該HashEntry的value值,不然新建一個HashEntry節點,將它設置爲鏈表的新head節點並將原頭節點設爲新head的下一個節點。新建過程當中若是節點總數(含新建的HashEntry)超過threshold,則調用rehash()方法對Segment進行擴容,最後將新建HashEntry寫入到數組中。
put方法中,連接新節點的下一個節點(HashEntry.setNext())以及將鏈表寫入到數組中(setEntryAt())都是經過Unsafe的putOrderedObject()方法來實現,這裏並未使用具備原子寫語義的putObjectVolatile()的緣由是:JMM會保證得到鎖到釋放鎖之間全部對象的狀態更新都會在鎖被釋放以後更新到主存,從而保證這些變動對其餘線程是可見的。
相對於HashMap的resize,ConcurrentHashMap的rehash原理相似,可是Doug Lea爲rehash作了必定的優化,避免讓全部的節點都進行復制操做:因爲擴容是基於2的冪指來操做,假設擴容前某HashEntry對應到Segment中數組的index爲i,數組的容量爲capacity,那麼擴容後該HashEntry對應到新數組中的index只可能爲i或者i+capacity,所以大多數HashEntry節點在擴容先後index能夠保持不變。基於此,rehash方法中會定位第一個後續全部節點在擴容後index都保持不變的節點,而後將這個節點以前的全部節點重排便可。這部分代碼以下:
private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
和put相似,remove在真正得到鎖以前,也會對鏈表進行遍歷以提升緩存命中率。
get與containsKey兩個方法幾乎徹底一致:他們都沒有使用鎖,而是經過Unsafe對象的getObjectVolatile()方法提供的原子讀語義,來得到Segment以及對應的鏈表,而後對鏈表遍歷判斷是否存在key相同的節點以及得到該節點的value。但因爲遍歷過程當中其餘線程可能對鏈表結構作了調整,所以get和containsKey返回的多是過期的數據,這一點是ConcurrentHashMap在弱一致性上的體現。若是要求強一致性,那麼必須使用Collections.synchronizedMap()方法。
這些方法都是基於整個ConcurrentHashMap來進行操做的,他們的原理也基本相似:首先不加鎖循環執行如下操做:循環全部的Segment(經過Unsafe的getObjectVolatile()以保證原子讀語義),得到對應的值以及全部Segment的modcount之和。若是連續兩次全部Segment的modcount和相等,則過程當中沒有發生其餘線程修改ConcurrentHashMap的狀況,返回得到的值。
當循環次數超過預約義的值時,這時須要對全部的Segment依次進行加鎖,獲取返回值後再依次解鎖。值得注意的是,加鎖過程當中要強制建立全部的Segment,不然容易出現其餘線程建立Segment並進行put,remove等操做。代碼以下:
for(int j =0; j < segments.length; ++j) ensureSegment(j).lock();// force creation
通常來講,應該避免在多線程環境下使用size和containsValue方法。
注1:modcount在put, replace, remove以及clear等方法中都會被修改。
注2:對於containsValue方法來講,若是在循環過程當中發現匹配value的HashEntry,則直接返回true。
最後,與HashMap不一樣的是,ConcurrentHashMap並不容許key或者value爲null,按照Doug Lea的說法,這麼設計的緣由是在ConcurrentHashMap中,一旦value出現null,則表明HashEntry的key/value沒有映射完成就被其餘線程所見,須要特殊處理。在JDK6中,get方法的實現中就有一段對HashEntry.value == null的防護性判斷。但Doug Lea也認可實際運行過程當中,這種狀況彷佛不可能發生(參考:http://cs.oswego.edu/pipermail/concurrency-interest/2011-March/007799.html)。
ConcurrentHashMap在JDK8中進行了巨大改動,很須要經過源碼來再次學習下Doug Lea的實現方法。
它摒棄了Segment(鎖段)的概念,而是啓用了一種全新的方式實現,利用CAS算法。它沿用了與它同時期的HashMap版本的思想,底層依然由「數組」+鏈表+紅黑樹的方式思想(JDK7與JDK8中HashMap的實現),可是爲了作到併發,又增長了不少輔助的類,例如TreeBin,Traverser等對象內部類。
首先來看幾個重要的屬性,與HashMap相同的就再也不介紹了,這裏重點解釋一下sizeCtl這個屬性。能夠說它是ConcurrentHashMap中出鏡率很高的一個屬性,由於它是一個控制標識符,在不一樣的地方有不一樣用途,並且它的取值不一樣,也表明不一樣的含義。
/** * 盛裝Node元素的數組 它的大小是2的整數次冪 * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node<K,V>[] table; /** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. hash表初始化或擴容時的一個控制位標識量。 負數表明正在進行初始化或擴容操做 -1表明正在初始化 -N 表示有N-1個線程正在進行擴容操做 正數或0表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小 */ private transient volatile int sizeCtl; // 如下兩個是用來控制擴容的時候 單線程進入的變量 /** * The number of bits used for generation stamp in sizeCtl. * Must be at least 6 for 32bit arrays. */ private static int RESIZE_STAMP_BITS = 16; /** * The bit shift for recording size stamp in sizeCtl. */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED = -1; // hash值是-1,表示這是一個forwardNode節點 static final int TREEBIN = -2; // hash值是-2 表示這時一個TreeBin節點
Node是最核心的內部類,它包裝了key-value鍵值對,全部插入ConcurrentHashMap的數據都包裝在這裏面。它與HashMap中的定義很類似,可是可是有一些差異它對value和next屬性設置了volatile同步鎖(與JDK7的Segment相同),它不容許調用setValue方法直接改變Node的value域,它增長了find方法輔助map.get()方法。
樹節點類,另一個核心的數據結構。當鏈表長度過長的時候,會轉換爲TreeNode。可是與HashMap不相同的是,它並非直接轉換爲紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的包裝。並且TreeNode在ConcurrentHashMap集成自Node類,而並不是HashMap中的集成自LinkedHashMap.Entry<K,V>類,也就是說TreeNode帶有next指針,這樣作的目的是方便基於TreeBin的訪問。
這個類並不負責包裝用戶的key、value信息,而是包裝的不少TreeNode節點。它代替了TreeNode的根節點,也就是說在實際的ConcurrentHashMap「數組」中,存放的是TreeBin對象,而不是TreeNode對象,這是與HashMap的區別。另外這個類還帶有了讀寫鎖。
這裏僅貼出它的構造方法。能夠看到在構造TreeBin節點時,僅僅指定了它的hash值爲TREEBIN常量,這也就是個標識爲。同時也看到咱們熟悉的紅黑樹構造方法
2.2.4 ForwardingNode
一個用於鏈接兩個table的節點類。它包含一個nextTable指針,用於指向下一張表。並且這個節點的key value next指針所有爲null,它的hash值爲-1. 這裏面定義的find的方法是從nextTable裏進行查詢節點,而不是以自身爲頭節點進行查找。
/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
在ConcurrentHashMap中,隨處能夠看到U, 大量使用了U.compareAndSwapXXX的方法,這個方法是利用一個CAS算法實現無鎖化的修改值的操做,他能夠大大下降鎖代理的性能消耗。這個算法的基本思想就是不斷地去比較當前內存中的變量值與你指定的一個變量值是否相等,若是相等,則接受你指定的修改的值,不然拒絕你的操做。由於當前線程中的值已經不是最新的值,你的修改極可能會覆蓋掉其餘線程修改的結果。這一點與樂觀鎖,SVN的思想是比較相似的。
unsafe代碼塊控制了一些屬性的修改工做,好比最經常使用的SIZECTL 。在這一版本的concurrentHashMap中,大量應用來的CAS方法進行變量、屬性的修改工做。利用CAS進行無鎖操做,能夠大大提升性能。
private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }
ConcurrentHashMap定義了三個原子操做,用於對指定位置的節點進行操做。正是這些原子操做保證了ConcurrentHashMap的線程安全。
//得到在i位置上的Node節點 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //利用CAS算法設置i位置上的Node節點。之因此能實現併發是由於他指定了原來這個節點的值是多少 //在CAS算法中,會比較內存中的值與你指定的這個值是否相等,若是相等才接受你的修改,不然拒絕你的修改 //所以當前線程中的值並非最新的值,這種修改可能會覆蓋掉其餘線程的修改結果 有點相似於SVN static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //利用volatile方法設置節點位置的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
對於ConcurrentHashMap來講,調用它的構造方法僅僅是設置了一些參數而已。而整個table的初始化是在向ConcurrentHashMap中插入元素的時候發生的。如調用put、computeIfAbsent、compute、merge等方法的時候,調用時機是檢查table==null。
初始化方法主要應用了關鍵屬性sizeCtl 若是這個值〈0,表示其餘線程正在進行初始化,就放棄這個操做。在這也能夠看出ConcurrentHashMap的初始化只能由一個線程完成。若是得到了初始化權限,就用CAS方法將sizeCtl置爲-1,防止其餘線程進入。初始化數組後,將sizeCtl的值改成0.75*n。
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //sizeCtl表示有其餘線程正在進行初始化操做,把線程掛起。對於table的初始化工做,只能有一個線程在進行。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置爲-1 表示本線程正在進行初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2);//至關於0.75*n 設置一個擴容的閾值 } } finally { sizeCtl = sc; } break; } } return tab; }
當ConcurrentHashMap容量不足的時候,須要對table進行擴容。這個方法的基本思想跟HashMap是很像的,可是因爲它是支持併發擴容的,因此要複雜的多。緣由是它支持多線程進行擴容操做,而並無加鎖。我想這樣作的目的不只僅是爲了知足concurrent的要求,而是但願利用併發處理去減小擴容帶來的時間影響。由於在擴容的時候,老是會涉及到從一個「數組」到另外一個「數組」拷貝的操做,若是這個操做可以併發進行,那真真是極好的了。
整個擴容操做分爲兩個部分
第一部分是構建一個nextTable,它的容量是原來的兩倍,這個操做是單線程完成的。這個單線程的保證是經過RESIZE_STAMP_SHIFT這個常量通過一次運算來保證的,這個地方在後面會有提到;
先來看一下單線程是如何完成的:
它的大致思想就是遍歷、複製的過程。首先根據運算獲得須要遍歷的次數i,而後利用tabAt方法得到i位置的元素:
若是這個位置爲空,就在原table中的i位置放入forwardNode節點,這個也是觸發併發擴容的關鍵點;
若是這個位置是Node節點(fh>=0),若是它是一個鏈表的頭節點,就構造一個反序鏈表,把他們分別放在nextTable的i和i+n的位置上
若是這個位置是TreeBin節點(fh<0),也作一個反序處理,而且判斷是否須要untreefi,把處理的結果分別放在nextTable的i和i+n的位置上
再看一下多線程是如何完成的:
在代碼的69行有一個判斷,若是遍歷到的節點是forward節點,就向後繼續遍歷,再加上給節點上鎖的機制,就完成了多線程的控制。多線程遍歷節點,處理了一個節點,就把對應點的值set爲forward,另外一個線程看到forward,就向後遍歷。這樣交叉就完成了複製工做。並且還很好的解決了線程安全的問題。 這個方法的設計實在是讓我膜拜。
/** * 一個過渡的table表 只有在擴容的時候纔會使用 */ private transient volatile Node<K,V>[] nextTable; /** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//構造一個nextTable對象 它的容量是原來的兩倍 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//構造一個連節點指針 用於標誌位 boolean advance = true;//併發擴容的關鍵屬性 若是等於true 說明這個節點已經處理過 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //這個while循環體的做用就是在控制i-- 經過i--能夠依次遍歷原hash表中的節點 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //若是全部的節點都已經完成複製工做 就把nextTable賦值給table 清空臨時對象nextTable nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1);//擴容閾值設置爲原來容量的1.5倍 依然至關於如今容量的0.75倍 return; } //利用CAS方法更新這個擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //若是遍歷到的節點爲空 則放入ForwardingNode指針 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //若是遍歷到ForwardingNode節點 說明這個點已經被處理過了 直接跳過 這裏是控制併發擴容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //節點上鎖 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //若是fh>=0 證實這是一個Node節點 if (fh >= 0) { int runBit = fh & n; //如下的部分在完成的工做是構造兩個鏈表 一個是原鏈表 另外一個是原鏈表的反序排列 Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //在nextTable的i位置上插入一個鏈表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另外一個鏈表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode節點 表示已經處理過該節點 setTabAt(tab, i, fwd); //設置advance爲true 返回到上面的while循環中 就能夠執行i--操做 advance = true; } //對TreeBin對象進行處理 與上面的過程相似 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; //構造正序和反序兩個鏈表 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //若是擴容後已經再也不須要tree的結構 反向轉換爲鏈表結構 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; //在nextTable的i位置上插入一個鏈表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另外一個鏈表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode節點 表示已經處理過該節點 setTabAt(tab, i, fwd); //設置advance爲true 返回到上面的while循環中 就能夠執行i--操做 advance = true; } } } } } }
前面的全部的介紹其實都爲這個方法作鋪墊。ConcurrentHashMap最經常使用的就是put和get兩個方法。如今來介紹put方法,這個put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i,若是i位置是空的,直接放進去,不然進行判斷,若是i位置是樹節點,按照樹的方式插入新的節點,不然把i插入到鏈表的末尾。ConcurrentHashMap中依然沿用這個思想,有一個最重要的不一樣點就是ConcurrentHashMap不容許key或value爲null值。另外因爲涉及到多線程,put方法就要複雜一點。在多線程中可能有如下兩個狀況
若是一個或多個線程正在對ConcurrentHashMap進行擴容操做,當前線程也要進入擴容的操做中。這個擴容的操做之因此能被檢測到,是由於transfer方法中在空結點上插入forward節點,若是檢測到須要插入的位置被forward節點佔有,就幫助進行擴容;
若是檢測到要插入的節點是非空且不是forward節點,就對這個節點加鎖,這樣就保證了線程安全。儘管這個有一些影響效率,可是仍是會比hashTable的synchronized要好得多。
總體流程就是首先定義不容許key或value爲null的狀況放入 對於每個放入的值,首先利用spread方法對key的hashcode進行一次hash計算,由此來肯定這個值在table中的位置。
若是這個位置是空的,那麼直接放入,並且不須要加鎖操做。
若是這個位置存在結點,說明發生了hash碰撞,首先判斷這個節點的類型。若是是鏈表節點(fh>0),則獲得的結點就是hash值相同的節點組成的鏈表的頭節點。須要依次向後遍歷肯定這個新加入的值所在位置。若是遇到hash值與key值都與新加入節點是一致的狀況,則只須要更新value值便可。不然依次向後遍歷,直到鏈表尾插入這個結點。若是加入這個節點之後鏈表長度大於8,就把這個鏈表轉換成紅黑樹。若是這個節點的類型已是樹節點的話,直接調用樹節點的插入方法進行插入新的值。
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { //不容許 key或value爲null if (key == null || value == null) throw new NullPointerException(); //計算hash值 int hash = spread(key.hashCode()); int binCount = 0; //死循環 什麼時候插入成功 什麼時候跳出 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //若是table爲空的話,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //根據hash值計算出在table裏面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若是這個位置沒有值 ,直接放進去,不須要加鎖 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //當遇到錶鏈接點時,須要進行整合表的操做 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //結點上鎖 這裏的結點能夠理解爲hash值相同組成的鏈表的頭結點 synchronized (f) { if (tabAt(tab, i) == f) { //fh〉0 說明這個節點是一個鏈表的節點 不是樹的節點 if (fh >= 0) { binCount = 1; //在這裏遍歷鏈表全部的結點 for (Node<K,V> e = f;; ++binCount) { K ek; //若是hash值和key值相同 則修改對應結點的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //若是遍歷到了最後一個結點,那麼就證實新的節點須要插入 就把它插入在鏈表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //若是這個節點是樹節點,就按照樹的方式插入值 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //若是鏈表長度已經達到臨界值8 就須要把鏈表轉換爲樹結構 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //將當前ConcurrentHashMap的元素數量+1 addCount(1L, binCount); return null; }
咱們能夠發現JDK8中的實現也是鎖分離的思想,只是鎖住的是一個Node,而不是JDK7中的Segment,而鎖住Node以前的操做是無鎖的而且也是線程安全的,創建在以前提到的3個原子操做上。
這是一個協助擴容的方法。這個方法被調用的時候,當前ConcurrentHashMap必定已經有了nextTable對象,首先拿到這個nextTable對象,調用transfer方法。回看上面的transfer方法能夠看到,當本線程進入擴容方法的時候會直接進入複製階段。
這個方法用於將過長的鏈表轉換爲TreeBin對象。可是他並非直接轉換,而是進行一次容量判斷,若是容量沒有達到轉換的要求,直接進行擴容操做並返回;若是知足條件才鏈表的結構抓換爲TreeBin ,這與HashMap不一樣的是,它並無把TreeNode直接放入紅黑樹,而是利用了TreeBin這個小容器來封裝全部的TreeNode.
get方法比較簡單,給定一個key來肯定value的時候,必須知足兩個條件 key相同 hash值相同,對於節點可能在鏈表或樹上的狀況,須要分別去查找。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //計算hash值 int h = spread(key.hashCode()); //根據hash值肯定節點位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //若是搜索到的節點key與傳入的key相同且不爲null,直接返回這個節點 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //若是eh<0 說明這個節點在樹上 直接尋找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //不然遍歷鏈表 找到對應的值並返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
對於ConcurrentHashMap來講,這個table裏到底裝了多少東西實際上是個不肯定的數量,由於不可能在調用size()方法的時候像GC的「stop the world」同樣讓其餘線程都停下來讓你去統計,所以只能說這個數量是個估計值。對於這個估計值,ConcurrentHashMap也是大費周章才計算出來的。
爲了統計元素個數,ConcurrentHashMap定義了一些變量和一個內部類
/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } /******************************************/ /** * 實際上保存的是hashmap中的元素個數 利用CAS鎖進行更新 但它並不用返回當前hashmap的元素個數 */ private transient volatile long baseCount; /** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */ private transient volatile int cellsBusy; /** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells;
mappingCount與size方法的相似 從Java工程師給出的註釋來看,應該使用mappingCount代替size方法 兩個方法都沒有直接返回basecount 而是統計一次這個值,而這個值其實也是一個大概的數值,所以可能在統計的時候有其餘線程正在執行插入或刪除操做。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } /** * Returns the number of mappings. This method should be used * instead of {@link #size} because a ConcurrentHashMap may * contain more mappings than can be represented as an int. The * value returned is an estimate; the actual count may differ if * there are concurrent insertions or removals. * * @return the number of mappings * @since 1.8 */ public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value;//全部counter的值求和 } } return sum; }
在put方法結尾處調用了addCount方法,把當前ConcurrentHashMap的元素個數+1這個方法一共作了兩件事,更新baseCount的值,檢測是否進行擴容。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //若是check值大於等於0 則須要檢驗是否須要進行擴容操做 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //若是已經有其餘線程在執行擴容操做 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //當前線程是惟一的或是第一個發起擴容的線程 此時nextTable=null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
JDK6,7中的ConcurrentHashmap主要使用Segment來實現減少鎖粒度,把HashMap分割成若干個Segment,在put的時候須要鎖住Segment,get時候不加鎖,使用volatile來保證可見性,當要統計全局時(好比size),首先會嘗試屢次計算modcount來肯定,這幾回嘗試中,是否有其餘線程進行了修改操做,若是沒有,則直接返回size。若是有,則須要依次鎖住全部的Segment來計算。
jdk7中ConcurrentHashmap中,當長度過長碰撞會很頻繁,鏈表的增改刪查操做都會消耗很長的時間,影響性能,因此jdk8 中徹底重寫了concurrentHashmap,代碼量從原來的1000多行變成了 6000多 行,實現上也和原來的分段式存儲有很大的區別。
主要設計上的變化有如下幾點:
至於爲何JDK8中使用synchronized而不是ReentrantLock,我猜是由於JDK8中對synchronized有了足夠的優化吧。
1. http://www.jianshu.com/p/4806633fcc55
2. https://www.zhihu.com/question/22438589
3. http://blog.csdn.net/u010723709/article/details/48007881
CAS
CAS:Compare and Swap, 翻譯成比較並交換。
java.util.concurrent包中藉助CAS實現了區別於synchronouse同步鎖的一種樂觀鎖。
本文先從CAS的應用提及,再深刻原理解析。
CAS應用
CAS有3個操做數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。
非阻塞算法 (nonblocking algorithms)
一個線程的失敗或者掛起不該該影響其餘線程的失敗或掛起的算法。
現代的CPU提供了特殊的指令,能夠自動更新共享數據,並且可以檢測到其餘線程的干擾,而 compareAndSet() 就用這些代替了鎖定。
拿出AtomicInteger來研究在沒有鎖的狀況下是如何作到數據正確性的。
private volatile int value;
首先毫無覺得,在沒有鎖的機制下可能須要藉助volatile原語,保證線程間的數據是可見的(共享的)。
這樣才獲取變量的值的時候才能直接讀取。
public final int get() {
return value;
}
而後來看看++i是怎麼作到的。
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
在這裏採用了CAS操做,每次從內存中讀取數據而後將此數據和+1後的結果進行CAS操做,若是成功就返回結果,不然重試直到成功爲止。
而compareAndSet利用JNI來完成CPU指令的操做。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
總體的過程就是這樣子的,利用CPU的CAS指令,同時藉助JNI來完成Java的非阻塞算法。其它原子操做都是利用相似的特性完成的。
其中
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
相似:
if (this == expect) {
this = update
return true;
} else {
return false;
}
那麼問題就來了,成功過程當中須要2個步驟:比較this == expect,替換this = update,compareAndSwapInt如何這兩個步驟的原子性呢? 參考CAS的原理。
CAS原理
CAS經過調用JNI的代碼實現的。JNI:Java Native Interface爲JAVA本地調用,容許java調用其餘語言。
而compareAndSwapInt就是藉助C來調用CPU底層指令實現的。
下面從分析比較經常使用的CPU(intel x86)來解釋CAS的實現原理。
下面是sun.misc.Unsafe類的compareAndSwapInt()方法的源代碼:
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
能夠看到這是個本地方法調用。這個本地方法在openjdk中依次調用的c++代碼爲:unsafe.cpp,atomic.cpp和atomicwindowsx86.inline.hpp。這個本地方法的最終實如今openjdk的以下位置:openjdk-7-fcs-src-b147-27jun2011\openjdk\hotspot\src\oscpu\windowsx86\vm\ atomicwindowsx86.inline.hpp(對應於windows操做系統,X86處理器)。下面是對應於intel x86處理器的源代碼的片斷:
// Adding a lock prefix to an instruction on MP machine // VC++ doesn't like the lock prefix to be on a single line // so we can't insert a label after the lock prefix. // By emitting a lock prefix, we can define a label after it. #define LOCK_IF_MP(mp) __asm cmp mp, 0 \ __asm je L0 \ __asm _emit 0xF0 \ __asm L0: inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange int mp = os::is_MP(); __asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp) cmpxchg dword ptr [edx], ecx } }
如上面源代碼所示,程序會根據當前處理器的類型來決定是否爲cmpxchg指令添加lock前綴。若是程序是在多處理器上運行,就爲cmpxchg指令加上lock前綴(lock cmpxchg)。反之,若是程序是在單處理器上運行,就省略lock前綴(單處理器自身會維護單處理器內的順序一致性,不須要lock前綴提供的內存屏障效果)。
intel的手冊對lock前綴的說明以下:
備註知識:
關於CPU的鎖有以下3種:
3.1 處理器自動保證基本內存操做的原子性
首先處理器會自動保證基本的內存操做的原子性。處理器保證從系統內存當中讀取或者寫入一個字節是原子的,意思是當一個處理器讀取一個字節時,其餘處理器不能訪問這個字節的內存地址。奔騰6和最新的處理器能自動保證單處理器對同一個緩存行裏進行16/32/64位的操做是原子的,可是複雜的內存操做處理器不能自動保證其原子性,好比跨總線寬度,跨多個緩存行,跨頁表的訪問。可是處理器提供總線鎖定和緩存鎖定兩個機制來保證複雜內存操做的原子性。
3.2 使用總線鎖保證原子性
第一個機制是經過總線鎖保證原子性。若是多個處理器同時對共享變量進行讀改寫(i++就是經典的讀改寫操做)操做,那麼共享變量就會被多個處理器同時進行操做,這樣讀改寫操做就不是原子的,操做完以後共享變量的值會和指望的不一致,舉個例子:若是i=1,咱們進行兩次i++操做,咱們指望的結果是3,可是有可能結果是2。以下圖
緣由是有可能多個處理器同時從各自的緩存中讀取變量i,分別進行加一操做,而後分別寫入系統內存當中。那麼想要保證讀改寫共享變量的操做是原子的,就必須保證CPU1讀改寫共享變量的時候,CPU2不能操做緩存了該共享變量內存地址的緩存。
處理器使用總線鎖就是來解決這個問題的。所謂總線鎖就是使用處理器提供的一個LOCK#信號,當一個處理器在總線上輸出此信號時,其餘處理器的請求將被阻塞住,那麼該處理器能夠獨佔使用共享內存。
3.3 使用緩存鎖保證原子性
第二個機制是經過緩存鎖定保證原子性。在同一時刻咱們只需保證對某個內存地址的操做是原子性便可,但總線鎖定把CPU和內存之間通訊鎖住了,這使得鎖按期間,其餘處理器不能操做其餘內存地址的數據,因此總線鎖定的開銷比較大,最近的處理器在某些場合下使用緩存鎖定代替總線鎖定來進行優化。
頻繁使用的內存會緩存在處理器的L1,L2和L3高速緩存裏,那麼原子操做就能夠直接在處理器內部緩存中進行,並不須要聲明總線鎖,在奔騰6和最近的處理器中能夠使用「緩存鎖定」的方式來實現複雜的原子性。所謂「緩存鎖定」就是若是緩存在處理器緩存行中內存區域在LOCK操做期間被鎖定,當它執行鎖操做回寫內存時,處理器不在總線上聲言LOCK#信號,而是修改內部的內存地址,並容許它的緩存一致性機制來保證操做的原子性,由於緩存一致性機制會阻止同時修改被兩個以上處理器緩存的內存區域數據,當其餘處理器回寫已被鎖定的緩存行的數據時會起緩存行無效,在例1中,當CPU1修改緩存行中的i時使用緩存鎖定,那麼CPU2就不能同時緩存了i的緩存行。
可是有兩種狀況下處理器不會使用緩存鎖定。第一種狀況是:當操做的數據不能被緩存在處理器內部,或操做的數據跨多個緩存行(cache line),則處理器會調用總線鎖定。第二種狀況是:有些處理器不支持緩存鎖定。對於Inter486和奔騰處理器,就算鎖定的內存區域在處理器的緩存行中也會調用總線鎖定。
以上兩個機制咱們能夠經過Inter處理器提供了不少LOCK前綴的指令來實現。好比位測試和修改指令BTS,BTR,BTC,交換指令XADD,CMPXCHG和其餘一些操做數和邏輯指令,好比ADD(加),OR(或)等,被這些指令操做的內存區域就會加鎖,致使其餘處理器不能同時訪問它。
CAS缺點
CAS雖然很高效的解決原子操做,可是CAS仍然存在三大問題。ABA問題,循環時間長開銷大和只能保證一個共享變量的原子操做
1. ABA問題。由於CAS須要在操做值的時候檢查下值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。
從Java1.5開始JDK的atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法做用是首先檢查當前引用是否等於預期引用,而且當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。
關於ABA問題參考文檔: http://blog.hesey.net/2011/09/resolve-aba-by-atomicstampedreference.html
2. 循環時間長開銷大。自旋CAS若是長時間不成功,會給CPU帶來很是大的執行開銷。若是JVM能支持處理器提供的pause指令那麼效率會有必定的提高,pause指令有兩個做用,第一它能夠延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它能夠避免在退出循環的時候因內存順序衝突(memory order violation)而引發CPU流水線被清空(CPU pipeline flush),從而提升CPU的執行效率。
3. 只能保證一個共享變量的原子操做。當對一個共享變量執行操做時,咱們能夠使用循環CAS的方式來保證原子操做,可是對多個共享變量操做時,循環CAS就沒法保證操做的原子性,這個時候就能夠用鎖,或者有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操做。好比有兩個共享變量i=2,j=a,合併一下ij=2a,而後用CAS來操做ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你能夠把多個變量放在一個對象裏來進行CAS操做。
因爲java的CAS同時具備 volatile 讀和volatile寫的內存語義,所以Java線程之間的通訊如今有了下面四種方式:
Java的CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對內存執行讀-改-寫操做,這是在多處理器中實現同步的關鍵(從本質上來講,可以支持原子性讀-改-寫指令的計算機器,是順序計算圖靈機的異步等價機器,所以任何現代的多處理器都會去支持某種能對內存執行原子性讀-改-寫操做的原子指令)。同時,volatile變量的讀/寫和CAS能夠實現線程之間的通訊。把這些特性整合在一塊兒,就造成了整個concurrent包得以實現的基石。若是咱們仔細分析concurrent包的源代碼實現,會發現一個通用化的實現模式:
AQS,非阻塞數據結構和原子變量類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴於這些基礎類來實現的。從總體來看,concurrent包的實現示意圖以下:
鎖是用來作併發最簡單的方式,固然其代價也是最高的。內核態的鎖的時候須要操做系統進行一次上下文切換,加鎖、釋放鎖會致使比較多的上下文切換和調度延時,等待鎖的線程會被掛起直至鎖釋放。在上下文切換的時候,cpu以前緩存的指令和數據都將失效,對性能有很大的損失。操做系統對多線程的鎖進行判斷就像兩姐妹在爲一個玩具在爭吵,而後操做系統就是能決定他們誰能拿到玩具的父母,這是很慢的。用戶態的鎖雖然避免了這些問題,可是其實它們只是在沒有真實的競爭時纔有效。
Java在JDK1.5以前都是靠synchronized關鍵字保證同步的,這種經過使用一致的鎖定協議來協調對共享狀態的訪問,能夠確保不管哪一個線程持有守護變量的鎖,都採用獨佔的方式來訪問這些變量,若是出現多個線程同時訪問鎖,那第一些線線程將被掛起,當線程恢復執行時,必須等待其它線程執行完他們的時間片之後才能被調度執行,在掛起和恢復執行過程當中存在着很大的開銷。鎖還存在着其它一些缺點,當一個線程正在等待鎖時,它不能作任何事。若是一個線程在持有鎖的狀況下被延遲執行,那麼全部須要這個鎖的線程都沒法執行下去。若是被阻塞的線程優先級高,而持有鎖的線程優先級低,將會致使優先級反轉(Priority Inversion)。
獨佔鎖是一種悲觀鎖,synchronized就是一種獨佔鎖,它假設最壞的狀況,而且只有在確保其它線程不會形成干擾的狀況下執行,會致使其它全部須要鎖的線程掛起,等待持有鎖的線程釋放鎖。而另外一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是,每次不加鎖而是假設沒有衝突而去完成某項操做,若是由於衝突失敗就重試,直到成功爲止。
與鎖相比,volatile變量是一和更輕量級的同步機制,由於在使用這些變量時不會發生上下文切換和線程調度等操做,可是volatile變量也存在一些侷限:不能用於構建原子的複合操做,所以當一個變量依賴舊值時就不能使用volatile變量。(參考:談談volatiile)
volatile只能保證變量對各個線程的可見性,但不能保證原子性。爲何?見個人另一篇文章:《爲何volatile不能保證原子性而Atomic能夠?》
原子操做指的是在一步以內就完成並且不能被中斷。原子操做在多線程環境中是線程安全的,無需考慮同步的問題。在java中,下列操做是原子操做:
問題來了,爲何long型賦值不是原子操做呢?例如:
1
|
long
foo = 65465498L;
|
實時上java會分兩步寫入這個long變量,先寫32位,再寫後32位。這樣就線程不安全了。若是改爲下面的就線程安全了:
1
|
private
volatile
long
foo;
|
由於volatile內部已經作了synchronized.
要實現無鎖(lock-free)的非阻塞算法有多種實現方法,其中CAS(比較與交換,Compare and swap)是一種有名的無鎖算法。CAS, CPU指令,在大多數處理器架構,包括IA3二、Space中採用的都是CAS指令,CAS的語義是「我認爲V的值應該爲A,若是是,那麼將V的值更新爲B,不然不修改並告訴V的值實際爲多少」,CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知此次競爭中失敗,並能夠再次嘗試。CAS有3個操做數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。CAS無鎖算法的C實現以下:
1
2
3
4
5
6
7
8
9
|
int
compare_and_swap (
int
* reg,
int
oldval,
int
newval)
{
ATOMIC();
int
old_reg_val = *reg;
if
(old_reg_val == oldval)
*reg = newval;
END_ATOMIC();
return
old_reg_val;
}
|
CAS比較與交換的僞代碼能夠表示爲:
do{
備份舊數據;
基於舊數據構造新數據;
}while(!CAS( 內存地址,備份的舊數據,新數據 ))
(上圖的解釋:CPU去更新一個值,但若是想改的值再也不是原來的值,操做就失敗,由於很明顯,有其它操做先改變了這個值。)
就是指當二者進行比較時,若是相等,則證實共享數據沒有被修改,替換成新值,而後繼續往下運行;若是不相等,說明共享數據已經被修改,放棄已經所作的操做,而後從新執行剛纔的操做。容易看出 CAS 操做是基於共享數據不會被修改的假設,採用了相似於數據庫的 commit-retry 的模式。當同步衝突出現的機會不多時,這種假設能帶來較大的性能提高。
前面說過了,CAS(比較並交換)是CPU指令級的操做,只有一步原子操做,因此很是快。並且CAS避免了請求操做系統來裁定鎖的問題,不用麻煩操做系統,直接在CPU內部就搞定了。但CAS就沒有開銷了嗎?不!有cache miss的狀況。這個問題比較複雜,首先須要瞭解CPU的硬件體系結構:
上圖能夠看到一個8核CPU計算機系統,每一個CPU有cache(CPU內部的高速緩存,寄存器),管芯內還帶有一個互聯模塊,使管芯內的兩個核能夠互相通訊。在圖中央的系統互聯模塊可讓四個管芯相互通訊,而且將管芯與主存鏈接起來。數據以「緩存線」爲單位在系統中傳輸,「緩存線」對應於內存中一個 2 的冪大小的字節塊,大小一般爲 32 到 256 字節之間。當 CPU 從內存中讀取一個變量到它的寄存器中時,必須首先將包含了該變量的緩存線讀取到 CPU 高速緩存。一樣地,CPU 將寄存器中的一個值存儲到內存時,不只必須將包含了該值的緩存線讀到 CPU 高速緩存,還必須確保沒有其餘 CPU 擁有該緩存線的拷貝。
好比,若是 CPU0 在對一個變量執行「比較並交換」(CAS)操做,而該變量所在的緩存線在 CPU7 的高速緩存中,就會發生如下通過簡化的事件序列:
以上是刷新不一樣CPU緩存的開銷。最好狀況下的 CAS 操做消耗大概 40 納秒,超過 60 個時鐘週期。這裏的「最好狀況」是指對某一個變量執行 CAS 操做的 CPU 正好是最後一個操做該變量的CPU,因此對應的緩存線已經在 CPU 的高速緩存中了,相似地,最好狀況下的鎖操做(一個「round trip 對」包括獲取鎖和隨後的釋放鎖)消耗超過 60 納秒,超過 100 個時鐘週期。這裏的「最好狀況」意味着用於表示鎖的數據結構已經在獲取和釋放鎖的 CPU 所屬的高速緩存中了。鎖操做比 CAS 操做更加耗時,是因深刻理解並行編程
爲鎖操做的數據結構中須要兩個原子操做。緩存未命中消耗大概 140 納秒,超過 200 個時鐘週期。須要在存儲新值時查詢變量的舊值的 CAS 操做,消耗大概 300 納秒,超過 500 個時鐘週期。想一想這個,在執行一次 CAS 操做的時間裏,CPU 能夠執行 500 條普通指令。這代表了細粒度鎖的侷限性。
如下是cache miss cas 和lock的性能對比:
在JDK1.5以前,若是不編寫明確的代碼就沒法執行CAS操做,在JDK1.5中引入了底層的支持,在int、long和對象的引用等類型上都公開了CAS的操做,而且JVM把它們編譯爲底層硬件提供的最有效的方法,在運行CAS的平臺上,運行時把它們編譯爲相應的機器指令,若是處理器/CPU不支持CAS指令,那麼JVM將使用自旋鎖。所以,值得注意的是,CAS解決方案與平臺/編譯器緊密相關(好比x86架構下其對應的彙編指令是lock cmpxchg,若是想要64Bit的交換,則應使用lock cmpxchg8b。在.NET中咱們能夠使用Interlocked.CompareExchange函數)。
在原子類變量中,如java.util.concurrent.atomic中的AtomicXXX,都使用了這些底層的JVM支持爲數字類型的引用類型提供一種高效的CAS操做,而在java.util.concurrent中的大多數類在實現時都直接或間接的使用了這些原子變量類。
Java 1.6中AtomicLong.incrementAndGet()的實現源碼爲:
因而可知,AtomicLong.incrementAndGet的實現用了樂觀鎖技術,調用了sun.misc.Unsafe類庫裏面的 CAS算法,用CPU指令來實現無鎖自增。因此,AtomicLong.incrementAndGet的自增比用synchronized的鎖效率倍增。
1
2
3
4
5
6
7
8
9
10
11
12
|
public
final
int
getAndIncrement() {
for
(;;) {
int
current = get();
int
next = current +
1
;
if
(compareAndSet(current, next))
return
current;
}
}
public
final
boolean
compareAndSet(
int
expect,
int
update) {
return
unsafe.compareAndSwapInt(
this
, valueOffset, expect, update);
}
|
下面是測試代碼:能夠看到用AtomicLong.incrementAndGet的性能比用synchronized高出幾倍。
下面是比非阻塞自增稍微複雜一點的CAS的例子:非阻塞堆棧/ConcurrentStack
。ConcurrentStack
中的 push()
和pop()
操做在結構上與NonblockingCounter
上類似,只是作的工做有些冒險,但願在 「提交」 工做的時候,底層假設沒有失效。push()
方法觀察當前最頂的節點,構建一個新節點放在堆棧上,而後,若是最頂端的節點在初始觀察以後沒有變化,那麼就安裝新節點。若是 CAS 失敗,意味着另外一個線程已經修改了堆棧,那麼過程就會從新開始。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public
class
ConcurrentStack<E> {
AtomicReference<Node<E>> head =
new
AtomicReference<Node<E>>();
public
void
push(E item) {
Node<E> newHead =
new
Node<E>(item);
Node<E> oldHead;
do
{
oldHead = head.get();
newHead.next = oldHead;
}
while
(!head.compareAndSet(oldHead, newHead));
}
public
E pop() {
Node<E> oldHead;
Node<E> newHead;
do
{
oldHead = head.get();
if
(oldHead ==
null
)
return
null
;
newHead = oldHead.next;
}
while
(!head.compareAndSet(oldHead,newHead));
return
oldHead.item;
}
static
class
Node<E> {
final
E item;
Node<E> next;
public
Node(E item) {
this
.item = item; }
}
}
|
在輕度到中度的爭用狀況下,非阻塞算法的性能會超越阻塞算法,由於 CAS 的多數時間都在第一次嘗試時就成功,而發生爭用時的開銷也不涉及線程掛起和上下文切換,只多了幾個循環迭代。沒有爭用的 CAS 要比沒有爭用的鎖便宜得多(這句話確定是真的,由於沒有爭用的鎖涉及 CAS 加上額外的處理),而爭用的 CAS 比爭用的鎖獲取涉及更短的延遲。
在高度爭用的狀況下(即有多個線程不斷爭用一個內存位置的時候),基於鎖的算法開始提供比非阻塞算法更好的吞吐率,由於當線程阻塞時,它就會中止爭用,耐心地等候輪到本身,從而避免了進一步爭用。可是,這麼高的爭用程度並不常見,由於多數時候,線程會把線程本地的計算與爭用共享數據的操做分開,從而給其餘線程使用共享數據的機會。
以上的示例(自增計數器和堆棧)都是很是簡單的非阻塞算法,一旦掌握了在循環中使用 CAS,就能夠容易地模仿它們。對於更復雜的數據結構,非阻塞算法要比這些簡單示例複雜得多,由於修改鏈表、樹或哈希表可能涉及對多個指針的更新。CAS 支持對單一指針的原子性條件更新,可是不支持兩個以上的指針。因此,要構建一個非阻塞的鏈表、樹或哈希表,須要找到一種方式,能夠用 CAS 更新多個指針,同時不會讓數據結構處於不一致的狀態。
在鏈表的尾部插入元素,一般涉及對兩個指針的更新:「尾」 指針老是指向列表中的最後一個元素,「下一個」 指針從過去的最後一個元素指向新插入的元素。由於須要更新兩個指針,因此須要兩個 CAS。在獨立的 CAS 中更新兩個指針帶來了兩個須要考慮的潛在問題:若是第一個 CAS 成功,而第二個 CAS 失敗,會發生什麼?若是其餘線程在第一個和第二個 CAS 之間企圖訪問鏈表,會發生什麼?
對於非複雜數據結構,構建非阻塞算法的 「技巧」 是確保數據結構總處於一致的狀態(甚至包括在線程開始修改數據結構和它完成修改之間),還要確保其餘線程不只可以判斷出第一個線程已經完成了更新仍是處在更新的中途,還可以判斷出若是第一個線程走向 AWOL,完成更新還須要什麼操做。若是線程發現了處在更新中途的數據結構,它就能夠 「幫助」 正在執行更新的線程完成更新,而後再進行本身的操做。當第一個線程回來試圖完成本身的更新時,會發現再也不須要了,返回便可,由於 CAS 會檢測到幫助線程的干預(在這種狀況下,是建設性的干預)。
這種 「幫助鄰居」 的要求,對於讓數據結構免受單個線程失敗的影響,是必需的。若是線程發現數據結構正處在被其餘線程更新的中途,而後就等候其餘線程完成更新,那麼若是其餘線程在操做中途失敗,這個線程就可能永遠等候下去。即便不出現故障,這種方式也會提供糟糕的性能,由於新到達的線程必須放棄處理器,致使上下文切換,或者等到本身的時間片過時(而這更糟)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public
class
LinkedQueue <E> {
private
static
class
Node <E> {
final
E item;
final
AtomicReference<Node<E>> next;
Node(E item, Node<E> next) {
this
.item = item;
this
.next =
new
AtomicReference<Node<E>>(next);
}
}
private
AtomicReference<Node<E>> head
=
new
AtomicReference<Node<E>>(
new
Node<E>(
null
,
null
));
private
AtomicReference<Node<E>> tail = head;
public
boolean
put(E item) {
Node<E> newNode =
new
Node<E>(item,
null
);
while
(
true
) {
Node<E> curTail = tail.get();
Node<E> residue = curTail.next.get();
if
(curTail == tail.get()) {
if
(residue ==
null
)
/* A */
{
if
(curTail.next.compareAndSet(
null
, newNode))
/* C */
{
tail.compareAndSet(curTail, newNode)
/* D */
;
return
true
;
}
}
else
{
tail.compareAndSet(curTail, residue)
/* B */
;
}
}
}
}
}
|
具體算法相見IBM Developerworks
Java5中的ConcurrentHashMap,線程安全,設計巧妙,用桶粒度的鎖,避免了put和get中對整個map的鎖定,尤爲在get中,只對一個HashEntry作鎖定操做,性能提高是顯而易見的。
具體實現中使用了鎖分離機制,在這個帖子中有很是詳細的討論。這裏有關於Java內存模型結合ConcurrentHashMap的分析。如下是JDK6的ConcurrentHashMap的源碼:
ConcurrentLinkedQueue也是一樣使用了CAS指令,但其性能並不高由於太多CAS操做。其源碼以下:
服務端編程的3大性能殺手:一、大量線程致使的線程切換開銷。二、鎖。三、非必要的內存拷貝。在高併發下,對於純內存操做來講,單線程是要比多線程快的, 能夠比較一下多線程程序在壓力測試下cpu的sy和ni百分比。高併發環境下要實現高吞吐量和線程安全,兩個思路:一個是用優化的鎖實現,一個是lock-free的無鎖結構。但非阻塞算法要比基於鎖的算法複雜得多。開發非阻塞算法是至關專業的訓練,並且要證實算法的正確也極爲困難,不只和具體的目標機器平臺和編譯器相關,並且須要複雜的技巧和嚴格的測試。雖然Lock-Free編程很是困難,可是它一般能夠帶來比基於鎖編程更高的吞吐量。因此Lock-Free編程是大有前途的技術。它在線程停止、優先級倒置以及信號安全等方面都有着良好的表現。
另外,在設計思路上除了儘可能減小資源爭用之外,還能夠借鑑nginx/node.js等單線程大循環的機制,用單線程或CPU數相同的線程開闢大的隊列,併發的時候任務壓入隊列,線程輪詢而後一個個順序執行。因爲每一個都採用異步I/O,沒有阻塞線程。這個大隊列能夠使用RabbitMQueue,或是JDK的同步隊列(性能稍差),或是使用Disruptor無鎖隊列(Java)。任務處理能夠所有放在內存(多級緩存、讀寫分離、ConcurrentHashMap、甚至分佈式緩存Redis)中進行增刪改查。最後用Quarz維護定時把緩存數據同步到DB中。固然,這只是中小型系統的思路,若是是大型分佈式系統會很是複雜,須要分而治理,用SOA的思路,參考這篇文章的圖。(注:Redis是單線程的純內存數據庫,單線程無需鎖,而Memcache是多線程的帶CAS算法,二者都使用epoll,no-blocking io)
若是深刻 JVM 和操做系統,會發現非阻塞算法無處不在。垃圾收集器使用非阻塞算法加快併發和平行的垃圾蒐集;調度器使用非阻塞算法有效地調度線程和進程,實現內在鎖。在 Mustang(Java 6.0)中,基於鎖的SynchronousQueue
算法被新的非阻塞版本代替。不多有開發人員會直接使用 SynchronousQueue
,可是經過Executors.newCachedThreadPool()
工廠構建的線程池用它做爲工做隊列。比較緩存線程池性能的對比測試顯示,新的非阻塞同步隊列實現提供了幾乎是當前實現 3 倍的速度。在 Mustang 的後續版本(代碼名稱爲 Dolphin)中,已經規劃了進一步的改進。