在前面提到過,AQS是構建Java同步組件的基礎,咱們期待它可以成爲實現大部分同步需求的基礎。AQS的設計模式採用的模板方法模式,子類經過繼承的方式,實現它的抽象方法來管理同步狀態,對於子類而言它並無太多的活要作,AQS提供了大量的模板方法來實現同步,主要是分爲三類:獨佔式獲取和釋放同步狀態、共享式獲取和釋放同步狀態、查詢同步隊列中的等待線程狀況。自定義子類使用AQS提供的模板方法就能夠實現本身的同步語義。node
獨佔式,同一時刻僅有一個線程持有同步狀態。編程
acquire(int arg)方法爲AQS提供的模板方法,該方法爲獨佔式獲取同步狀態,可是該方法對中斷不敏感,也就是說因爲線程獲取同步狀態失敗加入到CLH同步隊列中,後續對線程進行中斷操做時,線程不會從同步隊列中移除。代碼以下:設計模式
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
各個方法定義以下:安全
acquireQueued方法爲一個自旋的過程,也就是說當前線程(Node)進入同步隊列後,就會進入一個自旋的過程,每一個節點都會自省地觀察,當條件知足,獲取到同步狀態後,就能夠從這個自旋過程當中退出,不然會一直執行下去。以下:併發
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //中斷標誌 boolean interrupted = false; /* * 自旋過程,其實就是一個死循環而已 */ for (;;) { //當前線程的前驅節點 final Node p = node.predecessor(); //當前線程的前驅節點是頭結點,且同步狀態成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //獲取失敗,線程等待--具體後面介紹 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
從上面代碼中能夠看到,當前線程會一直嘗試獲取同步狀態,固然前提是隻有其前驅節點爲頭結點纔可以嘗試獲取同步狀態,理由:工具
acquire(int arg)方法流程圖以下:ui
AQS提供了acquire(int arg)方法以供獨佔式獲取同步狀態,可是該方法對中斷不響應,對線程進行中斷操做後,該線程會依然位於CLH同步隊列中等待着獲取同步狀態。爲了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態時,若是當前線程被中斷了,會馬上響應中斷拋出異常InterruptedException。this
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
首先校驗該線程是否已經中斷了,若是是則拋出InterruptedException,不然執行tryAcquire(int arg)方法獲取同步狀態,若是獲取成功,則直接返回,不然執行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義以下:spa
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差異。1.方法聲明拋出InterruptedException異常,2.在中斷方法處再也不是使用interrupted標誌,而是直接拋出InterruptedException異常。線程
AQS除了提供上面兩個方法外,還提供了一個加強版的方法:tryAcquireNanos(int arg,long nanos)。該方法爲acquireInterruptibly方法的進一步加強,它除了響應中斷外,還有超時控制。即若是當前線程沒有在指定時間內獲取同步狀態,則會返回false,不然返回true。以下:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
tryAcquireNanos(int arg, long nanosTimeout)方法超時獲取最終是在doAcquireNanos(int arg, long nanosTimeout)中實現的,以下:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //nanosTimeout <= 0 if (nanosTimeout <= 0L) return false; //超時時間 final long deadline = System.nanoTime() + nanosTimeout; //新增Node節點 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { //自旋 for (;;) { final Node p = node.predecessor(); //獲取同步狀態成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } /* * 獲取失敗,作超時、中斷判斷 */ //從新計算須要休眠的時間 nanosTimeout = deadline - System.nanoTime(); //已經超時,返回false if (nanosTimeout <= 0L) return false; //若是沒有超時,則等待nanosTimeout納秒 //注:該線程會直接從LockSupport.parkNanos中返回, //LockSupport爲JUC提供的一個阻塞和喚醒的工具類,後面作詳細介紹 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); //線程是否已經中斷了 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
針對超時控制,程序首先記錄喚醒時間deadline ,deadline = System.nanoTime() + nanosTimeout(時間間隔)。若是獲取同步狀態失敗,則須要計算出須要休眠的時間間隔nanosTimeout(= deadline - System.nanoTime()),若是nanosTimeout <= 0 表示已經超時了,返回false,若是大於spinForTimeoutThreshold(1000L)則須要休眠nanosTimeout ,若是nanosTimeout <= spinForTimeoutThreshold ,就不須要休眠了,直接進入快速自旋的過程。緣由在於 spinForTimeoutThreshold 已經很是小了,很是短的時間等待沒法作到十分精確,若是這時再次進行超時等待,相反會讓nanosTimeout 的超時從總體上面表現得不是那麼精確,因此在超時很是短的場景中,AQS會進行無條件的快速自旋。
整個流程以下:
當線程獲取同步狀態後,執行完相應邏輯後就須要釋放同步狀態。AQS提供了release(int arg)方法釋放同步狀態:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
該方法一樣是先調用自定義同步器自定義的tryRelease(int arg)方法來釋放同步狀態,釋放成功後,會調用unparkSuccessor(Node node)方法喚醒後繼節點。
總結一下:
在AQS中維護着一個FIFO的同步隊列,當線程獲取同步狀態失敗後,則會加入到這個CLH同步隊列的對尾並一直保持着自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅節點是否爲首節點,若是爲首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步隊列。當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。
共享式與獨佔式的最主要區別在於同一時刻獨佔式只能有一個線程獲取同步狀態,而共享式在同一時刻能夠有多個線程獲取同步狀態。例如讀操做能夠有多個線程同時進行,而寫操做同一時刻只能有一個線程進行寫操做,其餘操做都會被阻塞。
AQS提供acquireShared(int arg)方法共享式獲取同步狀態:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //獲取失敗,自旋獲取同步狀態 doAcquireShared(arg); }
從上面程序能夠看出,方法首先是調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,若是獲取失敗則調用doAcquireShared(int arg)自旋方式獲取同步狀態,共享式獲取同步狀態的標誌是返回 >= 0 的值表示獲取成功。自選式獲取同步狀態以下:
private void doAcquireShared(int arg) { /共享式節點 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //前驅節點 final Node p = node.predecessor(); //若是其前驅節點,獲取同步狀態 if (p == head) { //嘗試獲取同步 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
tryAcquireShared(int arg)方法嘗試獲取同步狀態,返回值爲int,當其 >= 0 時,表示可以獲取到同步狀態,這個時候就能夠從自旋過程當中退出。
acquireShared(int arg)方法不響應中斷,與獨佔式類似,AQS也提供了響應中斷、超時的方法,分別是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),這裏就不作解釋了。
獲取同步狀態後,須要調用release(int arg)方法釋放同步狀態,方法以下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
由於可能會存在多個線程同時進行釋放同步狀態資源,因此須要確保同步狀態安全地成功釋放,通常都是經過CAS和循環來完成的。
Doug Lea:《Java併發編程實戰》方騰飛:《Java併發編程的藝術》