AQS
是什麼,相信你們都不陌生這個題目,那麼AQS
究竟是什麼呢? AQS的全稱是 Abstract Queued Synchronizer, 從字面意思理解也就是 抽象隊列同步器 ,實際上AQS
確實就是排隊同步隊列 , 也是一個抽象類,須要 自定義 同步隊列中 可執行權 的 獲取和釋放中的邏輯(從新定義獲取和釋放語義),也就是重寫 tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
等方法,固然也能夠 自定義方法 來經過調用 AQS
提供的 判斷方法進行邏輯判斷。在 JDK9
以前 AQS
是依賴於 CAS
的,其底層是經過 Unsafe
的compareAndSwap*
方法實現同步更改,在以後則是使用 VarHandle
, 也替代了 Unsafe
。說白了 AQS 利用 VarHandle 保證操做的原子性。java
大白話就能夠理解爲: 表示某件事情同一時間點僅有一人能夠進行操做,若有多人則須要排隊等待, 等到當前操做人完成後通知下一我的。node
在源碼中 AbstractQueuedSynchronizer
繼承了 AbstractOwnableSynchronizer
, 同時也就繼承了 exclusiveOwnerThread
屬性,也就是 獨佔模式同步器的擁有者 ** , 也就意味着該線程是當前正在執行的線程**。git
在 AQS
中有幾個重點方法,分別是: acquire
acquireInterruptibly
tryAcquireNanos
release
acquireShared
acquireSharedInterruptibly
tryAcquireSharedNanos
releaseShared
下面逐一分析。github
在分析源代碼以前,先來看一張圖來了解一下 AQS排隊同步隊列
和 Node
節點中的 waitStatus
狀態 。併發
waitStatus
狀態都分爲是什麼app
等待超時
或者 被中斷
,須要從同步隊列中剔除,節點進入該狀態之後不會再發生變化了。官方解釋就是 Acquires in exclusive mode, ignoring interrupts 獲取獨佔模式並忽略interrupt(中斷), 翻譯成大白話就是就能夠理解爲獲取獨佔模式, 看一下源碼oop
public final void acquire(int arg) {
// 判斷線程是否有可繼續執行的權限, 若是沒有則建立node 加入到隊列中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
在 acquire
方法中分別調用了 tryAcquire
acquireQueued
和 addWaiter
方法,其中 tryAcquire
方法是須要自定義(重寫) 獲取、 執行權限 的邏輯,這裏咱們以 AbstractQueuedSynchronizer
的實現 ReentrantLock
爲例,簡單分析一下,先看 tryAcquire
方法源碼分析
protected final boolean tryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
// 獲取當前線程的重入次數 若是是 0 則表明第一次
int c = getState();
if (c == 0) {
// 判斷是否存在隊列 && 能夠獲取到可執行權
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
// 設置獨佔模式同步器的擁有者 也就是是哪一個線程持有
setExclusiveOwnerThread(current);
return true;
}
}
// 若是進入線程是 持有可執行權的線程 則作重入 + 1 操做
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
tryAcquire
方法核心代碼就是 判斷執行權限 ,這裏就不具體分析了,會在下一篇文章中進行ReentrantLock的源碼分析,接下來重點看 acquireQueued
和 addWaiter
方法。ui
private Node addWaiter(Node mode) {
// 經過構造方法 新建 Node節點, 根據入參mode指定了 Node的模式,共享或獨佔
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
// 若是 tail 不是 null 則 設置 新建Node的前驅節點(prev) 指向 tail節點 反之 初始化同步隊列
if (oldTail != null) {
// 設置 新建Node的前驅節點(prev) 指向 tail節點
node.setPrevRelaxed(oldTail);
// 從新設置 tail 節點 指向 新建的Node節點
// 白話就是 隊列中的最後一個節點 == tail節點
if (compareAndSetTail(oldTail, node)) {
// 設置 未更改時的 tail節點 中 next 節點, 指向 新建Node節點
oldTail.next = node;
return node;
}
} else {
// 初始化同步隊列器
initializeSyncQueue();
}
}
}
複製代碼
// 若是未存在同步隊列 則初始化同步隊列
private final void initializeSyncQueue() {
Node h;
// 設置 AQS head節點 爲一個新建節點
if (HEAD.compareAndSet(this, null, (h = new Node())))
// 賦值操做
tail = h;
}
複製代碼
在 addWaiter
和 initializeSyncQueue
方法中,核心就是新建 Node 節點並經過 acquireQueued
方法將節點加入到 AQS
中,接下來分析一下 addWaiter
具體作了什麼this
經過構造方法建立新的Node節點,並經過入參 mode
指定Node節點的模式,共享或獨佔。固然這裏是設置的獨佔模式。
循環操做 新建Node節點並將 新建節點 和 tail
節點創建關係。首先判斷 tail
是不是null,若是是則 步驟3
,反之 步驟4
若是 tail
節點不爲null, 首先將新建的 Node節點
中的 前驅節(prev)
點設置爲當前的 tail
節點,而後經過 VarHandle
將 AQS
的 tail
節點改成 新建的Node
節點,若是修改爲功則將上一步 未更改時的 tail
節點 (也就是代碼中的oldTail) 中的 next
指向 新建的Node節點
,反之則可能由於併發操做致使 tial
節點已經被其餘線程變動,須要再次循環操做直至成功。
若是 tial
節點是null, 則須要實例化同步對列,也就是 AQS
, 經過調用 initializeSyncQueue
進行初始化操做,經過 VarHandle
設置 AQS
的 head
指向一個新建節點 (new Node) , 而後將 head
節點的引用賦值給 tail
節點。這裏注意一下,是將 head
節點的 引用
賦值給 tail
節點, 也就是這時候 head
節點 和 tail
節點是同時指向一塊內存地址 , 這裏的用意就是在新建隊列的時候, head
節點和新建節點的 prev
節點要保持是同一個引用 ,由於在後續的判斷中, 獲取可執行權的條件就是 AQS
的 head
節點是否等於當前節點的 prev
節點。
由於 addWaiter
方法中是一個循環,在 建立隊列後 須要將隊列新建的Node節點作關聯,因此還須要在執行一次 步驟3
addWaiter
方法分析完後,再來看一下 acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) {
// 線程中斷狀態
boolean interrupted = false;
try {
for (;;) {
// 獲取經過 addWaiter 建立的Node方法
final Node p = node.predecessor();
// 判斷 新建的Node節點是否等於head節點 && 能夠獲取到 可執行權
if (p == head && tryAcquire(arg)) {
// 設置 head 節點爲 當前線程的新建的Node節點,也就是線程被喚醒後並獲取到了可執行權,則將head
// 節點設置爲當前線程建立的Node節點,能夠保證head節點永遠均可以和後續節點有關聯關係
setHead(node);
// 設置 next
p.next = null; // help GC
// 返回
return interrupted;
}
// 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park
if (shouldParkAfterFailedAcquire(p, node))
// 將線程 wait 而且線程被喚醒後 判斷線程是否被中斷
// |= 操做等於 interrupted = interrupted | parkAndCheckInterrupt()
// |(按位或) 會將 | 兩邊的值進行二進制計算,運算規則一個爲真即爲真, 例如 1|1 = 1 或 1|0 = 1,
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
複製代碼
acquireQueued
也是核心方法,在其中會對線程進行 LockSupport.park
進行控制,其實現方式是 循環 ,下面就具體分析一下
當前線程
所建立的 Node
節點中的 前置節點(prev)
。前置節點(prev)
是否等於 AQS
的 head
節點 && 能夠獲取到 可執行權
,若是這兩個條件成立則看 步驟3
,反之看 步驟4
, 若是知足這兩個條件,也就表明着 head
節點 所對應的線程
已經執行完成而且作了釋放**(release方法)**操做。步驟2
條件成立,也就是 線程被喚醒後並獲取到了可執行權,則將 head
節點設置爲 當前線程建立的Node節點 。步驟2
條件不成立,則判斷 Node
節點所對應的線程的狀態是否符合改成 wait
狀態。這個邏輯在 shouldParkAfterFailedAcquire
方法中, 接下來看一下。private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 若是前置節點的 waitStatus == Signal 也就是 == -1
// 若是是 知足 線程 wait 條件
if (ws == Node.SIGNAL)
/* * This node has already set status asking a release * to signal it, so it can safely park. */
return true;
// 若是狀態 > 0 也就是1 也就是線程已經被中斷了
// 在這裏就會判斷 前置節點的前置節點 是否仍是被中斷,若是是 循環繼續判斷前置節點,
// 若是不是 則將前置節點的next節點改成 入參的 node 節點 而後 返回false 繼續循環判斷
// 這裏的做用就是 排除掉已經被中斷的線程
if (ws > 0) {
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
// 不然設置 狀態爲 -1 等待喚醒狀態 再次進來之後就會被 wait
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
複製代碼
在 shouldParkAfterFailedAcquire
方法中主要就是判斷節點所屬的線程是否符規則,也就是更改成 wait
狀態
waitStatus
是不是 SIGNAL,若是是知足條件,返回 true
,線程將會 wait
。waitStatus
是否大於 0, 也就是1 ,若是條件成立,則表明 當前線程節點 的 前置節點 所對應的線程已經被中斷了,須要從新指定當前線程節點的前置節點(prev),經過循環的方式找到前置節點的節點,若是依然被中斷,則繼續循環,直到找到未中斷線程所對應的Node節點爲止。若是條件不成立則將 waitStatus
狀態改成 SIGNAL
返回false, 再經過 acquireQueued
方法中的循環在執行一次 。prev
節點中的 waitStatus
狀態,是由於只有 前置節點(prev)
的 waitStatus
等於 SIGNAL 也就是 -1
時,就表明當前線程新建的Node節點的線程處於等待狀態,在當前節點的前置節點(prev)
的線程釋放了同步狀態或被取消,將會通知當前節點,使當前節點的線程得以運行到這裏咱們整個的 acquire
方法就解析完了,接下來分享 release,有獲取纔有釋放,會在release講完後爲你們分享一下 acquire 到 release的整個流程。
release
字面一次就是釋放,釋放經過 acquire
獲取的獨佔模式,可讓 AQS
後續節點所對應的線程能夠獲得執行權,下面就看一下 release 方法
public final boolean release(int arg) {
// 步驟 1 2 3
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
在 release
方法中首先會調用 tryRelease
方法,這裏 tryRelease
方法將會有子類實現,先以 RenntrantLook
爲例,這裏就不展現代碼了,就簡單描述一下邏輯
state
減去 arg
,state
表明重入次數。步驟1
結果是0,則將 獨佔模式同步器的擁有者 改成null並返回true。步驟1
結果不是0, 則從新設置 state,返回false,表示還不能夠釋放。接下來判斷 AQS
的 head
節點不是null而且 waitStatus
狀態不等於0,表明 釋放成功,而後進入 unparkSuccessor 方法,進行對下一個Node節點所對應的線程進行喚醒操做。
private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
// 若是head的waitStatus<0 則將head的waitStatus改成0
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
// 若是 head 節點的 next 節點 == null 或者 節點 的狀態 大於0 也就是1 也就是 下一個節點所對應的線程被中斷了
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 將會循環整個同步隊列,從tail節點開始 往前循環,直到只找到 waitStatus <= 0 的Node節點
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 若是節點不是 null 則喚醒該節點的線程
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
接下來分析一下在 unparkSuccessor 方法中都作了什麼
waitStatus
<0, 則將 head
的 waitStatus
改成0。head
節點的 next
節點等於null 或者 waitStatus
狀態 大於0也就是1, 表示 head
節點所對應的 next
節點所對應的線程已經被中斷了,將會循環整個同步隊列,從 tail
節點開始往前循環,直到找到最前面的一個 waitStatus <= 0 的Node節點。步驟2
條件不知足 則表明 head 的 next 節點不是null 或 waitStatus狀態不等於1,調用 unpark
方法喚醒線程。至此 release
方法就解析完成了,很簡單,核心功能僅僅是若是符合規則,則調用 unpark
方法喚醒 AQS
隊列中下一個節點所對應的線程。下面就分析一下 acquire 和 releae 整個流程。
acquire
和 release
的總體流程接下來分析 acquireInterruptibly
方法,acquireInterruptibly
方法和 acquire
實際上是同樣的,只不過多判斷了一下是否被中斷。
acquireInterruptibly
方法就是 可中斷的獲取可執行權 ,具體流程和 acquire
類似。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
複製代碼
在 acquireInterruptibly
方法中,首先會經過 Thread.interrupted()
方法判斷線程是否被中斷,如已經被中斷,則拋出 InterruptedException
, 反之則調用 tryAcquire
方法,判斷是否 獲取到執行權,若是未獲取到則調用 doAcquireInterruptibly
方法進行建立 AQS
和 新的Node節點
,並將 新建的Node節點
和 AQS
的 head
節點進行關聯。 到這裏可能就會想到,這不是和 acquire
方法是同樣的嘛,沒錯,就是同樣。看一下源碼
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 新建 Node 節點 並將節點 和 AQS 隊列簡歷關聯
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
複製代碼
看到源碼是否是很熟悉,這不就是上邊咱們分析過的 acquire
方法嘛,惟一和 acquire
方法不一樣的就是,若是線程在被喚醒之後,也就是 head
節點的線程調用了 release
釋放了可執行權,而且經過 LockSupport.park
方法喚醒了 head 的 next節點所屬的線程時,head
的 next
節點所屬的線程已經被中斷了就會拋出 InterruptedException
異常。
這裏就不進行 addWaiter 方法 和 parkAndCheckInterrupt 方法的源碼展現了,若是還不明白就看一下上邊 acquire
方法的源碼分析。
tryAcquireNanos
方法的含義就是 可超時的獲取執行權 ,若是設置的 超時時間 到了,還未獲取到可執行權,則直接返回 false 。這裏的超時時間單位是 納秒 ns
, 1秒(s)=1000000000納秒(ns)
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
複製代碼
看到 tryAcquireNanos
方法會想到什麼? 看到方法上的 throws InterruptedException
就一下想到了上面剛剛剛說的 acquireInterruptibly
方法,支持可中斷的獲取執行權。首先這裏會先調用 tryAcquire
方法獲取執行權,若是能夠獲取到執行權則直接返回,反之則調用 doAcquireNanos(arg, nanosTimeout)
方法進行 新建 Node 節點
並和 AQS
的 head
節點進行關聯,而且會將節點加入到 AQS的隊列中,而後將節點所屬的線程放入等待隊列中。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 判斷超時時間 是否小於等於 0 若是這 則直接返回 false
if (nanosTimeout <= 0L)
return false;
// 使用當前時間的 納秒 + 超時時間的納秒 = 將來超時的超時時間,用來作parkNanos,
// 就至關於 Object.wait(long timeoutMillis) 方法
final long deadline = System.nanoTime() + nanosTimeout;
//經過構造方法 新建 Node節點, 根據入參mode指定了 Node的模式,共享或獨佔
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
// 判斷 新建的Node節點是否等於head節點 && 能夠獲取到'可執行權'
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 設置 head 節點爲 當前線程的新建的Node節點,也就是線程被喚醒後並獲取到了可執行權,則將head
// 節點設置爲當前線程建立的Node節點,能夠保證head節點永遠均可以和後續節點有關聯關係
setHead(node);
p.next = null; // help GC
return true;
}
// 判斷計算過的 deadline 時間 - 當前時間 是否小於0 是則 超時時間已過,返回false
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park 而且 納秒必須大於 1000
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 判斷線程是否被中斷 若是中斷則拋出 InterruptedException 異常
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
複製代碼
若是在 acquire
和 release
分析中理解其中原理是否是以爲這裏很簡單,這裏也不列舉已經分析過的方法了,直接說出不一樣點
LockSupport.parkNanos(this, nanosTimeout)
方法,也就至關於 Object.wait(long timeoutMillis)
方法,等待的線程的狀態會在超時時間失效從 wait
變爲 run
deadline時間
- 當前時間
是否小於0, 若果是則表明超時時間已過,直接返回false,反之則繼續執行。InterruptedException
異常。是否是很簡單,讀者要把重點放到 acquire
和 release
上,其餘的就很容易了。上面的內容均是獲取的獨佔模式,下面來說解一下 共享模式。
public final void acquireShared(int arg) {
// 經過調用 tryAcquireShared 方法獲取可執行權,若是未獲取到則調用 doAcquireShared
// 方法進行 新建 Node節點 並和 AQS的 head 節點創建關係,
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
複製代碼
經過 acquireShared
方法能夠看到和 acquire
並無什麼區別,獲取可執行權的代碼須要 自定義同步器
實現,在共享模式分析中就不對 ReentrantReadWriteLock
源碼進行分析了 ,會在後面對 ReentrantLock
和 ReentrantReadWriteLock
進行源碼分析,接下來看一下 doAcquireShared
,看它是否是和 acquireQueued
方法也是同樣的邏輯呢?
private void doAcquireShared(int arg) {
// 獲取經過 addWaiter 建立的Node方法
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
// 獲取新建節點的前置節點
final Node p = node.predecessor();
// 判斷 新建的Node節點是否等於head節點
if (p == head) {
// 若是上邊的 p==head 須要在此判斷是否能夠獲取到'可執行權'
int r = tryAcquireShared(arg);
if (r >= 0) {
// 若是獲取到了可執行權
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park
if (shouldParkAfterFailedAcquire(p, node))
// 將線程 wait 而且線程被喚醒後 判斷線程是否被中斷
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
複製代碼
在 doAcquireShared
方法中,咱們看到,首先依然是調用 addWaiter
方法進行新建Node,這裏就很少說,能夠看一下上邊的方法詳解,doAcquireShared
也是核心方法,在其中會對線程進行 LockSupport.park
進行控制,其實現方式是 循環 ,下面就具體分析一下
當前線程
所建立的 Node
節點中的 前置節點(prev)
。前置節點(prev)
是否等於 AQS
的 head
節點,若是條件成立則看 步驟3
,反之看 步驟4
, 若是知足條件,也就表明着 head
節點 所對應的線程
已經執行完成而且作了釋放**(release方法)**操做。步驟2
條件成立,則再次判斷 當前線程是否能夠獲取到可執行權 ,若是能夠則設置 AQS
的 head
節點爲當前線程的 新建的Node節點
, 反之則看 步驟3
。步驟2
或 步驟3
條件不成立,則判斷 Node
節點所對應的線程的狀態是否符合改成 wait
狀態,也就是是否能夠加入到等待隊列中。這個邏輯在 shouldParkAfterFailedAcquire
方法中,能夠看一下上邊的方法詳解。public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
複製代碼
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 獲取經過 addWaiter 建立的Node方法
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 獲取新建節點的前置節點
final Node p = node.predecessor();
// 判斷 新建的Node節點是否等於head節點
if (p == head) {
// 若是獲取到了可執行權
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 判斷Node節點的線程是否符合被 wait && 將線程 wait 而且線程被喚醒後判斷線程是否被中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
複製代碼
能夠看到 acquireSharedInterruptibly
和 acquireShared
方法並無什麼太大區別,惟一的區別就是在調用 parkAndCheckInterrupt
線程狀態被 wait ,等到當前節點 prev
節點的所屬線程調用了 release
方法後,喚醒當前節點所屬線程時,若是當前線程被中斷了會拋出 InterruptedException
異常。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
複製代碼
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
// 若是超時時間 小於等於0 則直接 返回失敗
if (nanosTimeout <= 0L)
return false;
// 使用當前時間的 納秒 + 超時時間的納秒 = 將來超時的超時時間,用來作parkNanos,
// 就至關於 Object.wait(long timeoutMillis) 方法
final long deadline = System.nanoTime() + nanosTimeout;
//經過構造方法 新建 Node節點, 根據入參mode指定了 Node的模式,共享或獨佔。這裏是共享
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 判斷 新建的Node節點是否等於head節點
final Node p = node.predecessor();
if (p == head) {
// 是否能夠得到可執行權
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
// 判斷計算過的 deadline 時間 - 當前時間 是否小於或等於0 是則超時時間已過,返回false
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park 而且 納秒必須大於 1000
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// // 判斷線程是否被中斷 若是中斷則拋出 InterruptedException 異常
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
複製代碼
有沒有發現,doAcquireSharedNanos
方法和 doAcquireNanos
方法很類似呢,若是在 acquireShared
分析中理解其原理是否是以爲這裏很簡單,這裏也不列舉已經分析過的方法了,直接說出不一樣點
LockSupport.parkNanos(this, nanosTimeout)
方法,也就至關於 Object.wait(long timeoutMillis)
方法,等待的線程的狀態會在超時時間失效從 wait
變爲 run
deadline時間
- 當前時間
是否小於0, 若果是則表明超時時間已過,直接返回false,反之則繼續執行。InterruptedException
異常。是否是很簡單,讀者要把重點放到 acquire
和 acquireShared
上,其餘的就很容易了。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
複製代碼
private void doReleaseShared() {
/* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 若是更新失敗則循環
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒 head 節點的 next 節點所屬的線程
unparkSuccessor(h);
}
// 若是更新失敗則循環
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 若是 head 改變了則再次循環
if (h == head) // loop if head changed
break;
}
}
複製代碼
tryReleaseShared 方法和 release 方法稍微有一點區別,下面咱們就具體分析一下
tryReleaseShared
方法,若是釋放成功了,就表明有資源空閒出來,那麼就看 步驟2doReleaseShared
去喚醒後續結點, 在 doReleaseShared
方法中採用了 loop
,每一次循環的過程都是首先得到 head
節點,若是 head
結點不爲空且不等於 tail
結點,那麼先得到該節點的狀態,若是是SIGNAL的狀態,則表明它須要有後繼結點去喚醒,首先將其的狀態變爲0(初始狀態),而後經過 unparkSuccessor
方法喚醒後續節點所屬的線程,若是結點狀態一開始就是0,那麼就給他轉換成 PROPAGATE 狀態,保證在後續獲取資源的時候,還可以向後面傳播。至此咱們已經分析完了 AbstractQueuedSynchronizer 的源碼,是否是很簡單呢?最主要的仍是要理解AQS的總體流程,說白了AQS是依賴兩大利器,也就是 VarHandle 和 LockSupport。
博客地址:lantaoblog.site