當咱們提到 juc 包下的鎖,就不得不聯繫到 AbstractQueuedSynchronizer 這個類,這個類就是大名鼎鼎的 AQS,AQS 按字面意思翻譯爲抽象隊列同步器,調用者能夠經過繼承該類快速的實現同步多線程下的同步容器。不論是咱們熟悉的 ReadWriteLock 亦或是 ReentrantLock,或者 CountDownLatch 與 Semaphore,甚至是線程池類 ThreadPoolExecutor 都繼承了 AQS。java
在本文,將深刻源碼,瞭解 AQS 的運行機制,瞭解經過 AQS 實現非公平鎖,公平鎖,可重入鎖等的原理。node
AQS 的底層數據結構實際上是一條雙向鏈表以及一個表明鎖狀態的變量 state
。當加鎖後,state
會改變,而競爭鎖的線程會被封裝到節點中造成鏈表,而且嘗試改變 state
以獲取鎖。數據結構
在 AQS 中有一個 Node 內部類,該類即爲鏈表的節點類。當經過 AQS 競爭鎖的時候,線程會被封裝到一個對應的節點中,多個競爭不到鎖的線程最終會連成一條鏈表,這條鏈表上節點表明的線程處於等待狀態,所以咱們稱之爲等待隊列,也就是 CLH。多線程
節點類中封裝了競爭鎖的線程的等待狀態:工具
和線程池中的狀態同樣,Node 只有小於 0 的時候才處於正常的等待狀態中,所以不少地方經過判斷是否小於 0 來肯定節點是否處於等待狀態。oop
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 等待狀態 volatile int waitStatus; volatile Node prev; volatile Node next; // 等待線程 volatile Thread thread; // 下一等待節點 Node nextWaiter; }
private volatile int state;
AQS 中提供了 state
變量作爲鎖狀態,通常來講,0 被視爲無鎖狀態,1 被視爲加鎖狀態,若是是可重入鎖,就會大於 1。ui
所以,AQS 中的加鎖解鎖實際上就是經過 CAS 改變 state
的過程,即下列三個方法:this
protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS 的同步過程其實就是同步隊列節點中依次獲取鎖的過程。AQS 一共提供了獨佔和非獨佔兩種獲取資源的方法:線程
acquire()
:以獨佔模式獲取鎖;release()
:以獨佔模式釋放鎖;acquireShared()
:以共享模式獲取鎖;releaseShared()
:以共享模式釋放鎖;獨佔鎖和非獨佔鎖二者從流程上來講都差很少,只在一些實現上有區別。翻譯
獨佔鎖,顧名思義,即只有佔有鎖的線程才能操做資源,在 synchronize 底層的鎖中,獨佔經過鎖對象對象頭中的指針來聲明獨佔的線程,而在 AQS 中則經過父類 AbstractOwnableSynchronizer 提供的 exclusiveOwnerThread
變量來聲明獨佔的線程:
private transient Thread exclusiveOwnerThread;
此外,AQS 並未提供其餘具體實現。AQS 獨佔鎖加鎖的方法是 acquire()
,其中涉及到 tryAcquire()
方法是一個空實現,須要由子類實現並在在裏面進行具體的獨佔判斷:
public final void acquire(int arg) { // 嘗試獲取鎖 if (!tryAcquire(arg) && // 添加到等待隊列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 進入等待隊列後阻塞 selfInterrupt(); }
裏面還涉及到 addWaiter()
,acquireQueued()
和 selfInterrupt()
四個方法。
在 AQS 中,tryAccquire()
是一個未實現的方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
他須要由具體的實現類去實現,並完成獲取資源的功能。這裏咱們以用可重入鎖 ReentrantLock 內爲例(後文無聲明亦同)。
在 ReentrantLock 中,鎖分爲公平鎖和非公平鎖兩種,兩者的區別在於公平鎖中等待隊列中的線程嚴格按順序獲取鎖,非公平鎖中的線程可能不會按順序獲取鎖。ReentrantLock 有一個內部類 Sync 繼承了 AQS,提供基本的加鎖解鎖方法。
而後分別有非公平鎖 NonfairSync 類與公平鎖類 FairSync 去繼承 Sync,進一步區別公平鎖與非公平的鎖的實現邏輯。咱們先看公平鎖 FairSync 的tryAccquire()
方法:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 若是當前等待隊列中沒有線程在等待 if (!hasQueuedPredecessors() && // 嘗試CAS修改state compareAndSetState(0, acquires)) { // 將當前鎖設爲本身獨佔 setExclusiveOwnerThread(current); return true; } } // 若是鎖已經被本身獲取過了,即重入 else if (current == getExclusiveOwnerThread()) { // state + 1,即多獲取一次鎖 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 沒有獲取鎖 return false; }
而非公平鎖與公平鎖的tryAccquire()
主要差異在於,公平鎖會先看看有沒有線程在等待,沒有才去競爭鎖,而非公平鎖不會看有沒有線程在等待,不管如何都會先去競爭一次鎖。
其餘鎖的 tryAccquire()
與 ReentrantLock 的大致相同。
addWaiter()
方法用於建立並添加等待節點。
private Node addWaiter(Node mode) { // 以共享或者獨佔模式建立節點 Node node = new Node(Thread.currentThread(), mode); // 尾插法插入節點 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是隊列爲空 enq(node); return node; }
這裏涉及到一個 enq()
方法,這個方法不復雜,主要是自旋初始化 AQS 中的頭結點和尾節點,值得注意的是,這裏的頭結點其實是一個哨兵節點,自己並沒有意義,當等待隊列排隊獲取資源的時候,會直接從 head.next 開始。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這個方法主要作兩件事:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取當前節點的上一節點 final Node p = node.predecessor(); // 再次嘗試獲取鎖,若是前驅節點已是頭節點了,或者獲取資源成功 if (p == head && tryAcquire(arg)) { // 將當前節點設置爲頭結點 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 前驅節點不是頭結點或者獲取鎖失敗 // 若是前驅節點須要被 park 掛起 if (shouldParkAfterFailedAcquire(p, node) && // 掛起當前線程 parkAndCheckInterrupt()) interrupted = true; } } finally { // 舊頭結點已經處理完了,直接刪除 if (failed) cancelAcquire(node); } }
這裏涉及到一個 shouldParkAfterFailedAcquire()
方法:
這個方法主要是根據前驅節點的狀態判斷當前節點是否須要被 park 的。若是這個方法返回 true,那麼說明前驅節點被設置爲 SIGNAL 狀態,而後進入 parkAndCheckInterrupt()
方法把當前線程掛起,等待前驅節點的喚醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 若是前驅節點狀態爲SIGNAL if (ws == Node.SIGNAL) return true; // 若是前驅節點已經失效 if (ws > 0) { // 移除所有失效節點,直到前驅節點爲正常等待狀態的節點爲止 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 將前驅節點設置爲SIGNAL,確保不影響後續節點的喚醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若是上述shouldParkAfterFailedAcquire()
返回 ture,那麼就會接着執行 parkAndCheckInterrupt()
方法掛起線程:
private final boolean parkAndCheckInterrupt() { // 讓當前線程等待,並中斷任務 LockSupport.park(this); return Thread.interrupted(); }
當線程使用 acquire()
方法獲取鎖的時候:
tryAcquire()
方法,這是個須要由子類實現的空方法,公平或者非公平在這個方法中讓線程去獲取鎖,得到鎖的線程要修改 state
;addWaiter()
方法,這個方法用於將線程封裝到節點中,並以尾插法插入等待隊列的鏈表,同時,若是等待隊列沒有初始化就會在此處先初始化;acquireQueued()
方法,此時會再次試圖獲取鎖,若是此時仍是失敗,就會判斷當前節點的前驅節點是否失效,若是不是就直接將前驅節點狀態改成 SIGNAL ,而後執行 parkAndCheckInterrupt()
方法掛起當前線程,若是是就一直找到一個正常等待的前驅節點爲止,改前驅節點狀態而後再掛起線程。和 AQS 使用 acquire()
方法加鎖的過程相似,AQS 也有一個 release()
的解鎖方法,他們一樣須要實現類本身去實現 tryRelease()
方法。
public final boolean release(int arg) { // 嘗試釋放鎖 if (tryRelease(arg)) { Node h = head; // 若是當前頭節點爲空且不爲初始狀態 if (h != null && h.waitStatus != 0) // 喚醒後繼節點 unparkSuccessor(h); return true; } return false; }
和 tryAcquire()
同樣,AQS 不提供 tryRelease()
的具體實現,而是交由子類去實現它。
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
咱們依然以可重入鎖 ReentrantLock 爲例,去了解 ReentrantLock 中 tryRelease()
的實現。
雖然 ReentrantLock 中有公平鎖和非公平鎖兩種實現,可是他們是釋放過程都是同樣的,都經過他們的父類,即繼承 AQS 的內部類 Sync 的 tryRelease()
方法來實現釋放的功能:
protected final boolean tryRelease(int releases) { // 可重入鎖,減去一次持鎖次數 int c = getState() - releases; // 若是當前線程不是持有鎖的線程則拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 若是可重入次數爲0,說明確實釋放鎖了 if (c == 0) { free = true; // 獨佔線程設置爲null setExclusiveOwnerThread(null); } setState(c); return free; }
這個地方也很好理解,就是讓 tryRelease()
去執行釋放鎖的過程,換句話說,就是改變 state
。
unparkSuccessor()
方法的主要用途是
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 若是頭節點狀態還處於等待狀態,則改回初始狀態 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 若是後繼節點存在並被標記爲CANCELLED狀態 if (s == null || s.waitStatus > 0) { s = null; // 從尾節點開始,找到離node最近的處於等待狀態的節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒節點 LockSupport.unpark(s.thread); }
當線程使用 release()
方法釋放鎖的時候:
tryRelease()
方法釋放資源,改變 state
以釋放鎖unparkSuccessor()
方法喚醒後繼節點,若是後繼節點掛了,就找到最近的下一個處於等待狀態的有效節點喚醒。相對 AQS 獨佔鎖,共享鎖在 AQS 中以及提供好的相關的實現。共享鎖經過 acquireShared()
方法加鎖,經過releaseShared()
方法解鎖。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()
也是一個空實現方法,須要由子類去實現。根據註釋,咱們不難理解它的做用:
其中,針對共享鎖,比較具備表明性的是讀寫鎖 ReentrantReadWriteLock,它經過 state
的高 16 位記錄讀鎖,低 16 位記錄寫鎖,在獲取鎖資源的時候,若是檢測存在寫鎖則沒法得到鎖,若是是讀鎖則獲取資源並遞增讀鎖計數器,這部分的邏輯就是在其子類中獲得的實現。
基於上面的 tryAcquireShared()
方法,doAcquireShared()
要作的事情顯然很明瞭了:
private void doAcquireShared(int arg) { // 建立共享模式的節點 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); // 若是前驅節點已是頭結點,即當前節點須要獲取鎖 if (p == head) { // 嘗試獲取共享鎖 int r = tryAcquireShared(arg); // 喚醒後繼節點 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 判斷當前線程是否須要被掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
這裏有一個 setHeadAndPropagate()
方法,根據方法名能夠猜出是用來設置頭結點和喚醒後繼共享節點的:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 設置頭結點 setHead(node); // 若是後續有須要喚醒的節點,而且當前節點沒有被CANCELLED if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 若是下一節點處於共享狀態 if (s == null || s.isShared()) // 釋放共享鎖 doReleaseShared(); } }
這裏其實只作了一些條件判斷,確保有後繼節點而且後繼節點是正常節點,核心邏輯實際上是 doReleaseShared()
方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒節點 unparkSuccessor(h); } else if (ws == 0 && // 若是後續節點不須要喚醒,則設置爲PROPAGATE避免影響後繼節點的喚醒 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
解鎖使用的releaseShared()
方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
這裏的 tryReleaseShared()
其實跟獨佔鎖的tryRelease()
相似,即改變狀態以表示釋放資源,而 doReleaseShared()
即上文喚醒後繼節點的方法。
共享鎖和獨佔鎖的根本區別在於,當頭是共享模式時,它被喚醒後會直接嘗試喚醒後繼全部共享模式的節點,直到遇到第一個非共享模式的節點爲止,而不是跟獨佔鎖同樣只喚醒後繼節點。
AQS 在內部爲此了一個變量 state
,用於記錄鎖狀態,線程經過 CAS 修改 state
便是加鎖解鎖過程。
AQS 內存維護了一條雙向鏈表,即等待隊列 CLH,等待鎖的線程被封裝爲 Node 節點連成鏈表,經過 LockSuppor 工具類的 park()
和 unpark()
方法切換等待狀態。
AQS 提供了獨佔和非獨佔兩種鎖實現方式,分別提供了 acquire()/release()
和acquireShared()/releaseShared()
兩套加鎖解鎖方式,同時,基於 state
有衍生出可重入和非可重入鎖的實現——即重入鎖在state=1
的狀況下繼續遞增,解鎖在 state
上遞減直到爲 0 爲止。而且,根據是否先判斷等待隊列中是否已存在等待線程,而後再嘗試獲取鎖的狀況,又分出了公平鎖和非公平鎖兩種實現。