併發編程的基石——AQS類


本博客系列是學習併發編程過程當中的記錄總結。因爲文章比較多,寫的時間也比較散,因此我整理了個目錄貼(傳送門),方便查閱。html

併發編程系列博客傳送門java


本文參考了[Java多線程進階(六)—— J.U.C之locks框架:AQS綜述(1)]和Java技術之AQS詳解兩篇文章。node

AQS 簡介

AbstractQueuedSynchronizer (簡稱AQS)類是整個 JUC包的核心類。JUC 中的ReentrantLockReentrantReadWriteLock CountDownLatchSemaphoreLimitLatch等同步工具都是基於AQS實現的。算法

AQS 分離出了構建同步器時的通用關注點,這些關注點主要包括以下:編程

  • 資源是能夠被同時訪問?仍是在同一時間只能被一個線程訪問?(共享/獨佔功能)
  • 訪問資源的線程如何進行併發管理?(等待隊列)
  • 若是線程等不及資源了,如何從等待隊列退出?(超時/中斷)

這些關注點都是圍繞着資源——同步狀態(synchronization state)來展開的,AQS將這些通用的關注點封裝成了一個個模板方法,讓子類能夠直接使用。segmentfault

AQS 留給用戶的只有兩個問題安全

  • 什麼是資源
  • 什麼狀況下資源是能夠被訪問的

這樣一來,定義同步器的難度就大大下降了。用戶只要解決好上面兩個問題,就能構建出一個性能優秀的同步器。多線程

下面是幾個常見的同步器對資源的定義:併發

同步器 資源的定義
ReentrantLock 資源表示獨佔鎖。State爲0表示鎖可用;爲1表示被佔用;爲N表示重入的次數
ReentrantReadWriteLock 資源表示共享的讀鎖和獨佔的寫鎖。state邏輯上被分紅兩個16位的unsigned short,分別記錄讀鎖被多少線程使用和寫鎖被重入的次數。
CountDownLatch 資源表示倒數計數器。State爲0表示計數器歸零,全部線程均可以訪問資源;爲N表示計數器未歸零,全部線程都須要阻塞。
Semaphore 資源表示信號量或者令牌。State≤0表示沒有令牌可用,全部線程都須要阻塞;大於0表示由令牌可用,線程每獲取一個令牌,State減1,線程沒釋放一個令牌,State加1。

AQS 原理

上面一節中介紹到 AQS 抽象出了三個關注點,下面就具體看下 AQS 是若是解決這三個問題的。app

同步狀態的管理

同步狀態,其實就是資源。AQS使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSetState操做來讀取和更新這個狀態。

private volatile int state;
  
protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

線程的阻塞和喚醒

在JDK1.5以前,除了內置的監視器機制外,沒有其它方法能夠安全且便捷得阻塞和喚醒當前線程。

JDK1.5之後,java.util.concurrent.locks包提供了LockSupport類來做爲線程阻塞和喚醒的工具。

等待隊列

等待隊列,是AQS框架的核心,整個框架的關鍵其實就是如何在併發狀態下管理被阻塞的線程。

等待隊列是嚴格的FIFO隊列,是Craig,Landin和Hagersten鎖(CLH鎖)的一種變種,採用雙向循環鏈表實現,所以也叫CLH隊列。

1. 節點定義

CLH隊列中的結點是對線程的包裝,結點一共有兩種類型:獨佔(EXCLUSIVE)和共享(SHARED)。

每種類型的結點都有一些狀態,其中獨佔結點使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享結點使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。

結點狀態 描述
CANCELLED 1 取消。表示後驅結點被中斷或超時,須要移出隊列
SIGNAL -1 發信號。表示後驅結點被阻塞了(當前結點在入隊後、阻塞前,應確保將其prev結點類型改成SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。)
CONDITION -2 Condition專用。表示當前結點在Condition隊列中,由於等待某個條件而被阻塞了
PROPAGATE -3 傳播。適用於共享模式(好比連續的讀操做結點能夠依次進入臨界區,設爲PROPAGATE有助於實現這種迭代操做。)
INITIAL 0 默認。新結點會處於這種狀態

AQS使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,之因此這種作是由於在雙向鏈表的結構下,這樣更容易實現取消和超時功能。

next指針:用於維護隊列順序,當臨界區的資源被釋放時,頭結點經過next指針找到隊首結點。

prev指針:用於在結點(線程)被取消時,讓當前結點的前驅直接指向當前結點的後驅完成出隊動做。

static final class Node {
    
    // 共享模式結點
    static final Node SHARED = new Node();
    
    // 獨佔模式結點
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    /**
    * INITAL:      0 - 默認,新結點會處於這種狀態。
    * CANCELLED:   1 - 取消,表示後續結點被中斷或超時,須要移出隊列;
    * SIGNAL:      -1- 發信號,表示後續結點被阻塞了;(當前結點在入隊後、阻塞前,應確保將其prev結點類型改成SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。)
    * CONDITION:   -2- Condition專用,表示當前結點在Condition隊列中,由於等待某個條件而被阻塞了;
    * PROPAGATE:   -3- 傳播,適用於共享模式。(好比連續的讀操做結點能夠依次進入臨界區,設爲PROPAGATE有助於實現這種迭代操做。)
    * 
    * waitStatus表示的是後續結點狀態,這是由於AQS中使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,這樣更容易實現取消和超時功能。
    */
    volatile int waitStatus;

    // 前驅指針
    volatile Node prev;

    // 後驅指針
    volatile Node next;

    // 結點所包裝的線程
    volatile Thread thread;

    // Condition隊列使用,存儲condition隊列中的後繼節點
    Node nextWaiter;

    Node() {
    }

    Node(Thread thread, Node mode) { 
        this.nextWaiter = mode;
        this.thread = thread;
    }
}

2. 隊列定義

對於CLH隊列,當線程請求資源時,若是請求不到,會將線程包裝成結點,將其掛載在隊列尾部。

下面結合代碼一塊兒看下節點進入隊列的過程。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;   // 1
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))  // 2 
                    tail = head;
            } else {
                node.prev = t; // 3
                if (compareAndSetTail(t, node)) { // 4
                    t.next = node;
                    return t;
                }
            }
        }
    }

如上代碼在第一次循環中,當要在AQS隊列尾部插入元素時,AQS隊列狀態以下圖中(default)所示。也就是隊列頭、尾節點都指向null;當執行代碼(1)後節點t指向了尾部節點,這時候隊列狀態如圖中(I)所示。

這時候t爲null,故執行代碼(2),使用CAS算法設置一個哨兵節點爲頭節點,若是CAS設置成功,則讓尾部節點也指向哨兵節點,這時候隊列狀態如圖中(II)所示。

到如今爲止只插入了一個哨兵節點,還須要插入node節點,因此在第二次循環後執行到代碼(1),這時候隊列狀態如圖中(III)所示;而後執行代碼(3)設置node的前驅節點爲尾部節點,這時候隊列狀態如圖中(IV)所示;

而後經過CAS算法設置node節點爲尾部節點,CAS成功後隊列狀態如圖中(V)所示;

CAS成功後再設置原來的尾部節點的後驅節點爲node,這時候就完成了雙向鏈表的插入,此時隊列狀態如圖中(VI)所示。

AQS 的方法介紹

用戶須要本身重寫的方法

上面介紹到 AQS 已經幫用戶解決了同步器定義過程當中的大部分問題,只將下面兩個問題丟給用戶解決:

  • 什麼是資源
  • 什麼狀況下資源是能夠被訪問的

具體的,AQS 是經過暴露如下 API 來讓用戶解決上面的問題的。

鉤子方法 描述
tryAcquire 獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease 獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared 共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared 共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。
isHeldExclusively 該線程是否正在獨佔資源。只有用到condition才須要去實現它。

若是你須要實現一個本身的同步器,通常狀況下只要繼承 AQS ,並重寫 AQS 中的這個幾個方法就好了。至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。要不怎麼說Doug Lea貼心呢。

須要注意的是:若是你沒在子類中重寫這幾個方法就直接調用了,會直接拋出異常。因此,在你調用這些方法以前必須重寫他們。不使用的話能夠不重寫。

AQS 提供的一系列模板方法

查看 AQS 的源碼咱們就能夠發現這個類提供了不少方法,看起來讓人「眼花繚亂」的。可是最主要的兩類方法就是獲取資源的方法和釋放資源的方法。所以咱們抓住主要矛盾就好了:

  • public final void acquire(int arg) // 獨佔模式的獲取資源
  • public final boolean release(int arg) // 獨佔模式的釋放資源
  • public final void acquireShared(int arg) // 共享模式的獲取資源
  • public final boolean releaseShared(int arg) // 共享模式的釋放資源

acquire(int)方法

該方法以獨佔方式獲取資源,若是獲取到資源,線程繼續往下執行,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。該方法是獨佔模式下線程獲取共享資源的頂層入口。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

下面分析下這個acquire方法的具體執行流程:

step1:首先這個方法調用了用戶本身實現的方法tryAcquire方法嘗試獲取資源,若是這個方法返回true,也就是表示獲取資源成功,那麼整個acquire方法就執行結束了,線程繼續往下執行;

step2:若是tryAcquir方法返回false,也就表示嘗試獲取資源失敗。這時acquire方法會先調用addWaiter方法將當前線程封裝成Node類並加入一個FIFO的雙向隊列的尾部。

step3:再看acquireQueued這個關鍵方法。首先要注意的是這個方法中哪一個無條件的for循環,這個for循環說明acquireQueued方法一直在自旋嘗試獲取資源。進入for循環後,首先判斷了當前節點的前繼節點是否是頭節點,若是是的話就再次嘗試獲取資源,獲取資源成功的話就直接返回false(表示未被中斷過)

假如仍是沒有獲取資源成功,判斷是否須要讓當前節點進入waiting狀態,通過 shouldParkAfterFailedAcquire這個方法判斷,若是須要讓線程進入waiting狀態的話,就調用LockSupport的park方法讓線程進入waiting狀態。進入waiting狀態後,這線程等待被interupt或者unpark(在release操做中會進行這樣的操做,能夠參見後面的代碼)。這個線程被喚醒後繼續執行for循環來嘗試獲取資源。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //首先判斷了當前節點的前繼節點是否是頭節點,若是是的話就再次嘗試獲取資源,
                //獲取資源成功的話就直接返回false(表示未被中斷過)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判斷是否須要讓當前節點進入waiting狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 若是在整個等待過程當中被中斷過,則返回true,不然返回false。
                    // 若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

以上就是acquire方法的簡單分析。

單獨看這個方法的話可能會不太清晰,結合ReentrantLockReentrantReadWriteLock CountDownLatchSemaphoreLimitLatch等同步工具看這個代碼的話就會好理解不少。

release(int)方法

release(int)方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//上面已經講過了,須要用戶自定義實現
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

與acquire()方法中的tryAcquire()相似,tryRelease()方法也是須要獨佔模式的自定義同步器去實現的。正常來講,tryRelease()都會成功的,由於這是獨佔模式,該線程來釋放資源,那麼它確定已經拿到獨佔資源了,直接減掉相應量的資源便可(state-=arg),也不須要考慮線程安全的問題。

但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!因此自義定同步器在實現時,若是已經完全釋放資源(state=0),要返回true,不然返回false。

unparkSuccessor(Node)方法用於喚醒等待隊列中下一個線程。這裏要注意的是,下一個線程並不必定是當前節點的next節點,而是下一個能夠用來喚醒的線程,若是這個節點存在,調用unpark()方法喚醒

總之,release()是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。(須要注意的是隊列中被喚醒的線程不必定能立馬獲取資源,由於資源在釋放後可能立馬被其餘線程(不是在隊列中等待的線程)搶掉了

acquireShared(int)方法

acquireShared(int)方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。

public final void acquireShared(int arg) {
    //tryAcquireShared須要用戶自定義實現
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

能夠發現,這個方法的關鍵實現實際上是獲取資源失敗後,怎麼管理線程。也就是doAcquireShared的邏輯。

//不響應中斷
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

能夠看出,doAcquireShared的邏輯和acquireQueued的邏輯差很少。將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。

簡單總結下acquireShared的流程:

step1:tryAcquireShared()嘗試獲取資源,成功則直接返回;

step2:失敗則經過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()併成功獲取到資源才返回。整個等待過程也是忽略中斷的。

releaseShared(int)方法

releaseShared(int)方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是成功釋放且容許喚醒等待線程,它會喚醒等待隊列裏的其餘線程來獲取資源。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

釋放掉資源後,喚醒後繼。跟獨佔模式下的release()類似,但有一點稍微須要注意:獨佔模式下的tryRelease()在徹底釋放掉資源(state=0)後,纔會返回true去喚醒其餘線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制必定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就能夠喚醒後繼等待結點。

參考

相關文章
相關標籤/搜索