逐行分析AQS源碼(1)——獨佔鎖的獲取

前言

AQS(AbstractQueuedSynchronizer)是JAVA中衆多鎖以及併發工具的基礎,其底層採用樂觀鎖,大量使用了CAS操做, 而且在衝突時,採用自旋方式重試,以實現輕量級和高效地獲取鎖。java

AQS雖然被定義爲抽象類,但事實上它並不包含任何抽象方法。這是由於AQS是被設計來支持多種用途的,若是定義抽象方法,則子類在繼承時必需要覆寫全部的抽象方法,這顯然是不合理的。因此AQS將一些須要子類覆寫的方法都設計成protect方法,將其默認實現爲拋出UnsupportedOperationException異常。若是子類使用到這些方法,可是沒有覆寫,則會拋出異常;若是子類沒有使用到這些方法,則不須要作任何操做。node

AQS中實現了鎖的獲取框架,鎖的實際獲取邏輯交由子類去實現,就鎖的獲取操做而言,子類必須重寫 tryAcquire方法。編程

本篇咱們將以ReentrantLock的公平鎖爲例來詳細看看使用AQS獲取獨佔鎖的流程。segmentfault

本文中的源碼基於JDK1.8 。安全

系列文章目錄數據結構

Java併發工具類的三板斧

在開始看AQS源碼以前,咱們先來了解如下java併發工具的設計套路,我把它總結成三板斧:多線程

狀態,隊列,CAS

每當咱們學習一個java併發編程工具的時候,咱們首先要抓住這三點。併發

  • 狀態:通常是一個state屬性,它基本是整個工具的核心,一般整個工具都是在設置和修改狀態,不少方法的操做都依賴於當前狀態是什麼。因爲狀態是全局共享的,通常會被設置成volatile類型,以保證其修改的可見性;
  • 隊列:隊列一般是一個等待的集合,大多數以鏈表的形式實現。隊列採用的是悲觀鎖的思想,表示當前所等待的資源,狀態或者條件短期內可能沒法知足。所以,它會將當前線程包裝成某種類型的數據結構,扔到一個等待隊列中,當必定條件知足後,再從等待隊列中取出。
  • CAS: CAS操做是最輕量的併發處理,一般咱們對於狀態的修改都會用到CAS操做,由於狀態可能被多個線程同時修改,CAS操做保證了同一個時刻,只有一個線程能修改爲功,從而保證了線程安全,CAS操做基本是由Unsafe工具類的compareAndSwapXXX來實現的;CAS採用的是樂觀鎖的思想,所以經常伴隨着自旋,若是發現當前沒法成功地執行CAS,則不斷重試,直到成功爲止,自旋的的表現形式一般是一個死循環for(;;)

AQS核心實現

上面咱們已經總結了java併發編程的套路,下面咱們就以這個套路爲切入點來分析AQS的實現。框架

狀態

首先是找狀態。函數

在AQS中,狀態是由state屬性來表示的,不出所料,它是volatile類型的:

private volatile int state;

該屬性的值即表示了鎖的狀態,state爲0表示鎖沒有被佔用,state大於0表示當前已經有線程持有該鎖,這裏之因此說大於0而不說等於1是由於可能存在可重入的狀況。你能夠把state變量當作是當前持有該鎖的線程數量。

因爲本篇咱們分析的是獨佔鎖,同一時刻,鎖只能被一個線程所持有。經過state變量是否爲0,咱們能夠分辨當前鎖是否被佔用,但光知道鎖是否是被佔用是不夠的,咱們並不知道佔用鎖的線程是哪個。在監視器鎖中,咱們用ObjectMonitor對象的_owner屬性記錄了當前擁有監視器鎖的線程,而在AQS中,咱們將經過exclusiveOwnerThread屬性:

private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer

exclusiveOwnerThread屬性的值即爲當前持有鎖的線程,它就是咱們在分析監視器鎖的原理的時候所說的「鐵王座」。

隊列

接着咱們來看隊列,AQS中,隊列的實現是一個雙向鏈表,被稱爲sync queue,它表示全部等待鎖的線程的集合,有點相似於咱們前面介紹synchronized原理的時候說的wait set

咱們前面說過,在併發編程中使用隊列一般是將當前線程包裝成某種類型的數據結構扔到等待隊列中,咱們先來看看隊列中的每個節點是怎麼個結構:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

這個結構看起來很複雜,其實屬性只有4類:

// 節點所表明的線程
volatile Thread thread;

// 雙向鏈表,每一個節點須要保存本身的前驅節點和後繼節點的引用
volatile Node prev;
volatile Node next;

// 線程所處的等待鎖的狀態,初始化時,該值爲0
volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 該屬性用於條件隊列或者共享鎖
Node nextWaiter;

注意,在這個Node類中也有一個狀態變量waitStatus,它表示了當前Node所表明的線程的等待鎖的狀態,在獨佔鎖模式下,咱們只須要關注CANCELLED SIGNAL兩種狀態便可。這裏還有一個nextWaiter屬性,它在獨佔鎖模式下永遠爲null,僅僅起到一個標記做用,沒有實際意義。這麼一分析,這個Node類是否是就簡單了好多?<( ̄︶ ̄)>

說完隊列中的節點,咱們接着說回這個sync queue,AQS是怎麼使用這個隊列的呢,既然是雙向鏈表,操縱它天然只須要一個頭結點和一個尾節點:

// 頭結點,不表明任何線程,是一個啞結點
private transient volatile Node head;

// 尾節點,每個請求鎖的線程會加到隊尾
private transient volatile Node tail;

到這裏,咱們就瞭解到了這個sync queue的全貌:
wait queue

不過這裏有一點咱們提早說一下,在AQS中的隊列是一個CLH隊列,它的head節點永遠是一個啞結點(dummy node), 它不表明任何線程(某些狀況下能夠看作是表明了當前持有鎖的線程),所以head所指向的Node的thread屬性永遠是null。只有從次頭節點日後的全部節點才表明了全部等待鎖的線程。也就是說,在當前線程沒有搶到鎖被包裝成Node扔到隊列中時,即便隊列是空的,它也會排在第二個,咱們會在它的前面新建一個dummy節點(具體的代碼咱們在後面分析源碼時再詳細講)。爲了便於描述,下文中咱們把除去head節點的隊列稱做是等待隊列,在這個隊列中的節點才表明了全部等待鎖的線程:
dummy head

在繼續往下以前咱們再對着上圖總結一下Node節點各個參數的含義:

  • thread:表示當前Node所表明的線程
  • waitStatus:表示節點所處的等待狀態,共享鎖模式下只需關注三種狀態:SIGNAL CANCELLED 初始態(0)
  • prev next:節點的前驅和後繼
  • nextWaiter:進做爲標記,值永遠爲null,表示當前處於獨佔鎖模式

CAS操做

前面咱們提到過,CAS操做大對數是用來改變狀態的,在AQS中也不例外。咱們通常在靜態代碼塊中初始化須要CAS操做的屬性的偏移量:

private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

從這個靜態代碼塊中咱們也能夠看出,CAS操做主要針對5個屬性,包括AQS的3個屬性state,headtail, 以及Node對象的兩個屬性waitStatus,next。說明這5個屬性基本是會被多個線程同時訪問的。

定義完屬性的偏移量以後,接下來就是CAS操做自己了:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

如前面所說,最終CAS操做調用的仍是Unsafe類的compareAndSwapXXX方法。

最後就是自旋了,這一點就沒有什麼好說的了,咱們在後面源碼分析的時候再詳細講。

AQS核心屬性

前面咱們以java併發編程工具類的「三板斧」爲切入點分析了AQS的狀態,隊列和CAS操做,對這個工具類有了初步的認識。接下來,咱們就要開始進入源碼分析了。在進入正式的分析以前,咱們先來總結下AQS核心屬性:

(1)鎖相關的屬性有兩個:

private volatile int state; //鎖的狀態
private transient Thread exclusiveOwnerThread; // 當前持有鎖的線程,注意這個屬性是從AbstractOwnableSynchronizer繼承而來

(2)sync queue相關的屬性有兩個:

private transient volatile Node head; // 隊頭,爲dummy node
private transient volatile Node tail; // 隊尾,新入隊的節點

(3)隊列中的Node中須要關注的屬性有三組:

// 節點所表明的線程
volatile Thread thread;

// 雙向鏈表,每一個節點須要保存本身的前驅節點和後繼節點的引用
volatile Node prev;
volatile Node next;

// 線程所處的等待鎖的狀態,初始化時,該值爲0
volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;

拎了這些屬性後,咱們下面分析源碼就容易不少了。

Example: FairSync in ReentrantLock

前面已經提到, AQS大多數狀況下都是經過繼承來使用的, 子類經過覆寫 tryAcquire 來實現本身的獲取鎖的邏輯,咱們這裏以ReentrantLock爲例來講明鎖的獲取流程。

值得注意的是, ReentrantLock有 公平鎖非公平鎖 兩種實現, 默認實現爲非公平鎖, 這體如今它的構造函數中:

public class ReentrantLock implements Lock, java.io.Serializable {
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    
    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    
    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync{
        ...
    }
    
    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        ...
    }
    
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    // 獲取鎖
    public void lock() {
        sync.lock();
    }
    
    ...
}

能夠看出, FairSync 繼承自 Sync, 而Sync繼承自 AQS, ReentrantLock獲取鎖的邏輯是直接調用了 FairSync 或者 NonfairSync的邏輯.

好了, ReentrantLock 就簡單說到這裏, 之後咱們有機會再詳細講, 這裏直接以 FairLock 爲例, 來逐行分析鎖的獲取:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    //獲取鎖
    final void lock() {
        acquire(1);
    }
    ...
}

lock 方法調用的 acquire方法來自父類AQS。

這裏首先給出完整的獲取鎖的流程圖, 再逐行分析代碼, 由於看源碼的時候, 代碼會在函數或者循環中來回跳轉,讀者能夠對照如下流程圖, 就不容易被繞暈了.

ReentrantLock 公平鎖獲取流程圖

acquire

acquire 定義在AQS類中,描述了獲取鎖的流程

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

能夠看出, 該方法中涉及了四個方法的調用:

(1)tryAcquire(arg)

該方法由繼承AQS的子類實現, 爲獲取鎖的具體邏輯。

(2)addWaiter(Node mode)

該方法由AQS實現, 負責在獲取鎖失敗後調用, 將當前請求鎖的線程包裝成Node扔到sync queue中去,並返回這個Node。

(3)acquireQueued(final Node node, int arg)

該方法由AQS實現,這個方法比較複雜, 主要對上面剛加入隊列的Node不斷嘗試如下兩種操做之一:

  • 在前驅節點就是head節點的時候,繼續嘗試獲取鎖
  • 將當前線程掛起,使CPU再也不調度它

(4)selfInterrupt

該方法由AQS實現, 用於中斷當前線程。因爲在整個搶鎖過程當中,咱們都是不響應中斷的。那若是在搶鎖的過程當中發生了中斷怎麼辦呢,總不能僞裝沒看見呀。AQS的作法簡單的記錄有沒有有發生過中斷,若是返回的時候發現曾經發生過中斷,則在退出acquire方法以前,就調用selfInterrupt自我中斷一下,就好像將這個發生在搶鎖過程當中的中斷「推遲」到搶鎖結束之後再發生同樣。

從上面的簡單介紹中能夠看出,除了獲取鎖的邏輯 tryAcquire(arg)由子類實現外, 其他方法均由AQS實現。

接下來咱們重點來看 FairSync 所實現的獲取鎖的邏輯:

tryAcquire

tryAcquire 獲取鎖的邏輯其實很簡單——判斷當前鎖有沒有被佔用:

  1. 若是鎖沒有被佔用, 嘗試以公平的方式獲取鎖
  2. 若是鎖已經被佔用, 檢查是否是鎖重入

獲取鎖成功返回true, 失敗則返回false

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 首先獲取當前鎖的狀態
    int c = getState(); 
    
    // c=0 說明當前鎖是avaiable的, 沒有被任何線程佔用, 能夠嘗試獲取
    // 由於是實現公平鎖, 因此在搶佔以前首先看看隊列中有沒有排在本身前面的Node
    // 若是沒有人在排隊, 則經過CAS方式獲取鎖, 就能夠直接退出了
    if (c == 0) {
        if (!hasQueuedPredecessors() 
        /* 爲了閱讀方便, hasQueuedPredecessors源碼就直接貼在這裏了, 這個方法的本質其實是檢測本身是否是head節點的後繼節點,即處在阻塞隊列第一位的節點
            public final boolean hasQueuedPredecessors() {
                Node t = tail; 
                Node h = head;
                Node s;
                return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
            }
        */
        && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current); // 將當前線程設置爲佔用鎖的線程
            return true;
        }
    }
    
    // 若是 c>0 說明鎖已經被佔用了
    // 對於可重入鎖, 這個時候檢查佔用鎖的線程是否是就是當前線程,是的話,說明已經拿到了鎖, 直接重入就行
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        /* setState方法以下:
        protected final void setState(int newState) {
            state = newState;
        }
        */
        return true;
    }
    
    // 到這裏說明有人佔用了鎖, 而且佔用鎖的不是當前線程, 則獲取鎖失敗
    return false;
}

從這裏能夠看出,獲取鎖其實主要就是幹一件事:

將state的狀態經過CAS操做由0改寫成1

因爲是CAS操做,必然是隻有一個線程能執行成功。則執行成功的線程即獲取了鎖,在這以後,纔有權利將exclusiveOwnerThread的值設成本身,從而「坐上鐵王座」。
另外對於可重入鎖,若是當前線程已是獲取了鎖的線程了,它還要注意增長鎖的重入次數。

值得一提的是,這裏修改state狀態的操做,一個用了CAS方法compareAndSetState,一個用了普通的setState方法。這是由於用CAS操做時,當前線程尚未得到鎖,因此可能存在多線程同時在競爭鎖的狀況;而調用setState方法時,是在當前線程已是持有鎖的狀況下,所以對state的修改是安全的,只須要普通的方法就能夠了。

所以,在多線程條件下看源碼時,咱們必定要時刻在心中問本身:

這段代碼是不是線程安全的?同一時刻是否可能有多個線程在執行這行代碼?

addWaiter

若是執行到此方法, 說明前面嘗試獲取鎖的tryAcquire已經失敗了, 既然獲取鎖已經失敗了, 就要將當前線程包裝成Node,加到等待鎖的隊列中去, 由於是FIFO隊列, 因此天然是直接加在隊尾。
方法調用爲:

addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //將當前線程包裝成Node
    // 這裏咱們用註釋的形式把Node的構造函數貼出來
    // 由於傳入的mode值爲Node.EXCLUSIVE,因此節點的nextWaiter屬性被設爲null
    /*
        static final Node EXCLUSIVE = null;
        
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    */
    Node pred = tail;
    // 若是隊列不爲空, 則用CAS方式將當前節點設爲尾節點
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    // 代碼會執行到這裏, 只有兩種狀況:
    //    1. 隊列爲空
    //    2. CAS失敗
    // 注意, 這裏是併發條件下, 因此什麼都有可能發生, 尤爲注意CAS失敗後也會來到這裏
    enq(node); //將節點插入隊列
    return node;
}

可見,每個處於獨佔鎖模式下的節點,它的nextWaiter必定是null。
在這個方法中,咱們首先會嘗試直接入隊,可是由於目前是在併發條件下,因此有可能同一時刻,有多個線程都在嘗試入隊,致使compareAndSetTail(pred, node)操做失敗——由於有可能其餘線程已經成爲了新的尾節點,致使尾節點再也不是咱們以前看到的那個pred了。

若是入隊失敗了,接下來咱們就須要調用enq(node)方法,在該方法中咱們將經過自旋+CAS的方式,確保當前節點入隊。

enq

能執行到這個方法,說明當前線程獲取鎖已經失敗了,咱們已經把它包裝成一個Node,準備把它扔到等待隊列中去,可是在這一步又失敗了。這個失敗的緣由多是如下兩種之一:

  1. 等待隊列如今是空的,沒有線程在等待。
  2. 其餘線程在當前線程入隊的過程當中率先完成了入隊,致使尾節點的值已經改變了,CAS操做失敗。

在該方法中, 咱們使用了死循環, 即以自旋方式將節點插入隊列,若是失敗則不停的嘗試, 直到成功爲止, 另外, 該方法也負責在隊列爲空時, 初始化隊列,這也說明,隊列是延時初始化的(lazily initialized):

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 若是是空隊列, 首先進行初始化
        // 這裏也能夠看出, 隊列不是在構造的時候初始化的, 而是延遲到須要用的時候再初始化, 以提高性能
        if (t == null) { 
            // 注意,初始化時使用new Node()方法新建了一個dummy節點
            if (compareAndSetHead(new Node()))
                tail = head; // 這裏僅僅是將尾節點指向dummy節點,並無返回
        } else {
        // 到這裏說明隊列已經不是空的了, 這個時候再繼續嘗試將節點加到隊尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

這裏尤爲要注意的是,當隊列爲空時,咱們初始化隊列並無使用當前傳進來的節點,而是:
新建了一個空節點!!!
新建了一個空節點!!!
新建了一個空節點!!!

在新建完空的頭節點以後,咱們並無當即返回,而是將尾節點指向當前的頭節點,而後進入下一輪循環。
在下一輪循環中,尾節點已經不爲null了,此時再將咱們包裝了當前線程的Node加到這個空節點後面。

這就意味着,在這個等待隊列中,頭結點是一個「啞節點」,它不表明任何等待的線程。
head節點不表明任何線程,它就是一個空節點!!!
head節點不表明任何線程,它就是一個空節點!!!
head節點不表明任何線程,它就是一個空節點!!!

尾分叉

在繼續往下以前,咱們先分析enq方法中一個比較有趣的現象,我把它叫作尾分叉。咱們着重看將當前節點設置成尾節點的操做:

} else {
// 到這裏說明隊列已經不是空的了, 這個時候再繼續嘗試將節點加到隊尾
    node.prev = t;
    if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
    }
}

與將大象放到冰箱裏須要三步同樣,將一個節點node添加到sync queue的末尾也須要三步:

  1. 設置node的前驅節點爲當前的尾節點:node.prev = t
  2. 修改tail屬性,使它指向當前節點
  3. 修改原來的尾節點,使它的next指向當前節點

set tail

可是須要注意的,這裏的三步並非一個原子操做,第一步很容易成功;而第二步因爲是一個CAS操做,在併發條件下有可能失敗,第三步只有在第二步成功的條件下才執行。這裏的CAS保證了同一時刻只有一個節點能成爲尾節點,其餘節點將失敗,失敗後將回到for循環中繼續重試。

因此,當有大量的線程在同時入隊的時候,同一時刻,只有一個線程能完整地完成這三步,而其餘線程只能完成第一步,因而就出現了尾分叉:
尾分叉

注意,這裏第三步是在第二步執行成功後才執行的,這就意味着,有可能即便咱們已經完成了第二步,將新的節點設置成了尾節點,此時原來舊的尾節點的next值可能仍是null(由於尚未來的及執行第三步),因此若是此時有線程恰巧從頭節點開始向後遍歷整個鏈表,則它是遍歷不到新加進來的尾節點的,可是這顯然是不合理的,由於如今的tail已經指向了新的尾節點。
另外一方面,當咱們完成了第二步以後,第一步必定是完成了的,因此若是咱們從尾節點開始向前遍歷,已經能夠遍歷到全部的節點。這也就是爲何咱們在AQS相關的源碼中,有時候經常會出現從尾節點開始逆向遍歷鏈表——由於一個節點要能入隊,則它的prev屬性必定是有值的,可是它的next屬性可能暫時尚未值。

至於那些「分叉」的入隊失敗的其餘節點,在下一輪的循環中,它們的prev屬性會從新指向新的尾節點,繼續嘗試新的CAS操做,最終,全部節點都會經過自旋不斷的嘗試入隊,直到成功爲止。

addWaiter總結

至此,咱們就完成了addWaiter(Node.EXCLUSIVE)方法的完整的分析,該方法並不設計到任何關於鎖的操做,它就是解決了併發條件下的節點入隊問題。具體來講就是該方法保證了將當前線程包裝成Node節點加入到等待隊列的隊尾,若是隊列爲空,則會新建一個啞節點做爲頭節點,再將當前節點接在頭節點的後面。

addWaiter(Node.EXCLUSIVE)方法最終返回了表明了當前線程的Node節點,在返回的那一刻,這個節點必然是當時的sync queue的尾節點。

不過值得注意的是,enq方法也是有返回值(雖然這裏咱們並無使用它的返回值),可是它返回的是node節點的前驅節點,這個返回值雖然在addWaiter方法中並無使用,可是在其餘地方會被用到。

咱們再回到獲取鎖的邏輯中:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

當addWaiter(Node.EXCLUSIVE)執行完畢後,節點如今已經被成功添加到sync queue中了,接下來將執行acquireQueued方法。

acquireQueued

該方法是最複雜的一個方法, 也是最難啃的骨頭, 看代碼以前首先簡單的說明幾點:

(1) 能執行到該方法, 說明addWaiter 方法已經成功將包裝了當前Thread的節點添加到了等待隊列的隊尾
(2) 該方法中將再次嘗試去獲取鎖
(3) 在再次嘗試獲取鎖失敗後, 判斷是否須要把當前線程掛起

爲何前面獲取鎖失敗了, 這裏還要再次嘗試獲取鎖呢?
首先, 這裏再次嘗試獲取鎖是基於必定的條件的,即:

當前節點的前驅節點就是HEAD節點

由於咱們知道,head節點就是個啞節點,它不表明任何線程,或者表明了持有鎖的線程,若是當前節點的前驅節點就是head節點,那就說明當前節點已是排在整個等待隊列最前面的了。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 在當前節點的前驅就是HEAD節點時, 再次嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //在獲取鎖失敗後, 判斷是否須要把當前線程掛起
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意,這裏又來了個自旋操做,咱們一段段來看:

final Node p = node.predecessor();
// 在當前節點的前驅就是HEAD節點時, 再次嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

首先咱們獲取尾節點的前驅節點(由於上一步中返回的就是尾節點,而且這個節點就是表明了當前線程的Node)。
若是前驅節點就是head節點,那說明當前線程已經排在了隊列的最前面,因此這裏咱們再試着去獲取鎖。若是這一次獲取成功了,即tryAcquire方法返回了true, 則咱們將進入if代碼塊,調用setHead方法:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

這個方法將head指向傳進來的node,而且將node的thread和prev屬性置爲null, 以下圖所示:
setHead

能夠看出,這個方法的本質是丟棄原來的head,將head指向已經得到了鎖的node。可是接着又將該node的thread屬性置爲null了,這某種意義上致使了這個新的head節點又成爲了一個啞節點,它不表明任何線程。爲何要這樣作呢,由於在tryAcquire調用成功後,exclusiveOwnerThread屬性就已經記錄了當前獲取鎖的線程了,此處沒有必要再記錄。這某種程度上就是將當前線程從等待隊列裏面拿出來了,是一個變相的出隊操做。

還有另一個特色是,這個setHead方法只是個普通方法,並無像以前enq方法中那樣採用compareAndSetHead方法,這是爲何呢? 同咱們以前分析setState方法同樣:

由於這裏不會產生競爭!

在enq方法中,當咱們設置頭節點的時候,是新建一個啞節點並將它做爲頭節點,這個時候,可能多個線程都在執行這一步,所以咱們須要經過CAS操做保證只有一個線程能成功。
在acquireQueued方法裏,因爲咱們在調用到setHead的時,已經經過tryAcquire方法得到了鎖,這意味着:

  1. 此時沒有其餘線程在建立新的頭節點——由於很明顯此時隊列並非空的,不會執行到建立頭節點的代碼
  2. 此時能執行setHead的只有一個線程——由於要執行到setHead, 必然是tryAcquire已經返回了true, 而同一時刻,只有一個線程能獲取到鎖

綜上,在整個if語句內的代碼即便不加鎖,也是線程安全的,不須要採用CAS操做。

接下來咱們再來看看另外一種狀況,即p == head && tryAcquire(arg)返回了false,此時咱們須要判斷是否須要將當前線程掛起:

shouldParkAfterFailedAcquire

從函數名也能夠看出, 該方法用於決定在獲取鎖失敗後, 是否將線程掛起.

決定的依據就是前驅節點的waitStatus值。

(有沒發現一直到如今,前面的分析中咱們都沒有用到waitStatus的值,終於在這裏要用到了)

咱們先來回顧一下waitStatus有哪些狀態值:

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

一共有四種狀態,可是咱們在開篇的時候就說過,在獨佔鎖鎖的獲取操做中,咱們只用到了其中的兩個——CANCELLEDSIGNAL
固然,前面咱們在建立節點的時候並無給waitStatus賦值,所以每個節點最開始的時候waitStatus的值都被初始化爲0,即不屬於上面任何一種狀態。

那麼CANCELLEDSIGNAL表明什麼意思呢?

CANCELLED狀態很好理解,它表示Node所表明的當前線程已經取消了排隊,即放棄獲取鎖了。

SIGNAL這個狀態就有點意思了,它不是表徵當前節點的狀態,而是當前節點的下一個節點的狀態。
當一個節點的waitStatus被置爲SIGNAL,就說明它的下一個節點(即它的後繼節點)已經被掛起了(或者立刻就要被掛起了),所以在當前節點釋放了鎖或者放棄獲取鎖時,若是它的waitStatus屬性爲SIGNAL,它還要完成一個額外的操做——喚醒它的後繼節點。

有意思的是,SIGNAL這個狀態的設置經常不是節點本身給本身設的,而是後繼節點設置的,這裏給你們打個比方:

好比說出去吃飯,在人多的時候常常要排隊取號,你取到了8號,前面還有7我的在等着進去,你就和排在你前面的7號講「哥們,我如今排在你後面,隊伍這麼長,估計一時半會兒也輪不到我,我去那邊打個盹,一會輪到你進去了(release)或者你不想等了(cancel), 麻煩你都叫醒我」,說完,你就把他的waitStatus值設成了SIGNAL

換個角度講,當咱們決定要將一個線程掛起以前,首先要確保本身的前驅節點的waitStatus爲SIGNAL,這就至關於給本身設一個鬧鐘再去睡,這個鬧鐘會在恰當的時候叫醒本身,不然,若是一直沒有人來叫醒本身,本身可能就一直睡到天荒地老了。

理解了CANCELLEDSIGNAL這兩個狀態的含義後,咱們再來看看shouldParkAfterFailedAcquire是怎麼用的:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 得到前驅節點的ws
    if (ws == Node.SIGNAL)
        // 前驅節點的狀態已是SIGNAL了,說明鬧鐘已經設了,能夠直接睡了
        return true;
    if (ws > 0) {
        // 當前節點的 ws > 0, 則爲 Node.CANCELLED 說明前驅節點已經取消了等待鎖(因爲超時或者中斷等緣由)
        // 既然前驅節點不等了, 那就繼續往前找, 直到找到一個還在等待鎖的節點
        // 而後咱們跨過這些不等待鎖的節點, 直接排在等待鎖的節點的後面 (是否是很開心!!!)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 前驅節點的狀態既不是SIGNAL,也不是CANCELLED
        // 用CAS設置前驅節點的ws爲 Node.SIGNAL,給本身定一個鬧鐘
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

能夠看出,shouldParkAfterFailedAcquire所作的事情無外乎:

  • 若是爲前驅節點的waitStatus值爲 Node.SIGNAL 則直接返回 true
  • 若是爲前驅節點的waitStatus值爲 Node.CANCELLED (ws > 0), 則跳過那些節點, 從新尋找正常等待中的前驅節點,而後排在它後面,返回false
  • 其餘狀況, 將前驅節點的狀態改成 Node.SIGNAL, 返回false

注意了,這個函數只有在當前節點的前驅節點的waitStatus狀態自己就是SIGNAL的時候纔會返回true, 其餘時候都會返回false, 咱們再回到這個方法的調用處:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 咱們在這裏!在這裏!!在這裏!!!
            // 咱們在這裏!在這裏!!在這裏!!!
            // 咱們在這裏!在這裏!!在這裏!!!
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

能夠看出,當shouldParkAfterFailedAcquire返回false後,會繼續回到循環中再次嘗試獲取鎖——這是由於此時咱們的前驅節點可能已經變了(搞很差前驅節點就變成head節點了呢)。

當shouldParkAfterFailedAcquire返回true,即當前節點的前驅節點的waitStatus狀態已經設爲SIGNAL後,咱們就能夠安心的將當前線程掛起了,此時咱們將調用parkAndCheckInterrupt:

parkAndCheckInterrupt

到這個函數已是最後一步了, 就是將線程掛起, 等待被喚醒

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 線程被掛起,停在這裏再也不往下執行了
    return Thread.interrupted();
}

注意!LockSupport.park(this)執行完成後線程就被掛起了,除非其餘線程unpark了當前線程,或者當前線程被中斷了,不然代碼是不會再往下執行的,後面的Thread.interrupted()也不會被執行,那後面這個Thread.interrupted()是幹什麼用的呢? 咱們下一篇再講。

總結

  1. AQS中用state屬性表示鎖,若是能成功將state屬性經過CAS操做從0設置成1即獲取了鎖
  2. 獲取了鎖的線程才能將exclusiveOwnerThread設置成本身
  3. addWaiter負責將當前等待鎖的線程包裝成Node,併成功地添加到隊列的末尾,這一點是由它調用的enq方法保證的,enq方法同時還負責在隊列爲空時初始化隊列。
  4. acquireQueued方法用於在Node成功入隊後,繼續嘗試獲取鎖(取決於Node的前驅節點是否是head),或者將線程掛起
  5. shouldParkAfterFailedAcquire方法用於保證當前線程的前驅節點的waitStatus屬性值爲SIGNAL,從而保證了本身掛起後,前驅節點會負責在合適的時候喚醒本身。
  6. parkAndCheckInterrupt方法用於掛起當前線程,並檢查中斷狀態。
  7. 若是最終成功獲取了鎖,線程會從lock()方法返回,繼續往下執行;不然,線程會阻塞等待。

(完)

下一篇: 逐行分析AQS源碼(2)——獨佔鎖的釋放

查看更多系列文章:系列文章目錄

相關文章
相關標籤/搜索