AQS源碼分析

AQS源碼分析

AQS全稱AbstractQueuedSynchronizer(抽象隊列同步器)java

AQS中維護了一個被volatile修飾的int類型的同步狀態state,以及CLH等待隊列。node

state同步狀態用於維護同步資源被使用的狀況,AQS自己並不關心state的值及其含義,徹底由AQS的子類去定義以及維護。設計模式

CLH等待隊列是由一個雙向鏈表來實現的,存在head和tail指針分別指向鏈表中的頭節點以及尾節點,同時鏈表中的節點由AQS中的Node靜態內部類來表示。安全

ReentrantLock、ReentrantReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore底層都是基於AQS來實現的。多線程


AQS支持的模式

AQS支持兩種模式,一種是獨佔模式,一種是共享模式。併發

獨佔模式表示,同步資源在同一時刻只能被一個線程所持有,對應AQS的acquire()以及release()方法。ide

共享模式表示,同步資源在同一時刻能夠被多個線程所持有,對應AQS的acquireShared()以及releaseShared()方法。源碼分析

acquire()方法:獨佔模式下獲取同步資源。

release()方法:獨佔模式下釋放同步資源。

acquireShared()方法:共享模式下獲取同步資源。

releaseShared()方法:共享模式下釋放同步資源。

AQS使用了模板方法設計模式,在acquire()、release()、acquireShared()、releaseShared()方法中都會調用其對應的try方法,好比acquire()方法中會調用tryAcquire()方法,release()方法中會調用tryRelease()方法,AQS子類只須要重寫AQS提供的tryAcquire()、tryRelease()或tryAcquireShared()、tryReleaseShared()方法便可,同時須要保證方法的實現是線程安全的。ui

tryAcquire()、tryRelease()、tryAcquireShared()、tryReleaseShared()方法都沒有使用abstract進行修飾,同時方法中都會直接拋出UnsupportedOperationException異常,好處是不須要強制子類同時實現獨佔模式和共享模式中的方法,由於大多數AQS的子類都僅支持一種模式,用戶只須要根據實際狀況進行選擇便可。this

tryAcquire(int arg)方法:獨佔模式下嘗試獲取同步資源,同時AQS規定,若是獲取同步資源成功則返回true,不然返回false。

tryRelease(int arg)方法:獨佔模式下嘗試釋放同步資源,同時AQS規定,若是釋放同步資源成功則返回true,不然返回false。

tryAcquireShared(int arg)方法:共享模式下嘗試獲取同步資源,同時AQS規定,若是獲取同步資源失敗則返回負數,不然返回剩餘的資源個數。

tryReleaseShared(int arg)方法:共享模式下嘗試釋放同步資源,同時AQS規定,若是釋放同步資源成功則返回true,不然返回false。

剖析AQS中的Node類

Node類提供的核心屬性

// 節點封裝的線程
volatile Thread thread;

// 指向前驅節點的指針
volatile Node prev;

// 指向後繼節點的指針
volatile Node next;

// 節點的等待狀態(默認爲0)(默認爲0)(默認爲0) 
volatile int waitStatus;

// 下一個正在等待的節點
Node nextWaiter;

// 共享模式下的標識節點
static final Node SHARED = new Node();

// 獨佔模式下的標識節點
static final Node EXCLUSIVE = null;

同時Node類中維護了一系列節點的等待狀態值

// CANCELLED狀態,表示線程已超時等等,處於CANCELLED狀態的節點會從等待隊列中剔除,不會參與到同步資源的競爭當中
static final int CANCELLED =  1;

// SIGNAL狀態,若是節點的等待狀態爲SIGNAL,那麼當它釋放同步資源時,將會喚醒離它最近的同時等待狀態不爲CANCELLED的後繼節點(同時也能說明節點存在後繼節點)
static final int SIGNAL    = -1;

// 表示線程在指定的條件下進行等待
static final int CONDITION = -2;

// PROPAGATE狀態,表示實際存在可用資源,須要再往下傳播(喚醒)
static final int PROPAGATE = -3;

所以每一個Node節點中都會包含節點封裝的線程、分別指向前驅和後繼節點的指針、節點的等待狀態、指向下一個正在等待的節點的指針。


自定義AQS獨佔模式下的同步器來實現獨享鎖

/**
 * 自定義AQS獨佔模式下的同步器來實現獨享鎖
 */
public class Mutex implements Lock, java.io.Serializable {

    /**
     * 自定義AQS獨佔模式下的同步器
     * 使用state爲0表示當前鎖沒有被線程所持有
     * 使用state爲1表示當前鎖已經被線程所持有
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 判斷鎖是否被當前線程所持有
         */
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        /**
         * 嘗試獲取鎖
         * 判斷鎖是否存在,若是鎖不存在則獲取鎖(經過CAS控制)
         */
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 值必須是1(獨享鎖只有一把鎖嘛)
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread()); // 將當前線程設置爲獨佔模式下擁有同步資源的線程
                return true;
            }
            return false;
        }

        /**
         * 嘗試釋放鎖(要求被誰加的鎖只能被誰釋放)
         * 判斷當前擁有同步資源的線程是否爲當前線程,若是不是則拋出異常,不然釋放鎖
         * 這裏有三種調用狀況,鎖空閒的狀態下調用、鎖已經被線程所持有但被並不是擁有鎖的線程調用、鎖已經被線程所持有並被擁有鎖的線程調用,只有第三種狀況纔可以解鎖成功
         */
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 值必須是1(獨享鎖只有一把鎖嘛)
            if (Thread.currentThread() != getExclusiveOwnerThread()) // 要求被誰加的鎖只能被誰釋放
                throw new IllegalMonitorStateException();
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null); // 將獨佔模式中擁有同步資源的線程置爲NULL
            setState(0);
            return true;
        }

        /**
         * 提供一個Condition實例
         */
        Condition newCondition() {
            return new ConditionObject();
        }

        /**
         * 判斷鎖是否被線程所持有
         */
        final boolean isLocked() {
            return getState() == 1;
        }
    }

    /**
     * 同步器
     */
    private final Sync sync = new Sync();

    /**
     * 加鎖
     */
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 嘗試獲取鎖
     */
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    /**
     * 解鎖
     * 解鎖只能調用同步器的release(),不能調用tryRelease()方法,由於tryRelease()方法只是簡單的修改一下同步狀態的值而已,並無去喚醒等待隊列中的線程,正常是須要喚醒等待隊列中離頭節點最近的同時等待狀態不爲CANCELLED的節點
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * 返回與此Mutex綁定的Condition實例
     */
    public Condition newCondition() {
        return sync.newCondition();
    }

    /**
     * 判斷鎖是否被線程所持有
     */
    public boolean isLocked() {
        return sync.isLocked();
    }

    /**
     * 判斷是否有線程在等待獲取鎖
     */
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * 可能拋出InterruptedException的加鎖(若是線程被設置了中斷標識那麼直接拋出異常)
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * 在指定的時間內嘗試獲取鎖
     */
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    
}

AQS子類(同步器)通常都是經過內部類實現,而後做爲內部組件來使用。

public class Main {

    static class MyRunnable implements Runnable {

        private Mutex mutex = new Mutex();

        @Override
        public void run() {
            System.out.println(String.format("%s Running", Thread.currentThread().getName()));
            mutex.lock();
            System.out.println(String.format("%s加鎖", Thread.currentThread().getName()));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            mutex.unlock();
            System.out.println(String.format("%s解鎖", Thread.currentThread().getName()));
        }

    }

    public static void main(String[] args) {
        Runnable runnable = new MyRunnable();
        Thread threadA = new Thread(runnable, "線程A");
        Thread threadB = new Thread(runnable, "線程B");
        Thread threadC = new Thread(runnable, "線程C");
        threadA.start();
        threadB.start();
        threadC.start();
    }

}

能夠看到該獨享鎖是公平鎖,多線程按照申請鎖的順序獲取鎖。


獨佔模式下獲取同步資源的源碼分析

acquire()方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

總結:當線程要獲取同步資源時,能夠調用acquire()或者tryAcquire()方法,acquire()方法中會調用AQS子類的tryAcquire()方法,嘗試獲取同步資源,若是獲取同步資源成功,則直接返回,作本身的事情,不然將會執行addWaiter()方法,將當前線程封裝成Node節點而後加入到等待隊列當中,而後執行acquireQueued()方法,用於自旋獲取同步資源,若是全部條件都知足那麼最後將會執行selfInterrupt()方法。

addWaiter()方法

private Node addWaiter(Node mode) {
    // 將當前線程封裝成Node節點,而且指定爲獨佔模式,獨佔模式Node.EXCLUSIVE爲NULL,也就是說節點的nextWaiter爲NULL
    Node node = new Node(Thread.currentThread(), mode);
    // 將節點加入到隊尾當中
    Node pred = tail;
    if (pred != null) {
        // 將當前節點的前驅指針指向尾節點
        node.prev = pred; 
        // 經過CAS設置尾節點(若是pred指針所指向的尾節點就是當前的尾節點,也就是在這個過程中沒有其餘節點插入到隊尾,則將tail指針指向當前節點)
        if (compareAndSetTail(pred, node)) { 
            // 將以前尾節點的後繼指針指向當前節點
            pred.next = node; 
            return node;
        }
    }
    // 若是不存在尾節點,也就是隊列爲空,或者經過CAS設置尾節點失敗(也就是在這個過程中有其餘節點插入到隊尾),那麼將會經過enq()方法死循環進行設置。
    enq(node); 
    // 不管怎麼樣該方法最終都會返回封裝了當前線程的節點。
    return node;
}

總結:addWaiter()方法用於將當前線程封裝成Node節點而後加入到等待隊列當中,若是在這個過程當中,等待隊列爲空或者經過CAS設置尾節點失敗,那麼將會經過enq()方法死循環進行設置。

enq()方法

private Node enq(final Node node) {
    // 死循環
    for (;;) {
        Node t = tail;
        // 若是尾節點爲空則初始化隊列,建立一個空的節點,而且將head和tail指針都指向這個節點
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { 
            // 將當前節點的前驅指針指向尾節點
            node.prev = t;  
            // 經過CAS設置尾節點(若是t指針所指向的節點就是當前的尾節點,也就是在這個過程中沒有其餘節點插入到隊尾,則將tail指針指向當前節點)
            if (compareAndSetTail(t, node)) {
                // 將以前的尾節點的後繼指針指向當前節點
                t.next = node; 
                return t;
            }
        }
    }
}

總結:enq()方法中使用死循環初始化隊列以及經過CAS設置尾節點,直到尾節點被設置成功,同時須要注意的是當隊列初始化後會有一個空的頭節點,該節點不包含任何的線程,而後再將當前節點加入到隊列當中。

acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {
    // 失敗標識
    boolean failed = true; 
    try {
         // 中斷標識
        boolean interrupted = false;
        // 自旋
        for (;;) { 
            // 獲取節點的前驅節點
            final Node p = node.predecessor(); 
            // 若是節點的前驅節點是頭節點那麼嘗試獲取同步資源
            // 強制要求隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,不然將會形成節點丟失,丟失了的節點中的線程將會永遠處於阻塞狀態,同時只有當線程獲取了同步資源後,它才能成爲頭節點(隊列初始化後的頭節點除外),所以頭節點確定是已經獲取過同步資源的(隊列初始化後的頭節點除外),所以爲了遵循隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,因此永遠只有頭節點的後繼節點擁有嘗試獲取同步資源的權利,所以當在嘗試獲取同步資源以前,須要先判斷一下當前節點的前驅節點是不是頭節點,若是不是就不用獲取了
            if (p == head && tryAcquire(arg)) { 
                // 當獲取同步資源成功,則將當前節點設置爲頭節點
                setHead(node); 
                // 將以前頭節點的後繼指針設置爲null,幫助GC
                p.next = null; 
                failed = false; 
                // 返回中斷標識
                return interrupted; 
            }

            // 若是節點的前驅節點不是頭節點,或者嘗試獲取同步資源失敗,那麼將會調用shouldParkAfterFailedAcquire()方法,判斷線程可否進行阻塞,當線程可以被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 若是在執行該方法的過程當中,拋出了異常(線程超時等等),則failed標識爲true,那麼將會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,同時從等待隊列中剔除。
        if (failed)
            cancelAcquire(node);
    }
}

總結:acquireQueued()方法用於自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那麼就會將當前節點設置爲頭節點,不然就會調用shouldParkAfterFailedAcquire()方法,判斷線程可否進行阻塞,當線程可以被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行acquireQueued()方法的過程當中,若是拋出了異常,則failed標識爲true,那麼將會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,同時從等待隊列中剔除。

shouldParkAfterFailedAcquire()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取節點的前驅節點的等待狀態
    int ws = pred.waitStatus;
    // 若是前驅節點的等待狀態爲SIGNAL,那麼當它釋放同步資源時,將會自動喚醒離它最近的同時等待狀態不爲CANCELLED的後繼節點,所以當前節點就能夠直接阻塞了,等待被喚醒時再去嘗試獲取同步資源
    if (ws == Node.SIGNAL)
        return true;
    
    // 若是前驅節點的等待狀態爲CANCELLED,那麼經過循環找到前一個不爲CANCELLED狀態的節點,而且將當前節點的前驅指針指向該節點,將該節點的後繼指針指向當前節點
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev; 
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { 
        // 經過CAS將前驅節點的等待狀態設置爲SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

總結:shouldParkAfterFailedAcquire()方法用於判斷線程可否進行阻塞,以及剔除被設置爲CANCELLED狀態的節點。

正常狀況下,線程第一次進來shouldParkAfterFailedAcquire()方法時,會將前驅節點的等待狀態設置爲SIGNAL,而後再次自旋進來該方法,判斷到前驅節點的等待狀態爲SIGNAL,直接返回,而後就進入待阻塞狀態。

當該節點的前驅節點被CANCELLED時,若是前驅節點的前驅節點是頭節點,那麼將會喚醒當前節點,那麼它會再次自旋進來該方法,判斷到前驅節點的等待狀態爲CANCELLED,就會將當前節點的前驅指針指向前一個不爲CANCELLED狀態的節點,也就是頭節點,而後再將頭節點的後繼指針指向當前節點,而後再次自旋進來該方法,判斷到前驅節點的等待狀態爲SIGNAL,直接返回,再次進入待阻塞狀態。

不管怎麼樣經過shouldParkAfterFailedAcquire()方法的全部節點最終都會進入待阻塞狀態,也就是說等待隊列中除了頭節點之外的全部線程都會處於阻塞狀態。

parkAndCheckInterrupt()方法

private final boolean parkAndCheckInterrupt() {
    // 阻塞當前線程,blocker對象使用當前對象
    LockSupport.park(this);
    // 當被喚醒時返回線程的中斷標識
    return Thread.interrupted(); 
}

總結:parkAndCheckInterrupt()方法用於阻塞線程,同時當線程被喚醒時會返回線程的中斷標識,儘管若是線程被設置了中斷標識,但也不會影響線程繼續往下執行,只不過當它成功獲取到同步資源時,會調用一次selfInterrupt()方法,再次爲線程設置中斷標識。

selfInterrupt()方法

static void selfInterrupt() {
    // 爲線程設置中斷標識
    Thread.currentThread().interrupt();
}

總結:當獲取了同步資源的線程被設置了中斷標識,纔會調用selfInterrupt()方法,再次爲線程設置中斷標識,由於在parkAndCheckInterrupt()方法中已經調用過一次Thread.interrupted()方法,避免外部又再次調用Thread.interrupted()方法致使線程的中斷標識被清除。

cancelAcquire()方法

private void cancelAcquire(Node node) {
    
    if (node == null)
        return;
	// 將當前節點封裝的線程設置爲NULL
    node.thread = null;

    // 經過循環獲取當前節點不爲CANCELLED狀態的前驅節點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 獲取前驅節點的後繼節點(若是節點的前驅節點不是CANCELLED狀態,那麼前驅節點的後繼節點就是它本身)
    Node predNext = pred.next;

    // 將節點的等待狀態設置爲CANCELLED
    node.waitStatus = Node.CANCELLED;

    // 若是當前節點是尾節點,則直接經過CAS將tail指針指向當前節點不爲CANCELLED狀態的前驅節點,同時經過CAS將前驅節點的後繼指針設置爲NULL
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else { 
        int ws;
        // 若是當前節點的前驅節點不是頭節點 同時 前驅節點的等待狀態爲SIGNAL(若是不是SIGNAL那就設置爲SIGNAL) 且 前驅節點封裝的線程不爲NULL
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 獲取節點的後繼節點 
            Node next = node.next;
            // 若是後繼節點的等待狀態不爲CANCELLED,則經過CAS將前驅節點的後繼指針指向當前節點的後繼節點
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next); // 這裏並無將當前節點的後繼節點的前驅指針指向前驅節點(不用設置,unparkSuccessor()方法會自動跳過)
        } else {
            // 若是當前節點的前驅節點是頭節點,則直接喚醒當前節點的後繼節點,讓它來剔除當前節點
            unparkSuccessor(node);
        }

        node.next = node;
    }
}

總結:若是線程在阻塞的過程中拋出了異常,也就是直接中斷acquireQueued()方法,而後執行finally語句塊,因爲failed標識爲true,所以會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,若是當前節點是尾節點,則直接經過CAS將tail指針指向當前節點不爲CANCELLED狀態的前驅節點,同時將該前驅節點的後繼指針設置爲NULL,若是當前節點的前驅節點不是頭節點,則經過CAS將前驅節點的後繼指針指向當前節點的後繼節點,若是當前節點的前驅節點是頭節點,那麼喚醒當前節點的後繼節點,讓它來剔除當前節點。


獨佔模式下釋放同步資源的源碼分析

release()方法

public final boolean release(int arg) {
    if (tryRelease(arg)) { 
        Node h = head; 
        // 若是隊列不等於空,同時頭節點的等待狀態不爲0,也就是頭節點存在後繼節點,那麼調用unparkSuccessor()方法,喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點。
        if (h != null && h.waitStatus != 0) 
            unparkSuccessor(h);
        return true;
    }
    return false;
}

總結:當獲取了同步資源的線程釋放同步資源時(外部線程或者頭節點中的線程),將會調用release()方法,release()方法中會調用AQS子類的tryRelease()方法,嘗試釋放同步資源,若是釋放同步資源成功,同時隊列不爲空以及頭節點的等待狀態不爲0,也就是頭節點存在後繼節點,那麼就會調用unparkSuccessor()方法,喚醒離頭節點最近的(也就是頭節點的後繼節點)同時等待狀態不爲CANCELLED的後繼節點,那麼該節點將會經過自旋嘗試獲取同步資源。

unparkSuccessor()方法

private void unparkSuccessor(Node node) {
    // 獲取節點的等待狀態
    int ws = node.waitStatus; 
    // 若是節點的等待狀態不爲CANCELLED,則經過CAS將節點的等待狀態設置爲0(恢復成隊列初始化後的狀態)
    if (ws < 0) 
        compareAndSetWaitStatus(node, ws, 0);

    // 獲取節點的後繼節點
    Node s = node.next;
    // 若是節點的後繼指針爲NULL(不能說明節點就沒有後繼節點)或者後繼節點爲CANCELLED狀態,那麼就從後往前尋找離當前節點最近的同時等待狀態不爲CANCELLED的後繼節點
    if (s == null || s.waitStatus > 0) { 
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒該後繼節點中的線程
        LockSupport.unpark(s.thread);
}

總結:unparkSuccessor()方法用於喚醒離節點最近的同時等待狀態不爲CANCELLED的後繼節點,若是節點的後繼指針爲NULL,不能說明節點就沒有後繼節點,或者後繼節點的等待狀態爲CANCELLED,則從後往前,尋找離節點最近的同時等待狀態不爲CANCELLED的節點,最終喚醒該節點中的線程。


獨佔模式下源碼分析後的總結

1.當線程要獲取同步資源時,能夠調用acquire()或者tryAcquire()方法,acquire()方法中會調用AQS子類的tryAcquire()方法,嘗試獲取同步資源,若是獲取同步資源成功,則直接返回,作本身的事情,不然將會執行addWaiter()方法,將當前線程封裝成Node節點而後加入到等待隊列當中,而後執行acquireQueued()方法,用於自旋獲取同步資源,若是全部條件都知足那麼最終將會執行selfInterrupt()方法。

2.addWaiter()方法用於將當前線程封裝成Node節點而後加入到等待隊列當中,若是在這個過程當中,等待隊列爲空或者經過CAS設置尾節點失敗(也就是當前指針所指向的尾節點並非真正的尾節點,也就是在這個過程中有其餘節點插入到隊尾),那麼將會經過enq()方法死循環進行設置。

3.enq()方法中使用死循環初始化隊列以及經過CAS設置尾節點,直到尾節點被設置成功,同時須要注意的是當隊列初始化後會有一個空的頭節點,該節點不包含任何的線程,而後再將當前節點加入到隊列當中。

4.acquireQueued()方法用於自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那麼就會將當前節點設置爲頭節點,不然就會調用shouldParkAfterFailedAcquire()方法,判斷線程可否進行阻塞,當線程可以被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行acquireQueued()方法的過程當中,若是拋出了異常,則failed標識爲true,那麼將會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,同時從等待隊列中剔除。

5.shouldParkAfterFailedAcquire()方法用於判斷線程可否進行阻塞,以及剔除被設置爲CANCELLED狀態的節點,正常狀況下,線程第一次進來shouldParkAfterFailedAcquire()方法時,會將前驅節點的等待狀態設置爲SIGNAL,而後再次自旋進來該方法,判斷到前驅節點的等待狀態爲SIGNAL,直接返回,而後就進入待阻塞狀態,當該節點的前驅節點被CANCELLED時,若是前驅節點的前驅節點是頭節點,那麼將會喚醒當前節點,那麼它會再次自旋進來該方法,判斷到前驅節點的等待狀態爲CANCELLED,就會將當前節點的前驅指針指向前一個不爲CANCELLED狀態的節點,也就是頭節點,而後再將頭節點的後繼指針指向當前節點,而後再次自旋進來該方法,判斷到前驅節點的等待狀態爲SIGNAL,直接返回,再次進入待阻塞狀態,不管怎麼樣經過shouldParkAfterFailedAcquire()方法的全部節點最終都會進入待阻塞狀態,也就是說等待隊列中除了頭節點之外的全部線程都會處於阻塞狀態。

6.parkAndCheckInterrupt()方法用於阻塞線程,同時當線程被喚醒時會返回線程的中斷標識,儘管若是線程被設置了中斷標識,但也不會影響線程繼續往下執行,只不過當它成功獲取到同步資源時,會調用一次selfInterrupt()方法,再次爲線程設置中斷標識,由於在parkAndCheckInterrupt()方法中已經調用過一次Thread.interrupted()方法,避免外部又再次調用Thread.interrupted()方法致使線程的中斷標識被清除。


此時等待隊列中除了頭節點之外的全部線程都會處於阻塞狀態

1.若是線程在阻塞的過程中拋出了異常,也就是直接中斷acquireQueued()方法,而後執行finally語句塊,因爲failed標識爲true,所以會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,若是當前節點是尾節點,則直接經過CAS將tail指針指向當前節點不爲CANCELLED狀態的前驅節點,同時將該前驅節點的後繼指針設置爲NULL,若是當前節點的前驅節點不是頭節點,則經過CAS將前驅節點的後繼指針指向當前節點的後繼節點,若是當前節點的前驅節點是頭節點,那麼喚醒當前節點的後繼節點,讓它來剔除當前節點。

2.當獲取了同步資源的線程釋放同步資源時(外部線程或者頭節點中的線程),將會調用release()方法,release()方法中會調用AQS子類的tryRelease()方法,嘗試釋放同步資源,若是釋放同步資源成功,同時隊列不爲空以及頭節點的等待狀態不爲0,也就是頭節點存在後繼節點,那麼就會調用unparkSuccessor()方法,喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點,那麼該節點將會經過自旋嘗試獲取同步資源。

3.在調用unparkSuccessor()方法喚醒離節點最近的同時等待狀態不爲CANCELLED的後繼節點時,若是節點的後繼指針爲NULL,不能說明節點就沒有後繼節點,或者後繼節點的等待狀態爲CANCELLED,則從後往前,尋找離節點最近的同時等待狀態不爲CANCELLED的節點,最終喚醒該節點中的線程。


獨佔模式FAQ

爲何要用CAS設置尾節點?

若是在設置尾節點的這個過程中,有其餘節點插入到隊尾,而後將tail指針指向當前節點,當前節點的前驅指針指向以前的尾節點,以前的尾節點的後繼指針指向當前節點,那麼中間插入的節點就會丟失。

在acquireQueued()方法中,爲何嘗試獲取同步資源以前,須要先判斷一下當前節點的前驅節點是不是頭節點?

強制要求等待隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,不然將會形成節點丟失,丟失了的節點中的線程將會永遠處於阻塞狀態(當同步資源被釋放時,還沒來得及喚醒離頭節點最近同時等待狀態不爲CANCELLED的後繼節點時,等待隊列中一個排在很後的節點被喚醒,而後它將會經過自旋嘗試獲取同步資源,一旦它獲取了同步資源,那麼它將成爲頭節點,最終它與以前頭節點之間的全部節點中的線程將會永遠處於阻塞狀態),同時只有當線程獲取了同步資源後,它才能成爲頭節點(隊列初始化後的頭節點除外),所以頭節點確定是已經獲取過同步資源的(隊列初始化後的頭節點除外),所以爲了遵循隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,因此永遠只有頭節點的後繼節點擁有嘗試獲取同步資源的權利,所以當在嘗試獲取同步資源以前,須要先判斷一下當前節點的前驅節點是不是頭節點,若是不是就不用獲取了,至於頭節點釋放同步資源後,可否被後繼節點獲取到同步資源另說,由於當同步資源被釋放時,被喚醒的後繼節點可能還沒來得獲取同步資源,此時就被外部線程直接獲取了,所以被喚醒的這個線程又只能再次進入阻塞狀態。

爲何在unparkSuccessor()方法中,若是節點的後繼指針爲NULL,須要從後往前尋找離節點最近的同時等待狀態不爲CANCELLED的後繼節點,而不從前日後進行尋找?

若是節點的後繼指針爲NULL,不能說明節點就沒有後繼節點,由於不管是在addWaiter()方法仍是enq()方法將節點加入到隊列,它老是先將當前節點的前驅指針指向尾節點,而後再經過CAS將tail指針指向當前節點,若是在將以前尾節點的後繼指針指向當前節點以前,須要喚醒尾節點的後繼節點,因爲此時尾節點的後繼指針仍然爲NULL,所以沒法經過next指針從前日後尋找,只能經過pred指針從後往前尋找。

線程在什麼狀況會被喚醒?

線程被喚醒只有兩種狀況

一種是外部線程或者頭節點釋放同步資源時,須要喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點,那麼該節點就會經過自旋嘗試獲取同步資源。

一種是當節點的前驅節點被CANCELLED時,若是前驅節點的前驅節點是頭節點,那麼將會喚醒當前節點,將當前節點的前驅指針指向前一個不爲CANCELLED狀態的節點,也就是頭節點,而後再將頭節點的後繼指針指向當前節點。

等待隊列中處於CANCELLED狀態的節點何時被剔除?

cancelAcquire()和shouldParkAfterFailedAcquire()方法均可以剔除等待隊列中處於CANCELLED狀態的節點。

*在unparkSuccessor()中須要剔除處於CANCELLED狀態的節點是爲了不同步問題,可能存在一個處於CANCELLED狀態的節點將來得及被剔除,而後它又做爲要喚醒的節點的後繼節點。


自定義AQS共享模式下的同步器來實現共享鎖

/**
 * 自定義AQS共享模式下的同步器來實現共享鎖
 */
public class Share {

    /**
     * 自定義AQS共享模式下的同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 存儲線程獲取同步資源的狀況
         */
        private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

        /**
         * 初始化同步資源
         */
        public Sync(int state) {
            setState(state);
        }

        /**
         * 嘗試獲取同步資源(須要保證是線程安全的)
         */
        @Override
        protected int tryAcquireShared(int arg) {
            int state = getState();
            int available = state - arg;
            if (available >= 0 && compareAndSetState(state, available)) { // 經過CAS保證原子性
                threadLocal.set(arg);
                return available;
            }
            return -1;
        }

        /**
         * 釋放同步資源(線程釋放同步資源的個數必須等於它獲取同步資源的個數)
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            if (threadLocal.get() != arg)
                throw new UnsupportedOperationException();
            if (compareAndSetState(getState(), getState() + arg)) { // 經過CAS保證原子性
                threadLocal.set(null);
                return true;
            }
            return false;
        }

    }

    /**
     * 初始化同步器的同步資源
     */
    public Share(int permits) {
        sync = new Sync(permits);
    }

    public Sync sync;

    /**
     * 獲取許可
     */
    public void acquire(int permits) {
        sync.acquireShared(permits);
    }

    /**
     * 嘗試獲取許可
     */
    public boolean tryAcquire(int permits) {
        return sync.tryAcquireShared(permits) >= 0;
    }

    /**
     * 釋放許可
     */
    public boolean release(int permits) {
        return sync.releaseShared(permits);
    }

}
public class Main {

    static class MyRunnable implements Runnable {

        private Share share;

        private int permits;

        @Override
        public void run() {
            System.out.println(String.format("%s Running", Thread.currentThread().getName()));
            share.acquire(permits);
            System.out.println(String.format("%s獲取了%s個許可", Thread.currentThread().getName(), permits));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            share.release(permits);
            System.out.println(String.format("%s釋放了%s個許可", Thread.currentThread().getName(), permits));
        }

        public MyRunnable(Share share, int permits) {
            this.share = share;
            this.permits = permits;
        }
    }

    public static void main(String[] args) {
        Share share = new Share(10);
        Thread threadA = new Thread(new MyRunnable(share,5),"線程A");
        Thread threadB = new Thread(new MyRunnable(share,4),"線程B");
        Thread threadC = new Thread(new MyRunnable(share,3),"線程C");
        threadA.start();
        threadB.start();
        threadC.start();
    }
    
}


共享模式下獲取同步資源的源碼分析

acquireShared()方法

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

總結:當線程要獲取同步資源時,能夠調用acquireShared()或tryAcquireShared()方法,acquireShared()方法中會調用AQS子類的tryAcquireShared()方法,嘗試獲取同步資源,若是獲取同步資源成功,則直接返回,作本身的事情,不然將會調用doAcquireShared()方法。

doAcquireShared()方法

private void doAcquireShared(int arg) {
    // 將當前線程封裝成Node節點,而後加入到等待隊列當中
    // 當前節點會被指定爲共享模式,共享模式Node.SHARED爲一個空的節點,也就是說節點的nextWaiter不爲NULL(isShared()方法返回true)
    // 在調用addWaiter()方法的過程當中,若是等待隊列爲空或者經過CAS設置尾節點失敗,那麼將會經過enq()方法死循環進行設置
    final Node node = addWaiter(Node.SHARED);
    // 失敗標識
    boolean failed = true;
    try {
        // 中斷標識
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 獲取節點的前驅節點
            final Node p = node.predecessor();
            // 若是節點的前驅節點是頭節點,則嘗試獲取同步資源
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 若是獲取同步資源成功,則調用setHeadAndPropagate()方法
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    // 將以前的頭節點的後繼指針設置爲NULL,help gc
                    p.next = null;
                    // 若是獲取了同步資源的線程被設置了中斷標識,那麼調用selfInterrupt()方法,再次爲線程設置一箇中斷標識
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 若是節點的前驅節點不是頭節點,或者嘗試獲取同步資源失敗,那麼將會調用shouldParkAfterFailedAcquire()方法,判斷線程可否進行阻塞,當線程可以被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 若是在執行該方法的過程當中,拋出了異常(線程超時等等),則failed標識爲true,那麼將會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,同時從等待隊列中剔除。
        if (failed)
            cancelAcquire(node);
    }
}

總結:doAcquireShared()方法用於將當前線程封裝成Node節點而後加入到等待隊列當中,而後經過自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那麼就會調用setHeadAndPropagate()方法,不然將會調用shouldParkAfterFailedAcquire()方法,判斷線程可否進行阻塞,當線程可以被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行doAcquireShared()方法的過程當中,若是拋出了異常,則failed標識爲true,那麼將會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,同時從等待隊列中剔除。

setHeadAndPropagate()方法

private void setHeadAndPropagate(Node node, int propagate) {
    // 獲取頭節點
    Node h = head; 
    // 將當前節點設置爲頭節點
    setHead(node);
    
    //若是線程獲取了同步資源後,仍然有剩餘的可用資源(正常狀況),或沒有剩餘的可用資源但舊的和新的頭節點的等待狀態爲PROPAGATE時(說明實際存在可用資源),那麼將會調用doReleaseShared()方法
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared()) // 排除等待隊列中不爲共享模式的節點
            doReleaseShared();
    }
}

總結:setHeadAndPropagate()方法用於將當前節點設置爲頭節點,同時若是當線程獲取了同步資源後,仍然有剩餘的可用資源(正常狀況),或沒有剩餘的可用資源但舊的和新的頭節點的等待狀態爲PROPAGATE時(說明實際存在可用資源),那麼將會調用doReleaseShared()方法。

doReleaseShared()方法

private void doReleaseShared() {
    // 使用死循環來保證CAS操做最終確定成功
    for (;;) {
        // 獲取頭節點
        Node h = head;
        // 若是head指針和tail指針不是指向同一個節點,說明頭節點確定存在後繼節點(使用head != tail能夠避免頭節點存在後繼節點可是頭節點的後繼指針又爲NULL的狀況)
        if (h != null && h != tail) {
            // 獲取頭節點的等待狀態,若是等待狀態爲SIGNAL,則經過CAS將頭節點的等待狀態設置爲0(重置),而後喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                    continue;            
                unparkSuccessor(h);
            }
            // 若是頭節點的等待狀態爲0,則經過CAS將頭節點的等待狀態設置爲PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        if (h == head) // 若是執行完以上步驟後,h指針指向的頭節點仍然爲當前的頭節點,則退出循環,完成釋放過程,而後作本身的事情
            break;
    }
}

總結:當等待隊列中的線程獲取了同步資源後,仍然有剩餘的可用資源,或沒有剩餘的可用資源但舊的和新的頭節點的等待狀態爲PROPAGATE,或者當線程釋放同步資源這兩種狀況,都會調用doReleaseShared()方法,該方法使用死循環來保證CAS操做最終確定成功,若是頭節點存在後繼節點,同時頭節點的等待狀態爲SIGNAL時,那麼將會經過CAS將頭節點的等待狀態設置爲0(重置),而後喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點,若是判斷到頭節點的等待狀態爲0,那麼將會經過CAS將節點的等待狀態設置爲PROPAGATE,表示須要傳播下去。


共享模式下釋放同步資源的源碼分析

releaseShared()方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

總結:當獲取了同步資源的線程釋放同步資源時,將會調用releaseShared()方法,releaseShared()方法中會調用AQS子類的tryReleaseShared()方法,嘗試釋放同步資源,若是釋放同步資源成功,則會調用doReleaseShared()方法,喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點。


共享模式下源碼分析後的總結

1.當線程要獲取同步資源時,能夠調用acquireShared()或tryAcquireShared()方法,acquireShared()方法中會調用AQS子類的tryAcquireShared()方法,嘗試獲取同步資源,若是獲取同步資源成功,則直接返回,作本身的事情,不然將會調用doAcquireShared()方法。

2.doAcquireShared()方法用於將當前線程封裝成Node節點而後加入到等待隊列當中,而後經過自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那麼就會調用setHeadAndPropagate()方法,不然將會調用shouldParkAfterFailedAcquire()方法,判斷線程可否進行阻塞,當線程可以被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行doAcquireShared()方法的過程當中,若是拋出了異常,則failed標識爲true,那麼將會執行cancelAcquire()方法,將當前節點的等待狀態設置爲CANCELLED,同時從等待隊列中剔除。

3.setHeadAndPropagate()方法用於將當前節點設置爲頭節點,同時若是當線程獲取了同步資源後,仍然有剩餘的可用資源(正常狀況),或沒有剩餘的可用資源但舊的和新的頭節點的等待狀態爲PROPAGATE時(說明實際存在可用資源),那麼將會調用doReleaseShared()方法。

4.當等待隊列中的線程獲取了同步資源後,仍然有剩餘的可用資源,或沒有剩餘的可用資源但舊的和新的頭節點的等待狀態爲PROPAGATE,或者當線程釋放同步資源這兩種狀況,都會調用doReleaseShared()方法,該方法使用死循環來保證CAS操做最終確定成功,若是頭節點存在後繼節點,同時頭節點的等待狀態爲SIGNAL時,那麼將會經過CAS將頭節點的等待狀態設置爲0(重置),而後喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點,若是判斷到頭節點的等待狀態爲0(表示併發釋放同步資源),那麼將會經過CAS將節點的等待狀態設置爲PROPAGATE,表示須要傳播下去。

5.當獲取了同步資源的線程釋放同步資源時,將會調用releaseShared()方法,releaseShared()方法中會調用AQS子類的tryReleaseShared()方法,嘗試釋放同步資源,若是釋放同步資源成功,則會調用doReleaseShared()方法,喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點。


共享模式FAQ

有哪些場景會將節點的等待狀態設置爲PROPAGATE,以及它的做用是什麼?

1.當線程A釋放同步資源時,將當前的頭節點的等待狀態設置爲0,而後喚醒離頭節點最近的同時等待狀態不爲CANCELLED的後繼節點,若是被喚醒的節點獲取了同步資源,而後在調用setHeadAndPropagate()方法以前,線程B釋放了同步資源,此時判斷到頭節點的等待狀態爲0,那麼就會將頭節點的等待狀態設置爲PROPAGATE,表示併發釋放了同步資源,目前還有可用的同步資源,而後被喚醒的節點在執行setHeadAndPropagate()方法時,若是沒有剩餘的可用資源,可是判斷到舊的頭節點的等待狀態爲PROPAGATE,說明實際存在可用資源,那麼會再次調用doReleaseShared()方法,去喚醒後繼節點,嘗試獲取同步資源。

2.若是被喚醒的節點獲取了同步資源,在將當前節點設置爲頭節點以後,線程A和B釋放了同步資源,那麼就跟場景1同樣,線程B會將頭節點的等待狀態設置爲PROPAGATE,而後被喚醒的節點在執行setHeadAndPropagate()方法時,若是沒有剩餘的可用資源,除了判斷舊的頭節點的等待狀態是否爲PROPAGATE之外,還須要判斷新的頭節點的等待狀態是否爲PROPAGATE。

場景一和場景二的區別是獲取同步資源的線程在設置頭節點以前仍是頭節點以後。

相關文章
相關標籤/搜索