【併發編程】Java併發編程-看懂AQS的前世此生

在咱們能夠深刻學習AbstractQueuedSynchronizer(AQS)以前,必須具有了volatile、CAS和模板方法設計模式的知識,本文主要想從AQS的產生背景、設計和結構、源代碼實現及AQS應用這4個方面來學習下AQSnode

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。算法

一、AQS產生背景編程

經過JCP的JSR166規範,Jdk1.5開始引入了j.u.c包,這個包提供了一系列支持併發的組件。這些組件是一系列的同步器,這些同步器主要維護着如下幾個功能:內部同步狀態的管理(例如表示一個鎖的狀態是獲取仍是釋放),同步狀態的更新和檢查操做,且至少有一個方法會致使調用線程在同步狀態被獲取時阻塞,以及在其餘線程改變這個同步狀態時解除線程的阻塞。上述的這些的實際例子包括:互斥排它鎖的不一樣形式、讀寫鎖、信號量、屏障、Future、事件指示器以及傳送隊列等。能夠看下這裏的4.2的圖便能理解j.u.c包的組件構成。設計模式

幾乎任一同步器均可以用來實現其餘形式的同步器。例如,能夠用可重入鎖實現信號量或者用信號量實現可重入鎖。可是,這樣作帶來的複雜性、開銷及不靈活使j.u.c最多隻能是一個二流工程,且缺少吸引力。若是任何這樣的構造方式不能在本質上比其餘形式更簡潔,那麼開發者就不該該隨意地選擇其中的某個來構建另外一個同步器。所以,JSR166基於AQS類創建了一個小框架,這個框架爲構造同步器提供一種通用的機制,而且被j.u.c包中大部分類使用,同時不少用戶也能夠用它來定義本身的同步器。這個就是j.u.c的做者Doug Lea大神的初衷,經過提供AQS這個基礎組件來構建j.u.c的各類工具類,至此就能夠理解AQS的產生背景了。安全

二、AQS的設計和結構數據結構

2.1 設計思想併發

同步器的核心方法是acquire和release操做,其背後的思想也比較簡潔明確。acquire操做是這樣的:app

while (當前同步器的狀態不容許獲取操做) {框架

若是當前線程不在隊列中,則將其插入隊列異步

阻塞當前線程

}

若是線程位於隊列中,則將其移出隊列

release操做是這樣的:

更新同步器的狀態

if (新的狀態容許某個被阻塞的線程獲取成功)

解除隊列中一個或多個線程的阻塞狀態

從這兩個操做中的思想中咱們能夠提取出三大關鍵操做:同步器的狀態變動、線程阻塞和釋放、插入和移出隊列。因此爲了實現這兩個操做,須要協調三大關鍵操做引伸出來的三個基本組件:

·同步器狀態的原子性管理;

·線程阻塞與解除阻塞;

·隊列的管理;

由這三個基本組件,咱們來看j.u.c是怎麼設計的。

2.1.1 同步狀態

AQS類使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSet操做來讀取和更新這個同步狀態。其中屬性state被聲明爲volatile,而且經過使用CAS指令來實現compareAndSetState,使得當且僅當同步狀態擁有一個一致的指望值的時候,纔會被原子地設置成新值,這樣就達到了同步狀態的原子性管理,確保了同步狀態的原子性、可見性和有序性。

基於AQS的具體實現類(如鎖、信號量等)必須根據暴露出的狀態相關的方法定義tryAcquire和tryRelease方法,以控制acquire和release操做。當同步狀態知足時,tryAcquire方法必須返回true,而當新的同步狀態容許後續acquire時,tryRelease方法也必須返回true。這些方法都接受一個int類型的參數用於傳遞想要的狀態。

2.1.2 阻塞

直到JSR166,阻塞線程和解除線程阻塞都是基於Java的內置管程,沒有其它非基於Java內置管程的API能夠用來達到阻塞線程和解除線程阻塞。惟一能夠選擇的是Thread.suspend和Thread.resume,可是它們都有沒法解決的競態問題,因此也無法用,目前該方法基本已被拋棄。具體不能用的緣由能夠官方給出的答覆。

j.u.c.locks包提供了LockSupport類來解決這個問題。方法LockSupport.park阻塞當前線程直到有個LockSupport.unpark方法被調用。unpark的調用是沒有被計數的,所以在一個park調用前屢次調用unpark方法只會解除一個park操做。另外,它們做用於每一個線程而不是每一個同步器。一個線程在一個新的同步器上調用park操做可能會當即返回,由於在此以前能夠有多餘的unpark操做。可是,在缺乏一個unpark操做時,下一次調用park就會阻塞。雖然能夠顯式地取消多餘的unpark調用,但並不值得這樣作。在須要的時候屢次調用park會更高效。park方法一樣支持可選的相對或絕對的超時設置,以及與JVM的Thread.interrupt結合 ,可經過中斷來unpark一個線程。

2.1.3 隊列

整個框架的核心就是如何管理線程阻塞隊列,該隊列是嚴格的FIFO隊列,所以不支持線程優先級的同步。同步隊列的最佳選擇是自身沒有使用底層鎖來構造的非阻塞數據結構,業界主要有兩種選擇,一種是MCS鎖,另外一種是CLH鎖。其中CLH通常用於自旋,可是相比MCS,CLH更容易實現取消和超時,因此同步隊列選擇了CLH做爲實現的基礎。

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

CLH隊列實際並不那麼像隊列,它的出隊和入隊與實際的業務使用場景密切相關。它是一個鏈表隊列,經過AQS的兩個字段head(頭節點)和tail(尾節點)來存取,這兩個字段是volatile類型,初始化的時候都指向了一個空節點。以下圖:

Java併發編程-看懂AQS的前世此生

 

入隊操做:CLH隊列是FIFO隊列,故新的節點到來的時候,是要插入到當前隊列的尾節點以後。試想一下,當一個線程成功地獲取了同步狀態,其餘線程將沒法獲取到同步狀態,轉而被構形成爲節點並加入到同步隊列中,而這個加入隊列的過程必需要保證線程安全,所以同步器提供了一個CAS方法,它須要傳遞當前線程「認爲」的尾節點和當前節點,只有設置成功後,當前節點才正式與以前的尾節點創建關聯。入隊操做示意圖大體以下:

Java併發編程-看懂AQS的前世此生

 

出隊操做:由於遵循FIFO規則,因此能成功獲取到AQS同步狀態的一定是首節點,首節點的線程在釋放同步狀態時,會喚醒後續節點,然後續節點會在獲取AQS同步狀態成功的時候將本身設置爲首節點。設置首節點是由獲取同步成功的線程來完成的,因爲只能有一個線程能夠獲取到同步狀態,因此設置首節點的方法不須要像入隊這樣的CAS操做,只須要將首節點設置爲原首節點的後續節點同時斷開原節點、後續節點的引用便可。出隊操做示意圖大體以下:

Java併發編程-看懂AQS的前世此生

 

這一小節只是簡單的描述了隊列的大概,目的是爲了表達清楚隊列的設計框架,實際上CLH隊列已經和初始的CLH隊列已經發生了一些變化,具體的能夠看查看資料中Doug Lea的那篇論文中的3.3 Queues。

2.1.4 條件隊列

上一節的隊列實際上是AQS的同步隊列,這一節的隊列是條件隊列,隊列的管理除了有同步隊列,還有條件隊列。AQS只有一個同步隊列,可是能夠有多個條件隊列。AQS框架提供了一個ConditionObject類,給維護獨佔同步的類以及實現Lock接口的類使用。

ConditionObject類實現了Condition接口,Condition接口提供了相似Object管程式的方法,如await、signal和signalAll操做,還擴展了帶有超時、檢測和監控的方法。ConditionObject類有效地將條件與其它同步操做結合到了一塊兒。該類只支持Java風格的管程訪問規則,這些規則中,當且僅當當前線程持有鎖且要操做的條件(condition)屬於該鎖時,條件操做纔是合法的。這樣,一個ConditionObject關聯到一個ReentrantLock上就表現的跟內置的管程(經過Object.wait等)同樣了。二者的不一樣僅僅在於方法的名稱、額外的功能以及用戶能夠爲每一個鎖聲明多個條件。

ConditionObject類和AQS共用了內部節點,有本身單獨的條件隊列。signal操做是經過將節點從條件隊列轉移到同步隊列中來實現的,沒有必要在須要喚醒的線程從新獲取到鎖以前將其喚醒。signal操做大體示意圖以下:

Java併發編程-看懂AQS的前世此生

 

await操做就是當前線程節點從同步隊列進入條件隊列進行等待,大體示意圖以下:

Java併發編程-看懂AQS的前世此生

 

實現這些操做主要複雜在,因超時或Thread.interrupt致使取消了條件等待時,該如何處理。await和signal幾乎同時發生就會有競態問題,最終的結果遵守內置管程相關的規範。JSR133修訂之後,就要求若是中斷髮生在signal操做以前,await方法必須在從新獲取到鎖後,拋出InterruptedException。可是,若是中斷髮生在signal後,await必須返回且不拋異常,同時設置線程的中斷狀態。

2.2 方法結構

若是咱們理解了上一節的設計思路,咱們大體就能知道AQS的主要數據結構了。

Java併發編程-看懂AQS的前世此生

 

 進而再來看下AQS的主要方法及其做用。

Java併發編程-看懂AQS的前世此生

 

Java併發編程-看懂AQS的前世此生

 

Java併發編程-看懂AQS的前世此生

 

看到這,咱們對AQS的數據結構應該基本上有一個大體的認識,有了這個基本面的認識,咱們就能夠來看下AQS的源代碼。

三、AQS的源代碼實現

主要經過獨佔式同步狀態的獲取和釋放、共享式同步狀態的獲取和釋放來看下AQS是如何實現的。

3.1 獨佔式同步狀態的獲取和釋放

獨佔式同步狀態調用的方法是acquire,代碼以下:

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

上述代碼主要完成了同步狀態獲取、節點構造、加入同步隊列以及在同步隊列中自旋等待的相關工做,其主要邏輯是:首先調用子類實現的tryAcquire方法,該方法保證線程安全的獲取同步狀態,若是同步狀態獲取失敗,則構造獨佔式同步節點(同一時刻只能有一個線程成功獲取同步狀態)並經過addWaiter方法將該節點加入到同步隊列的尾部,最後調用acquireQueued方法,使得該節點以自旋的方式獲取同步狀態。若是獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。

下面來首先來看下節點構造和加入同步隊列是如何實現的。代碼以下:

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

private Node addWaiter(Node mode) {
// 當前線程構形成Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 嘗試快速在尾節點後新增節點 提高算法效率 先將尾節點指向pred
Node pred = tail;
if (pred != null) {
//尾節點不爲空 當前線程節點的前驅節點指向尾節點
node.prev = pred;
//併發處理 尾節點有可能已經不是以前的節點 因此須要CAS更新
if (compareAndSetTail(pred, node)) {
//CAS更新成功 當前線程爲尾節點 原先尾節點的後續節點就是當前節點
pred.next = node;
return node;
}
}
//第一個入隊的節點或者是尾節點後續節點新增失敗時進入enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾節點爲空 第一次入隊 設置頭尾節點一致 同步隊列的初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
//全部的線程節點在構造完成第一個節點後 依次加入到同步隊列中
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

節點進入同步隊列以後,就進入了一個自旋的過程,每一個線程節點都在自省地觀察,當條件知足,獲取到了同步狀態,就能夠從這個自旋過程當中退出,不然依舊留在這個自旋過程當中並會阻塞節點的線程,代碼以下:

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當前線程節點的前驅節點
final Node p = node.predecessor();
//前驅節點爲頭節點且成功獲取同步狀態
if (p == head && tryAcquire(arg)) {
//設置當前節點爲頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//是否阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

再來看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt是怎麼來阻塞當前線程的,代碼以下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅節點的狀態決定後續節點的行爲
     int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*前驅節點爲-1 後續節點能夠被阻塞
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*前驅節點是初始或者共享狀態就設置爲-1 使後續節點阻塞
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//阻塞線程
LockSupport.park(this);
return Thread.interrupted();
}

節點自旋的過程大體示意圖以下,其實就是對圖2、圖三的補充。

Java併發編程-看懂AQS的前世此生

 

圖六 節點自旋獲取隊列同步狀態

整個獨佔式獲取同步狀態的流程圖大體以下:

Java併發編程-看懂AQS的前世此生

 

圖七 獨佔式獲取同步狀態

當同步狀態獲取成功以後,當前線程從acquire方法返回,對於鎖這種併發組件而言,就意味着當前線程獲取了鎖。有獲取同步狀態的方法,就存在其對應的釋放方法,該方法爲release,如今來看下這個方法的實現,代碼以下:

public final boolean release(int arg) {
if (tryRelease(arg)) {//同步狀態釋放成功
Node h = head;
if (h != null && h.waitStatus != 0)
//直接釋放頭節點
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*尋找符合條件的後續節點
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒後續節點
LockSupport.unpark(s.thread);
}

獨佔式釋放是很是簡單並且明確的。

總結下獨佔式同步狀態的獲取和釋放:在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列的條件是前驅節點爲頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease方法釋放同步狀態,而後喚醒頭節點的後繼節點。

3.2 共享式同步狀態的獲取和釋放

共享式同步狀態調用的方法是acquireShared,代碼以下:

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

public final void acquireShared(int arg) {
//獲取同步狀態的返回值大於等於0時表示能夠獲取同步狀態
//小於0時表示能夠獲取不到同步狀態 須要進入隊列等待
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//和獨佔式同樣的入隊操做
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//前驅結點爲頭節點且成功獲取同步狀態 可退出自旋
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//退出自旋的節點變成首節點
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

與獨佔式同樣,共享式獲取也須要釋放同步狀態,經過調用releaseShared方法能夠釋放同步狀態,代碼以下:

public final boolean releaseShared(int arg) {
//釋放同步狀態
if (tryReleaseShared(arg)) {
//喚醒後續等待的節點
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
//自旋
    for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒後續節點
            unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

unparkSuccessor方法和獨佔式是同樣的。

四、AQS應用

AQS被大量的應用在了同步工具上。

ReentrantLock:ReentrantLock類使用AQS同步狀態來保存鎖重複持有的次數。當鎖被一個線程獲取時,ReentrantLock也會記錄下當前得到鎖的線程標識,以便檢查是不是重複獲取,以及當錯誤的線程試圖進行解鎖操做時檢測是否存在非法狀態異常。ReentrantLock也使用了AQS提供的ConditionObject,還向外暴露了其它監控和監測相關的方法。

ReentrantReadWriteLock:ReentrantReadWriteLock類使用AQS同步狀態中的16位來保存寫鎖持有的次數,剩下的16位用來保存讀鎖的持有次數。WriteLock的構建方式同ReentrantLock。ReadLock則經過使用acquireShared方法來支持同時容許多個讀線程。

Semaphore:Semaphore類(信號量)使用AQS同步狀態來保存信號量的當前計數。它裏面定義的acquireShared方法會減小計數,或當計數爲非正值時阻塞線程;tryRelease方法會增長計數,在計數爲正值時還要解除線程的阻塞。

CountDownLatch:CountDownLatch類使用AQS同步狀態來表示計數。當該計數爲0時,全部的acquire操做(對應到CountDownLatch中就是await方法)才能經過。

FutureTask:FutureTask類使用AQS同步狀態來表示某個異步計算任務的運行狀態(初始化、運行中、被取消和完成)。設置(FutureTask的set方法)或取消(FutureTask的cancel方法)一個FutureTask時會調用AQS的release操做,等待計算結果的線程的阻塞解除是經過AQS的acquire操做實現的。

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

SynchronousQueues:SynchronousQueues類使用了內部的等待節點,這些節點能夠用於協調生產者和消費者。同時,它使用AQS同步狀態來控制當某個消費者消費當前一項時,容許一個生產者繼續生產,反之亦然。

除了這些j.u.c提供的工具,還能夠基於AQS自定義符合本身需求的同步器。

相關文章
相關標籤/搜索