AbstractQueuedSynchronizer源碼解析

AQS是什麼?

AQS是什麼,相信你們都不陌生這個題目,那麼AQS究竟是什麼呢? AQS的全稱是 Abstract Queued Synchronizer, 從字面意思理解也就是 抽象隊列同步器 ,實際上AQS確實就是排隊同步隊列 , 也是一個抽象類,須要 自定義 同步隊列中 可執行權獲取和釋放中的邏輯(從新定義獲取和釋放語義),也就是重寫 tryAcquire tryRelease tryAcquireShared tryReleaseShared 等方法,固然也能夠 自定義方法 來經過調用 AQS 提供的 判斷方法進行邏輯判斷。在 JDK9 以前 AQS 是依賴於 CAS 的,其底層是經過 UnsafecompareAndSwap* 方法實現同步更改,在以後則是使用 VarHandle , 也替代了 Unsafe說白了 AQS 利用 VarHandle 保證操做的原子性java

大白話就能夠理解爲: 表示某件事情同一時間點僅有一人能夠進行操做,若有多人則須要排隊等待, 等到當前操做人完成後通知下一我的。node

AQS源碼解析

前言

在源碼中 AbstractQueuedSynchronizer 繼承了 AbstractOwnableSynchronizer , 同時也就繼承了 exclusiveOwnerThread 屬性,也就是 獨佔模式同步器的擁有者 ** , 也就意味着該線程是當前正在執行的線程**。git

AQS 中有幾個重點方法,分別是: acquire acquireInterruptibly tryAcquireNanos release acquireShared acquireSharedInterruptibly tryAcquireSharedNanos releaseShared 下面逐一分析。github

在分析源代碼以前,先來看一張圖來了解一下 AQS排隊同步隊列Node 節點中的 waitStatus 狀態 。併發

waitStatus 狀態都分爲是什麼app

  • CANCELLED,值爲1,表明同步隊列中等待的線程 等待超時 或者 被中斷,須要從同步隊列中剔除,節點進入該狀態之後不會再發生變化了。
  • SIGNAL,值爲-1,表明後繼節點的線程處於等待狀態,而若是當前節點的線程若是釋放了同步狀態或被取消,將會通知後繼結點,使後繼節點的線程得以運行。
  • CONDITION, 值爲-2,節點在等待隊列,節點線程等待在Condition上,當其餘線程調用Condition的signal方法後,該節點將會從等待隊列中轉移到同步隊列中。
  • PROPAGATE, 值爲-3,表示共享式同步狀態回去將會無條件的被傳播下去,
  • INITAL, 值爲0,初始狀態。

acquire(獲取)

官方解釋就是 Acquires in exclusive mode, ignoring interrupts 獲取獨佔模式並忽略interrupt(中斷), 翻譯成大白話就是就能夠理解爲獲取獨佔模式, 看一下源碼oop

public final void acquire(int arg) {
    // 判斷線程是否有可繼續執行的權限, 若是沒有則建立node 加入到隊列中
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

acquire 方法中分別調用了 tryAcquire acquireQueuedaddWaiter 方法,其中 tryAcquire 方法是須要自定義(重寫) 獲取、 執行權限 的邏輯,這裏咱們以 AbstractQueuedSynchronizer 的實現 ReentrantLock 爲例,簡單分析一下,先看 tryAcquire 方法源碼分析

protected final boolean tryAcquire(int acquires) {
    // 獲取當前線程
    final Thread current = Thread.currentThread();
    // 獲取當前線程的重入次數 若是是 0 則表明第一次
    int c = getState();
    if (c == 0) {
        // 判斷是否存在隊列 && 能夠獲取到可執行權
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            // 設置獨佔模式同步器的擁有者 也就是是哪一個線程持有
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是進入線程是 持有可執行權的線程 則作重入 + 1 操做
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
複製代碼

tryAcquire 方法核心代碼就是 判斷執行權限 ,這裏就不具體分析了,會在下一篇文章中進行ReentrantLock的源碼分析,接下來重點看 acquireQueuedaddWaiter 方法。ui

private Node addWaiter(Node mode) {
    // 經過構造方法 新建 Node節點, 根據入參mode指定了 Node的模式,共享或獨佔
  	Node node = new Node(mode);
    for (;;) {
        Node oldTail = tail;
      	// 若是 tail 不是 null 則 設置 新建Node的前驅節點(prev) 指向 tail節點 反之 初始化同步隊列
        if (oldTail != null) {
            // 設置 新建Node的前驅節點(prev) 指向 tail節點
            node.setPrevRelaxed(oldTail);
          	// 從新設置 tail 節點 指向 新建的Node節點
            // 白話就是 隊列中的最後一個節點 == tail節點
            if (compareAndSetTail(oldTail, node)) {
                // 設置 未更改時的 tail節點 中 next 節點, 指向 新建Node節點
                oldTail.next = node;
                return node;
            }
        } else {
            // 初始化同步隊列器
            initializeSyncQueue();
        }
    }
}
複製代碼
// 若是未存在同步隊列 則初始化同步隊列
private final void initializeSyncQueue() {
    Node h;
  	// 設置 AQS head節點 爲一個新建節點
    if (HEAD.compareAndSet(this, null, (h = new Node())))
      	// 賦值操做
        tail = h;
}
複製代碼

addWaiterinitializeSyncQueue 方法中,核心就是新建 Node 節點並經過 acquireQueued 方法將節點加入到 AQS 中,接下來分析一下 addWaiter 具體作了什麼this

  1. 經過構造方法建立新的Node節點,並經過入參 mode 指定Node節點的模式,共享或獨佔。固然這裏是設置的獨佔模式。

  2. 循環操做 新建Node節點並將 新建節點tail 節點創建關係。首先判斷 tail 是不是null,若是是則 步驟3,反之 步驟4

  3. 若是 tail 節點不爲null, 首先將新建的 Node節點 中的 前驅節(prev) 點設置爲當前的 tail 節點,而後經過 VarHandleAQStail 節點改成 新建的Node 節點,若是修改爲功則將上一步 未更改時的 tail 節點 (也就是代碼中的oldTail) 中的 next 指向 新建的Node節點 ,反之則可能由於併發操做致使 tial 節點已經被其餘線程變動,須要再次循環操做直至成功。

  4. 若是 tial 節點是null, 則須要實例化同步對列,也就是 AQS , 經過調用 initializeSyncQueue 進行初始化操做,經過 VarHandle 設置 AQShead 指向一個新建節點 (new Node) , 而後將 head 節點的引用賦值給 tail 節點。這裏注意一下,是將 head 節點的 引用 賦值給 tail 節點, 也就是這時候 head 節點 和 tail 節點是同時指向一塊內存地址 , 這裏的用意就是在新建隊列的時候, head 節點和新建節點的 prev 節點要保持是同一個引用 ,由於在後續的判斷中, 獲取可執行權的條件就是 AQShead 節點是否等於當前節點的 prev 節點。

    由於 addWaiter 方法中是一個循環,在 建立隊列後 須要將隊列新建的Node節點作關聯,因此還須要在執行一次 步驟3

addWaiter 方法分析完後,再來看一下 acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) {
    // 線程中斷狀態
    boolean interrupted = false;
    try {
        for (;;) {
          	// 獲取經過 addWaiter 建立的Node方法
            final Node p = node.predecessor();
          	// 判斷 新建的Node節點是否等於head節點 && 能夠獲取到 可執行權
            if (p == head && tryAcquire(arg)) {
              	// 設置 head 節點爲 當前線程的新建的Node節點,也就是線程被喚醒後並獲取到了可執行權,則將head
                // 節點設置爲當前線程建立的Node節點,能夠保證head節點永遠均可以和後續節點有關聯關係
                setHead(node);
                // 設置 next 
                p.next = null; // help GC
                // 返回
                return interrupted;
            }
            // 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park 
            if (shouldParkAfterFailedAcquire(p, node))
                // 將線程 wait 而且線程被喚醒後 判斷線程是否被中斷
                // |= 操做等於 interrupted = interrupted | parkAndCheckInterrupt()
              	// |(按位或) 會將 | 兩邊的值進行二進制計算,運算規則一個爲真即爲真, 例如 1|1 = 1 或 1|0 = 1,
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
複製代碼

acquireQueued 也是核心方法,在其中會對線程進行 LockSupport.park 進行控制,其實現方式是 循環 ,下面就具體分析一下

  1. 首先會獲取 當前線程 所建立的 Node 節點中的 前置節點(prev)
  2. 判斷 前置節點(prev) 是否等於 AQShead 節點 && 能夠獲取到 可執行權 ,若是這兩個條件成立則看 步驟3 ,反之看 步驟4, 若是知足這兩個條件,也就表明着 head 節點 所對應的線程 已經執行完成而且作了釋放**(release方法)**操做。
  3. 若是 步驟2 條件成立,也就是 線程被喚醒後並獲取到了可執行權,則將 head 節點設置爲 當前線程建立的Node節點
  4. 若是 步驟2 條件不成立,則判斷 Node節點所對應的線程的狀態是否符合改成 wait 狀態。這個邏輯在 shouldParkAfterFailedAcquire 方法中, 接下來看一下。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 若是前置節點的 waitStatus == Signal 也就是 == -1 
    // 若是是 知足 線程 wait 條件
    if (ws == Node.SIGNAL)
        /* * This node has already set status asking a release * to signal it, so it can safely park. */
        return true;
    // 若是狀態 > 0 也就是1 也就是線程已經被中斷了
    // 在這裏就會判斷 前置節點的前置節點 是否仍是被中斷,若是是 循環繼續判斷前置節點, 
    // 若是不是 則將前置節點的next節點改成 入參的 node 節點 而後 返回false 繼續循環判斷 
    // 這裏的做用就是 排除掉已經被中斷的線程
    if (ws > 0) {
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        // 不然設置 狀態爲 -1 等待喚醒狀態 再次進來之後就會被 wait 
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}
複製代碼

shouldParkAfterFailedAcquire 方法中主要就是判斷節點所屬的線程是否符規則,也就是更改成 wait 狀態

  1. 判斷當前線程節點前置節點waitStatus 是不是 SIGNAL,若是是知足條件,返回 true,線程將會 wait
  2. 判斷當前線程節點前置節點waitStatus 是否大於 0, 也就是1 ,若是條件成立,則表明 當前線程節點前置節點 所對應的線程已經被中斷了,須要從新指定當前線程節點的前置節點(prev),經過循環的方式找到前置節點的節點,若是依然被中斷,則繼續循環,直到找到未中斷線程所對應的Node節點爲止。若是條件不成立則將 waitStatus 狀態改成 SIGNAL 返回false, 再經過 acquireQueued 方法中的循環在執行一次 。
  3. 這裏要說一下爲何要更改當前節點的 prev 節點中的 waitStatus 狀態,是由於只有 前置節點(prev)waitStatus 等於 SIGNAL 也就是 -1 時,就表明當前線程新建的Node節點的線程處於等待狀態在當前節點的前置節點(prev) 的線程釋放了同步狀態或被取消,將會通知當前節點,使當前節點的線程得以運行

到這裏咱們整個的 acquire 方法就解析完了,接下來分享 release,有獲取纔有釋放,會在release講完後爲你們分享一下 acquire 到 release的整個流程。

release(釋放)

release 字面一次就是釋放,釋放經過 acquire 獲取的獨佔模式,可讓 AQS 後續節點所對應的線程能夠獲得執行權,下面就看一下 release 方法

public final boolean release(int arg) {
    // 步驟 1 2 3 
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼

release 方法中首先會調用 tryRelease 方法,這裏 tryRelease 方法將會有子類實現,先以 RenntrantLook 爲例,這裏就不展現代碼了,就簡單描述一下邏輯

  1. 首先會先用 state 減去 argstate 表明重入次數。
  2. 若是 步驟1 結果是0,則將 獨佔模式同步器的擁有者 改成null並返回true。
  3. 若是 步驟1 結果不是0, 則從新設置 state,返回false,表示還不能夠釋放。

接下來判斷 AQShead 節點不是null而且 waitStatus 狀態不等於0,表明 釋放成功,而後進入 unparkSuccessor 方法,進行對下一個Node節點所對應的線程進行喚醒操做。

private void unparkSuccessor(Node node) {
    /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
    // 若是head的waitStatus<0 則將head的waitStatus改成0
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
    // 若是 head 節點的 next 節點 == null 或者 節點 的狀態 大於0 也就是1 也就是 下一個節點所對應的線程被中斷了
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 將會循環整個同步隊列,從tail節點開始 往前循環,直到只找到 waitStatus <= 0 的Node節點
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // 若是節點不是 null 則喚醒該節點的線程
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

接下來分析一下在 unparkSuccessor 方法中都作了什麼

  1. 首先若是 head 的 waitStatus <0, 則將 headwaitStatus 改成0。
  2. 若是 head 節點的 next 節點等於null 或者 waitStatus 狀態 大於0也就是1, 表示 head 節點所對應的 next 節點所對應的線程已經被中斷了,將會循環整個同步隊列,從 tail 節點開始往前循環,直到找到最前面的一個 waitStatus <= 0 的Node節點
  3. 若是 步驟2 條件不知足 則表明 head 的 next 節點不是null 或 waitStatus狀態不等於1,調用 unpark 方法喚醒線程。

至此 release 方法就解析完成了,很簡單,核心功能僅僅是若是符合規則,則調用 unpark 方法喚醒 AQS 隊列中下一個節點所對應的線程。下面就分析一下 acquire 和 releae 整個流程。

總結 acquire 和 release

分析一次 acquirerelease 的總體流程

接下來分析 acquireInterruptibly 方法,acquireInterruptibly 方法和 acquire 實際上是同樣的,只不過多判斷了一下是否被中斷。

acquireInterruptibly

acquireInterruptibly 方法就是 可中斷的獲取可執行權 ,具體流程和 acquire 類似。

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
複製代碼

acquireInterruptibly 方法中,首先會經過 Thread.interrupted() 方法判斷線程是否被中斷,如已經被中斷,則拋出 InterruptedException, 反之則調用 tryAcquire 方法,判斷是否 獲取到執行權,若是未獲取到則調用 doAcquireInterruptibly 方法進行建立 AQS新的Node節點,並將 新建的Node節點AQShead 節點進行關聯。 到這裏可能就會想到,這不是和 acquire 方法是同樣的嘛,沒錯,就是同樣。看一下源碼

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // 新建 Node 節點 並將節點 和 AQS 隊列簡歷關聯
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
複製代碼

看到源碼是否是很熟悉,這不就是上邊咱們分析過的 acquire 方法嘛,惟一和 acquire 方法不一樣的就是,若是線程在被喚醒之後,也就是 head 節點的線程調用了 release 釋放了可執行權,而且經過 LockSupport.park 方法喚醒了 head 的 next節點所屬的線程時,headnext 節點所屬的線程已經被中斷了就會拋出 InterruptedException 異常。

這裏就不進行 addWaiter 方法 和 parkAndCheckInterrupt 方法的源碼展現了,若是還不明白就看一下上邊 acquire 方法的源碼分析。

tryAcquireNanos

tryAcquireNanos 方法的含義就是 可超時的獲取執行權 ,若是設置的 超時時間 到了,還未獲取到可執行權,則直接返回 false 。這裏的超時時間單位是 納秒 ns1秒(s)=1000000000納秒(ns)

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
複製代碼

看到 tryAcquireNanos 方法會想到什麼? 看到方法上的 throws InterruptedException 就一下想到了上面剛剛剛說的 acquireInterruptibly 方法,支持可中斷的獲取執行權。首先這裏會先調用 tryAcquire 方法獲取執行權,若是能夠獲取到執行權則直接返回,反之則調用 doAcquireNanos(arg, nanosTimeout) 方法進行 新建 Node 節點 並和 AQShead 節點進行關聯,而且會將節點加入到 AQS的隊列中而後將節點所屬的線程放入等待隊列中

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  // 判斷超時時間 是否小於等於 0 若是這 則直接返回 false 
  if (nanosTimeout <= 0L)
        return false;
    // 使用當前時間的 納秒 + 超時時間的納秒 = 將來超時的超時時間,用來作parkNanos, 
    // 就至關於 Object.wait(long timeoutMillis) 方法
    final long deadline = System.nanoTime() + nanosTimeout;
  
    //經過構造方法 新建 Node節點, 根據入參mode指定了 Node的模式,共享或獨佔
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            // 判斷 新建的Node節點是否等於head節點 && 能夠獲取到'可執行權'
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // 設置 head 節點爲 當前線程的新建的Node節點,也就是線程被喚醒後並獲取到了可執行權,則將head
                // 節點設置爲當前線程建立的Node節點,能夠保證head節點永遠均可以和後續節點有關聯關係
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // 判斷計算過的 deadline 時間 - 當前時間 是否小於0 是則 超時時間已過,返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park 而且 納秒必須大於 1000 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // 判斷線程是否被中斷 若是中斷則拋出 InterruptedException 異常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
複製代碼

若是在 acquirerelease 分析中理解其中原理是否是以爲這裏很簡單,這裏也不列舉已經分析過的方法了,直接說出不一樣點

  1. 增長超時時間,在這裏使用了 LockSupport.parkNanos(this, nanosTimeout) 方法,也就至關於 Object.wait(long timeoutMillis) 方法,等待的線程的狀態會在超時時間失效從 wait 變爲 run
  2. 線程被喚醒之後,也就是過了超時時間則會判斷計算過的 deadline時間 - 當前時間 是否小於0, 若果是則表明超時時間已過,直接返回false,反之則繼續執行。
  3. 支持中斷 ,若是已中斷則會拋出 InterruptedException 異常。

是否是很簡單,讀者要把重點放到 acquirerelease 上,其餘的就很容易了。上面的內容均是獲取的獨佔模式,下面來說解一下 共享模式。

acquireShared

public final void acquireShared(int arg) {
    // 經過調用 tryAcquireShared 方法獲取可執行權,若是未獲取到則調用 doAcquireShared
    // 方法進行 新建 Node節點 並和 AQS的 head 節點創建關係,
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
複製代碼

經過 acquireShared 方法能夠看到和 acquire 並無什麼區別,獲取可執行權的代碼須要 自定義同步器 實現,在共享模式分析中就不對 ReentrantReadWriteLock 源碼進行分析了 ,會在後面對 ReentrantLockReentrantReadWriteLock 進行源碼分析,接下來看一下 doAcquireShared ,看它是否是和 acquireQueued 方法也是同樣的邏輯呢?

private void doAcquireShared(int arg) {
  	// 獲取經過 addWaiter 建立的Node方法
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            // 獲取新建節點的前置節點
            final Node p = node.predecessor();
         	  // 判斷 新建的Node節點是否等於head節點
            if (p == head) {
                // 若是上邊的 p==head 須要在此判斷是否能夠獲取到'可執行權'
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                  	// 若是獲取到了可執行權
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park 
            if (shouldParkAfterFailedAcquire(p, node))
                // 將線程 wait 而且線程被喚醒後 判斷線程是否被中斷
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}
複製代碼

doAcquireShared 方法中,咱們看到,首先依然是調用 addWaiter 方法進行新建Node,這裏就很少說,能夠看一下上邊的方法詳解,doAcquireShared 也是核心方法,在其中會對線程進行 LockSupport.park 進行控制,其實現方式是 循環 ,下面就具體分析一下

  1. 首先會獲取 當前線程 所建立的 Node 節點中的 前置節點(prev)
  2. 判斷 前置節點(prev) 是否等於 AQShead 節點,若是條件成立則看 步驟3 ,反之看 步驟4, 若是知足條件,也就表明着 head 節點 所對應的線程 已經執行完成而且作了釋放**(release方法)**操做。
  3. 若是 步驟2 條件成立,則再次判斷 當前線程是否能夠獲取到可執行權 ,若是能夠則設置 AQShead 節點爲當前線程的 新建的Node節點, 反之則看 步驟3
  4. 若是 步驟2步驟3 條件不成立,則判斷 Node節點所對應的線程的狀態是否符合改成 wait 狀態,也就是是否能夠加入到等待隊列中。這個邏輯在 shouldParkAfterFailedAcquire 方法中,能夠看一下上邊的方法詳解。

acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
複製代碼
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  	// 獲取經過 addWaiter 建立的Node方法
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            // 獲取新建節點的前置節點
            final Node p = node.predecessor();
            // 判斷 新建的Node節點是否等於head節點
            if (p == head) {
                // 若是獲取到了可執行權
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判斷Node節點的線程是否符合被 wait && 將線程 wait 而且線程被喚醒後判斷線程是否被中斷
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
複製代碼

能夠看到 acquireSharedInterruptiblyacquireShared 方法並無什麼太大區別,惟一的區別就是在調用 parkAndCheckInterrupt 線程狀態被 wait ,等到當前節點 prev 節點的所屬線程調用了 release 方法後,喚醒當前節點所屬線程時,若是當前線程被中斷了會拋出 InterruptedException 異常。

tryAcquireSharedNanos

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
複製代碼
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 若是超時時間 小於等於0 則直接 返回失敗
    if (nanosTimeout <= 0L)
        return false;
    // 使用當前時間的 納秒 + 超時時間的納秒 = 將來超時的超時時間,用來作parkNanos, 
    // 就至關於 Object.wait(long timeoutMillis) 方法
    final long deadline = System.nanoTime() + nanosTimeout;
  	
    //經過構造方法 新建 Node節點, 根據入參mode指定了 Node的模式,共享或獨佔。這裏是共享
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            // 判斷 新建的Node節點是否等於head節點
            final Node p = node.predecessor();
            if (p == head) {
                // 是否能夠得到可執行權
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
          
            // 判斷計算過的 deadline 時間 - 當前時間 是否小於或等於0 是則超時時間已過,返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 判斷Node節點的線程是否符合被 wait ,在這裏用的是 park 而且 納秒必須大於 1000 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
          	// // 判斷線程是否被中斷 若是中斷則拋出 InterruptedException 異常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
複製代碼

有沒有發現,doAcquireSharedNanos 方法和 doAcquireNanos 方法很類似呢,若是在 acquireShared 分析中理解其原理是否是以爲這裏很簡單,這裏也不列舉已經分析過的方法了,直接說出不一樣點

  1. 增長超時時間,在這裏使用了 LockSupport.parkNanos(this, nanosTimeout) 方法,也就至關於 Object.wait(long timeoutMillis) 方法,等待的線程的狀態會在超時時間失效從 wait 變爲 run
  2. 線程被喚醒之後,也就是過了超時時間則會判斷計算過的 deadline時間 - 當前時間 是否小於0, 若果是則表明超時時間已過,直接返回false,反之則繼續執行。
  3. 支持中斷 ,若是已中斷則會拋出 InterruptedException 異常。

是否是很簡單,讀者要把重點放到 acquireacquireShared 上,其餘的就很容易了。

releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼
private void doReleaseShared() {
        /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 若是更新失敗則循環
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 喚醒 head 節點的 next 節點所屬的線程
                    unparkSuccessor(h);
                }
                // 若是更新失敗則循環
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 若是 head 改變了則再次循環
            if (h == head)                   // loop if head changed
                break;
        }
    }
複製代碼

tryReleaseShared 方法和 release 方法稍微有一點區別,下面咱們就具體分析一下

  1. 首先去嘗試釋放資源經過 tryReleaseShared 方法,若是釋放成功了,就表明有資源空閒出來,那麼就看 步驟2
  2. 調用doReleaseShared 去喚醒後續結點, 在 doReleaseShared 方法中採用了 loop,每一次循環的過程都是首先得到 head 節點,若是 head 結點不爲空且不等於 tail 結點,那麼先得到該節點的狀態,若是是SIGNAL的狀態,則表明它須要有後繼結點去喚醒,首先將其的狀態變爲0(初始狀態),而後經過 unparkSuccessor 方法喚醒後續節點所屬的線程,若是結點狀態一開始就是0,那麼就給他轉換成 PROPAGATE 狀態,保證在後續獲取資源的時候,還可以向後面傳播。

至此咱們已經分析完了 AbstractQueuedSynchronizer 的源碼,是否是很簡單呢?最主要的仍是要理解AQS的總體流程,說白了AQS是依賴兩大利器,也就是 VarHandle 和 LockSupport。

博客地址:lantaoblog.site

相關文章
相關標籤/搜索