AQS的java的同窗面試常常被問到的一個問題。不少同窗被面到這個問題的時候,一臉蒙圈。可是說實話,這個AQS對於java的同窗來講應該是一個比較重要的知識,由於咱們不少併發的對象都是基於這個實現的,因此考察java同窗的併發知識的功底,問這個AQS也是一個質量比較好的問題。java
AQS的全稱是AbstractQueuedSynchronizer(文中就叫:抽象隊列),說白了其實就是一個抽象類。node
是咱們併發包裏面的基石,重入鎖,讀寫鎖,不少併發的工具都是基於它實現的。因此理解好AQS是什麼東西對於掌握好併發知識是有幫助的。面試
不過,咱們若是分析AQS的時候,直接就讀AQS的代碼有點無聊,咱們就分析ReentantLock的源碼,由於咱們平時真正使用的時候咱們使用的是ReentantLock,而不是AQS。可是ReentantLock就是基於AQS實現的。併發
在分析AQS以前,我先給你們解析一下AQS的原理。以下圖所示:工具
圖展現的一個ReentantLocak(基於非公平鎖分析)加鎖的過程。ui
加鎖失敗了之後線程二就加入到隊列裏面去(AQS內部實現了一個雙向隊列)this
AQS重入加鎖大概就是這麼個原理。咱們接下來分析一下它的底層源碼。我就以ReentantLocak爲例寫個例子。spa
public class ReentrantLockDemo { ReentrantLock reentrantLock = new ReentrantLock(); int sum=0; public void count(){ //這個是加鎖的代碼 //咱們這次主要基於非公平鎖分析源碼,等到合適的時機再給你們 //解釋公平鎖和非公平鎖的區別。 //首先咱們開始分析lock的方法。 reentrantLock.lock(); for (int i = 0; i < 10; i++) { sum++; System.out.println(sum); } //這個是咱們釋放鎖的代碼 reentrantLock.unlock(); } } }
final void lock() { //線程一第一次進來之間使用cas操做修改state的值 //這句代碼的語義就是當前state的值是否爲0,若是是0,那麼就把修改成1。 if (compareAndSetState(0, 1)) //設置當前線程爲本身,其實線程一第一次進來加鎖的時候,到這兒就加鎖成功了!! setExclusiveOwnerThread(Thread.currentThread()); else //若是線程1第二次進來 //那麼由於state不是0了,因此會cas操做失敗 //因此會走這個方法 acquire(1); } 接下來咱們分析一下,線程一第二次進來是如何加鎖的: public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 咱們慢慢分析這段代碼 首先應該分析的是:tryAcquire(arg)方法 因此咱們要想分析acquire方法,那麼先分析裏面的tryAcouire方法 //執行的是這個方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
這個方法調用的是以下的方法: final boolean nonfairTryAcquire(int acquires) { //獲取當前線程,當前線程固然是線程一嘍 final Thread current = Thread.currentThread(); //獲取當前的state 那麼state 是1 int c = getState(); //若是c == 0 //其實咱們知道,代碼之因此走到這兒就是由於前面c != 0 //可是jdk的源碼爲了健壯性,因此這兒仍是再次判斷了一下 //意思就是若是當前的state是0,那麼直接加鎖就能夠了。 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若是發現上一個加鎖也是本身 //那麼直接進行重入加鎖就能夠了 else if (current == getExclusiveOwnerThread()) { // 1+1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //修改狀態值爲2,可重入加鎖成功,返回true. setState(nextc); return true; } return false; } 若是上個結果返回true以後。咱們再回過頭來看這個方法: public final void acquire(int arg) { //(!true) 結果就是false //那麼代碼就不繼續執行了 //也就是說若是是重入加鎖,那麼這兒加鎖成功之後就退出去了。 //換句話說,若是是重入加鎖,代碼執行到這兒重入加鎖也就成功了!! if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 如今咱們的的 線程是1,state=2 接下來咱們在分析一個狀況,線程2進來了。咱們的代碼又是如何走的。 首先state這個時候不是0了,那麼直接走的是acquire方法。 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //直接走這個方法 acquire(1); } //咱們再次分析這段代碼 //咱們分析一下這個方法。tryAcquire //經過咱們前面的分析咱們知道,若是重入加鎖成功了,那麼這兒直接返回的是true //可是若是發現當前的線程 和 裏面加鎖的線程不是同一個線程 //那麼重入加鎖失敗。這兒就會返回來false //若是tryAcquire返回的是false。那麼 (!false) = true //代碼就會運行到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //接下來咱們分析一下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 這個方法 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 分析acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 這個方法 咱們首先須要分析addWariter這個方法。注意Node.EXCLUSIVE這個值是null //進入到這個方法的時候咱們進入到了 類 AbstractQueuedSynchonzied裏面。 private Node addWaiter(Node mode) { //根據當前線程建立了一個Node //這個node的nextWaiter = mode = null Node node = new Node(Thread.currentThread(), mode); //tail一開始就是等於mull Node pred = tail; //因此第一次進來等式不成立 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //第一次進來代碼走的是這兒 enq(node); return node; } //調的這個方法,建立來的參數的是當前線程的Node private Node enq(final Node node) { for (;;) {//自旋 //第一次指針有變化 //第二次進來指針仍是有變化 Node t = tail; //注意咱們第一次進來,知足這個條件,因此t==null,是知足條件的 //第二次進來,那麼這個時候t就不等於Null了,因此這兒的這個條件就不知足了 //去執行else語句 if (t == null) { // Must initialize //使用cas設置一個head頭 if (compareAndSetHead(new Node())) //指針要發生變化 tail = head; //接着代碼就會執行到這兒,你們必定要注意這兒。 //代碼執行到這兒之後,由於這是一個死循環,因此 //執行到這兒之後再次運行。 } else { //當前的線程的prev指向t node.prev = t; //使用cas操做設置隊列的尾部 //這個cas的意思是當前的tail是否就是t,若是是t //那麼就把值修改成當前node,那很明顯,當前的tail就是t //因此這個就把當前node設置爲tail if (compareAndSetTail(t, node)) { t.next = node; //返回頭結點結束這個死循環 return t; } } } } 到目前爲止指針變化以下:
接下來假設線程三要進來了。 若是線程三進來,確定就會走到這段代碼。 private Node addWaiter(Node mode) { //建立線程三Node Node node = new Node(Thread.currentThread(), mode); //pred指向tail Node pred = tail; //此次pred就不等於null if (pred != null) { //當前的node.pred指向 tail node.prev = pred; //判斷當前pred是否是tail,若是是 //就把當前node設置爲tail,當前tail確定就是pred if (compareAndSetTail(pred, node)) { //pred.netxt指向了線程三的node pred.next = node; //返回當前node return node; } } enq(node); return node; } 致使到此隊列的指針變化以下:
到目前爲止咱們分析清楚了addWaiter的方法,可是不要忘記了咱們的目標。咱們是分析 acquireQueued的這個方法。咱們只須要知道若是加鎖失敗了,那麼就會調用addWaiter方法,addWaiter方法返回來的是當前的node acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //須要注意一下咱們這兒是死循環 for (;;) { //獲取當前節點的上一個節點,那麼咱們線程三的 //上一個節點是線程二 final Node p = node.predecessor(); //線程二不知足這兒條件 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //若是上面一個條件不知足的話,接下來就會調用以下的方法 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //傳進來的參數是上一個節點和當前節點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //當一個節點剛牀架你的時候 waitStatus默認值應該是0 int ws = pred.waitStatus; //這個條件不知足 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; //這個條件不知足 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //那麼就會直接執行這個方法。 //這個方法就會把上一個節點的waitStatus 設置爲SINGAL //也就是-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //而後返回false return false; }
咱們再回過頭來分析上一段代碼: final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //剛剛咱們知道這兒的返回值是false //那麼若是是false的話,if條件就不知足了。 //不知足了之後再次執行for循環。 //繼續執行shouldParkAfterFailedAcquire方法 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 當前的隊列指針狀況以下:
//再次執行這個方法 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取狀態,如今ws=-1 int ws = pred.waitStatus; //符合這個提交 if (ws == Node.SIGNAL) //結果返回true return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //這個shouldParkAfterFailedAcquire方法結果爲true了之後 //那麼接下來執行parkAndCheckInterrupt方法 //因此接下來咱們分析一下parkAndCheckInterrupt方法。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { //這兒操做就比較簡單了,這兒就直接把線程掛起,線程就停在這兒不動了 //必需要等另一個線程去執行unpark操做代碼才能往下執行。 LockSupport.park(this); return Thread.interrupted(); } 由於代碼執行到這兒就已經卡住了。因此咱們回到源頭看到如下,最終會讓哪段代碼卡住。 //這個地方是調動acquireQueued方法致使代碼卡住,因此這兒的代碼也會卡住不動。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
到目前爲止咱們分析了一個加鎖失敗的線程進去到隊列之後的狀況。.net
咱們如今能夠解釋一下公平鎖和非公平鎖的區別了。
咱們以前的全部的代碼分析的都是非公平鎖的,非公平鎖最大的特色就是在這兒。線程
final void lock() { //加鎖的時候不分青紅皁白,也無論隊列裏面是否有人在排着隊 //上來就是直接加鎖,因此咱們想一下,假設咱們雖然如今隊列裏面有線程在排隊加鎖 //可是恰好當前的獨佔鎖釋放鎖了,新進來的這個線程就加鎖成功了。也就是插隊成功了。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 若是是公平鎖的話,加鎖的時候走的是這個邏輯: static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { //調用這個方法 acquire(1); } public final void acquire(int arg) { //首先執行的是這兒的tryAcquire方法 //其實到這兒的代碼跟咱們以前看到的代碼是同樣的。 //可是在往下實現就跟非公平鎖那兒不同了 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { //獲取當前線程 final Thread current = Thread.currentThread(); int c = getState(); //若是真的當前的state=0 if (c == 0) { //須要這兒進行判斷,咱們先單獨把hasQueuedPredecessors //代碼拿出來分析一下,分析後獲得,這個方法是判斷隊列裏面 //是否還有節點了。若是還有節點,那麼這個方法就返回true //!true 就是false,代碼就不走這兒了。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //這兒返回false return false; } } public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //若是隊列裏面還有節點 //若是還有節點那麼就返回來true return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
這樣的話,咱們再回過頭來看這段代碼: public final void acquire(int arg) { // tryAcquire方法返回來的是false,那麼!flase的結果就是等於true if (!tryAcquire(arg) && //而後接下來就是走這個方法,那麼這個方法,就是跟咱們分析的同樣了。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
因此綜上所述,咱們的公平隊列就是每次在加鎖的時候,先判斷隊列裏面是否有線程,若是有就加到隊列後面,若是沒有,那麼就直接加鎖成功。 接下來咱們再分析一個場景,就是重入鎖釋放鎖的邏輯。 public void unlock() { sync.release(1); } 接下來調用的是這段代碼: public final boolean release(int arg) { //重點調用的是這個方法 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { //加鎖如今是咱們的線程一進行重入鎖的釋放,一開始state的值2 //如今傳進來的參數releases 是1 //那麼c的值是1 int c = getState() - releases; //若是釋放鎖的線程不是當前獨佔鎖的線程,那麼就會報錯。 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是把整個重入鎖都釋放完了,那麼其實c==0 //可是第一次釋放重入鎖的時候,這兒c是1 if (c == 0) { free = true; //若是整個重入鎖都釋放了,那麼就放當前的獨佔鎖置爲null setExclusiveOwnerThread(null); } //更改線程的重入鎖的個數 setState(c); //若是整個鎖都釋放完了,那麼返回的是true //若是隻是釋放了一部分,那麼返回的是false。 return free; }
public final boolean release(int arg) { //若是線程一有兩個鎖重入,當前只是減小了一個 //鎖重入,那麼tryRelease返回值是false。那麼這個條件就不知足 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } //對兩個鎖重入,由於第一次進來,那麼直接返回的是false。 return false; } 其實若是是隻釋放了線程一的第一個重入鎖,那麼這個返回值也沒什麼意義,咱們看到的是,就是把state的值減一了。 接下來咱們繼續分析,若是線程一,再釋放一個重入鎖,也就是state由1變爲0了。 咱們回過頭來再分析以下代碼: public final boolean release(int arg) { //條件知足 if (tryRelease(arg)) { Node h = head; //我這兒的分析是h.waitStatus就是等於0 //可是若是這個等於0的話,咱們的等於就走不下去了,可見這兒的值應該不等於零。 //這樣咱們就執行裏面的uparkSuccessor方法 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { //獲取waitStatus狀態 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //獲取到第一個節點 Node s = node.next; //在目前咱們虛擬的環境中,s!=null if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //這兒直接unpakr把線程喚醒 LockSupport.unpark(s.thread); } 接下來咱們分析一下unpank之後代碼如何走: 其實咱們的代碼以前卡住了,而後unpark之後會致使代碼繼續往下執行。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //以前咱們的代碼卡在這個地方。 //由於如今線程被喚醒了,因此這兒就能夠繼續往下執行。 //咱們要注意這兒是一個死循環。 //因此繼續重複執行。 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //重複執行到這兒,獲取當前節點的上一個節點 //當前節點的上一個節點固然是head了 final Node p = node.predecessor(); //p==head條件知足,因此接着就執行tryAcquire //這個方法就 開始加鎖了,修改當前加鎖線程的名字 //把state 改成了1 if (p == head && tryAcquire(arg)) { //而後把當前線程的node設置爲head setHead(node); //把當前線程的Node 置爲null進行垃圾回收 p.next = null; // help GC failed = false; //返回狀態 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }