線程間的同步與通訊(5)——ReentrantLock源碼分析

前言

系列文章目錄java

上一篇 咱們學習了lock接口,本篇咱們就以ReentrantLock爲例,學習一下Lock鎖的基本的實現。咱們先來看看Lock接口中的方法與ReentrantLock對其實現的對照表:node

Lock 接口 ReentrantLock 實現
lock() sync.lock()
lockInterruptibly() sync.acquireInterruptibly(1)
tryLock() sync.nonfairTryAcquire(1)
tryLock(long time, TimeUnit unit) sync.tryAcquireNanos(1, unit.toNanos(timeout))
unlock() sync.release(1)
newCondition() sync.newCondition()

從表中能夠看出,ReentrantLock對於Lock接口的實現都是直接「轉交」給sync對象的。編程

核心屬性

ReentrantLock只有一個sync屬性,別看只有一個屬性,這個屬性提供了全部的實現,咱們上面介紹ReentrantLock對Lock接口的實現的時候就說到,它對全部的Lock方法的實現都調用了sync的方法,這個sync就是ReentrantLock的屬性,它繼承了AQS.segmentfault

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock();
    //...
}

在Sync類中,定義了一個抽象方法lock,該方法應當由繼承它的子類來實現,關於繼承它的子類,咱們在下一節分析構造函數時再看。併發

構造函數

ReentrantLock共有兩個構造函數:函數

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默認的構造函數使用了非公平鎖,另一個構造函數經過傳入一個boolean類型的fair變量來決定使用公平鎖仍是非公平鎖。其中,FairSync和NonfairSync的定義以下:工具

static final class FairSync extends Sync {
    
    final void lock() {//省略實現}

    protected final boolean tryAcquire(int acquires) {//省略實現}
}

static final class NonfairSync extends Sync {
    
    final void lock() {//省略實現}

    protected final boolean tryAcquire(int acquires) {//省略實現}
}

這裏爲何默認建立的是非公平鎖呢?由於非公平鎖的效率高呀,當一個線程請求非公平鎖時,若是在發出請求的同時該鎖變成可用狀態,那麼這個線程會跳過隊列中全部的等待線程而得到鎖。有的同窗會說了,這不就是插隊嗎?
沒錯,這就是插隊!這也就是爲何它被稱做非公平鎖。
之因此使用這種方式是由於:源碼分析

在恢復一個被掛起的線程與該線程真正運行之間存在着嚴重的延遲。

在公平鎖模式下,你們講究先來後到,若是當前線程A在請求鎖,即便如今鎖處於可用狀態,它也得在隊列的末尾排着,這時咱們須要喚醒排在等待隊列隊首的線程H(在AQS中實際上是次頭節點),因爲恢復一個被掛起的線程而且讓它真正運行起來須要較長時間,那麼這段時間鎖就處於空閒狀態,時間和資源就白白浪費了,非公平鎖的設計思想就是將這段白白浪費的時間利用起來——因爲線程A在請求鎖的時候自己就處於運行狀態,所以若是咱們此時把鎖給它,它就會當即執行本身的任務,所以線程A有機會在線程H徹底喚醒以前得到、使用以及釋放鎖。這樣咱們就能夠把線程H恢復運行的這段時間給利用起來了,結果就是線程A更早的獲取了鎖,線程H獲取鎖的時刻也沒有推遲。所以提升了吞吐量。性能

固然,非公平鎖僅僅是在當前線程請求鎖,而且鎖處於可用狀態時有效,當請求鎖時,鎖已經被其餘線程佔有時,就只能仍是老老實實的去排隊了。學習

不管是非公平鎖的實現NonfairSync仍是公平鎖的實現FairSync,它們都覆寫了lock方法和tryAcquire方法,這兩個方法都將用於獲取一個鎖。

Lock接口方法實現

lock()

公平鎖實現

關於ReentrantLock對於lock方法的公平鎖的實現邏輯,咱們在逐行分析AQS源碼(1)——獨佔鎖的獲取中已經講過了,這裏再也不贅述。若是你尚未看過那篇文章或者還不瞭解AQS,建議先去看一下那一篇文章,而後再讀下文。

非公平鎖實現

接下來咱們看看非公平鎖的實現邏輯:

// NonfairSync中的lock方法
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

對比公平鎖中的lock方法:

// FairSync中的lock方法
final void lock() {
    acquire(1);
}

可見,相比公平鎖,非公平鎖在當前鎖沒有被佔用時,能夠直接嘗試去獲取鎖,而不用排隊,因此它在一開始就嘗試使用CAS操做去搶鎖,只有在該操做失敗後,纔會調用AQS的acquire方法。

因爲acquire方法中除了tryAcquire由子類實現外,其他都由AQS實現,咱們在前面的文章中已經介紹的很詳細了,這裏再也不贅述,咱們僅僅看一下非公平鎖的tryAcquire方法實現:

// NonfairSync中的tryAcquire方法實現
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

它調用了Sync類的nonfairTryAcquire方法:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 只有這一處和公平鎖的實現不一樣,其它的徹底同樣。
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

咱們能夠拿它和公平鎖的tryAcquire對比一下:

// FairSync中的tryAcquire方法實現
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

看見沒?這兩個方法幾乎如出一轍,惟一的區別就是非公平鎖在搶鎖時再也不須要調用hasQueuedPredecessors方法先去判斷是否有線程排在本身前面,而是直接爭鎖,其它的徹底和公平鎖一致。

lockInterruptibly()

前面的lock方法是阻塞式的,搶到鎖就返回,搶不到鎖就將線程掛起,而且在搶鎖的過程當中是不響應中斷的(關於不響應中斷,見這篇文章末尾的分析),lockInterruptibly提供了一種響應中斷的方式,在ReentrantLock中,不管是公平鎖仍是非公平鎖,這個方法的實現都是同樣的

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

他們都調用了AQS的acquireInterruptibly方法:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

該方法首先檢查當前線程是否已經被中斷過了,若是已經被中斷了,則當即拋出InterruptedException(這一點是lockInterruptibly要求的,參見上一篇Lock接口的介紹)。

若是調用這個方法時,當前線程尚未被中斷過,則接下來先嚐試用普通的方法來獲取鎖(tryAcquire)。若是獲取成功了,則萬事大吉,直接就返回了;不然,與前面的lock方法同樣,咱們須要將當前線程包裝成Node扔進等待隊列,所不一樣的是,此次,在隊列中嘗試獲取鎖時,若是發生了中斷,咱們須要對它作出響應, 並拋出異常

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return; //與acquireQueued方法的不一樣之處
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); //與acquireQueued方法的不一樣之處
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

若是你在上面分析lock方法的時候已經理解了acquireQueued方法,那麼再看這個方法就很輕鬆了,咱們把lock方法中的acquireQueued拿出來和上面對比一下:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; //不一樣之處
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted; //不一樣之處
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; //不一樣之處
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

經過代碼對比能夠看出,doAcquireInterruptiblyacquireQueued(addWaiter(Node.EXCLUSIVE), arg))的調用本質上講並沒有區別。只不過對於addWaiter(Node.EXCLUSIVE),一個是外部調用,經過參數傳進來;一個是直接在方法內部調用。因此這兩個方法的邏輯幾乎是同樣的,惟一的不一樣就是在doAcquireInterruptibly中,當咱們檢測到中斷後,再也不是簡單的記錄中斷狀態,而是直接拋出InterruptedException

當拋出中斷異常後,在返回前,咱們將進入finally代碼塊進行善後工做,很明顯,此時failed是爲true的,咱們將調用cancelAcquire方法:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 由當前節點向前遍歷,跳過那些已經被cancel的節點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    
    // 從當前節點向前開始查找,找到第一個waitStatus>0的Node, 該節點爲pred
    // predNext便是pred節點的下一個節點
    // 到這裏可知,pred節點是沒有被cancel的節點,可是pred節點日後,一直到當前節點Node都處於被Cancel的狀態
    Node predNext = pred.next;

    //將當前節點的waitStatus的狀態設爲Node.CANCELLED
    node.waitStatus = Node.CANCELLED;

    // 若是當前節點是尾節點,則將以前找到的節點pred從新設置成尾節點,並將pred節點的next屬性由predNext修改爲Null
    // 這一段本質上是將pred節點後面的節點所有移出隊列,由於它們都被cancel掉了
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // 到這裏說明當前節點已經不是尾節點了,或者設置新的尾節點失敗了
        // 咱們前面說過,併發條件下,什麼都有可能發生
        // 即在當前線程運行這段代碼的過程當中,其餘線程可能已經入隊了,成爲了新的尾節點
        // 雖然咱們以前已經將當前節點的waitStatus設爲了CANCELLED 
        // 可是由咱們在分析lock方法的文章可知,新的節點入隊後會設置鬧鐘,將找一個沒有CANCEL的前驅節點,將它的status設置成SIGNAL以喚醒本身。
        // 因此,在當前節點的後繼節點入隊後,可能將當前節點的waitStatus修改爲了SIGNAL
        // 而在這時,咱們發起了中斷,又將這個waitStatus修改爲CANCELLED
        // 因此在當前節點出隊前,要負責喚醒後繼節點。
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

這個cancelAcquire方法不只是取消了當前節點的排隊,還會同時將當前節點以前的那些已經CANCEL掉的節點移出隊列。不過這裏尤爲須要注意的是,這裏是在併發條件下,此時此刻,新的節點可能已經入隊了,成爲了新的尾節點,這將會致使node == tail && compareAndSetTail(node, pred)這一條件失敗。

這個函數的前半部分是就是基於當前節點就是隊列的尾節點的,即在執行這個函數時,沒有新的節點入隊,這部分的邏輯比較簡單,你們直接看代碼中的註釋解釋便可。

然後半部分是基於有新的節點加進來,當前節點已經再也不是尾節點的狀況,咱們詳細看看這else部分:

if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
} else {
    int ws;
    if (pred != head &&
        ((ws = pred.waitStatus) == Node.SIGNAL ||
         (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
        pred.thread != null) {
        Node next = node.next;
        if (next != null && next.waitStatus <= 0)
            compareAndSetNext(pred, predNext, next); //將pred節點的後繼節點改成當前節點的後繼節點
    } else {
        unparkSuccessor(node);
    }

    node.next = node; // help GC
}

(這裏再說明一下pred變量所表明的含義:它表示了從當前節點向前遍歷所找到的第一個沒有被cancel的節點。)

執行到else代碼塊,則咱們目前的情況以下:

  1. 當前線程被中斷了,咱們已經將它的Node的waitStatus屬性設爲CANCELLED,thread屬性置爲null
  2. 在執行這個方法期間,又有其餘線程加入到隊列中來,成爲了新的尾節點,使得當前線程已經不是隊尾了

在這種狀況下,咱們將執行if語句,將pred節點的後繼節點改成當前節點的後繼節點(compareAndSetNext(pred, predNext, next)),即將從pred節點開始(不包含pred節點)一直到當前節點(包括當前節點)之間的全部節點所有移出隊列,由於他們都是被cancel的節點。固然這是基於必定條件的,條件爲:

  1. pred節點不是頭節點
  2. pred節點的thread不爲null
  3. pred節點的waitStatus屬性是SIGNAL或者是小於等於0可是被咱們成功的設置成signal

上面這三個條件保證了pred節點確實是一個正在正常等待鎖的線程,而且它的waitStatus屬性爲SIGNAL。
若是這一條件沒法被知足,那麼咱們將直接經過unparkSuccessor喚醒它的後繼節點。

到這裏,咱們總結一下cancelAcquire方法:

  1. 若是要cancel的節點已是尾節點了,則在咱們後面並無節點須要喚醒,咱們只須要從當前節點(即尾節點)開始向前遍歷,找到全部已經cancel的節點,將他們移出隊列便可
  2. 若是要cancel的節點後面還有別的節點,而且咱們找到的pred節點處於正常等待狀態,咱們仍是直接將從當前節點開始,到pred節點直接的全部節點,所有移出隊列,這裏並不須要喚醒當前節點的後繼節點,由於它已經接在了pred的後面,pred的waitStatus已經被置爲SIGNAL,它會負責喚醒後繼節點
  3. 若是上面的條件不知足,按說明當前節點往前已經沒有在等待中的線程了,咱們就直接將後繼節點喚醒。

有的同窗就要問了,那第3條只是把當前節點的後繼節點喚醒了,並無將當前節點移除隊列呀?可是當前節點已經取消排隊了,不是應該移除隊列嗎?
彆着急,在後繼節點被喚醒後,它會在搶鎖時調用的shouldParkAfterFailedAcquire方法裏面跳過已經CANCEL的節點,那個時候,當前節點就會被移出隊列了。

tryLock()

因爲tryLock僅僅是用於檢查鎖在當前調用的時候是否是可得到的,因此即便如今使用的是非公平鎖,在調用這個方法時,當前線程也會直接嘗試去獲取鎖,哪怕這個時候隊列中還有在等待中的線程。因此這一方法對於公平鎖和非公平鎖的實現是同樣的,它被定義在Sync類中,由FairSync和NonfairSync直接繼承使用:

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

這個nonfairTryAcquire咱們在上面分析非公平鎖的lock方法時已經講過了,這裏只是簡單的方法複用。該方法不存在任何和隊列相關的操做,僅僅就是直接嘗試去獲鎖,成功了就返回true,失敗了就返回false。

可能你們會以爲公平鎖也使用這種方式去tryLock就喪失了公平性,可是這種方式在某些狀況下是很是有用的,若是你仍是想維持公平性,那應該使用帶超時機制的tryLock

tryLock(long timeout, TimeUnit unit)

與當即返回的tryLock()不一樣,tryLock(long timeout, TimeUnit unit)帶了超時時間,因此是阻塞式的,而且在獲取鎖的過程當中能夠響應中斷異常:

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

lockInterruptibly方法同樣,該方法首先檢查當前線程是否已經被中斷過了,若是已經被中斷了,則當即拋出InterruptedException

隨後咱們經過調用tryAcquiredoAcquireNanos(arg, nanosTimeout)方法來嘗試獲取鎖,注意,這時公平鎖和非公平鎖對於tryAcquire方法就有不一樣的實現了,公平鎖首先會檢查當前有沒有別的線程在隊列中排隊,關於公平鎖和非公平鎖對tryAcquire的不一樣實現上文已經講過了,這裏再也不贅述。咱們直接來看doAcquireNanos,這個方法其實和前面說的doAcquireInterruptibly方法很像,咱們經過將相同的部分註釋掉,直接看不一樣的部分:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    /*final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;*/
                return true; // doAcquireInterruptibly中爲 return
            /*}*/
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
       /* }
    } finally {
        if (failed)
            cancelAcquire(node);
    }*/
}

能夠看出,這兩個方法的邏輯大差不差,只是doAcquireNanos多了對於截止時間的檢查。

不過這裏有兩點須要注意,一個是doAcquireInterruptibly是沒有返回值的,而doAcquireNanos是有返回值的。這是由於doAcquireNanos有可能由於獲取到鎖而返回,也有可能由於超時時間到了而返回,爲了區分這兩種狀況,由於超時時間而返回時,咱們將返回false,表明並無獲取到鎖。

另一點值得注意的是,上面有一個nanosTimeout > spinForTimeoutThreshold的條件,在它知足的時候纔會將當前線程掛起指定的時間,這個spinForTimeoutThreshold是個啥呢:

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices
 * to improve responsiveness with very short timeouts.
 */
static final long spinForTimeoutThreshold = 1000L;

它就是個閾值,是爲了提高性能用的。若是當前剩下的等待時間已經很短了,咱們就直接使用自旋的形式等待,而不是將線程掛起,可見做者爲了儘量地優化AQS鎖的性能費足了心思。

unlock()

unlock操做用於釋放當前線程所佔用的鎖,這一點對於公平鎖和非公平鎖的實現是同樣的,因此該方法被定義在Sync類中,由FairSync和NonfairSync直接繼承使用:

public void unlock() {
    sync.release(1);
}

關於ReentrantLock的釋放鎖的操做,咱們在逐行分析AQS源碼(2)——獨佔鎖的釋放中已經詳細的介紹過了,這裏就再也不贅述了。

newCondition()

ReentrantLock自己並無實現Condition方法,它是直接調用了AQS的newCondition方法

public Condition newCondition() {
    return sync.newCondition();
}

而AQS的newCondtion方法就是簡單地建立了一個ConditionObject對象:

final ConditionObject newCondition() {
    return new ConditionObject();
}

關於ConditionObject對象的源碼分析,請參見 逐行分析AQS源碼(4)——Condition接口實現

總結

ReentrantLock對於Lock接口方法的實現大多數是直接調用了AQS的方法,AQS中已經完成了大多數邏輯的實現,子類只須要直接繼承使用便可,這足見AQS在併發編程中的地位。固然,有一些邏輯仍是須要ReentrantLock本身去實現的,例如tryAcquire的邏輯。

AQS在併發編程中的地位舉足輕重,只要弄懂了它,咱們在學習其餘併發編程工具的時候就會容易不少。

(完)

系列文章目錄

相關文章
相關標籤/搜索