在一篇博客中,咱們看了下CopyOnWriteArrayList的源碼,不是很難,裏面用到了一個可重入的排他鎖: ReentrantLock,這東西看上去和Synchronized差很少,可是和Synchronized是徹底不一樣的東西。node
Synchronized鎖的特性是JVM保證的,ReentrantLock鎖的特性是上層的Java代碼控制的。而ReentrantLock的基礎就是AQS,事實上,不少併發容器都用了ReentrantLock,也就間接的用到了AQS,還有併發框架,如CountDownLatch,CyclicBarrier,Semaphore也都用到了AQS,可見AQS的重要性。安全
可是要想稍微深刻一點理解AQS實屬不易,牽扯到很多東西,因此本篇博客將會拆分紅兩篇,第一篇將會介紹AQS的前置知識:LockSupport,AQS的核心概念,以及獨佔、共享模式下,AQS的核心源碼解析等,第二篇將會介紹AQS對條件變量的支持,以及AQS的應用等。併發
要深刻一些學習AQS,首先要掌握一個前置知識:LockSupport。框架
LockSupport是一個工具類,它的主要做用是掛起和喚醒線程,它的底層是調用的native方法,這個咱們不去深刻探究,主要看下LockSupport的應用。工具
若是調用park方法的線程已經拿到了與LockSupport關聯的許可證,調用park後,會當即返回,不然該線程會被阻塞,直到拿到了許可證。
若是一個線程調用了unpark方法,就會得到與LockSupport關聯的許可證,若是該線程以前調用了park而被阻塞,那麼會被喚醒,若是該線程以前沒有調用park方法,那麼調用park方法後,會馬上返回。學習
public static void main(String[] args) { System.out.println("Hello,LockSupport"); LockSupport.park(); System.out.println("Bye,LockSupport"); }
運行結果:
線程打印出第一句話,就被阻塞了,由於線程沒有得到與LockSupport關聯的許可證。ui
public static void main(String[] args) { System.out.println("Hello,LockSupport"); LockSupport.unpark(Thread.currentThread()); LockSupport.park(); System.out.println("Bye,LockSupport"); }
運行結果:
先調用unpark方法,傳入了當前線程,當前線程得到了與LockSupport關聯的許可證,隨後調用park方法,由於該線程已經有了許可證,因此當即返回,打印出了第二句話。this
public static void main(String[] args) { Thread thread=new Thread(()->{ System.out.println("Hello,LockSupport"); LockSupport.park(); System.out.println("Bye,LockSupport"); }); thread.start(); LockSupport.unpark(thread); }
運行結果:
首先建立了一個Thread ,內部調用了park方法,隨之啓動線程,在主線程中,調用了unpark方法,傳入了子線程。
此方法有兩種狀況:線程
可是不論是哪一種狀況,最後的結果都是同樣的。只是過程有些區別,第一種狀況是
主線程調用了unpark方法後,讓子線程拿到了許可證,子線程內部調用park後當即返回,第二種狀況是子線程的park方法先調用到,由於目前尚未拿到許可證,因此被阻塞,隨後主線程調用了unpark,讓子線程拿到了許可證,子線程被返回。翻譯
和park方法相似,不一樣之處在於多了個超時時間,若是調用parkNanos,線程被阻塞了,超過了nanos後,無論有沒有得到許可,都會被返回。
public static void main(String[] args) { System.out.println("Hello,LockSupport"); LockSupport.parkNanos(Integer.MAX_VALUE); System.out.println("Bye,LockSupport"); }
運行結果:
爲了能夠看到比較明顯的效果,因此我把時間設置成了Integer.MAX_VALUE,能夠看到雖然沒有調用unpark方法拿到許可證,可是必定的時間後,該方法仍是被返回了。
此方法是比較推薦使用的,由於使用它,能夠經過jstack命令查看有關阻塞對象的信息。
public class Main { public void test() { LockSupport.park(this); } public static void main(String[] args) { Main main = new Main(); main.test(); } }
使用jstack pid命令:
還有幾個方法,就不一一介紹了。
有了上面的基礎,咱們終於能夠進入今天的正題了:AQS。
AQS的全稱是AbstractQueuedSynchronizer,翻譯是中文是抽象同步隊列。剛接觸AQS的時候,第一感受這個東西和抽象有關係,由於Abstract。。。後來發現,這個東西和抽象沒有半毛錢關係,慢慢的,又有新的理解,這個東西和抽象還真的有點關係,由於它把實現同步隊列的一些方法給抽象出來了,供其餘上層組件重寫或者複用。重點來了,其餘上層組件須要重寫其中的方法!再說的詳細點,就是其餘組件須要繼承AbstractQueuedSynchronizer,對其中的部分方法進行重寫。
咱們先來看下AQS的UML圖:
咱們先要對AQS進行一個大概的介紹,瞭解下AQS中比較核心的東西。
AQS維護了一個FIFO的雙向隊列,什麼是FIFO?就是先進先出的意思,雙向隊列就是上一個節點指向下一個節點的同時,下一個節點也指向上一個節點,咱們從AbstractQueuedSynchronizer關聯的Node類中就能夠看出來這一點:prev保存的是當前節點上一個node,next保存的是當前節點的下一個節點,有一個專業的名詞,分別是前驅節點,後繼節點,同時AbstractQueuedSynchronizer類有兩個字段,一個是head,一個是tail,顧名思義,head保存了頭節點,tail保存了尾節點。
Node類中的SHARED是用來標記該線程是獲取共享資源時被放入等待隊列的,EXCLUSIVE用來標記該線程是獲取獨佔資源時被放入等待隊列的,從這句話,咱們能夠看出Node類其實就是保存了放入等待隊列的線程,而有的線程是由於獲取共享資源失敗放入等待隊列的,而有的線程是由於獲取獨佔資源失敗而被放入等待隊列的,因此這裏須要有一個標記去區分。
再囉嗦一句,FIFO雙向隊列其實就是AQS中的等待隊列。
在Node類中,還有一個字段:waitStatus,它有五個取值,分別是:
在AbstractQueuedSynchronizer類中,有一個state字段,被標記爲volatile,是爲了保證可見性,這個字段的設計可厲害了。對於ReentrantLock來講,state保存的是重入次數,對於ReentrantReadWriteLock來講,state保存的是獲取讀鎖的重入次數和寫鎖的重入次數。
AbstractQueuedSynchronizer類中,還有一個內部類:ConditionObject,用來提供條件變量的支持。
AQS提供了兩種方式來獲取資源,一種是獨佔模式,一種是共享模式。
上面提到過,須要去定義一個類去繼承AbstractQueuedSynchronizer類,重寫其中的方法,通常來講
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
此方法是獨佔模式下獲取資源的頂級方法,若是線程調用tryAcquire(arg)方法成功了,說明已經獲取到了資源,直接返回,若是不成功,則將當前線程封裝成waitStaus爲Node.EXCLUSIVE的Node插入到AQS等待隊列的尾部。
咱們來看下tryAcquire方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
納尼,直接報錯了,這是什麼鬼?別忘了,咱們須要重寫這個方法。
咱們再來看下addWaiter(Node.EXCLUSIVE), arg)方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//封裝成Node,新的Node // Try the fast path of enq; backup to full enq on failure Node pred = tail;//把尾節點賦值給pred ,pred也就是尾節點了 if (pred != null) {//若是pred不爲NULL node.prev = pred;//pred賦值給新節點的前驅節點,也就是新節點的前驅節點是尾節點 if (compareAndSetTail(pred, node)) {//CAS,若是pred仍是尾節點,則把新節點設置成尾節點,設置成功後,進入if pred.next = node;//把新節點賦值給pred的後繼節點 return node;//返回新節點 } } enq(node); return node; }
此方法先把線程封裝成一個(Node.EXCLUSIVE的Node,先嚐試把這個Node直接放入隊尾,若是成功的話,直接返回,若是失敗的話,調用enq(node)進行入隊操做:
private Node enq(final Node node) { for (;;) {//自旋 Node t = tail;//把尾節點賦值給t //若是尾節點爲空,則新建一個空的Node,用CAS把空的Node設置成頭節點 //成功後,再把尾部節點也指向空的Node if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t;//把尾節點賦值給傳進來的node的前驅節點 if (compareAndSetTail(t, node)) {//CAS,若是t仍是尾部節點,則用傳進來的node替換舊的尾部節點 t.next = node;//設置t的後繼節點爲傳進來的node return t; } } } }
這個方法歸納的來講,就是把獲取資源失敗的node放入AQS等待隊列。
咱們再回到頂級方法看下acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//拿到node的前驅節點,賦值給p if (p == head && tryAcquire(arg)) {//若是p已是頭節點了,表明這個時候 //node是第二個節點,再次調用tryAcquire獲取資源 setHead(node);//設置頭節點 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) &&//判斷此node是否能夠被park parkAndCheckInterrupt())//park interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
又是CAS自旋,首先拿到node的前驅節點,賦值給p,若是p已是頭節點了,表明這個時候node是第二個節點,再次嘗試調用tryAcquire獲取資源,若是成功,設置頭節點爲node,返回中斷標記位,若是失敗,先判斷本身是否能夠被park,若是能夠的話,就park,等待unpark。
再來看下parkAndCheckInterrupt方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//拿到前驅節點的waitStatus,賦值給ws if (ws == Node.SIGNAL)//若是是SIGNAL return true; if (ws > 0) {//若是是ws>0,則說明前驅節點被取消了,經過while循環, //找到最近的一個沒有取消的節點,排到後面 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//CAS設置前驅節點的waitStatus爲SIGNAL } return false; }
若是前驅節點的waitStatus爲SIGNAL,直接返回,若是前驅節點被取消了,則經過while循環,找到最近的一個沒有被取消的節點,排到後面去,若是前驅節點處於其餘狀態,則經過CAS把前驅節點的waitStatus設置爲SIGNAL。
再來看下parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
這方法就比較簡單了,就是park本身,返回當前線程是否被中斷。
咱們來爲acquireQueued方法作一個總結:
找到一個安全點park本身,若是被喚醒了,檢查本身是不是第二個節點,若是是的話,再次嘗試獲取資源,成功的話,就把本身設置爲頭節點。
好了,整個頂級的acquire核心內容已經分析完畢了,咱們來作一個總結:
最後,畫個流程圖幫助理解整個流程:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
此方法是獨佔模式下釋放資源的頂級方法。
首先調用tryRelease方法,若是成功了,把頭節點賦值給h,若是h不爲null而且waitStatus 不等於0,調用unparkSuccessor方法,喚醒下一個node。
tryRelease:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
此方法仍是直接報錯,由於咱們須要重寫。這裏咱們須要尤爲注意,此方法是判斷資源是否被徹底釋放了,若是鎖是能夠重入的,可能屢次得到了鎖,因此必須把最後一個鎖也釋放了,這裏才能返回ture,不然返回false。
unparkSuccessor:
private void unparkSuccessor(Node node) { int ws = node.waitStatus;//拿到當前節點的waitStatus,賦值給ws if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//當前節點的下一個節點賦值給s if (s == null || s.waitStatus > 0) {//若是s==null或者已經被取消了,就經過for循環找到下一個須要被喚醒的節點 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒 }
此方法的核心就是喚醒下一個須要被喚醒的節點。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
該方法是共享模式下獲取資源的頂級方法。
首先調用tryAcquireShared,來嘗試獲取資源,成功的話,則調用doAcquireShared,進入等待隊列,直到獲取了資源。
tryAcquireShared:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
咱們須要重寫tryAcquireShared方法。
doAcquireShared:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//入隊 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//拿到當前節點的前驅節點,賦值給p if (p == head) {//若是p是頭節點 int r = tryAcquireShared(arg);//調用tryAcquireShared嘗試獲取資源 if (r >= 0) { setHeadAndPropagate(node, r);//設置頭節點,若是還有剩餘資源,喚醒下一個節點 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
此方法和獨佔模式下的流程區別不大,最大的不一樣在於setHeadAndPropagate方法,咱們來看看這個方法作了什麼:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node);//設置頭節點 //若是還有剩餘資源 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next;//找到後繼節點 if (s == null || s.isShared()) doReleaseShared();//調用doReleaseShared方法 } }
首先設置當前節點爲頭節點,若是還有剩餘的資源,就找到後繼節點,調用doReleaseShared方法,這個方法咱們後面再看,可是從方法名稱來看,咱們能夠知道它與釋放共享資源有關。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
此方法是共享模式下釋放資源的頂級方法。
tryReleaseShared方法仍是須要咱們去重寫的,若是成功了,調用doReleaseShared方法:
private void doReleaseShared() { for (;;) { Node h = head;//把頭節點賦值給h if (h != null && h != tail) { int ws = h.waitStatus;//拿到h的waitStatus賦值給ws if (ws == Node.SIGNAL) {//若是爲SIGNAL if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//喚醒後繼節點 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
這個方法在共享模式下獲取共享資源的頂級方法acquireShared中的doAcquireShared中的setHeadAndPropagate也會調用。
好了,獨佔模式,共享模式下的獲取資源,釋放資源核心流程已經分析完畢了。
細心的你,必定發如今AQS中還有acquireInterruptibly()/acquireSharedInterruptibly()這兩個方法,這兩個方法從名稱上來看僅僅是多了一個Interruptibly,它們是會對中斷進行響應的,而咱們上面介紹的acquire,acquireShared是忽略中斷的。
本篇博客到這裏就結束了,可是還有一塊東西沒有講到:對條件變量的支持,這部份內容將放到下一篇博客再詳細介紹。