對於 Java 開發者來講,都會碰到多線程訪問公共資源的狀況,這時候,每每都是經過加鎖來保證訪問資源結果的正確性。在 java 中一般採用下面兩種方式來解決加鎖得問題:html
synchronized 關鍵字;java
Java.util.concurrent.locks 包中的 locks 包下面的鎖(Lock 接口和 ReentrantLock 等實現類);node
synchronized 是 java 底層支持的,而 concurrent 包則是 jdk 實現。關於 synchronized 的原理能夠閱讀 再有人問你synchronized是什麼,就把這篇文章發給他。c#
Lock 是一個接口,方法定義以下segmentfault
// 若是鎖可用就得到鎖,若是鎖不可用就阻塞直到鎖釋放 void lock() // 和 lock()方法類似, 但阻塞的線程可中斷,拋出 java.lang.InterruptedException異常 void lockInterruptibly() // 非阻塞獲取鎖;嘗試獲取鎖,若是成功返回true boolean tryLock() // 帶有超時時間的獲取鎖方法 boolean tryLock(long timeout, TimeUnit timeUnit) // 釋放鎖 void unlock()
實現 Lock 接口的類有不少,如下爲幾個常見的鎖實現設計模式
ReentrantLock:表示重入鎖,它是惟一一個實現了 Lock 接口的類。重入鎖指的是線程在得到鎖以後,再次獲取該鎖不須要阻塞,而是直接關聯一次計數器增長重入次數安全
ReentrantReadWriteLock:重入讀寫鎖,它實現了 ReadWriteLock 接口,在這個類中維護了兩個鎖,一個是 ReadLock,一個是 WriteLock,他們都分別實現了 Lock 接口。讀寫鎖是一種適合讀多寫少的場景下解決線程安全問題的工具,基本原則是:讀和讀不互斥、讀和寫互斥、寫和寫互斥
。也就是說涉及到影響數據變化的操做都會存在互斥。數據結構
StampedLock: stampedLock 是 JDK8 引入的新的鎖機制,能夠簡單認爲是讀寫鎖的一個改進版本,讀寫鎖雖然經過分離讀和寫的功能使得讀和讀之間能夠徹底併發,可是讀和寫是有衝突的,若是大量的讀線程存在,可能會引發寫線程的飢餓。stampedLock 是一種樂觀的讀策略,使得樂觀鎖徹底不會阻塞寫線程多線程
AQS 的全稱爲(AbstractQueuedSynchronizer),這個類也是在 java.util.concurrent.locks 下面。這是一個抽象類,採用設計模式中的模板模式來設計的,內部提供了一系列公共的方法,主要是經過繼承的方式來使用,它自己沒有實現任何的同步接口,僅僅是定義了同步狀態的獲取以及釋放的方法來提供自定義的同步組件。架構
能夠這麼說,只要搞懂了AQS,那麼 J.U.C 中絕大部分的 API 都能輕鬆掌握。
下圖是 AQS 的子類:
能夠看到,AQS 仍是有不少子類的。下面將詳細講解下 AQS。
AQS 解決了多線程訪問共享資源安全性的問題。其原理圖能夠表示以下:
AQS 利用了一個 volatile 類型的 int 變量 state 來表示同步狀態,當其餘線程訪問帶有鎖的共享資源的時候,會被阻塞,而後會被放入 FIFO 的 CLH (Craig, Landin, and Hagersten)
隊列中,等待在此被喚醒。當獲取鎖的線程釋放鎖之後,會從隊列中喚醒一個阻塞的節點(線程)。由此確保了每一個線程有序訪問共享資源,避免出現數據不一致的狀況。
下面經過一張架構圖來總體瞭解一下 AQS 框架:
上圖中有顏色的爲 Method,無顏色的爲 Attribution。
總的來講,AQS 框架共分爲五層,自上而下由淺入深,從 AQS 對外暴露的 API 到底層基礎數據。
當有自定義同步器接入時,只需重寫第一層所須要的部分方法便可,不須要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操做時,先通過第一層的API進入 AQS 內部方法,而後通過第二層進行鎖的獲取,接着對於獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴於第五層的基礎數據提供層。
前面提到了 AQS 使用內置的 FIFO 隊列來完成獲取資源線程的排隊工做。既然是隊列,就是由不少節點(Node)組成的,下面來看下 Node 的數據構成。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
static final class Node {
/** 共享模式 */
static final Node SHARED = new Node();
/** 獨佔模式 */
static final Node EXCLUSIVE = null;
/** 取消等待,好比線程等待超時或者被中斷 */
static final int CANCELLED = 1;
/** 線程須要 unpark 操做來喚醒 */
static final int SIGNAL = -1;
/** 線程處於 condition 等待 */
static final int CONDITION = -2;
/** 共享模式下使用,表示下一次共享模式獲取同步狀態時會被無條件傳播下去 */
static final int PROPAGATE = -3;
// 當前線程在隊列中的等待狀態
volatile int waitStatus;
// 前驅節點
volatile Node prev;
// 後繼節點
volatile Node next;
/** 獲取同步狀態的線程 */
volatile Thread thread;
// 指向下一個處於 CONDITION 的節點
Node nextWaiter;
// 若是是共享模式返回true
final boolean isShared() {
return nextWaiter == SHARED;
}
/** 返回前驅節點,沒有就拋出NPE */
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/** Establishes initial head or SHARED marker. */
Node() {}
/** 將線程構成一個 node 添加到隊列中,經過調用 addWaiter 使用. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
U.putObject(this, THREAD, Thread.currentThread());
}
/** 在 condition 隊列使用,經過調用 addConditionWaiter 使用. */
Node(int waitStatus) {
// 經過 unsafe 類以及對應的 Node 屬性在內存中的偏移量來修改對應實例的屬性值。
U.putInt(this, WAITSTATUS, waitStatus);
U.putObject(this, THREAD, Thread.currentThread());
}
/** CASes waitStatus field. */
final boolean compareAndSetWaitStatus(int expect, int update) {
return U.compareAndSwapInt(this, WAITSTATUS, expect, update);
}
/** CASes next field. */
final boolean compareAndSetNext(Node expect, Node update) {
return U.compareAndSwapObject(this, NEXT, expect, update);
}
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long NEXT;
static final long PREV;
private static final long THREAD;
private static final long WAITSTATUS;
static {
try {
//獲取 Node 的屬性 next 在內存中的偏移量,下面同理
NEXT = U.objectFieldOffset
(Node.class.getDeclaredField("next"));
PREV = U.objectFieldOffset
(Node.class.getDeclaredField("prev"));
THREAD = U.objectFieldOffset
(Node.class.getDeclaredField("thread"));
WAITSTATUS = U.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
對於 Node 類,能夠發現其內部操做都是經過 Unsafe 類來保證是原子性操做。同時內部部分變量都是採用 volatile 來修飾,確保該變量對其餘線程也是可見的。此外,還能夠得出存在兩種不一樣模式,一種是獨佔模式,一種是共享模式。
再看看 AQS 中兩個跟 Node 類相關的屬性:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 頭結點
private transient volatile Node head;
// 尾節點
private transient volatile Node tail;
整個結構以下圖所示:
如上圖瞭解了同步隊列的結構, 咱們在分析其入列操做在簡單不過。無非就是將 tail(使用 CAS 保證原子操做)指向新節點,新節點的 prev 指向隊列中最後一節點(舊的 tail 節點),原隊列中最後一節點的 next 節點指向新節點以此來創建聯繫,來張圖幫助你們理解。
同步隊列(CLH)遵循 FIFO,首節點是獲取同步狀態的節點,首節點的線程釋放同步狀態後,將會喚醒它的後繼節點(next),然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,這個過程很是簡單。以下圖
設置首節點是經過獲取同步狀態成功的線程來完成的(獲取同步狀態是經過 CAS 來完成),只能有一個線程可以獲取到同步狀態,所以設置頭節點的操做並不須要 CAS 來保證,只須要將首節點設置爲其原首節點的後繼節點並斷開原首節點的 next(等待 GC 回收)應用便可。
在瞭解數據結構後,接下來了解一下 AQS 的同步狀態 —— State。AQS 中維護了一個名爲 state 的字段,意爲同步狀態,是由 Volatile 修飾的,用於展現當前臨界資源的獲鎖狀況。
// java.util.concurrent.locks.AbstractQueuedSynchronizer private volatile int state;
當state=0時,表示無鎖狀態
當state>0時,表示已經有線程得到了鎖,也就是 state=1,可是由於 ReentrantLock 容許重入,因此同一個線程屢次得到同步鎖的時候,state 會遞增,好比重入5次,那麼state=5。 而在釋放鎖的時候,一樣須要釋放 5 次直到 state=0 其餘線程纔有資格得到鎖
下面提供了幾個訪問這個字段的方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 獲取State的值
protected final int getState()
// 設置State的值
protected final void setState(int newState)
// 使用CAS方式更新State
protected final boolean compareAndSetState(int expect, int update)
這幾個方法都是 Final 修飾的,說明子類中沒法重寫它們。咱們能夠經過修改 State 字段表示的同步狀態來實現多線程的獨佔模式和共享模式(加鎖過程)。
對於咱們自定義的同步工具,須要自定義獲取同步狀態和釋放狀態的方式,也就是 AQS 架構圖中的第一層:API 層。
須要注意的是:不一樣的 AQS 實現,state 所表達的含義是不同的。
清楚了 AQS 的基本架構之後,咱們來分析一下 AQS 的實現原理,仍然以 ReentrantLock 爲模型。
ReentrantLock 意思爲可重入鎖,指的是一個線程可以對一個臨界資源重複加鎖。爲了幫助你們更好地理解 ReentrantLock 的特性,咱們先將 ReentrantLock 跟經常使用的 Synchronized 進行比較,其特性以下(藍色部分爲本篇文章主要剖析的點):
下面經過僞代碼,進行更加直觀的比較:
// **************************Synchronized的使用方式************************** // 1.用於代碼塊 synchronized (this) {} // 2.用於對象 synchronized (object) {} // 3.用於方法 public synchronized void test () {} // 4.可重入 for (int i = 0; i < 100; i++) { synchronized (this) {} } // **************************ReentrantLock的使用方式************************** public void test () throw Exception { // 1.初始化選擇公平鎖、非公平鎖 ReentrantLock lock = new ReentrantLock(true); // 2.可用於代碼塊 lock.lock(); try { try { // 3.支持多種加鎖方式,比較靈活; 具備可重入特性 if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ } } finally { // 4.手動釋放鎖 lock.unlock() } } finally { lock.unlock(); } }
調用 ReentrantLock 中的 lock() 方法,源碼的調用過程採用時序圖來展示:
從圖上能夠看出來,當鎖獲取失敗時,會調用 addWaiter() 方法將當前線程封裝成 Node 節點加入到 AQS 隊列,基於這個思路,咱們來分析 AQS 的源碼實現。ReentrantLock 與 AQS 之間的關係
首先來看看 ReentrantLock 的構造方法,它的構造方法有兩個,以下所示:
// 默認是非公平鎖 public ReentrantLock() { sync = new NonfairSync(); } // true 是公平鎖 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
能夠發現,構造函數中引用了兩個內部類,分別是 FairSync (公平鎖) 和 NonfairSync (非公平鎖)。而且都是 Sync 的子類。
// 非公平鎖 static final class NonfairSync extends Sync { } // 公平鎖 static final class FairSync extends Sync { }
從這裏也能夠發現 Sync 類的重要性,而前面的截圖也說明了 Sync 又是 AbstractQueuedSynchronizer 的子類,到這裏,他們之間的關係就浮出水面了:
對於 FairSync 與 NonfairSync :
公平鎖 表示全部線程嚴格按照 FIFO 來獲取鎖
非公平鎖 表示能夠存在搶佔鎖的功能,也就是說無論當前隊列上是否存在其餘線程等待,新線程都有機會搶佔鎖
公平鎖和非公平鎖的實現上的差別,會在文章後面作一個解釋,接下來的分析仍然以非公平鎖
做爲主要分析邏輯。
對於 ReentrantLock 默認是 NonfairSync,咱們以這個爲例瞭解其背後的原理。
// java.util.concurrent.locks.ReentrantLock#NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
// Android-removed: @ReservedStackAccess from OpenJDK 9, not available on Android.
// @ReservedStackAccess
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
看 lock 方法代碼的含義:
若經過 CAS 設置變量 State(同步狀態)成功,也就是獲取鎖成功,則將當前線程設置爲獨佔線程。
若經過 CAS 設置變量 State(同步狀態)失敗,也就是獲取鎖失敗,則進入 Acquire 方法進行後續處理。
compareAndSetState 的代碼實現邏輯以下
protected final boolean compareAndSetState(int expect, int update) { return U.compareAndSwapInt(this, STATE, expect, update); }
這段代碼其實邏輯很簡單,就是經過 CAS 樂觀鎖的方式來作比較並替換。上面這段代碼的意思是,若是當前內存中的 state 的值和預期值 expect 相等,則替換爲 update。更新成功返回 true,不然返回 false。
這個操做是原子的,不會出現線程安全問題。
lock 方法的第一步很好理解,但第二步獲取鎖失敗後,後續的處理策略是怎麼樣的呢?這塊可能會有如下思考:
將當前線程獲鎖結果設置爲失敗,獲取鎖流程結束。這種設計會極大下降系統的併發度,並不知足咱們實際的需求。因此就是 2 這種流程,也就是 AQS 框架的處理流程。
存在某種排隊等候機制,線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續。
對於問題 1 的第二種狀況,既然說到了排隊等候機制,那麼就必定會有某種隊列造成,這樣的隊列是什麼數據結構呢?
處於排隊等候機制中的線程,何時能夠有機會獲取鎖呢?
若是處於排隊等候機制中的線程一直沒法獲取鎖,仍是須要一直等待嗎,仍是有別的策略來解決這一問題?
能夠看一下 else 分支的邏輯,acquire 方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Acquire 方法是 AQS 中的核心方法。這裏它幹了三件事情:
tryAcquire:會嘗試再次經過 CAS 獲取一次鎖。
addWaiter:將當前線程加入上面鎖的雙向鏈表(等待隊列)中
acquireQueued:經過自旋,判斷當前隊列節點是否能夠獲取鎖
下面詳細看下 NonfairSync 的 tryAcquire 方法,該方法會直接調用 nonfairTryAcquire 方法,代碼以下:
// java.util.concurrent.locks.ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c=0 說明此時沒有獲取沒有線程佔有鎖 if (c == 0) {
// CAS 操做去獲取鎖 if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 已經獲取鎖了,能夠繼續重入 else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
簡單來講上面的方法主要就是看能不能獲取到鎖,不能獲取到就返回 false,而後就會調用 addWaiter 添加到等待隊列中,具體代碼以下:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 死循環
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
// 經過unsafe 類來對 Node.prev 節點賦值
U.putObject(node, Node.PREV, oldTail);
// 更新 tail 節點爲 node,該操做對其餘線程是可見的,確保每次只有一個線程能夠更新成功
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
// cas 設置 tail 節點
private final boolean compareAndSetTail(Node expect, Node update) {
return U.compareAndSwapObject(this, TAIL, expect, update);
}
// 初始化 head 和 tail 節點
private final void initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}
addWaiter(Node node) 方法經過採用死循環方案,確保將該節點設置尾成尾節點。
若是爲尾節點不爲空,須要將新節點添加到 oldTail 的 next 節點,同時將新節點的 prev 節點指向 oldTail;
若是當前隊列爲空,須要進行初始化,此時 head 結點和 tail 節點都是 h = new Node () 實例;此時 oldTail = h 不爲空,node 的 prev 爲 oldTail, oldTail 的 next 是 node。
這裏代碼很簡單,可是卻經過 CAS 操做保證了多個線程一塊兒添加節點的時候,只有一個線程能夠成功。
此外,入隊操做還有個 enq 方法,這個方法和 addWaiter 同樣的,就是返回值不同,具體以下:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
但請注意,初始化的頭結點並非當前線程節點,而是調用了無參構造函數的節點。若是經歷了初始化或者併發致使隊列中有元素,則與以前的方法相同。
將添加到隊列中的 Node 做爲參數傳入 acquireQueued 方法,這裏面會作搶佔鎖的操做:
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) {
// 獲取前一個節點,爲空,拋出 NPE final Node p = node.predecessor();
// p==head 說明 node 是隊列中的第一位,這時候還會再去獲取一次鎖 if (p == head && tryAcquire(arg)) {
// 獲取鎖成功後,node 變成 head 節點,凡是 head 節點,其 thread 和 pre 都爲空,next 保持不變。 setHead(node); p.next = null; // help GC
// 注意這個中斷記錄是在獲取鎖以後纔會被返回的,也就是說獲取鎖以後,纔有資格處理中斷 return interrupted; }
// 獲取鎖失敗,說明p爲頭節點且當前沒有獲取到鎖(多是非公平鎖被搶佔了)
// 或者是p不爲頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus爲-1),防止無限循環浪費資源。具體兩個方法下面細細分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 說明在這個過程當中發生過中斷,須要補上 interrupted = true; } } catch (Throwable t) { cancelAcquire(node); throw t; } }
總的來講,一個線程獲取鎖失敗了,被放入等待隊列,acquireQueued 會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者再也不須要獲取(中斷)。
下面來看獲取失敗後的處理,具體在看下面的代碼:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取頭結點的節點狀態 int ws = pred.waitStatus;
// 當前 prev node 的線程須要被 unpark 喚醒,也就是當前 node 能夠接受 park 操做 if (ws == Node.SIGNAL)
// This node has already set status asking a release to signal it, so it can safely park.
return true;
// 前節點處於取消狀態,跳過,獲取再前一個的節點狀態 if (ws > 0) {
do {
// 這裏將取消狀態的節點刪除 node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 同時設置下一個節點爲 node pred.next = node;
} else {
// 設置前任節點等待狀態爲 SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 若是以前中斷了,爲 true,並清除中斷標誌
return Thread.interrupted();
}
若是 shouldParkAfterFailedAcquire 返回了true,則會執行:parkAndCheckInterrupt()
方法,它是經過 LockSupport.park(this) 將當前線程掛起到 WATING 狀態,它須要等待一箇中斷、unpark 方法來喚醒它,經過這樣一種 FIFO 的機制的等待,來實現了 Lock 的操做。
LockSupport 類是 Java6 引入的一個類,提供了基本的線程同步原語。LockSupport 其實是調用了 Unsafe 類裏的函數,歸結到 Unsafe 裏,只有兩個函數:
public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time);
unpark 函數爲線程提供「許可( permit )」,線程調用 park 函數則等待「許可」。這個有點像信號量,可是這個「許可」是不能疊加的,「許可」是一次性的。
permit至關於0/1的開關,默認是 0,調用一次 unpark 就加 1 變成了 1。調用一次 park 會消費 permit,又會變成 0,變成 0 不會影響原有線程的運行。 若是再調用一次 park 會阻塞,由於 permit 已是 0 了。直到 permit 變成 1。這時調用 unpark 會把 permit 設置爲 1 。每一個線程都有一個相關的 permit,permit 最多隻有一個,重複調用 unpark 不會累積。
這裏須要說明的一點就是:acquireQueued 方法內部是一個死循環, shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 也都在這裏面。這裏對這個邏輯再整理下:
acquireQueued 本意是經過無限循環讓隊列中的第一個節點嘗試去獲取鎖;當一個 node 被加入到隊列中的時候,就會促發這個無限循環;
若是等待隊列中的第一個節點獲取到鎖了,就會退出循環;
若是 node 是第一個加入等待隊列的,此時 node 的 prev 節點是 head ( new Node() ),node 會先去獲取鎖,失敗後,由於 prev 的 waitStatus = 0,這時候將其 waitStatus 設置爲 -1,而後再次循環,再獲取鎖失敗就會調用 parkAndCheckInterrupt 阻塞當前線程;
shouldParkAfterFailedAcquire 過程當中會將隊列中處於 CANCELLED = 1 的節點刪除。也就是說每添加一個節點,獲取鎖失敗後,均可能會對隊列作一遍整理;
被加入隊列後的線程是不會響應中斷的。當node 獲取鎖以後,若是線程在等待中被中斷過,須要將這個中斷補上,這樣線程就能夠響應中斷操做,好比此時被取消了。
若是在獲取鎖的過程當中,發生了錯誤,就會響應 cancelAcquire(node) 方法。下面具體看下方法的源碼,看看它作了啥:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 過濾掉那些被取消的節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取過濾後的前驅節點的後繼節點
Node predNext = pred.next;
// 把當前node的狀態設置爲CANCELLED
node.waitStatus = Node.CANCELLED;
// 若是當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置爲尾節點
// 若是更新成功,將tail的後繼節點設置爲null,更新失敗,說明 node 後面還有其餘節點,node 不是尾接點
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
// 若是當前節點不是head的後繼節點,1:判斷當前節點前驅節點的是否爲SIGNAL,2:若是不是,則把前驅節點設置爲SINGAL看是否成功
// 若是1和2中有一個爲true,再判斷當前節點的線程是否爲null
// 若是上述條件都知足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
// 走到這裏,已經把 node 從 next 隊列裏面刪除了,可是保留了 prev 指針 } else {
// 若是當前節點是head的後繼節點,或者上述條件不知足,那就喚醒當前節點的後繼節點 unparkSuccessor(node);
}
// 這裏修改了 node 的next 指針,可是保證了 prev 指針的不變 node.next = node; // help GC
}
}
當前的流程:獲取當前節點的前驅節點,若是前驅節點的狀態是 CANCELLED,那就一直往前遍歷,找到第一個 waitStatus <= 0 的節點,將找到的 Pred 節點和當前 Node 關聯,將當前Node 設置爲 CANCELLED。
根據當前節點的位置,考慮如下三種狀況:
當前節點是尾節點。
當前節點是Head的後繼節點。
當前節點不是Head的後繼節點,也不是尾節點。
根據上述第二條,咱們來分析每一種狀況的流程。
當前節點是尾節點。
當前節點是 Head 的後繼節點。
當前節點不是 Head 的後繼節點,也不是尾節點。
經過上面的流程,咱們對於 CANCELLED 節點狀態的產生和變化已經有了大體的瞭解,可是爲何全部的變化都是對 Next 指針進行了操做,而沒有對 Prev 指針進行操做呢?什麼狀況下會對 Prev 指針進行操做?
執行 cancelAcquire 的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過 Try 代碼塊中的 shouldParkAfterFailedAcquire 方法了),若是此時修改 Prev指針,有可能會致使 Prev 指向另外一個已經移除隊列的 Node,所以這塊變化 Prev 指針不安全。 shouldParkAfterFailedAcquire 方法中,會執行下面的代碼,其實就是在處理 Prev 指針。shouldParkAfterFailedAcquire 是獲取鎖失敗的狀況下才會執行,進入該方法後,說明共享資源已被獲取,當前節點以前的節點都不會出現變化,所以這個時候變動 Prev 指針比較安全。
do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);
下面看下 unparkSuccessor 的邏輯:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) // 將傳入的參數node的等待狀態變爲 0 node.compareAndSetWaitStatus(ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 從後往前尋找那些沒有被取消的線程 for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) LockSupport.unpark(s.thread); }
unparkSuccessor 的做用以下:
若是其下一個節點爲空,或者其等待狀態是取消狀態,那麼就從後往前找,找到一個等待狀態 <=0 的,而後將其喚醒;
若是下一個節點不爲空,且等待狀態 <=0,將其喚醒。
這個方法的找到一個須要喚醒的節點,看下後面怎麼處理:
// LockSupport public static void unpark(Thread thread) { if (thread != null) U.unpark(thread); }
發現也是經過 unSafe 類來處理的。這裏調用了 unpark 方法,那確定有地方調用了 park 方法,這個是在 parkAndCheckInterrupt 裏調用的。
到這裏,NonfairSync lock 的邏輯就講完了 。那 FairSync lock 是如何保證公平的呢?且看代碼:
// java.util.concurrent.locks.ReentrantLock
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 加鎖 final void lock() {
acquire(1);
}
//
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 當前沒有線程獲取鎖 if (c == 0) {
// 當前線程處於 head 以後,或者隊列爲空,就會去調用 CAS 獲取鎖,不然是沒有機會獲取鎖的 if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 當前線程就是獨佔線程,可重入 else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
可見對於公平鎖,新加入的節點有如下幾種操做:
node 能獲取鎖的狀況有兩種:1 是當前沒有線程持有鎖,而且隊列爲空,或者 node 是 head 的下一個節點;2 是 node 自己持有鎖,可重入。
在狀況 1 後的 node,都將會被加入到隊列中去;
這裏就能夠看出來,公平鎖徹底是按照先來後到的順序進行排列等候的,不會給你機會去經過 CAS 操做獲取鎖的。對於非公平鎖,每一個線程去獲取鎖的時候都有機會去嘗試獲取鎖的,成功鎖就是你的,不成功就加入到隊列中去。
講完了 lock 方法之後,接下去講 unLock 方法了。來看下 unlock 的邏輯:
public void unlock() { sync.release(1); } // 釋放鎖 public final boolean release(int arg) {
// true 表示成功釋放,就會喚醒下一個線程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases;
// 確保是當前線程,非當前線程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); }
// 更新同步狀態 setState(c); return free; }
unlock 的邏輯比較好理解,就是釋放鎖,更新同步狀態,而後喚醒下一個等待線程。
其中 tryRelease 動做能夠認爲就是一個設置鎖狀態的操做,並且是將狀態減掉傳入的參數值(參數是 1 ),若是結果狀態爲 0,就將排它鎖的 Owner 設置爲 null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀態會增長 1(固然能夠本身修改這個值),在解鎖的時候減掉 1,同一個鎖,在能夠重入後,可能會被疊加爲 二、三、4 這些值,只有 unlock() 的次數與 lock() 的次數對應纔會將 Owner 線程設置爲空,並且也只有這種狀況下才會返回 true。
hasQueuedPredecessors 是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。若是返回 False,說明當前線程能夠爭取共享資源;若是返回 True,說明隊列中存在有效節點,當前線程必須加入到等待隊列中。
// java.util.concurrent.locks.ReentrantLock public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
看到這裏,咱們理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());爲何要判斷的頭結點的下一個節點?第一個節點儲存的數據是什麼?
雙向鏈表中,第一個節點爲虛節點,其實並不存儲任何信息,只是佔位,這個能夠從列表的第一次初始化也能夠看出來。
真正的第一個有數據的節點,是在第二個節點開始的。當h != t時: 若是(s = h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,須要返回 True。
若是(s = h.next) != null,說明此時隊列中至少有一個有效節點。
若是此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與當前線程相同,那麼當前線程是能夠獲取資源的;
若是s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與當前線程不一樣,當前線程必須加入進等待隊列。
對於 unparkSuccessor 邏輯前面講過了,就是喚醒下一個節點去獲取鎖。固然在喚醒過程當中,對於非公平鎖,其餘線程是有機會去搶佔的
到這裏,就把加鎖和解鎖的邏輯都講完了。
以非公平鎖爲例,這裏主要闡述一下非公平鎖與 AQS 之間方法的關聯之處,具體每一處核心方法的做用都已經在上文闡述清楚了。
爲了幫助你們理解 ReentrantLock 和 AQS 之間方法的交互過程,以非公平鎖爲例,將加鎖和解鎖的交互流程單獨拎出來強調一下,以便於對後續內容的理解。
經過 ReentrantLock 的加鎖方法 Lock 進行加鎖操做。
會調用到內部類 Sync 的 Lock 方法,因爲 Sync#lock 是抽象方法,根據 ReentrantLock 初始化選擇的公平鎖和非公平鎖,執行相關內部類的 Lock 方法,本質上都會執行 AQS 的 Acquire 方法。
AQS 的 Acquire 方法會執行 tryAcquire 方法,可是因爲 tryAcquire 須要自定義同步器實現,所以執行了 ReentrantLock 中的 tryAcquire 方法,因爲 ReentrantLock 是經過公平鎖和非公平鎖內部類實現的 tryAcquire 方法,所以會根據鎖類型不一樣,執行不一樣的 tryAcquire。
tryAcquire 是獲取鎖邏輯,獲取失敗後,會執行框架 AQS 的後續邏輯,跟 ReentrantLock 自定義同步器無關。
經過 ReentrantLock 的解鎖方法 Unlock 進行解鎖。
Unlock 會調用內部類 Sync 的 Release 方法,該方法繼承於 AQS。
Release 中會調用 tryRelease 方法,tryRelease 須要自定義同步器實現,tryRelease 只在 ReentrantLock 中的 Sync 實現,所以能夠看出,釋放鎖的過程,並不區分是否爲公平鎖。
釋放成功後,全部處理由 AQS 框架完成,與自定義同步器無關。
經過上面的描述,大概能夠總結出ReentrantLock加鎖解鎖時API層核心方法的映射關係。
到這裏,基本就講完了。
一、 Lock 的操做不只僅侷限於 lock()/unlock(),由於這樣線程可能進入 WAITING 狀態,這個時候若是沒有 unpark() 就無法喚醒它,可能會一直「睡」下去,能夠嘗試用 tryLock()、tryLock(long , TimeUnit) 來作一些嘗試加鎖或超時來知足某些特定場景的須要。例若有些時候發現嘗試加鎖沒法加上,先釋放已經成功對其它對象添加的鎖,過一小會再來嘗試,這樣在某些場合下能夠避免「死鎖」哦。
看下相關代碼:
// ReentrantLock public boolean tryLock() {
// 調用的是非公平鎖來搶佔鎖 return sync.nonfairTryAcquire(1); } // ReentrantLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
// 超過必定時間後再去獲取鎖 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }// AQS
// 拿不到鎖時,等一段時間再拿不到就退出 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 時間 <=0 直接返回 if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout;
// 將當前線程加入到隊列中 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor();
// 這裏若是當前線程是第一個有效節點,直接嘗試去獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) {
// 時間到了以後,就退出等待隊列 cancelAcquire(node); return false; }
// 須要等待,而且時長大於 1000L if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
// 阻塞必定時間,再去獲取鎖 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
二、 lockInterruptibly() 它容許拋出 InterruptException 異常,也就是當外部發起了中斷操做,程序內部有可能會拋出這種異常,可是並非絕對會拋出異常的。
// ReentrantLock public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } // AQS public final void acquireInterruptibly(int arg) throws InterruptedException {
// 若是發生了中斷,就拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException();
// if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 可中斷的 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor();
// 再次看能不能獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; }
// park 前發現中斷了,拋出中斷錯誤 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
能夠發現,基本上能夠中斷的點,都會去判斷線程是否有中斷標誌,有的話,直接拋出中斷異常,可是在加入隊列過程,和獲取鎖的過程是不響應中斷的,只有以前以後會作中斷判斷。
三、 newCondition() 操做,是返回一個 Condition 的對象,Condition 只是一個接口,它要求實現 await()、awaitUninterruptibly()、awaitNanos(long)、await(long , TimeUnit)、awaitUntil(Date)、signal()、signalAll() 方法,AbstractQueuedSynchronizer 中有一個內部類叫作 ConditionObject 實現了這個接口,它也是一個相似於隊列的實現,具體能夠參考源碼。大多數狀況下能夠直接使用,固然以爲本身比較牛逼的話也能夠參考源碼本身來實現。
四、 在 AQS 的 Node 中有每一個 Node 本身的狀態(waitStatus),咱們這裏概括一下,分別包含:
SIGNAL 從前面的代碼狀態轉換能夠看得出是前面有線程在運行,須要前面線程結束後,調用 unpark() 方法才能激活本身,值爲:-1
CANCELLED 當 AQS 發起取消或 fullyRelease() 時,會是這個狀態。值爲 1,也是幾個狀態中惟一一個大於 0 的狀態,因此前面斷定狀態大於 0 就基本等價因而 CANCELLED 的意思。
CONDITION 線程基於 Condition 對象發生了等待,進入了相應的隊列,天然也須要 Condition 對象來激活,值爲 -2。
PROPAGATE 讀寫鎖中,當讀鎖最開始沒有獲取到操做權限,獲得後會發起一個 doReleaseShared() 動做,內部也是一個循環,當斷定後續的節點狀態爲 0 時,嘗試經過CAS自旋方式將狀態修改成這個狀態,表示節點能夠運行。
狀態 0 初始化狀態,也表明正在嘗試去獲取臨界資源的線程所對應的 Node 的狀態。
本文基於 ReentrantLock 非公平鎖的獨佔鎖源碼來分析了 AQS 的內部實現原理。在得到同步鎖時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用 tryRelease(int arg) 方法釋放同步狀態,而後喚醒頭節點的後繼節點。