深刻淺出AQS源碼解析

最近一直在研究AQS的源碼,但願能夠更深入的理解AQS的實現原理。雖然網上有不少關於AQS的源碼分析,可是看完之後感受仍是隻知其一;不知其二。因而,我將本身的整個理解過程記錄下來了,但願對你們有所幫助。java

基本原理

AQS是Java中鎖的基礎,主要由兩個隊列組成。一個隊列是同步隊列,另外一個是條件隊列node

同步隊列的原理

  • 同步隊列的隊列頭部是head,隊列尾部是tail節點,head節點是一個空節點,同步隊列是一個雙向鏈表,經過nextprev鏈接全部節點
  • 全部的線程在競爭鎖的時候都會建立一個Node節點,線程與節點綁定在一塊兒,(若是是同步鎖和排他鎖不一樣之處是經過nextWaiter來區分的)而且添加到同步隊列的尾部
  • head的第一個節點獲取鎖,其他節點都須要等待被喚醒
  • 同步隊列中的節點會存在取消和null的狀況(如:線程超時中斷、線程更新節點的中間態),被取消和null的節點不能被喚醒,將會被視爲無效節點
  • 一個線程只能被有效的前驅節點(取消和null的節點除外)喚醒
  • 持有鎖的線程只能是有一個,其餘有效節點對應的線程都會被掛起

條件隊列的原理

  • 一個同步隊列能夠對應多個條件隊列
  • 條件隊列是一個單向鏈表,經過nextWaiter來鏈接起來,條件隊列的頭節點是firstWaiter,尾節點是lastWaiter
  • 某個條件隊列中知足條件的節點(被signalsignalAll方法喚醒的節點)纔會被轉移到同步隊列
  • 條件隊列中的被轉移到同步隊列的節點是從頭節點開始,條件隊列中被阻塞的線程會添加到隊列的尾部

同步隊列的實現

首先,瞭解如下同步隊列中隊列的節點Node的數據結構安全

static final class Node {
        /** 共享鎖的標識 */
        static final Node SHARED = new Node();
        /** 排他鎖的標識 */
        static final Node EXCLUSIVE = null;

        /** 線程取消 */
        static final int CANCELLED =  1;
        /** 持有鎖的線程的後繼線程被掛起 */
        static final int SIGNAL    = -1;
        /** 條件隊列標識 */
        static final int CONDITION = -2;
        /**
         * 共享鎖狀況下,通知全部其餘節點
         */
        static final int PROPAGATE = -3;

        /**
         * waitStatus的取值以下:
         *   SIGNAL(-1): 當前節點的後繼節點應該被掛起
         *   CANCELLED(1): 當前節點被取消
         *   CONDITION(-2): 當前節點在條件隊列
         *   PROPAGATE(-3): 釋放共享鎖時須要通知全部節點
         *   0: 初始值
         *
         */
        volatile int waitStatus;

        /**
         * 前驅節點
         */
        volatile Node prev;

        /**
         * 後繼節點
         */
        volatile Node next;

        /**
         * 節點對應的線程
         */
        volatile Thread thread;

        /**
         * 在共享鎖的狀況下,該節點的值爲SHARED
         * 在排他鎖的狀況下,該節點的值爲EXCLUSIVE
         * 在條件隊列的狀況下,連接的是下一個等待條件的線程
         */
        Node nextWaiter;
}

其次,咱們來看一下同步隊列的鏈表結構
同步隊列鏈表數據結構

接着,咱們根據同步隊列的原理來分析如下acquirerelease須要作哪些事情:源碼分析

實現acquire功能須要作的事情

  1. 建立一個Node節點node(該節點多是排他鎖,也能夠能是共享鎖)
  2. node添加到同步隊列尾部,若是同步隊列爲空(初始狀況下),須要先建立一個空的頭節點,而後再添加到隊列的尾部
  3. 若是node的前驅節點是head,說明node是第一個節點,可以獲取鎖,須要將head修改爲node,釋放前驅節點的資源
  4. 若是node的前驅節點不是head,說明獲取鎖失敗,須要檢測是否須要將node綁定的線程掛起,分如下幾種狀況:
    • 若是nodewaitStatus已經被設置爲SIGNAL 表示須要被掛起
    • 若是nodewaitStatus設置爲CANCEL表示該節點已經被取消,須要被去掉,並修改 nodeprev,直到連接上一個有效的節點爲止
    • 不然將nodewaitStatus設置爲SIGNAL,表示即將要被掛起
  5. 若是須要將node綁定的線程掛起,則讓出CPU,直到當前驅節點來喚起node纔會開始繼續從步驟3開始執行

與acquire功能相關的代碼

  • acquire方法:獲取排他鎖
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. tryAcquire(arg):對外提供的一個擴展方法,經常使用的鎖都要實現這個方法,具體實現與鎖相關ui

  2. addWaiter(Node.EXCLUSIVE): 建立一個排他鎖節點,並將該節點添加到同步隊列尾部,代碼以下:this

private Node addWaiter(Node mode) {
        // 建立一個node,EXCLUSIVE類型
        Node node = new Node(mode);

        for (;;) {
            // 獲取尾節點
            Node oldTail = tail;
            if (oldTail != null) {
                // 設置即將成爲尾節點的前驅
                node.setPrevRelaxed(oldTail);
                // CAS操做設置尾節點
                if (compareAndSetTail(oldTail, node)) {
                    // 將新尾節點的前驅節點與新的尾節點關聯起來
                    oldTail.next = node;
                    // 返回添加的節點
                    // 這個節點如今不必定是尾節點,由於若是有多個線程調用這個方法時,
                    // 可能還有節點添加在這個節點後面
                    return node;
                }
            } else {
                // 若是隊列爲空,初始化頭節點
                initializeSyncQueue();
            }
        }
    }
  1. acquireQueued同步隊列中的節點獲取排他鎖
final boolean acquireQueued(final Node node, int arg) {
        try {
            // 線程是否中斷
            boolean interrupted = false;
            for (;;) {
                // 獲取前驅節點
                final Node p = node.predecessor();
                // 若是前驅節點是頭節點,獲取鎖
                if (p == head && tryAcquire(arg)) {
                    // 修改頭節點
                    setHead(node);
                    // 釋放頭節點的資源
                    p.next = null; // help GC
                    // 返回線程中斷的狀態
                    // 這也是該方法惟一的返回值
                    // 沒有獲取鎖的線程會一直執行該方法直到獲取鎖之後再返回
                    return interrupted;
                }
                // 獲取鎖失敗後是否須要將線程掛起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 線程掛起並返回是否被中斷
                    interrupted = true;
            }
        } catch (Throwable t) {
            // 取消該節點
            cancelAcquire(node);
            throw t;
        }
    }
  1. shouldParkAfterFailedAcquire:檢測線程獲取鎖失敗之後是否須要被掛起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驅節點的狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 狀態已經設置成SIGNAL,能夠直接掛起該節點
             */
            return true;
        // 節點被取消
        if (ws > 0) {
            /*
             * 找到pred第一個有效的前驅節點
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // pred多是一個新的節點,須要將pred的next重寫設置爲node
            pred.next = node;
        } else {
            /*
             * CAS操做將pred節點的狀態設置爲SIGNAL
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        // 只有當pred節點的waitStatus已是SIGNAL狀態時,才能夠安全的掛起線程
        // 不然須要不能被掛起
        return false;
    }
  1. parkAndCheckInterrupt:將當前線程掛起,並檢測當前線程是否中斷
private final boolean parkAndCheckInterrupt() {
        // 線程掛起
        LockSupport.park(this);
        // 檢測線程是否中斷
        return Thread.interrupted();
    }
  1. cancelAcquire:取消節點
private void cancelAcquire(Node node) {
        // 若是節點爲空,什麼都不作
        if (node == null)
            return;
        // 釋放線程
        node.thread = null;

        // 從後往前過濾掉全部的被取消的節點
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 有效前驅節點的nex節點
        Node predNext = pred.next;

        // 將node設置爲CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 若是是尾節點,設置新的尾節點
        if (node == tail && compareAndSetTail(node, pred)) {
            // 將新的尾節點的後續設置爲null
            pred.compareAndSetNext(predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            // 若是前驅節點的線程不爲null而且waitStatus爲SIGNAL
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                // 將node設置成pred的後繼節點
                if (next != null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                // 喚起node節點的後繼節點
                // 由於node節點已經釋放鎖了
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
  1. unparkSuccessor:喚醒後繼節點
private void unparkSuccessor(Node node) {
        /*
         * 獲取node節點的waitStatus
         */
        int ws = node.waitStatus;
       // 用CSA操做將waitStatus設置成初始狀態
       // 無論設置是否成功,都無所謂,由於該節點即將被銷燬
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        /*
         * 獲取node的後繼節點
         */
        Node s = node.next;
        // 若是後繼節點爲null或者被取消,
        // 經過從同步隊列的尾節點開始一直往前找到一個有效的後繼節點
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        // 若是後繼節點不爲空
        if (s != null)
            LockSupport.unpark(s.thread);// 喚醒後繼節點的線程
    }

acquire方法相似的還有acquireInterruptiblytryAcquireNanosacquireSharedacquireSharedInterruptiblytryAcquireSharedNanos,咱們都一一分析如下線程

  • acquireInterruptibly方法:獲取可中斷的排他鎖
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 若是線程中斷,直接返回
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg); // 中斷式的獲取鎖
    }
  1. doAcquireInterruptibly:可中斷式的獲取鎖
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
       // 建立一個排他節點加入同步隊列
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                // 獲取前驅節點
                final Node p = node.predecessor();
                // 若是前驅節點是頭節點,說明已經獲取的鎖
                if (p == head && tryAcquire(arg)) {
                    // 修改頭節點
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                // 若是沒有獲取鎖,檢測是否須要掛起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); // 若是發現線程已經被中斷,須要拋出異常
            }
        } catch (Throwable t) {
            // 發生異常取消節點
            cancelAcquire(node);
            throw t;
        }
    }
  • tryAcquireNanos方法:超時中斷獲取排他鎖
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 線程中斷直接返回
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout); // 超時獲取排他鎖
    }
  1. doAcquireNanos:超時獲取排他鎖
private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 若是超時直接返回
        if (nanosTimeout <= 0L)
            return false;
        // 獲取超時時長
        final long deadline = System.nanoTime() + nanosTimeout;
        // 添加一個排他節點到同步隊列尾部
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                 // 獲取前驅節點
                final Node p = node.predecessor();
                // 已經獲取鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                // 若是超時了就取消
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                // 檢測節點是否須要被掛起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    // 若是須要掛起,且超時時長大於SPIN_FOR_TIMEOUT_THRESHOLD
                    // 線程掛起nanosTimeout時間
                    LockSupport.parkNanos(this, nanosTimeout); 
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            // 發生異常取消節點
            cancelAcquire(node);
            throw t;
        }
    }
  • acquireShared方法:獲取共享鎖
public final void acquireShared(int arg) {
        // 對外提供的一個擴展方法,經常使用的鎖都要實現這個方法,
        // 該方法的實現與鎖的用途有關
        if (tryAcquireShared(arg) < 0) 
            doAcquireShared(arg); // 獲取共享鎖
    }
  1. doAcquireShared:獲取共享鎖
private void doAcquireShared(int arg) {
        // 添加一個共享節點到同步隊列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取前驅節點
                final Node p = node.predecessor();
                if (p == head) {
                    // 返回結果大於等於0表示獲取共享鎖
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 設置頭節點並廣播通知其餘獲取共享鎖的節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        // 若是線程被中斷,將該線程中斷
                        // 共享鎖會被多個線程獲取,若是須要中斷
                        // 全部獲取共享鎖的線程都要被中斷
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                // 檢測是否須要掛起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 掛起並中斷
                    interrupted = true;
            }
        } catch (Throwable t) {
            // 發生異常取消節點
            cancelAcquire(node);
            throw t;
        }
    }
  1. setHeadAndPropagate:設置頭節點並廣播其餘節點來獲取鎖
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 記錄舊的頭節點
        setHead(node);// 設置新的頭節點
        /*
         * 若是頭節點爲null或者是否是取消狀態,嘗試喚醒後繼節點
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // node節點的next是SHARED,即共享鎖
            if (s == null || s.isShared())
                // 喚起獲取共享鎖的線程
                doReleaseShared();
        }
    }
  1. doReleaseShared:喚醒等待共享鎖的節點
private void doReleaseShared() {
        /*
         * 喚醒時是從頭節點開始先喚醒第一個共享節點,
         * 第一個共享節點被喚醒後會在doAcquireShared方法裏繼續執行(以前就是在這個方法裏被掛起的)
         * 第一個共享節點若是獲取鎖會調用setHeadAndPropagate方法修改頭節點,而後再調用doReleaseShared方法
         * 喚醒第二個共享節點,以此類推,最後把全部的共享節點都喚醒
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                // 獲取頭節點的狀態
                int ws = h.waitStatus;
                // 若是頭節點是SIGNAL,須要將狀態設置爲0,表示已經即將被喚醒
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // 若是失敗了說明有其餘線程在修改頭節點,須要繼續重試
                    unparkSuccessor(h); // 喚醒頭節點的後繼節點
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // 將頭節點狀態從0設置成PROPAGATE,若是失敗了繼續,由於也有其餘獲取共享鎖的線程在更改頭節點
            }
            // 若是頭節點未改變(由於沒有後繼節點須要等待共享鎖),跳出循環
            if (h == head)
                break;
        }
    }
  1. selfInterrupt:中斷當前線程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
  • acquireSharedInterruptibly方法:可中斷的獲取共享鎖
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException(); // 若是線程被中斷拋出異常
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg); // 可中斷的方式獲取共享鎖
    }
  1. doAcquireSharedInterruptibly:可中斷的方式後去共享鎖
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加共享鎖節點到同步隊列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                // 獲取前驅節點
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 獲取共享鎖之後修改頭節點,通知其餘等待共享鎖的節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 線程獲取共享鎖失敗後須要掛起,而且發現線程被中斷,因此拋出異常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            // 發生異常取消節點
            cancelAcquire(node);
            throw t;
        }
    }
  • tryAcquireSharedNanos方法:超時中斷獲取共享鎖
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted()) // 線程若是中斷了,直接拋出異常
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout); // 超時獲取共享鎖
    }
  1. doAcquireSharedNanos:超時的方式獲取中斷鎖
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 超時直接返回
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        // 添加共享節點到同步隊列尾部
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                // 獲取前驅節點
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 獲取鎖,修改頭節點,通知全部其餘等待共享鎖的節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) {
                    // 超時取消節點
                    cancelAcquire(node);
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    // 若是須要掛起,且超時時長大於SPIN_FOR_TIMEOUT_THRESHOLD
                    // 線程掛起nanosTimeout時間
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException(); // 中斷了拋出異常
            }
        } catch (Throwable t) {
            // 發生異常取消節點
            cancelAcquire(node);
            throw t;
        }
    }

實現release功能須要作的事情

  1. 釋放當前獲取鎖的線程持有的資源
  2. 喚醒有效的一個後繼節點

與release功能相關的代碼

  • release方法:釋放排他鎖
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 頭節點不能是一箇中間態
            if (h != null && h.waitStatus != 0)
                // 喚醒後繼節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • release方法:釋放共享鎖
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 釋放共享鎖,從頭節點開始一個一個的釋放
            // 若是存在多個共享節點在同步隊列時,doReleaseShared方式實際上是遞歸調用
            doReleaseShared();
            return true;
        }
        return false;
    }

至此,將全部獲取鎖和釋放鎖的方法相關的源碼所有分析完3d

條件隊列的實現

咱們來看一下條件隊列的鏈表結構
條件隊列的鏈表結構code

實現await功能須要作的事情

  1. 建立一個CONDITION類型的節點,將該節點添加到條件隊列
  2. 釋放已經獲取的鎖(由於只有當前線程先獲取了鎖纔可能再調用Condition.await()方法)
  3. 若是沒法獲取鎖,線程掛起

與await功能相關的代碼

  • await方法:等待條件
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException(); // 若是線程中斷,直接拋出異常
            // 建立一個CONDITION類型的節點,將該節點添加到條件隊列尾部
            Node node = addConditionWaiter();
            // 釋放鎖
            // 在調用await方法以前都會調用lock方法,這個時候已經獲取鎖了
            // 有時候鎖仍是可重入的,因此須要將全部的資源都釋放掉
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 若是節點再也不同步隊列,所有都要掛起
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 若是在等待期間發生過中斷(不論是調用signal以前仍是以後),直接退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 讓線程嘗試去獲取鎖,若是沒法獲取鎖就掛起
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 清除全部在條件隊列中是取消狀態的節點
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 發生中斷,上報中斷狀況
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  1. addConditionWaiter:在條件隊列中添加一個節點
private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 清除條件隊列中無效的節點
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 建立一個節點
            Node node = new Node(Node.CONDITION);
            // 添加到條件隊列尾部
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
  1. unlinkCancelledWaiters:清除在條件隊列中被取消的節點
private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            // 遍歷條件隊列將全部不是CONDITION狀態的節點所有清除掉
            // 這些節點都是取消狀態的節點
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
  1. fullyRelease:釋放線程持有的全部鎖資源
final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            // 釋放全部的資源
            // 若是是可重入鎖,savedState就是重入的次數
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            // 發生異常就取消該節點
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }
  1. isOnSyncQueue:判斷節點是否在同步隊列
final boolean isOnSyncQueue(Node node) {
        // waitStatus是CONDITION或者node沒有前驅節點,說明node不在同步隊列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // 有後繼節點必定在同步隊列
            return true;
        /*
         * 在同步隊列中查找node,看是否在同步隊列中
         */
        return findNodeFromTail(node);
    }
  1. findNodeFromTail:在同步隊列中查找節點
private boolean findNodeFromTail(Node node) {
        // 從尾節點開始查找
        for (Node p = tail;;) {
            if (p == node) // 找到了
                return true;
            if (p == null) // 找到頭了還沒找到
                return false;
            p = p.prev;
        }
    }
  1. checkInterruptWhileWaiting:檢測中斷的狀況
private int checkInterruptWhileWaiting(Node node) {
            // 沒有發生中斷返回0
            // 調用signal以前發生中斷返回THROW_IE
            // 調用signal以後發生中斷返回REINTERRUPT
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
  1. transferAfterCancelledWait:清除在條件隊列中被取消的節點
// 只有線程處於中斷狀態,纔會調用此方法
// 若是須要的話,將這個已經取消等待的節點轉移到阻塞隊列
// 返回 true,若是此線程在 signal 以前被取消,不然返回false
final boolean transferAfterCancelledWait(Node node) {
  
        // 用 CAS 將節點狀態設置爲 0 
        // 若是這步 CAS 成功,說明是 signal 方法以前發生的中斷,
       // 由於若是 signal 先發生的話,signal 中會將 waitStatus 設置爲 0
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node); // 將節點放入阻塞隊列
            return true;
        }
        // 到這裏是由於 CAS 失敗,確定是由於 signal 方法已經將 waitStatus 設置爲了 0
        // signal 方法會將節點轉移到阻塞隊列,可是可能還沒完成,這邊自旋等待其完成
        // 固然,這種事情仍是比較少的吧:signal 調用以後,沒完成轉移以前,發生了中斷
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
  1. enq:把節點添加到同步隊列
private Node enq(Node node) {
        // 無限循環,將節點添加到同步隊列尾部
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                // 若是同步隊列爲空,初始化
                initializeSyncQueue();
            }
        }
    }
  1. reportInterruptAfterWait:中斷處理
private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            // 若是是THROW_IE狀態,拋異常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT) // 再次中斷,由於中斷狀態被使用過一次
                selfInterrupt();
        }

awaitNanosawaitUntilawait(long time, TimeUnit unit)這幾個方法的總體邏輯是同樣的,就再也不分析了

實現signal功能須要作的事情

  1. 將條件隊列中的節點加入同步隊列
  2. 喚醒線程

與signal功能相關的代碼

  • signal方法:喚醒等待條件的節點
public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 獲取條件隊列中的第一個節點
            Node first = firstWaiter;
            if (first != null)
                // 喚醒等待條件的節點
                doSignal(first); 
        }
  1. doSignal:喚醒等待條件的節點
private void doSignal(Node first) {
            do {
                // 去掉無效的節點
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&  // 將節點轉移到同步隊列
                     (first = firstWaiter) != null);
        }
  1. transferForSignal:將節點轉移到同步隊列
final boolean transferForSignal(Node node) {
        /*
         * 取消的節點不須要轉移
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * 將節點加入同步隊列尾部
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程
        // 若是 ws <= 0, 那麼 compareAndSetWaitStatus 將會被調用
        // 節點入隊後,須要把前驅節點的狀態設爲SIGNAL
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            // 若是前驅節點取消或者 CAS 失敗,會進到這裏喚醒線程
            LockSupport.unpark(node.thread);
        return true;
    }
  • signalAlll方法:喚醒全部等待條件的節點
public final void signalAll() {
            // 若是是當前線程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                // 喚醒全部等待條件的節點
                doSignalAll(first);
        }
  1. doSignalAll:喚醒全部等待條件的節點
// 將全部的節點都轉移到同步隊列
private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

如今將與AQS相關的核心代碼都整理了一遍,裏面若是有描述不清晰或者不許確的地方但願你們能夠幫忙指出!

相關文章
相關標籤/搜索