Java併發系列[4]----AbstractQueuedSynchronizer源碼分析之條件隊列

經過前面三篇的分析,咱們深刻了解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步隊列和條件隊列。咱們仍是拿公共廁所作比喻,同步隊列是主要的排隊區,若是公共廁所沒開放,全部想要進入廁所的人都得在這裏排隊。而條件隊列主要是爲條件等待設置的,咱們想象一下若是一我的經過排隊終於成功獲取鎖進入了廁所,但在方便以前發現本身沒帶手紙,碰到這種狀況雖然很無奈,可是它也必須接受這個事實,這時它只好乖乖的出去先準備好手紙(進入條件隊列等待),固然在出去以前還得把鎖給釋放了好讓其餘人可以進來,在準備好了手紙(條件知足)以後它又得從新回到同步隊列中去排隊。固然進入房間的人並不都是由於沒帶手紙,可能還有其餘一些緣由必須中斷操做先去條件隊列中去排隊,因此條件隊列能夠有多個,依不一樣的等待條件而設置不一樣的條件隊列。條件隊列是一條單向鏈表,Condition接口定義了條件隊列中的全部操做,AbstractQueuedSynchronizer內部的ConditionObject類實現了Condition接口,下面咱們看看Condition接口都定義了哪些操做。node

 1 public interface Condition {
 2     
 3     //響應線程中斷的條件等待
 4     void await() throws InterruptedException;
 5     
 6     //不響應線程中斷的條件等待
 7     void awaitUninterruptibly();
 8     
 9     //設置相對時間的條件等待(不進行自旋)
10     long awaitNanos(long nanosTimeout) throws InterruptedException;
11     
12     //設置相對時間的條件等待(進行自旋)
13     boolean await(long time, TimeUnit unit) throws InterruptedException;
14     
15     //設置絕對時間的條件等待
16     boolean awaitUntil(Date deadline) throws InterruptedException;
17     
18     //喚醒條件隊列中的頭結點
19     void signal();
20     
21     //喚醒條件隊列的全部結點
22     void signalAll();
23     
24 }

Condition接口雖然定義了這麼多方法,但總共就分爲兩類,以await開頭的是線程進入條件隊列等待的方法,以signal開頭的是將條件隊列中的線程「喚醒」的方法。這裏要注意的是,調用signal方法可能喚醒線程也可能不會喚醒線程,何時會喚醒線程這得看狀況,後面會講到,可是調用signal方法必定會將線程從條件隊列中移到同步隊列尾部。這裏爲了敘述方便,咱們先暫時不糾結這麼多,統一稱signal方法爲喚醒條件隊列線程的操做。你們注意看一下,await方法分爲5種,分別是響應線程中斷等待,不響應線程中斷等待,設置相對時間不自旋等待,設置相對時間自旋等待,設置絕對時間等待;signal方法只有2種,分別是隻喚醒條件隊列頭結點和喚醒條件隊列全部結點的操做。同一類的方法基本上是相通的,因爲篇幅所限,咱們不可能也不須要將這些方法所有仔細的講到,只須要將一個表明方法搞懂了再看其餘方法就可以舉一反三。因此在本文中我只會細講await方法和signal方法,其餘方法不細講但會貼出源碼來以供你們參考。源碼分析

1. 響應線程中斷的條件等待學習

 1 //響應線程中斷的條件等待
 2 public final void await() throws InterruptedException {
 3     //若是線程被中斷則拋出異常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //將當前線程添加到條件隊列尾部
 8     Node node = addConditionWaiter();
 9     //在進入條件等待以前先徹底釋放鎖
10     int savedState = fullyRelease(node);
11     int interruptMode = 0;
12     //線程一直在while循環裏進行條件等待
13     while (!isOnSyncQueue(node)) {
14         //進行條件等待的線程都在這裏被掛起, 線程被喚醒的狀況有如下幾種:
15         //1.同步隊列的前繼結點已取消
16         //2.設置同步隊列的前繼結點的狀態爲SIGNAL失敗
17         //3.前繼結點釋放鎖後喚醒當前結點
18         LockSupport.park(this);
19         //當前線程醒來後立馬檢查是否被中斷, 若是是則表明結點取消條件等待, 此時須要將結點移出條件隊列
20         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
21             break;
22         }
23     }
24     //線程醒來後就會以獨佔模式獲取鎖
25     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
26         interruptMode = REINTERRUPT;
27     }
28     //這步操做主要爲防止線程在signal以前中斷而致使沒與條件隊列斷絕聯繫
29     if (node.nextWaiter != null) {
30         unlinkCancelledWaiters();
31     }
32     //根據中斷模式進行響應的中斷處理
33     if (interruptMode != 0) {
34         reportInterruptAfterWait(interruptMode);
35     }
36 }

當線程調用await方法的時候,首先會將當前線程包裝成node結點放入條件隊列尾部。在addConditionWaiter方法中,若是發現條件隊列尾結點已取消就會調用unlinkCancelledWaiters方法將條件隊列全部的已取消結點清空。這步操做是插入結點的準備工做,那麼確保了尾結點的狀態也是CONDITION以後,就會新建一個node結點將當前線程包裝起來而後放入條件隊列尾部。注意,這個過程只是將結點添加到同步隊列尾部而沒有掛起線程哦。ui

第二步:徹底將鎖釋放this

 1 //徹底釋放鎖
 2 final int fullyRelease(Node node) {
 3     boolean failed = true;
 4     try {
 5         //獲取當前的同步狀態
 6         int savedState = getState();
 7         //使用當前的同步狀態去釋放鎖
 8         if (release(savedState)) {
 9             failed = false;
10             //若是釋放鎖成功就返回當前同步狀態
11             return savedState;
12         } else {
13             //若是釋放鎖失敗就拋出運行時異常
14             throw new IllegalMonitorStateException();
15         }
16     } finally {
17         //保證沒有成功釋放鎖就將該結點設置爲取消狀態
18         if (failed) {
19             node.waitStatus = Node.CANCELLED;
20         }
21     }
22 }

將當前線程包裝成結點添加到條件隊列尾部後,緊接着就調用fullyRelease方法釋放鎖。注意,方法名爲fullyRelease也就這步操做會徹底的釋放鎖,由於鎖是可重入的,因此在進行條件等待前須要將鎖所有釋放了,否則的話別人就獲取不了鎖了。若是釋放鎖失敗的話就會拋出一個運行時異常,若是成功釋放了鎖的話就返回以前的同步狀態。spa

第三步:進行條件等待線程

 1 //線程一直在while循環裏進行條件等待
 2 while (!isOnSyncQueue(node)) {
 3     //進行條件等待的線程都在這裏被掛起, 線程被喚醒的狀況有如下幾種:
 4     //1.同步隊列的前繼結點已取消
 5     //2.設置同步隊列的前繼結點的狀態爲SIGNAL失敗
 6     //3.前繼結點釋放鎖後喚醒當前結點
 7     LockSupport.park(this);
 8     //當前線程醒來後立馬檢查是否被中斷, 若是是則表明結點取消條件等待, 此時須要將結點移出條件隊列
 9     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
10         break;
11     }
12 }
13 
14 //檢查條件等待時的線程中斷狀況
15 private int checkInterruptWhileWaiting(Node node) {
16     //中斷請求在signal操做以前:THROW_IE
17     //中斷請求在signal操做以後:REINTERRUPT
18     //期間沒有收到任何中斷請求:0
19     return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
20 }
21 
22 //將取消條件等待的結點從條件隊列轉移到同步隊列中
23 final boolean transferAfterCancelledWait(Node node) {
24     //若是這步CAS操做成功的話就代表中斷髮生在signal方法以前
25     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
26         //狀態修改爲功後就將該結點放入同步隊列尾部
27         enq(node);
28         return true;
29     }
30     //到這裏代表CAS操做失敗, 說明中斷髮生在signal方法以後
31     while (!isOnSyncQueue(node)) {
32         //若是sinal方法尚未將結點轉移到同步隊列, 就經過自旋等待一下
33         Thread.yield();
34     }
35     return false;
36 }

在以上兩個操做完成了以後就會進入while循環,能夠看到while循環裏面首先調用LockSupport.park(this)將線程掛起了,因此線程就會一直在這裏阻塞。在調用signal方法後僅僅只是將結點從條件隊列轉移到同步隊列中去,至於會不會喚醒線程須要看狀況。若是轉移結點時發現同步隊列中的前繼結點已取消,或者是更新前繼結點的狀態爲SIGNAL失敗,這兩種狀況都會當即喚醒線程,不然的話在signal方法結束時就不會去喚醒已在同步隊列中的線程,而是等到它的前繼結點來喚醒。固然,線程阻塞在這裏除了能夠調用signal方法喚醒以外,線程還能夠響應中斷,若是線程在這裏收到中斷請求就會繼續往下執行。能夠看到線程醒來後會立刻檢查是不是因爲中斷喚醒的仍是經過signal方法喚醒的,若是是由於中斷喚醒的一樣會將這個結點轉移到同步隊列中去,只不過是經過調用transferAfterCancelledWait方法來實現的。最後執行完這一步以後就會返回中斷狀況並跳出while循環。設計

第四步:結點移出條件隊列後的操做code

 1 //線程醒來後就會以獨佔模式獲取鎖
 2 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
 3     interruptMode = REINTERRUPT;
 4 }
 5 //這步操做主要爲防止線程在signal以前中斷而致使沒與條件隊列斷絕聯繫
 6 if (node.nextWaiter != null) {
 7     unlinkCancelledWaiters();
 8 }
 9 //根據中斷模式進行響應的中斷處理
10 if (interruptMode != 0) {
11     reportInterruptAfterWait(interruptMode);
12 }
13 
14 //結束條件等待後根據中斷狀況作出相應處理
15 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
16     //若是中斷模式是THROW_IE就拋出異常
17     if (interruptMode == THROW_IE) {
18         throw new InterruptedException();
19     //若是中斷模式是REINTERRUPT就本身掛起
20     } else if (interruptMode == REINTERRUPT) {
21         selfInterrupt();
22     }
23 }

當線程終止了while循環也就是條件等待後,就會回到同步隊列中。不論是由於調用signal方法回去的仍是由於線程中斷致使的,結點最終都會在同步隊列中。這時就會調用acquireQueued方法執行在同步隊列中獲取鎖的操做,這個方法咱們在獨佔模式這一篇已經詳細的講過。也就是說,結點從條件隊列出來後又是乖乖的走獨佔模式下獲取鎖的那一套,等這個結點再次得到鎖以後,就會調用reportInterruptAfterWait方法來根據這期間的中斷狀況作出相應的響應。若是中斷髮生在signal方法以前,interruptMode就爲THROW_IE,再次得到鎖後就拋出異常;若是中斷髮生在signal方法以後,interruptMode就爲REINTERRUPT,再次得到鎖後就從新中斷。blog

2.不響應線程中斷的條件等待

 1 //不響應線程中斷的條件等待
 2 public final void awaitUninterruptibly() {
 3     //將當前線程添加到條件隊列尾部
 4     Node node = addConditionWaiter();
 5     //徹底釋放鎖並返回當前同步狀態
 6     int savedState = fullyRelease(node);
 7     boolean interrupted = false;
 8     //結點一直在while循環裏進行條件等待
 9     while (!isOnSyncQueue(node)) {
10         //條件隊列中全部的線程都在這裏被掛起
11         LockSupport.park(this);
12         //線程醒來發現中斷並不會立刻去響應
13         if (Thread.interrupted()) {
14             interrupted = true;
15         }
16     }
17     if (acquireQueued(node, savedState) || interrupted) {
18         //在這裏響應全部中斷請求, 知足如下兩個條件之一就會將本身掛起
19         //1.線程在條件等待時收到中斷請求
20         //2.線程在acquireQueued方法裏收到中斷請求
21         selfInterrupt();
22     }
23 }

3.設置相對時間的條件等待(不進行自旋)

 1 //設置定時條件等待(相對時間), 不進行自旋等待
 2 public final long awaitNanos(long nanosTimeout) throws InterruptedException {
 3     //若是線程被中斷則拋出異常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //將當前線程添加到條件隊列尾部
 8     Node node = addConditionWaiter();
 9     //在進入條件等待以前先徹底釋放鎖
10     int savedState = fullyRelease(node);
11     long lastTime = System.nanoTime();
12     int interruptMode = 0;
13     while (!isOnSyncQueue(node)) {
14         //判斷超時時間是否用完了
15         if (nanosTimeout <= 0L) {
16             //若是已超時就須要執行取消條件等待操做
17             transferAfterCancelledWait(node);
18             break;
19         }
20         //將當前線程掛起一段時間, 線程在這期間可能被喚醒, 也可能本身醒來
21         LockSupport.parkNanos(this, nanosTimeout);
22         //線程醒來後先檢查中斷信息
23         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
24             break;
25         }
26         long now = System.nanoTime();
27         //超時時間每次減去條件等待的時間
28         nanosTimeout -= now - lastTime;
29         lastTime = now;
30     }
31     //線程醒來後就會以獨佔模式獲取鎖
32     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
33         interruptMode = REINTERRUPT;
34     }
35     //因爲transferAfterCancelledWait方法沒有把nextWaiter置空, 全部這裏要再清理一遍
36     if (node.nextWaiter != null) {
37         unlinkCancelledWaiters();
38     }
39     //根據中斷模式進行響應的中斷處理
40     if (interruptMode != 0) {
41         reportInterruptAfterWait(interruptMode);
42     }
43     //返回剩餘時間
44     return nanosTimeout - (System.nanoTime() - lastTime);
45 }

4.設置相對時間的條件等待(進行自旋)

 1 //設置定時條件等待(相對時間), 進行自旋等待
 2 public final boolean await(long time, TimeUnit unit) throws InterruptedException {
 3     if (unit == null) { throw new NullPointerException(); }
 4     //獲取超時時間的毫秒數
 5     long nanosTimeout = unit.toNanos(time);
 6     //若是線程被中斷則拋出異常
 7     if (Thread.interrupted()) { throw new InterruptedException(); }
 8     //將當前線程添加條件隊列尾部
 9     Node node = addConditionWaiter();
10     //在進入條件等待以前先徹底釋放鎖
11     int savedState = fullyRelease(node);
12     //獲取當前時間的毫秒數
13     long lastTime = System.nanoTime();
14     boolean timedout = false;
15     int interruptMode = 0;
16     while (!isOnSyncQueue(node)) {
17         //若是超時就須要執行取消條件等待操做
18         if (nanosTimeout <= 0L) {
19             timedout = transferAfterCancelledWait(node);
20             break;
21         }
22         //若是超時時間大於自旋時間, 就將線程掛起一段時間
23         if (nanosTimeout >= spinForTimeoutThreshold) {
24             LockSupport.parkNanos(this, nanosTimeout);
25         }
26         //線程醒來後先檢查中斷信息
27         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
28             break;
29         }
30         long now = System.nanoTime();
31         //超時時間每次減去條件等待的時間
32         nanosTimeout -= now - lastTime;
33         lastTime = now;
34     }
35     //線程醒來後就會以獨佔模式獲取鎖
36     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
37         interruptMode = REINTERRUPT;
38     }
39     //因爲transferAfterCancelledWait方法沒有把nextWaiter置空, 全部這裏要再清理一遍
40     if (node.nextWaiter != null) {
41         unlinkCancelledWaiters();
42     }
43     //根據中斷模式進行響應的中斷處理
44     if (interruptMode != 0) {
45         reportInterruptAfterWait(interruptMode);
46     }
47     //返回是否超時標誌
48     return !timedout;
49 }

5.設置絕對時間的條件等待

 1 //設置定時條件等待(絕對時間)
 2 public final boolean awaitUntil(Date deadline) throws InterruptedException {
 3     if (deadline == null) { throw new NullPointerException(); } 
 4     //獲取絕對時間的毫秒數
 5     long abstime = deadline.getTime();
 6     //若是線程被中斷則拋出異常
 7     if (Thread.interrupted()) { throw new InterruptedException(); }
 8     //將當前線程添加到條件隊列尾部
 9     Node node = addConditionWaiter();
10     //在進入條件等待以前先徹底釋放鎖
11     int savedState = fullyRelease(node);
12     boolean timedout = false;
13     int interruptMode = 0;
14     while (!isOnSyncQueue(node)) {
15         //若是超時就須要執行取消條件等待操做
16         if (System.currentTimeMillis() > abstime) {
17             timedout = transferAfterCancelledWait(node);
18             break;
19         }
20         //將線程掛起一段時間, 期間線程可能被喚醒, 也可能到了點本身醒來
21         LockSupport.parkUntil(this, abstime);
22         //線程醒來後先檢查中斷信息
23         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
24             break;
25         }
26     }
27     //線程醒來後就會以獨佔模式獲取鎖
28     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
29         interruptMode = REINTERRUPT;
30     }
31     //因爲transferAfterCancelledWait方法沒有把nextWaiter置空, 全部這裏要再清理一遍
32     if (node.nextWaiter != null) {
33         unlinkCancelledWaiters();
34     }
35     //根據中斷模式進行響應的中斷處理
36     if (interruptMode != 0) {
37         reportInterruptAfterWait(interruptMode);
38     }
39     //返回是否超時標誌
40     return !timedout;
41 }

6.喚醒條件隊列中的頭結點

 1 //喚醒條件隊列中的下一個結點
 2 public final void signal() {
 3     //判斷當前線程是否持有鎖
 4     if (!isHeldExclusively()) {
 5         throw new IllegalMonitorStateException();
 6     }
 7     Node first = firstWaiter;
 8     //若是條件隊列中有排隊者
 9     if (first != null) {
10         //喚醒條件隊列中的頭結點
11         doSignal(first);
12     }
13 }
14 
15 //喚醒條件隊列中的頭結點
16 private void doSignal(Node first) {
17     do {
18         //1.將firstWaiter引用向後移動一位
19         if ( (firstWaiter = first.nextWaiter) == null) {
20             lastWaiter = null;
21         }
22         //2.將頭結點的後繼結點引用置空
23         first.nextWaiter = null;
24         //3.將頭結點轉移到同步隊列, 轉移完成後有可能喚醒線程
25         //4.若是transferForSignal操做失敗就去喚醒下一個結點
26     } while (!transferForSignal(first) && (first = firstWaiter) != null);
27 }
28 
29 //將指定結點從條件隊列轉移到同步隊列中
30 final boolean transferForSignal(Node node) {
31     //將等待狀態從CONDITION設置爲0
32     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
33         //若是更新狀態的操做失敗就直接返回false
34         //多是transferAfterCancelledWait方法先將狀態改變了, 致使這步CAS操做失敗
35         return false;
36     }
37     //將該結點添加到同步隊列尾部
38     Node p = enq(node);
39     int ws = p.waitStatus;
40     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
41         //出現如下狀況就會喚醒當前線程
42         //1.前繼結點是取消狀態
43         //2.更新前繼結點的狀態爲SIGNAL操做失敗
44         LockSupport.unpark(node.thread);
45     }
46     return true;
47 }

能夠看到signal方法最終的核心就是去調用transferForSignal方法,在transferForSignal方法中首先會用CAS操做將結點的狀態從CONDITION設置爲0,而後再調用enq方法將該結點添加到同步隊列尾部。咱們再看到接下來的if判斷語句,這個判斷語句主要是用來判斷何時會去喚醒線程,出現這兩種狀況就會當即喚醒線程,一種是當發現前繼結點的狀態是取消狀態時,還有一種是更新前繼結點的狀態失敗時。這兩種狀況都會立刻去喚醒線程,不然的話就僅僅只是將結點從條件隊列中轉移到同步隊列中就完了,而不會立馬去喚醒結點中的線程。signalAll方法也大體相似,只不過它是去循環遍歷條件隊列中的全部結點,並將它們轉移到同步隊列,轉移結點的方法也仍是調用transferForSignal方法。

7.喚醒條件隊列的全部結點

 1 //喚醒條件隊列後面的所有結點
 2 public final void signalAll() {
 3     //判斷當前線程是否持有鎖
 4     if (!isHeldExclusively()) {
 5         throw new IllegalMonitorStateException();
 6     }
 7     //獲取條件隊列頭結點
 8     Node first = firstWaiter;
 9     if (first != null) {
10         //喚醒條件隊列的全部結點
11         doSignalAll(first);
12     }
13 }
14 
15 //喚醒條件隊列的全部結點
16 private void doSignalAll(Node first) {
17     //先把頭結點和尾結點的引用置空
18     lastWaiter = firstWaiter = null;
19     do {
20         //先獲取後繼結點的引用
21         Node next = first.nextWaiter;
22         //把即將轉移的結點的後繼引用置空
23         first.nextWaiter = null;
24         //將結點從條件隊列轉移到同步隊列
25         transferForSignal(first);
26         //將引用指向下一個結點
27         first = next;
28     } while (first != null);
29 }

至此,咱們整個的AbstractQueuedSynchronizer源碼分析就結束了,相信經過這四篇的分析,你們能更好的掌握並理解AQS。這個類確實很重要,由於它是其餘不少同步類的基石,因爲筆者水平和表達能力有限,若是哪些地方沒有表述清楚的,或者理解不到位的,還請廣大讀者們可以及時指正,共同探討學習。可在下方留言閱讀中所遇到的問題,若是有須要AQS註釋源碼的也可聯繫筆者索取。

注:以上所有分析基於JDK1.7,不一樣版本間會有差別,讀者須要注意

相關文章
相關標籤/搜索