閱讀 JDK 源碼:可重入鎖 ReentrantLock

前幾篇文章介紹了 AQS(AbstractQueuedSynchronizer)中的獨佔模式和對 Condition 的實現,這一篇文章來聊聊基於 AQS 框架實現的鎖工具:ReentrantLock。java

ReentrantLock 是一個可重入的互斥鎖,也被稱爲獨佔鎖。node

  • 互斥:同一時刻只容許一個線程得到鎖。
  • 可重入:持有鎖的線程再一次訪問鎖,能夠直接獲取而不用與其餘線程爭奪。

ReentrantLock 具備兩種實現:公平鎖(fair lock)、非公平鎖(non-fair lock)。c#

本文基於 jdk1.8.0_91

1. 繼承體系

繼承體系

public class ReentrantLock implements Lock, java.io.Serializable

ReentrantLock 是 Lock 接口的實現,方法對照以下:segmentfault

Lock 接口                             ReentrantLock 實現

lock()                                  sync.lock()
lockInterruptibly()                     sync.acquireInterruptibly(1)
tryLock()                               sync.nonfairTryAcquire(1)
tryLock(long time, TimeUnit unit)       sync.tryAcquireNanos(1, unit.toNanos(timeout))
unlock()                                sync.release(1)
newCondition()                          sync.newCondition()

可知 ReentrantLock 對 Lock 的實現都是調用內部類 Sync 來作的。框架

1.1 AQS

Sync 繼承了 AQS,也就是說 ReentrantLock 的大部分實現都已經由 AQS 完成了。less

AQS

ReentrantLock 使用 AQS 中的 state 表示鎖是否被其餘線程鎖佔用。若是爲 0 則表示未被佔用,其餘值表示該鎖被重入的次數。對於獲取鎖失敗的線程,要麼在同步隊列中等待(Lock#lock),要麼直接返回(Lock#tryLock)。工具

此外,AQS 提供了一系列模板方法,對於互斥鎖來講,須要實現其中的 tryAcquire、tryRelease、isHeldExclusively 方法。ui

java.util.concurrent.locks.AbstractQueuedSynchronizerthis

// 獨佔獲取(資源數)
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 獨佔釋放(資源數)
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// 共享獲取(資源數)
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// 共享獲取(資源數)
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// 是否排它狀態
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

1.2 Sync

Sync 重寫了 AQS 的 tryRelease 和 isHeldExclusively 方法,而 AQS 的 tryAcquire 方法交由 Sync 的子類來實現。
Sync 中還具有了一個抽象的 lock 方法,強制子類實現。spa

java.util.concurrent.locks.ReentrantLock.Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    protected final boolean tryRelease(int releases) {...}

    protected final boolean isHeldExclusively() {...}
}

1.3 FailSync/NonfairSync

Sync 具備兩個子類 FailSync 和 NonfairSync,對應的是 ReentrantLock 的兩種實現:公平鎖(fair lock)、非公平鎖(non-fair lock)。

Sync 中包含了一個抽象方法 lock 須要子類來實現,設計該抽象方法的目的是,給非公平模式加鎖提供入口。
由於公平鎖和非公平鎖的區別,主要體如今獲取鎖的機制不一樣:

  1. 公平模式,先發起請求的線程先獲取鎖,後續線程嚴格排隊依次等待獲取鎖。
  2. 非公平模式,線程初次發起鎖請求時,只要鎖是可用狀態,線程就能夠嘗試獲取鎖。若是鎖不可用,當前線程只能進入同步隊列,以公平模式排隊等待獲取鎖。

也就是說,非公平鎖只有在當前線程未進入同步隊列以前,才能夠去爭奪鎖,一旦進入同步隊列,只能按排隊順序等待鎖。

2. 構造方法

/** Synchronizer providing all implementation mechanics */
// 提供全部實現機制的同步器
private final Sync sync;

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

爲何默認建立的是非公平鎖?

由於公平模式下,隊列頭部的線程從阻塞中被喚醒到真正運行,涉及到線程調度和 CPU 上下文的切換,比較耗時,這個過程當中鎖處於空閒狀態,浪費資源。
而非公平模式下,多個線程之間採用 CAS 來爭奪鎖,這中間沒有時間延遲,可以提升吞吐量。

3. Lock 接口實現

// 獲取鎖。 
void lock() 

// 若是當前線程未被中斷,則獲取鎖。
void lockInterruptibly() 

// 僅在調用時鎖未被另外一個線程保持的狀況下,才獲取該鎖。
boolean tryLock() 

// 若是鎖在給定等待時間內沒有被另外一個線程保持,且當前線程未被中斷,則獲取該鎖。
boolean tryLock(long timeout, TimeUnit unit) 

// 試圖釋放此鎖。 
void unlock() 

// 返回用來與此 Lock 實例一塊兒使用的 Condition 實例。
Condition newCondition()

3.1 Lock#lock

獲取鎖:

  1. 能夠得到鎖:若是沒有其餘線程得到鎖,則當即返回,設置持有鎖的數量爲 1(tryAcquire)。
  2. 已經得到鎖:若是當前線程已經持有鎖,則當即返回,設置持有鎖的數量加 1(tryAcquire)。
  3. 等待得到鎖:若是其餘線程持有鎖,則當前線程會進入阻塞直到得到鎖成功(acquireQueued)。

java.util.concurrent.locks.ReentrantLock#lock

/**
 * Acquires the lock.
 *
 * <p>Acquires the lock if it is not held by another thread and returns
 * immediately, setting the lock hold count to one.
 *
 * <p>If the current thread already holds the lock then the hold
 * count is incremented by one and the method returns immediately.
 *
 * <p>If the lock is held by another thread then the
 * current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the lock has been acquired,
 * at which time the lock hold count is set to one.
 */
public void lock() {
    sync.lock();
}

3.1.1 Sync#lock

在 ReentrantLock 中,Sync 的兩個子類 FairSync、NonfairSync 各自實現了 Sync#lock 方法。

公平鎖

java.util.concurrent.locks.ReentrantLock.FairSync#lock

final void lock() {
    acquire(1);
}

非公平鎖

java.util.concurrent.locks.ReentrantLock.NonfairSync#lock

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

公平鎖的 lock 方法直接調用 AQS#acquire(1);
非公平鎖首先經過 CAS 修改 state 值來獲取鎖,當獲取失敗時纔會調用 AQS#acquire(1) 來獲取鎖。

3.1.2 AQS#acquire

AQS 中的 acquire 方法中,只有 tryAcquire 方法須要子類來實現。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) && 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

其中 tryRelease 方法嘗試釋放鎖,釋放鎖成功會喚醒在同步隊列中等待的線程,詳細內容見AQS 獨佔模式實現原理,這裏只介紹 ReentrantLock 對 tryRelease 的實現。

3.1.3 tryAcquire

在 ReentrantLock 中,Sync 的兩個子類 FairSync、NonfairSync 各自實現了 AQS#tryAcquire 方法。

公平鎖

java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&              // 判斷同步隊列中是否有等待時間更長的節點
            compareAndSetState(0, acquires)) {       // 進入這裏,說明當前線程等待鎖時間最長,則CAS修改state
            setExclusiveOwnerThread(current);        // 將當前線程設置爲持有鎖的線程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 當前線程已持有鎖
        int nextc = c + acquires; // 重入次數
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded"); // 可重入的最大次數 Integer.MAX_VALUE
        setState(nextc);
        return true;
    }
    return false;
}

代碼流程:

  1. 若是鎖是可用狀態,且同步隊列中沒有比當前線程等待時間還長的節點,則嘗試獲取鎖,成功則設爲鎖的持有者。
  2. 若是當前線程已持有鎖,則進行重入,設置已獲取鎖的次數,最大次數爲 Integer.MAX_VALUE。
  3. 以上條件都不符合,則沒法獲取鎖。

非公平鎖

java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // 無論當前線程在同步隊列中是否等待最久,都來 CAS 爭奪鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {  // 若是已得到鎖,則重入。邏輯與公平鎖一致
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

代碼流程:

  1. 若是鎖是可用狀態,(無論同步隊列中有沒有節點在阻塞等待中)直接嘗試獲取鎖,成功則設爲鎖的持有者。
  2. 若是當前線程已持有鎖,則進行重入,設置已獲取鎖的次數,最大次數爲 Integer.MAX_VALUE。
  3. 以上條件都不符合,則沒法獲取鎖。

能夠看到,公平鎖比非公平鎖多執行了 hasQueuedPredecessors 方法,用於判斷同步隊列中是否具備比當前線程等待鎖時間還長的節點。

在 AQS 中的同步隊列中,頭節點是一個 dummy node,所以等待時間最長的節點是頭節點的下一個節點,若該節點存在且不是當前線程,則 hasQueuedPredecessors 返回 true。說明當前節點不是同步隊列中等待時間最長的節點,沒法獲取鎖。

java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        // 頭節點的下一個節點,不是當前線程的節點,說明當前線程等待鎖時間不是最長的
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

3.2 Lock#lockInterruptibly

若是當前線程未被中斷,則獲取鎖。

java.util.concurrent.locks.ReentrantLock#lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

關注獲取鎖過程當中,對異常的處理。

  1. 獲取鎖以前,當前線程已經中斷,則直接拋出中斷異常。
  2. 在同步隊列中等待鎖的過程當中,若是被中斷喚醒,則放棄等待鎖,直接拋出異常。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        // 獲取鎖以前,當前線程已經中斷,則直接拋出中斷異常
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg); 
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly

/**
 * Acquires in exclusive interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 線程在阻塞等待鎖的過程當中,被中斷喚醒,則放棄等待鎖,直接拋出異常
                throw new InterruptedException(); 
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.3 Lock#tryLock

僅在調用時鎖未被另外一個線程保持的狀況下,才獲取該鎖。

與非公平鎖的 NonfairSync#tryAcquire 方法同樣,都是調用了 Sync#nonfairTryAcquire 方法。

  • ReentrantLock#lock -> NonfairSync#lock -> AQS#acquire -> NonfairSync#tryAcquire -> Sync#nonfairTryAcquire
  • ReentrantLock#tryLock -> Sync#nonfairTryAcquire

注意,Sync#nonfairTryAcquire 方法不存在任何和隊列相關的操做。
在鎖沒有被其餘線程持有的狀況下,無論使用的是公平鎖仍是非公平鎖,多個線程均可以調用該方法來 CAS 爭奪鎖,爭奪失敗了不用進入同步隊列,直接返回。

使用公平鎖的狀況下,在上一個持有鎖的線程釋放鎖(鎖狀態 state == 0)以後,同步隊列中的線程被喚醒但還沒有獲取鎖以前,此時當前線程執行 tryLock 可能會成功得到鎖,這其實是對公平鎖的公平性的一種破壞。
在某些狀況下,打破公平性的行爲多是有用的。若是但願遵照此鎖的公平設置,則使用 tryLock(0, TimeUnit.SECONDS) ,它幾乎是等效的(也檢測中斷)。

java.util.concurrent.locks.ReentrantLock#tryLock()

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // 無論同步隊列中是否具備等待好久的節點,當前線程直接 CAS 爭奪鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {  // 若是已得到鎖,則重入。邏輯與公平鎖一致
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

3.4 Lock#tryLock(time, unit)

若是鎖在給定等待時間內沒有被另外一個線程保持,且當前線程未被中斷,則獲取該鎖。

java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

在 JDK 中,AQS 中的 tryAcquireNanos 方法只會被 ReentrantLock#tryLock(time, unit) 和 ReentrantReadWriteLock.WriteLock#tryLock(time, unit) 這兩個方法調用到。
說明只有獨佔模式才能調用。

代碼流程:

  1. 獲取鎖以前,若是當前線程已中斷,則拋出中斷異常。
  2. tryAcquire:當前線程嘗試獲取鎖,獲取成功直接返回,不然進入下一個步。
  3. doAcquireNanos:當前線程進入同步隊列中進行等待,直到得到鎖、發生中斷、等待超時。

java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireNanos

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireNanos

/**
 * Acquires in exclusive timed mode.
 *
 * @param arg the acquire argument
 * @param nanosTimeout max wait time
 * @return {@code true} if acquired
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) // 直到超時都沒有得到鎖,則返回false
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout); // 能夠進入阻塞的狀況下,剩餘時間大於閾值,則阻塞,不然自旋
            if (Thread.interrupted())
                throw new InterruptedException(); // 線程在阻塞等待鎖的過程當中,被中斷喚醒,則放棄等待鎖,直接拋出異常
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.5 Lock#unlock

嘗試釋放鎖。

java.util.concurrent.locks.ReentrantLock#unlock

/**
 * Attempts to release this lock.
 *
 * <p>If the current thread is the holder of this lock then the hold
 * count is decremented.  If the hold count is now zero then the lock
 * is released.  If the current thread is not the holder of this
 * lock then {@link IllegalMonitorStateException} is thrown.
 *
 * @throws IllegalMonitorStateException if the current thread does not
 *         hold this lock
 */
public void unlock() {
    sync.release(1);
}

3.5.1 AQS#release

嘗試釋放鎖,若釋放成功,且同步隊列中具備等待中的節點,則嘗試喚醒後繼節點。

java.util.concurrent.locks.AbstractQueuedSynchronizer#release

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 釋放鎖資源
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 喚醒head的後繼節點
        return true;
    }
    return false;
}

其中 tryRelease 方法嘗試釋放鎖,釋放鎖成功會喚醒在同步隊列中等待的線程,詳細內容見AQS 獨佔模式實現原理,這裏只介紹 ReentrantLock 對 tryRelease 的實現。

3.5.2 Sync#tryRelease

在 ReentrantLock 中,不論是公平鎖仍是非公平鎖,均使用相同的 Sync#tryRelease 方法。

  1. 若是當前線程不是持有鎖的線程,拋出 IllegalMonitorStateException。
  2. 因爲鎖是可重入的,必須把持有的鎖所有釋放(計數歸零)才代表當前線程再也不持有鎖。

java.util.concurrent.locks.ReentrantLock.Sync#tryRelease

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 計算釋放以後的state
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true; // 鎖已所有釋放(獲取鎖的次數必須等於釋放次數),返回true,代表能夠喚醒下一個等待線程
        setExclusiveOwnerThread(null); // 設置獨佔鎖持有線程爲null
    }
    setState(c);
    return free;
}

3.4 Lock#newCondition

建立 AQS 中的 ConditionObject 對象。是能夠與 Lock 一塊兒使用的 Condition 接口實例。

java.util.concurrent.locks.ReentrantLock.Sync#newCondition

final ConditionObject newCondition() {
    return new ConditionObject();
}

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#ConditionObject

public ConditionObject() {
}

Condition 實例支持與 Object 的監視器方法(wait、notify 和 notifyAll)相同的用法(await、signal 和 signalAll)。

  1. 在調用 Condition#await 或 Condition#signal 方法時,若是沒有持有鎖,則將拋出 IllegalMonitorStateException。
  2. 在調用 Condition#await 方法時,將釋放鎖並進入阻塞,當被喚醒時從新獲取鎖,並恢復調用 Condition#await 時鎖的持有計數。
  3. 若是線程在等待時被中斷,則等待將終止,待從新獲取鎖成功以後,再響應中斷(拋異常或從新中斷)。
  4. 等待線程按 FIFO 順序收到信號。
  5. 等待方法返回的線程從新獲取鎖的順序,與線程最初獲取鎖的順序相同(對於非公平鎖,是按獲取鎖的順序;對於公平鎖,等同於按等待鎖的時間排序)。

相關閱讀:
閱讀 JDK 源碼:AQS 中的獨佔模式
閱讀 JDK 源碼:AQS 中的共享模式
閱讀 JDK 源碼:AQS 對 Condition 的實現

做者:Sumkor
連接:https://segmentfault.com/a/11...

相關文章
相關標籤/搜索