Java併發包源碼學習系列:AbstractQueuedSynchronizer

本文基於JDK1.8java

本篇學習目標

  • 瞭解AQS的設計思想以及重要字段含義,如經過state字段表示同步狀態等。
  • 瞭解AQS內部維護鏈式雙向同步隊列的結構以及幾個重要指針。
  • 瞭解五種重要的同步狀態。
  • 明確兩種模式:共享模式和獨佔模式。
  • 學習兩種模式下AQS提供的模板方法:獲取與釋放同步狀態相關方法。
  • 瞭解Condition、ConditionObject等AQS對條件變量的支持。
  • 經過Condition的案例深刻了解等待通知的機制。

AQS概述

AQS即AbstractQueuedSynchronizer,隊列同步器,他是構建衆多同步組件的基礎框架,如ReentrantLockReentrantReadWriteLock等,是J.U.C併發包的核心基礎組件。node

AQS框架基於模板方法設計模式構建,子類經過繼承它,實現它提供的抽象方法來管理同步狀態。算法

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	// 序列化版本號
    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * 建立AQS實例,初始化state爲0,爲子類提供
     */
    protected AbstractQueuedSynchronizer() { }
    
    /*----------------  同步隊列構成 ---------------*/

    // 等待隊列節點類型
    static final class Node {
        //...省略
    }

    /**
     * 除了初始化以外,它只能經過setHead方法進行修改。注意:若是head存在,它的waitStatus保證不會被取消
     */
    private transient volatile Node head;

    /**
     * 等待隊列的尾部,懶初始化,以後只在enq方法加入新節點時修改
     */
    private transient volatile Node tail;
    
    /*----------------  同步狀態相關 ---------------*/

    /**
     * volatile修飾, 標識同步狀態,state爲0表示鎖空閒,state>0表示鎖被持有,能夠大於1,表示被重入
     */
    private volatile int state;

    /**
     * 返回當前同步狀態
     */
    protected final int getState() {
        return state;
    }

    /**
     * 設置同步狀態
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * 利用CAS操做更新state值
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    static final long spinForTimeoutThreshold = 1000L;
    
    // 這部分和CAS有關 
    // 獲取Unsafe實例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // 記錄state在AQS類中的偏移值
    private static final long stateOffset;
    static {
        try {
            // 初始化state變量的偏移值
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        } catch (Exception ex) { throw new Error(ex); }
    }
}

以上包括AQS的基本字段,比較核心的就是兩個部分:編程

  • 內部FIFO隊列的節點類型Node,和首尾指針字段。
  • 同步狀態相關的方法,設置,獲取,CAS更新等。

接下來咱們將一一學習這些內容。c#

AbstractOwnableSynchronizer

AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer,它提供了設置或獲取獨佔鎖的擁有者線程的功能。設計模式

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
 
    private static final long serialVersionUID = 3737899427754241961L;
 	// 自己是抽象類,該構造方法實爲爲子類提供
    protected AbstractOwnableSynchronizer() { }
 
	/* 互斥模式同步下的當前線程 */
    private transient Thread exclusiveOwnerThread;
 
	/* 設置當前擁有獨佔訪問的線程。鎖的擁有線程,null 參數表示沒有線程擁有訪問。
     * 此方法不另外施加任何同步或 volatile 字段訪問。
     */
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }
 
	/* 返回由 setExclusiveOwnerThread 最後設置的線程;
     * 若是從未設置,則返回 null。
     * 此方法不另外施加任何同步或 volatile 字段訪問。 
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

這裏exclusiveOwnerThread字段用來判斷當前線程是否是持有鎖,由於鎖能夠重入嘛,所以就會產生下面這樣的僞代碼:安全

if (currThread == getExclusiveOwnerThread()) {
	state++;
}

同步隊列與Node節點

tips:同步隊列被稱爲CLH隊列,是Craig,Landin,Hagersten的合稱。多線程

AQS經過內置的FIFO同步雙向隊列來完成資源獲取線程的排隊工做,內部經過節點head【其實是虛擬節點,真正的第一個線程在head.next的位置】和tail記錄隊首和隊尾元素,隊列元素類型爲Node。併發

CLU同步隊列的結構以下,具體的操做以後再作總結:框架

圖源:【AQS】核心實現

  • 若是當前線程獲取同步狀態失敗(鎖)時,AQS 則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程
  • 當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
/**
     * 等待隊列中的節點類
     */
    static final class Node {
        /** 標識共享式節點 */
        static final Node SHARED = new Node();
        /** 標識獨佔式節點 */
        static final Node EXCLUSIVE = null;

        /** ------------ 等待狀態 ---------------*/
        
        /** 表示該線程放棄對鎖的爭奪 */
        static final int CANCELLED =  1;
        /** 當前node的後繼節點對應的線程須要被喚醒 */
        static final int SIGNAL    = -1;
        /** 線程在條件隊列中等待 */
        static final int CONDITION = -2;
        /** 釋放共享資源時須要通知其餘節點 */
        static final int PROPAGATE = -3;
        
        /** waitStatus == 0 表示不是以上任何一種 */
        
        // 記錄當前線程的等待狀態,以上五種
        volatile int waitStatus;

        // 前驅節點
        volatile Node prev;

        // 後繼節點
        volatile Node next;

        // node存儲的線程
        volatile Thread thread;
		
        // 當前節點在Condition中等待隊列上的下一個節點
        Node nextWaiter;

        // 判斷是否爲共享式獲取同步狀態
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 爲何不直接判斷prev,而是用p變量判斷呢?
         * 避免併發的狀況下,prev判斷完爲null,剛好被修改
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
		// 用於SHARED的建立
        Node() {    
        }
		// 用於addWaiter(Node mode)方法,指定模式的
        Node(Thread thread, Node mode) {     
            this.nextWaiter = mode;
            this.thread = thread;
        }
		// 用於addConditionWaiter()方法
        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

註釋中已經標註各個等待狀態的取值和表示,這邊再總結一下:

  • 等待狀態使用waitStatus字段表示,用來控制線程的阻塞和喚醒,除了上面寫的四種,實際還有一種狀態是0,這在源碼的註釋中已經明確。

  • 【CANCELLED = 1】取消狀態,因爲超時或中斷,節點會被設置爲取消狀態,且一直保持,不會參與到競爭中。若是waitStatus>0則能夠認爲是該線程取消了等待。

  • 【SIGNAL = -1】後繼節點的線程處於等待狀態,當前節點的線程若是釋放了同步狀態或被取消,將通知後繼節點,使後繼節點的線程得以運行。

  • 【CONDITION = -2】節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中。

  • 【PROPAGATE = -3】表示下一次共享式同步狀態獲取,將會無條件地傳播下去。

  • 0:初始狀態,上面幾種啥也不是。

關於AQS內部維護的同步隊列,這裏只是瞭解一些基本概念,後續對隊列操做注意點進行深刻學習。

同步狀態state

AQS使用一個int類型的成員變量state來表示同步狀態,它用volatile修飾,而且提供了關於state的三個線程安全的方法:

  • getState(),讀取同步狀態。
  • setState(int newState),設置同步狀態爲newState。
  • compareAndSetState(int expect, int update),CAS操做更新state值。

state爲0表示鎖空閒,state>0表示鎖被持有,能夠大於1,表示被重入。不一樣的子類實現,對state的含義表示略有差別,舉幾個例子吧:

  • ReentrantLock:state表示當前線程獲取鎖的可重入次數。
  • ReetrantReadWriteLock:state的高16位表示讀狀態,也就是獲取該讀鎖的次數,低16位表示獲取到寫鎖的線程的可重入次數。
  • semaphore:state表示當前可用信號的個數。
  • CountDownlatch:state表示計數器當前的值。

重要方法分析

對於AQS來講,線程同步的關鍵是對state進行操做,根據state是否屬於一個線程,操做state的方式分爲獨佔方式和共享方式。

獨佔式獲取與釋放同步狀態

  • 使用獨佔的方式獲取的資源是與具體線程綁定的,若是一個線程獲取到了資源,便標記這個線程已經獲取到,其餘線程再次嘗試操做state獲取資源時就會發現當前該資源不是本身持有的,就會在獲取失敗後阻塞。

好比獨佔鎖ReentrantLock的實現,當一個線程獲取了ReentrantLock的鎖後,在AQS內部會首先使用CAS操做把state狀態值從0變爲1,而後設置當前鎖的持有者爲當前線程,當該線程再次獲取鎖時發現它就是鎖的持有者,則會把狀態值從1變爲2,也就是設置可重入次數,而當另一個線程獲取鎖時發現本身並非該鎖的持有者就會被放入AQS阻塞隊列後掛起。

// 獨佔式獲取同步狀態,成功後,其餘線程須要等待該線程釋放同步狀態才能獲取同步狀態
    public final void acquire(int arg) {
        // 首先調用 tryAcquire【須要子類實現】嘗試獲取資源,本質就是設置state的值,獲取成功就直接返回
        if (!tryAcquire(arg) &&
            // 獲取失敗,就將當前線程封裝成類型爲Node.EXCLUSIVE的Node節點,並插入AQS阻塞隊列尾部
            // 而後經過自旋獲取同步狀態
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

	// 與 acquire(int arg) 相同,可是該方法響應中斷。
	// 若是其餘線程調用了當前線程的interrupt()方法,響應中斷,拋出異常。
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        // interrupted()方法將會獲取當前線程的中斷標誌並重置
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
	//嘗試獲取鎖,若是獲取失敗會將當前線程掛起指定時間,時間到了以後當前線程被激活,若是仍是沒有獲取到鎖,就返回false。
	//另外,該方法會對中斷進行的響應,若是其餘線程調用了當前線程的interrupt()方法,響應中斷,拋出異常。
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
	// 獨佔式釋放同步狀態
    public final boolean release(int arg) {
        // 嘗試使用tryRelease釋放資源,本質也是設置state的值
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // LockSupport.unpark(thread) 激活AQS裏面被阻塞的一個線程
                // 被激活的線程則使用tryAcquire 嘗試,看當前狀態變量state的值是否能知足本身的須要,
                //知足則該線程被激活,而後繼續向下運行,不然仍是會被放入AQS隊列並被掛起。
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

須要注意:tryRelease和tryAcquire方法並無在AQS中給出實現,實現的任務交給了具體的子類,子類根據具體的場景需求實現,經過CAS算法,設置修改state的值。

共享式獲取與釋放同步狀態

  • 對應共享方式的資源與具體線程是不相關的,當多個線程去請求資源時經過CAS 方式競爭獲取資源,當一個線程獲取到了資源後,另一個線程再次去獲取時若是當前資源還能知足它的須要,則當前線程只須要使用CAS 方式進行獲取便可。

好比Semaphore信號量,當一個線程經過acquire()方法獲取信號量時,會首先看當前信號量個數是否知足須要,不知足則把當前線程放入阻塞隊列,若是知足則經過自旋CAS獲取信號量。

//共享式獲取同步狀態,若是當前線程未獲取到同步狀態,將會進入同步隊列等待,
	// 與獨佔式的主要區別是在同一時刻能夠有多個線程獲取到同步狀態;
	public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            // 嘗試獲取資源,若是成功則直接返回
            // 若是失敗,則將當前線程封裝爲類型爲Node.SHARED的Node節點並插入AQS阻塞隊列尾部
            // 並使用LockSupport.park(this)掛起本身
            doAcquireShared(arg);
    }
	// 共享式獲取同步狀態,響應中斷
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
	//共享式獲取同步狀態,增長超時限制
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
	//共享式釋放同步狀態
    public final boolean releaseShared(int arg) {
        // 嘗試釋放資源
        if (tryReleaseShared(arg)) {
            // 調用LockSupport.unpark(thread)激活AQS隊列裏被阻塞的一個線程。
            // 被激活的線程使用tryReleaseShared查看當前狀態變量state是否能知足本身的須要。
            // 若是知足須要,則線程被激活繼續向下運行,不然仍是放入AQS隊列並被掛起
            doReleaseShared();
            return true;
        }
        return false;
    }

Interruptibly的方法表示對中斷須要進行響應,線程在調用帶Interruptibly關鍵字的方法獲取資源時或者獲取資源失敗被掛起,其餘線程中斷了該線程,那麼該線程會拋出InterruptedException異常而返回。

AQS條件變量的支持

Condition接口

Contition是一種廣義上的條件隊列,它利用await()和signal()爲線程提供了一種更爲靈活的等待/通知模式

Condition必需要配合Lock一塊兒使用,由於對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,所以await和signal的調用必須在lock和unlock之間,有鎖以後,才能使用condition嘛。以ReentrantLock爲例,簡單使用以下:

public class ConditionTest {

    public static void main(String[] args) {
        final ReentrantLock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();

        Thread thread1 = new Thread(() -> {
            String name = Thread.currentThread().getName();

            lock.lock();
            System.out.println(name + " <==成功獲取到鎖" + lock);
            try {
                System.out.println(name + " <==進入條件隊列等待");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " <==醒了");
            lock.unlock();
            System.out.println(name + " <==釋放鎖");
        }, "等待線程");

        thread1.start();

        Thread thread2 = new Thread(() -> {
            String name = Thread.currentThread().getName();

            lock.lock();
            System.out.println(name + " ==>成功獲取到鎖" + lock);
            try {
                System.out.println("========== 這裏演示await中的線程沒有被signal的時候會一直等着 ===========");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " ==>通知等待隊列的線程");
            condition.signal();
            lock.unlock();
            System.out.println(name + " ==>釋放鎖");
        }, "通知線程");

        thread2.start();
    }
}
等待線程 <==成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 等待線程]
等待線程 <==進入條件隊列等待
通知線程 ==>成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 通知線程]
========== 這裏演示await中的線程沒有被signal的時候會一直等着 ===========
通知線程 ==>通知等待隊列的線程
通知線程 ==>釋放鎖
等待線程 <==醒了
等待線程  <==釋放鎖

ConditionObject內部類

AQS,Lock,Condition,ConditionObject之間的關係:

ConditionObject是AQS的內部類,實現了Condition接口,Lock中提供newCondition()方法,委託給內部AQS的實現Sync來建立ConditionObject對象,享受AQS對Condition的支持。

// ReentrantLock#newCondition
	public Condition newCondition() {
        return sync.newCondition();
    }
	// Sync#newCondition
    final ConditionObject newCondition() {
        // 返回Contition的實現,定義在AQS中
        return new ConditionObject();
    }

ConditionObject用來結合鎖實現線程同步,ConditionObject能夠直接訪問AQS對象內部的變量,好比state狀態值和AQS隊列。ConditionObject是條件變量,每一個條件變量對應一個條件隊列(單向鏈表隊列),其用來存放調用條件變量的await方法後被阻塞的線程,ConditionObject維護了首尾節點:

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

看到這裏咱們須要明確這裏的條件隊列和上面分析的同步隊列是不同的:

  • AQS維護的是當前在等待資源的隊列,Condition維護的是在等待signal信號的隊列。

  • 每一個線程會存在上述兩個隊列中的一個,lock與unlock對應在AQS隊列,signal與await對應條件隊列,線程節點在他們之間反覆橫跳。

這裏咱們針對上面的demo來分析一下會更好理解一些:

爲了簡化,接下來我將用D表示等待線程,用T表示通知線程

  1. 【D】先調用lock.lock()方法,此時無競爭,【D】被加入到AQS等待隊列中。
  2. 【D】調用condition.await()方法,此時【D】從AQS等待隊列中移除,並加入到condition對應的條件等待隊列中。
  3. 【D】陷入等待以後,【T】啓動,因爲AQS隊列中的【D】已經被移除,此時【T】也很快獲取到鎖,相應的,【T】也被加入到AQS等待隊列中。
  4. 【T】接着調用condition.signal()方法,這時condition對應的條件隊列中只有一個節點【D】,因而【D】被取出,並被再次加入AQS的等待隊列中。此時【D】並無被喚醒,只是單純換了個位置。
  5. 接着【T】執行lock.unlock(),釋放鎖鎖以後,會喚醒AQS隊列中的【D】,此時【D】真正被喚醒且執行。

你看,【D】線程確實在兩個隊列中反覆橫跳吧,有關Condition的內容本文也只是拋磚引玉,以後會做詳細學習總結,若是你對這一系列感興趣,能夠關注一下,後續會陸續對併發進行深刻學習。

參考閱讀

相關文章
相關標籤/搜索