Java 併發編程 ----- AQS(抽象隊列同步器)

1、什麼是 AQS ?

AQS即AbstractQueuedSynchronizer的縮寫,是併發編程中實現同步器的一個框架。框架,框架,重要的事情說三遍,框架就是說它幫你處理了很大一部分的邏輯,其它功能須要你來擴展。想一想你使用Spring框架的場景,Spring幫助開發者實現IOC容器的bean依賴管理,標籤解析等,咱們只須要對bean進行配置便可,其餘不用管。java

AQS基於一個FIFO雙向隊列實現,被設計給那些依賴一個表明狀態的原子int值的同步器使用。咱們都知道,既然叫同步器,那個確定有個表明同步狀態(臨界資源)的東西,在AQS中即爲一個叫state的int值,該值經過CAS進行原子修改。node

在AQS中存在一個FIFO隊列,隊列中的節點表示被阻塞的線程,隊列節點元素有4種類型, 每種類型表示線程被阻塞的緣由,這四種類型分別是:編程

  • CANCELLED : 表示該線程是由於超時或者中斷緣由而被放到隊列中
  • CONDITION : 表示該線程是由於某個條件不知足而被放到隊列中,須要等待一個條件,直到條件成立後纔會出隊
  • SIGNAL : 表示該線程須要被喚醒
  • PROPAGATE : 表示在共享模式下,當前節點執行釋放release操做後,當前結點須要傳播通知給後面全部節點

因爲一個共享資源同一時間只能由一條線程持有,也能夠被多個線程持有,所以AQS中存在兩種模式,以下:設計模式

  • 一、獨佔模式安全

    獨佔模式表示共享狀態值state每次只能由一條線程持有,其餘線程若是須要獲取,則須要阻塞,如JUC中的ReentrantLockbash

  • 二、共享模式數據結構

    共享模式表示共享狀態值state每次能夠由多個線程持有,如JUC中的CountDownLatch多線程

2、AQS 中的核心數據結構和方法

一、既然AQS是基於一個FIFO隊列的框架,那麼咱們先來看下隊列的元素節點Node的數據結構,源碼以下:
static final class Node {
    /**共享模式*/
    static final Node SHARED = new Node();
    /**獨佔模式*/
    static final Node EXCLUSIVE = null;

    /**標記線程因爲中斷或超時,須要被取消,即踢出隊列*/
    static final int CANCELLED =  1;
    /**線程須要被喚醒*/
    static final int SIGNAL = -1;
    /**線程正在等待一個條件*/
    static final int CONDITION = -2;
    /**
     * 傳播
     */
    static final int PROPAGATE = -3;
    
    // waitStatus只取上面CANCELLED、SIGNAL、CONDITION、PROPAGATE四種取值之一
    volatile int waitStatus;

    // 表示前驅節點
    volatile Node prev;

    // 表示後繼節點
    volatile Node next;

    // 隊列元素須要關聯一個線程對象
    volatile Thread thread;

    // 表示下一個waitStatus值爲CONDITION的節點
    Node nextWaiter;

    /**
     * 是否當前結點是處於共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 返回前一個節點,若是沒有前一個節點,則拋出空指針異常
     */
    final Node predecessor() throws NullPointerException {
        // 獲取前一個節點的指針
        Node p = prev;
        // 若是前一個節點不存在
        if (p == null)
            throw new NullPointerException();
        else
        // 不然返回
            return p;
    }

    // 初始化頭節點使用
    Node() {}

    /**
     *  當有線程須要入隊時,那麼就建立一個新節點,而後關聯該線程對象,由addWaiter()方法調用
     */
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * 一個線程須要等待一個條件阻塞了,那麼就建立一個新節點,關聯線程對象
     */
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
複製代碼

總結下Node節點數據結構設計,隊列中的元素,確定是爲了保存因爲某種緣由致使沒法獲取共享資源state而被入隊的線程,所以Node中使用了waitStatus表示節點入隊的緣由,使用Thread對象來表示節點所關聯的線程。至於prev,next,則是通常雙向隊列數據結構必須提供的指針,用於對隊列進行相關操做。併發

二、AQS中的共享狀態值

以前提到,AQS是基於一個共享的int類型的state值來實現同步器同步的,其聲明以下:框架

/**
 * 同步狀態值
 */
private volatile int state;

/**
 * 獲取同步狀態值
 */
protected final int getState() {
    return state;
}

/**
 * 修改同步狀態值
 */
protected final void setState(int newState) {
    state = newState;
}
複製代碼

由源碼咱們能夠看出,AQS聲明瞭一個int類型的state值,爲了達到多線程同步的功能,必然對該值的修改必須多線程可見,所以,state採用volatile修飾,並且getState()setState()方法採用final進行修飾,目的是限制AQS的子類只能調用這兩個方法對state的值進行設置和獲取,而不能對其進行重寫自定義設置/獲取邏輯。

AQS中提供對state值修改的方法不只僅只有setState()getState(),還有諸如採用CAS機制進行設置的compareAndSetState()方法,一樣,該方法也是採用final修飾的,不容許子類重寫,只能調用。

三、AQS中的tryXXX方法

通常基於AQS實現的同步器,如ReentrantLock,CountDownLatch等,對於state的獲取操做,子類只需重寫其tryAcquire()tryAcquireShared()方法便可,這兩個方法分別對應獨佔模式和共享模式下對state的獲取操做;而對於釋放操做,子類只需重寫tryRelease()tryReleaseShared()方法便可。

至於如何維護隊列的出隊、入隊操做,子類不用管,AQS已經幫你作好了。

3、AQS 設計妙處

優秀的項目總會有亮點可挖,AQS也是。小編在看了AQS的源碼以後,結合其餘做者相關博客,總結了如下兩點感受很優秀的設計點,這是咱們應該學習的,前輩老是那麼優秀。

一、自旋鎖

當咱們執行一個有肯定結果的操做,同時又須要併發正確執行,一般能夠採用自旋鎖實現。在AQS中,自旋鎖採用 死循環 + CAS 實現。針對AQS中的enq()進行講解:

private Node enq(final Node node) {
    // 死循環 + CAS ,解決入隊併發問題
    /**
     * 假設有三個線程同時都須要入隊操做,那麼使用死循環和CAS可保證併發安全,同一時間只有一個節點安全入隊,入隊失敗的線程則循環重試
     * 
     * 一、若是不要死循環能夠嗎?只用CAS.
     *   不能夠,由於若是其餘線程修改了tail的值,致使1處代碼返回false,那麼方法enq方法將推出,致使該入隊的節點卻沒能入隊
     * 
     * 二、若是隻用死循環,不須要CAS能夠嗎?
     *   不能夠,首先不須要使用CAS,那就不必再使用死循環了,再者,若是不使用CAS,那麼當執行1處代碼時,將會改變隊列的結構
     */
    for (;;) {
        // 獲取尾部節點
        Node t = tail;
        // 若是尚未初始化,那麼就初始化
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                // 剛開始確定是頭指針和尾指針相等
                tail = head;
        } else {
            // 當前結點的前驅節點等於尾部節點
            node.prev = t;
            // 若是當前尾結點仍然是t,那麼執行入隊並返回true,不然返回false,而後重試
            if (compareAndSetTail(t, node)) {   // 1
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼

首先入隊操做要求的最終結果必須是一個節點插入到隊列中去,只能成功,不能失敗!然而這個入隊的操做是須要併發執行的,有可能同時有不少的線程須要執行入隊操做,所以咱們須要採起相關的線程同步機制。自旋鎖採起樂觀策略,即便用了CAS中的compareAndSet()操做,若是某次執行返回fasle,那麼當前操做必須重試,所以,採用for死循環直到成功爲止,成功,則break跳出for循環或者直接return操做退出方法。

二、模板方法

在AQS中,模板方法設計模式體如今其acquire()、release()方法上,咱們先來看下源碼:

public final void acquire(int arg) {
        // 首先嚐試獲取共享狀態,若是獲取成功,則tryAcquire()返回true
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

其中調用tryAcquire()方法的默認實現是拋出一個異常,也就是說tryAcquire()方法留給子類去實現,acquire()方法定義了一個模板,一套處理邏輯,相關具體執行方法留給子類去實現。

關於更多模板方法設計模式,能夠查閱談一談我對‘模板方法’設計模式的理解(Template)

4、自定義本身的併發同步器

下邊以JDK文檔的一個實例進行介紹:

class Mutex implements Lock, java.io.Serializable {
    // 自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判斷是否鎖定狀態
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 嘗試獲取資源,當即返回。成功則返回true,不然false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 這裏限定只能爲1個量
            if (compareAndSetState(0, 1)) {//state爲0才設置爲1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//設置爲當前線程獨佔資源
                return true;
            }
            return false;
        }

        // 嘗試釋放資源,當即返回。成功則爲true,不然false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定爲1個量
            if (getState() == 0)//既然來釋放,那確定就是已佔有狀態了。只是爲了保險,多層判斷!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//釋放資源,放棄佔有狀態
            return true;
        }
    }

    // 真正同步類的實現都依賴繼承於AQS的自定義同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。二者語義同樣:獲取資源,即使等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。二者語義同樣:嘗試獲取資源,要求當即返回。成功則爲true,失敗則爲false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。二者語文同樣:釋放資源。
    public void unlock() {
        sync.release(1);
    }

    //鎖是否佔有狀態
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}
複製代碼

實現本身的同步類通常都會自定義同步器(sync),而且將該類定義爲內部類,供本身使用;而同步類本身(Mutex)則實現某個接口,對外服務。固然,接口的實現要直接依賴sync,它們在語義上也存在某種對應關係!!而sync只用實現資源state的獲取-釋放方式tryAcquire-tryRelelase,至於線程的排隊、等待、喚醒等,上層的AQS都已經實現好了,咱們不用關心。

除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現方式都差很少,不一樣的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了!

相關文章
相關標籤/搜索