public void java.util.concurrent.locks.ReentrantLock.lock()html
獲取鎖。java
若是該鎖沒有被另外一個線程保持,則獲取該鎖並當即返回,將鎖的保持計數設置爲 1。node
若是當前線程已經保持該鎖,則將保持計數加 1,而且該方法當即返回。算法
若是該鎖被另外一個線程保持,則出於線程調度的目的,禁用當前線程,而且在得到鎖以前,該線程將一直處於休眠狀態,此時鎖保持計數被設置爲 1。併發
從上面的文檔能夠看出ReentrantLock是可重入鎖的實現。高併發
而內部是委託java.util.concurrent.locks.ReentrantLock.Sync.lock()實現的。post
java.util.concurrent.locks.ReentrantLock.Sync是抽象類,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync兩個實現,也就是常說的公平鎖和不公平鎖。性能
公平鎖和非公平鎖ui
若是獲取一個鎖是按照請求的順序獲得的,那麼就是公平鎖,不然就是非公平鎖。spa
在沒有深刻了解內部機制及實現以前,先了解下爲何會存在公平鎖和非公平鎖。
公平鎖保證一個阻塞的線程最終可以得到鎖,由於是有序的,因此老是能夠按照請求的順序得到鎖。
不公平鎖意味着後請求鎖的線程可能在其前面排列的休眠線程恢復前拿到鎖,這樣就有可能提升併發的性能。這是由於一般狀況下掛起的線程從新開始與它真正開始運行,兩者之間會產生嚴重的延時。所以非公平鎖就能夠利用這段時間完成操做。
這是非公平鎖在某些時候比公平鎖性能要好的緣由之一。
兩者在實現上的區別會在後面介紹,咱們先從公平鎖(FairSync)開始。
前面說過java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基礎,對於一個FairSync而言,lock()就直接調用AQS的acquire(int arg);
public final void acquire(int arg) 以獨佔模式獲取對象,忽略中斷。經過至少調用一次
tryAcquire(int)
來實現此方法,並在成功時返回。不然在成功以前,一直調用tryAcquire(int)
將線程加入隊列,線程可能重複被阻塞或不被阻塞。
在介紹實現以前先要補充上一節的知識,對於一個AQS的實現而言,一般狀況下須要實現如下方法來描述如何鎖定線程。
tryAcquire(int)
試圖在獨佔模式下獲取對象狀態。此方法應該查詢是否容許它在獨佔模式下獲取對象狀態,若是容許,則獲取它。此方法老是由執行 acquire 的線程來調用。若是此方法報告失敗,則 acquire 方法能夠將線程加入隊列(若是尚未將它加入隊列),直到得到其餘某個線程釋放了該線程的信號。也就是說此方法是一種嘗試性方法,若是成功獲取鎖那最好,若是沒有成功也沒有關係,直接返回false。
tryRelease(int)
試圖設置狀態來反映獨佔模式下的一個釋放。 此方法老是由正在執行釋放的線程調用。釋放鎖可能失敗或者拋出異常,這個在後面會具體分析。tryAcquireShared(int) 試圖在共享模式下獲取對象狀態。
tryReleaseShared(int) 試圖設置狀態來反映共享模式下的一個釋放。
isHeldExclusively() 若是對於當前(正調用的)線程,同步是以獨佔方式進行的,則返回
true
。
除了tryAcquire(int)外,其它方法會在後面具體介紹。首先對於ReentrantLock而言,不論是公平鎖仍是非公平鎖,都是獨佔鎖,也就是說同時可以有一個線程持有鎖。所以對於acquire(int arg)而言,arg==1。在AQS中acquire的實現以下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個看起來比較複雜,咱們分解如下4個步驟。
這是一個比較複雜的過程,咱們循序漸進一個一個分析。
tryAcquire(acquires)
對於公平鎖而言,它的實現方式以下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
在這段代碼中,前面說明對於AQS存在一個state來描述當前有多少線程持有鎖。因爲AQS支持共享鎖(例如讀寫鎖,後面會繼續講),因此這裏state>=0,可是因爲ReentrantLock是獨佔鎖,因此這裏不妨理解爲0<=state,acquires=1。isFirst(current)是一個很複雜的邏輯,包括踢出無用的節點等複雜過程,這裏暫且不提,大致上的意思是說判斷AQS是否爲空或者當前線程是否在隊列頭(爲了區分公平與非公平鎖)。
比較非公平鎖的tryAcquire實現java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平鎖多了一個判斷當前節點是否在隊列頭,這個就保證了是否按照請求鎖的順序來決定獲取鎖的順序(同一個線程的屢次獲取鎖除外)。
如今再回頭看公平鎖和非公平鎖的lock()方法。公平鎖只有一句acquire(1);而非公平鎖的調用以下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
很顯然,非公平鎖在第一次獲取鎖,或者其它線程釋放鎖後(可能等待),優先採用compareAndSetState(0,1)而後設置AQS獨佔線程而持有鎖,這樣有時候比acquire(1)順序檢查鎖持有而要高效。即便在重入鎖上,也就是compareAndSetState(0,1)失敗,可是是當前線程持有鎖上,非公平鎖也沒有問題。
addWaiter(mode)
tryAcquire失敗就意味着入隊列了。此時AQS的隊列中節點Node就開始發揮做用了。通常狀況下AQS支持獨佔鎖和共享鎖,而獨佔鎖在Node中就意味着條件(Condition)隊列爲空(上一篇中介紹過相關概念)。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有兩個常量,
static final Node EXCLUSIVE = null; //獨佔節點模式
static final Node SHARED = new Node(); //共享節點模式
addWaiter(mode)中的mode就是節點模式,也就是共享鎖仍是獨佔鎖模式。
前面一再強調ReentrantLock是獨佔鎖模式。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
上面是節點如隊列的一部分。當前僅當隊列不爲空而且將新節點插入尾部成功後直接返回新節點。不然進入enq(Node)進行操做。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq(Node)去隊列操做實現了CHL隊列的算法,若是爲空就建立頭結點,而後同時比較節點尾部是不是改變來決定CAS操做是否成功,當且僅當成功後纔將爲不節點的下一個節點指向爲新節點。能夠看到這裏仍然是CAS操做。
acquireQueued(node,arg)
自旋請求鎖,若是可能的話掛起線程,直到獲得鎖,返回當前線程是否中斷過(若是park()過而且中斷過的話有一個interrupted中斷位)。
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
下面的分析就須要用到上節節點的狀態描述了。acquireQueued過程是這樣的:
一個節點是否該park()是關鍵,這是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)實現的。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0) return true;
if (s > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}
selfInterrupt()
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
若是線程曾經中斷過(或者阻塞過)(好比手動interrupt()或者超時等等,那麼就再中斷一次,中斷兩次的意思就是清除中斷位)。
大致上整個Lock.lock()就這樣一個流程。除了lock()方法外,還有lockInterruptibly()/tryLock()/unlock()/newCondition()等,在接下來的章節中會一一介紹。