AQS的源碼分析 <一> java
目錄結構node
一、什麼是CAS ?數據庫
二、同步器類結構安全
三、CLH同步隊列多線程
四、AQS中靜態內部類Node併發
五、方法分析源碼分析
5.一、acquire(int arg )ui
5.二、release(int arg) 釋放鎖this
六、總結線程
在多線程環境下,咱們通常會對臨界區資源(共享資源)進行加鎖,釋放鎖,保證同一時刻最多隻有一個線程(獨佔模式),就如去公共廁所裏,在使用一個小房間時會加鎖避免本身在使用的時候,別人忽然闖進來同樣,引發沒必要要的麻煩,在使用完後,再打開鎖,其餘人才可以使用;還有生產者消費者模型中,線程之間要同步,須要等待和通知機制,來協調線程合做。那麼這些是這麼實現的?如可重入鎖ReentrantLock, 讀寫鎖ReadWriteLock, 信號量 Semaphore, 計數器CountDownLatch,這些都會涉及線程之間的協調同步,那麼會有一個抽象的結構,將這些須要共用的功能抽離出來,統一來知足要求嗎?咱們一塊兒來看看AbstractQueuedSynchronizer 這個抽象類,如何來實現這些功能和其設計的巧妙, 咱們能看到Doug lea 大佬在不少地方使用的循環CAS操做(自旋鎖)。
CAS 即 compare and swap 比較並交換, 涉及到三個參數,內存值V, 預期值A, 要修改的新值B, 拿着預期值A與內存值V比較,相等則符合預期,將內存值V更新爲B, 不然不相等,不能更新V;
好比我想去果籃拿1個蘋果,我預期籃子是5個,而果籃中實際有2個,被其餘人早已偷偷吃了幾個我還不知道!因而個人指望值5與果籃值實際2不符,那麼我就不能更新籃子的蘋果數量了。這和數據庫中樂觀鎖機制同樣。後面我想用一篇文章單獨總結一下CAS, 如它存在的ABA問題以及加上版本號來解決ABA問題, 還有用CAS實現自旋鎖等。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
AbstractQueuedSynchronizer (之後簡稱AQS)是一個抽象類,定義了一些通用功能以及子類須要根據自身特性重寫實現的方法,有兩個內部類 Node和ConditionObject, 以及2個鏈表結構的隊列,一個是雙向鏈表的同步隊列,用來存放嘗試獲取鎖,卻未獲取到鎖需等待的線程,另外一個是單向鏈表的條件隊列,用來存放某些線程已經獲取到鎖了,爲了等待某些事件(如IO事件,mq消息等)主動放棄鎖掛起等待條件的線程。
這裏主要分析2個方法,分別是獨佔式獲取鎖 acquire, 釋放鎖release。
// 獨佔模式 EXCLUSIVE acquire(int arg) release(int arg) // 共享模式 SHARED acquireShared(int arg) releaseShared(int arg)
AQS 內部持有一個雙向鏈表的隊列(CLH隊列,也稱線程同步隊列,用來存儲嘗試獲取鎖,未獲取到等待的線程),AQS 持有head, tail節點,以及資源同步狀態state 屬性, 來表示有限的鎖資源佔用狀況。
注:Head節點是一個空節點,是不關聯線程的,由於Head節點表示當前已經佔用資源在使用的線程,該線程已經不須要等待鎖,在同步隊列中做佔位做用,它只是一個標誌節點,其餘線程纔是對鎖資源有需求,在隊列中等待的,慢慢看來,後面代碼會更有理解。
雙向鏈表結構以下圖:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 等待隊列中的頭節點,懶初始化,若是頭節點存在,表示當前正在佔用鎖資源的線程。 * */ private transient volatile Node head; /** * 等待隊列中尾節點 */ private transient volatile Node tail; /** * 同步狀態,在獨佔模式下 state=0,表示鎖空閒,可獲取,state =1,則表示被其餘線程佔用,需等待 * 就如火車上的廁所,綠燈表示可用,紅燈表示裏面有人,須要等待 */ private volatile int state;
CLH雙向鏈表中每一個node 中有關聯的線程,節點狀態信息,以及節點的先後指針,每一個屬性有volatile修飾,可保證多線程之間的可見性。
static final class Node { // 獲取鎖的模式分爲獨佔式和共享式 static final Node SHARED = new Node(); //獨佔式 static final Node EXCLUSIVE = null; // 如下是節點在等待隊列中的5種狀態,初始爲0,即waitStatus的值 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 如下4個屬性分別用volatile修飾,保證多線程之間可見性 //節點的狀態 volatile int waitStatus; //前置節點 volatile Node prev; // 後置節點 volatile Node next; // 節點關聯的線程 volatile Thread thread;
Node中在隊列中的等待狀態 waitStatus,初始爲0,還有如下4種狀態和其含義:
鎖資源只有1個,同一時刻最多隻有一個線程可獲取到鎖,獨佔式獲取鎖,其餘未獲取到鎖的線程需在同步隊列中等待。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //若是在等待過程當中,線程被標記中斷了,響應中斷 selfInterrupt(); }
變形爲:
public final void acquire(int arg) { boolean ta = tryAcquire(arg) if(!ta) { Node aw = addWaiter(Node.EXCLUSIVE); boolean isInterrupt = acquireQueued(aw,arg); if(isInterrupt) { //若是在等待過程當中,線程被標記中斷了,響應中斷 selfInterrupt(); } } }
1) tryAcquire(arg): 嘗試去獲取鎖資源,具體怎樣獲取鎖由每一個子類實現,AQS不作處理,統一拋出 throw new UnsupportedOperationException() 異常;
2) addWaiter(Node.EXCLUSIVE):若是嘗試獲取鎖失敗,將該線程封裝到一個新node節點中,添加到等待隊列中
3) acquireQueued(node, arg) :若是在head節點後,再次嘗試去獲取鎖,可能佔用鎖的線程已經釋放資源了,若是再次獲取失敗,在隊列中尋找到安全位置(告知前置節點,使用完資源喚醒一下本身)後,則在隊列中掛起,等待被喚醒,返回布爾類型,表示線程是否被中斷。
下面來分別查看以上3個方法的源碼:
1)tryAcquire() 是嘗試去獲取鎖,須要每一個具體實現類去實現,默認拋出異常
protected boolean tryAcquire(int arg) { //拋出不支持操做異常 throw new UnsupportedOperationException(); }
來看看 ReentrantLock 是怎樣實現 tryAcquire() 去獲取鎖的,ReentrantLock 有公平方式和非公平兩種方式,這裏簡單看看非公平方式的實現細節。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
內部調用nonfairTryAcquire 方法,代碼以下:
final boolean nonfairTryAcquire(int acquires) { //1,獲取當前線程對象 final Thread current = Thread.currentThread(); //2,獲取同步狀態state int c = getState(); //3, c爲0表示鎖可用,cas更新爲1,compareAndSetState內部有unsafe類 調用本地方法(native修飾)執行原子更新操做,多個線程同時進來更新,只會有一個更新成功。更新成功就設置當前線程對象,用來標識當前線程擁有鎖了,是有主的,其餘線程在我沒釋放鎖以前不可佔用,也是用來支持可重入的。 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //4,若是當前線程等於當前已獲取鎖線程,可重入,同步狀態 +1,返回true,容許再次獲取鎖 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
可重入鎖 : 指線程在獲取鎖資源後,再次去獲取同步鎖,是容許再次得到的,否則存在產生死鎖的風險,由上可知ReentrantLock 是支持可重入的,可重入屢次,每重入一次 state +1,那麼重入n 次,也須要釋放n次(同理,每釋放一次,state -1 ),否則state 沒法恢復到到空閒狀態0,致使其餘線程沒法獲取到鎖。
2)addWaiter(Node mode),將當前線程封裝爲一個節點,添加到等待隊列中。該方法分爲2步操做,第一步利用CAS快速將節點入隊,若是能入隊成功,則返回,不然第二步執行 enq() 方法。
爲何?
爲了執行效率和安全性兩方面考慮,因在多線程環境下,同一時間可能併發執行多個線程,若是此時有2個線程都在執行入隊操做,就有可能其中一個線程執行CAS 失敗,他引用 的tail節點已經被更新,須要從新獲取新的,此時就須要執行enq() 循環輪詢去獲取最新的tail節點執行入隊操做,直到成功。
如圖所示:
線程A在剛獲取到pred = tail後,cpu調度切換到線程B,線程B此時也執行入隊操做,入隊成功,隊列長度+1, tail更新,此時cpu調度 線程A,線程A再執行 compareAndSetTail(pred, node) 操做就會失敗。
快速添加到隊尾失敗了,有兩種狀況,
1.是tail節點 爲空,隊列爲空,那麼須要初始化隊列,添加一個空的head節點,表示當前已經獲取鎖資源,正在執行的線程。
2.是在cas添加到隊尾後面時,有其餘線程也在操做,搶先入隊成功,致使本身獲取的tail節點不是最新的,那就輪詢獲取最新的tail執行更新。
流程圖以下:
addWaiter(Node mode) 代碼以下:
private Node addWaiter(Node mode) { /*1,建立 node節點,節點信息保存線程及模式 * Node.EXCLUSIVE for exclusive 獨佔模式 * Node.SHARED for shared 共享模式 */ Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; //2,若是隊尾節點不爲空,嘗試將節點添加到 tail後面,創建雙向連接 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //3,若是隊尾節點爲空,或者cas添加節點失敗 enq(node); return node; }
enq(node) 節點入隊操做,for無限循環嘗試去添加節點到隊尾,直至成功。
private Node enq(final Node node) { for (;;) { //每新的一次循環,獲取的是最新的tail,因tail節點是volatile修飾的,多線程之間內存可見,每次更新都會被刷新到內存 Node t = tail; if (t == null) { // Must initialize //1,head節點是一個 node對象,可是沒有綁定線程,表示當前已經獲取到鎖資源,正在執行的線程,此時首尾相同,會再次走一遍循環,添加當前節點到head節點後面 if (compareAndSetHead(new Node())) tail = head; } else { //2,輪詢去設置節點到隊尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
boolean acquireQueued(final Node node, int arg) 返回線程是否被中斷
在上一步方法將節點添加到同步隊列後,執行該方法的目的是:
若是node的前置節點是head, 再次嘗試一次獲取鎖資源,若是已經被釋放了,那(線程)就去獲取鎖執行。若是不是頭節點或者嘗試獲取鎖資源失敗,那就在隊列找個位置掛起等待了 。
此時須要找一個安全點,確保前面的有效節點(沒被取消的節點)的線程執行後,可以通知喚醒本身,當前節點線程才安心掛起等待。
流程圖以下:
源代碼以下:
final boolean acquireQueued(final Node node, int arg) { //默認是失敗 boolean failed = true; try { //默認是沒有被中斷 boolean interrupted = false; //無限循環 for (;;) { final Node p = node.predecessor(); //1,若是前置節點是head節點,再去嘗試獲取鎖,若是獲取成功了,則將本身更新爲head,此時當前線程能夠執行了。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 此時返回 false,沒被中斷 return interrupted; } //2,1失敗或前置不是head,那麼要定位一個有效的位置去阻塞等待,前面有些節點多是被取消的,須要跳過這些節點,清除出隊列,shouldParkAfterFailedAcquire就是來作這個事 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //3,標記中斷標記,返回被中斷過 interrupted = true; } } finally { // 4,正常狀況下是一直會進行for循環,當跳出該循環,即出現異常,如被標記了中斷,去調用阻塞,拋出中斷異常,此時沒有在隊列中找到安全位置,所以將該節點移出隊列 if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire 在失敗獲取鎖資源後,尋找能夠安全掛起的位置
若是前置節點被取消,一直向前找到節點 ws<=0 爲止,注意藍色箭頭,指向的先後指針還在,可是node1對象已經沒有被引用,下次gc會被回收,則出隊。
流程圖以下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //1,前置節點已是signal,在它釋放資源後,會通知喚醒隊列中下一個線程,因此當前位置就是安全阻塞等待點 return true; if (ws > 0) { // ws> 0,前置節點被取消 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 2,ws>0,那麼就繼續往前找,直到找到一個節點沒被取消的,跳過了那些被取消的節點,這些節點後面會被GC pred.next = node; } else { //3,嘗試將前置節點 狀態更新爲signal,ws= -1, 進行下一次輪詢 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
在找到安全位置後,掛起當前線程,等待被喚醒,若是下次被喚醒,首先檢查一下本身是被前置節點喚醒 仍是被中斷喚醒的
private final boolean parkAndCheckInterrupt() { // 掛起當前線程 LockSupport.park(this); //當被喚醒,返回是否被中斷標記位 return Thread.interrupted(); }
最後finally中,線程被中斷,出現異常,取消節點;cancelAcquire(Node node) 將該節點移出等待隊列,線程都被中斷了,就再也不
須要去等待鎖資源了。
出隊有三種狀況:
注: 我理解這裏最終實現是要達到第3步的效果,可是在cancelAcquire方法裏並無更新第2步藍色箭頭的前置指針,而是留給了其餘線程在獲取鎖資源時,執行shouldParkAfterFailedAcquire方法查找安全位置掛起來實現的,這時 node 將沒有再被引用,所以會被GC。
3, 被取消節點在head的後面,第二個節點
這裏惟一作的就是去喚醒 node 的後繼節點(若是爲沒被取消的節點),若是後繼節點爲空或者ws =1,那麼就在隊列從後向前遍歷去喚醒一個有效節點,這個節點醒來作的事情是嘗試去獲取鎖,而且將排在它隊列前連續被取消的節點出隊。因此這裏更新節點的先後指針操做仍是交給其餘線程來作的,在 shouldParkAfterFailedAcquire方法實現的。
源代碼以下:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //1,將節點與線程解綁 node.thread = null; //2,跳過前面全部被取消的節點 // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; //3,獲取前置節點後一個位置節點,不必定是 node,有多是跳過的被取消的節點,留做cas更新用 Node predNext = pred.next; //4,將當前節點狀態改成取消 ws =1 node.waitStatus = Node.CANCELLED; //5,第一種狀況:尾節點,將node前置節點更新爲tail,並斷開前置節點的next後置指針,node將無引用,被gc if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; // 第三種狀況:node在中間位置,要作的就是讓node前置節點 pred的next指針指向node後的沒被取消的節點,跳過node if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 第二種狀況,在head後面,那麼要需向node後找沒被取消節點,讓head跳過node指向他,並喚醒該節點的線程。 unparkSuccessor(node); } node.next = node; // help GC } }
喚醒後繼者,該方法在release 方法也被執行,在線程使用完資源後,去喚醒下一個等待該資源的線程去執行。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 狀態置爲0,初始狀態,也不讓她通知下一個節點,此時節點應該是取消狀態1,是大於0的,爲啥還需再判斷一次?,這個方法不僅這裏用,釋放資源release也用,正常ws =-1,因此更新狀態爲0 //首先判斷node後置節點,若是不行,再從尾向前遍歷去找一個沒被取消的節點 去喚醒其線程 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
public final boolean release(int arg) { //1,釋放資源前提是已經獲取到 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //2,head的waitStatus在獨佔模式應該是signal =-1 // 喚醒下一個節點,代碼如上 unparkSuccessor(h); return true; } return false; }
一樣,tryRelease() 與 tryAcquire() 同樣,嘗試釋放鎖,AQS都未具體實現,拋出異常,留個子類具體去實現。
AQS中代碼:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
這裏仍是看一下ReentrantLock 的具體實現
protected final boolean tryRelease(int releases) { //1,在沒有可重入狀況下,state =1, releases =1, 此時 c應該 =0 int c = getState() - releases; //2,判斷當前釋放鎖線程是否等於拿到鎖的線程,不等就拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //3,可見只有c=0,狀況纔可成功釋放鎖 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
注:因ReentrantLock可重入的鎖,由上代碼可見,只有在 state=0的狀況,纔可成功釋放鎖,常見錯誤場景:在使用ReentrantLock lock n次,卻沒有unlock相應的n次,致使沒有成功釋放鎖。
一、 AQS的CLH隊列的 pre指針可保證可靠性,next是保證不了的,看上面的代碼分析,在遍歷隊列時,會向前遍從來查找安全點。
二、Condition 隊列與 CLH同步隊列二者的區別,同步隊列中的全部線程是在等待獲取鎖的,Condition條件隊列是某些以前已經獲取到鎖,由於要等待某個事件(如IO事件)或者與某個線程同步(等待另外一線程執行結果),主動調用await方法,主動釋放鎖後,將該線程放入條件隊列中的,因此在條件隊列中線程不是等待獲取鎖,而是在等某個條件,下一篇我會來分析AQS的重要機制- 等待通知,靠其內部類ConditionObject實現,await和 signal來阻塞與 通知機制,可替代傳統Object類提供的wait和 notify功能,而AQS可有多個條件來分類,比之應用更加靈活。