Java併發編程實踐 目錄html
併發編程 04—— 閉鎖CountDownLatch 與 柵欄CyclicBarrier數組
併發編程 06—— CompletionService : Executor 和 BlockingQueue安全
併發編程 07—— 任務取消數據結構
併發編程 10—— 任務取消 之 關閉 ExecutorService
併發編程 12—— 任務取消與關閉 之 shutdownNow 的侷限性
併發編程 13—— 線程池的使用 之 配置ThreadPoolExecutor 和 飽和策略
併發編程 20—— AbstractQueuedSynchronizer 深刻分析
概述
java傳統上是使用鎖來實現線程間的同步。每個object都有一個內部鎖(intrinsic lock),關鍵字synchronized實現了鎖的互斥。方法Object.wait主動的放棄鎖,而方法Object.notify(All)則用來喚醒waiting中的線程。使用synchronized確保線程同步的正確性,可是也容易形成線程的阻塞。
volatile變量與鎖相比是更輕量級的同步機制,但僅僅能保證內存的可見性,而不能用於原子化的操做。++i 看起來是原子的,而實際上倒是取當前值,自增長一,而後回寫更新。
使用鎖有如下的缺點:
(1) 若是線程持有鎖而延遲,會致使其餘的線程的等待。
(2) 高優先級的線程阻塞,而低優先級的線程持有鎖形成 優先級反轉(priority inversion)。
(3) 若是持有鎖的線程被永久地阻塞,全部等待這個鎖的線程就沒法執行下去,形成 死鎖(dead lock)。
獨佔鎖是一種悲觀的技術,老是假設闖入的線程會改變共享的變量。而對於細粒度的操做應該採用的樂觀的解決方法。允許其餘線程的闖入,可是在須要改變共享變量以前老是要檢測其是否被修改。若是沒有被修改,則完成更新,不然就放棄更新。CAS(compare and swap)就是這樣的一種技術,如今已經被大多數處理器直接支持。
CAS的含義是:「我認爲V的值應該是A,若是是,那麼將V的值更新爲B,不然不修改並告訴V的值實際爲多少」。CAS是一項樂觀的技術,它但願能成功地執行更新操做,而且若是有另一個線程在最近一次檢查後更新了該變量,那麼CAS能檢測到這個錯誤。
1 @ ThreadSafe 2 public class SimulatedCAS { 3 @ GuardeBy( "this") private int value ; 4 5 public synchronized int get(){ 6 return value ; 7 } 8 9 public synchronized int compareAndSwap( int expectedValue, int newValue){ 10 int oldValue = value ; 11 if (oldValue == expectedValue) 12 value = newValue; 13 return oldValue; 14 } 15 16 public synchronized boolean compareAndSet( int expectedValue, int newValue){ 17 return (expectedValue == compareAndSwap(expectedValue, newValue)); 18 } 19 }
當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其餘線程都將失敗。然而,失敗的線程並不會被掛起(這與獲取鎖的狀況不一樣:當獲取鎖失敗時,線程將被掛起),而是告知子啊此次競爭中失敗,並能夠再次嘗試。因爲一個線程在競爭CAS時失敗不會阻塞,所以它能夠決定是否從新嘗試,或者執行一些恢復操做,也能夠不執行任何操做。這種靈活性就大大減小了與鎖相關的活躍性風險。
CAS的典型使用模式是:首先從V中讀取值A,並根據A計算新值B,而後再經過CAS以原子方式將V中的值由A變成B(只要在這期間沒有任何線程將V的值修改成其餘值)。因爲CAS能檢測到來自其餘線程的干擾,所以即便不使用鎖也可以實現原子的 讀——改——寫 操做序列。
下面程序中的CasCounter 使用了CAS實現了一個線程安全的計數器。遞增採用了標準形式——讀取舊的值,根據它計算出新值(加1),並使用CAS來設置這個新值。若是CAS失敗,那麼該操做當即重試。一般,反覆地重試是一種合理的策略,但在一些競爭和激烈的狀況下,更好的方式是在重試以前首先等待一段時間或者回退,從而避免形成活鎖的問題。
1 @ ThreadSafe 2 public class CasCounter { 3 private SimulatedCAS value ; 4 5 public int getValue(){ 6 return value .get(); 7 } 8 9 public int increment(){ 10 int v; 11 do { 12 v = value .get(); 13 } while (v != value .compareAndSwap(v, v + 1)); 14 return v + 1; 15 } 16 }
CAS的主要缺點是:它將使調度者處理競爭問題(經過重試、回退、放棄),而在使用鎖中能自動處理競爭問題(線程在得到鎖以前將一直阻塞)。
java.util.concurrent.atomic 類的小工具包,支持在單個變量上解除鎖的線程安全編程。 AtomicBoolean 能夠用原子方式更新的 boolean 值。 AtomicInteger 能夠用原子方式更新的 int 值。 AtomicIntegerArray 能夠用原子方式更新其元素的 int 數組。 AtomicIntegerFieldUpdater<T> 基於反射的實用工具,能夠對指定類的指定 volatile int 字段進行原子更新。 AtomicLong 能夠用原子方式更新的 long 值。 AtomicLongArray 能夠用原子方式更新其元素的 long 數組。 AtomicLongFieldUpdater<T> 基於反射的實用工具,能夠對指定類的指定 volatile long 字段進行原子更新。 AtomicMarkableReference<V> AtomicMarkableReference 維護帶有標記位的對象引用,能夠原子方式對其進行更新。 AtomicReference<V> 能夠用原子方式更新的對象引用。 AtomicReferenceArray<E> 能夠用原子方式更新其元素的對象引用數組。 AtomicReferenceFieldUpdater<T,V> 基於反射的實用工具,能夠對指定類的指定 volatile 字段進行原子更新。 AtomicStampedReference<V> AtomicStampedReference 維護帶有整數「標誌」的對象引用,能夠用原子方式對其進行更新。
在Java 5.0以前,若是不編寫明確的代碼,那麼就沒法執行CAS。在Java 5.0引入了底層的支持,在int 、long 和對象的引用等類型上都公開了CAS操做,而且JVM把他們編譯爲底層硬件提供的最有效方法。在支持CAS的平臺上,運行時把它們編譯爲相應的(多條)機器指令。在最壞的狀況下,若是不支持CAS指令,那麼JVM將用自旋鎖。在原子變量類(例如 java.util.concurrent.atomic中的AtomicXxx)中使用了這些底層的JVM支持爲數字類型和引用類型提供一種高效的CAS操做,而在java.util.concurrent 中大多數類在現實時則直接或者間接地使用了這些原子變量類。
原子變量比鎖的粒度更細,量級更輕,而且對於在多處理器系統上實現高性能的併發代碼來講是很是關鍵的。原子變量將發生競爭的範圍縮小到單個變量上,這是你得到的粒度最新的狀況。
經過CAS來維持包含多個變量的不變性條件例子:在下面程序中CasNumberRange 中使用了AtomicReference和IntPair 來保存狀態,並經過使用Compare-AndSet,使它在更新上界或下界時能避免NumberRange的競態條件。
1 import java.util.concurrent.atomic.AtomicReference; 2 3 public class CasNumberRange { 4 private static class IntPair{ 5 final int lower ; // 不變性條件: lower <= upper 6 final int upper ; 7 8 public IntPair( int lower, int upper) { 9 this .lower = lower; 10 this .upper = upper; 11 } 12 } 13 14 private final AtomicReference<IntPair> values = 15 new AtomicReference<IntPair>( new IntPair(0, 0)); 16 17 public int getLower(){ 18 return values .get(). lower; 19 } 20 21 public int getUpper(){ 22 return values .get(). upper; 23 } 24 25 public void setLower( int i){ 26 while (true ){ 27 IntPair oldv = values .get(); 28 if (i > oldv.upper ){ 29 throw new IllegalArgumentException( "Cant't set lower to " + i + " > upper"); 30 } 31 IntPair newv = new IntPair(i, oldv.upper ); 32 if (values .compareAndSet(oldv, newv)){ 33 return ; 34 } 35 } 36 } 37 // 對setUpper採用相似的方法 38 }
性能比較:鎖與原子變量
使用ReentrantLock、AtomicInteger、ThreadLocal比較,一般狀況下效率排序是ThreadLocal > AtomicInteger > ReentrantLock。
若是在某種算法中,一個線程的失敗或掛起不會致使其餘線程也失敗或掛起,那麼這種算法就被稱爲非阻塞算法。
若是在算法的每一個步驟中都存在某個線程可以執行下去,那麼這種算法也被稱爲無鎖(Lock-Free)算法。
到目前爲止,已經看到了一個非阻塞算法:CasCounter。在許多常見的數據結構中均可以使用非阻塞算法,包括棧、隊列、優先隊列以及散列表等。
建立非阻塞算法的關鍵在於,找出如何將原子修改的範圍縮小到單個變量上,同時還要維護數據的一致性。
棧是最簡單的鏈式數據結構:每一個元素僅指向一個元素,而且每一個元素也只被一個元素引用。在下面程序 ConcurrentStack中,給出瞭如何經過原子引用來構建棧的實例。棧是有Node 元素構成的一個鏈表,其中棧頂做爲根節點,而且在每一個元素中都包含了一個只以及指向下一個元素的連接。push 方法建立一個新的節點,該節點的next 域指向當前的棧頂,而後使用CAS把這個新節點放入棧頂。若是在開始插入節點時,位於棧頂的節點沒有發生變化,,那麼CAS就會成功,若是棧頂節點發生變化(例如因爲其餘線程在本線程開始以前插入或移除了元素),那麼CAS將會失敗,而push 方法會根據棧的當前狀態來更新節點,而且再次嘗試。不管哪一種狀況,在CAS執行完成後,棧仍會處於一致的狀態。
1 /** 2 * ConcurrentStack 3 * <p/> 4 * Nonblocking stack using Treiber's algorithm 5 * 6 * @author Brian Goetz and Tim Peierls 7 */ 8 @ThreadSafe 9 public class ConcurrentStack <E> { 10 AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); 11 12 public void push(E item) { 13 Node<E> newHead = new Node<E>(item); 14 Node<E> oldHead; 15 do { 16 oldHead = top.get(); 17 newHead.next = oldHead; 18 } while (!top.compareAndSet(oldHead, newHead)); 19 } 20 21 public E pop() { 22 Node<E> oldHead; 23 Node<E> newHead; 24 do { 25 oldHead = top.get(); 26 if (oldHead == null) 27 return null; 28 newHead = oldHead.next; 29 } while (!top.compareAndSet(oldHead, newHead)); 30 return oldHead.item; 31 } 32 33 private static class Node <E> { 34 public final E item; 35 public Node<E> next; 36 37 public Node(E item) { 38 this.item = item; 39 } 40 } 41 }
在CasCounter 和 ConcurrentStack中說明了非阻塞算法的全部特性:某項工做的完成具備不肯定性,必須從新執行。
在像ConcurrentStackz這樣的非阻塞算法中都能確保線程安全性,由於compareAndSet像鎖定機制同樣,既能提供原子性,又能提供可見性。當一個線程須要改變棧的狀態時,將調動compareAndSet,這個方法與寫入volaitle類型的變量有着相同的內存效果。當線程檢查棧的狀態時,將在同一個AtomicReference上調用get方法,這個方法與讀取volaitle類型的變量有着相同的內存效果。所以,一個線程執行的任何修改結構均可以安全地發佈給其餘正在查看狀態的線程。而且,這個棧是經過compareAndSet來修改的,所以將採用原子操做來更新top的引用,或者在發現存在其餘線程干擾的狀況下,修改操做將失敗。
CAS的基本使用模式:在更新某個值時存在不肯定性,以及在更新失敗時從新嘗試。構建非阻塞算法的技巧在於:將執行原子修改的範圍縮小到單個變量上。
連接隊列比棧更爲複雜,由於它必須支持對頭節點和尾節點的快速訪問。所以,它須要單獨維護的頭指針和尾指針。有兩個指針指向尾部的節點:當前最後一個元素的next指針,以及尾節點。當成功地插入一個新元素時,這兩個指針都須要採用原子操做來更新。
這裏須要一些技巧來完成,第一個技巧是,即便在一個包含多個步驟的更新操做中,也要確保數據結構老是處於一致的狀態。這樣,當線程B到達時,若是發現線程A正在執行更新,那麼線程B就能夠知道有一個操做已部分完成,而且不能當即開始執行本身的更新操做。而後,B能夠等待(經過反覆檢查隊列的狀態)並直到A完成更新,從而使兩個線程不會相互干擾。
雖然這種方法可以使不一樣的線程「輪流」訪問呢數據結構,而且不會形成破壞,但若是一個線程在更新操做中失敗了,那麼其餘的線程都沒法在訪問隊列。要使得該算法成爲一個非阻塞算法,必須確保當一個線程失敗時不會妨礙其餘線程繼續執行下去。所以,第二個技巧是,若是當B到達時發現A正在修改數據結構,那麼在數據結構中應該有足夠多的信息,使得B能完成A的更新操做。若是B「幫助」A完成了更新操做,那麼B能夠執行本身的操做,而不用等到A的操做完成。當A恢復後再試圖完成其操做時,會發現B已經替它完成了。
在下面的程序中,給出了 Michael-Scott 提出的非阻塞連界隊列算法中的插入部分,它是由 ConcurrentLinkedQueue
實現的。在許多隊列算法中,空隊列一般都包含一個「哨兵節點」或者「啞(Dummy)節點」,而且頭節點和尾節點在初始化時都指向該哨兵節點。尾節點一般要麼指向哨兵節點(若是隊列爲空),即隊列的最後一個元素,要麼(當有操做正在執行更新時)指向倒數第二個元素。下圖1給出了一個處於正常狀態(或者說穩定狀態)的包含兩個元素的隊列。
1 @ThreadSafe 2 public class LinkedQueue<E> { 3 private static class Node <E> { 4 final E item; 5 final AtomicReference<LinkedQueue.Node<E>> next; 6 7 public Node(E item, LinkedQueue.Node<E> next) { 8 this.item = item; 9 this.next = new AtomicReference<LinkedQueue.Node<E>>(next); 10 } 11 } 12 13 private final LinkedQueue.Node<E> dummy = new LinkedQueue.Node<E>(null, null); 14 private final AtomicReference<LinkedQueue.Node<E>> head 15 = new AtomicReference<LinkedQueue.Node<E>>(dummy); 16 private final AtomicReference<LinkedQueue.Node<E>> tail 17 = new AtomicReference<LinkedQueue.Node<E>>(dummy); 18 19 public boolean put(E item) { 20 LinkedQueue.Node<E> newNode = new LinkedQueue.Node<E>(item, null); 21 while (true) { 22 LinkedQueue.Node<E> curTail = tail.get(); 23 LinkedQueue.Node<E> tailNext = curTail.next.get(); 24 if (curTail == tail.get()) { 25 if (tailNext != null) { // A 26 // 隊列處於中間狀態,推動尾節點 27 tail.compareAndSet(curTail, tailNext); // B 28 } else { 29 // 處於穩定狀態,嘗試插入新節點 30 if (curTail.next.compareAndSet(null, newNode)) { // C 31 // 插入操做成功,嘗試推動尾節點 32 tail.compareAndSet(curTail, newNode); // D 33 return true; 34 } 35 } 36 } 37 } 38 } 39 }
圖1 處於穩定狀態幷包含兩個元素的對立
當插入一個新的元素時,須要更新兩個指針。首先更新當前最後一個元素的next 指針,將新節點連接到隊列隊尾,而後更新尾節點,將其指向這個新元素。在兩個操做之間,隊列處於一種中間狀態,如圖2。在等二次更新完成後,隊列將再次處於穩定狀態,如圖3所示。
實現這兩個技巧的關鍵在於:當隊列處於穩定狀態時,尾節點的next域將爲空,若是隊列處於中間狀態,那麼tail.next 將爲非空。所以,任何線程都可以經過檢查tail.next 來獲取隊列當前的狀態。並且,當隊列處於中間狀態時,能夠經過將尾節點移動一個節點,從而結束其餘線程正在執行的插入元素操做,並使得隊列恢復爲穩定狀態。
圖2 在插入過程當中處於中間狀態的對立
圖3 在插入操做完成後,隊列再次處於穩定狀態
LinkedQueue.put 方法在插入新元素以前,將首先檢查隊列是否處於中間狀態(步驟A)。若是是,那麼有另外一個線程正在插入元素(在步驟C和D之間)。此時當前線程不會等待其餘線程執行完成,而是幫助它完成操做,並將尾節點向前推動一個節點(步驟B)。而後,它將重複執行這種檢查,以避免另外一個線程已經開始插入新元素,並繼續推動尾節點,直到它發現隊列處於穩定狀態以後,纔會開始執行本身的插入操做。
因爲步驟C中的CAS將把新節點連接到隊列尾部,所以若是兩個線程同時插入元素,那麼這個CAS將失敗。在這樣的狀況下,並不會形成破壞:不會發生任何變化,而且當前的線程只須要從新讀取尾節點並再次重試。若是步驟C成功了,那麼插入操做將生效,第二個CAS(步驟D)被認爲是一個「清理操做」,由於它既能夠由執行插入操做的線程來執行,也能夠由其餘任何線程來執行。若是步驟D失敗,那麼執行插入操做的線程將返回,而不是從新執行CAS,由於再也不須要重試——另外一個線程已經在步驟B中完成了這個工做。
這種方式可以工做,由於在任何線程嘗試將一個新節點插入到隊列以前,都會首先經過檢查tail.next是否非空來判斷是否須要清理隊列。若是是,它首先會推薦尾節點(可能須要執行屢次),直到隊列處於穩定狀態。
ABA問題是一種異常現象:若是在算法中的節點能夠被循環使用,那麼在使用「比較並交換」指令時就可能出現這種問題(主要在沒有垃圾回收機制的環境中)。在CAS操做中將判斷「V的值是否仍然是A?」,而且若是是的話就繼續執行更新操做。在大多數狀況下,這種判斷是徹底足夠的。然而,有時候還須要知道「自從上次看到V的值爲A以來,這個值是否發生了變化?」 在某些算法中,若是V的值首先由A變成B,再由B變成A,那麼仍然被認爲是發生了變化,並須要從新執行算法中的某些步驟。
有一個相對簡單的解決方案:不是更新某個引用的值,而是更新兩個值,包括一個引用和一個版本號。即便這個值由A變成 B,而後又變成A,版本號也將是不一樣的。
1.《java併發編程實戰》 第15章