原創做品,能夠轉載,可是請標註出處地址:http://www.javashuo.com/article/p-fyiwubsl-e.htmlhtml
AbstractQueuedSynchronizer簡稱爲AQS,是併發包中用於實現併發工具的基礎類,很是明顯,它是一個抽象類。java
它提供了一個依賴於FIFO隊列的框架用於實現各類阻塞鎖與同步器。node
它依賴於一個int值來表示狀態,並定義了獲取和修改該狀態值的原子方法,具體的同步器須要實現該抽象類,而且使用它定義的這些原子方法來操做狀態值。併發
它的實現類通常做爲待實現的同步器的靜態內部類而存在,用來提供一些方法來實現同步器的功能。框架
咱們能夠將其看做是基礎的同步器,並非具體的某一個同步器,而是同步器的一個抽象。工具
首先來看看其繼承體系:源碼分析
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {}
能夠看到它繼承了AbstractOwnableSynchronizer抽象類,這個類很簡單,咱們能夠總體來看看:ui
// 就是一個簡單的獨佔式同步器,持有被獨佔擁有的線程 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; // 供子類調用的構造器 protected AbstractOwnableSynchronizer() { } // 表示獨佔擁有的線程,下面是其get和set方法 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
靜態內部類Node用於將要加入同步隊列的線程封裝成爲隊列節點。這個隊列採用雙向鏈表實現,支持先進先出。this
static final class Node {}
該靜態內部類被final修飾,代表做者但願其不被繼承修改。線程
static final class Node { // 兩個節點標記,用於標識節點對應的線程獲取鎖的模式,是共享式獲取,仍是獨享式獲取 static final Node SHARED = new Node();// 共享模式的節點標記 static final Node EXCLUSIVE = null;// 獨享模式的節點標記 // 四個節點狀態,其實還有一個狀態爲0-表示當前節點在同步隊列中,等待着獲取鎖 static final int CANCELLED = 1;// 表示當前節點封裝的線程被中斷或者超時 static final int SIGNAL = -1;// 表示當前節點的後繼節點須要被喚醒(unpark) static final int CONDITION = -2;// 表示當前節點位於等待隊列中,在等待條件知足 static final int PROPAGATE = -3;// 表示當前場景下後續的acquireShared可以得以執行?? // 節點狀態,其值就是上面定義的這四個狀態值再加上0 volatile int waitStatus; // 同步隊列的節點指針 volatile Node prev;// 雙向鏈表中節點指向前節點的指針 volatile Node next;// 雙向鏈表中節點指向後節點的指針 // 節點封裝的執行線程 volatile Thread thread; // 等待隊列的節點指針 Node nextWaiter;// 單向鏈表中節點指向後節點的指針 }
節點狀態:
- 0:默認狀態,表示節點是同步隊列中等待獲取鎖的線程的節點
- 1:CANCELLED,表示節點被取消,緣由多是超時或者被中斷,一旦置於該狀態,則再也不改變
- -1:SIGNAL,表示當前節點的後繼節點被阻塞(或即將被阻塞)(使用park),所以當前線程釋放鎖或者被取消執行時須要喚醒(unpark)後繼節點
- -2:CONDITION,表示當前節點位於等待隊列中,當節點被轉移到同步隊列的時候,狀態值會被更新爲0
- -3:PROPAGATE,表示持續的傳播releaseShared操做
static final class Node { Node() {} Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
三個構造器各有用處:
- Node():用戶初始化頭結點,或者建立共享標記SHARED
- Node(Thread thread, Node mode):給同步隊列添加新節點時使用,用於構造新節點
- Node(Thread thread, int waitStatus):給等待隊列添加新節點時使用,用於構造新節點
注意:上面的構造器中的mode(模式)屬於Node類型,它有兩種模式SHARED和EXCLUSIVE,分別表示共享模式和獨享模式。而waitStatus表示的是節點狀態。
static final class Node { // 校驗當前節點是不是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 獲取前置節點,必須爲非null final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } }
方法解析:
isShared方法主要用於校驗當前節點的鎖獲取模式,是共享仍是獨享,實現方式採用nextWaiter與SHARED比較,參照上面的第二個構造器的實現,咱們能夠知道在新增一個節點的時候,會對節點的nextWaiter進行賦值,而所賦的值正好是新增節點的模式標記,能夠說nextWaiter持有節點的模式標記,那麼拿其來與SHARED進行比較就是很顯然的事情了。
predecessor方法用於獲取前置節點,主要是在當前置節點不可爲null時使用,這樣當前置節點爲null,就會拋出空指針。
Condition並不是AQS中的內部類,而是其內部類ConditionObject的父接口,爲了後面的ConditionObject,咱們提早了解下Condition。
Condition是一個接口,旨在定義一系列針對獲取鎖的線程的操做,實現相似於Object類中wait/notify的功能。咱們經過其方法定義能夠明顯感受到這一點。
public interface Condition { // 使當前線程等待,知道被喚醒或者中斷,注意須要在臨界區使用,執行該方法以後該線程持有的鎖將被釋放,線程處於等待狀態 // 四種狀況下會退出等待狀態:被signal喚醒,被signalAll喚醒,被interrupt喚醒(須要當前線程能夠響應中斷),發生僞喚醒 void await() throws InterruptedException; // 使當前線程等待,直到被喚醒(不響應中斷),注意要在臨界區使用,執行該方法以後該線程持有的鎖將被釋放,線程處於等待狀態 // 三種狀況下會退出等待狀態:被signal喚醒,被signalAll喚醒,發生僞喚醒 void awaitUninterruptibly(); // 使當前線程等待,知道被喚醒或者中斷或者超時,注意須要在臨界區使用,執行該方法以後該線程持有的鎖將被釋放,線程處於等待狀態 // 五種狀況下會退出等待狀態:被signal喚醒,被signalAll喚醒,被interrupt喚醒(須要當前線程能夠響應中斷),超時,發生僞喚醒 // nanosTimeout表示當前線程要等待的時間長度 // 該方法返回一個正數表示線程被提早喚醒,返回一個負數或0表示等待超時 long awaitNanos(long nanosTimeout) throws InterruptedException; // 同上,不一樣在於上面的只能傳參爲納秒值,該方法能夠經過單位隨便傳值 boolean await(long time, TimeUnit unit) throws InterruptedException; // 使當前線程等待,知道被喚醒或者中斷或者過了截止日期,注意須要在臨界區使用,執行該方法以後該線程持有的鎖將被釋放,線程處於等待狀態 // 退出等待狀態的狀況同上,只是這裏傳參爲一個固定的時間點,線程等待到這個時間點將自動甦醒 boolean awaitUntil(Date deadline) throws InterruptedException; // 喚醒等待隊列中的一個線程,該線程從await返回時必須獲取到鎖 void signal(); // 喚醒等待隊列中的全部線程,每一個線程從await返回時必須獲取到鎖 void signalAll(); }
ConditionObject是Condition的實現類,在AQS中以普通內部類的方式存在。
ConditionObject內部維護了一個單向鏈表實現的等待隊列,隊列的節點與AQS中同步隊列的節點類型一致,均爲上面的內部類Node類型。
下面咱們來仔細看看這個類:
public class ConditionObject implements Condition, java.io.Serializable {}
該類實現了Condition接口和Serializable接口,擁有序列化功能
public class ConditionObject implements Condition, java.io.Serializable { // 序列化ID private static final long serialVersionUID = 1173984872572414699L; // 等待隊列頭結點指針 private transient Node firstWaiter; // 等待隊列尾節點指針 private transient Node lastWaiter; // 中斷模式 private static final int REINTERRUPT = 1;// 退出等待隊列時從新中斷 private static final int THROW_IE = -1;// 退出等待隊列時拋出InterruptedException異常 }
咱們能夠看到類的五個字段中除了三個靜態字段以外,剩下的兩個被transient修飾,也就是說雖然該類支持序列化,可是序列化無值。
ConditionObject中的公共方法其實就是對Condition接口中定義方法的實現,下面咱們逐個分析:
await():
public class ConditionObject implements Condition, java.io.Serializable { public final void await() throws InterruptedException { // 1-響應中斷,同時會清除中斷標記 if (Thread.interrupted()) throw new InterruptedException(); // 2-將當前線程封裝成Node節點並添加到等待隊列尾部 Node node = addConditionWaiter(); // 3-釋放當前線程所佔用的lock,在釋放的過程當中會喚醒同步隊列中的下一個節點 int savedState = fullyRelease(node); int interruptMode = 0; // 4-阻塞當前線程,直到被中斷或者被喚醒 while (!isOnSyncQueue(node)) {// 校驗當前線程是否被喚醒(是否被轉移到同步隊列),若是已喚醒則退出循環 LockSupport.park(this);// 阻塞當前線程 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 校驗當前線程是否被中斷 break;// 若是被中斷則退出循環 } // 5-自旋等待獲取到同步狀態(即獲取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 6-處理被中斷的狀況 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } }方法解析:
- 第一步:優先響應中斷,首先校驗當前線程是否被中斷,若是被中斷則拋出InterruptedException異常,不然下一步;
- 第二步:調用addConditionWaiter()方法,目的是將當前線程封裝成爲Node節點並添加到等待隊列的尾部,源碼以下:
public class ConditionObject implements Condition, java.io.Serializable { private Node addConditionWaiter() { Node t = lastWaiter;// 保存尾節點 // If lastWaiter is cancelled, clean out. // 若是尾節點線程被取消,則清除之 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters();// 清除等待隊列中全部的被取消的線程節點 t = lastWaiter; } // 將當前線程封裝成爲等待隊列的Node節點 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) // 若是等待隊列爲空,則將新節點做爲頭節點 firstWaiter = node; else // 不然將新節點做爲新的尾節點添加到等待隊列中 t.nextWaiter = node; // 更新尾節點指針 lastWaiter = node; return node; } }這個方法裏面除了封裝節點和添加節點以外,還有針對等待隊列進行清理的流程,主要是爲了清理被取消的線程節點
- 第三步:調用fullyRelease(node)方法,用於釋放當前線程所持有的鎖並喚醒同步隊列的下一節點,詳情可見AQS方法解析部分;
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 獲取同步狀態state值 // 執行release方法,嘗試釋放當前線程持有的共享狀態,並喚醒下一個線程 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } }
第四步:調用LockSupport.park(this)阻塞當前線程,一但消除被中斷後者線程被喚醒轉移到同步隊列,則退出循環,繼續下一步;
這裏涉及到一箇中斷模式的問題。中斷模式以前提到過,有兩種:REINTERRUPT和THROW_IE,分別表示針對被中斷的線程在退出等待隊列時的處理方式,前者從新中斷,後者則拋出異常。
此處interruptMode表示的就是中斷模式的值,初始賦值爲0,而後經過checkInterruptWhileWaiting(node)方法不斷的進行校驗,其源碼以下:public class ConditionObject implements Condition, java.io.Serializable { private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } }若是線程被中斷則經過方法transferAfterCancelledWait(node)判斷線程是不是在被喚醒以前被中斷,若是是則返回true,不然返回false;若是返回true則採用THROW_IN模式,不然採用REINTERRUPT模式。不管是上面的哪種模式都表明線程被中斷了,那麼此處interruptMode就再也不是0,那麼條件成立,break退出循環。除此以外transferAfterCancelledWait(node)方法不管返回true仍是false,都會將現場節點轉移到同步隊列中
- 第五步:當前線程已經被轉移到同步隊列中,而後開始自旋以獲取同步狀態,待其獲取到同步狀態(鎖)以後,返回該線程是否被中斷,若是被中斷,再根據其中斷模式進行整理,如何整理呢,主要就是若是當前中斷模式是THROW_IE模式,則保持不變,不然一概修改爲REINTERRUPT模式,以後會再次進行一次同步隊列節點清理。
第六步:最後針對不一樣的中斷模式進行中斷處理,若是是THROW_IN則拋出異常,若是是REINTERRUPT則再次進行中斷。
awaitNanos(long):
public class ConditionObject implements Condition, java.io.Serializable { public final long awaitNanos(long nanosTimeout) throws InterruptedException { // 1-優先響應中斷 if (Thread.interrupted()) throw new InterruptedException(); // 2-將當前線程封裝成Node節點並添加到等待隊列尾部 Node node = addConditionWaiter(); // 3-釋放當前線程所佔用的lock,在釋放的過程當中會喚醒同步隊列中的下一個節點 int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout;// 計算截止時間點 int interruptMode = 0; // 4-阻塞當前線程,直到被中斷或者被喚醒或者超時 // 4-1 校驗當前線程是否被喚醒,若是沒有進入循環體 while (!isOnSyncQueue(node)) { // 4-2 若是超時時間小於等於0,則表示線程當即超時,而後進行線程節點轉移處理,並結束循環 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node);// 轉移線程節點 break; } // 4-3 若是超時設置時間nanosTimeout大於等於spinForTimeoutThreshold,則進行定時阻塞當前線程 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 4-4 若是線程被中斷,則轉移線程到同步隊列,並結束循環 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 每次循環都會計算新的nanosTimeout值,而後在下次循環的時候設置阻塞的時限 nanosTimeout = deadline - System.nanoTime(); } // 5-自旋等待獲取到同步狀態(即獲取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); // 6-處理被中斷的狀況 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } }方法解析:
這個方法的流程與上面的await基本一致,只是在第4步中添加了關於超時判斷的邏輯,這裏就着重看一下這一部分,其他部分再也不贅述。
包括兩個部分的內容,第一是開始的校驗,若是設置的超時時間小於等於0,表示線程等待當即超時,而後當即轉移到同步隊列尾部,嘗試獲取鎖;第二是若是設置的超時時間大於等於spinForTimeoutThreshold的值,則將當前線程阻塞指定的時間,這個時間會隨着循環的次數不斷的減少。
另外的兩個等待方法awaitUntil(Date deadline)和await(long time, TimeUnit unit)就再也不贅述了,原理徹底一致,有一個不一樣的是awaitUninterruptibly()方法:
awaitUninterruptibly():
public class ConditionObject implements Condition, java.io.Serializable { public final void awaitUninterruptibly() { // 1-將當前線程封裝成Node節點並添加到等待隊列尾部 Node node = addConditionWaiter(); // 2-釋放當前線程所佔用的lock,在釋放的過程當中會喚醒同步隊列中的下一個節點 int savedState = fullyRelease(node); boolean interrupted = false; // 3-阻塞當前線程,直到被喚醒 while (!isOnSyncQueue(node)) { LockSupport.park(this);// 阻塞當前線程 if (Thread.interrupted()) interrupted = true; } // 4-自旋嘗試獲取同步鎖 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } }其實就是不響應中斷的等待方法,從源碼中能夠看出,雖然不響應中斷,可是仍然保存着中斷標誌。
下面就來看看喚醒的方法:
signal():
public class ConditionObject implements Condition, java.io.Serializable { public final void signal() { // 1-校驗當前線程時候獨享式持有共享鎖,若是不持有則拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter;// 保存等待隊列首節點 // 2-若是隊列不爲空,則執行頭節點喚醒操做 if (first != null) doSignal(first); } private void doSignal(Node first) { do { // 3-若是等待隊列只有一個節點,則將lastWaiter更新爲null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 4-嘗試將線程節點從等待隊列轉移到同步隊列,若是成功則結束循環,若是失敗則再次判斷firstWaiter首節點是否爲null,若是不是null,則再次循環,不然結束循環 } while (!transferForSignal(first) && (first = firstWaiter) != null); } }方法解析:
- 第一步:校驗當前線程時候獨享式持有共享鎖,若是不持有則拋出異常
- 第二步:若是隊列不爲空,則執行頭節點喚醒操做
- 第三步:若是等待隊列只有一個節點(頭節點),則將lastWaiter更新爲null
- 第四步:嘗試將線程節點從等待隊列轉移到同步隊列,若是成功則結束循環,若是失敗則再次判斷firstWaiter首節點是否爲null,若是不是null,則再次循環,不然結束循環
signalAll():
public class ConditionObject implements Condition, java.io.Serializable { public final void signalAll() { // 1-校驗當前線程時候獨享式持有共享鎖,若是不持有則拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 2-若是隊列不爲空,則執行節點喚醒操做 if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null;// 要喚醒全部線程節點,那麼等待隊列就是被清空,那麼就將這兩個指針置爲null // 3-針對等待隊列中的節點一個一個進行喚醒操做 do { Node next = first.nextWaiter;// 保存二節點 first.nextWaiter = null; transferForSignal(first);// 將首節點轉移到同步隊列 first = next;// 重置首節點,將二節點做爲新的首節點 } while (first != null); } }
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe();// 注入Unsafe實例 private static final long stateOffset;// 同步狀態偏移量 private static final long headOffset;// 等待隊列的頭結點偏移量 private static final long tailOffset;// 等待隊列的尾節點偏移量 private static final long waitStatusOffset;// 節點等待狀態偏移量 private static final long nextOffset;// 節點的下級節點偏移量 static { try { // 獲取這五個字段的內存偏移量並保存到各自的字段中 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } }
從這一部份內容能夠看出來AQS底層和ConcurrentHashMap同樣是使用CAS來實現原子操做的。
這一部分就是引入Unsafe來實現原子以上幾個字段的原子更新。知道便可。
AQS中字段很少,以下所示:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private transient volatile Node head;// 等待隊列的頭結點 private transient volatile Node tail;// 等待隊列的尾節點 private volatile int state;// 同步狀態,初始爲0,獲取鎖時會加1,釋放鎖時減1,當重入鎖時也會加1 static final long spinForTimeoutThreshold = 1000L;// 自旋時限1000納秒 }
這裏的head和tail分別指向的是同步器的同步隊列的頭結點與尾節點。這個同步隊列採用雙向鏈表實現,其節點就是以前介紹的內部類中的Node類型。
state表示同步狀態,初始爲0,表示未被持有,當其被某線程持有時,就會增長1,並且這個也是實現重入的基礎,當該線程再次獲取當前鎖時,只須要state加1便可,每釋放一個鎖,state-1,直到state等於0時,該同步鎖爲徹底釋放。
spinForTimeoutThreshold是一個內置的快速自旋時限,當設置的超時時間小於這個值的時候,無需再執行等待設置,直接進入快速自旋便可,緣由在於 spinForTimeoutThreshold 已經很是小了,很是短的時間等待沒法作到十分精確,若是這時再次進行超時等待,相反會讓nanosTimeout 的超時從總體上面表現得不是那麼精確,因此在超時很是短的場景中,AQS會進行無條件的快速自旋。
AQS中的方法能夠粗分爲四類:獲取同步狀態方法、釋放同步狀態方法、隊列檢驗方法、隊列監控方法,咱們羅列一個表格來簡單介紹下這些方法:
分類 | 序號 | 方法 | 說明 | 備註 |
---|---|---|---|---|
獲取同步狀態方法 | 1 | final void acquire(int arg) | 獨享獲取同步狀態,不響應中斷 | |
獲取同步狀態方法 | 2 | final void acquireInterruptibly(int arg) | 獨享獲取同步狀態,響應中斷 | |
獲取同步狀態方法 | 3 | final boolean tryAcquireNanos(int arg, long nanosTimeout) | 獨享獲取同步狀態,響應中斷,響應超時 | |
獲取同步狀態方法 | 4 | final void acquireShared(int arg) | 共享獲取同步狀態,不響應中斷 | |
獲取同步狀態方法 | 5 | final void acquireSharedInterruptibly(int arg) | 共享獲取同步狀態,響應中斷 | |
獲取同步狀態方法 | 6 | final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 共享獲取同步狀態,響應中斷,響應超時 | |
釋放同步狀態方法 | 7 | final boolean release(int arg) | 獨享釋放同步狀態 | |
釋放同步狀態方法 | 8 | final void acquireShared(int arg) | 共享釋放同步狀態 | |
隊列檢驗方法 | 9 | final boolean hasQueuedThreads() | 校驗同步隊列中是否有線程在等待獲取同步狀態 | |
隊列檢驗方法 | 10 | final boolean hasContended() | 校驗是否有線程爭用過此同步器(同步隊列是否爲空) | |
隊列檢驗方法 | 11 | final boolean isQueued(Thread thread) | 校驗給定線程是否在同步隊列之上 | |
隊列檢驗方法 | 12 | final boolean hasQueuedPredecessors() | 校驗是否有線程等待獲取同步狀態比當前線程時間長(同步隊列中是都有前節點) | |
隊列檢驗方法 | 13 | final boolean owns(ConditionObject condition) | 校驗給定的condition是不是使用當前同步器做爲鎖 | |
隊列檢驗方法 | 14 | final boolean hasWaiters(ConditionObject condition) | 校驗等待隊列是否有等待線程 | |
隊列監控方法 | 15 | final int getWaitQueueLength(ConditionObject condition) | 獲取等待隊列中線程數量 | |
隊列監控方法 | 16 | final Collection
|
獲取等待隊列中等待線程的集合 | |
隊列監控方法 | 17 | final Thread getFirstQueuedThread() | 獲取同步隊列中的頭節點線程 | |
隊列監控方法 | 18 | final int getQueueLength() | 獲取同步隊列中線程數量 | |
隊列監控方法 | 19 | final Collection
|
獲取同步隊列中線程的集合 | |
隊列監控方法 | 20 | final Collection
|
獲取同步隊列中欲獨享獲取同步狀態的線程集合 | |
隊列監控方法 | 21 | final Collection
|
獲取同步隊列中欲共享獲取同步狀態的線程集合 |
這些方法中重點就是獲取同步狀態方法和釋放同步狀態方法,下面咱們也重點就看下這些個方法的實現:
該方法表示獨享式獲取同步狀態,但不響應中斷,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } }
該方法中調用了四個方法來完成功能,依次爲:
解析:首先嚐試獨享式獲取同步狀態,若是獲取到了就結束,
若是未獲取到則將線程封裝成爲Node節點並添加到同步隊列尾部,而後自旋以獲取同步狀態,
一旦獲取到同步狀態,退出自旋,並返回當前線程在自旋期間是否被中斷過,若是被中斷過則再次自我中斷,
爲何須要再次自我中斷呢,這只是爲了保留中斷現場,由於在自旋結束進行中斷校驗時使用的是Thread.interrupted(),
該方法會致使中斷狀態被清除。
tryAcquire方法是一個模板方法,須要在AQS的子類中實現,默認的實現只是拋出了一個異常
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } }
addWaiter方法源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private Node addWaiter(Node mode) { // 將當前線程與同步狀態獲取模式封裝成爲Node節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 嘗試快速進行一次enq操做,將新節點設置爲同步地列尾節點, // 若是成功會結束方法但若是不成功,能夠由下面的enq方法來執行, // 這個enq方法能夠經過無限循環的方法直到執行成功 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 將新節點添加到同步隊列中 enq(node); return node; } // 將新節點添加到同步隊列中 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 這一步主要是針對同步隊列未初始化時進行的初始化操做,初始化完成後下次循環就會執行新節點的添加操做 if (compareAndSetHead(new Node())) tail = head; } else { // 將以前的尾節點設置爲新節點的前節點,而後原子更新尾節點爲新節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } }
解析:很明顯上面的addWaiter方法中出現了添加新節點到同步隊列的邏輯,而在以後的enq方法中再次出現,
主要目的就是爲了能在執行enq方法以前能夠先進行一次嘗試,看可否一次執行成功,若成功,則皆大歡喜,
沒必要走下面的邏輯,若不成功,再走enq方法,來經過無限循環的方式強制執行成功。因此前面的邏輯能夠當作是一次簡單的enq操做。
acquireQueued方法源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { final boolean acquireQueued(final Node node, int arg) { boolean failed = true;// 默認失敗 try { boolean interrupted = false;// 中斷標記 for (;;) {// 無限循環以自旋 final Node p = node.predecessor();// 獲取前置節點 // 若是前節點是頭節點,而且當前線程獲取同步狀態成功,則將當前節點置爲頭節點 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC,這裏去除之前的節點對當前節點的引用,當前節點對象再也不被使用後能夠被GC清理 failed = false;// 表示成功 return interrupted; } // 若是前置節點不是頭節點,或者當前節點線程未獲取到同步狀態,則將嘗試將前置節點狀態更新爲SIGNAL,並阻塞當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } }
解析:以無限循環的方法自旋,每次循環都嘗試獨享式獲取同步狀態,若是獲取到了同步狀態,
那麼將當前節點置爲頭節點;若是前置節點不是頭節點或者未獲取到同步狀態則嘗試將前置節點的狀態更新爲SIGNAL,並阻塞當前線程(park),
這種狀況下,當前線程須要被喚醒才能繼續執行,當被喚醒以後能夠再次循環,嘗試獲取同步狀態,若是不成功,將會再次阻塞,等待再次被喚醒。
AbstractQueuedSynchronizer方法源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;// 獲取前置節點的狀態 if (ws == Node.SIGNAL) // 表示後置線程節點(當前節點須要被喚醒) return true; if (ws > 0) { // 表示前置節點線程被取消,那麼清理被取消的線程節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 嘗試將前置節點的狀態置爲SIGNAL,只有置爲SIGNAL以後才能返回true. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } }
解析:這個方法主要目的就是爲了將前置節點狀態置爲SIGNAL,這個狀態意思是它後面的那個節點被阻塞了,
須要被喚醒,可見這個狀態就是一個標記,標記着後面節點須要被喚醒。
parkAndCheckInterrupt方法源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private final boolean parkAndCheckInterrupt() { LockSupport.park(this);// 阻塞當前線程 return Thread.interrupted(); } }
解析:一旦線程執行到這一步,那麼當前線程就會阻塞,後面的return暫時就不會執行。只有在被喚醒以後才能接着返回中斷校驗的結果。
總結:acquire方法首先嚐試獨享式獲取同步狀態(tryAcquire),獲取失敗的狀況下須要將當前線程封裝成爲一個Node節點,
而後首先嚐試將其設置爲同步隊列的爲節點,若是失敗,則自旋直到成功爲止,而後進行自旋判斷當前節點是否第二節點,若是是,
則嘗試獲取同步狀態,若是成功,將當前節點置爲頭節點;不然若是當前節點不是第二節點,或者獲取同步狀態失敗,
則將前置節點狀態置爲SIGNAL,而後阻塞(park)當前線程,等待被喚醒,喚醒以後會重複自旋,判斷節點是否第二節點和嘗試獲取同步狀態,
若是還不成功,那麼就再次阻塞...
該方法表示獨享式獲取同步狀態,響應中斷,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireInterruptibly(int arg) throws InterruptedException { // 中斷校驗,會清除中斷狀態 if (Thread.interrupted()) throw new InterruptedException(); // 嘗試獨享式獲取同步狀態,若是失敗則嘗試中斷的獲取。 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 中斷的獲取同步狀態 private void doAcquireInterruptibly(int arg) throws InterruptedException { // 首先將當前線程封裝成爲Node節點,並保存到同步隊列尾部 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 自旋,邏輯桶acquire for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
解析:一開始就進行中斷校驗,若是未被中斷則嘗試獨享式獲取同步狀態,獲取失敗後則封裝線程爲Node節點並保存到同步隊列,而後自旋,邏輯與acquire種的acquireQueued方法邏輯一致,再也不贅述。
該方法表示獨享式獲取同步狀態,響應中斷,響應超時,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 首先響應中斷,進行中斷校驗,若被中斷,拋出異常 if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);// 超時獲取 } // 超時獲取 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是超時時間小於等於0,則直接超時,返回false if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout;// 計算截止時間點 final Node node = addWaiter(Node.EXCLUSIVE);// 封裝線程節點,並添加到同步隊列 boolean failed = true; try { for (;;) {// 自旋 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime();// 計算剩餘超時時間 // 若是剩餘超時時間小於等於0,這說明超時,返回false if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) &&// 將前置節點狀態置爲SIGNAL nanosTimeout > spinForTimeoutThreshold)// 剩餘超時時間大於快速自旋時限(1000納秒) LockSupport.parkNanos(this, nanosTimeout);// 限時阻塞當前線程,超時時間爲剩餘超時時間 // 再次響應中斷,進行中斷校驗,若被中斷直接拋出異常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
spinForTimeoutThreshold:這是系統內置的一個常量,設置爲1000納秒,這是一個很短的時間,若是要阻塞的剩餘時間小於這個值,就沒有必要再執行阻塞,直接進入快速自旋過程。
解析:總體邏輯基本與前面的兩種相似,不一樣之處在於增長了針對超時時間處理的邏輯。
與acquireInterruptibly相似,一開始就進行中斷校驗,若被中斷則拋出異常,不然嘗試獨享式獲取同步狀態,
獲取成功,則返回true,若是獲取失敗,則將線程封裝成Node節點保存到同步隊列,而後計算截止時間點(當前時間+超時時間),
而後開始自旋,自旋的邏輯中前半部分與以前相同,只有在前置節點不是頭節點或者獲取同步狀態失敗的狀況下邏輯發生了改變,
先計算剩餘超時時間nanosTimeout(截止時間點-當前時間),而後將前置節點的狀態置爲SIGNAL,判斷剩餘超時時間是否大於
spinForTimeoutThreshold,若是大於則限時阻塞當前線程,不然快速自旋便可。
該方法表示共享式獲取同步狀態,不響應中斷,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } }
解析:首先嚐試共享式獲取同步狀態,若是獲取失敗(返回負值),則執行doAcquireShared方法。
tryAcquireShared方法源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } }
該方法是一個模板方法,須要子類來完善邏輯。但大體意義以下,若是獲取失敗返回負數(-1),若是是該同步狀態被首次共享獲取成功,返回0,非首次獲取成功,則返回正數(1)
doAcquireShared方法源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private void doAcquireShared(int arg) { // 將線程封裝成功節點,保存到同步隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) {// 自旋 final Node p = node.predecessor();// 獲取前置節點 if (p == head) { // 若是前置節點爲頭節點 int r = tryAcquireShared(arg); if (r >= 0) { // 若是成功獲取到同步狀態,則將當前節點置爲頭節點,並進行傳播喚醒 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 若是前置節點非頭節點或者獲取同步狀態失敗,則將前置節點設置爲SIGNAL,而後阻塞當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 預存原始頭節點 setHead(node);// 將當前節點置爲頭節點 // propagate可爲0或1,0表示同步狀態被首次獲取,1表示被屢次獲取 // h爲原始頭節點 // head爲新頭節點 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next;// 獲取下級節點s // 若是後繼節點不存在或者後繼節點是共享式的,則喚醒後繼節點 if (s == null || s.isShared()) doReleaseShared();// 喚醒後繼節點 } } }
解析:該方法的邏輯相對於acquireQueued只是稍有變更,大體意思是相同的。不一樣之處在於此處涉及到一個傳播(Propagate)。
所謂的傳播,實際上是在當前節點共享式獲取到同步狀態以後,檢查其後置節點是否也是在等待共享式獲取同步狀態,如果,則將喚醒其後置節點。
doReleaseShared源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private void doReleaseShared() { for (;;) {// 自旋 Node h = head;// 獲取頭節點 if (h != null && h != tail) {// 若是隊列中存在多個節點的話 int ws = h.waitStatus;// 頭節點狀態ws // 若是頭節點狀態爲SIGNAL,則將其 if (ws == Node.SIGNAL) {// 說明其後繼節點線程被阻塞,須要喚醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 首先將頭節點狀態重置爲0 continue;// 若是重置頭節點狀態操做失敗則重試 unparkSuccessor(h);// 而後進行後繼節點喚醒 } // 若是頭節點狀態爲0,則將其狀態更新爲PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;// 頭節點更新操做失敗則重試 } if (h == head) break;// 頭節點發生變化則退出自旋 } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next;// 獲取後繼節點s if (s == null || s.waitStatus > 0) { // 若是s爲null或者其狀態爲取消,則從後遍歷隊列節點,找到node節點以後的首個未被取消的節點t,賦給s s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);// 執行s節點線程的喚醒操做 } }
解析:doReleaseShared方法被兩處調用,一爲此處,另外一爲releaseShared方法,這個是用來共享式釋放同步狀態的方法。
doReleaseShared方法的做用就是爲了喚醒後繼節點,主要邏輯以下:首先獲取頭節點的狀態ws,若是ws是SIGNAL,
表示後繼節點須要被喚醒,而後自旋將頭節點狀態更新爲0,並執行後繼節點喚醒操做,這裏要確保喚醒的是頭節點以後首個
未被取消的線程節點,喚醒以後,後繼節點的線程開始繼續執行,當前線程也繼續執行;若是ws是0,則將頭節點的狀態更新爲PROPAGATE,
來確保同步狀態能夠順利傳播(由於若是ws爲SIGNAL,會自動喚醒下一個節點,而0則不會,全部將其更新爲PROPAGATE,表示共享式獲取的傳播)
被喚醒的線程會重置頭節點,一旦重置,當前線程在最後校驗頭節點那一步就會成功,而後執行break退出自旋。通常來講這裏喚醒的主要目的是爲了喚醒一個共享式獲取同步狀態的線程節點,它會直接獲取到同步狀態;但也存在特殊狀況,好比
這個節點線程被取消了,致使喚醒了一個獨享式獲取的線程節點,那麼在這個線程被喚醒後嘗試獨享式獲取同步狀態的時候會獲取不到
(由於同步狀態被共享式獲取的線程持久,並且多是多個)從而再次進入阻塞。其實喚醒的主要來源仍是靠同步狀態釋放操做來發起的。
該方法表示共享式獲取同步狀態,響應中斷,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 首先響應中斷 if (Thread.interrupted()) throw new InterruptedException(); // 嘗試共享式獲取同步狀態,失敗則執行doAcquireSharedInterruptibly方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 可中斷的共享式獲取同步狀態 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 首先封裝線程節點,保存到同步隊列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {// 自旋 final Node p = node.predecessor();// 獲取前置節點 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 若是發生了中斷則拋出異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
解析:這個方法與acquireShared幾乎一致,僅僅是在處理中斷的問題上有點區別,因此再也不贅述。
該方法表示共享式獲取同步狀態,響應中斷,響應超時,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是超時時間小於等於0,則直接超時,返回false if (nanosTimeout <= 0L) return false; // 計算超時截止時間點(當前時間+超時時間) final long deadline = System.nanoTime() + nanosTimeout; // 封裝節點並保存隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {// 自旋 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } // 計算剩餘的超時時間 nanosTimeout = deadline - System.nanoTime(); // 若是剩餘超時時間小於等於0,直接超時,返回false if (nanosTimeout <= 0L) return false; // 將前置節點置爲SIGNAL,而後校驗剩餘超時時間,若是不足spinForTimeoutThreshold,則進入快速自旋,不然執行阻塞 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 再次響應中斷 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
解析:基本雷同,能夠參考共享式獲取同步狀態的方法和獨享式響應中斷超時的獲取方法。
該方法表示獨享式釋放同步狀態,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean release(int arg) { // 首先嚐試獨享式釋放同步狀態 if (tryRelease(arg)) { Node h = head;// 頭節點 // 頭節點存在且狀態不爲0,則喚醒其後繼節點 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } // 釋放失敗返回false return false; } }
解析:首先調用tryRelease來嘗試獨享式釋放同步狀態,若是成功,則根據頭節點的狀態來決定是否喚醒後繼節點,
頭節點爲0則不喚醒。喚醒操做經過調用unparkSuccessor方法來實現,具體邏輯以前已有描述,這裏總結一下:
其實就是喚醒頭節點以後的首個未被取消的節點線程,這個線程多是獨享式的也多是共享式的。
tryRelease源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } }
解析:tryRelease方法是一個模板方法,一樣須要子類來實現。
該方法表示共享式釋放同步狀態,源碼以下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final boolean releaseShared(int arg) { // 嘗試共享式釋放同步狀態,成功後喚醒後繼節點 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } }
解析:很簡單,其中的doReleaseShared方法咱們也瞭解了。
tryReleaseShared源碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } }
解析:和前面的那幾個模板方法同樣,須要子類來實現。
剩下的方法都是一些校驗和監控的方法,並不涉及重點邏輯,再也不贅述,下面作一個總結
總結:
- AQS同步器內部維護了一個底層爲雙向鏈表的同步隊列,用於保存那些獲取同步狀態失敗的線程。每一個AQS同步器還能夠關聯多個Condition,其中每一個Condition內部維護了一個底層爲單向鏈表的等待隊列,用於保存那些基於特定條件而陷入等待的線程。
- 內部類Node描述的是同步隊列和等待隊列中節點的類型。節點有兩點須要注意,那就是節點的模式與狀態
- 節點模式:
- EXCLUSIVE:獨享式
- SHARED:共享式
- 節點狀態:
- 0:初始狀態,該狀態下不會喚醒後繼節點
- CANCELLED(1):取消狀態,節點線程被中斷或超時
- SIGNAL(-1):喚醒狀態,表示該節點的後繼節點被阻塞,須要喚醒
- CONDITION(-2):表示當前節點位於等待隊列中,在等待條件知足
- PROPAGATE(-3):表示共享式獲取同步狀態的傳播
- 內部類ConditionObject是Condition的實現類,做爲附着在同步器上的一個功能,可用可不用;它提供了一些方法來執行等待和喚醒操做:
- 等待操做:
- await():響應中斷
- awaitNanos(long):響應中斷,響應超時
- awaitUninterruptibly():不響應中斷,不響應超時
- 喚醒操做:
- signal()
- signalAll()
- AQS同步器提供了多個方法歷來輔助實現同步狀態的獲取與釋放:
- 獨享式獲取:
- acquire(int):不響應中斷,不響應超時
- acquireInterruptibly(int):響應中斷
- tryAcquireNanos(int, long):響應中斷,響應超時
- 獨享式釋放:
- release(int)
- 共享式獲取:
- acquireShared(int):不響應中斷,不響應超時
- acquireSharedInterruptibly(int):響應中斷
- tryAcquireSharedNanos(int, long):響應中斷,響應超時
- 共享式釋放:
- releaseShared(int)
參考: