ReentrantLock 源碼分析以及 AQS (一)

前言

JDK1.5 以後發佈了JUC(java.util.concurrent),用於解決多線程併發問題。AQS 是一個特別重要的同步框架,不少同步類都藉助於 AQS 實現了對線程同步狀態的管理。java

AQS 中最主要的就是獨佔鎖和共享鎖的獲取和釋放,以及提供了一些可中斷的獲取鎖,超時等待鎖等方法。node

ReentranLock 是基於 AQS 獨佔鎖的一個實現。ReentrantReadWriteLock 是基於 AQS 共享鎖的一個讀寫鎖實現。原本打算一篇文章裏面寫完獨佔鎖和共享鎖,可是發現篇幅太長了,也不易於消化。安全

所以,本篇就先結合 ReentrantLock 源碼,分析 AQS 的獨佔鎖獲取和釋放。以及 ReentrantLock 的公平鎖和非公平鎖實現。數據結構

下一篇再寫 ReentrantReadWriteLock 讀寫鎖源碼,以及 AQS 共享鎖的獲取和釋放。多線程

在正式講解源碼以前,牆裂建議讀者作一些準備工做,最好對如下知識有必定的瞭解,這樣閱讀起來源碼會比較輕鬆(由於,我當初剛開始接觸多線程時,直接看 AQS 簡直是一臉懵逼,就像讀天書同樣。。)。併發

  1. 瞭解雙向鏈表的數據結構,以及隊列的入隊出隊等操做。
  2. LockSupport 的 park,unpark 方法,以及對線程的 interrupt 幾個方法瞭解(可參考:LockSupport的 park 方法是怎麼響應中斷的?)。
  3. 對 CAS 和自旋機制有必定的瞭解。

AQS 同步隊列

AQS 內部維護了一個 FIFO(先進先出)的雙向隊列。它的內部是用雙向鏈表來實現的,每一個數據節點(Node)中都包含了當前節點的線程信息,還有它的先後兩個指針,分別指向前驅節點和後繼節點。下邊看一下 Node 的屬性和方法:框架

static final class Node {
    //能夠認爲是一種標記,代表了這個 node 是以共享模式在同步隊列中等待
    static final Node SHARED = new Node();
    //也是一種標記,代表這個 node 是以獨佔模式在同步隊列中等待
    static final Node EXCLUSIVE = null;

    /** waitStatus 常量值 */
    //說明當前節點被取消,緣由有多是超時,或者被中斷。
    //節點被取消的狀態是不可逆的,也就是說此節點會一直停留在取消狀態,不會轉變。
    static final int CANCELLED =  1;
    //說明後繼節點的線程被 park 阻塞,所以當前線程須要在釋放鎖或者被取消時,喚醒後繼節點
    static final int SIGNAL    = -1;
    //說明線程在 condition 條件隊列等待
    static final int CONDITION = -2;
    //在共享模式中用,代表下一個共享線程應該無條件傳播
    static final int PROPAGATE = -3;

    
    //當前線程的等待狀態,除了以上四種值,還有一個值 0 爲初始化狀態(條件隊列的節點除外)。
    //注意這個值修改時是經過 CAS ,以保證線程安全。
    volatile int waitStatus;

    //前驅節點
    volatile Node prev;

    //後繼節點
    volatile Node next;

    //當前節點中的線程,經過構造函數初始化,出隊時會置空(這個後續說,重點強調)
    volatile Thread thread;

    //有兩種狀況。1.在 condition 條件隊列中的後一個節點 
    //2. 一個特殊值 SHARED 用於代表當前是共享模式(由於條件隊列只存在於獨佔模式)
    Node nextWaiter;

    //是不是共享模式,理由同上
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //返回前驅節點,若是爲空拋出空指針
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

另外,在 AQS 類中,還會記錄同步隊列的頭結點和尾結點:函數

//同步隊列的頭結點,是懶加載的,即不會當即建立一個同步隊列,
    //只有當某個線程獲取不到鎖,須要排隊的時候,纔會初始化頭結點
    private transient volatile Node head;

    //同步隊列的尾結點,一樣是懶加載。
    private transient volatile Node tail;

獨佔鎖

這部分就結合 ReentrantLock 源碼分析 AQS 的獨佔鎖是怎樣得到和釋放鎖的。源碼分析

非公平鎖

首先,咱們從 ReentrantLock 開始分析,它有兩個構造方法,一個構造,能夠傳入一個 boolean 類型的參數,代表是用公平鎖仍是非公平鎖模式。另外一個構造方法,不傳入任何參數,則默認用非公平鎖。學習

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

NonfairSync 和 FairSync 都繼承自 Sync ,它們都是 ReentranLock 的內部類。 而Sync 類又繼承自 AQS (AbstractQueuedSynchronizer)。

static final class NonfairSync extends Sync {
}

static final class FairSync extends Sync {
}

abstract static class Sync extends AbstractQueuedSynchronizer {
}

知道了它們之間的繼承關係,咱們就從非公平鎖的加鎖方法做爲入口,跟蹤源碼。由於非公平鎖的流程講明白以後,公平鎖大體流程都同樣,只是多了一個條件判斷(這個,一下子後邊細講,會作對比)。

NonfairSync.lock

咱們看下公平鎖的獲取鎖的方法:

final void lock() {
    //經過 CAS 操做把 state 設置爲 1
    if (compareAndSetState(0, 1))
        //若是設值成功,說明加鎖成功,保存當前得到鎖的線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //若是加鎖失敗,則執行 AQS 的acquire 方法
        acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire

這個方法的邏輯是:

  1. 經過 tryAcquire 方法,嘗試獲取鎖,若是成功,則返回 true,失敗返回 false 。
  2. tryAcquire 失敗以後,會先調用 addWaiter 方法,把當前線程封裝成 node 節點,加入同步隊列(獨佔模式)。
  3. acquireQueued 方法會把剛加入隊列的 node 做爲參數,經過自旋去得到鎖。

tryAcquire

這是一個模板方法,具體的實現須要看它的子類,這裏對應的就是 ReentrantLock.NonfairSync.tryAcquire 方法。咱們看一下:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    //當前線程
    final Thread current = Thread.currentThread();
    //獲取當前的同步狀態,若爲 0 ,表示無鎖狀態。若大於 0,表示已經有線程搶到了鎖。
    int c = getState();
    if (c == 0) {
        //而後經過 CAS 操做把 state 的值改成 1。
        if (compareAndSetState(0, acquires)) {
            // CAS 成功以後,保存當前得到鎖的線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若是 state 大於0,則判斷當前線程是不是得到鎖的線程,是的話,可重入。
    else if (current == getExclusiveOwnerThread()) {
        //因爲 ReentrantLock 是可重入的,因此每重入一次 state 就加 1 。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addWaiter

若是獲取鎖失敗以後,就會調用 addWaiter 方法,把當前線程加入同步隊列。

private Node addWaiter(Node mode) {
    //把當前線程封裝成 Node ,而且是獨佔模式
    Node node = new Node(Thread.currentThread(), mode);
    //嘗試快速入隊,若是失敗,則會調用 enq 入隊方法。enq 會初始化隊列。
    Node pred = tail;
    //若是 tail 不爲空,說明當前隊列中已經有節點
    if (pred != null) { 
        //把當前 node 的 prev 指針指向 tail
        node.prev = pred;
        //經過 CAS 把 node 設置爲 tail,即添加到隊尾
        if (compareAndSetTail(pred, node)) {
            //把舊的 tail 節點的 next 指針指向當前 node
            pred.next = node;
            return node;
        }
    }
    //當 tail 爲空時,把 node 添加到隊列,若是須要的話,先進行隊列初始化
    enq(node);
    //入隊成功以後,返回當前 node
    return node;
}

enq

經過自旋,把當前節點加入到隊列中

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //若是 tail爲空,說明隊列未初始化
        if (t == null) { 
            //建立一個空節點,經過 CAS把它設置爲頭結點
            if (compareAndSetHead(new Node()))
                //此時只有一個 head頭節點,所以把 tail也指向它
                tail = head;
        } else {
            //第二次自旋時,tail不爲空,因而把當前節點的 prev指向 tail節點
            node.prev = t;
            //經過 CAS把 tail節點設置爲當前 node節點
            if (compareAndSetTail(t, node)) {
                //把舊的 tail節點的 next指向當前 node
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

入隊成功以後,就會調用 acquireQueued 方法自旋搶鎖。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //獲取當前節點的前驅節點
            final Node p = node.predecessor();
            //若是前驅節點就是 head 節點,就調用 tryAcquire 方法搶鎖
            if (p == head && tryAcquire(arg)) {
                //若是搶鎖成功,就把當前 node 設置爲頭結點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                //搶鎖成功後,會把線程中斷標誌返回出去,終止for循環
                return interrupted;
            }
            //若是搶鎖失敗,就根據前驅節點的 waitStatus 狀態判斷是否須要把當前線程掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                //線程被掛起時,判斷是否被中斷過
                parkAndCheckInterrupt())
                //注意此處,若是被線程被中斷過,須要把中斷標誌從新設置一下
                interrupted = true;
        }
    } finally {
        if (failed)
            //若是拋出異常,則取消鎖的獲取,進行出隊操做
            cancelAcquire(node);
    }
}

setHead

經過代碼,咱們能夠看到,當前的同步隊列中,只有第二個節點纔有資格搶鎖。若是搶鎖成功,則會把它設置爲頭結點。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

須要注意的是,這個方法,會把頭結點的線程設置爲 null 。想一下,爲何?

由於,此時頭結點的線程已經搶鎖成功,須要出隊了。天然的,隊列中也就不該該存在這個線程了。

PS:由 enq 方法,還有 setHead 方法,咱們能夠發現,頭結點的線程老是爲 null。這是由於,頭結點要麼是剛初始化的空節點,要麼是搶到鎖的線程出隊了。所以,咱們也經常把頭結點叫作虛擬節點(不存儲任何線程)。

shouldParkAfterFailedAcquire

以上是搶鎖成功的狀況,那麼搶鎖失敗了呢?這時,咱們須要判斷是否應該把當前線程掛起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取當前節點的前驅節點的 waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //若是 ws = -1 ,說明當前線程能夠被前驅節點正常喚醒,因而就能夠安全的 park了
        return true;
    if (ws > 0) {
        //若是 ws > 0,說明前驅節點被取消,則會從當前節點依次向前查找,
        //直到找到第一個沒有被取消的節點,把那個節點的 next 指向當前 node
        //這一步,是爲了找到一個能夠把當前線程喚起的前驅節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //若是 ws 爲 0,或者 -3(共享鎖狀態),則把它設置爲 -1 
        //返回 false,下次自旋時,就會判斷等於 -1,返回 true了
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

若是 shouldParkAfterFailedAcquire 返回 true,說明當前線程須要被掛起。所以,就執行此方法,同時檢查線程是否被中斷。

private final boolean parkAndCheckInterrupt() {
    //把當前線程掛起,則 acquireQueued 方法的自旋就會暫停,等待前驅節點 unpark
    LockSupport.park(this);
    //返回當前節點是否被中斷的標誌,注意此方法會把線程的中斷標誌清除。
    //所以,返回上一層方法時,須要設置 interrupted = true 把中斷標誌從新設置,以便上層代碼能夠處理中斷
    return Thread.interrupted();
}

想一下,爲何搶鎖失敗後,須要判斷是否把線程掛起?

由於,若是搶不到鎖,而且還不把線程掛起,acquireQueued 方法就會一直自旋下去,這樣你的CPU能受得了嗎。

cancelAcquire

當不停的自旋搶鎖時,若發生了異常,就會調用此方法,取消正在嘗試獲取鎖的線程。node 的位置分爲三種狀況,見下面註釋,

private void cancelAcquire(Node node) {

    if (node == null)
        return;

    // node 再也不指向任何線程
    node.thread = null;

    Node pred = node.prev;
    //從當前節點不斷的向前查找,直到找到一個有效的前驅節點
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    //把 node 的 ws 設置爲 -1 
    node.waitStatus = Node.CANCELLED;

    // 1.若是 node 是 tail,則把 tail 更新爲 node,並把 pred.next 指向 null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        //2.若是 node 既不是 tail,也不是 head 的後繼節點,就把 node的前驅節點的 ws 設置爲 -1
        //最後把 node 的前驅節點的 next 指向 node 的後繼節點
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //3.若是 node是 head 的後繼節點,則直接喚醒 node 的後繼節點。
            //這個也很好理解,由於 node 是隊列中惟一有資格嘗試獲取鎖的節點,
            //它放棄了資格,固然有義務把後繼節點喚醒,以讓後繼節點嘗試搶鎖。
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

unparkSuccessor

這個喚醒方法就比較簡單了,

private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        //從尾結點向前依次遍歷,直到找到距離當前 node 最近的一個有效節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //把這個有效節點的線程喚醒,
        //喚醒以後,當前線程就能夠繼續自旋搶鎖了,(回到 park 的地方)
        LockSupport.unpark(s.thread);
}

下面畫一個流程圖更直觀的查看整個獲取鎖的過程。

公平鎖

公平鎖和非公平鎖的總體流程大體相同,只是在搶鎖以前先判斷一下是否已經有人排在前面,若是有的話,就不執行搶鎖。咱們經過源碼追蹤到 FairSync.tryAcquire 方法。會發現,多了一個 hasQueuedPredecessors 方法。

hasQueuedPredecessors

這個方法判斷邏輯稍微有點複雜,有多種狀況。

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. 若是 h == t,說明 h 和 t 都爲空(此時隊列還未初始化)或者它們是同一個節點(說明隊列已經初始化,而且只有一個節點,此時爲 enq 方法第一次自旋成功後)。此時,返回false。
  2. 若是 h != t,則判斷 head.next == null 是否成立,若是成立,則返回 true。這種狀況發生在有其餘線程第一次入隊時。在 AQS 的 enq 入隊方法,設置頭結點成功以後 compareAndSetHead(new Node()) ,還未執行 tail = head 時(仔細想想爲何?)。此時 tail = null , head = new Node(),head.next = null。
  3. 若是 h != t,而且 head.next != null,說明此時隊列中至少已經有兩個節點,則判斷 head.next 是不是當前線程。若是是,返回 false(注意是 false哦,由於用了 !),不然返回 true 。

總結:以上幾種狀況,只有最終返回 false 時,纔會繼續往下執行。由於 false,說明沒有線程排在當前線程前面,因而經過 CAS 嘗試把 state 值設置爲 1。若成功,則方法返回。若失敗,一樣須要去排隊。

公平鎖和非公平鎖區別

舉個例子來對比公平鎖和非公平鎖。好比,如今到飯點了,你們都到食堂打飯。把隊列中的節點比做排隊打飯的人,每一個打飯窗口都有一個管理員,只有排隊的人從管理員手中搶到鎖,纔有資格打飯。打飯的過程就是線程執行的過程。

若是,你發現前面沒有人在排隊,那麼就能夠直接從管理員手中拿到鎖,而後打飯。對於公平鎖來講,若是你前面有人在打飯,那麼你就要排隊到他後面(圖中B),等他打完以後,把鎖還給管理員。那麼,你就能夠從管理員手中拿到鎖,而後打飯了。後面的人依次排隊。這就是FIFO先進先出的隊列模型。

對於非公平鎖來講,若是你是圖中的 B,當 A 把鎖還給管理員後,有可能有另一個 D 插隊過來直接把鎖搶走。那麼,他就能夠打飯,你只能繼續等待了。

因此,能夠看出來。公平鎖是嚴格按照排隊的順序來的,先來後到嘛,你來的早,就能夠早點獲取鎖。優勢是,這樣不會形成某個線程等待時間過長,由於你們都是中規中矩的在排隊。而缺點呢,就是會頻繁的喚起線程,增長 CPU的開銷。

非公平鎖的優勢是吞吐量大,由於有可能正好鎖可用,而後線程來了,直接搶到鎖了,不用排隊了,這樣也減小了 CPU 喚醒排隊線程的開銷。 可是,缺點也很明顯,你說我排隊排了好長時間了,終於輪到我打飯了,憑什麼其餘人剛過來就插到我前面,比我還先打到飯,也太不公平了吧,後邊一大堆排隊的人更是怨聲載道。這要是每一個人來了都插到我前面去,我豈不是要餓死了。

獨佔鎖的釋放

咱們從 ReentrantLock 的 unlock 方法看起:

public void unlock() {
    //調用 AQS 的 release 方法
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //若是頭結點不爲空,而且 ws 不爲 0,則喚起後繼節點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這段邏輯比較簡單,當線程釋放鎖以後,就會喚醒後繼節點。 unparkSuccessor 已講,再也不贅述。而後看下 tryRelease 方法,公平鎖和非公平鎖走的是同一個方法。

protected final boolean tryRelease(int releases) {
    //每釋放一次鎖,state 值就會減 1,由於以前可能有鎖的重入
    int c = getState() - releases;
    //若是當前線程不是搶到鎖的線程,則拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        //只有 state 的值減到 0 的時候,纔會所有釋放鎖
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

由於,ReentrantLock 支持鎖的重入,因此每次重入 state 值都會加 1,相應的每次釋放鎖, state 的值也會減 1 。因此,這也是爲何每一個 lock 方法最後都要有一個 unlock 方法釋放鎖,它們的個數須要保證相同。

當 state 值爲 0 的時候,說明鎖徹底釋放。其餘線程才能夠有機會搶到鎖。

結語

以上已經講解了獨佔鎖主要的獲取方法 acquire ,另外還有一些其餘相關方法,再也不贅述,由於主要邏輯都是同樣的,只有部分稍有不一樣,只要理解了 acquire ,這些都是相通的。如 acquireInterruptibly 方法,它能夠在獲取鎖的時候響應中斷。還有超時獲取鎖的方法 doAcquireNanos 能夠設定獲取鎖的超時時間,超時以後就返回失敗。

下篇預告:分析 ReentrantReadWriteLock 讀寫鎖源碼,以及 AQS 共享鎖的獲取和釋放,敬請期待。

若是本文對你有用,歡迎點贊,評論,轉發。

學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章推送。

相關文章
相關標籤/搜索