初步瞭解AQS是什麼(一)

瞭解AQS

簡介

AbstractQueueSynchronized的縮寫,也叫抽象的隊列式同步器。定義了一套多線程訪問共享資源的同步器框架。html

字如其名,他是一個抽象類,因此大部分同步類都是繼承於它,而後重寫部分方法便可。java

好比說ReentrantLock/Semanode

phore/CountDownLatch都是AQS的具體實現類。多線程

功能

AQS維護了一個共享資源State和一個FIFO的等待隊列,當有多個線程爭搶資源的時候就會阻塞進入此隊列。app

線程在爭搶State這個共享資源的時候,會被封裝成一個Node節點,也就是說在AQS的等待隊列裏面的元素都是Node類型的對象。框架

PS:阻塞隊列中,不包括Head節點。函數

在瞭解AQS以前,咱們先了解下Node內部是怎樣的,咱們先來看看源碼oop

static final class Node {
    // 標識節點當前在共享模式下
    static final Node SHARED = new Node();
    // 標識節點當前在獨佔模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的幾個int常量是給waitStatus用的 ===========
    // 代碼此線程取消了爭搶這個鎖
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示當前node的後繼節點對應的線程須要被喚醒
    //通俗的話來講就是,若是A節點被設置爲SIGNAL,假如B是A的後繼節點,那麼B須要依賴A節點來喚醒才能拿到鎖
    static final int SIGNAL    = -1;
   
    // 本文不分析condition,因此略過吧,下一篇文章會介紹這個
    static final int CONDITION = -2;
    
    // 一樣的不分析,略過吧
    static final int PROPAGATE = -3;
    // =====================================================


    // 取值爲上面的一、-一、-二、-3,或者0(之後會講到)
    // 這麼理解,暫時只須要知道若是這個值大於0表明此線程取消了等待,
    // ps: 半天搶不到鎖,不搶了,ReentrantLock是能夠指定timeouot的
    volatile int waitStatus;
    // 前驅節點的引用
    volatile Node prev;
    // 後繼節點的引用
    volatile Node next;
    // 這個就是本線程
    volatile Thread thread;

}
複製代碼

結構

//頭結點,當前持有鎖的線程
private transient volatile Node head;

//尾節點,每次有新的節點進來,都要放在尾節點後面
private transient volatile Node tail;

//當前鎖的狀態,值爲0的時候,表示共享資源沒有被佔用,1的時候表示有一個線程佔用,若是大於1則表示重入
private volatile int state;

// 表明當前持有獨佔鎖的線程,舉個最重要的使用例子,由於鎖能夠重入
// reentrantLock.lock()能夠嵌套調用屢次,因此每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; 
複製代碼

由於AQS只是提供了一個模板,那麼具體資源的獲取方式和釋放方式就由具體的實現類來決定。源碼分析

下面咱們跟着看源碼一塊兒看看AQS究竟是什麼東西post

對於State的訪問,AQS定義瞭如下3種方式

  1. getState()
  2. setState()
  3. compareAndSetState()

AQS定義了兩種訪問資源的方式

  1. 獨佔模式 ,也就是說只有一個線程能夠訪問資源,如ReentranctLock
  2. 共享模式,表示能夠有多個線程訪問資源,如Semaphore/CountDownLatch

上面咱們說過,具體的同步實現器就是實現資源state的獲取和釋放的方式就行了,關於隊列的維護,Node節點的入隊出隊或者獲取資源失敗等操做,AQS已經實現好

自定義的同步器只要實現如下方法,就能夠實現出不一樣的同步器

  • 獨佔模式
    1. tryAcquire(int),嘗試獲取資源,獲取成功的話返回true,不然false
    2. tryRealease(int),嘗試釋放資源,釋放成功的話返回true,不然false
  • 共享模式
    1. tryAcquireShared(int),嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
    2. tryReleaseShared(int),嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。

PS:以上的方法在AQS上是沒有實現的,只有在具體的同步類實現器纔會實現。

獨佔模式的源碼分析

以獨佔模式的ReentractLock的公平鎖爲例子

加鎖

其實在每一個具體的同步類(獨佔模式)的操做資源的接口中,最終調用的是AQS的acquire方法(好比說ReentractLock的公平鎖)

因此咱們看acquire的方法具體是怎麼實現,至於其餘不一樣的同步器的方法調用,也差很少都理解了。

acquire的源碼

關於解釋都在代碼裏面了

public final void acquire(int arg) {//arg = 1,表示同步器想要1個state資源
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
// tryAcquire,顧名思義,就是先嚐試獲取一下,若是獲取成功,就返回true,那麼獲取資源也就結束了,不然,就把當前線程設置爲獨佔模式(EXCLUSIVE),壓到阻塞隊列中。
// addWaiter就是把當前線程封裝成Node對象,而且設置爲獨佔模式(EXCLUSIVE),加入阻塞隊列


複製代碼

下面繼續看tryAcquire的源碼

tryAcquire的源碼

注意:這裏用ReentranctLock只是爲了方便舉例子,不一樣的同步器實現不一樣的方法而已.

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
     		//獲取資源
            int c = getState();
     		//若是c爲0,說明資源沒有線程佔用,則能夠去搶
            if (c == 0) {
                //既然是公平鎖,那麼確定就講究先來後到
                //hasQueuedPredecessors先看看前面有沒有Node節點在等待,若是沒有,就經過CAS去獲取一下
                //在這裏存在着線程競爭,因此有可能成功有可能失敗,若是成功得到資源,那麼compareAndSetState返回true,不然false
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //到這裏說明前面沒有線程在等待而且成功搶佔到臨界資源
                    //因此就設置當前線程爲佔有資源的線程,方便後面判斷重入
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                //到這裏說明是重入的問題
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
     		//到這裏就是臨界資源被佔用,並且不是重入的狀況,也就是說head節點都還沒釋放資源!
            return false;
        }
複製代碼

下面繼續看addWaiter的源碼和acquireQueued的源碼

addWaiter源碼

private Node addWaiter(Node mode) {//傳入的是Node.EXCLUSIVE
     	//將當前線程封裝成Node對象,並設置爲獨佔模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
     	//找到尾節點
        Node pred = tail;
     	//若是找到隊尾,說明隊列不爲空(若是隻有head,其實隊列式爲空)
        if (pred != null) {
            //把隊尾設置爲插入節點的前綴節點
            node.prev = pred;
            //經過CAS操做,將傳入的線程放到隊尾,這裏用CAS操做,是由於此時可能會有多個線程插入隊尾,因此在此時隊尾元素是不太肯定的
            if (compareAndSetTail(pred, node)) {
                //進入這裏,說明當前隊尾元素就是當前線程,設置前綴節點就行了!
                pred.next = node;
                return node;
            }
        }
     	//若是代碼執行到這一步,說明有兩種狀況
     	//1. 如今隊列爲空
     	//2. 將當前線程表明的節點插入隊列的時候,有其餘線程也要插入該隊列而且成功成爲隊尾元素.
        enq(node);
        return node;
    }


/*enq函數------------------分界線------------------------------------*/

 private Node enq(final Node node) {//傳入的是當前線程所表明的節點
        for (;;) {//自旋
            Node t = tail; //找到隊尾元素
            if (t == null) { // Must initialize
                //到這裏表明隊列爲空,那麼經過CAS操做加入頭結點,此時仍是可能有多個線程會跑到這裏競爭
                if (compareAndSetHead(new Node()))
                    /* 到這裏說明當前線程設置head成功(競爭成功),注意,這個head是直接新建的,此時waitStatus == 0(到後面會說) 雖然設置了head,tail仍是null,所設置一下tail,讓它不爲null,方便下次for循環執行else語句從而進行將當前線程表明的節點設置在head後面,本身跟着思路走一下。 */
                    tail = head;
            } else {
                /* 到達這裏也要分下狀況 1. 是addWaiter想把當前線程加入隊尾失敗的時候 2. 是上個if語句設置head節點成功以後,下一次for循環了 不過上面的兩種狀況,都是想要把當前線程設置爲隊尾節點,也是經過CAS操做。由於此時也是有多個線程競爭的,若是成功就設置成功,若是失敗就自旋操做,不斷地嘗試設置爲隊尾節點。 */
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

複製代碼

經過上面的簡要分析,咱們知道addWaiter最終的結果就是返回一個插入隊尾或者head後面的節點。接下來acquireQueued就是插入隊列的線程進行等待,若是輪到這個線程去拿資源了就去拿。至關於在飯堂排隊打菜同樣。一個窗口只能爲一位學生打菜,沒輪到打菜的同窗能夠休息作其餘事情。

acquireQueued源碼

解釋一下:若是acquireQueued函數返回true,則表示會進入中斷處理,不會進行掛起,也就是說打菜的同窗不會休息,因此通常是返回false的

final boolean acquireQueued(final Node node, int arg) {//傳入的是當前已經加入隊尾的節點和想要獲取的State
        boolean failed = true;
        try {
            boolean interrupted = false;//默認設置沒有中斷
            for (;;) {//這裏也是自旋
                //獲取傳入這個節點的前驅節點
                final Node p = node.predecessor();
                
                /* tryAcquire 若是p是head,也就是說當前這個線程的節點是阻塞隊列的第一個節點,那麼就去嘗試獲取state,畢竟這個是隊列嘛,先來先到。有可能成功,也有可能失敗。 由於head節點表示的是當前正在擁有資源的線程,不知道可否成功是由於不知道head節點有沒有釋放資源,其實在ReentractLock的tryAcquire就是判斷state是否爲0,若是爲0,則表示沒有線程擁有該資源,也就是說head節點釋放了該資源,那麼便可獲取。 還有一個緣由就是在enq的時候,若是隊列沒有節點,也就是初始化head節點的時候,沒有設置任何線程,也就是說head沒有佔用資源,那麼當前線程做爲阻塞隊列的對頭,能夠去嘗試去獲取state,萬一得了呢?! */
                if (p == head && tryAcquire(arg)) {
                   //到這裏是當前線程獲取state成功,將當前節點設置爲head節點,
                    setHead(node);
                    p.next = null; // help GC,讓以前的head方便被JVM回收
                    failed = false;//表示獲取state成功
                    return interrupted;//表示期間有沒有被中斷過
                }
                //到這裏是說明 要麼表明當前線程的節點不是阻塞隊列的頭結點 要麼嘗試獲取state資源失敗
                //不論是哪一種狀況,說明當前加入節點的線程想要知道本身此時的狀態是什麼,如果休息,可是誰告訴我下一次到我了?若不是休息,那麼就找到能夠休息的地方或者說到我打菜了。因此這裏就用了waitStatus的變量表示
                //
                /* 若是有A Node對象,直接排在A後面,隊列是這樣的 A<=>B<=>C 1. 若是A的waitStatus = -1 ,表示說A 若是佔用了state資源,那麼排隊在A後面的第一個Node節點(B節點)能夠先休息(B線程掛起)了,若是A釋放了資源那麼就會喚醒B,也就是A對B說,你先去休息吧,我好了就叫你 2 .若是 A的waitStatu = 1,表示說A這個線程已經不想排隊獲取這個資源了,這裏設置這個值主要是方便當前節點找到可讓它能夠他安心休息的地方。 3. waitStatu = 0 表示A是初始化狀態 */
                //這裏是 主要是爲了找到node能夠休息的地方。若是找到就休息,若是找不到,那麼說明node前面就是head了,下一次循環檢查能不能獲取資源就行了!
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


//---------------setHead源碼-------------------
private void setHead(Node node) {
    	//將head指針指向傳入的節點
        head = node;
    	//這裏設置head節點的線程爲null,同步實現器在實現tryAcquire成功的時候會把當前線程保存下來的
        node.thread = null;
    	//這裏是當前node的前綴
        node.prev = null;
   }



//---------------
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//傳入的是前驅節點和當前節點
     	//獲取前驅節點的狀態,方便當前節點的接下來的狀態
        int ws = pred.waitStatus;
     	
        if (ws == Node.SIGNAL)
            //進到這裏就說明waitStatus = -1,也就說明node應該能夠休息了,也就是線程掛起
            /* * This node has already set status asking a release * to signal it, so it can safely park. */
            return true;
        if (ws > 0) {
            /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
            //這裏就是說前驅節點已經放棄排隊了,當前節點往前找看看能不能找到沒有取消排隊的節點,看看能不能排在他們後面
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
            //到這裏頗有多是waitStatus爲0的分支,上面咱們都沒有設置waitStatus
            // 咱們在enq的時候用了new Node()和在addWaiter剛開始的時候也是用了 new Node(mode,arg),這兩處都是添加tail的時候
            // 若是沒有設置waitStatus的時候,是默認爲0的,也就是說是初始化狀態
            // 若是到達這裏前驅節點都是tail,咱們就要將隊尾的狀態設置爲-1,讓傳進來的node節點能夠找到休息點。或者是已經釋放資源的head,那麼下次node能夠變爲head了!!
            // 設置可能會失敗,由於這裏也會有線程競爭,競爭不過,這裏也是經過自旋,直到能找到休息點爲止。
            //也有多是pre節點已是head節點了,尚未釋放state資源,此時pre(head)的waitStatus == -1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
     	//這裏爲何返回false呢,是由於若是這時候head已是node的pre,那麼若是到這裏head已經釋放完資源以後,node下一次就能夠直接獲取資源啦!若是head還沒釋放資源,那麼下一次node就直接去休息。
     	// 若是返回的是true的話,若是這時候head已是node的pre,head已經釋放完資源,那麼到後面線程節點就掛起,那麼誰來喚醒node節點?
        return false;
    }


//--------------------------
 private final boolean parkAndCheckInterrupt() {
     //若是shouldParkAfterFailedAcquire返回的是true,如果false則不會執行到這裏!
        LockSupport.park(this);//線程從這裏掛起,若是被喚醒和中斷那麼繼續從這裏往下執行。
        return Thread.interrupted();
    }
複製代碼

解鎖

從上面的解釋咱們知道,若是掛起等待的線程須要獲取資源,是須要前綴節點的waitStatus爲SIGNAL的線程喚醒的,也就是head節點。

在獨佔模式中,具體的同步器實現類最終用到的是AQS的release方法,開始的時候說過了,具體的同步實現器只要實現tryRelease方法便可。

好比說ReentranctLock的unlock

release的源碼

public final boolean release(int arg) {
        if (tryRelease(arg)) {//這裏是先嚐試釋放一下資源,通常均可以釋放成功,除了屢次重入但只釋放一次的狀況。
            Node h = head;
            //這裏判斷的是 阻塞隊列是否還存在和head節點是不是tail節點,由於以前說過,隊列的尾節點的waitStatus是爲0的
            if (h != null && h.waitStatus != 0)
                //到這裏就說明head節點已經釋放成功啦,就先去叫醒後面的直接節點去搶資源吧
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

tryRelease源碼

protected final boolean tryRelease(int releases) {
    		//對state的操做就是釋放資源
            int c = getState() - releases;
    		//若是執行釋放操做的不是所擁有資源的線程,拋出異常。
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
    		//判斷是否是有嵌套鎖
            if (c == 0) {
                //若是到達這裏,說明臨界資源已經得到自由啦,沒有線程佔用它啦!因此設置free = true
                free = true;
                //同時會把擁有資源的線程設置爲null
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
複製代碼

unparkSuccessor源碼

private void unparkSuccessor(Node node) {//傳入的是head節點
        /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
        int ws = node.waitStatus;
    	//這裏設置一下head的waitStatus,由於以前除了有節點加入隊列的時候會把head節點ws = -1,基本沒有其餘地方設置,因此這裏基本都是爲-1的,CAS設置爲0主要是head後面的直接節點不會掛起等待。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
    	//下面的代碼是若是阻塞隊列有取消等待的節點,那麼就把他們移除阻塞隊伍,找到真正想要獲取資源在等待的head後面的直接節點。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //到這裏就說明找到了那個節點,也就是head後面的第一個沒有取消等待的節點,這個節點可能已經掛起或者還在掛起的過程當中,反正都會執行喚醒線程的函數。這樣若是是掛起的線程,就繼續執行下一次自旋,下一次自旋確定拿到鎖,進行操做。由於已經知足了是1. 喚醒的節點是阻塞隊列的第一個節點,2. head節點已經釋放資源了!
            LockSupport.unpark(s.thread);
    }

複製代碼

總結

  1. 本篇文章簡述了AQS的大概做用和原理。
  2. 以ReentrantLock(獨佔模式)的公平鎖爲例子,分析了AQS的關於阻塞隊列是怎麼的操做。
  3. 寫這篇文章主要是爲了增強本身的關於多線程的基礎。爲了學習,看了許多篇大佬的博客文章,記錄下來,加入了本身的理解,但願錯誤少一點。得以這些大佬爲榜樣,站在巨人的肩膀去學習,但願可以學得更快,更高效,但願之後也能夠寫出本身的高品質文章!

參考

  1. www.javadoop.com/post/Abstra…

  2. www.cnblogs.com/waterystone…

相關文章
相關標籤/搜索