AQS是JUC包中許多類的實現根基,這篇文章基於我的理解的前提下完成,因此在結構上跟其餘AQS文章有些差別。html
tips:若是隻是想看AQS的實現的話能夠從第三節開始看,前面只是講結構和使用java
在開始瞭解AQS
以前,先看下AQS
的內部結構,這樣在看實現代碼的時候至少有一個總體的概念,重點要記住的是Node
類幾種狀態的做用,其餘結構有個概念就行。node
如上,在
AQS
中大體有:git
state
變量:內部維護了一個volatile
修飾的資源變量state
,能夠簡單的理解爲鎖,拿到資源就是拿到鎖。同步隊列(CLH)
:全部關於資源的搶奪都是在這個隊列中發生的;在AQS
中只存了頭尾節點,本質上就是一種雙向鏈表,隊列爲先進先出隊列(FIFO),也就是說對於資源state
的爭奪都是按照隊列中的順序來的,另外能參與資源爭奪的隊列只有有效的節點(節點狀態不爲取消
或者同步
)。等待隊列
:跟同步隊列相似,只有頭尾節點,不一樣是的其在一個內部類ConditionObjet
中,也就是說一個ConditionObject
對象就是一個等待隊列,因此容許有多個。處於等待隊列中的節點不會參與資源的競爭,其狀態爲CONDITION
,當節點被標記爲CONDITION
時(await
方法)其會從同步隊列中移除,加入對應的等待隊列,而若是等待隊列中的節點被喚醒(例如調用condition.signalAll()
)時會節點從新被放入同步隊列尾部參與資源的競爭(ReentrantLock
按組喚醒線程的實現原理就是這個)。
在AQS
中,內部類有兩個:Node
和ConditionObject
。Node
是隊列的實現根基,裏面存放了許多重要的信息,如操做的線程、線程競爭的狀態(特別重要)等;而ConditionObject
則是Condition
接口的實現類,用來實現喚醒指定線程組的(等待隊列)。設計模式
關係以下圖(下方的Waiter
節點也是Node
節點,這裏爲了便於區分取名不一樣):框架
Node內部類:
AQS
兩個隊列的實現節點。ide
waitStatus
:節點狀態,取值爲-3~1
(整數)。當狀態爲1
時表示沒用了,其餘狀態表示是有用的。
0
:初始狀態或者不表明任何意義時的取值。測試
SIGNAL(-1)
:這個狀態通常由下一個節點來設置,表明的意思是當前節點在釋放了資源後將後續節點的線程喚醒。(大白話就是後續節點拜託前方的大哥東西用完了叫他,他先去睡會兒)ui
CONDITION(-2)
:表示節點處於等待隊列中,等待隊列中的節點不會參與資源競爭,必須從等待隊列出來後從新加入同步隊列才能參與競爭。this
PROPAGATE(-3)
:在共享模式的時候用到。共享模式下,不只只是喚醒下個節點,還可能喚醒下下個節點(根據當前剩餘資源state
的值可否知足最近節點的需求決定)。
CANCELLED(1)
:表示該節點沒用了,多是等過久了,也多是其餘緣由,總之就是廢了,處於該狀態的節點不會再改變,因此AQS
中常常會判斷節點狀態是否大於0
來檢查節點是否還有用。
thread
:爭奪資源的線程,存放在節點當中。prev
:同步隊列中的上一個節點。next
:同步隊列的下一個節點。nextWaiter
:下一個等待節點,用來實現等待隊列。
如今對AQS
有了模模糊糊的瞭解,來看看要如何使用這個框架。其採用模板設計模式實現,定義了許多頂級方法如acquire
、release
等,這些方法子類不能重寫可是能夠調用,而要正確的使用這些方法則要按照其要求重寫一些方法如tryAcquire
(頂級方法內部調用了開放方法)。
能夠重寫的方法有tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
、isHeldExclusively
共五種,每一個方法裏面沒有具體的實現,反而是直接拋出了異常,可是不必定要所有重寫,比方說只重寫tryAcquire
、tryRelease
則表示要實現的是獨佔模式的鎖。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
這些方法表示嘗試去獲取資源或者釋放資源,其實現必需要跟state
資源狀態相關,舉個例子,tryAcquire
方法表示以獨佔的方式嘗試獲取資源,若是獲取到了那麼其餘線程不得操做其資源,其中入參的arg
則表示想要獲取到的資源數量,例如我tryAcquire(5)
成功了,那麼狀態變量state
變量則增長5
,若是tryRelease(5)
成功則state
狀態變量減小5
,等到state==0
的時候則表示資源被釋放,便可以理解爲鎖被釋放。
若是隻是使用AQS
的話,再加上幾個變動狀態的方法就能夠了,咱們不須要了解更多的東西,如同AQS
的文檔給出的案例通常,簡單的重寫幾個方法即可以實現一種鎖,以下,一個不可重入鎖的簡單實現。
class Mutex implements Lock, java.io.Serializable { // 同步內部類,鎖的真正操做都是經過該類的操做 private static class Sync extends AbstractQueuedSynchronizer { // 檢查當前是否已經處於鎖定的狀態 protected boolean isHeldExclusively() { return getState() == 1; } // 若是資源變量爲0,則獲取鎖(資源) public boolean tryAcquire(int acquires) { // acquires的值只能是1,不然的話不進入下面代碼 assert acquires == 1; if (compareAndSetState(0, 1)) { // 設置持有當前鎖的線程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 經過將狀態變量state設定爲0來表示鎖的釋放 protected boolean tryRelease(int releases) { // 傳入的參數只能是1,不然是無效操做 assert releases == 1; // 若是狀態狀態等於0,說明不是鎖定狀態 if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // 提供Condition,返回其AQS內部類ConditionObject Condition newCondition() { return new ConditionObject(); } // 反序列化 private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // 內部類已經實現了全部須要的方法,咱們只要封裝一層就行 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
進行一個小測試
public static void main(String[] args) { Lock lock = new Mutex(); new Thread(() -> { lock.lock(); try { System.err.println("得到鎖線程名:" + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(3); System.err.println("3秒過去...."); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.err.println(Thread.currentThread().getName() + "釋放鎖"); } }).start(); new Thread(() -> { lock.lock(); try { System.err.println("得到鎖線程名:" + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(3); System.err.println("3秒過去...."); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.err.println(Thread.currentThread().getName() + "釋放鎖"); } }).start(); }
最終的結果圖以下
這樣就實現了一個不可重入鎖,是否是看起來很簡單?
首先要先明白的是AQS
分爲兩種模式——獨佔模式和共享模式。通常來講只會用到其中一種,兩種模式的資源競爭都是在同步隊列中發生的,不要跟等待隊列混淆。
獨佔模式:每次只能容許一個節點獲取到資源,每次釋放資源以後也只會喚醒後驅節點。
共享模式:每次能夠容許多個節點按照順序獲取資源,每次釋放頭節點資源後可能會喚醒後驅的後驅。(下方講實現的時候有解釋)
來看acquire
方法(若是講的不是容易讓人理解,能夠結合後方的流程圖一塊兒),ReentrantLock
中lock
就是這個方法,能夠類比理解。
在看代碼須要明確知道的是,tryAcquire
和tryRelease
這些操做纔是對資源的獲取和釋放,AQS
中的頂級方法如acquire
的做用只是對資源獲取操做以後的處理。
// 代碼邏輯不復雜,首先嚐試獲取資源,若是成功則直接返回,失敗則加入同步隊列爭奪資源 public final void acquire(int arg) { // 嘗試得到鎖,若是失敗了則增長節點放入等待隊列中 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
能夠看到總體的方法十分簡單,就在一個if條件中調用了3
個方法,tryAcquire
就不說了,先說下addWaiter
作了什麼,addWaiter
方法將當前線程封裝成一個節點放入同步隊列的尾部,若是失敗就不斷的嘗試直到成功爲止,其方法代碼以下。
private Node addWaiter(Node mode) { // 將當前線程封裝入一個節點之中,mode表明共享模式仍是獨佔模式 Node node = new Node(Thread.currentThread(), mode); // 首先嚐試一次快速的尾隨其後,若是失敗的話則採用正常方式入隊 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 入隊操做 enq(node); return node; }
再看下正常的入隊操做
private Node enq(final Node node) { // 自旋 for (;;) { Node t = tail; // 若是同步隊列是空的話則進行隊列的初始化 if (t == null) { // 這裏注意初始化的時候head是一個新增的Node,其waitStatus爲0 if (compareAndSetHead(new Node())) tail = head; } else { // 不然的話嘗試設置尾節點,失敗的話從新循環 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
能夠看出正常入隊比快速入隊也就多出來了自旋和初始化操做,其餘的大體邏輯都是類似的。再看看acquire中的另外一個方法acquireQueued
。
final boolean acquireQueued(final Node node, int arg) { // 默認獲取失敗 boolean failed = true; try { /* * 線程打斷標識,咱們知道使用interrupt()方法只是改變了線程中的打斷標識變量, * 並不能打斷正在運行的線程,而對於這個打斷變量的處理通常有兩種方式, * 一種是記錄下來,一種是拋出異常,這裏選擇前者,而可打斷的acquire則是選擇後者 */ boolean interrupted = false; // 自旋 for (;;) { // 拿到前驅節點 final Node p = node.predecessor(); // 若是前驅節點爲頭節點則嘗試一次獲取 // 再次強調下,獲取資源的操做是在tryAcquire中 if (p == head && tryAcquire(arg)) { // 設置當前節點爲頭節點,而後設置prev節點爲null setHead(node); p.next = null; // help GC failed = false; // 返回中斷標識 return interrupted; } // 獲取資源失敗了,判斷當前線程的節點是否應該休息 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 若是是由於中斷被喚醒的,要記錄下來,以後acquire方法要補上中斷 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 看看是否應該去休息這個方法中作了啥 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 若是前節點狀態爲SIGNAL,那麼表示能夠安興去休息了,到了時候前驅節點會叫醒你的,返回true if (ws == Node.SIGNAL) return true; if (ws > 0) { /* * 狀態大於0,則表示節點已經取消做廢,那麼須要一直往前找直到找到有效的節點 * 這時還不能去休息,要是前驅節點是頭結點又剛好頭結點釋放了資源,那麼你不就 * 不用掛起就能夠拿到資源了,因此返回false,再循環一次 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 其餘狀況則表示前驅節點有效,將前驅節點狀態設置尾SIGNAL,表示麻煩他到時候 * 叫醒你。這裏還不能夠去休息,由於有可能前驅節點恰好變成了頭結點又恰好執行完 * 釋放了資源,這時去休息豈不是虧了,因此返回false */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 若是上面的方法判斷須要休息,那麼將線程掛起 private final boolean parkAndCheckInterrupt() { // 使用park方法將線程掛起 LockSupport.park(this); // 在上面咱們提到線程的打斷標識,interrupted()方法返回後會重置這個標識 return Thread.interrupted(); }
光看代碼可能有點繞(整個流程能夠看下方的流程圖),從新理一下邏輯:
首先明確這個方法是不斷自旋不會退出的,除非成功拿到資源,若是拿不到資源就掛起等待。(不考慮特殊狀況)
整個流程的邏輯:
- 判斷前驅節點是否爲頭節點,若是是頭節點則嘗試獲取資源,成功了返回中斷標識,失敗了進行
2
。使用前驅節點判斷的緣由是由於頭結點 不會進到這個方法來;不是頭結點還要去獲取資源是由於要是在這個過程當中恰好頭結點釋放了資源,那麼你就不用再去掛起傻傻等待了,節省了系統資源消耗。- 進入
shouldParkAfterFailedAcquire
方法,這個方法的做用就是判斷你當前這個線程能不能去休息(掛起),而能夠去休息的標誌就是前驅節點的狀態爲SIGNAL
,這個狀態表明前驅節點釋放資源後會喚醒你。
- 1 判斷前驅節點狀態是否爲
SIGNAL
,若是是直接返回true
,能夠去休息了- 2 若是前驅節點狀態
>0
,表示做廢,那麼將一直往前找直到找到一個有效的節點,而後進行鏈接,這時還不能去休息,要是前驅節點是頭結點呢是吧,因此返回false
。也就是在這個階段中清理了同步隊列中那些沒用的節點,由於他們引用斷了,以後GC
會回收它們。- 3 將前驅節點的狀態設置爲
SIGNAL
,表示你準備去休息了要麻煩他叫醒你,而後先別休息,要是前驅節點這時候變成了頭結點又進行了資源釋放,那就能夠省去掛起的操做直接獲取資源了,因此要再循環一次看看,返回false
。- 根據是否應該休息方法
shouldParkAfterFailedAcquire
的結果判斷是否把線程掛起,若是返回true
那麼執行parkAndCheckInterrupt
方法把線程掛起,若是是false
那麼則再循環一次。parkAndCheckInterrupt
方法的做用是掛起線程,而後醒來的時候返回是否是由於被中斷而醒來的,若是是的話,那麼將interrupted
字段賦值爲true
,在整個acquire
方法結束的時候會根據這個標識來決定是否進行線程的自我中斷。
再回來看下acquire
方法
public final void acquire(int arg) { if (!tryAcquire(arg) && // 根據返回的中斷標識決定是否執行下方的自我中斷 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
整個acquire
的流程大體爲
獨佔式獲取資源的主要方法差很少就是這樣,還有可打斷的獨佔式獲取方法acquireInterruptibly
,代碼以下,其實現基本相同,只是對於咱們方纔說的打斷標識的處理從記錄改爲了拋出異常,因此纔是可打斷的,有興趣能夠本身再看下,基本邏輯相同,看起來也就耗費不了多少時間。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 拋出異常處理 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
瞭解完獲取資源天然知道釋放資源的過程,相對來講釋放資源要相對容易一些,大體邏輯爲嘗試釋放資源,若是成功了,則改變節點的狀態而且喚醒下一個可用節點(通常是下一個,可是可能出現下一個節點已經被取消的狀況)
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 修改線程的狀態,而且喚醒下一個節點進行資源競爭 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { // 改變節點狀態 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 喚醒下一個可用節點,通常來講是下一個節點,可是可能出現下個節點被取消 * 或者爲空的狀況,這個時候就要從尾結點向前遍歷直到找到有效的節點(從尾節點向前遍歷 * 是由於不管下個節點是空仍是取消的節點,正向遍歷都不可能走得通了,取消的節點的next * 就是其自己,因此只能從後面開始往前遍歷) */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 找到下個節點以後將其喚醒 if (s != null) LockSupport.unpark(s.thread); }
release
的流程圖以下:
在上面咱們講的都是獨佔模式的獲取和釋放處理,那接下來看看共享模式是怎麼實現的。首先理解AQS
中共享模式的概念,其表明資源能夠被隊列中的多個節點按照順序得到,什麼意思呢?
舉個例子,咱們設置資源變量爲3(state=3)
,首先頭結點使用tryAcquireShared(1)
獲取到了一個資源,那麼還剩下2
個,這兩個能夠給頭結點的後驅節點使用,若是後驅節點的需求是2
那麼獲取成功並將本身設置爲頭結點同時斷開跟原頭結點的鏈接,可是若是需求是3
的話則進入等待狀態直到可獲取的資源量達到其要求爲止,這時就算後續的需求量是1
也不會給後續節點,這就是按照順序得到的意思。例子圖以下:
okay,那來看下共享模式下的實現,先看acquireShared
方法:判斷資源是否獲取成功,是的話直接結束,不是的話進入隊列進行資源競爭。須要注意的是tryAcquireShared
返回值的語義:負值表明失敗,其餘表明成功而且當前還可獲取的資源量。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
看看doAcquireShared
作了什麼
// 仍是強調一次,這些方法只是善後處理,資源的獲取仍是在tryAcquireShared方法 private void doAcquireShared(int arg) { /* * 整個流程跟acquire方法有些相似,不一樣點是其獲取到資源後 * 會喚醒後驅線程 */ // 加入隊列尾,再也不贅述 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 一樣記錄一個打斷標識 boolean interrupted = false; for (;;) { // 前驅節點 final Node p = node.predecessor(); if (p == head) { // 若是前驅節點是頭結點,那麼嘗試一次獲取資源,根據其返回的值判斷執行不一樣操做 int r = tryAcquireShared(arg); if (r >= 0) { // 非負值表明資源獲取成功,將本身設爲頭結點後喚醒後驅節點爭取資源 setHeadAndPropagate(node, r); p.next = null; // help GC // 跟acquire不一樣的是,其補打斷的地方在方法內層,再也不放外面 if (interrupted) selfInterrupt(); failed = false; // 處理結束後就退出了 return; } } // 這裏跟acquire同樣,判斷是否能夠休息,休息後被喚醒後補充interrupted標識 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
看看獲取資源成功後對後續節點的操做
/** * @param node 當前節點 * @param propagate 當前剩餘的資源量 */ private void setHeadAndPropagate(Node node, int propagate) { // 記錄原頭結點 Node h = head; // 注意這裏設置頭結點的變化,這裏要結合3.3一開始的例子圖來理解 /** setHead方法體: * head = node; * node.thread = null; * node.prev = null; */ setHead(node); // 此時頭結點已經變爲當前節點了 /* * 存在如下三種狀況時喚醒當前節點後驅節點 * 1.剩餘資源量>0 * 2.node的原前驅節點(即原頭節點)釋放了資源, == null表示釋放完被回收了,<0則表示PROPAGATION * 狀態,釋放以後會將節點狀態設置爲PROPAGATION * 3.頭結點可能再次發生了改變而且也釋放了資源(競爭激烈的時候發生) */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 叫醒後續節點爭奪資源,這個方法是釋放方法的主要方法,放在下節講 doReleaseShared(); } }
okay,到這裏就是共享模式的acquireShared
方法,總結一下邏輯:
- 嘗試獲取鎖是否成功,是則結束,不然進入2
- 同acquire同樣先來個自旋,判斷前驅節點是否爲頭結點,不是的話掛起線程等待喚醒,是的話進入3
- 嘗試獲取資源,成功了喚醒後續線程,方法結束;失敗了掛起線程等待喚醒
線程被喚醒後重復2操做,如下是流程圖:
直接上代碼吧
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 這個方法理解爲喚醒,不要理解爲釋放資源 doReleaseShared(); return true; } return false; }
看看喚醒方法作了啥子
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 根據節點狀態判斷執行什麼操做 if (ws == Node.SIGNAL) { /* * 若是是SIGNAL那麼表示其後驅節點處於掛起的狀態 * 使用CAS改變狀態後喚醒後驅節點,失敗則再次循環(說明被其餘線程先執行了該方法) */ if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 喚醒線程,前面已經說過,再也不贅述 unparkSuccessor(h); } // 將當前節點設置爲PROPAGATE,失敗則再次循環 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 若是頭節點改變了,說明喚醒操做是其餘線程作的,此時要再次循環 if (h == head) break; } }
共享模式的release
方法在咱們看過以前的方法後就簡單得多了,這裏就再也不畫流程圖了,到此AQS
的兩個模式和實現暫時告一段落。
整篇文章可能有些長,先講了AQS內部的一些結構,而後使用AQS實現了簡易的不可重入鎖,接着接下來將AQS的兩個模式和實現。
兩個模式的實現思路大體是相同的,可是方式不一樣,獨佔模式每次只容許一個節點獲取到資源,而共享模式則容許多個節點按照順序獲取;雙方釋放後的善後操做也不一樣,獨佔模式只喚醒後驅節點,而共享模式則可能喚醒後驅的後驅(資源充足的狀況)。
衝!衝!衝!
參考:http://www.javashuo.com/article/p-xcevmtwv-gz.html
https://snailclimb.gitee.io/javaguide/#/docs/java/Multithread/AQS