前文(深刻JVM鎖機制-synchronized)分析了JVM中的synchronized實現,本文繼續分析JVM中的另外一種鎖Lock的實現。與synchronized不一樣的是,Lock徹底用Java寫成,在java這個層面是無關JVM實現的。java
在java.util.concurrent.locks包中有不少Lock的實現類,經常使用的有ReentrantLock、ReadWriteLock(實現類ReentrantReadWriteLock),其實現都依賴java.util.concurrent.AbstractQueuedSynchronizer類,實現思路都大同小異,所以咱們以ReentrantLock做爲講解切入點。node
通過觀察ReentrantLock把全部Lock接口的操做都委派到一個Sync類上,該類繼承了AbstractQueuedSynchronizer:算法
[java] view plain copy設計模式
static abstract class Sync extends AbstractQueuedSynchronizer 數據結構
Sync又有兩個子類:併發
[java] view plain copyapp
final static class NonfairSync extends Sync 函數
[java] view plain copy高併發
final static class FairSync extends Sync 佈局
顯然是爲了支持公平鎖和非公平鎖而定義,默認狀況下爲非公平鎖。
先理一下Reentrant.lock()方法的調用過程(默認非公平鎖):
這些討厭的Template模式致使很難直觀的看到整個調用過程,其實經過上面調用過程及AbstractQueuedSynchronizer的註釋能夠發現,AbstractQueuedSynchronizer中抽象了絕大多數Lock的功能,而只把tryAcquire方法延遲到子類中實現。tryAcquire方法的語義在於用具體子類判斷請求線程是否能夠得到鎖,不管成功與否AbstractQueuedSynchronizer都將處理後面的流程。
簡單說來,AbstractQueuedSynchronizer會把全部的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活本身的後繼節點,但正在執行的線程並不在隊列中,而那些等待執行的線程所有處於阻塞狀態,通過調查線程的顯式阻塞是經過調用LockSupport.park()完成,而LockSupport.park()則調用sun.misc.Unsafe.park()本地方法,再進一步,HotSpot在Linux中中經過調用pthread_mutex_lock函數把線程交給系統內核進行阻塞。
該隊列如圖:
與synchronized相同的是,這也是一個虛擬隊列,不存在隊列實例,僅存在節點之間的先後關係。使人疑惑的是爲何採用CLH隊列呢?原生的CLH隊列是用於自旋鎖,但Doug Lea把其改造爲阻塞鎖。
當有線程競爭鎖時,該線程會首先嚐試得到鎖,這對於那些已經在隊列中排隊的線程來講顯得不公平,這也是非公平鎖的由來,與synchronized實現相似,這樣會極大提升吞吐量。
若是已經存在Running線程,則新的競爭線程會被追加到隊尾,具體是採用基於CAS的Lock-Free算法,由於線程併發對Tail調用CAS可能會致使其餘線程CAS失敗,解決辦法是循環CAS直至成功。AbstractQueuedSynchronizer的實現很是精巧,使人歎爲觀止,不入細節難以徹底領會其精髓,下面詳細說明實現過程:
nonfairTryAcquire方法將是lock方法間接調用的第一個方法,每次請求鎖時都會首先調用該方法。
[java] view plain copy
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
該方法會首先判斷當前狀態,若是c==0說明沒有線程正在競爭該鎖,若是不c !=0 說明有線程正擁有了該鎖。
若是發現c==0,則經過CAS設置該狀態值爲acquires,acquires的初始調用值爲1,每次線程重入該鎖都會+1,每次unlock都會-1,但爲0時釋放鎖。若是CAS設置成功,則能夠預計其餘任何線程調用CAS都不會再成功,也就認爲當前線程獲得了該鎖,也做爲Running線程,很顯然這個Running線程並未進入等待隊列。
若是c !=0 但發現本身已經擁有鎖,只是簡單地++acquires,並修改status值,但由於沒有競爭,因此經過setStatus修改,而非CAS,也就是說這段代碼實現了偏向鎖的功能,而且實現的很是漂亮。
addWaiter方法負責把當前沒法得到鎖的線程包裝爲一個Node添加到隊尾:
[java] view plain copy
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
其中參數mode是獨佔鎖仍是共享鎖,默認爲null,獨佔鎖。追加到隊尾的動做分兩步:
若是當前隊尾已經存在(tail!=null),則使用CAS把當前線程更新爲Tail
若是當前Tail爲null或則線程調用CAS設置隊尾失敗,則經過enq方法繼續設置Tail
下面是enq方法:
[java] view plain copy
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
該方法就是循環調用CAS,即便有高併發的場景,無限循環將會最終成功把當前線程追加到隊尾(或設置隊頭)。總而言之,addWaiter的目的就是經過CAS把當前如今追加到隊尾,並返回包裝後的Node實例。
把線程要包裝爲Node對象的主要緣由,除了用Node構造供虛擬隊列外,還用Node包裝了各類線程狀態,這些狀態被精心設計爲一些數字值:
SIGNAL(-1) :線程的後繼線程正/已被阻塞,當該線程release或cancel時要從新這個後繼線程(unpark)
CANCELLED(1):由於超時或中斷,該線程已經被取消
CONDITION(-2):代表該線程被處於條件隊列,就是由於調用了Condition.await而被阻塞
PROPAGATE(-3):傳播共享鎖
0:0表明無狀態
acquireQueued的主要做用是把已經追加到隊列的線程節點(addWaiter方法返回值)進行阻塞,但阻塞前又經過tryAccquire重試是否能得到鎖,若是重試成功能則無需阻塞,直接返回
[java] view plain copy
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
仔細看看這個方法是個無限循環,感受若是p == head && tryAcquire(arg)條件不知足循環將永遠沒法結束,固然不會出現死循環,奧祕在於第12行的parkAndCheckInterrupt會把當前線程掛起,從而阻塞住線程的調用棧。
[java] view plain copy
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如前面所述,LockSupport.park最終把線程交給系統(Linux)內核進行阻塞。固然也不是立刻把請求不到鎖的線程進行阻塞,還要檢查該線程的狀態,好比若是該線程處於Cancel狀態則沒有必要,具體的檢查在shouldParkAfterFailedAcquire中:
[java] view plain copy
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
檢查原則在於:
規則1:若是前繼的節點狀態爲SIGNAL,代表當前節點須要unpark,則返回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)將致使線程阻塞
規則2:若是前繼節點狀態爲CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,返回false,acquireQueued方法的無限循環將遞歸調用該方法,直至規則1返回true,致使線程阻塞
規則3:若是前繼節點狀態爲非SIGNAL、非CANCELLED,則設置前繼的狀態爲SIGNAL,返回false後進入acquireQueued的無限循環,與規則2同
整體看來,shouldParkAfterFailedAcquire就是靠前繼節點判斷當前線程是否應該被阻塞,若是前繼節點處於CANCELLED狀態,則順便刪除這些節點從新構造隊列。
至此,鎖住線程的邏輯已經完成,下面討論解鎖的過程。
請求鎖不成功的線程會被掛起在acquireQueued方法的第12行,12行之後的代碼必須等線程被解鎖鎖才能執行,假如被阻塞的線程獲得解鎖,則執行第13行,即設置interrupted = true,以後又進入無限循環。
從無限循環的代碼能夠看出,並非獲得解鎖的線程必定能得到鎖,必須在第6行中調用tryAccquire從新競爭,由於鎖是非公平的,有可能被新加入的線程得到,從而致使剛被喚醒的線程再次被阻塞,這個細節充分體現了「非公平」的精髓。經過以後將要介紹的解鎖機制會看到,第一個被解鎖的線程就是Head,所以p == head的判斷基本都會成功。
至此能夠看到,把tryAcquire方法延遲到子類中實現的作法很是精妙並具備極強的可擴展性,使人歎爲觀止!固然精妙的不是這個Templae設計模式,而是Doug Lea對鎖結構的精心佈局。
解鎖代碼相對簡單,主要體如今AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:
class AbstractQueuedSynchronizer
[java] view plain copy
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
class Sync
[java] view plain copy
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease與tryAcquire語義相同,把如何釋放的邏輯延遲到子類中。tryRelease語義很明確:若是線程屢次鎖定,則進行屢次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設置status爲0,由於無競爭因此沒有使用CAS。
release的語義在於:若是能夠釋放鎖,則喚醒隊列第一個線程(Head),具體喚醒代碼以下:
[java] view plain copy
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這段代碼的意思在於找出第一個能夠unpark的線程,通常說來head.next == head,Head就是第一個線程,但Head.next可能被取消或被置爲null,所以比較穩妥的辦法是從後往前找第一個可用線程。貌似回溯會致使性能下降,其實這個發生的概率很小,因此不會有性能影響。以後即是通知系統內核繼續該線程,在Linux下是經過pthread_mutex_unlock完成。以後,被解鎖的線程進入上面所說的從新競爭狀態。
AbstractQueuedSynchronizer經過構造一個基於阻塞的CLH隊列容納全部的阻塞線程,而對該隊列的操做均經過Lock-Free(CAS)操做,但對已經得到鎖的線程而言,ReentrantLock實現了偏向鎖的功能。
synchronized的底層也是一個基於CAS操做的等待隊列,但JVM實現的更精細,把等待隊列分爲ContentionList和EntryList,目的是爲了下降線程的出列速度;固然也實現了偏向鎖,從數據結構來講兩者設計沒有本質區別。但synchronized還實現了自旋鎖,並針對不一樣的系統和硬件體系進行了優化,而Lock則徹底依靠系統阻塞掛起等待線程。
固然Lock比synchronized更適合在應用層擴展,能夠繼承AbstractQueuedSynchronizer定義各類實現,好比實現讀寫鎖(ReadWriteLock),公平或不公平鎖;同時,Lock對應的Condition也比wait/notify要方便的多、靈活的多。