在上一篇文章中咱們對lock和AbstractQueuedSynchronizer(AQS)有了初步的認識。在同步組件的實現中,AQS是核心部分,同步組件的實現者經過使用AQS提供的模板方法實現同步組件語義,AQS則實現了對同步狀態的管理,以及對阻塞線程進行排隊,等待通知等等一些底層的實現處理。AQS的核心也包括了這些方面:同步隊列,獨佔式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時等待鎖獲取這些特性的實現,而這些實際上則是AQS提供出來的模板方法,概括整理以下:java
獨佔式鎖:node
void acquire(int arg):獨佔式獲取同步狀態,若是獲取失敗則插入同步隊列進行等待; void acquireInterruptibly(int arg):與acquire方法相同,但在同步隊列中進行等待的時候能夠檢測中斷; boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly基礎上增長了超時等待功能,在超時時間內沒有得到同步狀態返回false; boolean release(int arg):釋放同步狀態,該方法會喚醒在同步隊列中的下一個節點編程
共享式鎖:數組
void acquireShared(int arg):共享式獲取同步狀態,與獨佔式的區別在於同一時刻有多個線程獲取同步狀態; void acquireSharedInterruptibly(int arg):在acquireShared方法基礎上增長了能響應中斷的功能; boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基礎上增長了超時等待的功能; boolean releaseShared(int arg):共享式釋放同步狀態安全
要想掌握AQS的底層實現,其實也就是對這些模板方法的邏輯進行學習。在學習這些模板方法以前,咱們得首先了解下AQS中的同步隊列是一種什麼樣的數據結構,由於同步隊列是AQS對同步狀態的管理的基石。數據結構
當共享資源被某個線程佔有,其餘請求該資源的線程將會阻塞,從而進入同步隊列。就數據結構而言,隊列的實現方式無外乎二者一是經過數組的形式,另一種則是鏈表的形式。AQS中的同步隊列則是經過鏈式方式進行實現。接下來,很顯然咱們至少會抱有這樣的疑問:**1. 節點的數據結構是什麼樣的?2. 是單向仍是雙向?3. 是帶頭結點的仍是不帶頭節點的?**咱們依舊先是經過看源碼的方式。併發
在AQS有一個靜態內部類Node,其中有這樣一些屬性:app
volatile int waitStatus //節點狀態 volatile Node prev //當前節點/線程的前驅節點 volatile Node next; //當前節點/線程的後繼節點 volatile Thread thread;//加入同步隊列的線程引用 Node nextWaiter;//等待隊列中的下一個節點oop
節點的狀態有如下這些:post
int CANCELLED = 1//節點從同步隊列中取消 int SIGNAL = -1//後繼節點的線程處於等待狀態,若是當前節點釋放同步狀態會通知後繼節點,使得後繼節點的線程可以運行; int CONDITION = -2//當前節點進入等待隊列中 int PROPAGATE = -3//表示下一次共享式同步狀態獲取將會無條件傳播下去 int INITIAL = 0;//初始狀態
如今咱們知道了節點的數據結構類型,而且每一個節點擁有其前驅和後繼節點,很顯然這是一個雙向隊列。一樣的咱們能夠用一段demo看一下。
public class LockDemo {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
thread.start();
}
}
}
複製代碼
實例代碼中開啓了5個線程,先獲取鎖以後再睡眠10S中,實際上這裏讓線程睡眠是想模擬出當線程沒法獲取鎖時進入同步隊列的狀況。經過debug,當Thread-4(在本例中最後一個線程)獲取鎖失敗後進入同步時,AQS時如今的同步隊列如圖所示:
Thread-0先得到鎖後進行睡眠,其餘線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進入同步隊列,同時也能夠很清楚的看出來每一個節點有兩個域:prev(前驅)和next(後繼),而且每一個節點用來保存獲取同步狀態失敗的線程引用以及等待狀態等信息。另外AQS中有兩個重要的成員變量:
private transient volatile Node head;
private transient volatile Node tail;
複製代碼
也就是說AQS實際上經過頭尾指針來管理同步隊列,同時實現包括獲取鎖失敗的線程進行入隊,釋放鎖時對同步隊列中的線程進行通知等核心方法。其示意圖以下:
經過對源碼的理解以及作實驗的方式,如今咱們能夠清楚的知道這樣幾點:
那麼,節點如何進行入隊和出隊是怎樣作的了?實際上這對應着鎖的獲取和釋放兩個操做:獲取鎖失敗進行入隊操做,獲取鎖成功進行出隊操做。
咱們繼續經過看源碼和debug的方式來看,仍是以上面的demo爲例,調用lock()方法是獲取獨佔式鎖,獲取失敗就將當前線程加入同步隊列,成功則線程執行。而lock()方法實際上會調用AQS的**acquire()**方法,源碼以下
public final void acquire(int arg) {
//先看同步狀態是否獲取成功,若是成功則方法結束返回
//若失敗則先調用addWaiter()方法再調用acquireQueued()方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
關鍵信息請看註釋,acquire根據當前得到同步狀態成功與否作了兩件事情:1. 成功,則方法結束返回,2. 失敗,則先調用addWaiter()而後在調用acquireQueued()方法。
獲取同步狀態失敗,入隊操做
當線程獲取獨佔式鎖失敗後就會將當前線程加入同步隊列,那麼加入隊列的方式是怎樣的了?咱們接下來就應該去研究一下addWaiter()和acquireQueued()。addWaiter()源碼以下:
private Node addWaiter(Node mode) {
// 1. 將當前線程構建成Node類型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 2. 當前尾節點是否爲null?
Node pred = tail;
if (pred != null) {
// 2.2 將當前節點尾插入的方式插入同步隊列中
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 2.1. 當前同步隊列尾節點爲null,說明當前線程是第一個加入同步隊列進行等待的線程
enq(node);
return node;
}
複製代碼
分析能夠看上面的註釋。程序的邏輯主要分爲兩個部分:**1. 當前同步隊列的尾節點爲null,調用方法enq()插入;2. 當前隊列的尾節點不爲null,則採用尾插入(compareAndSetTail()方法)的方式入隊。**另外還會有另一個問題:若是 if (compareAndSetTail(pred, node))
爲false怎麼辦?會繼續執行到enq()方法,同時很明顯compareAndSetTail是一個CAS操做,一般來講若是CAS操做失敗會繼續自旋(死循環)進行重試。所以,通過咱們這樣的分析,enq()方法可能承擔兩個任務:**1. 處理當前同步隊列尾節點爲null時進行入隊操做;2. 若是CAS尾插入節點失敗後負責自旋進行嘗試。**那麼是否是真的就像咱們分析的同樣了?只有源碼會告訴咱們答案:),enq()源碼以下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//1. 構造頭結點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2. 尾插入,CAS操做失敗自旋嘗試
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
在上面的分析中咱們能夠看出在第1步中會先建立頭結點,說明同步隊列是帶頭結點的鏈式存儲結構。帶頭結點與不帶頭結點相比,會在入隊和出隊的操做中得到更大的便捷性,所以同步隊列選擇了帶頭結點的鏈式存儲結構。那麼帶頭節點的隊列初始化時機是什麼?天然而然是在tail爲null時,即當前線程是第一次插入同步隊列。compareAndSetTail(t, node)方法會利用CAS操做設置尾節點,若是CAS操做失敗會在for (;;)
for死循環中不斷嘗試,直至成功return返回爲止。所以,對enq()方法能夠作這樣的總結:
如今咱們已經很清楚獲取獨佔式鎖失敗的線程包裝成Node而後插入同步隊列的過程了?那麼緊接着會有下一個問題?在同步隊列中的節點(線程)會作什麼事情了來保證本身可以有機會得到獨佔式鎖了?帶着這樣的問題咱們就來看看acquireQueued()方法,從方法名就能夠很清楚,這個方法的做用就是排隊獲取鎖的過程,源碼以下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1. 得到當前節點的先驅節點
final Node p = node.predecessor();
// 2. 當前節點可否獲取獨佔式鎖
// 2.1 若是當前節點的先驅節點是頭結點而且成功獲取同步狀態,便可以得到獨佔式鎖
if (p == head && tryAcquire(arg)) {
//隊列頭指針用指向當前節點
setHead(node);
//釋放前驅節點
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2.2 獲取鎖失敗,線程進入等待狀態等待獲取獨佔式鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
程序邏輯經過註釋已經標出,總體來看這是一個這又是一個自旋的過程(for (;;)),代碼首先獲取當前節點的先驅節點,若是先驅節點是頭結點的而且成功得到同步狀態的時候(if (p == head && tryAcquire(arg))),當前節點所指向的線程可以獲取鎖。反之,獲取鎖失敗進入等待狀態。總體示意圖爲下圖:
獲取鎖成功,出隊操做
獲取鎖的節點出隊的邏輯是:
//隊列頭結點引用指向當前節點
setHead(node);
//釋放前驅節點
p.next = null; // help GC
failed = false;
return interrupted;
複製代碼
setHead()方法爲:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
複製代碼
將當前節點經過setHead()方法設置爲隊列的頭結點,而後將以前的頭結點的next域設置爲null而且pre域也爲null,即與隊列斷開,無任何引用方便GC時可以將內存進行回收。示意圖以下:
那麼當獲取鎖失敗的時候會調用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他們作了什麼事情。shouldParkAfterFailedAcquire()方法源碼爲:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
shouldParkAfterFailedAcquire()方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
使用CAS將節點狀態由INITIAL設置成SIGNAL,表示當前線程阻塞。當compareAndSetWaitStatus設置失敗則說明shouldParkAfterFailedAcquire方法返回false,而後會在acquireQueued()方法中for (;;)死循環中會繼續重試,直至compareAndSetWaitStatus設置節點狀態位爲SIGNAL時shouldParkAfterFailedAcquire返回true時纔會執行方法parkAndCheckInterrupt()方法,該方法的源碼爲:
private final boolean parkAndCheckInterrupt() {
//使得該線程阻塞
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
該方法的關鍵是會調用LookSupport.park()方法(關於LookSupport會在之後的文章進行討論),該方法是用來阻塞當前線程的。所以到這裏就應該清楚了,acquireQueued()在自旋過程當中主要完成了兩件事情:
通過上面的分析,獨佔式鎖的獲取過程也就是acquire()方法的執行流程以下圖所示:
獨佔鎖的釋放就相對來講比較容易理解了,廢話很少說先來看下源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
這段代碼邏輯就比較容易理解了,若是同步狀態釋放成功(tryRelease返回true)則會執行if塊中的代碼,當head指向的頭結點不爲null,而且該節點的狀態值不爲0的話纔會執行unparkSuccessor()方法。unparkSuccessor方法源碼:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//頭節點的後繼節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//後繼節點不爲null時喚醒該線程
LockSupport.unpark(s.thread);
}
複製代碼
源碼的關鍵信息請看註釋,首先獲取頭節點的後繼節點,當後繼節點的時候會調用LookSupport.unpark()方法,該方法會喚醒該節點的後繼節點所包裝的線程。所以,每一次鎖釋放後就會喚醒隊列中該節點的後繼節點所引用的線程,從而進一步能夠佐證得到鎖的過程是一個FIFO(先進先出)的過程。
到如今咱們終於啃下了一塊硬骨頭了,經過學習源碼的方式很是深入的學習到了獨佔式鎖的獲取和釋放的過程以及同步隊列。能夠作一下總結:
整體來講:在獲取同步狀態時,AQS維護一個同步隊列,獲取同步狀態失敗的線程會加入到隊列中進行自旋;移除隊列(或中止自旋)的條件是前驅節點是頭結點而且成功得到了同步狀態。在釋放同步狀態時,同步器會調用unparkSuccessor()方法喚醒後繼節點。
獨佔鎖特性學習
咱們知道lock相較於synchronized有一些更方便的特性,好比能響應中斷以及超時等待等特性,如今咱們依舊採用經過學習源碼的方式來看看可以響應中斷是怎麼實現的。可響應中斷式鎖可調用方法lock.lockInterruptibly();而該方法其底層會調用AQS的acquireInterruptibly方法,源碼爲:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
//線程獲取鎖失敗
doAcquireInterruptibly(arg);
}
複製代碼
在獲取同步狀態失敗後就會調用doAcquireInterruptibly方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//將節點插入到同步隊列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//獲取鎖出隊
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//線程中斷拋異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
關鍵信息請看註釋,如今看這段代碼就很輕鬆了吧:),與acquire方法邏輯幾乎一致,惟一的區別是當parkAndCheckInterrupt返回true時即線程阻塞時該線程被中斷,代碼拋出被中斷異常。
經過調用lock.tryLock(timeout,TimeUnit)方式達到超時等待獲取鎖的效果,該方法會在三種狀況下才會返回:
咱們仍然經過採起閱讀源碼的方式來學習底層具體是怎麼實現的,該方法會調用AQS的方法tryAcquireNanos(),源碼爲:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
//實現超時等待的效果
doAcquireNanos(arg, nanosTimeout);
}
複製代碼
很顯然這段源碼最終是靠doAcquireNanos方法實現超時等待的效果,該方法源碼以下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1. 根據超時時間和當前時間計算出截止時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//2. 當前線程得到鎖出隊列
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 3.1 從新計算超時時間
nanosTimeout = deadline - System.nanoTime();
// 3.2 已經超時返回false
if (nanosTimeout <= 0L)
return false;
// 3.3 線程阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 3.4 線程被中斷拋出被中斷異常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
程序邏輯如圖所示:
程序邏輯同獨佔鎖可響應中斷式獲取基本一致,惟一的不一樣在於獲取鎖失敗後,對超時時間的處理上,在第1步會先計算出按照如今時間和超時時間計算出理論上的截止時間,好比當前時間是8h10min,超時時間是10min,那麼根據deadline = System.nanoTime() + nanosTimeout
計算出恰好達到超時時間時的系統時間就是8h 10min+10min = 8h 20min。而後根據deadline - System.nanoTime()
就能夠判斷是否已經超時了,好比,當前系統時間是8h 30min很明顯已經超過了理論上的系統時間8h 20min,deadline - System.nanoTime()
計算出來就是一個負數,天然而然會在3.2步中的If判斷之間返回false。若是尚未超時即3.2步中的if判斷爲true時就會繼續執行3.3步經過LockSupport.parkNanos使得當前線程阻塞,同時在3.4步增長了對中斷的檢測,若檢測出被中斷直接拋出被中斷異常。
在聊完AQS對獨佔鎖的實現後,咱們繼續一氣呵成的來看看共享鎖是怎樣實現的?共享鎖的獲取方法爲acquireShared,源碼爲:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
複製代碼
這段源碼的邏輯很容易理解,在該方法中會首先調用tryAcquireShared方法,tryAcquireShared返回值是一個int類型,當返回值爲大於等於0的時候方法結束說明得到成功獲取鎖,不然,代表獲取同步狀態失敗即所引用的線程獲取鎖失敗,會執行doAcquireShared方法,該方法的源碼爲:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 當該節點的前驅節點是頭結點且成功獲取同步狀態
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
如今來看這段代碼會不會很容易了?邏輯幾乎和獨佔式鎖的獲取如出一轍,這裏的自旋過程當中可以退出的條件是當前節點的前驅節點是頭結點而且tryAcquireShared(arg)返回值大於等於0即能成功得到同步狀態。
共享鎖的釋放在AQS中會調用方法releaseShared:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
複製代碼
當成功釋放同步狀態以後即tryReleaseShared會繼續執行doReleaseShared方法:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
複製代碼
這段方法跟獨佔式鎖釋放過程有點點不一樣,在共享式鎖的釋放過程當中,對於可以支持多個線程同時訪問的併發組件,必須保證多個線程可以安全的釋放同步狀態,這裏採用的CAS保證,當CAS操做失敗continue,在下一次循環中進行重試。
關於可中斷鎖以及超時等待的特性其實現和獨佔式鎖可中斷獲取鎖以及超時等待的實現幾乎一致,具體的就再也不說了,若是理解了上面的內容對這部分的理解也是水到渠成的。
經過這篇,加深了對AQS的底層實現更加清楚了,也對了解併發組件的實現原理打下了基礎,學無止境,繼續加油:);若是以爲不錯,請給贊,嘿嘿。
參考文獻
《java併發編程的藝術》