Java的內置鎖一直都是備受爭議的,在JDK 1.6以前,synchronized這個重量級鎖其性能一直都是較爲低下,雖然在1.6後,進行大量的鎖優化策略,可是與Lock相比synchronized仍是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基於JVM機制),可是它卻缺乏了獲取鎖與釋放鎖的可操做性,可中斷、超時獲取鎖,且它爲獨佔式在高併發場景下性能大打折扣。node
AQS,AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其餘同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC併發包的做者(Doug Lea)指望它可以成爲實現大部分同步需求的基礎。它是JUC併發包中的核心基礎組件。安全
AQS解決了子類實現同步器時涉及到的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器能夠帶來不少好處。它不只可以極大地減小實現工做,並且也沒必要處理在多個位置上發生的競爭問題。多線程
AQS的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態。併發
AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操做,固然AQS能夠確保對state的操做是安全的。框架
AQS經過內置的FIFO同步隊列來完成資源獲取線程的排隊工做,若是當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。高併發
AQS能夠實現獨佔鎖和共享鎖,RenntrantLock實現的是獨佔鎖,ReentrantReadWriteLock實現的是獨佔鎖和共享鎖,CountDownLatch實現的是共享鎖。oop
下面咱們經過源碼來分析下AQS的實現原理性能
經過AQS的類結構咱們能夠看到它內部有一個隊列和一個state的int變量。
隊列:經過一個雙向鏈表實現的隊列來存儲等待獲取鎖的線程。
state:鎖的狀態。
head、tail和state 都是volatile類型的變量,volatile能夠保證多線程的內存可見性。優化
同步隊列的基本結構以下:ui
同步隊列
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; //表示當前的線程被取消; static final int CANCELLED = 1; //表示當前節點的後繼節點包含的線程須要運行,也就是unpark; static final int SIGNAL = -1; //表示當前節點在等待condition,也就是在condition隊列中; static final int CONDITION = -2; //表示當前場景下後續的acquireShared可以得以執行; static final int PROPAGATE = -3; //表示節點的狀態。默認爲0,表示當前節點在sync隊列中,等待着獲取鎖。 //其它幾個狀態爲:CANCELLED、SIGNAL、CONDITION、PROPAGATE volatile int waitStatus; //前驅節點 volatile Node prev; //後繼節點 volatile Node next; //獲取鎖的線程 volatile Thread thread; //存儲condition隊列中的後繼節點。 Node nextWaiter; ...... }
從Node結構prev和next節點能夠看出它是一個雙向鏈表,waitStatus存儲了當前線程的狀態信息
waitStatus
下面咱們經過如下五個方面來介紹AQS是怎麼實現的鎖的獲取和釋放的
public final void acquire(int arg) { //嘗試得到鎖,獲取不到則加入到隊列中等待獲取 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; //將該節點添加到隊列尾部 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //若是前驅節點爲null,則進入enq方法經過自旋方式入隊列 enq(node); return node; }
將構造的同步節點加入到同步隊列中
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //若是隊列爲空,則經過CAS把當前Node設置成頭節點 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //若是隊列不爲空,則向隊列尾部添加Node if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
該方法使用CAS自旋的方式來保證向隊列中添加Node(同步節點簡寫Node)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //找到當前節點的前驅節點 final Node p = node.predecessor(); //檢測p是否爲頭節點,若是是,再次調用tryAcquire方法 if (p == head && tryAcquire(arg)) { //若是p節點是頭節點且tryAcquire方法返回true。那麼將當前節點設置爲頭節點。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //若是p節點不是頭節點,或者tryAcquire返回false,說明請求失敗。 //那麼首先須要判斷請求失敗後node節點是否應該被阻塞,若是應該 //被阻塞,那麼阻塞node節點,並檢測中斷狀態。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //若是有中斷,設置中斷狀態。 interrupted = true; } } finally { if (failed) //最後檢測一下若是請求失敗(異常退出),取消請求。 cancelAcquire(node); } }
在acquireQueued方法中,當前線程經過自旋的方式來嘗試獲取同步狀態,
經過上面的代碼咱們能夠發現AQS內部的同步隊列是FIFO的方式存取的。節點自旋獲取同步狀態的行爲以下圖所示
節點自旋獲取同步狀態
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //得到前驅節點狀態 int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若是前驅節點狀態爲SIGNAL,當前線程則能夠阻塞。 return true; if (ws > 0) { do { //判斷若是前驅節點狀態爲CANCELLED,那就一直往前找,直到找到最近一個正常等待的狀態 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //並將當前Node排在它的後邊。 pred.next = node; } else { //若是前驅節點正常,則修改前驅節點狀態爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
狀態 | 值 | 說明 |
---|---|---|
CANCELLED | 1 | 等待超時或者中斷,須要從同步隊列中取消 |
SIGNAL | -1 | 後繼節點出於等待狀態,當前節點釋放鎖後將會喚醒後繼節點 |
CONDITION | -2 | 節點在等待隊列中,節點線程等待在Condition上,其它線程對Condition調用signal()方法後,該節點將會從等待同步隊列中移到同步隊列中,而後等待獲取鎖。 |
PROPAGATE | -3 | 表示下一次共享式同步狀態獲取將會無條件地傳播下去 |
INITIAL | 0 | 初始狀態 |
private final boolean parkAndCheckInterrupt() { //阻塞當前線程 LockSupport.park(this); //判斷是否中斷來喚醒的 return Thread.interrupted(); }
public final boolean release(int arg) { //嘗試釋放鎖 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒後繼節點 unparkSuccessor(h); return true; } return false; }
tryRelease(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。
public final void acquireShared(int arg) { //嘗試獲取的鎖,若是獲取失敗執行doAcquireShared方法。 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()嘗試獲取鎖,若是獲取失敗則經過doAcquireShared()進入等待隊列,直到獲取到資源爲止才返回。
這裏tryAcquireShared()須要自定義同步器去實現。
AQS中規定:負值表明獲取失敗,非負數標識獲取成功。
private void doAcquireShared(int arg) { //構建共享Node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取前驅節點 final Node p = node.predecessor(); //若是是頭節點進行嘗試得到鎖 if (p == head) { //若是返回值大於等於0,則說明得到鎖 int r = tryAcquireShared(arg); if (r >= 0) { //當前節點設置爲隊列頭,並 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在acquireQueued方法中,當前線程也經過自旋的方式來嘗試獲取同步狀態,同獨享式得到鎖同樣
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); //若是propagate >0,說明共享鎖還有能夠進行得到鎖,繼續喚醒下一個節點 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
設置當前節點爲頭結點,並調用了doReleaseShared()方法,acquireShared方法最終調用了release方法,得看下爲何。緣由其實也很簡單,shared模式下是容許多個線程持有一把鎖的,其中tryAcquire的返回值標誌了是否容許其餘線程繼續進入。若是容許的話,須要喚醒隊列中等待的線程。其中doReleaseShared方法的邏輯很簡單,就是喚醒後繼線程。
所以acquireShared的主要邏輯就是嘗試加鎖,若是容許其餘線程繼續加鎖,那麼喚醒後繼線程,若是失敗,那麼入隊阻塞等待。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。
private void doReleaseShared() { for (;;) { // 獲取隊列的頭節點 Node h = head; // 若是頭節點不爲null,而且頭節點不等於tail節點。 if (h != null && h != tail) { // 獲取頭節點對應的線程的狀態 int ws = h.waitStatus; // 若是頭節點對應的線程是SIGNAL狀態,則意味着「頭節點的下一個節點所對應的線程」須要被unpark喚醒。 if (ws == Node.SIGNAL) { // 設置「頭節點對應的線程狀態」爲空狀態。失敗的話,則繼續循環。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 喚醒「頭節點的下一個節點所對應的線程」。 unparkSuccessor(h); } // 若是頭節點對應的線程是空狀態,則設置「尾節點對應的線程所擁有的共享鎖」爲其它線程獲取鎖的空狀態。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是頭節點發生變化,則繼續循環。不然,退出循環。 if (h == head) // loop if head changed break; } }
該方法主要是喚醒後繼節點。對於可以支持多個線程同時訪問的併發組件(好比Semaphore),它和獨佔式主要區別在於tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)線程安全釋放,通常是經過循環和CAS來保證的,由於釋放同步狀態的操做會同時來自多個線程。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; //計算出超時時間點 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } //計算剩餘超時時間,超時時間點deadline減去當前時間點System.nanoTime()獲得還應該睡眠的時間 nanosTimeout = deadline - System.nanoTime(); //若是超時,返回false,獲取鎖失敗 if (nanosTimeout <= 0L) return false; //判斷是否須要阻塞當前線程 //若是須要,在判斷當前剩餘納秒數是否大於1000 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //阻塞 nanosTimeout納秒數 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
該方法在自旋過程當中,當節點的前驅節點爲頭節點時嘗試獲取同步狀態,若是獲取成功則從該方法返回,這個過程和獨佔式同步獲取的過程相似,可是在同步狀態獲取失敗的處理上有所不一樣。若是當前線程獲取同步狀態失敗,則首先從新計算超時間隔nanosTimeout,則判斷是否超時(nanosTimeout小於等於0表示已經超時),若是沒有超時,則使當前線程等待nanosTimeout納秒(當已到設置的超時時間,該線程會從LockSupport.parkNanos(Object blocker,long nanos)方法返回)。
若是nanosTimeout小於等於spinForTimeoutThreshold(1000納秒)時,將不會使該線程進行 超時等待,而是進入快速的自旋過程。緣由在於,很是短的超時等待沒法作到十分精確,若是 這時再進行超時等待,相反會讓nanosTimeout的超時從總體上表現得反而不精確。所以,在超 時很是短的場景下,同步器會進入無條件的快速自旋。