編寫高效的併發程序,須要對互斥問題從新研究,設計出適用於多線程的互斥協議。那麼問題來了,若是不能得到鎖,應該怎麼作?java
旋轉:繼續進行嘗試,如自旋鎖,延遲較短;node
阻塞:掛起本身,請求調度器切換到另外一個線程,代價較大。git
綜合來看,先旋轉一小段時間再阻塞,是種不錯的選擇。github
java.util.concurrent.locks.Lock
接口提供了lock()
和unlock()
兩個重要的方法,用於解決實際互斥問題。算法
Lock mutex = new MyLock(); mutex.lock(); try { do something } finally { mutex.unlock(); }
測試-設置來源於getAndSet()
操做,經過一個原子布爾型狀態變量的值判斷當前鎖的狀態。若爲true
表示鎖忙,若爲false
表示鎖空閒。數組
public class TASLock { AtomicBoolean state = new AtomicBoolean(false); public void lock() { while (state.getAndSet(true)) { ; } } public void unlock() { state.set(false); } }
TTASLock是升級版的TASLock算法,沒有直接調用getAndSet()
方法,而是在鎖看起來空閒(state.get()
返回false
)時才調用。緩存
public class TTASLock { AtomicBoolean state = new AtomicBoolean(false); public void lock() { while (true) { while (state.get()) { ; } if (! state.getAndSet(true)) { return; } } } public void unlock() { state.set(false); } }
這兩個算法都能保證無死鎖的互斥,可是TTASLock的性能會比TASLock高許多。多線程
能夠從計算機系統結構的高速緩存和局部性來解釋這個問題,每一個處理器都有一個cache,cache的訪問速度比內存快好幾個數量級。當cache命中時,會當即加載這個值;當cache缺失時,會在內存或兩一個處理器的cache中尋找這個數據。尋找的過程比較漫長,處理器在總線上廣播這個地址,其餘處理器監聽總線。若其餘處理器在本身的cache中發現這個地址,則廣播該地址及其值來作出響應。若全部處理器都沒發現這個地址,則之內存地址及其所對應的值進行響應。併發
getAndSet()
的直接調用讓TASLock性能損失許多:dom
getAndSet()
的調用實質是總線上的一個廣播,這個調用將會延遲全部的線程,由於全部線程都要經過監聽總線通訊。
getAndSet()
的調用會更新state
的值,即便值仍爲true
,可是其餘處理器cache中的鎖副本將會被丟棄,從而致使cache缺失。
當持有鎖的線程試圖釋放鎖時可能被延遲,由於總線被正在自旋的線程獨佔。
與此相反,對於TTASLock算法採用的是本地旋轉(線程反覆地重讀被緩存的值而不是反覆地使用總線),線程A持有鎖時,線程B嘗試得到鎖,但線程B只會在第一次讀鎖是cache缺失,以後每次cache命中不產生總線流量。
那麼缺點來了,TTASLock釋放鎖時,會使各自旋線程處理器中的cache副本當即失效,這些線程會從新讀取這個值,形成總線流量風暴。
對於TTASLock算法,當鎖看似空閒(state.get()
返回false
)時,存在高爭用(多個線程試圖同時獲取一個鎖)的可能。高爭用意味着獲取鎖的可能性小,而且會形成總線流量增長。線程在重試以前回退一段時間是種不錯的選擇。
這裏實現的指數回退算法的回退準則是,不成功嘗試的次數越多,發生爭用的可能性就越高,線程須要後退的時間就應越長。
public class BackoffLock { private AtomicBoolean state = new AtomicBoolean(false); private static final int MIN_DELAY = 10; private static final int MAX_DELAY = 100; public void lock() { Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY); while (true) { while (state.get()) { ; } if (! state.getAndSet(true)) { return; } else { backoff.backoff(); } } } public void unlock() { state.set(false); } } class Backoff { private final int minDelay, maxDelay; int limit; final Random random; public Backoff(int min, int max) { minDelay = min; maxDelay = max; limit = minDelay; random = new Random(); } public void backoff() { int delay = random.nextInt(limit); limit = Math.min(maxDelay, 2 * limit); try { Thread.sleep(delay); } catch (InterruptedException e) { ; } } }
指數後退算法解決了TTASLock釋放鎖時的高爭用問題,可是它的性能與minDelay
和maxDelay
的選取密切相關,而且很難找到一個通用兼容的值。
另外,BackoffLock算法還有兩個問題:
cache一致性流量:全部線程都在同一個共享存儲單元上旋轉;
臨界區利用率低:後退時間沒法肯定,線程延遲可能過長。
下面的這些是隊列鎖,名字看上去奇形怪狀的,實際上是發明者名字的首字母。隊列鎖就是將線程組織成一個隊列,讓每一個線程在不一樣的存儲單元上旋轉,從而下降cache一致性流量。
基於循環數組實現隊列鎖ALock,每一個線程檢測本身的slot對應的flag[]
域來判斷是否輪到本身。
一個線程想得到鎖,就要調用lock()
方法,得到自增tail
得到分配的slot號,而後等待這個slot空閒;當釋放鎖時,就要阻塞當前slot,而後讓下一個slot可運行。
當flag[i]
爲true
時,那麼這個線程就有權得到鎖。任意時刻的flag[]
數組中,應該只有一個slot的值爲true
。
public class ALock { ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>(); AtomicInteger tail; volatile boolean [] flag; int size; public ALock() { size = 100; tail = new AtomicInteger(0); flag = new boolean[size]; flag[0] = true; } public void lock() { int slot = tail.getAndIncrement() % size; mySlotIndex.set(slot); while (! flag[slot]) { ; } } public void unlock() { int slot = mySlotIndex.get(); flag[slot] = false; flag[(slot + 1) % size] = true; } }
mySlotIndex
是線程的局部變量,只能被一個線程訪問,每一個線程都有本身獨立初始化的副本。不須要保存在共享存儲器,不須要同步,不會產生一致性流量。使用get()
和set()
方法來訪問局部變量的值。
tail
是常規變量,域被全部的線程共享,支持原子操做。
數組flag[]
也是被多個線程共享的,可是每一個線程都是在一個數組元素的本地cache副本上旋轉。
ALock對BackoffLock的改進:在多個共享存儲單元上旋轉,將cache無效性降到最低;把一個線程釋放鎖和另外一個線程得到該鎖之間的時間間隔最小化;先來先服務的公平性。可是,數組的大小至少與最大的併發線程數相同,並非空間有效的,當併發線程最大個數爲n時,同步L個不一樣對象就須要O(Ln)大小的空間。
CLH隊列鎖表示爲QNode
對象的鏈表,每一個線程經過一個線程局部變量pred
指向其前驅。每一個線程經過檢測前驅結點的locked
域來判斷是否輪到本身。若是該域爲true
,則前驅線程要麼已經得到鎖要麼正在等待鎖;若是該域爲false
,則前驅進程已釋放鎖,輪到本身了。正常狀況下,隊列鏈中只有一個結點的locked
域爲false
。
當一個線程調用lock()
方法想得到鎖時,將本身的locked
域置爲true
,表示該線程不許備釋放鎖,而後並將本身的結點加入到隊列鏈尾部。最後就是在前驅的locked
域上旋轉,等待前驅釋放鎖。當這個線程調用unlock()
方法要釋放鎖時,線程要將本身的locked
域置爲false
,表示已經釋放鎖,而後將前驅結點做爲本身的新結點以便往後訪問。
那麼問題來了,爲何要在釋放鎖時作myNode.set(myPred.get())
這個處理呢?假設線程A釋放鎖,A的後繼結點爲B,若是不使用這種處理方式,A在釋放鎖後立刻申請鎖將本身的locked
域置爲true
,整個動做在B檢測到前驅A釋放鎖以前,那麼將發生死鎖。
public class CLHLock { AtomicReference<QNode> tail; ThreadLocal<QNode> myPred; ThreadLocal<QNode> myNode; public CLHLock() { tail = new AtomicReference<QNode>(new QNode()); myPred = new ThreadLocal<QNode>() { protected QNode initialValue() { return null; } }; myNode = new ThreadLocal<QNode>() { protected QNode initialValue() { return new QNode(); } }; } public void lock() { QNode qnode = myNode.get(); qnode.locked = true; QNode pred = tail.getAndSet(qnode); myPred.set(pred); while (pred.locked) { ; } } public void unlock() { QNode qnode = myNode.get(); qnode.locked = false; myNode.set(myPred.get()); } class QNode { boolean locked = false; } }
若是最大線程數爲n,有L個不一樣對象,那麼CLHLock只須要O(L+n)空間。比ALock所需空間少,而且不須要知道可能使用鎖的最大線程數量。可是,在無cache的系統上性能較差,由於一次要訪問兩個結點,若這兩個結點內存位置較遠,性能損失會很大。
MCS隊列鎖經過檢測本身所在結點的locked
的值來判斷是否輪到本身,等待這個域被前驅釋放鎖時改變。
線程若要得到鎖,需把本身結點添加到鏈表的尾部。若隊列鏈表原先爲空,則得到鎖。不然,將前驅結點的next
域指向本身,在本身的locked
域上自旋等待,直到前驅將該域置爲false
。線程若要釋放鎖,判斷是否在隊尾,若是是隻需將tail
置爲null
,若是不是需將後繼的locked
域置爲false
且將本身結點的next
域置爲默認的null
。注意在隊尾的狀況,可能有個線程正在得到鎖,要等一下變爲後一種狀況。
public class MCSLock { AtomicReference<QNode> tail; ThreadLocal<QNode> myNode; public MCSLock() { tail = new AtomicReference<QNode>(null); myNode = new ThreadLocal<QNode>() { protected QNode initialValue() { return new QNode(); } }; } public void lock() { QNode qnode = myNode.get(); QNode pred = tail.getAndSet(qnode); if (pred != null) { qnode.locked = true; pred.next = qnode; while (qnode.locked) { ; } } } public void unlock() { QNode qnode = myNode.get(); if (qnode.next == null) { if (tail.compareAndSet(qnode, null)) { return; } while (qnode.next == null) { ; } } qnode.next.locked = false; qnode.next = null; } class QNode { boolean locked = false; QNode next = null; } }
結點能被重複使用,該鎖的空間複雜度爲O(L+n)。MCSLock算法適合無cache的系統結構,由於每一個線程只需控制本身自旋的存儲單元。可是,釋放鎖也須要旋轉,另外讀寫比較次數比CLHLock多。
Lock接口中有個一個tryLock()
方法,能夠指定一個時限(得到鎖可等待的最大時間),若得到鎖超時則放棄嘗試。最後返回一個布爾值說明鎖是否申請成功。
時限隊列鎖TOLock是基於CLHLock類的,鎖是一個結點的虛擬隊列,每一個結點在它的前驅結點上自旋等待鎖釋放。可是當這個線程超時時,不能簡單的拋棄它的隊列結點,而是將這個結點標記爲已廢棄,這樣它的後繼們(若是有)就會注意到這個結點已放棄,轉而在放棄結點的前驅上自旋。爲了保證隊列鏈表的連續性,每次申請鎖都會new
一個QNode
。
時限隊列鎖結點的域pred
會特殊一點,它有3種類型的取值:
null
:初始狀態,未得到鎖或已釋放鎖;
AVAILABLE
:一個靜態結點,表示對應結點已釋放鎖,申請鎖成功;
QNode
:pred
域爲null
的前驅結點,表示對應結點因超時放棄鎖請求,在放棄請求時纔會設置這個值。
申請鎖時,建立一個pred
域爲null
的新結點並加入到鏈表尾部,若原先鏈表爲空或前驅結點已釋放鎖,則得到鎖。不然,在嘗試時間內,找到pred
域爲null
的前驅結點,等待它釋放鎖。若在等待前驅結點釋放鎖的過程當中超時,就嘗試從鏈表中刪除這個結點,要分這個結點是否有後繼兩種狀況。
釋放鎖時,檢查該結點是否有後繼,若無後繼可直接把tail
設置爲null
,不然將該結點的pred
域指向AVAILABLE
。
public class TOLock { static QNode AVAILABLE = new QNode(); AtomicReference<QNode> tail; ThreadLocal<QNode> myNode; public TOLock() { tail = new AtomicReference<QNode>(null); myNode = new ThreadLocal<QNode>(); } public boolean tryLock(long time, TimeUnit unit) { long startTime = System.currentTimeMillis(); long patience = TimeUnit.MILLISECONDS.convert(time, unit); QNode qnode = new QNode(); myNode.set(qnode); qnode.pred = null; QNode myPred = tail.getAndSet(qnode); if (myPred == null || myPred.pred == AVAILABLE) { return true; } while (System.currentTimeMillis() - startTime < patience) { QNode predPred = myPred.pred; if (predPred == AVAILABLE) { return true; } else { if (predPred != null) { myPred = predPred; } } } if (! tail.compareAndSet(qnode, myPred)) { qnode.pred = myPred; } return false; } public void unlock() { QNode qnode = myNode.get(); if (! tail.compareAndSet(qnode, null)) { qnode.pred = AVAILABLE; } } static class QNode { public QNode pred = null; } }
優勢:同CLHLock。
缺點:每次申請鎖都要分配一個新結點,在鎖上旋轉的線程可能要回溯一個超時結點鏈。
上面實現的這些鎖算法不支持重入。咱們可使用銀行轉帳的例子來測試一下鎖的效果,任意帳戶間能夠隨意轉帳,鎖生效時全部帳戶的總金額是不變的。完整的算法實現和測試代碼在這裏。