詳解java併發包源碼之AQS獨佔方法源碼分析

AQS 的實現原理

學完用 AQS 自定義一個鎖之後,咱們能夠來看一下剛剛使用過的方法的實現。java

分析源碼的時候會省略一些不重要的代碼。node

AQS 的實現是基於一個 FIFO 隊列的,每個等待的線程被封裝成 Node 存放在等待隊列中,頭結點是空的,不存儲信息,等待隊列中的節點都是阻塞的,而且在每次被喚醒後都會檢測本身的前一個節點是否爲頭結點,若是是頭節點證實在這個線程以前沒有在等待的線程,就嘗試着去獲取共享資源。架構

AQS 的繼承關係

AQS 繼承了 AbstractOwnableSynchronizer ,咱們先分析一下這個父類。源碼分析

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected AbstractOwnableSynchronizer() { }
    /**
     * 獨佔模式下的線程
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 設置線程,只是對線程的 set 方法
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    /**
     * 設置線程,對線程的 get 方法
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

父類很是簡單,持有一個獨佔模式下的線程,而後就只剩下對這個線程的 get 和 set 方法。學習

AQS的內部類

AQS 是用鏈表隊列來實現線程等待的,那麼隊列確定要有節點,咱們先從節點講起。ui

Node 類,每個等待的線程都會被封裝成 Node 類this

Node 的域spa

public class Node {
    int waitStatus;
    Node prev;
    Node next;
    Thread thread;
    Node nextWaiter;
}

waitStatus:等待狀態線程

prev:前驅節點code

next:後繼節點

thread:持有的線程

nextWaiter:condiction 隊列中的後繼節點

Node 的 status:

Node 的狀態有四種:

  1. CANCELLED,值爲 1,表示當前的線程被取消,被打斷或者獲取超時了
  2. SIGNAL,值爲 -1,表示當前節點的後繼節點包含的線程須要運行,也就是 unpark;
  3. CONDITION,值爲 -2,表示當前節點在等待 condition,也就是在 condition 隊列中;
  4. PROPAGATE,值爲 -3,表示當前場景下後續的 acquireShared 可以得以執行;

取消狀態的值是惟一的正數,也是惟一當排隊排到它了也不要資源而是直接輪到下個線程來獲取資源的

AQS 中的方法源碼分析

acquire

這個方法執行了:

tryAcquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

看到上面的 tryAcquire 返回 false 後就會調用 addWaiter 新建節點加入等待隊列中。參數 EXCLUSIVE 是獨佔模式。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 拿到尾節點,若是尾節點是空則說明是第一個節點,就直接入隊就行了
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若是尾節點不是空的,則須要特殊方法入隊
        enq(node);
    return node;
}

在 addWaiter 方法建立完節點後,調用 enq 方法,在循環中用 CAS 操做將新的節點入隊。

由於可能會有多個線程同時設置尾節點,因此須要放在循環中不斷的設置尾節點。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 查看尾節點
        if (t == null) { // Must initialize
            // 尾節點爲空則設置爲剛剛建立的節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 尾節點
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在這裏,節點入隊就結束了。

那麼咱們回來前面分析的方法,

public final void acquire(long arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

剛剛分析完了 addWaiter 方法,這個方法返回了剛剛建立而且加入的隊列。如今開始分析 acquireQueued 方法。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 在循環中去獲取鎖
        for (;;) {
            // 拿前一個節點
            final Node p = node.predecessor();
            // 若是前一個節點是頭結點則嘗試着去獲取鎖
            if (p == head && tryAcquire(arg)) {
                // 拿到鎖後把這個節點設爲頭結點,這裏 setHead 會把除了 next 之外的數據清除
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 這個方法查看在獲取鎖失敗之後是否中斷,若是否的話就調用
            // parkAndCheckInterrupt 阻塞方法線程,等待被喚醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireInterruptibly

由於很像因此順便來看一下 acquireInterruptibly 所調用的方法:在此我向你們推薦一個架構學習交流裙。交流學習裙號:821169538,裏面會分享一些資深架構師錄製的視頻錄像 

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 只有這一句有差異,獲取失敗了而且檢測到中斷位被設爲 true 直接拋出異常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireNanos

再來看一下有限時間的,當獲取超時之後會將節點 Node 的狀態設爲 cancel,設置爲取消的用處在後面的 release 方法中會有體現。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; 
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

總結一下過程

release

這個方法首先去調用了咱們實現的 tryRelease,當結果返回成功的時候,拿到頭結點,調用 unparkSuccessor 方法來喚醒頭結點的下一個節點。在此我向你們推薦一個架構學習交流裙。交流學習裙號:821169538,裏面會分享一些資深架構師錄製的視頻錄像 

public final boolean release(long arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    int ws = node.waitSatus;
    // 由於已經獲取過鎖,因此將狀態設設爲 0。失敗也沒所謂,說明有其餘的線程把它設爲0了
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 通常來講頭結點的下一個節點是在等待着被喚醒的,可是若是是取消的或者意外的是空的,
     * 則向後遍歷直到找到沒有被取消的節點
     * 
     */
    Node s = node.next;
    // 爲空或者大於 0,只有 cancel 狀態是大於 0 的
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
相關文章
相關標籤/搜索