AQS ,AbstractQueuedSynchronizer ,即隊列同步器。它是構建鎖或者其餘同步組件的基礎框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等),J.U.C 併發包的做者(Doug Lea)指望它可以成爲實現大部分同步需求的基礎。java
它是 J.U.C 併發包中的核心基礎組件。node
AQS 解決了在實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO 同步隊列。設計模式
基於 AQS 來構建同步器能夠帶來不少好處。它不只可以極大地減小實現工做,並且也沒必要處理在多個位置上發生的競爭問題。安全
在基於 AQS 構建的同步器中,只能在一個時刻發生阻塞,從而下降上下文切換的開銷,提升了吞吐量。同時在設計 AQS 時充分考慮了可伸縮性,所以 J.U.C 中,全部基於 AQS 構建的同步器都可以得到這個優點。多線程
AQS 的主要使用方式是繼承,子類經過繼承同步器,並實現它的抽象方法來管理同步狀態。併發
AQS 使用一個 int 類型的成員變量 state 來表示同步狀態:app
當 state > 0 時,表示已經獲取了鎖。
當 state = 0 時,表示釋放了鎖。
複製代碼
它提供了三個方法,來對同步狀態 state 進行操做,而且 AQS 能夠確保對 state 的操做是安全的:框架
#getState()
#setState(int newState)
#compareAndSetState(int expect, int update)
複製代碼
AQS 經過內置的 FIFO 同步隊列來完成資源獲取線程的排隊工做:工具
若是當前線程獲取同步狀態失敗(鎖)時,AQS 則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程 當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。oop
AQS 主要提供了以下方法:
從上面的方法看下來,基本上能夠分紅 3 類:
獨佔式獲取與釋放同步狀態
共享式獲取與釋放同步狀態
查詢同步隊列中的等待線程狀況
複製代碼
CLH 同步隊列是一個 FIFO 雙向隊列,AQS 依賴它來完成同步狀態的管理:
當前線程若是獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構形成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程
當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。
Node 是 AbstractQueuedSynchronizer 的內部靜態類。
static final class Node {
// 共享
static final Node SHARED = new Node();
// 獨佔
static final Node EXCLUSIVE = null;
/** * 由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態 */
static final int CANCELLED = 1;
/** * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行 */
static final int SIGNAL = -1;
/** * 節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 */
static final int CONDITION = -2;
/** * 表示下一次共享式同步狀態獲取,將會無條件地傳播下去 */
static final int PROPAGATE = -3;
/** 等待狀態 */
volatile int waitStatus;
/** 前驅節點,當節點添加到同步隊列時被設置(尾部添加) */
volatile Node prev;
/** 後繼節點 */
volatile Node next;
/** 等待隊列中的後續節點。若是當前節點是共享的,那麼字段將是一個 SHARED 常量,也就是說節點類型(獨佔和共享)和等待隊列中的後續節點共用同一個字段 */
Node nextWaiter;
/** 獲取同步狀態的線程 */
volatile Thread thread;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
複製代碼
等待狀態,用來控制線程的阻塞和喚醒,而且能夠避免沒必要要的調用LockSupport的 #park(...) 和 #unpark(...) 方法。。目前有 4 種:CANCELLED SIGNAL CONDITION PROPAGATE 。實際上,有第 5 種,INITAL ,值爲 0 ,初始狀態。每一個等待狀態表明的含義,它不只僅指的是 Node 本身的線程的等待狀態,也能夠是下一個節點的線程的等待狀態。
head 和 tail 字段,是 AbstractQueuedSynchronizer 的字段,分別指向同步隊列的頭和尾。再配合上 prev 和 next 字段,快速定位到同步隊列的頭尾。
prev 和 next 字段,分別指向 Node 節點的前一個和後一個 Node 節點,從而實現鏈式雙向隊列。
thread 字段,Node 節點對應的線程 Thread 。
nextWaiter 字段,Node 節點獲取同步狀態的模型( Mode )。#tryAcquire(int args) 和 #tryAcquireShared(int args) 方法,分別是獨佔式和共享式獲取同步狀態。在獲取失敗時,它們都會調用 #addWaiter(Node mode) 方法入隊。而 nextWaiter 就是用來表示是哪一種模式:
SHARED 靜態 + 不可變字段,枚舉共享模式。
EXCLUSIVE 靜態 + 不可變字段,枚舉獨佔模式。
#isShared() 方法,判斷是否爲共享式獲取同步狀態。
複製代碼
#predecessor() 方法,得到 Node 節點的前一個 Node 節點。在方法的內部,Node p = prev 的本地拷貝,是爲了不併髮狀況下,prev 判斷完 == null 時,剛好被修改,從而保證線程安全。
構造方法有 3 個,分別是:
#Node() 方法:用於 SHARED 的建立。
#Node(Thread thread, Node mode) 方法:用於 #addWaiter(Node mode) 方法。
從 mode 方法參數中,咱們也能夠看出它表明獲取同步狀態的模式。
#Node(Thread thread, int waitStatus) 方法,用於 #addConditionWaiter() 方法。
複製代碼
CLH 隊列入列很簡單: tail 指向新節點。 新節點的 prev 指向當前最後的節點。 當前最後一個節點的 next 指向當前節點。
可是,實際上,入隊邏輯實現的 #addWaiter(Node) 方法,須要考慮併發的狀況。它經過 CAS 的方式,來保證正確的添加 Node 。代碼以下:
private Node addWaiter(Node mode) {
// 新建節點
Node node = new Node(Thread.currentThread(), mode);
// 記錄原尾節點
Node pred = tail;
// 快速嘗試,添加新節點爲尾節點
//當原尾節點非空,才執行快速嘗試的邏輯. 在下面的 #enq(Node node) 方法中,咱們會看到,首節點未初始化的時,head 和 tail 都爲空。
if (pred != null) {
// 設置新 Node 節點的尾節點爲原尾節點
node.prev = pred;
// CAS 設置新的尾節點
if (compareAndSetTail(pred, node)) {
// 成功,原尾節點的下一個節點爲新節點
pred.next = node;
return node;
}
}
// 失敗,屢次嘗試,直到成功
enq(node);
return node;
}
複製代碼
調用 #enq(Node node) 方法,屢次嘗試,直到成功添加
private Node enq(final Node node) {
// 屢次嘗試,直到成功爲止
for (;;) {
// 記錄原尾節點
Node t = tail;
// 原尾節點不存在,建立首尾節點都爲 new Node()
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
// 原尾節點存在,添加新節點爲尾節點
} else {
//設置爲尾節點
node.prev = t;
// CAS 設置新的尾節點
if (compareAndSetTail(t, node)) {
// 成功,原尾節點的下一個節點爲新節點
t.next = node;
return t;
}
}
}
}
複製代碼
CLH 同步隊列遵循 FIFO,首節點的線程釋放同步狀態後,將會喚醒它的下一個節點(Node.next)。然後繼節點將會在獲取同步狀態成功時,將本身設置爲首節點( head )。
這個過程很是簡單,head 執行該節點並斷開原首節點的 next 和當前節點的 prev 便可。注意,在這個過程是不須要使用 CAS 來保證的,由於只有一個線程,可以成功獲取到同步狀態。
setHead(Node node) 方法,實現上述的出列邏輯。代碼以下:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
複製代碼
AQS 的設計模式採用的模板方法模式,子類經過繼承的方式,實現它的抽象方法來管理同步狀態。
對於子類而言,它並無太多的活要作,AQS 已經提供了大量的模板方法來實現同步,主要是分爲三類:
獨佔式獲取和釋放同步狀態
共享式獲取和釋放同步狀態
查詢同步隊列中的等待線程狀況。
複製代碼
自定義子類使用 AQS 提供的模板方法,就能夠實現本身的同步語義。
獨佔式,同一時刻,僅有一個線程持有同步狀態。
acquire(int arg) 方法,爲 AQS 提供的模板方法。該方法爲獨佔式獲取同步狀態,可是該方法對中斷不敏感。也就是說,因爲線程獲取同步狀態失敗而加入到 CLH 同步隊列中,後續對該線程進行中斷操做時,線程不會從 CLH 同步隊列中移除。代碼以下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
調用 #tryAcquire(int arg) 方法,去嘗試獲取同步狀態,獲取成功則設置鎖狀態並返回 true ,不然獲取失敗,返回 false 。
若tryAcquire獲取成功,則acquire(int arg) 方法直接返回,不用線程阻塞
若 tryAcquire 獲取失敗調用 addWaiter(Node mode) 方法,將當前線程加入到 CLH 同步隊列尾部,而且, mode 方法參數爲 Node.EXCLUSIVE ,表示獨佔模式。而後調用 boolean #acquireQueued(Node node, int arg) 方法,自旋直到得到同步狀態成功。
另外,該 acquireQueued 方法的返回值類型爲 boolean ,當返回 true 時,表示在這個過程當中,發生過線程中斷。可是呢,這個方法又會清理線程中斷的標識,因此在種狀況下,須要調用 #selfInterrupt() 方法,恢復線程中斷的標識,代碼以下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
複製代碼
tryAcquire(int arg)方法,須要自定義同步組件本身實現,該方法必需要保證線程安全的獲取同步狀態。AQS裏代碼以下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
直接拋出 UnsupportedOperationException 異常。
boolean #acquireQueued(Node node, int arg) 方法,爲一個自旋的過程,也就是說,當前線程(Node)進入同步隊列後,就會進入一個自旋的過程,每一個節點都會自省地觀察,當條件知足,獲取到同步狀態後,就能夠從這個自旋過程當中退出,不然會一直執行下去。
流程圖以下:
代碼以下:
final boolean acquireQueued(final Node node, int arg) {
// 記錄是否獲取同步狀態成功
boolean failed = true;
try {
// 記錄過程當中,是否發生線程中斷
boolean interrupted = false;
/* * 自旋過程,其實就是一個死循環而已 */
for (;;) {
// 當前線程的前驅節點
final Node p = node.predecessor();
// 當前線程的前驅節點是頭結點,且同步狀態成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 獲取失敗,線程等待--具體後面介紹
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 獲取同步狀態發生異常,取消獲取。
if (failed)
cancelAcquire(node);
}
}
複製代碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 得到前一個節點的等待狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // Node.SIGNAL
/* * This node has already set status asking a release * to signal it, so it can safely park. */
return true;
if (ws > 0) { // Node.CANCEL
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 0 或者 Node.PROPAGATE
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
整個過程以下圖:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
複製代碼
若傳入參數 node 爲空。
將節點的等待線程置空。
得到 node 節點的前一個節點 pred 。
得到 pred 的下一個節點 predNext 。predNext 從表面上看,和 node 是等價的。 可是實際上,存在多線程併發的狀況,因此咱們調用 #compareAndSetNext(...) 方法,使用 CAS 的方式,設置 pred 的下一個節點。 若是設置失敗,說明當前線程和其它線程競爭失敗,不須要作其它邏輯,由於 pred 的下一個節點已經被其它線程設置成功。
設置 node 節點的爲取消的等待狀態 Node.CANCELLED 。 這裏可使用直接寫,而不是 CAS 。 在這個操做以後,其它 Node 節點能夠忽略 node 。 Before, we are free of interference from other threads. 如何理解。
下面開始開始修改 pred 的新的下一個節點,一共分紅三種狀況。
AQS 提供了acquire(int arg) 方法,以供獨佔式獲取同步狀態,可是該方法對中斷不響應,對線程進行中斷操做後,該線程會依然位於CLH同步隊列中,等待着獲取同步狀態。
爲了響應中斷,AQS 提供了 #acquireInterruptibly(int arg) 方法。該方法在等待獲取同步狀態時,若是當前線程被中斷了,會馬上響應中斷,並拋出 InterruptedException 異常。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
複製代碼
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // <1>
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
它與 #acquire(int arg) 方法僅有兩個差異:
方法聲明拋出 InterruptedException 異常。
在中斷方法處再也不是使用 interrupted 標誌,而是直接拋出 InterruptedException 異常。
AQS 除了提供上面兩個方法外,還提供了一個加強版的方法 #tryAcquireNanos(int arg, long nanos) 。該方法爲 #acquireInterruptibly(int arg) 方法的進一步加強,它除了響應中斷外,還有超時控制。即若是當前線程沒有在指定時間內獲取同步狀態,則會返回 false ,不然返回 true 。
流程圖以下:
代碼以下:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
複製代碼
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// nanosTimeout <= 0
if (nanosTimeout <= 0L)
return false;
// 超時時間
final long deadline = System.nanoTime() + nanosTimeout;
// 新增 Node 節點
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 自旋
for (;;) {
final Node p = node.predecessor();
// 獲取同步狀態成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
/* * 獲取失敗,作超時、中斷判斷 */
// 從新計算須要休眠的時間
nanosTimeout = deadline - System.nanoTime();
// 已經超時,返回false
if (nanosTimeout <= 0L)
return false;
// 若是沒有超時,則等待nanosTimeout納秒
// 注:該線程會直接從LockSupport.parkNanos中返回,
// LockSupport 爲 J.U.C 提供的一個阻塞和喚醒的工具類,後面作詳細介紹
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 線程是否已經中斷了
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
由於是在 #doAcquireInterruptibly(int arg) 方法的基礎上,作了超時控制的加強,因此相同部分,咱們直接跳過。
當線程獲取同步狀態後,執行完相應邏輯後,就須要釋放同步狀態。AQS 提供了#release(int arg)方法,釋放同步狀態。代碼以下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
調用 #tryRelease(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設置鎖狀態並返回 true ,不然獲取失敗,返回 false 。
tryRelease(int arg) 方法,須要自定義同步組件本身實現,該方法必需要保證線程安全的釋放同步狀態。代碼以下:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
直接拋出 UnsupportedOperationException 異常。 3. 得到當前的 head ,避免併發問題。
頭結點不爲空,而且頭結點狀態不爲 0 ( INITAL 未初始化)。爲何會出現 0 的狀況呢?
調用 #unparkSuccessor(Node node) 方法,喚醒下一個節點的線程等待。
在 AQS 中維護着一個 FIFO 的同步隊列。
當線程獲取同步狀態失敗後,則會加入到這個 CLH 同步隊列的對尾,並一直保持着自旋。
在 CLH 同步隊列中的線程在自旋時,會判斷其前驅節點是否爲首節點,若是爲首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步隊列。
當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。
共享式與獨佔式的最主要區別在於,同一時刻:
獨佔式只能有一個線程獲取同步狀態。
共享式能夠有多個線程獲取同步狀態。
複製代碼
例如,讀操做能夠有多個線程同時進行,而寫操做同一時刻只能有一個線程進行寫操做,其餘操做都會被阻塞。例子爲 ReentrantReadWriteLock 。
acquireShared(int arg) 方法,對標 #acquire(int arg) 方法。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
複製代碼
調用 #tryAcquireShared(int arg) 方法,嘗試獲取同步狀態,獲取成功則設置鎖狀態並返回大於等於 0 ,不然獲取失敗,返回小於 0 。
若獲取成功,直接返回,不用線程阻塞,獲取失敗則自旋直到得到同步狀態成功。
須要自定義同步組件本身實現,該方法必需要保證線程安全的獲取同步狀態。代碼以下:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
直接拋出 UnsupportedOperationException 異常。
private void doAcquireShared(int arg) {
// 共享式節點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 前驅節點
final Node p = node.predecessor();
// 若是其前驅節點,獲取同步狀態
if (p == head) {
// 嘗試獲取同步
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
由於和 #acquireQueued(int arg) 方法的基礎上,因此相同部分,直接跳過。
調用 #addWaiter(Node mode) 方法,將當前線程加入到 CLH 同步隊列尾部。而且, mode 方法參數爲 Node.SHARED ,表示共享模式。
調用 #tryAcquireShared(int arg) 方法,嘗試得到同步狀態。
調用 #setHeadAndPropagate(Node node, int propagate) 方法,設置新的首節點,並根據條件,喚醒下一個節點。這裏和獨佔式同步狀態獲取很大的不一樣:經過這樣的方式,不斷喚醒下一個共享式同步狀態, 從而實現同步狀態被多個線程的共享獲取。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
複製代碼
代碼以下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
代碼以下:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
當線程獲取同步狀態後,執行完相應邏輯後,就須要釋放同步狀態。AQS 提供了#releaseShared(int arg)方法,釋放同步狀態。代碼以下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
複製代碼
調用 #tryReleaseShared(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設置鎖狀態並返回 true ,不然獲取失敗,返回 false 。調用 #doReleaseShared() 方法,喚醒後續的共享式獲取同步狀態的節點。
須要自定義同步組件本身實現,該方法必需要保證線程安全的釋放同步狀態。代碼以下:
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
直接拋出 UnsupportedOperationException 異常。
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
複製代碼
在線程獲取同步狀態時,若是獲取失敗,則加入 CLH 同步隊列,經過經過自旋的方式不斷獲取同步狀態,可是在自旋的過程當中,則須要判斷當前線程是否須要阻塞,其主要方法在acquireQueued(int arg) ,代碼以下:
// ... 省略前面無關代碼
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// ... 省略前面無關代碼
複製代碼
經過這段代碼咱們能夠看到,在獲取同步狀態失敗後,線程並非立馬進行阻塞,須要檢查該線程的狀態,檢查狀態的方法爲 #shouldParkAfterFailedAcquire(Node pred, Node node)方法,該方法主要靠前驅節點判斷當前線程是否應該被阻塞。
若是 #shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回 true ,則調用parkAndCheckInterrupt() 方法,阻塞當前線程。代碼以下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
開始,調用 LockSupport#park(Object blocker) 方法,將當前線程掛起,此時就進入阻塞等待喚醒的狀態。
而後,在線程被喚醒時,調用 Thread#interrupted()方法,返回當前線程是否被打斷,並清理打斷狀態。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
private native boolean isInterrupted(boolean ClearInterrupted);
複製代碼
因此,實際上,線程被喚醒有兩種狀況:
第一種,當前節點(線程)的前序節點釋放同步狀態時,喚醒了該線程 。
第二種,當前線程被打斷致使喚醒。
複製代碼
當線程釋放同步狀態後,則須要喚醒該線程的後繼節點。代碼以下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒後繼節點
return true;
}
return false;
}
複製代碼
調用 unparkSuccessor(Node node) 方法,喚醒後繼節點:
private void unparkSuccessor(Node node) {
//當前節點狀態
int ws = node.waitStatus;
//當前狀態 < 0 則設置爲 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//當前節點的後繼節點
Node s = node.next;
//後繼節點爲null或者其狀態 > 0 (超時或者被中斷了)
if (s == null || s.waitStatus > 0) {
s = null;
//從tail節點來找可用節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//喚醒後繼節點
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
可能會存在當前線程的後繼節點爲 null,例如:超時、被中斷的狀況。若是遇到這種狀況了,則須要跳過該節點。
可是,爲什麼是從 tail 尾節點開始,而不是從 node.next 開始呢?緣由在於,取消的 node.next.next 指向的是 node.next 本身。若是順序遍歷下去,會致使死循環。因此此時,只能採用 tail 回溯的辦法,找到第一個( 不是最新找到的,而是最前序的 )可用的線程。
可是,爲何取消的 node.next.next 指向的是 node.next 本身呢?在 #cancelAcquire(Node node) 的末尾,node.next = node; 代碼塊,取消的 node 節點,將其 next 指向了本身。 最後,調用 LockSupport的unpark(Thread thread) 方法,喚醒該線程。
LockSupport 是用來建立鎖和其餘同步類的基本線程阻塞原語。
每一個使用 LockSupport 的線程都會與一個許可與之關聯:
若是該許可可用,而且可在進程中使用,則調用 #park(...) 將會當即返回,不然可能阻塞。
若是許可尚不可用,則能夠調用 #unpark(...) 使其可用。
可是,注意許可不可重入,也就是說只能調用一次 park(...) 方法,不然會一直阻塞。
LockSupport 定義了一系列以 park 開頭的方法來阻塞當前線程,unpark(Thread thread) 方法來喚醒一個被阻塞的線程。
複製代碼
以下圖所示:
方法的blocker參數,主要是用來標識當前線程在等待的對象,該對象主要用於問題排查和系統監控。
park 方法和 unpark(Thread thread) 方法,都是成對出現的。同時 unpark(Thread thread) 方法,必需要在 park 方法執行以後執行。固然,並非說沒有調用 unpark(Thread thread) 方法的線程就會一直阻塞
park 有一個方法,它是帶了時間戳的 #parkNanos(long nanos) 方法:爲了線程調度禁用當前線程,最多等待指定的等待時間,除非許可可用。
public static void park() {
UNSAFE.park(false, 0L);
}
複製代碼
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
複製代碼
從上面能夠看出,其內部的實現都是經過 sun.misc.Unsafe 來實現的,其定義以下:
// UNSAFE.java
public native void park(boolean var1, long var2);
public native void unpark(Object var1);
複製代碼