曾經有一道比較比較經典的面試題「你可以說說java
的併發包下面有哪些常見的類?」大多數人應該均可以說出
CountDownLatch、CyclicBarrier、Sempahore多線程併發三大利器。這三大利器都是經過AbstractQueuedSynchronizer
抽象類(下面簡寫AQS)來實現的,因此學習三大利器以前咱們有必要先來學習下AQS
。html
AQS是一種提供了原子式管理同步狀態、阻塞和喚醒線程功能以及隊列模型的簡單框架java
AQS結構
說到同步咱們如何來保證同步?你們第一印象確定是加鎖了,說到鎖的話你們確定首先會想到的是Synchronized。
Synchronized你們應該基本上都會使用,加鎖和釋放鎖都是jvm 來幫咱們實現的,咱們只須要簡單的加個 Synchronized關鍵字就能夠了。
用起來超級方便。可是有沒有一種狀況咱們設置一個鎖的超時時間Synchronized就有點實現不了,這時候咱們就能夠用ReentrantLock來實現,ReentrantLock是經過aqs來實現的,今天咱們就經過ReentrantLock來學習一下aqs。nodeCAS && 公平鎖和非公平鎖
AQS裏面用到了大量的CAS學習AQS以前咱們仍是有必要簡單的先了解下
CAS
、公平鎖和非公平鎖。面試CAS
- CAS 全稱是 compare and swap,是一種用於在多線程環境下實現同步功能的機制。
CAS
操做包含三個操做數 -- 內存位置、預期數值和新值。CAS
的實現邏輯是將內存位置處的數值與預期數值想比較,若相等,則將內存位置處的值替換爲新值。若不相等,則不作任何操做,這個操做是個原子性操做,java裏面的AtomicInteger
等類都是經過cas來實現的。公平鎖和非公平鎖
- 公平鎖:多個線程按照申請鎖的順序去得到鎖,線程會直接進入隊列去排隊,隊列中第一個才能得到到鎖。
優勢:等待鎖的線程不會餓死,每一個線程均可以獲取到鎖。
缺點:總體吞吐效率相對非公平鎖要低,等待隊列中除第一個線程之外的全部線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。- 非公平鎖:多個線程去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待隊列,若是能獲取到,就直接獲取到鎖。
優勢:能夠減小CPU喚醒線程的開銷,總體的吞吐效率會高點,CPU也沒必要取喚醒全部線程,會減小喚起線程的數量。
缺點:處於等待隊列中的線程可能會餓死,或者等好久纔會得到鎖。
文字有點拗口,咱們來個實際的例子說明下。好比咱們去食堂就餐的時候都要排隊,你們都按照先來後到的順序排隊打飯,這就是公平鎖。若是等到你準備拿盤子打飯的時候
直接蹦出了一個五大三粗的胖子插隊到你前面,你看打不贏他只能忍氣吞聲讓他插隊,等胖子打完飯了又來個小個子也來插你隊,這時候你無法忍了,直接大吼一聲讓他滾,這個
小個子只能屁顛屁顛到隊尾去排隊了這就是非公平鎖。
咱們先來看看AQS有哪些屬性// 頭結點 private transient volatile Node head;
// 阻塞的尾節點,每一個新的節點進來,都插入到最後,也就造成了一個鏈表
private transient volatile Node tail;編程
// 這個是最重要的,表明當前鎖的狀態,0表明沒有被佔用,大於 0 表明有線程持有當前鎖
// 這個值能夠大於 1,是由於鎖能夠重入,每次重入都加上 1
private volatile int state;多線程
// 表明當前持有獨佔鎖的線程,舉個最重要的使用例子,由於鎖能夠重入
// reentrantLock.lock()能夠嵌套調用屢次,因此每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer併發
下面咱們來寫一個demo分析下lock 加鎖和釋放鎖的過程 ```java final void lock() { // 上來先試試直接把狀態置位1,若是此時沒人獲取鎖就直接 if (compareAndSetState(0, 1)) // 爭搶成功則修改得到鎖狀態的線程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
cas
嘗試失敗,說明已經有人再持有鎖,因此進入acquire方法app
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire
方法,看名字大概能猜出什麼意思,就是試一試。
tryAcquire其實是調用了父類Sync的nonfairTryAcquire
方法框架
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲取下當前鎖的狀態 int c = getState(); // 這個if 邏輯跟前面一進來就獲取鎖的邏輯同樣都是經過cas嘗試獲取下鎖 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 進入這個判斷說明 鎖重入了 狀態須要進行+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 若是鎖的重入次數大於int的最大值,直接就拋出異常了,正常狀況應該不存在這種狀況,不過jdk仍是嚴謹的 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false 說明嘗試獲取鎖失敗了,失敗了就要進行acquireQueued方法了 return false; }
tryAcquire
方法若是獲取鎖失敗了,那麼確定就要排隊等待獲取鎖。排隊的線程須要待在哪裏等待獲取鎖?這個就跟咱們線程池執行任務同樣,線程池把任務都封裝成一個work,而後當線程處理任務不過來的時候,就把任務放到隊列裏面。AQS一樣也是相似的,把排隊等待獲取鎖的線程封裝成一個NODE。而後再把NODE放入到一個隊列裏面。隊列以下所示,不過須要注意一點head是不存NODE的。
jvm
接下來咱們繼續分析源碼,看下獲取鎖失敗是如何被加入隊列的。
就要執行acquireQueued方法,執行acquireQueued方法以前須要先執行addWaiter方法
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // cas 加入隊列隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾結點不爲空 || cas 加入尾結點失敗 enq(node); return node; }
接下來再看看enq方法
// 經過自旋和CAS必定要當前node加入隊尾 private Node enq(final Node node) { for (;;) { Node t = tail; // 尾結點爲空說明隊列仍是空的,尚未被初始化,因此初始化頭結點,能夠看到頭結點的node 是沒有綁定線程的也就是不存數據的 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
經過addWaiter方法已經把獲取鎖的線程經過封裝成一個NODE加入對列。上述方法的一個執行流程圖以下:
,接下來就是繼續執行acquireQueued方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 經過自旋去獲取鎖 前驅節點==head的時候去嘗試獲取鎖,這個方法在前面已經分析過了。 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 進入這個if說明node的前驅節點不等於head 或者嘗試獲取鎖失敗了 // 判斷是否須要掛起當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 異常狀況進入cancelAcquire,在jdk11的時候這個源碼直接是catch (Throwable e){ cancelAcquire(node);} 簡單明瞭 if (failed) cancelAcquire(node); } }
這個方法每當有一個node獲取到鎖了,就把當前node
節點設置爲頭節點,能夠簡單的看作當前節點獲取到鎖了就把當前節點」移除「(變爲頭結點)隊列。
說到這個方法咱們就要先看下NODE可能會有哪些狀態在源碼裏面咱們能夠看到總共會有四種狀態
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驅節點狀態 若是這個狀態爲-1 則返回true,把當前線程掛起 if (ws == Node.SIGNAL) return true; // 大於0,說明狀態爲CANCELLED if (ws > 0) { do { // 刪除被取消的node(讓被取消的node成爲一個沒有引用的node等着下次GC被回收) node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 進入這裏只能是 0,-2,-3。NODE節點初始化的時候waitStatus默認值是0,因此只有這裏纔有修改waitStatus的地方 // 經過cas 把前驅節點的狀態設置爲-1,而後返回false ,外面調用這個方法的是個循環,又會調用一次這個方法 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
掛起當前線程,而且阻塞
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 掛起當前線程,阻塞 return Thread.interrupted(); }
加鎖成功了,那鎖用完了就應該釋放鎖了,釋放鎖重點看下unparkSuccessor這個方法就行了
private void unparkSuccessor(Node node) { // 頭結點狀態 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // s==null head的successor節點獲取鎖成功後,執行了head.next=null的操做後,解鎖線程讀取了head.next,所以s==null // head的successor節點被取消(cancelAcquire)時,執行了以下操做:successor.waitStatus=1 ; successor.next = successor; if (s == null || s.waitStatus > 0) { s = null; // 從尾節點開始往前找,找到最前面的非取消的節點 這裏沒有break 哦 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒線程 ,喚醒的線程會從acquireQueued去獲取鎖 LockSupport.unpark(s.thread); }
釋放鎖代碼比較簡單,基本都寫在代碼註釋裏面了,流程以下:
這段代碼裏面有一個比較經典的面試題:
若是頭結點的下一個節點爲空或者頭結點的下一個節點的狀態爲取消的時候爲何要從後往前找,找到最前面非取消的節點?