上一篇文章講了併發編程的鎖機制:synchronized和lock,主要介紹了Java併發編程中經常使用的鎖機制。Lock是一個接口,而synchronized是Java中的關鍵字,synchronized是基於jvm實現。Lock鎖能夠被中斷,支持定時鎖等。Lock的實現類,可重入鎖ReentrantLock,咱們有講到其具體用法。而談到ReentrantLock,不得不談抽象類AbstractQueuedSynchronizer(AQS)。抽象的隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,如經常使用的ReentrantLock、ThreadPoolExecutor。html
AQS是一個抽象類,主是是以繼承的方式使用。AQS自己是沒有實現任何同步接口的,它僅僅只是定義了同步狀態的獲取和釋放的方法來供自定義的同步組件的使用。AQS抽象類包含以下幾個方法:java
AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。共享模式時只用 Sync Queue, 獨佔模式有時只用 Sync Queue, 但若涉及 Condition, 則還有 Condition Queue。在子類的 tryAcquire, tryAcquireShared 中實現公平與非公平的區分。node
不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。編程
整個 AQS 分爲如下幾部分:安全
下面咱們具體來分析一下AQS實現的源碼。微信
Node 節點是表明獲取lock的線程, 存在於 Condition Queue, Sync Queue 裏面, 而其主要就是 nextWaiter (標記共享仍是獨佔),waitStatus 標記node的狀態。多線程
static final class Node {
/** 標識節點是不是 共享的節點(這樣的節點只存在於 Sync Queue 裏面) */
static final Node SHARED = new Node();
//獨佔模式
static final Node EXCLUSIVE = null;
/** * CANCELLED 說明節點已經 取消獲取 lock 了(通常是因爲 interrupt 或 timeout 致使的) * 不少時候是在 cancelAcquire 裏面進行設置這個標識 */
static final int CANCELLED = 1;
/** * SIGNAL 標識當前節點的後繼節點須要喚醒(PS: 這個一般是在 獨佔模式下使用, 在共享模式下有時用 PROPAGATE) */
static final int SIGNAL = -1;
//當前節點在 Condition Queue 裏面
static final int CONDITION = -2;
/** * 當前節點獲取到 lock 或進行 release lock 時, 共享模式的最終狀態是 PROPAGATE(PS: 有可能共享模式的節點變成 PROPAGATE 以前就被其後繼節點搶佔 head 節點, 而從Sync Queue中被踢出掉) */
static final int PROPAGATE = -3;
volatile int waitStatus;
/** * 節點在 Sync Queue 裏面時的前繼節點(主要來進行 skip CANCELLED 的節點) * 注意: 根據 addWaiter方法: * 1. prev節點在隊列裏面, 則 prev != null 確定成立 * 2. prev != null 成立, 不必定 node 就在 Sync Queue 裏面 */
volatile Node prev;
/** * Node 在 Sync Queue 裏面的後繼節點, 主要是在release lock 時進行後繼節點的喚醒 * 然後繼節點在前繼節點上打上 SIGNAL 標識, 來提醒他 release lock 時須要喚醒 */
volatile Node next;
//獲取 lock 的引用
volatile Thread thread;
/** * 做用分紅兩種: * 1. 在 Sync Queue 裏面, nextWaiter用來判斷節點是 共享模式, 仍是獨佔模式 * 2. 在 Condition queue 裏面, 節點主要是連接且後繼節點 (Condition queue是一個單向的, 不支持併發的 list) */
Node nextWaiter;
// 當前節點是不是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取 node 的前繼節點
final Node predecessor() throws NullPointerException{
Node p = prev;
if(p == null){
throw new NullPointerException();
}else{
return p;
}
}
Node(){
// Used to establish initial head or SHARED marker
}
// 初始化 Node 用於 Sync Queue 裏面
Node(Thread thread, Node mode){ // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//初始化 Node 用於 Condition Queue 裏面
Node(Thread thread, int waitStatus){ // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
複製代碼
waitStatus的狀態變化:併發
waitStatus 變化過程:框架
其上的這些狀態變化主要在: doReleaseShared , shouldParkAfterFailedAcquire 裏面。jvm
Condition Queue 是一個併發不安全的, 只用於獨佔模式的隊列(PS: 爲何是併發不安全的呢? 主要是在操做 Condition 時, 線程必需獲取 獨佔的 lock, 因此不須要考慮併發的安全問題); 而當Node存在於 Condition Queue 裏面, 則其只有 waitStatus, thread, nextWaiter 有值, 其餘的都是null(其中的 waitStatus 只能是 CONDITION, 0(0 表明node進行轉移到 Sync Queue裏面, 或被中斷/timeout)); 這裏有個注意點, 就是當線程被中斷或獲取 lock 超時, 則一瞬間 node 會存在於 Condition Queue, Sync Queue 兩個隊列中.
節點 Node4, Node5, Node6, Node7 都是調用 Condition.awaitXX 方法加入 Condition Queue(PS: 加入後會將原來的 lock 釋放)。將當前線程封裝成一個 Node 節點放入到 Condition Queue 裏面你們能夠注意到, 下面對 Condition Queue 的操做都沒考慮到 併發(Sync Queue 的隊列是支持併發操做的), 這是爲何呢? 由於在進行操做 Condition 是當前的線程已經獲取了AQS的獨佔鎖, 因此不須要考慮併發的狀況。
private Node addConditionWaiter(){
Node t = lastWaiter;
// Condition queue 的尾節點
// 尾節點已經Cancel, 直接進行清除,
/** * 當Condition進行 awiat 超時或被中斷時, Condition裏面的節點是沒有被刪除掉的, 須要其 * 他await 在將線程加入 Condition Queue 時調用addConditionWaiter而進而刪除, 或 await 操做差很少結束時, 調用 "node.nextWaiter != null" 進行判斷而刪除 (PS: 經過 signal 進行喚 * 醒時 node.nextWaiter 會被置空, 而中斷和超時時不會) */
if(t != null && t.waitStatus != Node.CONDITION){
/** * 調用 unlinkCancelledWaiters 對 "waitStatus != Node.CONDITION" 的節點進行 * 刪除(在Condition裏面的Node的waitStatus 要麼是CONDITION(正常), 要麼就是 0 * (signal/timeout/interrupt)) */
unlinkCancelledWaiters();
t = lastWaiter;
}
//將線程封裝成 node 準備放入 Condition Queue 裏面
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if(t == null){
//Condition Queue 是空的
firstWaiter = node;
} else {
// 追加到 queue 尾部
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
複製代碼
當Node在Condition Queue 中, 若狀態不是 CONDITION, 則必定是被中斷或超時。在調用 addConditionWaiter 將線程放入 Condition Queue 裏面時或 awiat 方法獲取結束時 進行清理 Condition queue 裏面的因 timeout/interrupt 而還存在的節點。這個刪除操做比較巧妙, 其中引入了 trail 節點, 能夠理解爲traverse整個 Condition Queue 時遇到的最後一個有效的節點。
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
Node trail = null;
while(t != null){
Node next = t.nextWaiter; // 1. 先初始化 next 節點
if(t.waitStatus != Node.CONDITION){ // 2. 節點不有效, 在Condition Queue 裏面 Node.waitStatus 只有多是 CONDITION 或是 0(timeout/interrupt引發的)
t.nextWaiter = null; // 3. Node.nextWaiter 置空
if(trail == null){ // 4. 一次都沒有遇到有效的節點
firstWaiter = next; // 5. 將 next 賦值給 firstWaiter(此時 next 可能也是無效的, 這只是一個臨時處理)
} else {
trail.nextWaiter = next; // 6. next 賦值給 trail.nextWaiter, 這一步其實就是刪除節點 t
}
if(next == null){ // 7. next == null 說明 已經 traverse 完了 Condition Queue
lastWaiter = trail;
}
}else{
trail = t; // 8. 將有效節點賦值給 trail
}
t = next;
}
}
複製代碼
transferForSignal只有在節點被正常喚醒才調用的正常轉移的方法。
將Node 從Condition Queue 轉移到 Sync Queue 裏面在調用transferForSignal以前, 會 first.nextWaiter = null;而咱們發現若節點是由於 timeout / interrupt 進行轉移, 則不會進行這步操做; 兩種狀況的轉移都會把 wautStatus 置爲 0
final boolean transferForSignal(Node node){
/** * If cannot change waitStatus, the node has been cancelled */
if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已經 cancelled 則失敗
return false;
}
Node p = enq(node); // 2. 加入 Sync Queue
int ws = p.waitStatus;
if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 這裏的 ws > 0 指Sync Queue 中node 的前繼節點cancelled 了, 因此, 喚醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失敗, 則說明 前繼節點已經變成 SIGNAL 或 cancelled, 因此也要 喚醒
LockSupport.unpark(node.thread);
}
return true;
}
複製代碼
transferAfterCancelledWait 在節點獲取lock時被中斷或獲取超時才調用的轉移方法。將 Condition Queue 中因 timeout/interrupt 而喚醒的節點進行轉移
final boolean transferAfterCancelledWait(Node node){
if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 沒有 node 沒有 cancelled , 直接進行轉移 (轉移後, Sync Queue , Condition Queue 都會存在 node)
enq(node);
return true;
}
while(!isOnSyncQueue(node)){ // 2.這時是其餘的線程發送signal,將本線程轉移到 Sync Queue 裏面的工程中(轉移的過程當中 waitStatus = 0了, 因此上面的 CAS 操做失敗)
Thread.yield(); // 這裏調用 isOnSyncQueue判斷是否已經 入Sync Queue 了
}
return false;
}
複製代碼
AQS內部維護着一個FIFO的CLH隊列,因此AQS並不支持基於優先級的同步策略。至於爲什麼要選擇CLH隊列,主要在於CLH鎖相對於MSC鎖,他更加容易處理cancel和timeout,同時他具有進出隊列快、無所、暢通無阻、檢查是否有線程在等待也很是容易(head != tail,頭尾指針不一樣)。固然相對於原始的CLH隊列鎖,ASQ採用的是一種變種的CLH隊列鎖:
這個圖表明有個線程獲取lock, 而 Node1, Node2, Node3 則在Sync Queue 裏面進行等待獲取lock(PS: 注意到 dummy Node 的SINGNAL 這是叫獲取 lock 的線程在釋放lock時通知後繼節點的標示)
這裏有個地方須要注意, 就是初始化 head, tail 的節點, 不必定是 head.next, 由於期間可能被其餘的線程進行搶佔了。將當前的線程封裝成 Node 加入到 Sync Queue 裏面。
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode); // 1. 封裝 Node
Node pred = tail;
if(pred != null){ // 2. pred != null -> 隊列中已經有節點, 直接 CAS 到尾節點
node.prev = pred; // 3. 先設置 Node.pre = pred (PS: 則當一個 node在Sync Queue裏面時 node.prev 必定 != null(除 dummy node), 可是 node.prev != null 不能說明其在 Sync Queue 裏面, 由於如今的CAS可能失敗 )
if(compareAndSetTail(pred, node)){ // 4. CAS node 到 tail
pred.next = node; // 5. CAS 成功, 將 pred.next = node (PS: 說明 node.next != null -> 則 node 必定在 Sync Queue, 但若 node 在Sync Queue 裏面不必定 node.next != null)
return node;
}
}
enq(node); // 6. 隊列爲空, 調用 enq 入隊列
return node;
}
/** * 這個插入會檢測head tail 的初始化, 必要的話會初始化一個 dummy 節點, 這個和 ConcurrentLinkedQueue 同樣的 * 將節點 node 加入隊列 * 這裏有個注意點 * 狀況: * 1. 首先 queue是空的 * 2. 初始化一個 dummy 節點 * 3. 這時再在tail後面添加節點(這一步可能失敗, 可能發生競爭被其餘的線程搶佔) * 這裏爲何要加入一個 dummy 節點呢? * 這裏的 Sync Queue 是CLH lock的一個變種, 線程節點 node 可否獲取lock的判斷經過其前繼節點 * 並且這裏在當前節點想獲取lock時一般給前繼節點 打上 signal 的標識(表示前繼節點釋放lock須要通知我來獲取lock) * 若這裏不清楚的同窗, 請先看看 CLH lock的資料 (這是理解 AQS 的基礎) */
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t == null){ // Must initialize // 1. 隊列爲空 初始化一個 dummy 節點 其實和 ConcurrentLinkedQueue 同樣
if(compareAndSetHead(new Node())){ // 2. 初始化 head 與 tail (這個CAS成功後, head 就有值了, 詳情將 Unsafe 操做)
tail = head;
}
}else{
node.prev = t; // 3. 先設置 Node.pre = pred (PS: 則當一個 node在Sync Queue裏面時 node.prev 必定 != null, 可是 node.prev != null 不能說明其在 Sync Queue 裏面, 由於如今的CAS可能失敗 )
if(compareAndSetTail(t, node)){ // 4. CAS node 到 tail
t.next = node; // 5. CAS 成功, 將 pred.next = node (PS: 說明 node.next != null -> 則 node 必定在 Sync Queue, 但若 node 在Sync Queue 裏面不必定 node.next != null)
return t;
}
}
}
}
複製代碼
這裏的出Queue的方法其實有兩個: 新節點獲取lock, 調用setHead搶佔head, 而且剔除原head;節點因被中斷或獲取超時而進行 cancelled, 最後被剔除。
/** * 設置 head 節點(在獨佔模式沒有併發的可能, 當共享的模式有可能) */
private void setHead(Node node){
head = node;
node.thread = null; // 清除線程引用
node.prev = null; // 清除原來 head 的引用 <- 都是 help GC
}
// 清除因中斷/超時而放棄獲取lock的線程節點(此時節點在 Sync Queue 裏面)
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null; // 1. 線程引用清空
Node pred = node.prev;
while (pred.waitStatus > 0) // 2. 若前繼節點是 CANCELLED 的, 則也一併清除
node.prev = pred = pred.prev;
Node predNext = pred.next; // 3. 這裏的 predNext也是須要清除的(只不過在清除時的 CAS 操做須要 它)
node.waitStatus = Node.CANCELLED; // 4. 標識節點須要清除
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // 5. 若須要清除額節點是尾節點, 則直接 CAS pred爲尾節點
compareAndSetNext(pred, predNext, null); // 6. 刪除節點predNext
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || // 7. 後繼節點須要喚醒(但這裏的後繼節點predNext已經 CANCELLED 了)
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 8. 將 pred 標識爲 SIGNAL
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 8. next.waitStatus <= 0 表示 next 是個一個想要獲取lock的節點
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 若 pred 是頭節點, 則此刻可能有節點剛剛進入 queue ,因此進行一下喚醒
}
node.next = node; // help GC
}
}
複製代碼
acquire(int arg):以獨佔模式獲取對象,忽略中斷。
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
複製代碼
final boolean acquireQueued(final Node node, int arg){
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor(); // 1. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
if(p == head && tryAcquire(arg)){ // 2. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquire嘗試獲取一下
setHead(node); // 3. 獲取 lock 成功, 直接設置 新head(原來的head可能就直接被回收)
p.next = null; // help GC // help gc
failed = false;
return interrupted; // 4. 返回在整個獲取的過程當中是否被中斷過 ; 但這又有什麼用呢? 若整個過程當中被中斷過, 則最後我在 自我中斷一下 (selfInterrupt), 由於外面的函數可能須要知道整個過程是否被中斷過
}
if(shouldParkAfterFailedAcquire(p, node) && // 5. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
parkAndCheckInterrupt()){ // 6. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
interrupted = true;
}
}
}finally {
if(failed){ // 7. 在整個獲取中出錯
cancelAcquire(node); // 8. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
}
}
}
複製代碼
主邏輯:
private void doAcquireInterruptibly(int arg) throws InterruptedException{
final Node node = addWaiter(Node.EXCLUSIVE); // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
if(p == head && tryAcquire(arg)){ // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquire嘗試獲取一下
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if(shouldParkAfterFailedAcquire(p, node) && // 4. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
parkAndCheckInterrupt()){ // 5. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
throw new InterruptedException(); // 6. 線程此時喚醒是經過線程中斷, 則直接拋異常
}
}
}finally {
if(failed){ // 7. 在整個獲取中出錯(好比線程中斷)
cancelAcquire(node); // 8. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
}
}
}
複製代碼
acquireInterruptibly(int arg): 以獨佔模式獲取對象,若是被中斷則停止。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
複製代碼
經過先檢查中斷的狀態,而後至少調用一次tryAcquire,返回成功。不然,線程在排隊,不停地阻塞與喚醒,調用tryAcquire直到成功或者被中斷。
tryAcquireNanos(int arg, long nanosTimeout):獨佔且支持超時模式獲取: 帶有超時時間,若是通過超時時間則會退出。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
if(nanosTimeout <= 0L){
return false;
}
final long deadline = System.nanoTime() + nanosTimeout; // 0. 計算截至時間
final Node node = addWaiter(Node.EXCLUSIVE); // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
if(p == head && tryAcquire(arg)){ // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquire嘗試獲取一下
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime(); // 4. 計算還剩餘的時間
if(nanosTimeout <= 0L){ // 5. 時間超時, 直接返回
return false;
}
if(shouldParkAfterFailedAcquire(p, node) && // 6. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
nanosTimeout > spinForTimeoutThreshold){ // 7. 若沒超時, 而且大於spinForTimeoutThreshold, 則線程 sleep(小於spinForTimeoutThreshold, 則直接自旋, 由於效率更高 調用 LockSupport 是須要開銷的)
LockSupport.parkNanos(this, nanosTimeout);
}
if(Thread.interrupted()){ // 8. 線程此時喚醒是經過線程中斷, 則直接拋異常
throw new InterruptedException();
}
}
}finally {
if(failed){ // 9. 在整個獲取中出錯(好比線程中斷/超時)
cancelAcquire(node); // 10. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
}
}
}
複製代碼
嘗試以獨佔模式獲取,若是中斷和超時則放棄。實現時先檢查中斷的狀態,而後至少調用一次tryAcquire。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg)|| doAcquireNanos(arg, nanosTimeout);
}
複製代碼
釋放 lock 流程:
public final boolean release(int arg){
if(tryRelease(arg)){ // 1. 調用子類, 若徹底釋放好, 則返回true(這裏有lock重複獲取)
Node h = head;
if(h != null && h.waitStatus != 0){ // 2. h.waitStatus !=0 其實就是 h.waitStatus < 0 後繼節點須要喚醒
unparkSuccessor(h); // 3. 喚醒後繼節點
}
return true;
}
return false;
}
/** * 喚醒 node 的後繼節點 * 這裏有個注意點: 喚醒時會將當前node的標識歸位爲 0 * 等於當前節點標識位 的流轉過程: 0(剛加入queue) -> signal (被後繼節點要求在釋放時須要喚醒) -> 0 (進行喚醒後繼節點) */
private void unparkSuccessor(Node node) {
logger.info("unparkSuccessor node:" + node + Thread.currentThread().getName());
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 1. 清除前繼節點的標識
Node s = node.next;
logger.info("unparkSuccessor s:" + node + Thread.currentThread().getName());
if (s == null || s.waitStatus > 0) { // 2. 這裏若在 Sync Queue 裏面存在想要獲取 lock 的節點,則必定須要喚醒一下(跳過取消的節點) (PS: s == null發生在共享模式的競爭釋放資源)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) // 3. 找到 queue 裏面最前面想要獲取 Lock 的節點
s = t;
}
logger.info("unparkSuccessor s:"+s);
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
public final void acquireShared(int arg){
if(tryAcquireShared(arg) < 0){ // 1. 調用子類, 獲取共享 lock 返回 < 0, 表示失敗
doAcquireShared(arg); // 2. 調用 doAcquireShared 當前 線程加入 Sync Queue 裏面, 等待獲取 lock
}
}
複製代碼
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED); // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor(); // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
if(p == head){
int r = tryAcquireShared(arg); // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquireShared 嘗試獲取一下
if(r >= 0){
setHeadAndPropagate(node, r); // 4. 獲取 lock 成功, 設置新的 head, 並喚醒後繼獲取 readLock 的節點
p.next = null; // help GC
if(interrupted){ // 5. 在獲取 lock 時, 被中斷過, 則本身再自我中斷一下(外面的函數可能須要這個參數)
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && // 6. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
parkAndCheckInterrupt()){ // 7. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
interrupted = true;
}
}
}finally {
if(failed){ // 8. 在整個獲取中出錯(好比線程中斷/超時)
cancelAcquire(node); // 9. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
}
}
}
複製代碼
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException{
final Node node = addWaiter(Node.SHARED); // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
if(p == head){
int r = tryAcquireShared(arg); // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquireShared 嘗試獲取一下
if(r >= 0){
setHeadAndPropagate(node, r); // 4. 獲取 lock 成功, 設置新的 head, 並喚醒後繼獲取 readLock 的節點
p.next = null; // help GC
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && // 5. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
parkAndCheckInterrupt()){ // 6. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
throw new InterruptedException(); // 7. 若這次喚醒是 經過線程中斷, 則直接拋出異常
}
}
}finally {
if(failed){ // 8. 在整個獲取中出錯(好比線程中斷/超時)
cancelAcquire(node); // 9. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
}
}
}
複製代碼
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{
if (nanosTimeout <= 0L){
return false;
}
final long deadline = System.nanoTime() + nanosTimeout; // 0. 計算超時的時間
final Node node = addWaiter(Node.SHARED); // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor(); // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
if(p == head){
int r = tryAcquireShared(arg); // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquireShared 嘗試獲取一下
if(r >= 0){
setHeadAndPropagate(node, r); // 4. 獲取 lock 成功, 設置新的 head, 並喚醒後繼獲取 readLock 的節點
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime(); // 5. 計算還剩餘的 timeout , 若小於0 則直接return
if(nanosTimeout <= 0L){
return false;
}
if(shouldParkAfterFailedAcquire(p, node) && // 6. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
nanosTimeout > spinForTimeoutThreshold){// 7. 在timeout 小於 spinForTimeoutThreshold 時 spin 的效率, 比 LockSupport 更高
LockSupport.parkNanos(this, nanosTimeout);
}
if(Thread.interrupted()){ // 7. 若這次喚醒是 經過線程中斷, 則直接拋出異常
throw new InterruptedException();
}
}
}finally {
if (failed){ // 8. 在整個獲取中出錯(好比線程中斷/超時)
cancelAcquire(node); // 10. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
}
}
}
複製代碼
當 Sync Queue中存在連續多個獲取 共享lock的節點時, 會出現併發的喚醒後繼節點(由於共享模式下獲取lock後會喚醒近鄰的後繼節點來獲取lock)。首先調用子類的 tryReleaseShared來進行釋放 lock,而後判斷是否須要喚醒後繼節點來獲取 lock
private void doReleaseShared(){
for(;;){
Node h = head; // 1. 獲取 head 節點, 準備 release
if(h != null && h != tail){ // 2. Sync Queue 裏面不爲 空
int ws = h.waitStatus;
if(ws == Node.SIGNAL){ // 3. h節點後面多是 獨佔的節點, 也多是 共享的, 而且請求了喚醒(就是給前繼節點打標記 SIGNAL)
if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){ // 4. h 恢復 waitStatus 值置0 (爲啥這裏要用 CAS 呢, 由於這裏的調用多是在 節點剛剛獲取 lock, 而其餘線程又對其進行中斷, 所用cas就出現失敗)
continue; // loop to recheck cases
}
unparkSuccessor(h); // 5. 喚醒後繼節點
}
else if(ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){ //6. h後面沒有節點須要喚醒, 則標識爲 PROPAGATE 表示須要繼續傳遞喚醒(主要是區別 獨佔節點最終狀態0 (獨佔的節點在沒有後繼節點, 而且release lock 時最終 waitStatus 保存爲 0))
continue; // loop on failed CAS // 7. 一樣這裏可能存在競爭
}
}
if(h == head){ // 8. head 節點沒變化, 直接 return(從這裏也看出, 一個共享模式的 節點在其喚醒後繼節點時, 只喚醒一個, 可是它會在獲取 lock 時喚醒, 釋放 lock 時也進行, 因此或致使競爭的操做)
break; // head 變化了, 說明其餘節點獲取 lock 了, 本身的任務完成, 直接退出
}
}
}
複製代碼
本文主要講過了抽象的隊列式的同步器AQS的主要方法和實現原理。分別介紹了Node、Condition Queue、 Sync Queue、獨佔獲取釋放lock、共享獲取釋放lock的具體源碼實現。AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它。