Doug Lea 在 java.util.concurrent
(JUC)中提供一套基礎工具用於幫助開發者更加方便的開發併發程序,包括 Lock
、Semaphore
、CountDownLatch
、CyclicBarrier
等等,而實現這些類的實現都藉助了一個可以控制多個線程的併發訪問的工具,那就是 AbstractQueuedSynchronizer
(AQS)。java
AQS 的數據結構形式以下圖所示,其維護了一個 FIFO 的雙向隊列,嘗試獲取鎖的線程都以節點的形式存在於隊列中node
在對源碼分析以前,首先須要瞭解一些基礎的內容。編程
首先,鎖分爲兩種,獨佔鎖和共享鎖,顧名思義,獨佔鎖是指最多同時只能有一個線程獲取到鎖,而共享鎖則容許最多 n 個線程同時獲取到鎖。根據在獲取鎖的過程當中是否響應中斷請求,可分爲響應中斷和不響應中斷的請求。數據結構
其次,每一個節點都有其對應的狀態,初始狀態爲0。併發
// 等待超時或被中斷,取消獲取鎖
static final int CANCELLED = 1;
// 說明該節點的後續被掛起了,當釋放鎖或取消時,須要喚醒後繼節點
static final int SIGNAL = -1;
// 表示節點處於Condition隊列中
static final int CONDITION = -2;
// 用於共享式鎖,表示下一次嘗試獲取共享鎖時,須要無條件傳播下去
static final int PROPAGATE = -3;
複製代碼
爲了更好的理解源碼,我會經過在源碼的基礎上增長註釋的方式對源碼進行解釋(英文註釋爲源碼原本的註釋)。對於有方法調用的地方,能夠直接跳到對應方法的講解,按流程一步步理解,也能夠經過註釋瞭解整個方法的步驟,再細看以前調用的每一個方法。函數
/** * 不響應中斷的獨佔鎖獲取入口 * 其中 tryAcuquire() 方法爲獲取鎖的抽象方法,返回 true 表示獲取鎖成功,須要實現類根據獲取鎖的方式本身定義 */
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 若是 tryAcquire() 獲取鎖失敗,則經過 addWaiter() 加入到同步隊列中,再經過 acquireQueued() 不斷嘗試獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//因爲不響應中斷,若是檢測到中斷,acquireQueued() 會返回 true,進入方法體
// 因爲檢測時使用了 Thread.interrupted(),中斷標誌被重置,須要恢復中斷標誌
selfInterrupt();
}
/** * 將線程信息包裝成一個 Node 加入到同步隊列的隊尾中 */
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 嘗試經過一次 CAS 將節點加入到隊尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 走到這裏說明要麼有競爭 CAS 失敗,要麼同步器隊列還沒初始化即 pred == null
enq(node);
return node;
}
private Node enq(final Node node) {
// 無限循環 CAS 直到將節點加入到隊尾中
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/** * 因獲取鎖失敗而加入同步隊列中的線程在這裏不斷嘗試獲取鎖 * 返回中斷狀態交由上層函數處理 */
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 因爲這是一個先進先出的隊列,只有當本身的前驅是頭結點(頭結點表示已經獲取到鎖)時,才能輪到本身來爭奪鎖
if (p == head && tryAcquire(arg)) {
// tryAcquire() 返回 true 說明獲取鎖成功
// 將 nod e節點設置爲 head,此外 setHead() 是不須要 CAS 的,由於不會有競爭
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 獲取失敗後,查看是否須要被掛起,若是須要掛起,檢查是否有中斷信息
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 複習一下,SIGNAL 說明該節點的後續被掛起了,當釋放鎖或取消時,須要喚醒後繼節點
// 若是前驅節點已是 SIGNAL 狀態了 說明當前線程能夠安心被掛起了,等待前驅來喚醒本身
if (ws == Node.SIGNAL)
return true;
// ws > 0 說明前驅節點被取消了(CANCELLED == 1),須要跳過被取消的節點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將前驅節點經過CAS改成 SIGNAL 狀態,但最後仍是會返回 false
// 若是在下一次循環中若是仍是沒拿到鎖,則會進入該方法第一個判斷,返回true,
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 掛起線程
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
/** * 方法入口 */
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/** * 和不響應中斷的獲取方法惟一不一樣的是,在檢測到中斷後是拋出中斷異常而不是返回true,其餘沒有區別 */
private void doAcquireInterruptibly(int arg) //... if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();
// ...
}
複製代碼
/** * 方法入口 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/** * 基本上和以前的差很少,若是超時了就直接返回 false,掛起線程時也使用了帶計時的 parkNanos */
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 若是超時了 返回false
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 注意這裏 nanosTimeout > spinForTimeoutThreshold(默認1000納秒)時才掛起,小於這個閾值時直接自旋,再也不掛起
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
/** * 和加鎖同樣,這裏的 tryRelease() 也是抽象方法,須要子類本身實現 * 實際工做就是喚醒後繼節點而已,出隊的操做也是在獲取鎖的時候由後繼結點完成的 */
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 若是 h.waitStatus == 0 ,說明不是 SIGNAL 狀態,沒有須要喚醒的節點,直接返回
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 若是後繼節點已經取消了,那麼從新調整後繼直到沒有取消的爲止
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 若是有未取消的後繼,喚醒他
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
在實現上,共享鎖和獨佔鎖在實現上的核心區別在於:工具
隊列中的線程節點嘗試獲取鎖資源,若是成功則喚醒後面還在等待的共享節點並把該喚醒事件傳遞下去,即會依次喚醒在該節點後面的全部共享節點。oop
/** * 方法入口,tryAcquireShared爲抽象方法: * 返回小於0表示獲取失敗 * 等於0表示當前線程獲取到鎖,但後續線程獲取不到,即不須要傳播後續節點 * 大於0表示後續線程也能獲取到,須要傳播後續節點 */
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// >=0表示獲取鎖成功
if (r >= 0) {
// 這裏和獨佔模型不一樣,除了設置頭結點後還須要向後傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 若是propagate > 0 或者 h.waitStatus < 0(PROPAGATE) 須要喚醒後繼節點
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 若是後繼結點是獨佔結點,就不喚醒了
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 隊列裏至少有2個節點,不然沒有傳播必要
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 和共享鎖不一樣的是,這個方法能夠在setHeadAndPropagate和releaseShared兩個方法中被調用
// 存在一個線程正獲取完鎖向後傳播,另外一個線程釋放鎖的狀況,因此須要CAS控制
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// ws == 0 代表是隊列的最後一個節點,那麼CAS爲PROPAGATE,代表下一次tryShared時,須要傳播
// 若是失敗說明有新後繼節點將其改成了SIGNAL後掛起了,那麼繼續循環傳播
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 若是head改變了,說明有新的排隊的線程獲取到了鎖,再次檢查
if (h == head) // loop if head changed
break;
}
}
複製代碼
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 釋放成功後,日後傳播
doReleaseShared();
return true;
}
return false;
}
複製代碼
Condition 提供了線程之間的通訊機制,和 synchronize 中的 wait() 和 notify() 的做用是同樣的,而且同一個鎖能夠有多個 condition。源碼分析
Condition是一個接口,實際上 lock.newCondition() 返回的是 AQS 的內部類 ConditionObject。其核心的兩個方法就是 await() 和 signal()。ui
當調用await()時,線程加入到等待隊列中等待,和同步隊列類似,也是一個FIFO的隊列,但雖然用的數據結構相同,等待隊列只用了單向的功能。其維護的數據結構圖以下所示:
/** * 將當前線程信息包裝加入等待隊列中並掛起線程等待喚醒 * 因爲能調用 await() 的線程必定是獲取到鎖的,因此下面的操做都不須要額外的CAS操做來處理線程競爭 */
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入到等待隊列中
Node node = addConditionWaiter();
// 釋放鎖,fullyRelease() 調用的是獨佔鎖的釋放方法realse(state),即一次釋放全部的重入鎖, state記錄了重入的次數
int savedState = fullyRelease(node);
int interruptMode = 0;
// 只要尚未被 signal() 給加入到同步隊列,就掛起,除非被中斷
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 若是被中斷了 跳出循環,返回 0 或 THROW_IE 或 REINTERRUPT
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 到這裏爲止,無論怎麼出的循環,都已經被加入同步隊列了(要麼被 signal() 加入,要麼在中斷檢測方法中加入)
// ----------------------------------------------------------
// 別忘了 acquireQueued() 返回的獲取鎖的過程當中是否被中斷了
// 若是在獲取鎖的過程當中被中斷了,而且以前的 interruptMode != THROW_IE,那麼也視爲在 signal() 以後被中斷,設爲REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 只有在 signal() 前中斷的線程還會在等待隊列中留有節點,纔會知足這個條件
if (node.nextWaiter != null)
// 將狀態不是CONDITION的節點從隊列中刪除
unlinkCancelledWaiters();
if (interruptMode != 0)
// 拋出異常 或 重置中斷標識位
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 若是最後一個等待隊列被取消了,清除出去
if (t != null && t.waitStatus != Node.CONDITION) {
// 這個方法就是從頭至尾遍歷一遍鏈表將狀態不爲 CONDITION 的節點刪除等待隊列,代碼略
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/** * 若是沒有被中斷,返回 0 * 若是在被signal以前中斷了,返回 THROW_IE,表示須要拋出異常 * 若是在signal以後中斷了,返回 REINTERRUPT,表示不拋出,只恢復中斷位 */
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 若是CAS成功了,說明尚未被signal加入同步隊列
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 因爲沒有signal,這裏須要加入同步隊列,才能以後爭奪鎖
enq(node);
return true;
}
/* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */
// 說明已經被signal了,防止還沒被加入到同步隊列的狀況
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
/** * 從等待隊列中找到第一個線程喚醒 */
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 出隊
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/** * 清空等待隊列, 將等待的節點按順序加入到同步隊列中 */
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
// 若是CAS失敗,說明被cancell了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將節點加入到同步隊列中,注意enq()會返回node的前驅節點p
Node p = enq(node);
int ws = p.waitStatus;
// 由於此時節點仍是掛起的,按照同步隊列的結構,須要將前驅結點的狀態改成SIGNAL
// 若是前驅被取消了,或者CAS前驅狀態爲SIGNAL失敗了,那麼就喚醒線程,讓其本身走去獲取鎖的步驟,雖然線程可能會被再次掛起,但這是無害的操做
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
複製代碼
文章中的圖片來源於《Java併發編程的藝術》