咱們在以前介紹了併發編程的鎖機制:synchronized和lock,lock接口的重要實現類是可重入鎖ReentrantLock
。而上一篇併發Lock之AQS(AbstractQueuedSynchronizer)詳解介紹了AQS,談到ReentrantLock,不得不談抽象類AbstractQueuedSynchronizer(AQS)。AQS定義了一套多線程訪問共享資源的同步器框架,ReentrantLock的實現依賴於該同步器。本文在介紹過AQS,結合其具體的實現類ReentrantLock
分析實現原理。java
ReentrantLock實現了Lock接口,內部有三個內部類,Sync、NonfairSync、FairSync,Sync是一個抽象類型,它繼承AbstractQueuedSynchronizer,這個AbstractQueuedSynchronizer是一個模板類,它實現了許多和鎖相關的功能,並提供了鉤子方法供用戶實現,好比tryAcquire,tryRelease等。Sync實現了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync兩個類繼承自Sync,實現了lock方法,公平搶佔和非公平搶佔針對tryAcquire有不一樣的實現。本文重點介紹ReentrantLock默認的實現,即非公平鎖的獲取鎖和釋放鎖的實現。node
public ReentrantLock() {
sync = new NonfairSync();
}
複製代碼
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
複製代碼
acquire(1)
實際上使用的是AbstractQueuedSynchronizer
的acquire方法,它是一套鎖搶佔的模板,整體原理是先去嘗試獲取鎖,若是沒有獲取成功,就在CLH隊列中增長一個當前線程的節點,表示等待搶佔。而後進入CLH隊列的搶佔模式,進入的時候也會去執行一次獲取鎖的操做,若是仍是獲取不到,就調用LockSupport.park將當前線程掛起。那麼當前線程何時會被喚醒呢?當持有鎖的那個線程調用unlock的時候,會將CLH隊列的頭節點的下一個節點上的線程喚醒,調用的是LockSupport.unpark方法。acquire代碼比較簡單,具體以下:public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
首先,在CLH鎖隊列尾部增長一個等待節點,這個節點保存了當前線程,經過調用addWaiter實現,這裏須要考慮初始化的狀況,在第一個等待節點進入的時候,須要初始化一個頭節點而後把當前節點加入到尾部,後續則直接在尾部加入節點就好了。編程
private Node addWaiter(Node mode) {
// 初始化一個節點,這個節點保存當前線程
Node node = new Node(Thread.currentThread(), mode);
// 當CLH隊列不爲空的視乎,直接在隊列尾部插入一個節點
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 當CLH隊列爲空的時候,調用enq方法初始化隊列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 初始化節點,頭尾都指向一個空節點
if (compareAndSetHead(new Node()))
tail = head;
} else {// 考慮併發初始化
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
首先,外層是一個無限for循環,若是當前節點是頭節點的下個節點,而且經過tryAcquire獲取到了鎖,說明頭節點已經釋放了鎖,當前線程是被頭節點那個線程喚醒的,這時候就能夠將當前節點設置成頭節點,而且將failed標記設置成false,而後返回。至於上一個節點,它的next變量被設置爲null,在下次GC的時候會清理掉。安全
若是本次循環沒有獲取到鎖,就進入線程掛起階段,也就是shouldParkAfterFailedAcquire這個方法。微信
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//
return true;
if (ws > 0) {
//
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
public void unlock() {
sync.release(1);
}
複製代碼
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製代碼
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
在ReetrantLock的tryLock(long timeout, TimeUnit unit)
提供了超時獲取鎖的功能。它的語義是在指定的時間內若是獲取到鎖就返回true,獲取不到則返回false。這種機制避免了線程無限期的等待鎖釋放。多線程
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
複製代碼
具體看一下內部類裏面的方法tryAcquireNanos
:併發
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
複製代碼
若是線程被中斷了,那麼直接拋出InterruptedException。若是未中斷,先嚐試獲取鎖,獲取成功就直接返回,獲取失敗則進入doAcquireNanos。tryAcquire咱們已經看過,這裏重點看一下doAcquireNanos作了什麼。框架
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 起始時間
long lastTime = System.nanoTime();
// 線程入隊
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 又是自旋!
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 若是前驅是頭節點而且佔用鎖成功,則將當前節點變成頭結點
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 若是已經超時,返回false
if (nanosTimeout <= 0)
return false;
// 超時時間未到,且須要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 阻塞當前線程直到超時時間到期
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
// 更新nanosTimeout
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
//相應中斷
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
doAcquireNanos的流程簡述爲:線程先入等待隊列,而後開始自旋,嘗試獲取鎖,獲取成功就返回,失敗則在隊列裏找一個安全點把本身掛起直到超時時間過時。這裏爲何還須要循環呢?由於當前線程節點的前驅狀態可能不是SIGNAL,那麼在當前這一輪循環中線程不會被掛起,而後更新超時時間,開始新一輪的嘗試。ui
ReentrantLock是可重入的鎖,其內部使用的就是獨佔模式的AQS。公平鎖和非公平鎖不一樣之處在於,公平鎖在獲取鎖的時候,不會先去檢查state狀態,而是直接執行aqcuire(1)。公平鎖多了hasQueuePredecessors這個方法,這個方法用於判斷CHL隊列中是否有節點,對於公平鎖,若是CHL隊列有節點,則新進入競爭的線程必定要在CHL上排隊,而非公平鎖則是無視CHL隊列中的節點,直接進行競爭搶佔,這就有可能致使CHL隊列上的節點永遠獲取不到鎖,這就是非公平鎖之因此不公平的緣由,這裏再也不贅述。this