淺談AQS(抽象隊列同步器)

概念

AQS,全稱AbstractQueuedSynchronizer,即抽象隊列同步器,和CAS共同撐起了整個java.util.concurrent包,同時也是Java併發編程上繞不開的一個概念java

抽象隊列同步器,如下統稱AQS,用於解決的就是多線程併發訪問控制問題。在傳統的多線程編程中,若是有多個線程須要訪問同一個變量,就須要使用synchronized來爲臨界區加鎖(臨界區:訪問共享資源的程序段),可是這種方式既不「優雅」,也不高效(即便Java爲其已經作了不少優化),更重要的是,不能實現更細粒度的控制(雖然能夠經過大量額外程序代碼實現)。這時候,AQS出現了,它提供了一種簡潔優雅的機制來實現線程安全node

本質上說,AQS是構建(包括鎖在內)大部分同步組件的基礎框架,它經過管理資源狀態量線程同步隊列來實現資源的分發(如共享或獨佔)。接下來,咱們就要對其實現方式來作進一步的討論編程

內部組件

AQS的實現是基於同步狀態量和一個FIFO的雙向隊列來實現的,下面就來分別講述其各自的特色安全

同步狀態

在類內部有一個被volatile修飾的整形變量state,其定義以下:多線程

private volatile int state;
複製代碼

這個變量官方稱爲同步狀態量,實際能夠理解爲一些共享資源,每有一個線程獲取到了一個共享資源,則這個同步狀態量就要減一,反之就須要加一,若是這個狀態量爲0,就表示共享資源已經被其餘全部線程分完了,當前的線程只能等待架構

可是同步狀態量並不能直接與資源劃等號,它只是提供一種相似門禁的操做(能夠類比鎖),任何線程想要獲取共享資源都須要先來詢問這個同步狀態量是否容許這樣的操做,這也就間接實現了對於共享資源的線程安全控制併發

在AQS中,對於該變量提供瞭如下三個操做接口:框架

protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

	/** * 經過CAS操做來更新state變量 */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
複製代碼

這裏暫且提一句,CAS指的是CompareAndSwap,即比較並替換,是一種樂觀鎖的實現,其實現原理簡單的說就是在更新一個值以前,先比較變量內存地址的當前值,是否與這個地址上預期應存儲的值一致,若是一致再進行更新。爲了避免喧賓奪主,關於CAS的東西暫且就說這麼多,對於本文來講了解這些就已經夠用了優化

而後再回來看這三個接口方法,其均被protected修飾符所修飾,因此這些接口方法並非提供給用戶調用的,而是供同步框架的開發者使用。這三個方法爲同步狀態量的修改操做提供了極大的控制權限,所以也須要謹慎使用ui

雙向隊列

AQS爲了管理全部獲取或沒獲取到同步狀態的線程,使用了雙向隊列來管理這些線程,這個隊列的節點定義以下:

static final class Node {
    
        /**表示當前節點在共享模式下等待 */
        static final Node SHARED = new Node();
        /** 表示當前節點在獨佔模式下等待 */
        static final Node EXCLUSIVE = null;

        /** 表示當前節點線程須要取消等待 */
        static final int CANCELLED =  1;
        /** 表示後繼節點線程須要被喚醒 */
        static final int SIGNAL    = -1;
        /** 線程正在等待Condition */
        static final int CONDITION = -2;
        /** 傳播狀態,表示下一次獲取共享同步狀態的操做會無條件傳播下去 */
        static final int PROPAGATE = -3;

        /** 節點的等待狀態 */
        volatile int waitStatus;

        /** 前驅節點 */
        volatile Node prev;

        /** 後繼節點 */
        volatile Node next;

        /** 隊列節點所表明的線程 */
        volatile Thread thread;

        /** 等待隊列中的後繼節點,若是在共享模式下等待,則該變量爲SHARED */
        Node nextWaiter;

        /** 返回節點是否以共享模式在等待資源 */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /** * 返回當前節點的前驅節點,若是不存在則拋出異常 * @return 前驅節點 */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        
        Node() {}

        Node(Thread thread, Node mode) {  
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
複製代碼

在AQS中,使用head和tail來標識這個同步隊列:

private transient volatile Node head;

    private transient volatile Node tail;
複製代碼

在這個雙向隊列,更準確地來講是同步隊列中,比較重要的點有兩個,首先是能夠很明顯地看出同步節點有兩種模式,分別爲獨佔和共享。其次是每個節點都有一個Thread類型的變量,也就是說每個節點均表明着一個線程

同時,AQS也提供了一系列對同步隊列的操做接口,其中的一些重要的方法咱們放在下一節來詳細講解

同步原理

剛纔咱們簡要了解了AQS中同步狀態和同步隊列的結構,接下來咱們就要來分析爲何這兩個東西就能實現併發安全控制

接口

AQS自己是不能直接使用的,由於其本質仍是一個抽象類(儘管一個抽象方法都沒有),若是想要使用AQS的話,咱們僅須要繼承AQS,並重寫如下5個方法:

  • boolean tryAcquire(int arg):嘗試以獨佔模式獲取同步狀態
  • boolean tryRelease(int arg):嘗試以獨佔模式釋放同步狀態
  • int tryAcquireShared(int arg):嘗試以共享模式獲取同步狀態
  • boolean tryReleaseShared(int arg):嘗試以共享模式釋放同步變量
  • boolean isHeldExclusively():當同步器被當前線程以獨佔模式佔用時返回true

固然,也能夠選擇不重寫這些方法,可是不重寫的狀況下你是不能直接調用這些方法的,由於這些方法在AQS的實現中均會拋出一個UnsupportedOperationException異常

這些方法的含義很好理解,咱們也很容易想到如何利用這5個接口完成線程同步的操做。好比,咱們如今想要實現一個寫鎖,那麼咱們能夠把同步狀態的初始值設爲1,而後實現tryAcquire和tryRelease方法,每當有線程來獲取寫鎖時就嘗試調用tryAcquire,寫操做執行完以後就調用tryRelease方法

模板方法

剛纔僅僅是作個實例,實際上剛纔的這5個接口方法也僅僅是用於被模版方法調用,因此咱們實際操做的仍是模板方法而已,AQS中提供的模板方法有如下這些:

獲取同步狀態:

  • void acquire(int arg):以獨佔模式獲取同步狀態
  • void acquireShared(int arg):以共享模式獲取同步狀態
  • void acquireInterruptibly(int arg):可以響應中斷的acquire方法
  • void acquireSharedInterruptibly(int arg):可以響應中斷的acquireShared方法
  • boolean tryAcquireNanos(int arg, long nanos):有超時限制的acquireInterruptibly方法
  • boolean tryAcquireSharedNanos(int arg, long nanos):有超時限制的acquireSharedInterruptibly方法

釋放同步狀態:

  • boolean release(int arg):獨佔式釋放同步狀態
  • boolean releaseShared(int arg):共享式釋放同步狀態

獲取隊列上的全部線程:

  • Colleaction<Thread> getQueuedThreads():獲取同步隊列上的線程集合

以上這些方法纔是開發者直接調用的方法(並且由於這些方法被final修飾,因此也不可能被重寫),咱們這裏用ReentranLock中的實現來舉例,咱們來看其lock和unlock方法的實現:

static final class NonfairSync extends Sync {
        // ...
		
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        
        // ...
    }
    public void lock() {
        sync.lock();
    }
    
    public void unlock() {
        sync.release(1);
    }
複製代碼

這裏有一個setExclusiveOwnerThread方法,該方法會將當前線程標識爲獲取了獨佔資源的線程。瞭解了這一點,咱們再來看lock方法,首先會嘗試更新同步狀態量,若是更新失敗,則將該線程添加到同步隊列中。在acquire方法中,會首先調用tryAcquire方法,這也就印證了,以前咱們提到的5個接口方法並非直接調用,而是由模板方法來進行間接調用,關於這些模板方法的細節咱們在下一個章節再來進一步講解

具體實現

enq、addWaiter--入隊

先來看enq方法

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

這個方法實際上就是經過無限次嘗試使用CAS操做把node節點添加到隊尾,因此在理論上是有無限期阻塞線程的可能存在

再來看addWaiter方法

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 嘗試快速地在隊尾添加; 若是失敗就使用enq添加
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
複製代碼

addWaiter的方法流程很簡單,就是構造一個節點,而後嘗試使用CAS直接將節點添加到隊尾,若是失敗再調用enq方法。重點不在這裏,咱們看addWaiter方法第一行構造的Node類型對象,這裏咱們要結合Node的構造方法來看:

Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }
複製代碼

以前咱們說過,nextWaiter標識着當前節點的模式,只有兩個值能夠選擇,Node.EXCLUSIVENode.SHARED,即獨佔模式或共享模式。也就是說,addWaiter方法是一個能夠設置節點模式的enq方法

acquireQueued--按序獲取同步狀態
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 當前驅節點是頭結點,且獲取同步狀態成功後,將當前節點設置爲頭結點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    // 節點置空能儘快進行垃圾收集
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                // 不知足爭獲同步狀態的條件,或爭取失敗,
                // 就判斷並選擇是否要進行進一步的操做(阻塞並中斷線程)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

當調用該方法的時候,會先判斷當前節點是不是隊首節點的直接後繼,若是是的話再嘗試獲取同步狀態(也能夠理解爲獲取鎖)。由於保證了只有頭結點的直接後繼節點才能獲取同步狀態,因此也就保證了不會發生多個節點同時調用setHead來將本身設置爲頭結點這樣的狀況

更重要的一點是,這個方式是一個「死循環」,因此節點會不斷嘗試獲取直到成功

acquire--獨佔式獲取同步狀態
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

這個方法原本想一開始就講的,可是最後仍是以爲先把其中的輔助方法講了以後你們會更好理解。acquire方法僅僅是一個if條件語句,咱們先看條件知足以後執行了什麼方法:

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
複製代碼

好傢伙,直接調用interrupt把當前線程中斷,難不成每次調用acquire方法都會中斷一次當前線程?其實不是的,這也是大部分框架經常使用的一個技巧,就是把真正執行的操做放在if條件判斷中,只有其當返回值不在預期以內的時候再執行if語句塊中的內容

在本例中,acquire方法會先嚐試獲取同步狀態,若是失敗後再將當前線程構造爲一個獨佔式同步節點並添加到隊列中(addWaiter方法),而後會不斷嘗試獲取同步狀態並將本身設置爲隊首節點(acquireQueued方法)

這時候可能會有人好奇,acquireQueued不是一個死循環嗎,那不是隻有一種返回值?實際上,acquireQueued方法返回的並非是否添加成功,而是interrupted這個局部變量,表示當前線程是否被中斷,忘記的朋友能夠翻上去再看一遍

在parkAndCheckInterrupt這個方法中,會先將線程阻塞,而後返回線程的中斷標識(若是一直沒有中斷的話,線程就會一直阻塞直到unpark方法被調用),因此當線程被中斷時(須要節點設置能夠被中斷),該方法會返回true,而後就會執行if語句塊的內容,將線程中斷

如今咱們再來梳理下acquire方法的整個流程

  1. 嘗試獲取一次同步狀態,若是失敗則進入下一步
  2. 判斷當前節點的前驅是否爲頭結點,若是是則嘗試獲取同步狀態,不然會重複執行該操做,直到成功。若是節點設置容許中斷,則會將線程阻塞,直到檢測到中斷信號
  3. 若是上一步因爲檢測到中斷信號致使直接返回,則調用線程的interrupt方法中斷當前線程,不然結束
acquireShared--共享式獲取同步狀態
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
複製代碼

方法簡潔明瞭:若是嘗試以共享式獲取同步狀態失敗,就調用doAcquireShared方法來獲取。咱們來看看這個doAcquireShared:

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

果真和acquireQueued方法差很少,finally和try最後幾行的if判斷都是徹底一致的,咱們把重點放在不一樣的地方,不過像第一行這種吧EXCLUSIVE換成SHARED這種咱們就不提了,咱們直接來看中間的核心代碼部分:

if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
複製代碼

實際上原理和acquireShared是基本一致的,都是判斷前驅節點是不是隊列的頭結點,若是是則調用tryAcquireShared來嘗試獲取。其中不同的地方就是setHeadAndPropagate方法,咱們來看這個方法:

private void setHeadAndPropagate(Node node, int propagate) {
    	// 保存以前的頭結點
        Node h = head;
        setHead(node);
       
       // propagate爲tryAcquireShared的返回值,表示剩餘的狀態量
       // 若是大於0,則能夠喚醒多個節點,因此這個變量名叫作「傳播」
       // 其他的一些條件都是一些容許喚醒多個後繼節點的判斷
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
複製代碼

從上面的代碼結合註釋能夠看出來,共享式獲取同步狀態與獨佔式相比,能夠喚醒多個等待的線程

release--獨佔式釋放同步狀態
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

該方法流程也很簡單,先嚐試釋放同步狀態(這一步若是失敗會直接返回),而後在unparkSuccessor中調用unpark方法將h所在節點的後繼節點喚醒

其餘

最後一些注入共享式釋放、容許中斷的獲取、帶有超時的獲取等方法我就不一一列出了,一通百通,其本質都是相似的

總結

想要理解AQS,首先必定要理解同步狀態同步隊列這兩個概念,同步狀態標識着共享資源的許可量,同步隊列標識着被阻塞的線程

理解了這兩個概念以後,就須要明白tryAcquire、tryRelease等AQS提供的5個接口方法,若是咱們想要基於AQS自定義同步組件,就須要重寫這5個方法

最後,就須要理解acquire、release等AQS提供的模板方法,理解這些模板方法雖然不能直接爲你的業務代碼提供幫助,可是能夠提升你對於整個併發架構的理解

最後,若是有對AQS的實現有興趣的,除了閱讀AQS源碼外,推薦閱讀Semaphore(AQS的共享式實現)和ReentrantLock(AQS的獨佔式實現)的源碼

相關文章
相關標籤/搜索